KubeEdge 云上部分 CloudHub 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

CloudHub 是一个 Web Socket 服务端,负责监听云端的变化,缓存并发送消息到 EdgeHub。

模块入口

cloud/pkg/cloudhub/cloudhub.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (a *cloudHub) Start() {
if !cache.WaitForCacheSync(beehiveContext.Done(), a.informersSyncedFuncs...) {
klog.Errorf("unable to sync caches for objectSyncController")
os.Exit(1)
}

// start dispatch message from the cloud to edge node
go a.messageq.DispatchMessage()

// check whether the certificates exist in the local directory,
// and then check whether certificates exist in the secret, generate if they don't exist
if err := httpserver.PrepareAllCerts(); err != nil {
klog.Exit(err)
}
// TODO: Will improve in the future
DoneTLSTunnelCerts <- true
close(DoneTLSTunnelCerts)

// generate Token
if err := httpserver.GenerateToken(); err != nil {
klog.Exit(err)
}

// HttpServer mainly used to issue certificates for the edge
go httpserver.StartHTTPServer()

servers.StartCloudHub(a.messageq)

if hubconfig.Config.UnixSocket.Enable {
// The uds server is only used to communicate with csi driver from kubeedge on cloud.
// It is not used to communicate between cloud and edge.
go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
}
}

cloudhub 启动主要有以下 3 步:

  1. 调用 DispatchMessage,开始从云端向边缘节点派送消息
  2. 启动 HttpServer,主要用于为边端发放证书
  3. 调用 StartCloudHub

接下来对 DispatchMessage 和 StartCloudHub 进行具体分析。

DispatchMessage

DispatchMessage 从云中获取消息,提取节点 ID,获取与节点相关的消息,将其放入消息队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (q *ChannelMessageQueue) DispatchMessage() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("Cloudhub channel eventqueue dispatch message loop stopped")
return
default:
}
msg, err := beehiveContext.Receive(model.SrcCloudHub)
klog.V(4).Infof("[cloudhub] dispatchMessage to edge: %+v", msg)
if err != nil {
klog.Info("receive not Message format message")
continue
}
nodeID, err := GetNodeID(&msg)
if nodeID == "" || err != nil {
klog.Warning("node id is not found in the message")
continue
}
if isListResource(&msg) {
q.addListMessageToQueue(nodeID, &msg)
} else {
q.addMessageToQueue(nodeID, &msg)
}
}
}

StartCloudHub

StartCloudHub 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
handler.InitHandler(messageq)
// start websocket server
if hubconfig.Config.WebSocket.Enable {
go startWebsocketServer()
}
// start quic server
if hubconfig.Config.Quic.Enable {
go startQuicServer()
}
}

如果设置了 WebSocket 启动,就启动 WebSocket 服务器协程;如果设置了 Quic 启动,就启动 Quic 服务器协程。

WebSocket 是性能最好的,默认使用 WebSocket。Quic 作为备选项,在网络频繁断开等很不稳定场景下有优势。KubeEdge 云边消息传递是通过 cloudhub 跟 edgehub 间的 Websocket 或 Quic 协议的长连接传输的。

KubeEdge 概述

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

KubeEdge 架构图

KubeEdge 总体有两大部分 —— cloudcore 和 edgecore。cloudcore 部分是 k8s api server 与 Edge 部分的桥梁,负责将指令下发到 Edge,同时将 Edge 的状态和事件同步到的 k8s api server;edgecore 部分接受并执行 Cloud 部分下发的指令,管理各种负载,并将 Edge 部分负载的状态和事件同步到 Cloud 部分。

云上部分

CloudHub 是一个 Web Socket 服务端,负责监听云端的变化,缓存并发送消息到 EdgeHub。

DeviceController 是一个扩展的 k8s 控制器,管理边缘设备,确保设备信息、设备状态的云边同步。

EdgeController 是一个扩展的 k8s 控制器,管理边缘节点和 Pods 的元数据,确保数据能够传递到指定的边缘节点。

边缘部分

EdgeHub 是一个 Web Socket 客户端,负责与边缘计算的云端交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。

Edged 是运行在边缘节点的代理(轻量化的 kubelet),用于管理容器化的应用程序。

EventBus 是一个与 MQTT 服务器 (mosquitto) 交互的 MQTT 客户端,为其他组件提供订阅和发布功能。

ServiceBus 是一个运行在边缘的 HTTP 客户端。

DeviceTwin 负责存储设备状态(传感器的值等)并将设备状态同步到云 (DeviceController),它还为应用程序提供查询接口。

MetaManager 是消息处理器,位于 Edged 和 Edgehub 之间,它负责向轻量级数据库 (SQLite) 持久化/检索元数据。

关键代码

cloudcore 代码入口为 Cloud/cmd/cloudcore/cloudcore.go,在 main 函数中调用 NewCloudCoreCommand,通过 registerModules 函数注册 cloudcore 中的功能模块,通过 StartModules 函数启动已注册的 cloudcore 上的功能模块。registerModules 函数如下:

1
2
3
4
5
6
7
8
9
func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub)
edgecontroller.Register(c.Modules.EdgeController)
devicecontroller.Register(c.Modules.DeviceController)
synccontroller.Register(c.Modules.SyncController)
cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
router.Register(c.Modules.Router)
dynamiccontroller.Register(c.Modules.DynamicController)
}

这 7 个模块都实现了 Module 接口,注册最终会将模块封装后的结构体放入一个 map[string]*ModuleInfo 类型的全局变量 modules 中。之后 StartModules 函数通过 for 循环从 modules 获取每一个的模块,每个模块分配一个协程调用 Start 函数启动。

edgecore 代码入口为 edge/cmd/edgecore/edgecore.go,在 main 函数中调用 NewEdgeCoreCommand。和在 cloudcore 类似,在 NewEdgeCoreCommand 函数中,通过 registerModules 函数注册 edgecore 中的功能模块,通过 Run 函数启动已注册的 edgecore 中的功能模块。edgecore 中 registerModules 函数注册的模块如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// registerModules register all the modules started in edgecore
func registerModules(c *v1alpha1.EdgeCoreConfig) {
devicetwin.Register(c.Modules.DeviceTwin, c.Modules.Edged.HostnameOverride)
edged.Register(c.Modules.Edged)
edgehub.Register(c.Modules.EdgeHub, c.Modules.Edged.HostnameOverride)
eventbus.Register(c.Modules.EventBus, c.Modules.Edged.HostnameOverride)
metamanager.Register(c.Modules.MetaManager)
servicebus.Register(c.Modules.ServiceBus)
edgestream.Register(c.Modules.EdgeStream, c.Modules.Edged.HostnameOverride, c.Modules.Edged.NodeIP)
test.Register(c.Modules.DBTest)
// Note: Need to put it to the end, and wait for all models to register before executing
dbm.InitDBConfig(c.DataBase.DriverName, c.DataBase.AliasName, c.DataBase.DataSource)
}

Why KubeEdge

为什么用 KubeEdge 而不是 k8s 构建边缘计算平台?

k8s 构建边缘计算平台的主要挑战:①资源有限。边缘设备可能只有几百兆的内存,一个原生 kubelet 都跑不起来。②网络受限。k8s 的 master 和 node 通信是通过 List/Watch 机制,边缘场景下网络可能会断开很长时间,这时候 node 上的 kubelet 一直 re-watch 失败,就会请求 re-list,把 apiserver 上的对象全量拿回去,没法在边缘场景这种受限的网络下很好的工作。③k8s 节点没有自治能力。如何在网络质量不稳定的情况下,对边缘节点实现离线自治,这也是个问题。

KubeEdge 主打三个核心理念,首先是云边协同,边是云的延伸,用户的边可能位于私有网络,因此需要穿透私有网络,通过云来管理私有节点,KubeEdge 默认采用 WebSocket + 消息封装来实现,这样只要边缘网络能访问外网情况下,就能实现双向通信,这就不需要边端需要一个公网的 IP。同时呢,KubeEdge 也优化了原生 Kubernetes 中不必要的一些请求,能够大幅减少通信压力,高时延状态下仍可以工作。

KubeEdge 第二个核心理念是边缘节点自治,做到节点级的元数据的持久化,比如 Pod,ConfigMap 等基础元数据,每个节点都持久化这些元数据,边缘节点离线之后,它仍可以通过本地持久化的元数据来管理应用。在 Kubernetes 中,当 kubelet 重启后, 它首先要向 master 做一次 List 获取全量的数据,然后再进行应用管理工作,如果这时候边和云端的网络断开,就无法获得全量的元数据,也不能进行故障恢复。KubeEdge 做了元数据的持久化后,可以直接从本地获得这些元数据,保证故障恢复的能力,保证服务快速 ready。

另外一个理念是极致轻量,在大多数边缘计算场景下,节点的资源是非常有限的,KubeEdge 采用的方式是重组 kubelet 组件(~10mb 内存占用),优化 runtime 资源消耗。在空载时候,内存占用率很低。

参考

  1. kubeedge源码分析系列之整体架构

etcd 读写概述

etcd 总体是基于 Raft 实现的,本文对 etcd 读写流程关键点进行简单记录。

etcd 读流程

一个读请求从 client 通过 Round-robin 负载均衡算法,选择一个 etcd server 节点,发出 gRPC 请求,经过 etcd server 的 KVServer 模块进入核心的读流程,进行串行读或线性读(默认),通过与 MVCC 的 treeIndex 和 boltdb 模块紧密协作,完成读请求。

  • 串行读(非强一致性读):直接读状态机返回数据,无需通过 Raft 协议与集群进行交互,具有低延时、高吞吐量的特点,适合对数据一致性要求不高的场景。
  • 线性读(强一致性读):需要经过 Raft 协议模块,反应集群共识的最新数据,因此在延时和吞吐量上相比串行读略差一点,适用于对数据一致性要求高的场景。

ReadIndex

线性读的 ReadIndex 可以使 follower 的读请求不必转发给 leader。它的实现原理是:

  1. 当 Follower 节点 收到一个线性读请求时,它首先会从 leader 获取集群最新的已提交的日志索引 (committed index);
  2. leader 收到 ReadIndex 请求时,为防止脑裂等异常场景,会向 follower 节点发送心跳确认,一半以上节点确认 leader 身份后才能将已提交的索引 (committed index) 返回给请求的 follower 节点;
  3. follower 节点则会等待,直到状态机已应用索引 (applied index) 大于等于 leader 的已提交索引时 (committed Index)才会去通知读请求,数据已赶上 leader,然后去状态机中访问数据。

ReadIndex 是非常轻量的,不会导致 leader 负载变高,ReadIndex 机制使得每个 follower 节点都可以处理读请求,进而提升了系统的整体写性能。

MVCC

它由内存树形索引模块 (treeIndex) 和嵌入式的 KV 持久化存储库 boltdb 组成。treeIndex 模块是基于 Google 开源的内存版 btree 库实现的;boltdb 是个基于 B+ tree 实现的 key-value 键值库,支持事务,提供 Get/Put 等简易 API 给 etcd 操作。

首先,有个全局递增的版本号(put hello a 时,hello 对应的版本号若为 1,下个请求 put world b 时,world 对应的版本号则为 2),每次修改操作,都会生成一个新的版本号。treeIndex 模块保存用户的 key 和相关版本号,以版本号为 key,value 为用户 key-value 数据存储在 boltdb 里面。另外,并不是所有请求都一定要从 boltdb 获取数据,etcd 出于数据一致性、性能等考虑,在访问 boltdb 前,首先会从一个内存读事务 buffer 中,二分查找你要访问 key 是否在 buffer 里面,若命中则直接返回。具体如下图:

etcd 写流程

一个写请求从 client 通过负载均衡算法选择一个 etcd 节点,发出 gRPC 调用,etcd 节点收到请求后会经过 gRPC 拦截、Quota 校验,接着进入 KVServer 模块,KVServer 模块将请求发送给本模块中的 raft,这里负责与 etcd raft 模块进行通信,发起一个提案,内容为 put foo bar,即使用 put 方法将 foo 更新为 bar,提案经过转发之后,半数节点成功持久化,MVCC 模块更新状态机,完成写请求。

与读流程不一样的是写流程涉及 Quota、WAL、Apply 三个模块。

  • Quota 模块配额检查 db 的大小,如果超过会报 etcdserver: mvcc: database space exceeded 的告警,通过 Raft 日志同步给集群中的节点 db 空间不足,整个集群将不可写入,对外提供只读的功能。
  • 只有 leader 才能处理写请求。leader 收到提案后,会将提案封装成日志条目广播给集群各个节点,同时需要把内容持久化到一个 WAL 日志文件中。日志条目包含 leader 任期号、条目索引、日志类型、提案内容。WAL 持久化内容包含日志条目内容、WAL 记录类型、校验码、WAL 记录的长度。
  • Apply 模块用于执行提案,首先会判断该提案是否被执行过,如果已经执行,则直接返回结束;未执行过的情况下,将会进入 MVCC 模块执行持久化提案内容的操作。

MVCC

MVCC 在执行 put 请求时,会基于当前版本号自增生成新的版本号,然后从 treeIndex 模块中查询 key 的创建版本号、修改次数信息。这些信息将填充到 boltdb 的 value 中,同时将用户的 key 和版本号信息存储到 treeIndex。在读流程中介绍过,boltdb 的 value 的值就是①key 名称;②key 创建时的版本号(create_revision)、最后一次修改时的版本号(mod_revision)、key 自身修改的次数(version);③value 值;④租约信息,将这些信息的结构体序列化成的二进制数据。另外,为了提高吞吐量,此时数据并未提交,而是存在 boltdb 所管理的内存数据结构中,由 backend 异步 goroutine 定时将批量事务提交,和 MySQL 类似,写时优先写入 Buffer。具体如下图:

参考

  1. etcd Source Code
  2. Learning | etcd
  3. etcd教程(七)—读请求执行流程分析
  4. etcd教程(八)—写请求执行流程分析