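A Brief Analysis of EdgeHub on the KubeEdge Edge Side

This article is based on commit 9a7e140b42abb4bf6bcabada67e3568f73964278.

Overview

EdgeHub is a WebSocket client that handles the edge side's interaction with the cloud, including syncing cloud-side resource updates to the edge and reporting state changes of edge hosts and devices to the cloud.

Module entry point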

edge/pkg/edgehub/edgehub.go:

//Start sets context and starts the controller
func (eh *EdgeHub) Start() {
	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
	eh.certManager.Start()

	HasTLSTunnelCerts <- true
	close(HasTLSTunnelCerts)

	go eh.ifRotationDone()

	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub stop")
			return
		default:
		}
		err := eh.initial()
		if err != nil {
			klog.Exitf("failed to init controller: %v", err)
			return
		}

		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

		err = eh.chClient.Init()
		if err != nil {
			klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
			time.Sleep(waitTime)
			continue
		}
		// execute hook func after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		// wait the stop signal
		// stop authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		// execute hook fun after disconnect
		eh.pubConnectInfo(false)

		// sleep one period of heartbeat, then try to connect cloud hub again
		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
		time.Sleep(waitTime)

		// clean channel
	clean:
		for {
			select {
			case <-eh.reconnectChan:
			default:
				break clean
			}
		}
	}
}

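EdgeHub startup consists of the following main steps:

  1. Set up certificates: request a certificate from cloudcore (if local certificates are configured correctly, they are used directly), then enter the loop.
  2. Call eh.initial() to create eh.chClient, then call eh.chClient.Init(); this initialization establishes the websocket/quic connection.
  3. Call eh.pubConnectInfo(true) to broadcast to the edgecore modules that the connection has been established.
  4. go eh.routeToEdge(): call eh.chClient.Receive() to receive messages, and forward the messages received from the cloud to the target edge module (MetaManager/DeviceTwin/EventBus/ServiceBus).
  5. go eh.routeToCloud(): call beehiveContext.Receive(modules.EdgeHubModuleName) to receive messages from the edge modules (MetaManager/DeviceTwin/EventBus/ServiceBus), then call eh.sendToCloud(message) to send them to cloudhub.
  6. go eh.keepalive(): send heartbeat messages to cloudhub.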

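In addition, when an error occurs while messages are being transferred between cloud and edge, the edge side re-initializes the corresponding websocket/quic client and re-establishes the connection with the cloud.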
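
The reconnect handshake between the worker goroutines and the main loop can be looked at in isolation. The following is a minimal sketch, not EdgeHub's actual code: signalReconnect and drain are hypothetical helpers, and the channel is buffered here (an assumption made so the example runs deterministically). It illustrates why Start empties reconnectChan before the next iteration: several goroutines may report the same broken connection, and a leftover signal would otherwise tear down the fresh connection right away.

```go
package main

import (
	"errors"
	"fmt"
)

// signalReconnect is what a worker would call after a send/receive error.
// The send is non-blocking, so a worker never hangs when the buffer is full.
// (Hypothetical helper, not part of KubeEdge.)
func signalReconnect(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true
	default:
		return false
	}
}

// drain discards every pending signal without blocking and returns the count.
// It mirrors the labelled "clean" loop in Start.
func drain(ch chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	reconnect := make(chan struct{}, 3)
	errBroken := errors.New("connection broken")

	// All three workers notice the same failure and each one signals.
	for _, worker := range []string{"routeToEdge", "routeToCloud", "keepalive"} {
		fmt.Printf("%s failed: %v\n", worker, errBroken)
		signalReconnect(reconnect)
	}

	<-reconnect // the main loop wakes up on the first signal
	fmt.Println("stale signals dropped:", drain(reconnect))
}
```

Without the drain step, the next pass through the loop would read one of the stale signals immediately after connecting and disconnect again.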

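A Brief Analysis of EdgeController on the KubeEdge Cloud Side

This article is based on commit 9a7e140b42abb4bf6bcabada67e3568f73964278.

Overview

EdgeController is an extended Kubernetes controller that manages the metadata of edge nodes and Pods and ensures that data is delivered to the intended edge node.

Module entry point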

cloud/pkg/edgecontroller/edgecontroller.go:

// Start controller
func (ec *EdgeController) Start() {
	if err := ec.upstream.Start(); err != nil {
		klog.Exitf("start upstream failed with error: %s", err)
	}

	if err := ec.downstream.Start(); err != nil {
		klog.Exitf("start downstream failed with error: %s", err)
	}
}

Start launches upstream and then downstream; the two do not depend on each other. By the time EdgeController is registered, upstream and downstream have already been initialized through NewUpstreamController and NewDownstreamController.

upstream

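NewUpstreamController initializes all the member channels. upstream.Start() mainly calls uc.dispatchMessage() to dispatch incoming messages, and starts the other functions that process the data in the member channels.

dispatchMessage loops continuously: it calls uc.messageLayer.Receive() to receive a message, then sends the message to the channel that matches its resourceType: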

switch resourceType {
case model.ResourceTypeNodeStatus:
	uc.nodeStatusChan <- msg
case model.ResourceTypePodStatus:
	uc.podStatusChan <- msg
case model.ResourceTypeConfigmap:
	uc.configMapChan <- msg
case model.ResourceTypeSecret:
	uc.secretChan <- msg
case model.ResourceTypeServiceAccountToken:
	uc.serviceAccountTokenChan <- msg
case common.ResourceTypePersistentVolume:
	uc.persistentVolumeChan <- msg
case common.ResourceTypePersistentVolumeClaim:
	uc.persistentVolumeClaimChan <- msg
case common.ResourceTypeVolumeAttachment:
	uc.volumeAttachmentChan <- msg
case model.ResourceTypeNode:
	switch msg.GetOperation() {
	case model.QueryOperation:
		uc.queryNodeChan <- msg
	case model.UpdateOperation:
		uc.updateNodeChan <- msg
	default:
		klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
	}
case model.ResourceTypePod:
	if msg.GetOperation() == model.DeleteOperation {
		uc.podDeleteChan <- msg
	} else {
		klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
	}
case model.ResourceTypeRuleStatus:
	uc.ruleStatusChan <- msg

default:
	klog.Errorf("message: %s, resource type: %s unsupported", msg.GetID(), resourceType)
}
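
The routing idea behind this switch (one channel per resource type, with unknown types rejected) can also be expressed with a lookup table. The sketch below is an illustration only: message, dispatcher, and the resource type strings are hypothetical stand-ins declared locally, not KubeEdge's model.Message or its constants.

```go
package main

import "fmt"

// message is a hypothetical stand-in for model.Message.
type message struct {
	ID           string
	ResourceType string
}

// dispatcher owns one buffered channel per supported resource type.
type dispatcher struct {
	routes map[string]chan message
}

func newDispatcher(buffer int, resourceTypes ...string) *dispatcher {
	d := &dispatcher{routes: make(map[string]chan message)}
	for _, rt := range resourceTypes {
		d.routes[rt] = make(chan message, buffer)
	}
	return d
}

// dispatch sends msg to the channel registered for its resource type.
// It blocks while that channel is full and returns an error for unknown types.
func (d *dispatcher) dispatch(msg message) error {
	ch, ok := d.routes[msg.ResourceType]
	if !ok {
		return fmt.Errorf("message: %s, resource type: %s unsupported", msg.ID, msg.ResourceType)
	}
	ch <- msg
	return nil
}

func main() {
	d := newDispatcher(4, "nodestatus", "podstatus")
	for _, m := range []message{{"m1", "nodestatus"}, {"m2", "podstatus"}, {"m3", "unknown"}} {
		if err := d.dispatch(m); err != nil {
			fmt.Println("dropped:", err)
		}
	}
	fmt.Println("queued node status messages:", len(d.routes["nodestatus"]))
}
```

A table keeps simple one-to-one routes compact, while the switch in the original code is the better fit where the operation has to be inspected as well, as in the ResourceTypeNode and ResourceTypePod cases.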

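The messages in each channel are handled by a different function. Take updateNodeStatus as an example: it receives messages from nodeStatusChan, calls GetContentData, GetNamespace, GetResourceName, and GetOperation in turn, and then acts according to the message's Operation, which usually means writing the data to the apiserver: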

func (uc *UpstreamController) updateNodeStatus() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("stop updateNodeStatus")
			return
		case msg := <-uc.nodeStatusChan:
			klog.V(5).Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())

			data, err := msg.GetContentData()
			if err != nil {
				klog.Warningf("message: %s process failure, get content data failed with error: %s", msg.GetID(), err)
				continue
			}

			namespace, err := messagelayer.GetNamespace(msg)
			if err != nil {
				klog.Warningf("message: %s process failure, get namespace failed with error: %s", msg.GetID(), err)
				continue
			}
			name, err := messagelayer.GetResourceName(msg)
			if err != nil {
				klog.Warningf("message: %s process failure, get resource name failed with error: %s", msg.GetID(), err)
				continue
			}

			switch msg.GetOperation() {
			case model.InsertOperation:
				_, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
				if err == nil {
					klog.Infof("node: %s already exists, do nothing", name)
					uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)
					continue
				}

				if !errors.IsNotFound(err) {
					errLog := fmt.Sprintf("get node %s info error: %v , register node failed", name, err)
					klog.Error(errLog)
					uc.nodeMsgResponse(name, namespace, errLog, msg)
					continue
				}

				node := &v1.Node{}
				err = json.Unmarshal(data, node)
				if err != nil {
					errLog := fmt.Sprintf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
					klog.Error(errLog)
					uc.nodeMsgResponse(name, namespace, errLog, msg)
					continue
				}

				if _, err = uc.createNode(name, node); err != nil {
					errLog := fmt.Sprintf("create node %s error: %v , register node failed", name, err)
					klog.Error(errLog)
					uc.nodeMsgResponse(name, namespace, errLog, msg)
					continue
				}

				uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)

			case model.UpdateOperation:
				nodeStatusRequest := &edgeapi.NodeStatusRequest{}
				err := json.Unmarshal(data, nodeStatusRequest)
				if err != nil {
					klog.Warningf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
					continue
				}

				getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
				if errors.IsNotFound(err) {
					klog.Warningf("message: %s process failure, node %s not found", msg.GetID(), name)
					continue
				}

				if err != nil {
					klog.Warningf("message: %s process failure with error: %s, namespaces: %s name: %s", msg.GetID(), err, namespace, name)
					continue
				}

				// TODO: comment below for test failure. Needs to decide whether to keep post troubleshoot
				// In case the status stored at metadata service is outdated, update the heartbeat automatically
				for i := range nodeStatusRequest.Status.Conditions {
					if time.Since(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(uc.config.NodeUpdateFrequency)*time.Second {
						nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
					}
				}

				if getNode.Annotations == nil {
					getNode.Annotations = make(map[string]string)
				}
				for name, v := range nodeStatusRequest.ExtendResources {
					if name == constants.NvidiaGPUScalarResourceName {
						var gpuStatus []types.NvidiaGPUStatus
						for _, er := range v {
							gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
						}
						if len(gpuStatus) > 0 {
							data, _ := json.Marshal(gpuStatus)
							getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)
						}
					}
					data, err := json.Marshal(v)
					if err != nil {
						klog.Warningf("message: %s process failure, extend resource list marshal with error: %s", msg.GetID(), err)
						continue
					}
					getNode.Annotations[string(name)] = string(data)
				}

				// Keep the same "VolumesAttached" attribute with upstream,
				// since this value is maintained by kube-controller-manager.
				nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached
				if getNode.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {
					nodeStatusRequest.Status.DaemonEndpoints.KubeletEndpoint.Port = getNode.Status.DaemonEndpoints.KubeletEndpoint.Port
				}

				getNode.Status = nodeStatusRequest.Status

				node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{})
				if err != nil {
					klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
					continue
				}

				nodeID, err := messagelayer.GetNodeID(msg)
				if err != nil {
					klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
					continue
				}

				resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
				if err != nil {
					klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
					continue
				}

				resMsg := model.NewMessage(msg.GetID()).
					SetResourceVersion(node.ResourceVersion).
					FillBody(common.MessageSuccessfulContent).
					BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
				if err = uc.messageLayer.Response(*resMsg); err != nil {
					klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
					continue
				}

				klog.V(4).Infof("message: %s, update node status successfully, namespace: %s, name: %s", msg.GetID(), getNode.Namespace, getNode.Name)

			default:
				klog.Warningf("message: %s process failure, node status operation: %s unsupported", msg.GetID(), msg.GetOperation())
				continue
			}
			klog.V(4).Infof("message: %s process successfully", msg.GetID())
		}
	}
}
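
One step in the UpdateOperation branch is easy to overlook: heartbeats that are older than the node update frequency are overwritten with the current time before the status is written back. A minimal sketch of that step follows. The condition type and both function names are hypothetical, and the current time is passed in as a parameter (unlike the original, which calls time.Now() directly) so that the behaviour is reproducible.

```go
package main

import (
	"fmt"
	"time"
)

// condition is a hypothetical stand-in for v1.NodeCondition.
type condition struct {
	Type              string
	LastHeartbeatTime time.Time
}

// refreshStaleHeartbeats overwrites every heartbeat older than maxAge with now
// and reports how many conditions were touched.
func refreshStaleHeartbeats(conds []condition, maxAge time.Duration, now time.Time) int {
	refreshed := 0
	for i := range conds {
		if now.Sub(conds[i].LastHeartbeatTime) > maxAge {
			conds[i].LastHeartbeatTime = now
			refreshed++
		}
	}
	return refreshed
}

// demo builds one fresh and one stale condition and returns the refresh count.
func demo() int {
	now := time.Date(2021, 1, 1, 12, 0, 0, 0, time.UTC)
	conds := []condition{
		{Type: "Ready", LastHeartbeatTime: now.Add(-5 * time.Second)},
		{Type: "MemoryPressure", LastHeartbeatTime: now.Add(-60 * time.Second)},
	}
	// With a 10 second limit only the 60-second-old heartbeat is stale.
	return refreshStaleHeartbeats(conds, 10*time.Second, now)
}

func main() {
	fmt.Println("refreshed conditions:", demo())
}
```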

downstream

The Start function of downstream is as follows:

func (dc *DownstreamController) Start() error {
	klog.Info("start downstream controller")
	// pod
	go dc.syncPod()

	// configmap
	go dc.syncConfigMap()

	// secret
	go dc.syncSecret()

	// nodes
	go dc.syncEdgeNodes()

	// rule
	go dc.syncRule()

	// ruleendpoint
	go dc.syncRuleEndpoint()

	return nil
}

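A pod is the smallest unit that can be created, scheduled, and managed, and it contains one or more containers. A configmap stores configuration data as key-value pairs; it can hold individual properties or whole configuration files, which keeps configuration separate from the image. A secret is similar to a configmap but stores sensitive information. A node is the host on which pods actually run, either a physical machine or a virtual machine.

A ruleEndpoint defines where a message comes from or where it goes. There are 3 types:

  1. rest: an endpoint in the cloud. It can be a source endpoint that sends requests to the edge, or a target endpoint that receives messages from the edge.
  2. eventbus: it can be a source endpoint that sends requests to the cloud, or a target endpoint that receives messages from the cloud.
  3. servicebus: a target endpoint that receives messages from the cloud.

A rule defines how a message is transported. There are 3 types:

  1. rest->eventbus: a user application calls a REST API in the cloud to send a message, which is finally delivered to the mqttbroker at the edge.
  2. eventbus->rest: a user program publishes a message to the mqttbroker at the edge, which is finally delivered to a REST API in the cloud.
  3. rest->servicebus: a user program calls a REST API in the cloud to send a message, which is finally delivered to an application at the edge.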
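
The three rule types amount to a small table of allowed source/target pairs. The following sketch encodes just that table; the type and function names are hypothetical and are not taken from the KubeEdge code base.

```go
package main

import "fmt"

// endpointType names the three ruleEndpoint kinds described in the text.
type endpointType string

const (
	endpointREST       endpointType = "rest"
	endpointEventBus   endpointType = "eventbus"
	endpointServiceBus endpointType = "servicebus"
)

// route is a source/target pair of endpoint types.
type route struct {
	source, target endpointType
}

// supportedRoutes lists the three rule types: rest->eventbus,
// eventbus->rest, and rest->servicebus.
var supportedRoutes = map[route]bool{
	{endpointREST, endpointEventBus}:   true,
	{endpointEventBus, endpointREST}:   true,
	{endpointREST, endpointServiceBus}: true,
}

// ruleSupported reports whether a rule from source to target is one of the
// three combinations above.
func ruleSupported(source, target endpointType) bool {
	return supportedRoutes[route{source, target}]
}

func main() {
	fmt.Println("rest->eventbus supported:", ruleSupported(endpointREST, endpointEventBus))
	fmt.Println("servicebus->rest supported:", ruleSupported(endpointServiceBus, endpointREST))
}
```

Note that servicebus appears only as a target, which matches its description as a pure target endpoint.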

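syncPod reads the events received by podManager, chooses a route according to e.Type, and finally calls dc.messageLayer.Send(*msg) to send the data to the edge.

syncConfigMap reads the events received by configmapManager, sets the operation according to e.Type, and finally calls dc.messageLayer.Send(*msg) to send the data to the edge.

syncSecret, syncEdgeNodes, syncRule, and syncRuleEndpoint follow a similar flow.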
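
The step these sync functions share, turning an event type into a message operation before sending, can be sketched as follows. This is a simplified illustration under stated assumptions: the event and operation strings are declared locally rather than imported from the watch and model packages, and the outgoing type and its resource format are hypothetical.

```go
package main

import "fmt"

// Locally declared stand-ins for the watch event types (assumed values).
const (
	eventAdded    = "ADDED"
	eventModified = "MODIFIED"
	eventDeleted  = "DELETED"
)

// operationFor maps an event type to the operation carried by the outgoing
// message and rejects event types that should not be forwarded.
func operationFor(eventType string) (string, error) {
	switch eventType {
	case eventAdded:
		return "insert", nil
	case eventModified:
		return "update", nil
	case eventDeleted:
		return "delete", nil
	default:
		return "", fmt.Errorf("event type: %s unsupported", eventType)
	}
}

// outgoing is a hypothetical, simplified message sent to the edge.
type outgoing struct {
	Resource  string
	Operation string
}

// buildMessage combines the resource path with the operation for the event.
// The "namespace/kind/name" layout is an assumption made for this sketch.
func buildMessage(eventType, namespace, kind, name string) (outgoing, error) {
	op, err := operationFor(eventType)
	if err != nil {
		return outgoing{}, err
	}
	return outgoing{Resource: namespace + "/" + kind + "/" + name, Operation: op}, nil
}

func main() {
	msg, err := buildMessage(eventModified, "default", "configmap", "app-config")
	fmt.Println(msg, err)
	_, err = buildMessage("BOOKMARK", "default", "configmap", "app-config")
	fmt.Println("error:", err)
}
```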

Summary

References

  1. Kubeedge源码阅读系列–cloudcore.edgecontroller模块 (KubeEdge source code reading series: the cloudcore.edgecontroller module)

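A Brief Analysis of DeviceController on the KubeEdge Cloud Side

This article is based on commit 9a7e140b42abb4bf6bcabada67e3568f73964278.

Overview

DeviceController is an extended Kubernetes controller that manages edge devices and keeps device information and device status in sync between cloud and edge.

Module entry point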

cloud/pkg/devicecontroller/devicecontroller.go:

// Start controller
func (dc *DeviceController) Start() {
	if err := dc.downstream.Start(); err != nil {
		klog.Exitf("start downstream failed with error: %s", err)
	}
	// wait for downstream controller to start and load deviceModels and devices
	// TODO think about sync
	time.Sleep(1 * time.Second)
	if err := dc.upstream.Start(); err != nil {
		klog.Exitf("start upstream failed with error: %s", err)
	}
}

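Start launches downstream first and then upstream, because upstream depends on downstream: the one-second sleep in between gives downstream time to load deviceModels and devices. By the time DeviceController is registered, downstream and upstream have already been initialized through NewDownstreamController and NewUpstreamController.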
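
The TODO in the code notes that the fixed sleep is a placeholder for real synchronization. One common Go alternative, shown here purely as a sketch and not as what KubeEdge does, is a ready channel that downstream closes once its data is loaded. The cache type and both start functions are hypothetical.

```go
package main

import "fmt"

// cache is a hypothetical stand-in for the state downstream loads at startup.
type cache struct {
	models []string
}

// startDownstream loads the cache in the background and closes ready when done.
func startDownstream(c *cache, ready chan<- struct{}) {
	go func() {
		c.models = append(c.models, "sensor-tag-model")
		close(ready) // closing the channel publishes the write above to all waiters
	}()
}

// startUpstream blocks until downstream is ready, then reads the cache.
func startUpstream(c *cache, ready <-chan struct{}) int {
	<-ready
	return len(c.models)
}

func main() {
	c := &cache{}
	ready := make(chan struct{})
	startDownstream(c, ready)
	fmt.Println("models visible to upstream:", startUpstream(c, ready))
}
```

Compared with a sleep, the channel makes the dependency explicit: upstream waits exactly as long as downstream needs, whether that is a millisecond or several seconds.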

downstream

downstream generally refers to the cloud delivering data to the edge.

NewDownstreamController creates kubeClient, deviceManager, deviceModelManager, messageLayer, and configMapManager, assigns them to dc, and returns it:

// NewDownstreamController create a DownstreamController from config
func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
	deviceManager, err := manager.NewDeviceManager(crdInformerFactory.Devices().V1alpha2().Devices().Informer())
	if err != nil {
		klog.Warningf("Create device manager failed with error: %s", err)
		return nil, err
	}

	deviceModelManager, err := manager.NewDeviceModelManager(crdInformerFactory.Devices().V1alpha2().DeviceModels().Informer())
	if err != nil {
		klog.Warningf("Create device manager failed with error: %s", err)
		return nil, err
	}

	dc := &DownstreamController{
		kubeClient:         client.GetKubeClient(),
		deviceManager:      deviceManager,
		deviceModelManager: deviceModelManager,
		messageLayer:       messagelayer.NewContextMessageLayer(),
		configMapManager:   manager.NewConfigMapManager(),
	}
	return dc, nil
}

The Start method of downstream runs two functions, dc.syncDeviceModel() and dc.syncDevice():

// Start DownstreamController
func (dc *DownstreamController) Start() error {
	klog.Info("Start downstream devicecontroller")

	go dc.syncDeviceModel()

	// Wait for adding all device model
	// TODO need to think about sync
	time.Sleep(1 * time.Second)
	go dc.syncDevice()

	return nil
}

syncDeviceModel calls dc.deviceModelManager.Events() to obtain the deviceModelManager's events. events is of type chan watch.Event, so deviceModel-related events are delivered into this channel as they arrive. In other words, syncDeviceModel takes events from deviceModelManager and processes them.

syncDeviceModel then performs a different action depending on e.Type:

switch e.Type {
case watch.Added:
	dc.deviceModelAdded(deviceModel)
case watch.Deleted:
	dc.deviceModelDeleted(deviceModel)
case watch.Modified:
	dc.deviceModelUpdated(deviceModel)
default:
	klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}

  1. dc.deviceModelAdded(deviceModel) stores deviceModel in the table dc.deviceModelManager.DeviceModel;
  2. dc.deviceModelDeleted(deviceModel) removes deviceModel from the table dc.deviceModelManager.DeviceModel;
  3. dc.deviceModelUpdated(deviceModel) updates deviceModel in the table dc.deviceModelManager.DeviceModel; if no entry with the deviceModel's Name exists, the deviceModel is simply added.

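syncDevice is similar to syncDeviceModel: both first obtain events through Events() and then handle each event according to its type. deviceManager and deviceModelManager are also almost identical. The handling after an event arrives is slightly more involved than in syncDeviceModel, though, because a message has to be sent. Take deviceAdded as an example: it first stores the device in dc.deviceManager.Device, then calls dc.addToConfigMap(device) and createDevice(device), then builds msg with functions such as messagelayer.BuildResource and msg.BuildRouter, and finally sends the device data with dc.messageLayer.Send(*msg).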

Device Model

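A Device Model describes device properties such as "temperature" or "pressure". It acts as a template from which many devices can be created and managed. The properties field in spec defines the properties that the devices commonly support, including data type, whether the property is read-only, default value, and maximum and minimum values. There is also a propertyVisitors field that defines how each property is accessed, for example whether the data needs some computation applied or a data format conversion; in the v1alpha2 examples in this article it appears under the Device's spec. Below is an example Device Model: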

apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
  name: sensor-tag-model
  namespace: default
spec:
  properties:
    - name: temperature
      description: temperature in degree celsius
      type:
        int:
          accessMode: ReadWrite
          maximum: 100
          unit: degree celsius
    - name: temperature-enable
      description: enable data collection of temperature sensor
      type:
        string:
          accessMode: ReadWrite
          defaultValue: 'OFF'

Device

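A Device represents an actual device object and can be seen as an instance of a Device Model. The spec field is static, while the status field holds dynamically changing data, such as the desired state of the device and the state the device reports. Below is an example Device: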

apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
  name: sensor-tag-instance-01
  labels:
    description: TISimplelinkSensorTag
    manufacturer: TexasInstruments
    model: CC2650
spec:
  deviceModelRef:
    name: sensor-tag-model
  protocol:
    modbus:
      slaveID: 1
    common:
      com:
        serialPort: '1'
        baudRate: 115200
        dataBits: 8
        parity: even
        stopBits: 1
  nodeSelector:
    nodeSelectorTerms:
      - matchExpressions:
          - key: ''
            operator: In
            values:
              - node1
  propertyVisitors:
    - propertyName: temperature
      modbus:
        register: CoilRegister
        offset: 2
        limit: 1
        scale: 1
        isSwap: true
        isRegisterSwap: true
    - propertyName: temperature-enable
      modbus:
        register: DiscreteInputRegister
        offset: 3
        limit: 1
        scale: 1.0
        isSwap: true
        isRegisterSwap: true
status:
  twins:
    - propertyName: temperature
      reported:
        metadata:
          timestamp: '1550049403598'
          type: int
        value: '10'
      desired:
        metadata:
          timestamp: '1550049403598'
          type: int
        value: '15'
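
In the status above, the temperature twin has a desired value of '15' but a reported value of '10', so the device has not yet reached the state the cloud asked for. A minimal sketch of how such a gap could be detected is shown below. The twin struct is a hypothetical flattening of the YAML (metadata is omitted), and the second property in the demo is made up for illustration.

```go
package main

import "fmt"

// twin is a hypothetical, flattened view of one entry under status.twins.
type twin struct {
	PropertyName string
	Desired      string
	Reported     string
}

// outOfSync returns the names of properties whose reported value differs
// from the desired value.
func outOfSync(twins []twin) []string {
	var names []string
	for _, t := range twins {
		if t.Desired != t.Reported {
			names = append(names, t.PropertyName)
		}
	}
	return names
}

func main() {
	twins := []twin{
		{PropertyName: "temperature", Desired: "15", Reported: "10"}, // values from the example above
		{PropertyName: "humidity", Desired: "40", Reported: "40"},    // made-up property for contrast
	}
	fmt.Println("properties not yet at desired state:", outOfSync(twins))
}
```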

upstream

upstream generally refers to the edge uploading data to the cloud.

NewUpstreamController creates crdClient through keclient.GetCRDClient() and also creates messageLayer. In addition, UpstreamController holds a reference to a downstream controller:

// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
	uc := &UpstreamController{
		crdClient:    keclient.GetCRDClient(),
		messageLayer: messagelayer.NewContextMessageLayer(),
		dc:           dc,
	}
	return uc, nil
}

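The Start method of upstream creates uc.deviceStatusChan, starts uc.dispatchMessage() in one goroutine, and starts uc.updateDeviceStatus() in as many goroutines as config.Config.Load.UpdateDeviceStatusWorkers specifies: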

// Start UpstreamController
func (uc *UpstreamController) Start() error {
	klog.Info("Start upstream devicecontroller")

	uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
	go uc.dispatchMessage()

	for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
		go uc.updateDeviceStatus()
	}
	return nil
}

dispatchMessage mainly receives data through uc.messageLayer.Receive() and puts it into uc.deviceStatusChan.

updateDeviceStatus runs in a loop: it takes a msg from uc.deviceStatusChan, deserializes it into msgTwin, obtains deviceID, device, cacheDevice, and deviceStatus, uploads deviceStatus, and finally returns an acknowledgment msg to the edge.
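
The structure described here, one dispatcher feeding a buffered channel that several workers consume, is a standard Go worker pool. The sketch below shows the pattern on its own. processAll is a hypothetical function, plain strings stand in for model.Message, and closing the channel to stop the workers is an assumption made so the demo terminates.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// processAll starts the given number of workers, feeds them every message
// through one buffered channel, and returns how many messages were handled.
func processAll(workers int, msgs []string) int {
	statusChan := make(chan string, len(msgs))
	var handled int64
	var wg sync.WaitGroup

	// Workers: each one loops over the shared channel until it is closed.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range statusChan {
				atomic.AddInt64(&handled, 1)
			}
		}()
	}

	// Dispatcher: put every received message on the channel.
	for _, m := range msgs {
		statusChan <- m
	}
	close(statusChan)

	wg.Wait()
	return int(handled)
}

func main() {
	n := processAll(3, []string{"twin-a", "twin-b", "twin-c", "twin-d"})
	fmt.Println("device status messages handled:", n)
}
```

Each message is handled by exactly one worker, so raising the worker count increases throughput without duplicating updates.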

Summary

References

  1. Kubeedge源码阅读系列–cloudcore.devicecontroller模块 (KubeEdge source code reading series: the cloudcore.devicecontroller module)