Kurento 源码解析系列(4)- RtpEndpoint端点c层代码的调用处理

it2024-12-05  22

当完成rtpbaseendpoint的初始化后,根据我们业务调用的逻辑。首先是在服务端调用genrateOff方法,从而生成服务端的sdp信息返回给远端RTP, 然后远端处理sdp后返回answer给本端rtp,再然后就是本端的sdpbaseendpoint继续answer这个offer后,进开始发送数据。流程大致如下

假如我们有A服务器的rtp端点需要和B服务器的rtp端点进行通信,具体的代码在kurento中调用方法的流程如下: 首先在A中,我们调用genrateOffer方法,实际是发送了1个generate_offer信号,引发了kmsbasesdpendpoint.c中注册的回调

kms_base_sdp_endpoint_class_init (KmsBaseSdpEndpointClass * klass)方法中   klass->generate_offer = kms_base_sdp_endpoint_generate_offer;   klass->process_offer = kms_base_sdp_endpoint_process_offer;   klass->process_answer = kms_base_sdp_endpoint_process_answer; A中将触发kms_base_sdp_endpoint_generate_offer方法,调用这个方法后,将调用 kms_base_sdp_endpoint_init_sdp_handlers (self, sess),初始化sdp处理类,这里升入解析后发现其实并么有真正处理SDP 然后通过 offer = kms_sdp_session_generate_offer (sess),这里才最终生成所需要的offer,详细生成offer的方法如下:

在kmssdpsession.c文件中. GstSDPMessage * kms_sdp_session_generate_offer (KmsSdpSession * self) {   GstSDPMessage *offer = NULL;   GError *err = NULL;   gchar *sdp_str = NULL;

//这里方法有引用了sdp_agent的create_offer方法,真是层层调用.   offer = kms_sdp_agent_create_offer (self->agent, &err);   if (err != NULL) {     GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message);     goto error;   }

  kms_sdp_agent_set_local_description (self->agent, offer, &err);   if (err != NULL) {     GST_ERROR_OBJECT (self, "Generating SDP Offer: %s", err->message);     goto error;   }

  if (gst_sdp_message_copy (offer, &self->local_sdp) != GST_SDP_OK) {     GST_ERROR_OBJECT (self, "Generating SDP Offer: gst_sdp_message_copy");     goto error;   }

  GST_DEBUG_OBJECT (self, "Generated SDP Offer:\n%s",       (sdp_str = gst_sdp_message_as_text (offer)));   g_free (sdp_str);   sdp_str = NULL;

  return offer;

error:   g_clear_error (&err);

  if (offer != NULL) {     gst_sdp_message_free (offer);   }

  return NULL; }

继续关注sdpagent.c中的方法

GstSDPMessage * kms_sdp_agent_create_offer (KmsSdpAgent * agent, GError ** error) {   g_return_val_if_fail (KMS_IS_SDP_AGENT (agent), NULL);

  return KMS_SDP_AGENT_GET_CLASS (agent)->create_offer (agent, error); }

 

继续看sdp_agent的create_offer方法的实现方法   klass->create_offer = kms_sdp_agent_create_offer_impl;具体方法实现如下

 

  static GstSDPMessage * kms_sdp_agent_create_offer_impl (KmsSdpAgent * agent, GError ** error) {   GstSDPMessage *offer = NULL;   GstSDPOrigin o;   GSList *tmp = NULL;   gboolean state_changed = FALSE;   gboolean failed = TRUE;

  SDP_AGENT_LOCK (agent);

  if (agent->priv->state != KMS_SDP_AGENT_STATE_UNNEGOTIATED &&       agent->priv->state != KMS_SDP_AGENT_STATE_NEGOTIATED) {     g_set_error (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,         "Agent in state %s", SDP_AGENT_STATE (agent));     goto end;   }

  if (gst_sdp_message_new (&offer) != GST_SDP_OK) {     g_set_error_literal (error, KMS_SDP_AGENT_ERROR, SDP_AGENT_INVALID_STATE,         "Can not allocate SDP offer");     goto end;   }

  if (!kms_sdp_agent_set_default_session_attributes (offer, error)) {     goto end;   }

  if (agent->priv->state == KMS_SDP_AGENT_STATE_NEGOTIATED) {     const GstSDPOrigin *orig;

    orig = gst_sdp_message_get_origin (agent->priv->local_description);     set_sdp_session_description (&agent->priv->local, orig->sess_id,         orig->sess_version);   } else {     generate_sdp_session_description (&agent->priv->local);   }

  kms_sdp_agent_origin_init (agent, &o, agent->priv->local.id,       agent->priv->local.version);

  if (!kms_sdp_agent_set_origin (offer, &o, error)) {     goto end;   }

  /* Execute pre-processing extensions */   if (!kms_sdp_agent_offer_processing_extensions (agent, offer, TRUE, error)) {     goto end;   }

  tmp = g_slist_copy_deep (agent->priv->offer_handlers,       (GCopyFunc) sdp_handler_ref, NULL);   kms_sdp_agent_merge_offer_handlers (agent);

  /* Process medias */   if (!kms_sdp_agent_create_media_offer (agent, offer, error)) {     goto end;   }

  if (!kms_sdp_agent_update_session_version (agent, offer, error)) {     goto end;   }

  /* Execute post-processing extensions */   if (!kms_sdp_agent_offer_processing_extensions (agent, offer, FALSE, error)) {     gst_sdp_message_free (offer);     offer = NULL;     goto end;   }

  g_slist_free_full (tmp, (GDestroyNotify) kms_ref_struct_unref);   SDP_AGENT_NEW_STATE (agent, KMS_SDP_AGENT_STATE_LOCAL_OFFER);   state_changed = TRUE;   tmp = NULL;

  failed = FALSE;

end:

  if (tmp != NULL) {     g_slist_free_full (agent->priv->offer_handlers,         (GDestroyNotify) kms_ref_struct_unref);     agent->priv->offer_handlers = tmp;   }

  SDP_AGENT_UNLOCK (agent);

  if (failed && offer != NULL) {     gst_sdp_message_free (offer);     return NULL;   }

  if (state_changed) {     g_object_notify_by_pspec (G_OBJECT (agent), obj_properties[PROP_STATE]);   }

  return offer; }    

其中几个主要的GST原生的结构体 GstSDPOrigin  ,  spd协议中o=xxx,后面的内容

 

/** * GstSDPOrigin: * @username: the user's login on the originating host, or it is "-" *    if the originating host does not support the concept of user ids. * @sess_id: is a numeric string such that the tuple of @username, @sess_id, *    @nettype, @addrtype and @addr form a globally unique identifier for the *    session. * @sess_version: a version number for this announcement * @nettype: the type of network. "IN" is defined to have the meaning *    "Internet". * @addrtype: the type of @addr. * @addr: the globally unique address of the machine from which the session was *     created. * * The contents of the SDP "o=" field which gives the originator of the session * (their username and the address of the user's host) plus a session id and * session version number. */ typedef struct {   gchar *username;   gchar *sess_id;   gchar *sess_version;   gchar *nettype;   gchar *addrtype;   gchar *addr; } GstSDPOrigin;

sdp协议中,c=xxx后面的内容

 

/** * GstSDPConnection: * @nettype: the type of network. "IN" is defined to have the meaning *    "Internet". * @addrtype: the type of @address. * @address: the address * @ttl: the time to live of the address * @addr_number: the number of layers * * The contents of the SDP "c=" field which contains connection data. */ typedef struct {   gchar *nettype;   gchar *addrtype;   gchar *address;   guint  ttl;   guint  addr_number; } GstSDPConnection;

 

 

sdp协议最后解析为结构体的格式

 

/**  * GstSDPMessage:  * @version: the protocol version  * @origin: owner/creator and session identifier  * @session_name: session name  * @information: session information  * @uri: URI of description  * @emails: array of #gchar with email addresses  * @phones: array of #gchar with phone numbers  * @connection: connection information for the session  * @bandwidths: array of #GstSDPBandwidth with bandwidth information  * @times: array of #GstSDPTime with time descriptions  * @zones: array of #GstSDPZone with time zone adjustments  * @key: encryption key  * @attributes: array of #GstSDPAttribute with session attributes  * @medias: array of #GstSDPMedia with media descriptions  *  * The contents of the SDP message.  */ typedef struct {   gchar            *version;   GstSDPOrigin      origin;   gchar            *session_name;   gchar            *information;   gchar            *uri;   GArray           *emails;   GArray           *phones;   GstSDPConnection  connection;   GArray           *bandwidths;   GArray           *times;   GArray           *zones;   GstSDPKey         key;   GArray           *attributes;   GArray           *medias; } GstSDPMessage;

 

 

//根据调用rtp的业务流程,当创建sdp并且建立网络链接后,baseRtpEndpoint将根据media的类型来创建并添加1个数据拆包的pad,然后根据再根据拉取数据的endpoint数据类型,看后续如何处理 //具体调用方法

static void kms_base_rtp_endpoint_rtpbin_pad_added(     GstElement *rtpbin, GstPad *pad, KmsBaseRtpEndpoint *self) {     //agnostic这里可以认为是1个临时的中间pad,用于根据map协商的处理媒体信息来确认下一步连接什么处理的拆包pad   //depayloader则是拆包的bin,如果拉流和推流是相同的媒体类型,则只是拆包   //如果拉流和推流媒体类型不一致,并且都是系统支持的编码类型,则会自动进行转码   GstElement *agnostic, *depayloader;   gboolean added = TRUE;   KmsMediaType media;   GstCaps *caps;

  GST_PAD_STREAM_LOCK(pad);

//检测如果是音频,获取1个音频转码的element   if (g_str_has_prefix(GST_OBJECT_NAME(pad), AUDIO_RTPBIN_RECV_RTP_SRC))   {     agnostic = kms_element_get_audio_agnosticbin(KMS_ELEMENT(self));     media = KMS_MEDIA_TYPE_AUDIO;   }

  //检测如果是视频,获取1个视频可以转码的element   else if (g_str_has_prefix(                GST_OBJECT_NAME(pad), VIDEO_RTPBIN_RECV_RTP_SRC))   {     agnostic = kms_element_get_video_agnosticbin(KMS_ELEMENT(self));     media = KMS_MEDIA_TYPE_VIDEO; //这里用于处理rtcp需要返回的相关信息     if (self->priv->rl != NULL)     {       self->priv->rl->event_manager = kms_utils_remb_event_manager_create(pad);     }   }   //如果是其他的数据,则不处理   else   {     added = FALSE;     goto end;   } //获取rtp中sdp携带的当前处理媒体流的负载类型信息 //caps=application/x-rtp, media=(string)video, payload=(int)96, clock-rate=(int)90000, encoding-name=(string)H264, packetization-mode=(string)1, sprop-parameter-sets=(string)"Z2QAH62EAQwgCGEAQwgCGEAQwgCEK1AoAt03AQEBQAAAAwBAAAAMoQ\=\=\,aO48sA\=\=", profile-level-id=(string)64001F   caps = gst_pad_query_caps(pad, NULL);   GST_ERROR_OBJECT(self,                    "New pad: %" GST_PTR_FORMAT " for linking to %" GST_PTR_FORMAT                    " with caps %" GST_PTR_FORMAT,                    pad, agnostic, caps); //根据rtp的sdp协商的信息来获取正确的拆包的gst对象,目前video支持的h264和vp8   depayloader = kms_base_rtp_endpoint_get_depayloader_for_caps(caps);   //释放caps,这里caps相当于再中间做一些存取的变量   gst_caps_unref(caps);

//如果找到了对应的拆包对象   if (depayloader != NULL)   {     GST_DEBUG_OBJECT(self, "Found depayloader %" GST_PTR_FORMAT, depayloader);     kms_base_rtp_endpoint_update_stats(self, depayloader, media);     gst_bin_add(GST_BIN(self), depayloader);     gst_element_link_pads(depayloader, "src", agnostic, "sink");     gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), depayloader, "sink");     gst_element_sync_state_with_parent(depayloader);   }   //如果没找到,则直接将数据输出到1个fakesink对象,这个相当于1个黑洞,类似于linux输出流到/dev/null   else   {     GstElement *fake = gst_element_factory_make("fakesink", NULL);

    g_object_set(fake, "async", FALSE, "sync", FALSE, NULL);

    GST_WARNING_OBJECT(         self, "Depayloder not found for pad %" GST_PTR_FORMAT, pad);

    gst_bin_add(GST_BIN(self), fake);     gst_element_link_pads(rtpbin, GST_OBJECT_NAME(pad), fake, "sink");     gst_element_sync_state_with_parent(fake);   }

end:   GST_PAD_STREAM_UNLOCK(pad);

//如果添加pad成功,则发送1个MEDIA_START的信号,通知basertpendpoint的子对象,进行业务层面的处理 //给业务层出发1个启动媒体的事件   if (added)   {     g_signal_emit(G_OBJECT(self), obj_signals[MEDIA_START], 0, media, TRUE);   } }

 

 

//其中关键的1步骤是根据caps获取到depayload的对象,方法如下:

 

static GstElement * kms_base_rtp_endpoint_get_depayloader_for_caps(GstCaps *caps) {   GstElementFactory *factory;   GstElement *depayloader = NULL;   GList *payloader_list, *filtered_list, *l;

  payloader_list = gst_element_factory_list_get_elements(       GST_ELEMENT_FACTORY_TYPE_DEPAYLOADER, GST_RANK_NONE);   filtered_list = gst_element_factory_list_filter(       payloader_list, caps, GST_PAD_SINK, FALSE);

  if (filtered_list == NULL)   {     goto end;   }

  for (l = filtered_list; l != NULL; l = l->next)   {     factory = GST_ELEMENT_FACTORY(l->data);

    if (factory == NULL)     {       continue;     }

    if (g_strcmp0(gst_plugin_feature_get_name(factory), "asteriskh263") == 0)     {       /* Do not use asteriskh263 for H263 */       continue;     }

    depayloader = gst_element_factory_create(factory, NULL);

    if (depayloader != NULL)     {       kms_utils_depayloader_monitor_pts_out(depayloader);       break;     }   }

end:   gst_plugin_feature_list_free(filtered_list);   gst_plugin_feature_list_free(payloader_list);

  return depayloader; }

//如果成功的找到了1个拆包的对象 , 则继续处理这个depayload输出的pts,并生成1个新的pts输出

 

void kms_utils_depayloader_monitor_pts_out (GstElement * depayloader) {   GstPad *src_pad;

  GST_INFO_OBJECT (depayloader, "Add probe: Adjust depayloader PTS out");   //获取depayloader的src的pad   src_pad = gst_element_get_static_pad (depayloader, "src");   //添加1个发现缓存数据的检测时间,检测到数据后对根据服务器的时间对pts进行重新调整   gst_pad_add_probe (src_pad,       GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_BUFFER_LIST,       //具体调整pts的方法       (GstPadProbeCallback) kms_utils_depayloader_pts_out_probe,       kms_utils_adjust_pts_data_new (depayloader),       (GDestroyNotify) kms_utils_adjust_pts_data_destroy);   g_object_unref (src_pad); }

//具体处理拆包后数据的pts的方法

 

static GstPadProbeReturn kms_utils_depayloader_pts_out_probe (GstPad * pad, GstPadProbeInfo * info,     AdjustPtsData * data) {   if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER) {     GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);

    buffer = gst_buffer_make_writable (buffer);     kms_utils_depayloader_adjust_pts_out (data, buffer);     GST_PAD_PROBE_INFO_DATA (info) = buffer;   }   else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);

    list = gst_buffer_list_make_writable (list);     gst_buffer_list_foreach (list,         (GstBufferListFunc) kms_utils_depayloader_pts_out_it, data);     GST_PAD_PROBE_INFO_DATA (info) = list;   }   return GST_PAD_PROBE_OK; }

 

 

//这里,rtpEndpoint的调用及拆包就完成了。

//接下来,如果我们要增加kurento支持的rtpEndpoint拆包的类型,那就要看看depayloader是如何生成的。我们将在下一章继续分析  

最新回复(0)