KubeEdge 边缘部分 Edged 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

Edged 是运行在边缘节点的代理(轻量化的 kubelet),用于管理容器化的应用程序。Edged 内部模块如图所示:

代码入口

Edged 的注册和启动过程代码在 edge/pkg/edged/edged.go 中。

Register 调用了 newEdged,newEdged 做了以下事情:

  1. 初始化 pod status 管理器
  2. 初始化 edged 的 livenessManager、readinessManager、startupManager
  3. 创建并启动作为 grpc 服务器运行的 docker shim
  4. 初始化运行时服务 runtimeService 和镜像服务 imageService
  5. 初始化容器生命周期管理器 clcm
  6. 初始化容器日志管理器 logManager
  7. 初始化通用容器运行时服务 containerRuntime
  8. 创建运行时缓存 runtimeCache
  9. 初始化 edged 的镜像存放地 Provider
  10. 初始化镜像垃圾回收管理器 imageGCManager
  11. 初始化容器垃圾回收器 containerGCManager
  12. 初始化 edged 的 server

Start 做了以下事情:

  1. 初始化 edged 的 volume plugin 管理器 volumePluginMgr
  2. 初始化 edged 节点的模块
  3. 新建配置管理器configMapManager
  4. 初始化并启动 volume 管理器 volumeManager
  5. 启动 edged 的探针管理器 probeManager
  6. 启动 pod 状态管理器 statusManager 和 pod 生命周期事件生成器 pleg
  7. 启动 pod 增加和删除消息队列
  8. 启动 pod 监听事件循环
  9. 启动 edged 的 http server
  10. 启动镜像和容器的垃圾回收服务
  11. 初始化和启动 edged 的插件服务
  12. 在 clcm 中启动 CPU 管理器
  13. 最后调用 syncPod,启动与 pod 进行事件同步的服务

edged 与容器运行时

edged 与容器运行时(container runtime)的调用关系可以总结为下图:

可以看出 edged 首先启动作为 grpc 服务器运行的 docker shim,然后 edged 通过调用 docker shim 的 grpc server,來实现与容器运行时(container runtime)的交互,最后 docker shim 的 grpc server 将 edged 具体操作传递给容器运行时。

edged 如何实现边缘自治

首先看 edged 启动时调用的 syncPod,它向 metamanager 发送一条请求(QueryOperation 类型消息),来请求数据库中现有的 pod 信息。然后开始循环接收消息,后面对消息的类型进行判断,类型有 pod、configmap、secret、以及 volume:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (e *edged) syncPod() {
time.Sleep(10 * time.Second)

//when starting, send msg to metamanager once to get existing pods
info := model.NewMessage("").BuildRouter(e.Name(), e.Group(), e.namespace+"/"+model.ResourceTypePod,
model.QueryOperation)
beehiveContext.Send(metamanager.MetaManagerModuleName, *info)
for {
......
result, err := beehiveContext.Receive(e.Name())
......
switch resType {
case model.ResourceTypePod:
if op == model.ResponseOperation && resID == "" && result.GetSource() == metamanager.MetaManagerModuleName {
err := e.handlePodListFromMetaManager(content)
if err != nil {
klog.Errorf("handle podList failed: %v", err)
continue
}
e.setInitPodReady(true)
} else if op == model.ResponseOperation && resID == "" && result.GetSource() == EdgeController {
err := e.handlePodListFromEdgeController(content)
if err != nil {
klog.Errorf("handle controllerPodList failed: %v", err)
continue
}
e.setInitPodReady(true)
} else {
err := e.handlePod(op, content)
if err != nil {
klog.Errorf("handle pod failed: %v", err)
continue
}
}
case model.ResourceTypeConfigmap:
......
case model.ResourceTypeSecret:
......
case constants.CSIResourceTypeVolume:
......
default:
......
}
}
}

这里重点关心pod,消息需要通过 result.GetSource() 字段判断来源,可能是 MetaManager 来的,也有可能是 EdgeController 来的。在断网环境下只有可能是 MetaManager 发送的。

handlePodListFromMetaManager 遍历收到的消息中的 pod 内容,调用 addPod 将 pod 全部加入 podAdditionQueue 队列,再调用 updatePodStatus 删除或更新 pod,将 pod status 更新到数据库中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (e *edged) handlePodListFromMetaManager(content []byte) (err error) {
var lists []string
err = json.Unmarshal([]byte(content), &lists)
if err != nil {
return err
}

for _, list := range lists {
var pod v1.Pod
err = json.Unmarshal([]byte(list), &pod)
if err != nil {
return err
}
if filterPodByNodeName(&pod, e.nodeName) {
e.addPod(&pod)
if err = e.updatePodStatus(&pod); err != nil {
klog.Errorf("handlePodListFromMetaManager: update pod %s status error", pod.Name)
return err
}
}
}

return nil
}

另外 edged 启动时会调用 podAddWorkerRun,它会在后台不断从 podAdditionQueue 中 get,后面就和 kubelet 一样开始创建容器。

More

关于 Edged 部分内部模块执行的流程图请见 Edged

KubeEdge 边缘部分 EventBus&ServiceBus 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

EventBus 是一个与 MQTT 服务器 (mosquitto) 交互的 MQTT 客户端,为其他组件提供订阅和发布功能;ServiceBus 是一个运行在边缘的 HTTP 客户端。

EventBus

edge/pkg/eventbus/eventbus.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (eb *eventbus) Start() {
if eventconfig.Config.MqttMode >= v1alpha1.MqttModeBoth {
hub := &mqttBus.Client{
MQTTUrl: eventconfig.Config.MqttServerExternal,
SubClientID: eventconfig.Config.MqttSubClientID,
PubClientID: eventconfig.Config.MqttPubClientID,
Username: eventconfig.Config.MqttUsername,
Password: eventconfig.Config.MqttPassword,
}
mqttBus.MQTTHub = hub
hub.InitSubClient()
hub.InitPubClient()
klog.Infof("Init Sub And Pub Client for external mqtt broker %v successfully", eventconfig.Config.MqttServerExternal)
}

if eventconfig.Config.MqttMode <= v1alpha1.MqttModeBoth {
// launch an internal mqtt server only
mqttServer = mqttBus.NewMqttServer(
int(eventconfig.Config.MqttSessionQueueSize),
eventconfig.Config.MqttServerInternal,
eventconfig.Config.MqttRetain,
int(eventconfig.Config.MqttQOS))
mqttServer.InitInternalTopics()
err := mqttServer.Run()
if err != nil {
klog.Errorf("Launch internal mqtt broker failed, %s", err.Error())
os.Exit(1)
}
klog.Infof("Launch internal mqtt broker %v successfully", eventconfig.Config.MqttServerInternal)
}

eb.pubCloudMsgToEdge()
}

MqttMode 分 MqttModeInternal、MqttModeBoth 和 MqttModeExternal 三种。当 eventconfig.Config.MqttMode >= v1alpha1.MqttModeBoth 将 MQTT 代理启动在 eventbus 之外,eventbus 作为独立启动的 MQTT 代理的客户端与其交互;当 eventconfig.Config.MqttMode <= v1alpha1.MqttModeBoth 时,在 eventbus 内启动一个 MQTT 代理,负责与终端设备交互。

InitSubClient

InitSubClient 设置参数启动 subscribe 连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (mq *Client) InitSubClient() {
timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
right := len(timeStr)
if right > 10 {
right = 10
}
// if SubClientID is NOT set, we need to generate it by ourselves.
if mq.SubClientID == "" {
mq.SubClientID = fmt.Sprintf("hub-client-sub-%s", timeStr[0:right])
}
subOpts := util.HubClientInit(mq.MQTTUrl, mq.SubClientID, mq.Username, mq.Password)
subOpts.OnConnect = onSubConnect
subOpts.AutoReconnect = false
subOpts.OnConnectionLost = onSubConnectionLost
mq.SubCli = MQTT.NewClient(subOpts)
util.LoopConnect(mq.SubClientID, mq.SubCli)
klog.Info("finish hub-client sub")
}

onSubConnect 和 onSubConnectionLost 定义了当连接和失联时的处理逻辑。eventbus 订阅以下 topic:

1
2
3
4
5
6
7
8
9
// SubTopics which edge-client should be sub
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
UploadTopic,
"+/user/#",
}

当获得这些 topic 消息时,通过 mqtt 的 subscribe 方法回调 OnSubMessageReceived。该函数判断 topic,”hw/events/device” 和 “hw/events/node” 开头发送给 DeviceTwin 模块,其他信息发送给 EdgeHub 模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, msg MQTT.Message) {
klog.Infof("OnSubMessageReceived receive msg from topic: %s", msg.Topic())
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for "SYS/dis/upload_records", no need to base64 topic
var target string
var message *beehiveModel.Message
if strings.HasPrefix(msg.Topic(), "$hw/events/device") || strings.HasPrefix(msg.Topic(), "$hw/events/node") {
target = modules.TwinGroup
resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic()))
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup,
resource, messagepkg.OperationResponse).FillBody(string(msg.Payload()))
} else {
target = modules.HubGroup
message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup,
msg.Topic(), beehiveModel.UploadOperation).FillBody(string(msg.Payload()))
}

klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, message.GetResource()))
beehiveContext.SendToGroup(target, *message)
}

InitPubClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// InitPubClient init pub client
func (mq *Client) InitPubClient() {
timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
right := len(timeStr)
if right > 10 {
right = 10
}
// if PubClientID is NOT set, we need to generate it by ourselves.
if mq.PubClientID == "" {
mq.PubClientID = fmt.Sprintf("hub-client-pub-%s", timeStr[0:right])
}
pubOpts := util.HubClientInit(mq.MQTTUrl, mq.PubClientID, mq.Username, mq.Password)
pubOpts.OnConnectionLost = onPubConnectionLost
pubOpts.AutoReconnect = false
mq.PubCli = MQTT.NewClient(pubOpts)
util.LoopConnect(mq.PubClientID, mq.PubCli)
klog.Info("finish hub-client pub")
}

InitPubClient 创建了一个 MQTT client,然后调用 LoopConnect 每 5 秒钟连接一次 MQTT server,直到连接成功。如果失去连接,则通过 onPubConnectionLost 继续调用 InitPubClient。

pubCloudMsgToEdge

在启动/连接完 MQTT server 后,调用了 pubCloudMsgToEdge 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (eb *eventbus) pubCloudMsgToEdge() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EventBus PubCloudMsg To Edge stop")
return
default:
}
accessInfo, err := beehiveContext.Receive(eb.Name())
if err != nil {
klog.Errorf("Fail to get a message from channel: %v", err)
continue
}
operation := accessInfo.GetOperation()
resource := accessInfo.GetResource()
switch operation {
case messagepkg.OperationSubscribe:
eb.subscribe(resource)
klog.Infof("Edge-hub-cli subscribe topic to %s", resource)
case messagepkg.OperationUnsubscribe:
eb.unsubscribe(resource)
klog.Infof("Edge-hub-cli unsubscribe topic to %s", resource)
case messagepkg.OperationMessage:
body, ok := accessInfo.GetContent().(map[string]interface{})
if !ok {
klog.Errorf("Message is not map type")
continue
}
message := body["message"].(map[string]interface{})
topic := message["topic"].(string)
payload, _ := json.Marshal(&message)
eb.publish(topic, payload)
case messagepkg.OperationPublish:
topic := resource
// cloud and edge will send different type of content, need to check
payload, ok := accessInfo.GetContent().([]byte)
if !ok {
content, ok := accessInfo.GetContent().(string)
if !ok {
klog.Errorf("Message is not []byte or string")
continue
}
payload = []byte(content)
}
eb.publish(topic, payload)
case messagepkg.OperationGetResult:
if resource != "auth_info" {
klog.Info("Skip none auth_info get_result message")
continue
}
topic := fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", eventconfig.Config.NodeName)
payload, _ := json.Marshal(accessInfo.GetContent())
eb.publish(topic, payload)
default:
klog.Warningf("Action not found")
}
}
}

pubCloudMsgToEdge 执行以下操作:

  1. 从 beehive 获取消息
  2. 获取消息的 operation 和 resource
  3. 当动作为 subscribe 时从 MQTT 订阅 resource(topic) 消息;当动作为 unsubscribe 时从 MQTT 取消订阅 resource(topic) 消息
  4. 当动作为 message 时,将消息的 message 根据消息的 topic 发送给 MQTT broker,消息类型是一个 map
  5. 当动作为 publish 时,将消息发送给 MQTT broker,消息为一个字符串,topic 和 resource 一致
  6. 当动作为 getResult 时,resource 必须为 auth_info,然后发送消息到 “hw/events/node/eventconfig.Config.NodeName/authInfo/get/result” 这一个 topic

ServiceBus

edge/pkg/servicebus/servicebus.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (sb *servicebus) Start() {
// no need to call TopicInit now, we have fixed topic
htc.Timeout = time.Second * 10
uc.Client = htc
if !dao.IsTableEmpty() {
if atomic.CompareAndSwapInt32(&inited, 0, 1) {
go server(c)
}
}
//Get message from channel
for {
select {
case <-beehiveContext.Done():
klog.Warning("servicebus stop")
return
default:
}
msg, err := beehiveContext.Receive(modules.ServiceBusModuleName)
if err != nil {
klog.Warningf("servicebus receive msg error %v", err)
continue
}

// build new message with required field & send message to servicebus
klog.V(4).Info("servicebus receive msg")
go processMessage(&msg)
}
}

ServiceBus 接受来自 beehive 的消息,然后启动一个 processMessage 协程基于消息中带的参数,将消息通过 REST-API 发送到本地 127.0.0.1 上的目标 APP。相当于一个客户端,而 APP 是一个 http Rest-API server,所有的操作和设备状态都需要客户端调用接口来下发和获取。ServiceBus 执行过程图如下:

参考

  1. kubeedge edgecore - EventBus源码分析
  2. 【KubeEdge】 ServiceBus分析

KubeEdge 边缘部分 DeviceTwin 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

DeviceTwin 负责存储设备状态(传感器的值等)并将设备状态同步到云,它还为应用程序提供查询接口。它由四个子模块组成(membership 模块,communication 模块,device 模块和 device twin 模块)。

DeviceTwin 注册

DeviceTwin 注册也调用了 InitDBTable,在 SQLite 数据库中初始化了三张表 Device,DeviceAttr 与 DeviceTwin:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//Device the struct of device
type Device struct {
ID string `orm:"column(id); size(64); pk"`
Name string `orm:"column(name); null; type(text)"`
Description string `orm:"column(description); null; type(text)"`
State string `orm:"column(state); null; type(text)"`
LastOnline string `orm:"column(last_online); null; type(text)"`
}

//DeviceAttr the struct of device attributes
type DeviceAttr struct {
ID int64 `orm:"column(id);size(64);auto;pk"`
DeviceID string `orm:"column(deviceid); null; type(text)"`
Name string `orm:"column(name);null;type(text)"`
Description string `orm:"column(description);null;type(text)"`
Value string `orm:"column(value);null;type(text)"`
Optional bool `orm:"column(optional);null;type(integer)"`
AttrType string `orm:"column(attr_type);null;type(text)"`
Metadata string `orm:"column(metadata);null;type(text)"`
}

//DeviceTwin the struct of device twin
type DeviceTwin struct {
ID int64 `orm:"column(id);size(64);auto;pk"`
DeviceID string `orm:"column(deviceid); null; type(text)"`
Name string `orm:"column(name);null;type(text)"`
Description string `orm:"column(description);null;type(text)"`
Expected string `orm:"column(expected);null;type(text)"`
Actual string `orm:"column(actual);null;type(text)"`
ExpectedMeta string `orm:"column(expected_meta);null;type(text)"`
ActualMeta string `orm:"column(actual_meta);null;type(text)"`
ExpectedVersion string `orm:"column(expected_version);null;type(text)"`
ActualVersion string `orm:"column(actual_version);null;type(text)"`
Optional bool `orm:"column(optional);null;type(integer)"`
AttrType string `orm:"column(attr_type);null;type(text)"`
Metadata string `orm:"column(metadata);null;type(text)"`
}

模块入口

edge/pkg/devicetwin/devicetwin.go:

1
2
3
4
5
6
7
8
9
10
11
// Start run the module
func (dt *DeviceTwin) Start() {
dtContexts, _ := dtcontext.InitDTContext()
dt.DTContexts = dtContexts
err := SyncSqlite(dt.DTContexts)
if err != nil {
klog.Errorf("Start DeviceTwin Failed, Sync Sqlite error:%v", err)
return
}
dt.runDeviceTwin()
}

主要就是 SyncSqlite 和 runDeviceTwin

SyncSqlite

SyncSqlite 最终会执行 SyncDeviceFromSqlite:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func SyncDeviceFromSqlite(context *dtcontext.DTContext, deviceID string) error {
klog.Infof("Sync device detail info from DB of device %s", deviceID)
_, exist := context.GetDevice(deviceID)
if !exist {
var deviceMutex sync.Mutex
context.DeviceMutex.Store(deviceID, &deviceMutex)
}

defer context.Unlock(deviceID)
context.Lock(deviceID)

devices, err := dtclient.QueryDevice("id", deviceID)
if err != nil {
klog.Errorf("query device failed: %v", err)
return err
}
if len(*devices) <= 0 {
return errors.New("Not found device from db")
}
device := (*devices)[0]

deviceAttr, err := dtclient.QueryDeviceAttr("deviceid", deviceID)
if err != nil {
klog.Errorf("query device attr failed: %v", err)
return err
}
attributes := make([]dtclient.DeviceAttr, 0)
attributes = append(attributes, *deviceAttr...)

deviceTwin, err := dtclient.QueryDeviceTwin("deviceid", deviceID)
if err != nil {
klog.Errorf("query device twin failed: %v", err)
return err
}
twins := make([]dtclient.DeviceTwin, 0)
twins = append(twins, *deviceTwin...)

context.DeviceList.Store(deviceID, &dttype.Device{
ID: deviceID,
Name: device.Name,
Description: device.Description,
State: device.State,
LastOnline: device.LastOnline,
Attributes: dttype.DeviceAttrToMsgAttr(attributes),
Twin: dttype.DeviceTwinToMsgTwin(twins)})

return nil
}

这段函数主要执行了以下操作:

  1. 检查设备是否在上下文中(设备列表存储在上下文中),如果不在则添加一个 deviceMutex 至上下文中
  2. 从数据库中查询设备
  3. 从数据库中查询设备属性
  4. 从数据库中查询 Device Twin
  5. 将设备、设备属性和 Device Twin 数据合并为一个结构,并将其存储在上下文中

runDeviceTwin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (dt *DeviceTwin) runDeviceTwin() {
moduleNames := []string{dtcommon.MemModule, dtcommon.TwinModule, dtcommon.DeviceModule, dtcommon.CommModule}
for _, v := range moduleNames {
dt.RegisterDTModule(v)
go dt.DTModules[v].Start()
}
go func() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("Stop DeviceTwin ModulesContext Receive loop")
return
default:
}
if msg, ok := beehiveContext.Receive("twin"); ok == nil {
klog.Info("DeviceTwin receive msg")
err := dt.distributeMsg(msg)
if err != nil {
klog.Warningf("distributeMsg failed: %v", err)
}
}
}
}()

for {
select {
case <-time.After((time.Duration)(60) * time.Second):
//range to check whether has bug
for dtmName := range dt.DTModules {
health, ok := dt.DTContexts.ModulesHealth.Load(dtmName)
if ok {
now := time.Now().Unix()
if now-health.(int64) > 60*2 {
klog.Infof("%s health %v is old, and begin restart", dtmName, health)
go dt.DTModules[dtmName].Start()
}
}
}
for _, v := range dt.HeartBeatToModule {
v <- "ping"
}
case <-beehiveContext.Done():
for _, v := range dt.HeartBeatToModule {
v <- "stop"
}
klog.Warning("Stop DeviceTwin ModulesHealth load loop")
return
}
}
}

runDeviceTwin 主要执行了以下操作:

  1. 启动 devicetwin 中四个的子模块,子模块代码在 edge/pkg/devicetwin/dtmanager 下
  2. 轮询接收消息,执行 distributeMsg。将收到的消息发送给 communication 模块,对消息进行分类,即消息是来自 EventBus、EdgeManager 还是 EdgeHub,并填充 ActionModuleMap,再将消息发送至对应的子模块
  3. 定期(默认60s)向子模块发送 “ping” 信息。每个子模块一旦收到 “ping” 信息,就会更新自己的时间戳。控制器检查每个模块的时间戳是否超过 2 分钟,如果超过则重新启动该子模块。

Membership 模块

Membership 模块的主要作用是为新设备添加提供资格,该模块将新设备与边缘节点绑定,并在边缘节点和边缘设备之间建立相应关系。它主要执行以下操作:

  1. 初始化 memActionCallBack,它的类型是 map[string]Callback,包含可执行的动作函数
  2. 接收消息
  3. 对于每条消息,都会调用相应动作函数
  4. 接收心跳信息,并向控制器发送心跳信号

以下是可由 Membership 模块执行的动作函数:

  • dealMembershipGet:从缓存中获取与特定边缘节点相关的设备信息
  • dealMembershipUpdated:更新节点的成员信息
  • dealMembershipDetail:提供了边缘节点的成员详细信息

Twin 模块

Twin 模块的主要作用是处理所有与 device twin 相关的操作。它可以执行诸如更新 device twin、获取 device twin 和同步 device twin 到云的操作。它执行的操作与 Membership 模块类似。

以下是可由 Twin 模块执行的动作函数:

  • dealTwinUpdate:更新一个特定设备的 device twin 信息
  • dealTwinGet:提供一个特定设备的 device twin 信息
  • dealTwinSync:将 device twin 信息同步到云端

Communication 模块

Communication 模块的主要作用是确保设备双胞胎和其他组件之间的通信功能。它主要执行以下操作:

  1. 初始化 memActionCallBack,它的类型是 map[string]Callback,包含可执行的动作函数
  2. 接收消息
  3. 对于每条消息,都会调用相应动作函数
  4. 确认消息中指定的动作是否完成,如果动作没有完成则重做该动作
  5. 接收心跳信息,并向控制器发送心跳信号

以下是可由 Communication 模块执行的动作函数:

  • dealSendToCloud:用于发送数据到 cloudhub。这个函数首先确保云边是连接的,然后将消息发送到 edgehub 模块,edgehub 将消息转发给云
  • dealSendToEdge:用于发送数据给边缘的其他模块。这个函数将收到的消息发送到 edgehub 模块,edgehub 将消息转发给其他模块
  • dealLifeCycle:检查是否连接到云并且 twin 的状态是否为断开,将状态改为连接并将节点的详细信息发送给 edgehub;如果未连接到云,就把 twin 的状态设置为断开
  • dealConfirm:检查消息的类型是否正确,然后从 ConfirmMap 中删除 msgID

Device 模块

Device 模块的主要作用是执行与设备有关的操作,如设备状态更新和设备属性更新。它执行的操作与 Membership 模块类似。

以下是可由 Device 模块执行的动作函数:

  • dealDeviceUpdated:处理的是当遇到设备属性更新时要执行的操作。更新设备属性,比如在数据库中增加属性、更新属性和删除属性
  • dealDeviceStateUpdate:处理的是当遇到设备状态更新时要执行的操作。更新设备的状态以及数据库中设备的最后在线时间

More

关于执行动作函数的流程以及 Device,DeviceAttr 与 DeviceTwin 这三张表中字段的描述请见 DeviceTwin

KubeEdge 边缘部分 MetaManager 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

MetaManager 是消息处理器,位于 Edged 和 Edgehub 之间,它负责向轻量级数据库 (SQLite) 持久化/检索元数据。

MetaManager 注册

和其他模块注册相比,metamanager 注册最大的不同就是它还调用了 initDBTable 在 SQLite 数据库中初始化了两张表 Meta 与 MetaV2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Meta metadata object
type Meta struct {
Key string `orm:"column(key); size(256); pk"`
Type string `orm:"column(type); size(32)"`
Value string `orm:"column(value); null; type(text)"`
}

// MetaV2 record k8s api object
type MetaV2 struct {
// Key is the primary key of a line record, format like k8s obj key in etcd:
// /Group/Version/Resources/Namespace/Name
//0/1 /2 /3 /4 /5
// /core/v1/pods/{namespaces}/{name} normal obj
// /core/v1/pods/{namespaces} List obj
// /extensions/v1beta1/ingresses/{namespaces}/{name} normal obj
// /storage.k8s.io/v1beta1/csidrivers/null/{name} cluster scope obj
Key string `orm:"column(key); size(256); pk"`
// GroupVersionResource are set buy gvr.String() like "/v1, Resource=endpoints"
GroupVersionResource string `orm:"column(groupversionresource); size(256);"`
// Namespace is the namespace of an api object, and set as metadata.namespace
Namespace string `orm:"column(namespace); size(256)"`
// Name is the name of api object, and set as metadata.name
Name string `orm:"column(name); size(256)"`
// ResourceVersion is the resource version of the obj, and set as metadata.resourceVersion
ResourceVersion uint64 `orm:"column(resourceversion); size(256)"`
// Value is the api object in json format
// TODO: change to []byte
Value string `orm:"column(value); null; type(text)"`
}

模块入口

edge/pkg/metamanager/metamanager.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (m *metaManager) Start() {
if metaserverconfig.Config.Enable {
imitator.StorageInit()
go metaserver.NewMetaServer().Start(beehiveContext.Done())
}
go func() {
period := getSyncInterval()
timer := time.NewTimer(period)
for {
select {
case <-beehiveContext.Done():
klog.Warning("MetaManager stop")
return
case <-timer.C:
timer.Reset(period)
msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
beehiveContext.Send(MetaManagerModuleName, *msg)
}
}
}()

m.runMetaManager()
}

启动时,开启两个协程,一个用于定时(默认60s)给自己发送消息通知进行边到云的 podstatus 数据同步(KubeEdge 实现了边缘自治,需要将数据同步到云端,网络断开后如果网络恢复,就能立刻将边端的状态进行反馈);另一个 runMetaManager 用于 edgehub 与 edged 的消息,然后调用 m.process(msg) 进行处理。

process 函数获取消息的操作的类型,然后根据信息操作类型对信息进行相应处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *metaManager) process(message model.Message) {
operation := message.GetOperation()
switch operation {
case model.InsertOperation:
m.processInsert(message)
case model.UpdateOperation:
m.processUpdate(message)
case model.DeleteOperation:
m.processDelete(message)
case model.QueryOperation:
m.processQuery(message)
case model.ResponseOperation:
m.processResponse(message)
case messagepkg.OperationNodeConnection:
m.processNodeConnection(message)
case OperationMetaSync:
m.processSync()
case OperationFunctionAction:
m.processFunctionAction(message)
case OperationFunctionActionResult:
m.processFunctionActionResult(message)
case constants.CSIOperationTypeCreateVolume,
constants.CSIOperationTypeDeleteVolume,
constants.CSIOperationTypeControllerPublishVolume,
constants.CSIOperationTypeControllerUnpublishVolume:
m.processVolume(message)
default:
klog.Errorf("metamanager not supported operation: %v", operation)
}
}

具体的处理函数 processInsert、processUpdate 等的具体过程不再分析,大致都是对数据库进行操作,然后再通知 edgehub 或 edged。

KubeEdge 边缘部分 EdgeHub 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

EdgeHub 是一个 Web Socket 客户端,负责与边缘计算的云端交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。

模块入口

edge/pkg/edgehub/edgehub.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
//Start sets context and starts the controller
func (eh *EdgeHub) Start() {
eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
eh.certManager.Start()

HasTLSTunnelCerts <- true
close(HasTLSTunnelCerts)

go eh.ifRotationDone()

for {
select {
case <-beehiveContext.Done():
klog.Warning("EdgeHub stop")
return
default:
}
err := eh.initial()
if err != nil {
klog.Exitf("failed to init controller: %v", err)
return
}

waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

err = eh.chClient.Init()
if err != nil {
klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
time.Sleep(waitTime)
continue
}
// execute hook func after connect
eh.pubConnectInfo(true)
go eh.routeToEdge()
go eh.routeToCloud()
go eh.keepalive()

// wait the stop signal
// stop authinfo manager/websocket connection
<-eh.reconnectChan
eh.chClient.UnInit()

// execute hook fun after disconnect
eh.pubConnectInfo(false)

// sleep one period of heartbeat, then try to connect cloud hub again
klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
time.Sleep(waitTime)

// clean channel
clean:
for {
select {
case <-eh.reconnectChan:
default:
break clean
}
}
}
}

edgehub 启动主要有以下几步:

  1. 设置证书,从 cloudcore 申请证书(若正确配置本地证书,则直接使用本地证书),然后进入循环
  2. 调用 eh.initial() 创建 eh.chClient,接着调用 eh.chClient.Init(),初始化过程建立了 websocket/quic 的连接
  3. 调用 eh.pubConnectInfo(true),向 edgecore 各模块广播已经连接成功的消息
  4. go eh.routeToEdge(),执行 eh.chClient.Receive() 接收消息,将从云上部分收到的消息转发给指定边缘部分的模块 (MetaManager/DeviceTwin/EventBus/ServiceBus)
  5. go eh.routeToCloud(),执行 beehiveContext.Receive(modules.EdgeHubModuleName) 接收来自边缘 (MetaManager/DeviceTwin/EventBus/ServiceBus) 的信息,并执行 eh.sendToCloud(message) 发到 cloudhub
  6. go eh.keepalive(),向 cloudhub 发送心跳信息

另外,当云边消息传送过程中出现错误时,边缘部分会重新 init 相应的 websocket/quic client,与云端重新建立连接。

KubeEdge 云上部分 EdgeController 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

EdgeController 是一个扩展的 k8s 控制器,管理边缘节点和 Pods 的元数据,确保数据能够传递到指定的边缘节点。

模块入口

cloud/pkg/edgecontroller/edgecontroller.go:

1
2
3
4
5
6
7
8
9
10
// Start controller
func (ec *EdgeController) Start() {
if err := ec.upstream.Start(); err != nil {
klog.Exitf("start upstream failed with error: %s", err)
}

if err := ec.downstream.Start(); err != nil {
klog.Exitf("start downstream failed with error: %s", err)
}
}

Start 分别启动 upstream 和 downstream,upstream 和 downstream 之间没有依赖关系。注册 EdgeController 时,upstream 和 downstream 就通过 NewUpstreamController 和 NewDownstreamController 初始化好了。

upstream

在 NewUpstreamController 中初始化了所有成员 channel,upstream.Start() 主要就是调用 uc.dispatchMessage() 分发收到的消息,以及执行其他函数用于处理成员 channel 里面的数据。

dispatchMessage 函数不断轮询,调用 uc.messageLayer.Receive() 接受消息,根据收到的消息 resourceType 选择将数据送到对应的 channel 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
switch resourceType {
case model.ResourceTypeNodeStatus:
uc.nodeStatusChan <- msg
case model.ResourceTypePodStatus:
uc.podStatusChan <- msg
case model.ResourceTypeConfigmap:
uc.configMapChan <- msg
case model.ResourceTypeSecret:
uc.secretChan <- msg
case model.ResourceTypeServiceAccountToken:
uc.serviceAccountTokenChan <- msg
case common.ResourceTypePersistentVolume:
uc.persistentVolumeChan <- msg
case common.ResourceTypePersistentVolumeClaim:
uc.persistentVolumeClaimChan <- msg
case common.ResourceTypeVolumeAttachment:
uc.volumeAttachmentChan <- msg
case model.ResourceTypeNode:
switch msg.GetOperation() {
case model.QueryOperation:
uc.queryNodeChan <- msg
case model.UpdateOperation:
uc.updateNodeChan <- msg
default:
klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
}
case model.ResourceTypePod:
if msg.GetOperation() == model.DeleteOperation {
uc.podDeleteChan <- msg
} else {
klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
}
case model.ResourceTypeRuleStatus:
uc.ruleStatusChan <- msg

default:
klog.Errorf("message: %s, resource type: %s unsupported", msg.GetID(), resourceType)
}

每种 channel 中的消息都由不同的函数来处理,这里以 updateNodeStatus 函数为例,它接收 nodeStatusChan 中的消息,依次 GetContentData,GetNamespace,GetResourceName,GetOperation,根据消息的 Operation 做出相应的操作, 一般是上传到 apiserver:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
func (uc *UpstreamController) updateNodeStatus() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("stop updateNodeStatus")
return
case msg := <-uc.nodeStatusChan:
klog.V(5).Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())

data, err := msg.GetContentData()
if err != nil {
klog.Warningf("message: %s process failure, get content data failed with error: %s", msg.GetID(), err)
continue
}

namespace, err := messagelayer.GetNamespace(msg)
if err != nil {
klog.Warningf("message: %s process failure, get namespace failed with error: %s", msg.GetID(), err)
continue
}
name, err := messagelayer.GetResourceName(msg)
if err != nil {
klog.Warningf("message: %s process failure, get resource name failed with error: %s", msg.GetID(), err)
continue
}

switch msg.GetOperation() {
case model.InsertOperation:
_, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
if err == nil {
klog.Infof("node: %s already exists, do nothing", name)
uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)
continue
}

if !errors.IsNotFound(err) {
errLog := fmt.Sprintf("get node %s info error: %v , register node failed", name, err)
klog.Error(errLog)
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}

node := &v1.Node{}
err = json.Unmarshal(data, node)
if err != nil {
errLog := fmt.Sprintf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
klog.Error(errLog)
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}

if _, err = uc.createNode(name, node); err != nil {
errLog := fmt.Sprintf("create node %s error: %v , register node failed", name, err)
klog.Error(errLog)
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}

uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)

case model.UpdateOperation:
nodeStatusRequest := &edgeapi.NodeStatusRequest{}
err := json.Unmarshal(data, nodeStatusRequest)
if err != nil {
klog.Warningf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
continue
}

getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
if errors.IsNotFound(err) {
klog.Warningf("message: %s process failure, node %s not found", msg.GetID(), name)
continue
}

if err != nil {
klog.Warningf("message: %s process failure with error: %s, namespaces: %s name: %s", msg.GetID(), err, namespace, name)
continue
}

// TODO: comment below for test failure. Needs to decide whether to keep post troubleshoot
// In case the status stored at metadata service is outdated, update the heartbeat automatically
for i := range nodeStatusRequest.Status.Conditions {
if time.Since(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(uc.config.NodeUpdateFrequency)*time.Second {
nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
}
}

if getNode.Annotations == nil {
getNode.Annotations = make(map[string]string)
}
for name, v := range nodeStatusRequest.ExtendResources {
if name == constants.NvidiaGPUScalarResourceName {
var gpuStatus []types.NvidiaGPUStatus
for _, er := range v {
gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
}
if len(gpuStatus) > 0 {
data, _ := json.Marshal(gpuStatus)
getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)
}
}
data, err := json.Marshal(v)
if err != nil {
klog.Warningf("message: %s process failure, extend resource list marshal with error: %s", msg.GetID(), err)
continue
}
getNode.Annotations[string(name)] = string(data)
}

// Keep the same "VolumesAttached" attribute with upstream,
// since this value is maintained by kube-controller-manager.
nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached
if getNode.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {
nodeStatusRequest.Status.DaemonEndpoints.KubeletEndpoint.Port = getNode.Status.DaemonEndpoints.KubeletEndpoint.Port
}

getNode.Status = nodeStatusRequest.Status

node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{})
if err != nil {
klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
continue
}

nodeID, err := messagelayer.GetNodeID(msg)
if err != nil {
klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
continue
}

resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
if err != nil {
klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
continue
}

resMsg := model.NewMessage(msg.GetID()).
SetResourceVersion(node.ResourceVersion).
FillBody(common.MessageSuccessfulContent).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
if err = uc.messageLayer.Response(*resMsg); err != nil {
klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
continue
}

klog.V(4).Infof("message: %s, update node status successfully, namespace: %s, name: %s", msg.GetID(), getNode.Namespace, getNode.Name)

default:
klog.Warningf("message: %s process failure, node status operation: %s unsupported", msg.GetID(), msg.GetOperation())
continue
}
klog.V(4).Infof("message: %s process successfully", msg.GetID())
}
}
}

downstream

downstream 的 Start 函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (dc *DownstreamController) Start() error {
klog.Info("start downstream controller")
// pod
go dc.syncPod()

// configmap
go dc.syncConfigMap()

// secret
go dc.syncSecret()

// nodes
go dc.syncEdgeNodes()

// rule
go dc.syncRule()

// ruleendpoint
go dc.syncRuleEndpoint()

return nil
}

pod 是最小的,管理,创建,计划的最小单元,包含一个或多个容器;configmap 用于保存配置数据的键值对,可以用来保存单个属性,也可以用来保存配置文件,作用是可以将配置文件与镜像文件分离;secret 与 configmap 类似,但是用来存储敏感信息;node 是 pod 真正运行的主机,可以物理机,也可以是虚拟机;ruleEndpoint 定义了消息的来源,或消息的去向。它包含 3 种类型:rest(云上的一个端点,可以是源端点,用于向边缘发送请求;或者是目标端点,从边缘接收消息)、eventbus(可以是源端点,用于向云发送请求;或者是目标端点,从云接收消息)、servicebus(目标端点,接收云端的消息);rule 定义了消息如何传输,它包含 3 种类型:rest->eventbus(用户应用调用云上的 rest api 发送消息,最后消息被发送到边缘的 mqttbroker),
eventbus->rest(用户程序向边缘的 mqttbroker 发布消息,最后消息被发送到云上的 rest api),
rest->servicebus(用户程序调用云上的 rest api 发送消息,最后消息被发送到边缘的应用程序)。

syncPod 获取 podManager 中收到的 events,根据 e.Type 分发不同的路由,最后执行dc.messageLayer.Send(*msg) 把数据发送到边缘。

syncConfigMap 获取 configmapManager 中收到的 events,根据 e.Type 设置不同的 operation,最后执行dc.messageLayer.Send(*msg) 把数据发送到边缘。

syncSecret、syncEdgeNodes、syncRule 和 syncRuleEndpoint 函数的流程也类似。

总结

参考

  1. Kubeedge源码阅读系列–cloudcore.edgecontroller模块

KubeEdge 云上部分 DeviceController 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

DeviceController 是一个扩展的 k8s 控制器,管理边缘设备,确保设备信息、设备状态的云边同步。

模块入口

cloud/pkg/devicecontroller/devicecontroller.go:

1
2
3
4
5
6
7
8
9
10
11
12
// Start controller
func (dc *DeviceController) Start() {
if err := dc.downstream.Start(); err != nil {
klog.Exitf("start downstream failed with error: %s", err)
}
// wait for downstream controller to start and load deviceModels and devices
// TODO think about sync
time.Sleep(1 * time.Second)
if err := dc.upstream.Start(); err != nil {
klog.Exitf("start upstream failed with error: %s", err)
}
}

Start 分别启动 downstream 和 upstream,同时 upstream 依赖于 downstream。注册 DeviceController 时,downstream 和 upstream 就通过 NewDownstreamController 和 NewUpstreamController 初始化好了。

downstream

downstream 一般描述云端向边缘端下发数据。

NewDownstreamController 创建了 kubeClient,deviceManager,deviceModelManager,messageLayer,configMapManager 赋值给了 dc 并返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// NewDownstreamController create a DownstreamController from config
func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
deviceManager, err := manager.NewDeviceManager(crdInformerFactory.Devices().V1alpha2().Devices().Informer())
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}

deviceModelManager, err := manager.NewDeviceModelManager(crdInformerFactory.Devices().V1alpha2().DeviceModels().Informer())
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}

dc := &DownstreamController{
kubeClient: client.GetKubeClient(),
deviceManager: deviceManager,
deviceModelManager: deviceModelManager,
messageLayer: messagelayer.NewContextMessageLayer(),
configMapManager: manager.NewConfigMapManager(),
}
return dc, nil
}

downstream 的 Start 方法执行了 dc.syncDeviceModel(),dc.syncDevice() 两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Start DownstreamController
func (dc *DownstreamController) Start() error {
klog.Info("Start downstream devicecontroller")

go dc.syncDeviceModel()

// Wait for adding all device model
// TODO need to think about sync
time.Sleep(1 * time.Second)
go dc.syncDevice()

return nil
}

syncDeviceModel 调用了 dc.deviceModelManager.Events(),
获取 deviceModelManager 的 events,events 类型为 chan watch.Event,可以理解为 deviceModel 相关的事件到来后会传到通道 events 中。即 syncDeviceModel 从 deviceModelManager 中获取 event 并进行分析。

之后 syncDeviceModel 根据 e.Type 的类型执行不同的操作:

1
2
3
4
5
6
7
8
9
10
switch e.Type {
case watch.Added:
dc.deviceModelAdded(deviceModel)
case watch.Deleted:
dc.deviceModelDeleted(deviceModel)
case watch.Modified:
dc.deviceModelUpdated(deviceModel)
default:
klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}
  1. dc.deviceModelAdded(deviceModel) 将 deviceModel 存如表 dc.deviceModelManager.DeviceModel 中;
  2. dc.deviceModelDeleted(deviceModel) 将 deviceModel 从表 dc.deviceModelManager.DeviceModel 删除;
  3. dc.deviceModelUpdated(deviceModel) 更新表 dc.deviceModelManager.DeviceModel 中的 deviceModel,如果 deviceModel Name 不存在,则直接添加 deviceModel。

syncDevice 与 syncDeviceModel 类似,都是先通过 Events() 获取 events,然后根据 events 的类型执行相应的处理。deviceManager 与 deviceModelManager 也几乎一样。不过收到事件后的处理比 syncDeviceModel 略微复杂,需要发送消息。以 deviceAdded 为例,deviceAdded 首先把 device 存到 dc.deviceManager.Device 中,然后执行 dc.addToConfigMap(device) 和 createDevice(device),接着执行 messagelayer.BuildResource,msg.BuildRouter 等函数来构建 msg,最后通过 dc.messageLayer.Send(*msg) 将 device 数据发送出去。

Device Model

Device Model 描述了设备属性,如“温度”或”压力”。Device Model 相当于是模板,使用它可以创建和管理许多设备。spec 中的 properties 字段定义设备通用支持的属性,例如数据类型、是否只读、默认值、最大值和最小值;另外还有 propertyVisitors 字段,它定义每种属性字段的访问方式,例如数据是否需要经过某种运算处理,数据格式转换。以下是一个 Device Model 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
name: sensor-tag-model
namespace: default
spec:
properties:
- name: temperature
description: temperature in degree celsius
type:
int:
accessMode: ReadWrite
maximum: 100
unit: degree celsius
- name: temperature-enable
description: enable data collection of temperature sensor
type:
string:
accessMode: ReadWrite
defaultValue: 'OFF'

Device

Device 代表一个实际的设备对象,可以看作是 Device Model 的实例化。spec 字段是静态的,status 字段中是动态变化的数据,如设备期望的状态和设备报告的状态。以下是一个 Device 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
name: sensor-tag-instance-01
labels:
description: TISimplelinkSensorTag
manufacturer: TexasInstruments
model: CC2650
spec:
deviceModelRef:
name: sensor-tag-model
protocol:
modbus:
slaveID: 1
common:
com:
serialPort: '1'
baudRate: 115200
dataBits: 8
parity: even
stopBits: 1
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: ''
operator: In
values:
- node1
propertyVisitors:
- propertyName: temperature
modbus:
register: CoilRegister
offset: 2
limit: 1
scale: 1
isSwap: true
isRegisterSwap: true
- propertyName: temperature-enable
modbus:
register: DiscreteInputRegister
offset: 3
limit: 1
scale: 1.0
isSwap: true
isRegisterSwap: true
status:
twins:
- propertyName: temperature
reported:
metadata:
timestamp: '1550049403598'
type: int
value: '10'
desired:
metadata:
timestamp: '1550049403598'
type: int
value: '15'

upstream

upstream 一般描述边缘端向云端上传数据。

NewUpstreamController 通过 keclient.GetCRDClient() 创建了 crdClient,另外创建了 messageLayer,除此之外,UpstreamController 还包含了一个 downstream:

1
2
3
4
5
6
7
8
9
// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
uc := &UpstreamController{
crdClient: keclient.GetCRDClient(),
messageLayer: messagelayer.NewContextMessageLayer(),
dc: dc,
}
return uc, nil
}

upstream 的 Start 方法执行了 uc.dispatchMessage(),uc.updateDeviceStatus() 两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
// Start UpstreamController
func (uc *UpstreamController) Start() error {
klog.Info("Start upstream devicecontroller")

uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
go uc.dispatchMessage()

for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
go uc.updateDeviceStatus()
}
return nil
}

dispatchMessage 函数主要通过 uc.messageLayer.Receive() 收取数据,放入 uc.deviceStatusChan。

updateDeviceStatus 函数循环执行,收取 uc.deviceStatusChan 的 msg,将 msg 反序列化获得 msgTwin,获取 deviceID,device,cacheDevice,deviceStatus,然后将 deviceStatus 上传,最后向 Edge 返回确认 msg。

总结

参考

  1. Kubeedge源码阅读系列–cloudcore.devicecontroller模块

KubeEdge 云上部分 CloudHub 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

CloudHub 是一个 Web Socket 服务端,负责监听云端的变化,缓存并发送消息到 EdgeHub。

模块入口

cloud/pkg/cloudhub/cloudhub.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (a *cloudHub) Start() {
if !cache.WaitForCacheSync(beehiveContext.Done(), a.informersSyncedFuncs...) {
klog.Errorf("unable to sync caches for objectSyncController")
os.Exit(1)
}

// start dispatch message from the cloud to edge node
go a.messageq.DispatchMessage()

// check whether the certificates exist in the local directory,
// and then check whether certificates exist in the secret, generate if they don't exist
if err := httpserver.PrepareAllCerts(); err != nil {
klog.Exit(err)
}
// TODO: Will improve in the future
DoneTLSTunnelCerts <- true
close(DoneTLSTunnelCerts)

// generate Token
if err := httpserver.GenerateToken(); err != nil {
klog.Exit(err)
}

// HttpServer mainly used to issue certificates for the edge
go httpserver.StartHTTPServer()

servers.StartCloudHub(a.messageq)

if hubconfig.Config.UnixSocket.Enable {
// The uds server is only used to communicate with csi driver from kubeedge on cloud.
// It is not used to communicate between cloud and edge.
go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
}
}

cloudhub 启动主要有以下 3 步:

  1. 调用 DispatchMessage,开始从云端向边缘节点派送消息
  2. 启动 HttpServer,主要用于为边端发放证书
  3. 调用 StartCloudHub

接下来对 DispatchMessage 和 StartCloudHub 进行具体分析。

DispatchMessage

DispatchMessage 从云中获取消息,提取节点 ID,获取与节点相关的消息,将其放入消息队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (q *ChannelMessageQueue) DispatchMessage() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("Cloudhub channel eventqueue dispatch message loop stopped")
return
default:
}
msg, err := beehiveContext.Receive(model.SrcCloudHub)
klog.V(4).Infof("[cloudhub] dispatchMessage to edge: %+v", msg)
if err != nil {
klog.Info("receive not Message format message")
continue
}
nodeID, err := GetNodeID(&msg)
if nodeID == "" || err != nil {
klog.Warning("node id is not found in the message")
continue
}
if isListResource(&msg) {
q.addListMessageToQueue(nodeID, &msg)
} else {
q.addMessageToQueue(nodeID, &msg)
}
}
}

StartCloudHub

StartCloudHub 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
handler.InitHandler(messageq)
// start websocket server
if hubconfig.Config.WebSocket.Enable {
go startWebsocketServer()
}
// start quic server
if hubconfig.Config.Quic.Enable {
go startQuicServer()
}
}

如果设置了 WebSocket 启动,就启动 WebSocket 服务器协程;如果设置了 Quic 启动,就启动 Quic 服务器协程。

WebSocket 是性能最好的,默认使用 WebSocket。Quic 作为备选项,在网络频繁断开等很不稳定场景下有优势。KubeEdge 云边消息传递是通过 cloudhub 跟 edgehub 间的 Websocket 或 Quic 协议的长连接传输的。

KubeEdge 概述

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

KubeEdge 架构图

KubeEdge 总体有两大部分 —— cloudcore 和 edgecore。cloudcore 部分是 k8s api server 与 Edge 部分的桥梁,负责将指令下发到 Edge,同时将 Edge 的状态和事件同步到的 k8s api server;edgecore 部分接受并执行 Cloud 部分下发的指令,管理各种负载,并将 Edge 部分负载的状态和事件同步到 Cloud 部分。

云上部分

CloudHub 是一个 Web Socket 服务端,负责监听云端的变化,缓存并发送消息到 EdgeHub。

DeviceController 是一个扩展的 k8s 控制器,管理边缘设备,确保设备信息、设备状态的云边同步。

EdgeController 是一个扩展的 k8s 控制器,管理边缘节点和 Pods 的元数据,确保数据能够传递到指定的边缘节点。

边缘部分

EdgeHub 是一个 Web Socket 客户端,负责与边缘计算的云端交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。

Edged 是运行在边缘节点的代理(轻量化的 kubelet),用于管理容器化的应用程序。

EventBus 是一个与 MQTT 服务器 (mosquitto) 交互的 MQTT 客户端,为其他组件提供订阅和发布功能。

ServiceBus 是一个运行在边缘的 HTTP 客户端。

DeviceTwin 负责存储设备状态(传感器的值等)并将设备状态同步到云 (DeviceController),它还为应用程序提供查询接口。

MetaManager 是消息处理器,位于 Edged 和 Edgehub 之间,它负责向轻量级数据库 (SQLite) 持久化/检索元数据。

关键代码

cloudcore 代码入口为 Cloud/cmd/cloudcore/cloudcore.go,在 main 函数中调用 NewCloudCoreCommand,通过 registerModules 函数注册 cloudcore 中的功能模块,通过 StartModules 函数启动已注册的 cloudcore 上的功能模块。registerModules 函数如下:

1
2
3
4
5
6
7
8
9
func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub)
edgecontroller.Register(c.Modules.EdgeController)
devicecontroller.Register(c.Modules.DeviceController)
synccontroller.Register(c.Modules.SyncController)
cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
router.Register(c.Modules.Router)
dynamiccontroller.Register(c.Modules.DynamicController)
}

这 7 个模块都实现了 Module 接口,注册最终会将模块封装后的结构体放入一个 map[string]*ModuleInfo 类型的全局变量 modules 中。之后 StartModules 函数通过 for 循环从 modules 获取每一个的模块,每个模块分配一个协程调用 Start 函数启动。

edgecore 代码入口为 edge/cmd/edgecore/edgecore.go,在 main 函数中调用 NewEdgeCoreCommand。和在 cloudcore 类似,在 NewEdgeCoreCommand 函数中,通过 registerModules 函数注册 edgecore 中的功能模块,通过 Run 函数启动已注册的 edgecore 中的功能模块。edgecore 中 registerModules 函数注册的模块如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// registerModules register all the modules started in edgecore
func registerModules(c *v1alpha1.EdgeCoreConfig) {
devicetwin.Register(c.Modules.DeviceTwin, c.Modules.Edged.HostnameOverride)
edged.Register(c.Modules.Edged)
edgehub.Register(c.Modules.EdgeHub, c.Modules.Edged.HostnameOverride)
eventbus.Register(c.Modules.EventBus, c.Modules.Edged.HostnameOverride)
metamanager.Register(c.Modules.MetaManager)
servicebus.Register(c.Modules.ServiceBus)
edgestream.Register(c.Modules.EdgeStream, c.Modules.Edged.HostnameOverride, c.Modules.Edged.NodeIP)
test.Register(c.Modules.DBTest)
// Note: Need to put it to the end, and wait for all models to register before executing
dbm.InitDBConfig(c.DataBase.DriverName, c.DataBase.AliasName, c.DataBase.DataSource)
}

Why KubeEdge

为什么用 KubeEdge 而不是 k8s 构建边缘计算平台?

k8s 构建边缘计算平台的主要挑战:①资源有限。边缘设备可能只有几百兆的内存,一个原生 kubelet 都跑不起来。②网络受限。k8s 的 master 和 node 通信是通过 List/Watch 机制,边缘场景下网络可能会断开很长时间,这时候 node 上的 kubelet 一直 re-watch 失败,就会请求 re-list,把 apiserver 上的对象全量拿回去,没法在边缘场景这种受限的网络下很好的工作。③k8s 节点没有自治能力。如何在网络质量不稳定的情况下,对边缘节点实现离线自治,这也是个问题。

KubeEdge 主打三个核心理念,首先是云边协同,边是云的延伸,用户的边可能位于私有网络,因此需要穿透私有网络,通过云来管理私有节点,KubeEdge 默认采用 WebSocket + 消息封装来实现,这样只要边缘网络能访问外网情况下,就能实现双向通信,这就不需要边端需要一个公网的 IP。同时呢,KubeEdge 也优化了原生 Kubernetes 中不必要的一些请求,能够大幅减少通信压力,高时延状态下仍可以工作。

KubeEdge 第二个核心理念是边缘节点自治,做到节点级的元数据的持久化,比如 Pod,ConfigMap 等基础元数据,每个节点都持久化这些元数据,边缘节点离线之后,它仍可以通过本地持久化的元数据来管理应用。在 Kubernetes 中,当 kubelet 重启后, 它首先要向 master 做一次 List 获取全量的数据,然后再进行应用管理工作,如果这时候边和云端的网络断开,就无法获得全量的元数据,也不能进行故障恢复。KubeEdge 做了元数据的持久化后,可以直接从本地获得这些元数据,保证故障恢复的能力,保证服务快速 ready。

另外一个理念是极致轻量,在大多数边缘计算场景下,节点的资源是非常有限的,KubeEdge 采用的方式是重组 kubelet 组件(~10mb 内存占用),优化 runtime 资源消耗。在空载时候,内存占用率很低。

参考

  1. kubeedge源码分析系列之整体架构