当完成rtpbaseendpoint的初始化后,根据我们业务调用的逻辑。首先是在服务端调用genrateOff方法,从而生成服务端的sdp信息返回给远端RTP, 然后远端处理sdp后返回answer给本端rtp,再然后就是本端的sdpbaseendpoint继续answer这个offer后,进开始发送数据。流程大致如下
假如我们有A服务器的rtp端点需要和B服务器的rtp端点进行通信,具体的代码在kurento中调用方法的流程如下: 首先在A中,我们调用genrateOffer方法,实际是发送了1个generate_offer信号,引发了kmsbasesdpendpoint.c中注册的回调
kms_base_sdp_endpoint_class_init (KmsBaseSdpEndpointClass * klass)方法中 klass->generate_offer = kms_base_sdp_endpoint_generate_offer; klass->process_offer = kms_base_sdp_endpoint_process_offer; klass->process_answer = kms_base_sdp_endpoint_process_answer; A中将触发kms_base_sdp_endpoint_generate_offer方法,调用这个方法后,将调用 kms_base_sdp_endpoint_init_sdp_handlers (self, sess),初始化sdp处理类,这里升入解析后发现其实并么有真正处理SDP 然后通过 offer = kms_sdp_session_generate_offer (sess),这里才最终生成所需要的offer,详细生成offer的方法如下:
在kmssdpsession.c文件中. GstSDPMessage * kms_sdp_session_generate_offer (KmsSdpSession * self) { GstSDPMessage *offer = NULL; GError *err = NULL; gchar *sdp_str = NULL;
//这里方法有引用了sdp_agent的create_offer方法,真是层层调用. offer = kms_sdp_agent_create_offer (self->agent, &err); if (err != NULL) { GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message); goto error; }
kms_sdp_agent_set_local_description (self->agent, offer, &err); if (err != NULL) { GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message); goto error; }
if (gst_sdp_message_copy (offer, &self->local_sdp) != GST_SDP_OK) { GST_ERROR_OBJECT (self, "Generating SDP Offer: gst_sdp_message_copy"); goto error; }
GST_DEBUG_OBJECT (self, "Generated SDP Offer:\n%s", (sdp_str = gst_sdp_message_as_text (offer))); g_free (sdp_str); sdp_str = NULL;
return offer;
error: g_clear_error (&err);
if (offer != NULL) { gst_sdp_message_free (offer); }
return NULL; }
继续关注sdpagent.c中的方法
GstSDPMessage * kms_sdp_agent_create_offer (KmsSdpAgent * agent, GError ** error) { g_return_val_if_fail (KMS_IS_SDP_AGENT (agent), NULL);
return KMS_SDP_AGENT_GET_CLASS (agent)->create_offer (agent, error); }
继续看sdp_agent的create_offer方法的实现方法 klass->create_offer = kms_sdp_agent_create_offer_impl;具体方法实现如下
static GstSDPMessage * kms_sdp_agent_create_offer_impl (KmsSdpAgent * agent, GError ** error) { GstSDPMessage *offer = NULL; GstSDPOrigin o; GSList *tmp = NULL; gboolean state_changed = FALSE; gboolean failed = TRUE;
SDP_AGENT_LOCK (agent);
if (agent->priv->state != KMS_SDP_AGENT_STATE_UNNEGOTIATED && agent->priv->state != KMS_SDP_AGENT_STATE_NEGOTIATED) { g_set_error (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE, "Agent in state %s", SDP_AGENT_STATE (agent)); goto end; }
if (gst_sdp_message_new (&offer) != GST_SDP_OK) { g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE, "Can not allocate SDP offer"); goto end; }
if (!kms_sdp_agent_set_default_session_attributes (offer, error)) { goto end; }
if (agent->priv->state == KMS_SDP_AGENT_STATE_NEGOTIATED) { const GstSDPOrigin *orig;
orig = gst_sdp_message_get_origin (agent->priv->local_description); set_sdp_session_description (&agent->priv->local, orig->sess_id, orig->sess_version); } else { generate_sdp_session_description (&agent->priv->local); }
kms_sdp_agent_origin_init (agent, &o, agent->priv->local.id, agent->priv->local.version);
if (!kms_sdp_agent_set_origin (offer, &o, error)) { goto end; }
/* Execute pre-processing extensions */ if (!kms_sdp_agent_offer_processing_extensions (agent, offer, TRUE, error)) { goto end; }
tmp = g_slist_copy_deep (agent->priv->offer_handlers, (GCopyFunc) sdp_handler_ref, NULL); kms_sdp_agent_merge_offer_handlers (agent);
/* Process medias */ if (!kms_sdp_agent_create_media_offer (agent, offer, error)) { goto end; }
if (!kms_sdp_agent_update_session_version (agent, offer, error)) { goto end; }
/* Execute post-processing extensions */ if (!kms_sdp_agent_offer_processing_extensions (agent, offer, FALSE, error)) { gst_sdp_message_free (offer); offer = NULL; goto end; }
g_slist_free_full (tmp, (GDestroyNotify) kms_ref_struct_unref); SDP_AGENT_NEW_STATE (agent, KMS_SDP_AGENT_STATE_LOCAL_OFFER); state_changed = TRUE; tmp = NULL;
failed = FALSE;
end:
if (tmp != NULL) { g_slist_free_full (agent->priv->offer_handlers, (GDestroyNotify) kms_ref_struct_unref); agent->priv->offer_handlers = tmp; }
SDP_AGENT_UNLOCK (agent);
if (failed && offer != NULL) { gst_sdp_message_free (offer); return NULL; }
if (state_changed) { g_object_notify_by_pspec (G_OBJECT (agent), obj_properties[PROP_STATE]); }
return offer; }
其中几个主要的GST原生的结构体 GstSDPOrigin , spd协议中o=xxx,后面的内容
/** * GstSDPOrigin: * @username: the user's login on the originating host, or it is "-" * if the originating host does not support the concept of user ids. * @sess_id: is a numeric string such that the tuple of @username, @sess_id, * @nettype, @addrtype and @addr form a globally unique identifier for the * session. * @sess_version: a version number for this announcement * @nettype: the type of network. "IN" is defined to have the meaning * "Internet". * @addrtype: the type of @addr. * @addr: the globally unique address of the machine from which the session was * created. * * The contents of the SDP "o=" field which gives the originator of the session * (their username and the address of the user's host) plus a session id and * session version number. */ typedef struct { gchar *username; gchar *sess_id; gchar *sess_version; gchar *nettype; gchar *addrtype; gchar *addr; } GstSDPOrigin;
sdp协议中,c=xxx后面的内容
/** * GstSDPConnection: * @nettype: the type of network. "IN" is defined to have the meaning * "Internet". * @addrtype: the type of @address. * @address: the address * @ttl: the time to live of the address * @addr_number: the number of layers * * The contents of the SDP "c=" field which contains connection data. */ typedef struct { gchar *nettype; gchar *addrtype; gchar *address; guint ttl; guint addr_number; } GstSDPConnection;
sdp协议最后解析为结构体的格式
/** * GstSDPMessage: * @version: the protocol version * @origin: owner/creator and session identifier * @session_name: session name * @information: session information * @uri: URI of description * @emails: array of #gchar with email addresses * @phones: array of #gchar with phone numbers * @connection: connection information for the session * @bandwidths: array of #GstSDPBandwidth with bandwidth information * @times: array of #GstSDPTime with time descriptions * @zones: array of #GstSDPZone with time zone adjustments * @key: encryption key * @attributes: array of #GstSDPAttribute with session attributes * @medias: array of #GstSDPMedia with media descriptions * * The contents of the SDP message. */ typedef struct { gchar *version; GstSDPOrigin origin; gchar *session_name; gchar *information; gchar *uri; GArray *emails; GArray *phones; GstSDPConnection connection; GArray *bandwidths; GArray *times; GArray *zones; GstSDPKey key; GArray *attributes; GArray *medias; } GstSDPMessage;
//根据调用rtp的业务流程,当创建sdp并且建立网络链接后,baseRtpEndpoint将根据media的类型来创建并添加1个数据拆包的pad,然后根据再根据拉取数据的endpoint数据类型,看后续如何处理 //具体调用方法
static void kms_base_rtp_endpoint_rtpbin_pad_added( GstElement *rtpbin, GstPad *pad, KmsBaseRtpEndpoint *self) { //agnostic这里可以认为是1个临时的中间pad,用于根据map协商的处理媒体信息来确认下一步连接什么处理的拆包pad //depayloader则是拆包的bin,如果拉流和推流是相同的媒体类型,则只是拆包 //如果拉流和推流媒体类型不一致,并且都是系统支持的编码类型,则会自动进行转码 GstElement *agnostic, *depayloader; gboolean added = TRUE; KmsMediaType media; GstCaps *caps;
GST_PAD_STREAM_LOCK(pad);
//检测如果是音频,获取1个音频转码的element if (g_str_has_prefix(GST_OBJECT_NAME(pad), AUDIO_RTPBIN_RECV_RTP_SRC)) { agnostic = kms_element_get_audio_agnosticbin(KMS_ELEMENT(self)); media = KMS_MEDIA_TYPE_AUDIO; }
//检测如果是视频,获取1个视频可以转码的element else if (g_str_has_prefix( GST_OBJECT_NAME(pad), VIDEO_RTPBIN_RECV_RTP_SRC)) { agnostic = kms_element_get_video_agnosticbin(KMS_ELEMENT(self)); media = KMS_MEDIA_TYPE_VIDEO; //这里用于处理rtcp需要返回的相关信息 if (self->priv->rl != NULL) { self->priv->rl->event_manager = kms_utils_remb_event_manager_create(pad); } } //如果是其他的数据,则不处理 else { added = FALSE; goto end; } //获取rtp中sdp携带的当前处理媒体流的负载类型信息 //caps=application/x-rtp, media=(string)video, payload=(int)96, clock-rate=(int)90000, encoding-name=(string)H264, packetization-mode=(string)1, sprop-parameter-sets=(string)"Z2QAH62EAQwgCGEAQwgCGEAQwgCEK1AoAt03AQEBQAAAAwBAAAAMoQ\=\=\,aO48sA\=\=", profile-level-id=(string)64001F caps = gst_pad_query_caps(pad, NULL); GST_ERROR_OBJECT(self, "New pad: %" GST_PTR_FORMAT " for linking to %" GST_PTR_FORMAT " with caps %" GST_PTR_FORMAT, pad, agnostic, caps); //根据rtp的sdp协商的信息来获取正确的拆包的gst对象,目前video支持的h264和vp8 depayloader = kms_base_rtp_endpoint_get_depayloader_for_caps(caps); //释放caps,这里caps相当于再中间做一些存取的变量 gst_caps_unref(caps);
//如果找到了对应的拆包对象 if (depayloader != NULL) { GST_DEBUG_OBJECT(self, "Found depayloader %" GST_PTR_FORMAT, depayloader); kms_base_rtp_endpoint_update_stats(self, depayloader, media); gst_bin_add(GST_BIN(self), depayloader); gst_element_link_pads(depayloader, "src", agnostic, "sink"); gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), depayloader, "sink"); gst_element_sync_state_with_parent(depayloader); } //如果没找到,则直接将数据输出到1个fakesink对象,这个相当于1个黑洞,类似于linux输出流到/dev/null else { GstElement *fake = gst_element_factory_make("fakesink", NULL);
g_object_set(fake, "async", FALSE, "sync", FALSE, NULL);
GST_WARNING_OBJECT( self, "Depayloder not found for pad %" GST_PTR_FORMAT, pad);
gst_bin_add(GST_BIN(self), fake); gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), fake, "sink"); gst_element_sync_state_with_parent(fake); }
end: GST_PAD_STREAM_UNLOCK(pad);
//如果添加pad成功,则发送1个MEDIA_START的信号,通知basertpendpoint的子对象,进行业务层面的处理 //给业务层出发1个启动媒体的事件 if (added) { g_signal_emit(G_OBJECT(self), obj_signals[MEDIA_START], 0, media, TRUE); } }
//其中关键的1步骤是根据caps获取到depayload的对象,方法如下:
static GstElement * kms_base_rtp_endpoint_get_depayloader_for_caps(GstCaps *caps) { GstElementFactory *factory; GstElement *depayloader = NULL; GList *payloader_list, *filtered_list, *l;
payloader_list = gst_element_factory_list_get_elements( GST_ELEMENT_FACTORY_TYPE_DEPAYLOADER, GST_RANK_NONE); filtered_list = gst_element_factory_list_filter( payloader_list, caps, GST_PAD_SINK, FALSE);
if (filtered_list == NULL) { goto end; }
for (l = filtered_list; l != NULL; l = l->next) { factory = GST_ELEMENT_FACTORY(l->data);
if (factory == NULL) { continue; }
if (g_strcmp0(gst_plugin_feature_get_name(factory), "asteriskh263") == 0) { /* Do not use asteriskh263 for H263 */ continue; }
depayloader = gst_element_factory_create(factory, NULL);
if (depayloader != NULL) { kms_utils_depayloader_monitor_pts_out(depayloader); break; } }
end: gst_plugin_feature_list_free(filtered_list); gst_plugin_feature_list_free(payloader_list);
return depayloader; }
//如果成功的找到了1个拆包的对象 , 则继续处理这个depayload输出的pts,并生成1个新的pts输出
void kms_utils_depayloader_monitor_pts_out (GstElement * depayloader) { GstPad *src_pad;
GST_INFO_OBJECT (depayloader, "Add probe: Adjust depayloader PTS out"); //获取depayloader的src的pad src_pad = gst_element_get_static_pad (depayloader, "src"); //添加1个发现缓存数据的检测时间,检测到数据后对根据服务器的时间对pts进行重新调整 gst_pad_add_probe (src_pad, GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST, //具体调整pts的方法 (GstPadProbeCallback) kms_utils_depayloader_pts_out_probe, kms_utils_adjust_pts_data_new (depayloader), (GDestroyNotify) kms_utils_adjust_pts_data_destroy); g_object_unref (src_pad); }
//具体处理拆包后数据的pts的方法
static GstPadProbeReturn kms_utils_depayloader_pts_out_probe (GstPad * pad, GstPadProbeInfo * info, AdjustPtsData * data) { if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) { GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
buffer = gst_buffer_make_writable (buffer); kms_utils_depayloader_adjust_pts_out (data, buffer); GST_PAD_PROBE_INFO_DATA (info) = buffer; } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) { GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
list = gst_buffer_list_make_writable (list); gst_buffer_list_foreach (list, (GstBufferListFunc) kms_utils_depayloader_pts_out_it, data); GST_PAD_PROBE_INFO_DATA (info) = list; } return GST_PAD_PROBE_OK; }
//这里,rtpEndpoint的调用及拆包就完成了。
//接下来,如果我们要增加kurento支持的rtpEndpoint拆包的类型,那就要看看depayloader是如何生成的。我们将在下一章继续分析