linux内核协议栈 UDP之数据报接收过程Ⅱ

it2023-04-03  74

目录

1 系统调用 udp_recvmsg()

1.1 从接收队列 sk_receive_queue 中获取skb

1.1.1 获取队列头不删除 skb_peek()

1.1.2 将 skb 从移除队列中 __skb_unlink()

1.2 尝试释放skb内存 skb_free_datagram_locked()

2 后备队列 sk_backlog 中的skb处理 release_sock()

2.1 后备队列skb进入接收队列 sk_backlog_rcv()


1 系统调用 udp_recvmsg()

对于应用程序而言,读操作可以通过多个系统调用实现,如read()、recv()、recvfrom()等等,但是这些系统调用到了传输层协议,都调用到了同一接口,对于UDP就是udp_recvmsg()。

int udp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, size_t len, int noblock, int flags, int *addr_len) { struct inet_sock *inet = inet_sk(sk); struct sockaddr_in *sin = (struct sockaddr_in *)msg->msg_name; struct sk_buff *skb; unsigned int ulen, copied; int peeked; int err; int is_udplite = IS_UDPLITE(sk); //需要返回源地址信息,设置源地址长度 if (addr_len) *addr_len = sizeof(*sin); //如果设置了MSG_ERRQUEUE标记,那么只读取错误信息 if (flags & MSG_ERRQUEUE) return ip_recv_error(sk, msg, len); try_again: //根据是否需要阻塞,从接收队列中取出一个SKB skb = __skb_recv_datagram(sk, flags | (noblock ? MSG_DONTWAIT : 0), &peeked, &err); if (!skb) goto out; //ulen为该SKB中包含的应用数据长度 ulen = skb->len - sizeof(struct udphdr); //len为应用程序指定的buffer大小,所以下面的逻辑含义为: //1. 如果应用提供的buffer超过了该数据包的数据长度,那么调整要拷贝的数据量为该SKB中实际数据量 //2. 如果应用提供的buffer不够大,那么需要截断数据包,设置截断标记 copied = len; if (copied > ulen) copied = ulen; else if (copied < ulen) msg->msg_flags |= MSG_TRUNC; /* * If checksum is needed at all, try to do it while copying the * data. If the data is truncated, or if we only want a partial * coverage checksum (UDP-Lite), do it before the copy. */ //条件一:对于截断的数据包和尚未完成校验的数据包,先进行校验,校验出错则尝试读取下一个数据包 //条件二:实际上只用于UDPLite,因为UDP协议的校验在接收过程的第一步就完成了 if (copied < ulen || UDP_SKB_CB(skb)->partial_cov) { if (udp_lib_checksum_complete(skb)) goto csum_copy_err; } //根据是否需要校验,调用不同的数据拷贝函数 if (skb_csum_unnecessary(skb)) err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr), msg->msg_iov, copied); else { //在数据拷贝过程中还会进行校验 err = skb_copy_and_csum_datagram_iovec(skb, sizeof(struct udphdr), msg->msg_iov); if (err == -EINVAL) goto csum_copy_err; } //数据拷贝失败,返回错误 if (err) goto out_free; //只有非PEEK读取才更新统计信息 if (!peeked) UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INDATAGRAMS, is_udplite); //更新数据包接收的时间到sk->sk_stamp中 sock_recv_timestamp(msg, sk, skb); //拷贝数据包源地址信息,该地址会返回给应用程序 if (sin) { sin->sin_family = AF_INET; sin->sin_port = udp_hdr(skb)->source; sin->sin_addr.s_addr = ip_hdr(skb)->saddr; memset(sin->sin_zero, 0, sizeof(sin->sin_zero)); } //获取控制信息 if (inet->cmsg_flags) ip_cmsg_recv(msg, skb); //读取成功,返回值err表示的是已经读取到的字节数 err = copied; if (flags & MSG_TRUNC) err = ulen; out_free: //释放该SKB的数据 skb_free_datagram_locked(sk, skb); out: return err; csum_copy_err: lock_sock(sk); if (!skb_kill_datagram(sk, skb, flags)) UDP_INC_STATS_USER(sock_net(sk), UDP_MIB_INERRORS, is_udplite); release_sock(sk); if (noblock) return -EAGAIN; goto try_again; }

1.1 从接收队列 sk_receive_queue 中获取skb

/** * __skb_recv_datagram - Receive a datagram skbuff * @sk: socket * @flags: MSG_ flags * @peeked: returns non-zero if this packet has been seen before * @err: error code returned */ struct sk_buff *__skb_recv_datagram(struct sock *sk, unsigned flags, int *peeked, int *err) { struct sk_buff *skb; long timeo; //如果该socket遇到了错误,返回错误 int error = sock_error(sk); if (error) goto no_packet; //根据是否设置了非阻塞标记,获取超时时间。对于非阻塞模式,timeo为0 timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); do { /* Again only user level code calls this function, so nothing * interrupt level will suddenly eat the receive_queue. * * Look at current nfs client by the way... * However, this function was corrent in any case. 8) */ unsigned long cpu_flags; //关中断并且持有接收队列的锁 spin_lock_irqsave(&sk->sk_receive_queue.lock, cpu_flags); //获取接收队列中的第一个skb skb = skb_peek(&sk->sk_receive_queue); if (skb) { *peeked = skb->peeked; //如果设置了MSG_PEEK标记,那么设置skb的peek标记,并且增加对skb的引用计数, //该标记很重要,会影响是否释放该skb,见下文的总结 if (flags & MSG_PEEK) { skb->peeked = 1; atomic_inc(&skb->users); } else //非MSG_PEEK场景,将该skb从接收队列中移除 __skb_unlink(skb, &sk->sk_receive_queue); } //释放接收队列锁并开启中断 spin_unlock_irqrestore(&sk->sk_receive_queue.lock, cpu_flags); //找到了skb,直接返回 if (skb) return skb; //当前接收队列为空,如果超时时间为0,即非阻塞模式,那么直接返回EAGAIN错误 /* User doesn't want to wait */ error = -EAGAIN; if (!timeo) goto no_packet; //没有可读数据,需要阻塞等待数据可用,阻塞在了sk->sk_sleep等待队列上 } while (!wait_for_packet(sk, err, &timeo)); return NULL; no_packet: *err = error; return NULL; }

1.1.1 获取队列头不删除 skb_peek()

/** * skb_peek * @list_: list to peek at * * Peek an &sk_buff. Unlike most other operations you _MUST_ * be careful with this one. A peek leaves the buffer on the * list and someone else may run off with it. You must hold * the appropriate locks or have a private queue to do this. * * Returns %NULL for an empty list or a pointer to the head element. * The reference count is not incremented and the reference is therefore * volatile. Use with caution. */ //如注释所述,使用该函数需要小心,保证不会有并发问题。这里是在持有锁的情况下操作的 static inline struct sk_buff *skb_peek(struct sk_buff_head *list_) { //该函数会返回list中第一个skb的指针,但是并不会将该skb从队列中移除,这点很重要 struct sk_buff *list = ((struct sk_buff *)list_)->next; if (list == (struct sk_buff *)list_) list = NULL; return list; }

1.1.2 将 skb 从移除队列中 __skb_unlink()

/* * remove sk_buff from list. _Must_ be called atomically, and with * the list known.. */ //将skb从队列list中移除,典型的链表操作 static inline void __skb_unlink(struct sk_buff *skb, struct sk_buff_head *list) { struct sk_buff *next, *prev; list->qlen--; next = skb->next; prev = skb->prev; skb->next = skb->prev = NULL; next->prev = prev; prev->next = next; }

1.2 尝试释放skb内存 skb_free_datagram_locked()

该函数尝试释放SKB,但是要注意,是否真的会释放最终取决于SKB自己维护的引用计数。

/* * Read buffer destructor automatically called from kfree_skb. */ void sock_rfree(struct sk_buff *skb) { struct sock *sk = skb->sk; //该SKB将被释放,所以递减传输控制块占用的内存记账 atomic_sub(skb->truesize, &sk->sk_rmem_alloc); sk_mem_uncharge(skb->sk, skb->truesize); } void skb_free_datagram_locked(struct sock *sk, struct sk_buff *skb) { //因为如果真的触发释放SKB,那么会调用skb->destructor()回调,在接收过程的第一步,找到 //传输控制块后,使用skb_set_owner_r()将该skb的属主设置成了当前传输控制块,当时指定的 //回调函数是sock_rfree(),在该函数中会操作传输控制块的成员,所以这里需要提前锁定 lock_sock(sk); skb_free_datagram(sk, skb); release_sock(sk); } void skb_free_datagram(struct sock *sk, struct sk_buff *skb) { consume_skb(skb); sk_mem_reclaim_partial(sk); } /** * consume_skb - free an skbuff * @skb: buffer to free * * Drop a ref to the buffer and free it if the usage count has hit zero * Functions identically to kfree_skb, but kfree_skb assumes that the frame * is being dropped after a failure and notes that */ void consume_skb(struct sk_buff *skb) { if (unlikely(!skb)) return; //如果该skb的引用计数为1,那么需要真的释放 if (likely(atomic_read(&skb->users) == 1)) smp_rmb(); //如果skb引用计数大于1,那么仅仅是将其引用计数减1 else if (likely(!atomic_dec_and_test(&skb->users))) return; __kfree_skb(skb); }

到此,回忆一下前面设置了MSG_PEEK的处理,对于此种情况,在调用__skb_recv_datagram()时并不会真的将skb从接收队列中移除,只是返回其指针,并且增加了对该skb的引用计数,所以在接收完毕后调用skb_free_datagram_locked()的时候,该skb的引用计数至少为2,并不会真正的释放。

2 后备队列 sk_backlog 中的skb处理 release_sock()

在《linux内核协议栈 UDP之数据报接收过程Ⅰ》中有提到,在软中断接收过程中,如果当前传输控制块刚好被进程上下文锁定,那么只是将数据放入到后备队列中,我们并没有介绍该队列中的数据又是如何被应用接收的。实际上,在进程上下文中调用release_sock()的时候会处理该后备队列,代码如下:

void release_sock(struct sock *sk) { /* * The sk_lock has mutex_unlock() semantics: */ mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_); spin_lock_bh(&sk->sk_lock.slock); //重点看这里,如果后备队列不为空,调用__release_sock()进行处理 if (sk->sk_backlog.tail) __release_sock(sk); sk->sk_lock.owned = 0; if (waitqueue_active(&sk->sk_lock.wq)) wake_up(&sk->sk_lock.wq); spin_unlock_bh(&sk->sk_lock.slock); }

关于传输控制块的同步锁可以参考笔记《linux内核协议栈 套接口层之传输控制块同步锁socket_lock_t》

static void __release_sock(struct sock *sk) { //获取后备队列第一个元素 struct sk_buff *skb = sk->sk_backlog.head; do { //这里先将后备队列清空,然后打开硬中断,但是软中没有打开。 //由于对数据包的处理比较耗时,这种处理方式可以提高系统性能 sk->sk_backlog.head = sk->sk_backlog.tail = NULL; bh_unlock_sock(sk); //循环处理后备队列中数据包 do { struct sk_buff *next = skb->next; skb->next = NULL; //处理该数据包 sk_backlog_rcv(sk, skb); /* * We are in process context here with softirqs * disabled, use cond_resched_softirq() to preempt. * This is safe to do because we've taken the backlog * queue private: */ //重新调度一下下半部 cond_resched_softirq(); skb = next; } while (skb != NULL); //再次持锁,因为要判断传输控制块的后备队列是否为空。因为前面重新调度过软中断, //所以下面的外层循环可以保证能够处理新到来的数据包 bh_lock_sock(sk); } while ((skb = sk->sk_backlog.head) != NULL); }

2.1 后备队列skb进入接收队列 sk_backlog_rcv()

static inline int sk_backlog_rcv(struct sock *sk, struct sk_buff *skb) { //对于UDP,该回调函数就是__udp_queue_rcv_skb(), //对于TCP,该回调函数则是 tcp_v4_do_rcv() //在软中断中就是使用该函数将数据包放入了接收队列 return sk->sk_backlog_rcv(sk, skb); }
最新回复(0)