深入分析KubeEdge的kubectl logs---cloudstream 和 edgestream 的实现

it2025-10-09  4

文章目录

前言背景一、总体架构二、edgestream 启动 创建websocket1. 读取本地的证书配置2. 连接cloud 端的 tunnel server 三. 监听该 Websocket四. cloud的tunnel 保存session五. tunnel 监听上一步接收的wss connection六. 创建stream server 时引用了tunnel 指针七. API Server 发起`/containerLogs`八. 从api server 的请求中提取 session并往session 中设置sessionid九. 找到session后往session里的wss 写一个查询日志的message,message里必须要带着sessionid,用于查询apiserver 的连接十. edgenode收到后转发给kebelet 的server十一. kubelt 的日志返回写会wss十二. cloud 那边的tunnel 收到message 后,写会apiserver总结

前言背景

kubeedge 这个边缘计算平台有一个特点:它的边缘节点网络和云端节点的网络不是在一个平面的,边缘的计算节点通过keadm join加入边缘集群的时候,并不要求自身的IP从云端可达,这是很复合真实场景的。

kubelet 上报的kubelet server 信息 它是kubelet 上报的信息,kubectl describe node raspberrypi 看到的重要信息如下:

addresses: - address: 192.168.4.87 type: InternalIP - address: raspberrypi type: Hostname daemonEndpoints: kubeletEndpoint: Port: 10350

在边缘计算节点加入集群后,cloudcore 生成的 node 中包含了该edge node 的 KubeletEndpoint 端口和它的IP 以及主机名(也就是kubectl get node 看到的node 名称)。

kubectl logs 的实现原理(how kubectl logs works) 发起一个日志查询请求,打开详细日志:

root@172:~# kubectl logs -f nginx-edge-6785d8586b-g7j6p -v=7 I1021 15:21:43.922181 17648 loader.go:375] Config loaded from file: /root/.kube/config I1021 15:21:43.924765 17648 round_trippers.go:420] GET https://172.171.1.220:6443/api/v1/namespaces/default/pods/nginx-edge-6785d8586b-g7j6p I1021 15:21:43.924774 17648 round_trippers.go:427] Request Headers: I1021 15:21:43.924777 17648 round_trippers.go:431] Accept: application/json, */* I1021 15:21:43.924781 17648 round_trippers.go:431] User-Agent: kubectl/v1.18.3 (linux/amd64) kubernetes/2e7996e I1021 15:21:43.929433 17648 round_trippers.go:446] Response Status: 200 OK in 4 milliseconds I1021 15:21:43.932459 17648 round_trippers.go:420] GET https://172.171.1.220:6443/api/v1/namespaces/default/pods/nginx-edge-6785d8586b-g7j6p/log?follow=true I1021 15:21:43.932467 17648 round_trippers.go:427] Request Headers: I1021 15:21:43.932471 17648 round_trippers.go:431] Accept: application/json, */* I1021 15:21:43.932475 17648 round_trippers.go:431] User-Agent: kubectl/v1.18.3 (linux/amd64) kubernetes/2e7996e I1021 15:21:43.980607 17648 round_trippers.go:446] Response Status: 200 OK in 48 milliseconds /docker-entrypoint.sh: /docker-entrypoint.d/ is not empty, will attempt to perform configuration /docker-entrypoint.sh: Looking for shell scripts in /docker-entrypoint.d/ /docker-entrypoint.sh: Launching /docker-entrypoint.d/10-listen-on-ipv6-by-default.sh 10-listen-on-ipv6-by-default.sh: Getting the checksum of /etc/nginx/conf.d/default.conf 10-listen-on-ipv6-by-default.sh: Enabled listen on IPv6 in /etc/nginx/conf.d/default.conf /docker-entrypoint.sh: Launching /docker-entrypoint.d/20-envsubst-on-templates.sh /docker-entrypoint.sh: Configuration complete; ready for start up

可以看到,kubectl 其实是发起了2个请求:

查询该pod,主要是确认pod是否存在,以及是否存在多个容器拼接出来/logs 并向真正的该pod 所在的node kubelet server 发起请求

我们从kubeapiserver 的相关代码就可以看出来,具体的解释在下边代码中做了编辑:

func LogLocation( ctx context.Context, getter ResourceGetter, connInfo client.ConnectionInfoGetter, name string, opts *api.PodLogOptions, ) (*url.URL, http.RoundTripper, error) { //1. 根据name 查询pod详细信息 pod, err := getPod(ctx, getter, name) if err != nil { return nil, nil, err } // 2. 判断是否提供了容器名称,如果指定了,就获取指定的容器,如果没指定,判断能不能给默认的容器,如果既没有指定,也无法给出默认,那么就err。 // Try to figure out a container // If a container was provided, it must be valid container := opts.Container container, err = validateContainer(container, pod) if err != nil { return nil, nil, err } // 3. 根据pod name 查询它所在的node name nodeName := types.NodeName(pod.Spec.NodeName) if len(nodeName) == 0 { // If pod has not been assigned a host, return an empty location return nil, nil, nil } // 4. 根据node name 查询到node info。 nodeInfo, err := connInfo.GetConnectionInfo(ctx, nodeName)

接下来我们看下connInfo.GetConnectionInfo(ctx, nodeName)

// GetConnectionInfo retrieves connection info from the status of a Node API object. func (k *NodeConnectionInfoGetter) GetConnectionInfo(ctx context.Context, nodeName types.NodeName) (*ConnectionInfo, error) { node, err := k.nodes.Get(ctx, string(nodeName), metav1.GetOptions{}) if err != nil { return nil, err } // Find a kubelet-reported address, using preferred address type host, err := nodeutil.GetPreferredNodeAddress(node, k.preferredAddressTypes) if err != nil { return nil, err } // Use the kubelet-reported port, if present port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) if port <= 0 { port = k.defaultPort } return &ConnectionInfo{ Scheme: k.scheme, Hostname: host, Port: strconv.Itoa(port), Transport: k.transport, InsecureSkipTLSVerifyTransport: k.insecureSkipTLSVerifyTransport, }, nil }

这里会从Node Status的address 中选一个地址做url的host,一般是InternalIP,显然是一个内网IP,当这个IP 从云端可达的时候,我们去执行kubectl logs 或者 kubectl exec 的时候会用这个IP 和 kubeletEndpoint(默认是10250) 拼接出目的kubelet server 的地址。

但是当Pod 运行在Edge Node 上时,这个IP是不可达的,那么kubeedge 是怎么做的呢?前戏很长,但是很有必要,让我们开启今天的正题。

一、总体架构

可以看到apiserver 通过 tunnelserver与的node 的ws通道发给node上的kubelet server,我们分了10步去解释整个过程,接下来一步步的从代码去深入分析。

二、edgestream 启动 创建websocket

1. 读取本地的证书配置

2. 连接cloud 端的 tunnel server

dial := websocket.Dialer{ TLSClientConfig: tlsConfig, HandshakeTimeout: time.Duration(config.Config.HandshakeTimeout) * time.Second, } header := http.Header{} // 在请求里带上自己的Ip与Hostname header.Add(stream.SessionKeyHostNameOveride, e.hostnameOveride) header.Add(stream.SessionKeyInternalIP, e.nodeIP) con, _, err := dial.Dial(url.String(), header)

三. 监听该 Websocket

_, r, err := s.Tunnel.NextReader() if err != nil { klog.Errorf("Read Message error %v", err) return err } mess, err := stream.ReadMessageFromTunnel(r) if err != nil { klog.Errorf("Get tunnel Message error %v", err) return err } if mess.MessageType < stream.MessageTypeData { go s.ServeConnection(mess) } s.WriteToLocalConnection(mess) }

这里启动go s.ServeConnection(mess)

四. cloud的tunnel 保存session

// 1. con 这个对象非常重要,下一步会用来封装在session里 con, err := s.upgrader.Upgrade(w, r.Request, nil) if err != nil { return } klog.Infof("get a new tunnel agent hostname %v, internalIP %v", hostNameOverride, interalIP) session := &Session{ tunnel: stream.NewDefaultTunnel(con), apiServerConn: make(map[uint64]APIServerConnection), apiConnlock: &sync.Mutex{}, sessionID: hostNameOverride, } // 2. 把session保存在tunnel的kv结构里 s.addSession(hostNameOverride, session) s.addSession(interalIP, session)

它会把hostname 和 internalIP 做key,session 做value。

五. tunnel 监听上一步接收的wss connection

// Serve read tunnel message ,and write to specific apiserver connection func (s *Session) Serve() { defer s.Close() for { // 1. 监听这个session connection t, r, err := s.tunnel.NextReader() if err != nil { klog.Errorf("get %v reader error %v", s, err) return } if t != websocket.TextMessage { klog.Errorf("Websocket message type must be %v type", websocket.TextMessage) return } // 2. 从接收到的消息里,组装message message, err := stream.ReadMessageFromTunnel(r) if err != nil { klog.Errorf("Read message from tunnel %v error %v", s.String(), err) return } } }

六. 创建stream server 时引用了tunnel 指针

func newStreamServer(t *TunnelServer) *StreamServer { return &StreamServer{ container: restful.NewContainer(), tunnel: t, } }

这样,我们就可以在stream server 的处理中,获取到根据request 的host获取到session,并且获取到该wss connection

七. API Server 发起/containerLogs

前言背景中,提到了API Server 会对host:10350 发起请求/containerLogs/{podNamespace}/{podID}/{containerName}

八. 从api server 的请求中提取 session并往session 中设置sessionid

// 1. 从request中分离Host,作为sessionkey sessionKey := strings.Split(r.Request.Host, ":")[0] // 2. 根据host 查询到session,查不到就凉了,报错 session, ok := s.tunnel.getSession(sessionKey) if !ok { err = fmt.Errorf("Can not find %v session ", sessionKey) return } // 3. 往session里保存一个sessionid,作为当前请求的唯一标示,后续这个id,又叫messageid,会发给edgecore,edgecore 返回消息给wss 时,也会根据这个id 找到apiserver 的connection,用fw 写回apiserver,理解这一步很重要。 logConnection, err := session.AddAPIServerConnection(s, &ContainerLogsConnection{ r: r, flush: fw, session: session, ctx: r.Request.Context(), edgePeerStop: make(chan struct{}), })

解释在code里。

九. 找到session后往session里的wss 写一个查询日志的message,message里必须要带着sessionid,用于查询apiserver 的连接

func (l *ContainerLogsConnection) Serve() error { defer func() { klog.Infof("%s end successful", l.String()) }() // first send connect message // 发送消息,给这个wss,这个消息里说明了,我要查日志,请求地址是127.0.0.1:10350 if _, err := l.SendConnection(); err != nil { klog.Errorf("%s send %s info error %v", l.String(), stream.MessageTypeLogsConnect, err) return err }

十. edgenode收到后转发给kebelet 的server

func (l *EdgedLogsConnection) Serve(tunnel SafeWriteTunneler) error { //connect edged client := http.Client{} req, err := http.NewRequest("GET", l.URL.String(), nil) if err != nil { klog.Errorf("create new logs request error %v", err) return err } // header里有该请求的地址url,127.0.0.1:10350 req.Header = l.Header resp, err := client.Do(req) if err != nil { klog.Errorf("request logs error %v", err) return err } defer resp.Body.Close() // 用scanner 查日志的返回 scan := bufio.NewScanner(resp.Body) stop := make(chan struct{})

十一. kubelt 的日志返回写会wss

for scan.Scan() { select { case <-stop: klog.Infof("receive stop single, so stop logs scan ...") return nil default: } // 10 = \n msg := NewMessage(l.MessID, MessageTypeData, append(scan.Bytes(), 10)) err := msg.WriteTo(tunnel) if err != nil { klog.Errorf("write tunnel message %v error", msg) return err } klog.Infof("%v write logs %v", l.String(), string(scan.Bytes())) }

这一步,写会tunnel

十二. cloud 那边的tunnel 收到message 后,写会apiserver

// Serve read tunnel message ,and write to specific apiserver connection func (s *Session) Serve() { defer s.Close() for { .... // 发给了APISERVER,这一步是根据 if err := s.ProxyTunnelMessageToApiserver(message); err != nil { klog.Errorf("Proxy tunnel message [%s] to kube-apiserver error %v", message.String(), err) continue } } } func (s *Session) ProxyTunnelMessageToApiserver(message *stream.Message) error { // 1. 根据message 的id,其实就是sessionid查出来apiserver connetcion kubeCon, ok := s.apiServerConn[message.ConnectID] if !ok { return fmt.Errorf("Can not find apiServer connection id %v in %v", message.ConnectID, s.String()) } switch message.MessageType { case stream.MessageTypeRemoveConnect: kubeCon.SetEdgePeerDone() case stream.MessageTypeData: for i := 0; i < len(message.Data); { // 2.把全部消息写会到apiserver connection里。其实就是前面的fw对象,这个对象封装了rest.response. n, err := kubeCon.WriteToAPIServer(message.Data[i:]) if err != nil { return err } i += n } default: } return nil }

代码中做了解释。

总结

至此我们对每一步全部做了解释,除了一个小尾巴,apiserver 为什么会把10350 的流量全部写入了stream server呢? 这里社区,只能做一个dnat了。

iptables -t nat -A OUTPUT -p tcp --dport 10350 -j DNAT --to $CLOUDCOREIPS:10003

这样,在apiserver 发出请求时,所有流量都会发给了10003端口,这个端口就是stream server 的监听端口。 这个过程还是比较清晰的,不必要求edge node 的cloud node 网络互相可达。

最新回复(0)