KubeEdge 云上部分 EdgeController 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

EdgeController 是一个扩展的 k8s 控制器,管理边缘节点和 Pods 的元数据,确保数据能够传递到指定的边缘节点。

模块入口

cloud/pkg/edgecontroller/edgecontroller.go:

1
2
3
4
5
6
7
8
9
10
// Start controller
func (ec *EdgeController) Start() {
if err := ec.upstream.Start(); err != nil {
klog.Exitf("start upstream failed with error: %s", err)
}

if err := ec.downstream.Start(); err != nil {
klog.Exitf("start downstream failed with error: %s", err)
}
}

Start 分别启动 upstream 和 downstream,upstream 和 downstream 之间没有依赖关系。注册 EdgeController 时,upstream 和 downstream 就通过 NewUpstreamController 和 NewDownstreamController 初始化好了。

upstream

在 NewUpstreamController 中初始化了所有成员 channel,upstream.Start() 主要就是调用 uc.dispatchMessage() 分发收到的消息,以及执行其他函数用于处理成员 channel 里面的数据。

dispatchMessage 函数不断轮询,调用 uc.messageLayer.Receive() 接受消息,根据收到的消息 resourceType 选择将数据送到对应的 channel 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
switch resourceType {
case model.ResourceTypeNodeStatus:
uc.nodeStatusChan <- msg
case model.ResourceTypePodStatus:
uc.podStatusChan <- msg
case model.ResourceTypeConfigmap:
uc.configMapChan <- msg
case model.ResourceTypeSecret:
uc.secretChan <- msg
case model.ResourceTypeServiceAccountToken:
uc.serviceAccountTokenChan <- msg
case common.ResourceTypePersistentVolume:
uc.persistentVolumeChan <- msg
case common.ResourceTypePersistentVolumeClaim:
uc.persistentVolumeClaimChan <- msg
case common.ResourceTypeVolumeAttachment:
uc.volumeAttachmentChan <- msg
case model.ResourceTypeNode:
switch msg.GetOperation() {
case model.QueryOperation:
uc.queryNodeChan <- msg
case model.UpdateOperation:
uc.updateNodeChan <- msg
default:
klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
}
case model.ResourceTypePod:
if msg.GetOperation() == model.DeleteOperation {
uc.podDeleteChan <- msg
} else {
klog.Errorf("message: %s, operation type: %s unsupported", msg.GetID(), msg.GetOperation())
}
case model.ResourceTypeRuleStatus:
uc.ruleStatusChan <- msg

default:
klog.Errorf("message: %s, resource type: %s unsupported", msg.GetID(), resourceType)
}

每种 channel 中的消息都由不同的函数来处理,这里以 updateNodeStatus 函数为例,它接收 nodeStatusChan 中的消息,依次 GetContentData,GetNamespace,GetResourceName,GetOperation,根据消息的 Operation 做出相应的操作, 一般是上传到 apiserver:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
func (uc *UpstreamController) updateNodeStatus() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("stop updateNodeStatus")
return
case msg := <-uc.nodeStatusChan:
klog.V(5).Infof("message: %s, operation is: %s, and resource is %s", msg.GetID(), msg.GetOperation(), msg.GetResource())

data, err := msg.GetContentData()
if err != nil {
klog.Warningf("message: %s process failure, get content data failed with error: %s", msg.GetID(), err)
continue
}

namespace, err := messagelayer.GetNamespace(msg)
if err != nil {
klog.Warningf("message: %s process failure, get namespace failed with error: %s", msg.GetID(), err)
continue
}
name, err := messagelayer.GetResourceName(msg)
if err != nil {
klog.Warningf("message: %s process failure, get resource name failed with error: %s", msg.GetID(), err)
continue
}

switch msg.GetOperation() {
case model.InsertOperation:
_, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
if err == nil {
klog.Infof("node: %s already exists, do nothing", name)
uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)
continue
}

if !errors.IsNotFound(err) {
errLog := fmt.Sprintf("get node %s info error: %v , register node failed", name, err)
klog.Error(errLog)
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}

node := &v1.Node{}
err = json.Unmarshal(data, node)
if err != nil {
errLog := fmt.Sprintf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
klog.Error(errLog)
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}

if _, err = uc.createNode(name, node); err != nil {
errLog := fmt.Sprintf("create node %s error: %v , register node failed", name, err)
klog.Error(errLog)
uc.nodeMsgResponse(name, namespace, errLog, msg)
continue
}

uc.nodeMsgResponse(name, namespace, common.MessageSuccessfulContent, msg)

case model.UpdateOperation:
nodeStatusRequest := &edgeapi.NodeStatusRequest{}
err := json.Unmarshal(data, nodeStatusRequest)
if err != nil {
klog.Warningf("message: %s process failure, unmarshal marshaled message content with error: %s", msg.GetID(), err)
continue
}

getNode, err := uc.kubeClient.CoreV1().Nodes().Get(context.Background(), name, metaV1.GetOptions{})
if errors.IsNotFound(err) {
klog.Warningf("message: %s process failure, node %s not found", msg.GetID(), name)
continue
}

if err != nil {
klog.Warningf("message: %s process failure with error: %s, namespaces: %s name: %s", msg.GetID(), err, namespace, name)
continue
}

// TODO: comment below for test failure. Needs to decide whether to keep post troubleshoot
// In case the status stored at metadata service is outdated, update the heartbeat automatically
for i := range nodeStatusRequest.Status.Conditions {
if time.Since(nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime.Time) > time.Duration(uc.config.NodeUpdateFrequency)*time.Second {
nodeStatusRequest.Status.Conditions[i].LastHeartbeatTime = metaV1.NewTime(time.Now())
}
}

if getNode.Annotations == nil {
getNode.Annotations = make(map[string]string)
}
for name, v := range nodeStatusRequest.ExtendResources {
if name == constants.NvidiaGPUScalarResourceName {
var gpuStatus []types.NvidiaGPUStatus
for _, er := range v {
gpuStatus = append(gpuStatus, types.NvidiaGPUStatus{ID: er.Name, Healthy: true})
}
if len(gpuStatus) > 0 {
data, _ := json.Marshal(gpuStatus)
getNode.Annotations[constants.NvidiaGPUStatusAnnotationKey] = string(data)
}
}
data, err := json.Marshal(v)
if err != nil {
klog.Warningf("message: %s process failure, extend resource list marshal with error: %s", msg.GetID(), err)
continue
}
getNode.Annotations[string(name)] = string(data)
}

// Keep the same "VolumesAttached" attribute with upstream,
// since this value is maintained by kube-controller-manager.
nodeStatusRequest.Status.VolumesAttached = getNode.Status.VolumesAttached
if getNode.Status.DaemonEndpoints.KubeletEndpoint.Port != 0 {
nodeStatusRequest.Status.DaemonEndpoints.KubeletEndpoint.Port = getNode.Status.DaemonEndpoints.KubeletEndpoint.Port
}

getNode.Status = nodeStatusRequest.Status

node, err := uc.kubeClient.CoreV1().Nodes().UpdateStatus(context.Background(), getNode, metaV1.UpdateOptions{})
if err != nil {
klog.Warningf("message: %s process failure, update node failed with error: %s, namespace: %s, name: %s", msg.GetID(), err, getNode.Namespace, getNode.Name)
continue
}

nodeID, err := messagelayer.GetNodeID(msg)
if err != nil {
klog.Warningf("Message: %s process failure, get node id failed with error: %s", msg.GetID(), err)
continue
}

resource, err := messagelayer.BuildResource(nodeID, namespace, model.ResourceTypeNode, name)
if err != nil {
klog.Warningf("Message: %s process failure, build message resource failed with error: %s", msg.GetID(), err)
continue
}

resMsg := model.NewMessage(msg.GetID()).
SetResourceVersion(node.ResourceVersion).
FillBody(common.MessageSuccessfulContent).
BuildRouter(modules.EdgeControllerModuleName, constants.GroupResource, resource, model.ResponseOperation)
if err = uc.messageLayer.Response(*resMsg); err != nil {
klog.Warningf("Message: %s process failure, response failed with error: %s", msg.GetID(), err)
continue
}

klog.V(4).Infof("message: %s, update node status successfully, namespace: %s, name: %s", msg.GetID(), getNode.Namespace, getNode.Name)

default:
klog.Warningf("message: %s process failure, node status operation: %s unsupported", msg.GetID(), msg.GetOperation())
continue
}
klog.V(4).Infof("message: %s process successfully", msg.GetID())
}
}
}

downstream

downstream 的 Start 函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (dc *DownstreamController) Start() error {
klog.Info("start downstream controller")
// pod
go dc.syncPod()

// configmap
go dc.syncConfigMap()

// secret
go dc.syncSecret()

// nodes
go dc.syncEdgeNodes()

// rule
go dc.syncRule()

// ruleendpoint
go dc.syncRuleEndpoint()

return nil
}

pod 是最小的,管理,创建,计划的最小单元,包含一个或多个容器;configmap 用于保存配置数据的键值对,可以用来保存单个属性,也可以用来保存配置文件,作用是可以将配置文件与镜像文件分离;secret 与 configmap 类似,但是用来存储敏感信息;node 是 pod 真正运行的主机,可以物理机,也可以是虚拟机;ruleEndpoint 定义了消息的来源,或消息的去向。它包含 3 种类型:rest(云上的一个端点,可以是源端点,用于向边缘发送请求;或者是目标端点,从边缘接收消息)、eventbus(可以是源端点,用于向云发送请求;或者是目标端点,从云接收消息)、servicebus(目标端点,接收云端的消息);rule 定义了消息如何传输,它包含 3 种类型:rest->eventbus(用户应用调用云上的 rest api 发送消息,最后消息被发送到边缘的 mqttbroker),
eventbus->rest(用户程序向边缘的 mqttbroker 发布消息,最后消息被发送到云上的 rest api),
rest->servicebus(用户程序调用云上的 rest api 发送消息,最后消息被发送到边缘的应用程序)。

syncPod 获取 podManager 中收到的 events,根据 e.Type 分发不同的路由,最后执行dc.messageLayer.Send(*msg) 把数据发送到边缘。

syncConfigMap 获取 configmapManager 中收到的 events,根据 e.Type 设置不同的 operation,最后执行dc.messageLayer.Send(*msg) 把数据发送到边缘。

syncSecret、syncEdgeNodes、syncRule 和 syncRuleEndpoint 函数的流程也类似。

总结

参考

  1. Kubeedge源码阅读系列–cloudcore.edgecontroller模块

KubeEdge 云上部分 DeviceController 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

DeviceController 是一个扩展的 k8s 控制器,管理边缘设备,确保设备信息、设备状态的云边同步。

模块入口

cloud/pkg/devicecontroller/devicecontroller.go:

1
2
3
4
5
6
7
8
9
10
11
12
// Start controller
func (dc *DeviceController) Start() {
if err := dc.downstream.Start(); err != nil {
klog.Exitf("start downstream failed with error: %s", err)
}
// wait for downstream controller to start and load deviceModels and devices
// TODO think about sync
time.Sleep(1 * time.Second)
if err := dc.upstream.Start(); err != nil {
klog.Exitf("start upstream failed with error: %s", err)
}
}

Start 分别启动 downstream 和 upstream,同时 upstream 依赖于 downstream。注册 DeviceController 时,downstream 和 upstream 就通过 NewDownstreamController 和 NewUpstreamController 初始化好了。

downstream

downstream 一般描述云端向边缘端下发数据。

NewDownstreamController 创建了 kubeClient,deviceManager,deviceModelManager,messageLayer,configMapManager 赋值给了 dc 并返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// NewDownstreamController create a DownstreamController from config
func NewDownstreamController(crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) {
deviceManager, err := manager.NewDeviceManager(crdInformerFactory.Devices().V1alpha2().Devices().Informer())
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}

deviceModelManager, err := manager.NewDeviceModelManager(crdInformerFactory.Devices().V1alpha2().DeviceModels().Informer())
if err != nil {
klog.Warningf("Create device manager failed with error: %s", err)
return nil, err
}

dc := &DownstreamController{
kubeClient: client.GetKubeClient(),
deviceManager: deviceManager,
deviceModelManager: deviceModelManager,
messageLayer: messagelayer.NewContextMessageLayer(),
configMapManager: manager.NewConfigMapManager(),
}
return dc, nil
}

downstream 的 Start 方法执行了 dc.syncDeviceModel(),dc.syncDevice() 两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Start DownstreamController
func (dc *DownstreamController) Start() error {
klog.Info("Start downstream devicecontroller")

go dc.syncDeviceModel()

// Wait for adding all device model
// TODO need to think about sync
time.Sleep(1 * time.Second)
go dc.syncDevice()

return nil
}

syncDeviceModel 调用了 dc.deviceModelManager.Events(),
获取 deviceModelManager 的 events,events 类型为 chan watch.Event,可以理解为 deviceModel 相关的事件到来后会传到通道 events 中。即 syncDeviceModel 从 deviceModelManager 中获取 event 并进行分析。

之后 syncDeviceModel 根据 e.Type 的类型执行不同的操作:

1
2
3
4
5
6
7
8
9
10
switch e.Type {
case watch.Added:
dc.deviceModelAdded(deviceModel)
case watch.Deleted:
dc.deviceModelDeleted(deviceModel)
case watch.Modified:
dc.deviceModelUpdated(deviceModel)
default:
klog.Warningf("deviceModel event type: %s unsupported", e.Type)
}
  1. dc.deviceModelAdded(deviceModel) 将 deviceModel 存如表 dc.deviceModelManager.DeviceModel 中;
  2. dc.deviceModelDeleted(deviceModel) 将 deviceModel 从表 dc.deviceModelManager.DeviceModel 删除;
  3. dc.deviceModelUpdated(deviceModel) 更新表 dc.deviceModelManager.DeviceModel 中的 deviceModel,如果 deviceModel Name 不存在,则直接添加 deviceModel。

syncDevice 与 syncDeviceModel 类似,都是先通过 Events() 获取 events,然后根据 events 的类型执行相应的处理。deviceManager 与 deviceModelManager 也几乎一样。不过收到事件后的处理比 syncDeviceModel 略微复杂,需要发送消息。以 deviceAdded 为例,deviceAdded 首先把 device 存到 dc.deviceManager.Device 中,然后执行 dc.addToConfigMap(device) 和 createDevice(device),接着执行 messagelayer.BuildResource,msg.BuildRouter 等函数来构建 msg,最后通过 dc.messageLayer.Send(*msg) 将 device 数据发送出去。

Device Model

Device Model 描述了设备属性,如“温度”或”压力”。Device Model 相当于是模板,使用它可以创建和管理许多设备。spec 中的 properties 字段定义设备通用支持的属性,例如数据类型、是否只读、默认值、最大值和最小值;另外还有 propertyVisitors 字段,它定义每种属性字段的访问方式,例如数据是否需要经过某种运算处理,数据格式转换。以下是一个 Device Model 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
apiVersion: devices.kubeedge.io/v1alpha2
kind: DeviceModel
metadata:
name: sensor-tag-model
namespace: default
spec:
properties:
- name: temperature
description: temperature in degree celsius
type:
int:
accessMode: ReadWrite
maximum: 100
unit: degree celsius
- name: temperature-enable
description: enable data collection of temperature sensor
type:
string:
accessMode: ReadWrite
defaultValue: 'OFF'

Device

Device 代表一个实际的设备对象,可以看作是 Device Model 的实例化。spec 字段是静态的,status 字段中是动态变化的数据,如设备期望的状态和设备报告的状态。以下是一个 Device 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
apiVersion: devices.kubeedge.io/v1alpha2
kind: Device
metadata:
name: sensor-tag-instance-01
labels:
description: TISimplelinkSensorTag
manufacturer: TexasInstruments
model: CC2650
spec:
deviceModelRef:
name: sensor-tag-model
protocol:
modbus:
slaveID: 1
common:
com:
serialPort: '1'
baudRate: 115200
dataBits: 8
parity: even
stopBits: 1
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: ''
operator: In
values:
- node1
propertyVisitors:
- propertyName: temperature
modbus:
register: CoilRegister
offset: 2
limit: 1
scale: 1
isSwap: true
isRegisterSwap: true
- propertyName: temperature-enable
modbus:
register: DiscreteInputRegister
offset: 3
limit: 1
scale: 1.0
isSwap: true
isRegisterSwap: true
status:
twins:
- propertyName: temperature
reported:
metadata:
timestamp: '1550049403598'
type: int
value: '10'
desired:
metadata:
timestamp: '1550049403598'
type: int
value: '15'

upstream

upstream 一般描述边缘端向云端上传数据。

NewUpstreamController 通过 keclient.GetCRDClient() 创建了 crdClient,另外创建了 messageLayer,除此之外,UpstreamController 还包含了一个 downstream:

1
2
3
4
5
6
7
8
9
// NewUpstreamController create UpstreamController from config
func NewUpstreamController(dc *DownstreamController) (*UpstreamController, error) {
uc := &UpstreamController{
crdClient: keclient.GetCRDClient(),
messageLayer: messagelayer.NewContextMessageLayer(),
dc: dc,
}
return uc, nil
}

upstream 的 Start 方法执行了 uc.dispatchMessage(),uc.updateDeviceStatus() 两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
// Start UpstreamController
func (uc *UpstreamController) Start() error {
klog.Info("Start upstream devicecontroller")

uc.deviceStatusChan = make(chan model.Message, config.Config.Buffer.UpdateDeviceStatus)
go uc.dispatchMessage()

for i := 0; i < int(config.Config.Load.UpdateDeviceStatusWorkers); i++ {
go uc.updateDeviceStatus()
}
return nil
}

dispatchMessage 函数主要通过 uc.messageLayer.Receive() 收取数据,放入 uc.deviceStatusChan。

updateDeviceStatus 函数循环执行,收取 uc.deviceStatusChan 的 msg,将 msg 反序列化获得 msgTwin,获取 deviceID,device,cacheDevice,deviceStatus,然后将 deviceStatus 上传,最后向 Edge 返回确认 msg。

总结

参考

  1. Kubeedge源码阅读系列–cloudcore.devicecontroller模块

KubeEdge 云上部分 CloudHub 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

CloudHub 是一个 Web Socket 服务端,负责监听云端的变化,缓存并发送消息到 EdgeHub。

模块入口

cloud/pkg/cloudhub/cloudhub.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (a *cloudHub) Start() {
if !cache.WaitForCacheSync(beehiveContext.Done(), a.informersSyncedFuncs...) {
klog.Errorf("unable to sync caches for objectSyncController")
os.Exit(1)
}

// start dispatch message from the cloud to edge node
go a.messageq.DispatchMessage()

// check whether the certificates exist in the local directory,
// and then check whether certificates exist in the secret, generate if they don't exist
if err := httpserver.PrepareAllCerts(); err != nil {
klog.Exit(err)
}
// TODO: Will improve in the future
DoneTLSTunnelCerts <- true
close(DoneTLSTunnelCerts)

// generate Token
if err := httpserver.GenerateToken(); err != nil {
klog.Exit(err)
}

// HttpServer mainly used to issue certificates for the edge
go httpserver.StartHTTPServer()

servers.StartCloudHub(a.messageq)

if hubconfig.Config.UnixSocket.Enable {
// The uds server is only used to communicate with csi driver from kubeedge on cloud.
// It is not used to communicate between cloud and edge.
go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
}
}

cloudhub 启动主要有以下 3 步:

  1. 调用 DispatchMessage,开始从云端向边缘节点派送消息
  2. 启动 HttpServer,主要用于为边端发放证书
  3. 调用 StartCloudHub

接下来对 DispatchMessage 和 StartCloudHub 进行具体分析。

DispatchMessage

DispatchMessage 从云中获取消息,提取节点 ID,获取与节点相关的消息,将其放入消息队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (q *ChannelMessageQueue) DispatchMessage() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("Cloudhub channel eventqueue dispatch message loop stopped")
return
default:
}
msg, err := beehiveContext.Receive(model.SrcCloudHub)
klog.V(4).Infof("[cloudhub] dispatchMessage to edge: %+v", msg)
if err != nil {
klog.Info("receive not Message format message")
continue
}
nodeID, err := GetNodeID(&msg)
if nodeID == "" || err != nil {
klog.Warning("node id is not found in the message")
continue
}
if isListResource(&msg) {
q.addListMessageToQueue(nodeID, &msg)
} else {
q.addMessageToQueue(nodeID, &msg)
}
}
}

StartCloudHub

StartCloudHub 的代码如下:

1
2
3
4
5
6
7
8
9
10
11
func StartCloudHub(messageq *channelq.ChannelMessageQueue) {
handler.InitHandler(messageq)
// start websocket server
if hubconfig.Config.WebSocket.Enable {
go startWebsocketServer()
}
// start quic server
if hubconfig.Config.Quic.Enable {
go startQuicServer()
}
}

如果设置了 WebSocket 启动,就启动 WebSocket 服务器协程;如果设置了 Quic 启动,就启动 Quic 服务器协程。

WebSocket 是性能最好的,默认使用 WebSocket。Quic 作为备选项,在网络频繁断开等很不稳定场景下有优势。KubeEdge 云边消息传递是通过 cloudhub 跟 edgehub 间的 Websocket 或 Quic 协议的长连接传输的。

KubeEdge 概述

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

KubeEdge 架构图

KubeEdge 总体有两大部分 —— cloudcore 和 edgecore。cloudcore 部分是 k8s api server 与 Edge 部分的桥梁,负责将指令下发到 Edge,同时将 Edge 的状态和事件同步到的 k8s api server;edgecore 部分接受并执行 Cloud 部分下发的指令,管理各种负载,并将 Edge 部分负载的状态和事件同步到 Cloud 部分。

云上部分

CloudHub 是一个 Web Socket 服务端,负责监听云端的变化,缓存并发送消息到 EdgeHub。

DeviceController 是一个扩展的 k8s 控制器,管理边缘设备,确保设备信息、设备状态的云边同步。

EdgeController 是一个扩展的 k8s 控制器,管理边缘节点和 Pods 的元数据,确保数据能够传递到指定的边缘节点。

边缘部分

EdgeHub 是一个 Web Socket 客户端,负责与边缘计算的云端交互,包括同步云端资源更新、报告边缘主机和设备状态变化到云端等功能。

Edged 是运行在边缘节点的代理(轻量化的 kubelet),用于管理容器化的应用程序。

EventBus 是一个与 MQTT 服务器 (mosquitto) 交互的 MQTT 客户端,为其他组件提供订阅和发布功能。

ServiceBus 是一个运行在边缘的 HTTP 客户端。

DeviceTwin 负责存储设备状态(传感器的值等)并将设备状态同步到云 (DeviceController),它还为应用程序提供查询接口。

MetaManager 是消息处理器,位于 Edged 和 Edgehub 之间,它负责向轻量级数据库 (SQLite) 持久化/检索元数据。

关键代码

cloudcore 代码入口为 Cloud/cmd/cloudcore/cloudcore.go,在 main 函数中调用 NewCloudCoreCommand,通过 registerModules 函数注册 cloudcore 中的功能模块,通过 StartModules 函数启动已注册的 cloudcore 上的功能模块。registerModules 函数如下:

1
2
3
4
5
6
7
8
9
func registerModules(c *v1alpha1.CloudCoreConfig) {
cloudhub.Register(c.Modules.CloudHub)
edgecontroller.Register(c.Modules.EdgeController)
devicecontroller.Register(c.Modules.DeviceController)
synccontroller.Register(c.Modules.SyncController)
cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
router.Register(c.Modules.Router)
dynamiccontroller.Register(c.Modules.DynamicController)
}

这 7 个模块都实现了 Module 接口,注册最终会将模块封装后的结构体放入一个 map[string]*ModuleInfo 类型的全局变量 modules 中。之后 StartModules 函数通过 for 循环从 modules 获取每一个的模块,每个模块分配一个协程调用 Start 函数启动。

edgecore 代码入口为 edge/cmd/edgecore/edgecore.go,在 main 函数中调用 NewEdgeCoreCommand。和在 cloudcore 类似,在 NewEdgeCoreCommand 函数中,通过 registerModules 函数注册 edgecore 中的功能模块,通过 Run 函数启动已注册的 edgecore 中的功能模块。edgecore 中 registerModules 函数注册的模块如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// registerModules register all the modules started in edgecore
func registerModules(c *v1alpha1.EdgeCoreConfig) {
devicetwin.Register(c.Modules.DeviceTwin, c.Modules.Edged.HostnameOverride)
edged.Register(c.Modules.Edged)
edgehub.Register(c.Modules.EdgeHub, c.Modules.Edged.HostnameOverride)
eventbus.Register(c.Modules.EventBus, c.Modules.Edged.HostnameOverride)
metamanager.Register(c.Modules.MetaManager)
servicebus.Register(c.Modules.ServiceBus)
edgestream.Register(c.Modules.EdgeStream, c.Modules.Edged.HostnameOverride, c.Modules.Edged.NodeIP)
test.Register(c.Modules.DBTest)
// Note: Need to put it to the end, and wait for all models to register before executing
dbm.InitDBConfig(c.DataBase.DriverName, c.DataBase.AliasName, c.DataBase.DataSource)
}

Why KubeEdge

为什么用 KubeEdge 而不是 k8s 构建边缘计算平台?

k8s 构建边缘计算平台的主要挑战:①资源有限。边缘设备可能只有几百兆的内存,一个原生 kubelet 都跑不起来。②网络受限。k8s 的 master 和 node 通信是通过 List/Watch 机制,边缘场景下网络可能会断开很长时间,这时候 node 上的 kubelet 一直 re-watch 失败,就会请求 re-list,把 apiserver 上的对象全量拿回去,没法在边缘场景这种受限的网络下很好的工作。③k8s 节点没有自治能力。如何在网络质量不稳定的情况下,对边缘节点实现离线自治,这也是个问题。

KubeEdge 主打三个核心理念,首先是云边协同,边是云的延伸,用户的边可能位于私有网络,因此需要穿透私有网络,通过云来管理私有节点,KubeEdge 默认采用 WebSocket + 消息封装来实现,这样只要边缘网络能访问外网情况下,就能实现双向通信,这就不需要边端需要一个公网的 IP。同时呢,KubeEdge 也优化了原生 Kubernetes 中不必要的一些请求,能够大幅减少通信压力,高时延状态下仍可以工作。

KubeEdge 第二个核心理念是边缘节点自治,做到节点级的元数据的持久化,比如 Pod,ConfigMap 等基础元数据,每个节点都持久化这些元数据,边缘节点离线之后,它仍可以通过本地持久化的元数据来管理应用。在 Kubernetes 中,当 kubelet 重启后, 它首先要向 master 做一次 List 获取全量的数据,然后再进行应用管理工作,如果这时候边和云端的网络断开,就无法获得全量的元数据,也不能进行故障恢复。KubeEdge 做了元数据的持久化后,可以直接从本地获得这些元数据,保证故障恢复的能力,保证服务快速 ready。

另外一个理念是极致轻量,在大多数边缘计算场景下,节点的资源是非常有限的,KubeEdge 采用的方式是重组 kubelet 组件(~10mb 内存占用),优化 runtime 资源消耗。在空载时候,内存占用率很低。

参考

  1. kubeedge源码分析系列之整体架构

etcd 读写概述

etcd 总体是基于 Raft 实现的,本文对 etcd 读写流程关键点进行简单记录。

etcd 读流程

一个读请求从 client 通过 Round-robin 负载均衡算法,选择一个 etcd server 节点,发出 gRPC 请求,经过 etcd server 的 KVServer 模块进入核心的读流程,进行串行读或线性读(默认),通过与 MVCC 的 treeIndex 和 boltdb 模块紧密协作,完成读请求。

  • 串行读(非强一致性读):直接读状态机返回数据,无需通过 Raft 协议与集群进行交互,具有低延时、高吞吐量的特点,适合对数据一致性要求不高的场景。
  • 线性读(强一致性读):需要经过 Raft 协议模块,反应集群共识的最新数据,因此在延时和吞吐量上相比串行读略差一点,适用于对数据一致性要求高的场景。

ReadIndex

线性读的 ReadIndex 可以使 follower 的读请求不必转发给 leader。它的实现原理是:

  1. 当 Follower 节点 收到一个线性读请求时,它首先会从 leader 获取集群最新的已提交的日志索引 (committed index);
  2. leader 收到 ReadIndex 请求时,为防止脑裂等异常场景,会向 follower 节点发送心跳确认,一半以上节点确认 leader 身份后才能将已提交的索引 (committed index) 返回给请求的 follower 节点;
  3. follower 节点则会等待,直到状态机已应用索引 (applied index) 大于等于 leader 的已提交索引时 (committed Index)才会去通知读请求,数据已赶上 leader,然后去状态机中访问数据。

ReadIndex 是非常轻量的,不会导致 leader 负载变高,ReadIndex 机制使得每个 follower 节点都可以处理读请求,进而提升了系统的整体写性能。

MVCC

它由内存树形索引模块 (treeIndex) 和嵌入式的 KV 持久化存储库 boltdb 组成。treeIndex 模块是基于 Google 开源的内存版 btree 库实现的;boltdb 是个基于 B+ tree 实现的 key-value 键值库,支持事务,提供 Get/Put 等简易 API 给 etcd 操作。

首先,有个全局递增的版本号(put hello a 时,hello 对应的版本号若为 1,下个请求 put world b 时,world 对应的版本号则为 2),每次修改操作,都会生成一个新的版本号。treeIndex 模块保存用户的 key 和相关版本号,以版本号为 key,value 为用户 key-value 数据存储在 boltdb 里面。另外,并不是所有请求都一定要从 boltdb 获取数据,etcd 出于数据一致性、性能等考虑,在访问 boltdb 前,首先会从一个内存读事务 buffer 中,二分查找你要访问 key 是否在 buffer 里面,若命中则直接返回。具体如下图:

etcd 写流程

一个写请求从 client 通过负载均衡算法选择一个 etcd 节点,发出 gRPC 调用,etcd 节点收到请求后会经过 gRPC 拦截、Quota 校验,接着进入 KVServer 模块,KVServer 模块将请求发送给本模块中的 raft,这里负责与 etcd raft 模块进行通信,发起一个提案,内容为 put foo bar,即使用 put 方法将 foo 更新为 bar,提案经过转发之后,半数节点成功持久化,MVCC 模块更新状态机,完成写请求。

与读流程不一样的是写流程涉及 Quota、WAL、Apply 三个模块。

  • Quota 模块配额检查 db 的大小,如果超过会报 etcdserver: mvcc: database space exceeded 的告警,通过 Raft 日志同步给集群中的节点 db 空间不足,整个集群将不可写入,对外提供只读的功能。
  • 只有 leader 才能处理写请求。leader 收到提案后,会将提案封装成日志条目广播给集群各个节点,同时需要把内容持久化到一个 WAL 日志文件中。日志条目包含 leader 任期号、条目索引、日志类型、提案内容。WAL 持久化内容包含日志条目内容、WAL 记录类型、校验码、WAL 记录的长度。
  • Apply 模块用于执行提案,首先会判断该提案是否被执行过,如果已经执行,则直接返回结束;未执行过的情况下,将会进入 MVCC 模块执行持久化提案内容的操作。

MVCC

MVCC 在执行 put 请求时,会基于当前版本号自增生成新的版本号,然后从 treeIndex 模块中查询 key 的创建版本号、修改次数信息。这些信息将填充到 boltdb 的 value 中,同时将用户的 key 和版本号信息存储到 treeIndex。在读流程中介绍过,boltdb 的 value 的值就是①key 名称;②key 创建时的版本号(create_revision)、最后一次修改时的版本号(mod_revision)、key 自身修改的次数(version);③value 值;④租约信息,将这些信息的结构体序列化成的二进制数据。另外,为了提高吞吐量,此时数据并未提交,而是存在 boltdb 所管理的内存数据结构中,由 backend 异步 goroutine 定时将批量事务提交,和 MySQL 类似,写时优先写入 Buffer。具体如下图:

参考

  1. etcd Source Code
  2. Learning | etcd
  3. etcd教程(七)—读请求执行流程分析
  4. etcd教程(八)—写请求执行流程分析

k8s in action—Pod

本章用到的三个 YAML 描述文件如下:

kubia-manual.yaml

1
2
3
4
5
6
7
8
9
10
11
apiVersion: v1
kind: Pod
metadata:
name: kubia-manual
spec:
containers:
- image: luksa/kubia
name: kubia
ports:
- containerPort: 8080
protocol: TCP

kubia-manual-with-labels.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
apiVersion: v1
kind: Pod
metadata:
name: kubia-manual-v2
labels:
creation_mrthod: manual
env: prod
spec:
containers:
- image: luksa/kubia
name: kubia
ports:
- containerPort: 8080
protocol: TCP

kubia-gpu.yaml

1
2
3
4
5
6
7
8
9
10
apiVersion: v1
kind: Pod
metadata:
name: kubia-gpu
spec:
nodeSelector:
gpu: "true"
containers:
- image: luksa/kubia
name: kubia

apiVersion 描述文件遵循什么版本的 Kubernetes API;
kind 指定资源类型,这里为 pod;
metadata 中的字段描述 pod 名称、标签、注解;
spec 中的字段描述创建容器所需要的镜像、容器名称、监听端口等。

创建 pod

使用 kubectl create -f 命令从 YAML 文件创建 pod

1
kubectl create -f kubia-manual.yaml

得到运行中 pod 的完整定义

1
2
3
4
5
# YAML 格式
kubectl get pod kubia-manual -o yaml

# JSON 格式
kubectl get pod kubia-manual -o json

查看 pod

1
kubectl get pods

查看日志

1
2
3
4
kubectl logs kubia-manual

# 获取多容器 pod 的日志时指定容器名称
kubectl logs kubia-manual -c kubia

将本机的 8888 端口转发至 kubia-manual pod 的 8080 端口

1
kubectl port-forward kubia-manual 8888:8080

使用标签组织 pod

标签时可以附加到资源的任意键值对,用以选择具有该确切标签的资源。

创建带标签的 pod

1
kubectl create -f kubia-manual-with-labels.yaml

基于标签的 pod 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 显示 pod 标签
kubectl get pods --show-labels

# 只显示 env 标签
kubectl get pods -L env

# 为指定的 pod 资源添加新标签
kubectl label pod kubia-manual env=debug

# 修改标签
kubectl label pod kubia-manual env=online --overwrite=true

# - 号删除标签
kubectl label pod kubia-manual env-

使用标签选择器列出 pod

筛选指定值的 pod

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 筛选 env 为 debug 的 pods
kubectl get pods -l env=debug

# 筛选 creation_method 不为 manual 的 pods
kubectl get pods -l creation_method!=manual

# 筛选不含 env 标签的 pods
kubectl get pods -l '!env'

# in 筛选
kubectl get pods -l 'env in (debug)'

# not in 筛选
kubectl get pods -l 'env notin (debug,online)'

使用标签和选择器约束 pod 调度

使用标签分类工作节点

1
2
kubectl label node gke-kubia gpu=true
kubectl get nodes -l gpu=true

在 kubia-gpu.yaml 中的 spec 添加了 nodeSelector 字段。创建该 pod 时,调度器将只在包含标签 gpu=true 的节点中选择。

注解 pod

注解也是键值对,但与标签不同,不用作标识,用作资源说明,注解可以包含相对更多的数据。

使用 kubectl annotate 添加注解

1
kubectl annotate pod kubia-manual mycompany.com/someannotation="foo bar"

查看注解

1
kubectl describe pod kubia-manual

命名空间

标签会导致资源重叠,可用 namespace 将对象分配到集群级别的隔离区域,相当于多租户的概念,以 namespace 为操作单位。

获取所有 namespace

1
kubectl get ns

获取属于 kube-system 命名空间的 pod

1
kubectl get pods -n kube-system

创建 namespace 资源

1
2
3
4
apiVersion: v1
kind: Namespace
metadata:
name: custom-namespace

创建资源时在 metadata.namespace 中指定资源的命名空间

1
2
3
4
5
6
apiVersion: v1
kind: Pod
metadata:
name: kubia-manual
namespace: custom-namespace
...

使用 kubectl create 命令创建资源时指定命名空间

1
kubectl create -f kubia-manual.yaml -n custom-namespace

切换命名空间

1
kubectl config set-context $(kubectl config current-context) --namespace custom-namespace

删除 pod

删除原理:向 pod 所有容器进程定期发送 SIGTERM 信号,使其正常关闭。如果没有及时关闭,则发送 SIGKILL 强制终止。进程需要正确处理信号,如 Go 注册捕捉信号 signal.Notify() 后 select 监听该 channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 按名称删除 pod
kubectl delete po kubia-gpu

# 删除指定标签的 pod
kubectl delete po -l env=debug

# 通过删除整个命名空间来删除 pod
kubectl delete ns custom-namespace

# 删除当前命名空间下的所有 pod(慎用)
kubectl delete pod --all

# 删除当前命名空间的所有资源(慎用)
kubectl delete all --all

CentOS7.6部署k8s

两台 2 核 CPU、2G 内存的阿里云服务器,一台 master 节点,一台 node 节点。

准备工作

关闭防火墙

1
2
systemctl stop firewalld
systemctl disable firewalld

禁用 swap 分区

1
swapoff -a && sed -i '/ swap / s/^\(.*\)$/#\1/g' /etc/fstab

禁用 SELinux

1
setenforce 0 && sed -i 's/^SELINUX=.*/SELINUX=disabled/' /etc/selinux/config

时间同步

1
2
systemctl start chronyd
systemctl enable chronyd

重新设置主机名

1
2
3
4
5
6
7
8
9
10
# master 节点
hostnamectl set-hostname master

# node 节点
hostnamectl set-hostname node

vi /etc/hosts
# 添加 ip:
8.130.22.97 master
8.130.23.131 node

将桥接的 IPv4 以及 IPv6 的流量串通

1
2
3
4
5
6
cat >/etc/sysctl.d/k8s.conf << EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF

sysctl --system

配置 ipvs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sudo yum install -y yum-utils
yum install ipset ipvsadmin -y

cat <<EOF> /etc/sysconfig/modules/ipvs.modules
#!/bin/bash
modprobe -- ip_vs
modprobe -- ip_vs_rr
modprobe -- ip_vs_wrr
modprobe -- ip_vs_sh
modprobe -- nf_conntrack_ipv4
EOF

chmod +x /etc/sysconfig/modules/ipvs.modules
/bin/bash /etc/sysconfig/modules/ipvs.modules
lsmod | grep -e ip_vs -e nf_conntrack_ipv4

安装 docker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo

yum install docker-ce-19.03.2 docker-ce-cli-19.03.2 containerd.io-1.4.4 -y

# 使用 systemd 代替 cgroupfs,配置仓库镜像地址
mkdir /etc/docker
vi /etc/docker/daemon.json
# 添加:
{
"registry-mirrors": ["https://q2hy3fzi.mirror.aliyuncs.com"],
"exec-opts": ["native.cgroupdriver=systemd"]
}

# 启动 docker
systemctl start docker
systemctl enable docker --now

安装 k8s

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
cat <<EOF | sudo tee /etc/yum.repos.d/kubernetes.repo
[kubernetes]
name=Kubernetes
baseurl=http://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=0
repo_gpgcheck=0
gpgkey=http://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg
http://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
exclude=kubelet kubeadm kubectl
EOF

yum install -y kubelet-1.23.1 kubeadm-1.23.1 kubectl-1.23.1 --disableexcludes=kubernetes

vi /etc/sysconfig/kubelet
# 添加:
KUBELET_CGROUP_ARGS="--cgroup-driver=systemd"
KUBE_PROXY_MODE="ipvs"

systemctl enable --now kubelet

# 组件下载脚本
sudo tee ./images.sh <<-'EOF'
#!/bin/bash
images=(
kube-apiserver:v1.23.1
kube-proxy:v1.23.1
kube-controller-manager:v1.23.1
kube-scheduler:v1.23.1
coredns:1.7.5
etcd:3.4.13-0
pause:3.2
kubernetes-dashboard-amd64:v1.10.0
heapster-amd64:v1.5.4
heapster-grafana-amd64:v5.0.4
heapster-influxdb-amd64:v1.5.2
pause-amd64:3.1
)
for imageName in ${images[@]} ; do
docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/$imageName
docker tag registry.cn-hangzhou.aliyuncs.com/google_containers/$imageName k8s.gcr.io/$imageName
done
EOF
# 执行脚本
chmod +x ./images.sh && ./images.sh

部署 k8s master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kubeadm init \
--kubernetes-version v1.23.1 \
--control-plane-endpoint "master:6443" \
--upload-certs \
--image-repository registry.aliyuncs.com/google_containers \
--pod-network-cidr=10.244.0.0/16

# 将执行成功后的命令拷贝过来
mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

# kubectl
kubectl get node

# 安装插件
curl https://docs.projectcalico.org/manifests/calico.yaml -O
kubectl apply -f calico.yaml

# 查看 node,状态为 ready
kubectl get node

如果安装失败,需要 reset:

1
2
kubeadm reset
rm -rf $HOME/.kube

部署 k8s node

需要执行在 kubeadm init 输出的 kubeadm join 命令:

1
2
3
kubeadm join master:6443 --token hkakru.rnv32cvzw2alodkw \
--discovery-token-ca-cert-hash sha256:49257352b5a320c40785df0b6cc5534e7ae0b6cc758023b52abc0da4b5e9890d \
--control-plane --certificate-key bd218950889df9e03586f4df549a1ec955715d75fb9e576ec75f3287c0004063

部署 dashboard

1
2
3
4
5
6
7
kubectl apply -f https://raw.githubusercontent.com/kubernetes/dashboard/v2.3.1/aio/deploy/recommended.yaml

kubectl edit svc kubernetes-dashboard -n kubernetes-dashboard
# type: ClusterIP 改为 type: NodePort

# 查看 dashboard 端口
kubectl get svc -A | grep kubernetes-dashboard

通过 https://集群任意IP:端口 来访问,这里可以是 https://8.130.22.97:31323。

添加用户和绑定角色

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
vi dash.yaml
# 添加:
apiVersion: v1
kind: ServiceAccount
metadata:
name: admin-user
namespace: kubernetes-dashboard

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: admin-user
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: cluster-admin
subjects:
- kind: ServiceAccount
name: admin-user
namespace: kubernetes-dashboard

生成登录 token

1
kubectl -n kubernetes-dashboard describe secret $(kubectl -n kubernetes-dashboard get secret | grep admin-user | awk '{print $1}')

将输出的 token 输入到网站中,就可以看到管理界面了。

Docker 简述

什么是容器

当一个应用程序仅由较少数量的大组件构成时,完全可以接受给每个组件分配专用的虚拟机,以及通过给每个组件提供自己的操作系统实例来隔离它们的环境。但是当这些组件开始变小且数量开始增长时,如果你不想浪费硬件资源,又想持续压低硬件成本,那就不能给每个组件配置一个虚拟机了。

容器类似与虚拟机,但开销小很多。一个容器里运行的进程实际上运行在宿主机的操作系统上,就像所有其他进程一样,不像虚拟机,进程是运行在不同的操作系统上的。但在容器里的进程仍然是和其他进程隔离的。对于容器内进程本身而言,就好像是在机器和操作系统上运行的唯一一个进程。

容器隔离机制

容器实现隔离有两个机制:第一个是 Linux 命名空间,它使每个进程只看到它自己的系统视图;第二个是 cgroups,它限制了进程能使用的资源量。

用 Linux 命名空间隔离进程

每个 Linux 系统最初只有一个命名空间,所有的系统资源都属于这一个命名空间。一个进程在一个命名空间下运行,进程将只能看到同一个命名空间下的资源。通过 clone 函数可以在创建新进程的同时创建 namespace:

1
int pid = clone(main_function, stack_size, CLONE_NEWPID | SIGCHLD, NULL);

在这里指定 CLONE_NEWPID 参数,这样新创建的进程,就会看到一个全新的进程空间。而此时这个新的进程,也就变成了 PID=1 的进程。

与 namespace 相关的 flag 参数有 CLONE_NEWNS、CLONE_NEWUTS、CLONE_NEWIPC、CLONE_NEWPID、CLONE_NEWNET 和 CLONE_NEWUSER,对应 Mount(mnt)、UTS、Inter-process communication(ipc)、Process ID(pid)、Network(net)、User ID(user) 这六种类型的命名空间。

用 cgroups 限制系统的可用资源

cgroups 是一个 Linux 内核功能,它用来限制一个进程或者一组进程的资源使用。一个进程的资源(CPU、内存、网络带宽等)使用量不能超过被分配的量。

cgroups 通过 cgroupfs 提供接口,cgroupfs 默认情况下挂载在 /sys/fs/cgroup 目录。

cgroups 限制 CPU、内存的简单操作方法:

1
2
3
4
5
6
7
8
9
# 创建一个新的目录,也就是创建了一个新的 cgroup 
mkdir /sys/fs/cgroup/cpuset/demo

# 配置这个 cgroup 的资源配额,限制这个 cgroup 的进程只能在 0 号 CPU 上运行,并且只能在 0 号内存节点分配内存
echo 0 > /sys/fs/cgroup/cpuset/demo/cpuset.cpus
echo 0 > /sys/fs/cgroup/cpuset/demo/cpuset.mems

# 将进程 id 写进 tasks 文件,即整个进程移动到 cgroup 中,cgroup 开始起作用
echo > /sys/fs/cgroup/cpuset/demo/tasks

Docker 容器介绍

Docker 是第一个使容器在不同机器之间移植的系统。Docker 有三个概念:

  • 镜像 —— Docker 镜像里包含了打包的应用程序及其所依赖的环境。
  • 镜像仓库 —— Docker 镜像仓库用于存放 Docker 镜像。
  • 容器 —— 一个运行中的容器是一个运行在 Docker 主机上的进程,但它和主机以及所有运行在主机上的其他进程都是隔离的。

Docker 推荐将容器运行时的 cgroup driver 更改为 systemd,systemd 限制 CPU、内存的简单操作方法:

1
2
# 限制 CPU 占用为 0.1 个 CPU,内存为 200 MB
systemctl set-property xxxx.service CPUShares=100 MemoryLimit=200M

systemd 相对 cgroupfs 更加简单,目前主流 Linux 发行版中 systemd 是系统自带的 cgroup 管理器,系统初始化就存在的,和 cgroups 联系紧密。

如果 Docker 的 cgroup driver 是 cgroupfs,就会存在两个控制管理器,对于该服务器上启动的容器使用的是 cgroupfs,而对于其他 systemd 管理的进程使用的是 systemd,这样在服务器资源负载高的情况下可能会变的不稳定。

ALEX: An Updatable Adaptive Learned Index

概述

ALEX 是一个可更新的内存型学习索引。对比 B+Tree 和 Learned Index,ALEX 的目标是:(1)插入时间与 B+Tree 接近,(2)查找时间应该比 B+Tree 和 Learned Index 快,(3)索引存储空间应该比 B+Tree 和 Learned Index 小,(4)数据存储空间(叶子节点)应该与 B+Tree 相当。

ALEX 的设计如下:

  • ALEX 动态调整 RMI 的形状和高度,节点可以进行扩展和分割
  • ALEX 使用 Exponential Search 来寻找叶子层的 key,以纠正 RMI 的错误预测,这比 Binary Search 效果更好
  • ALEX 在数据节点上使用 Gapped Array(GA),将数据插入在自己预测的地方。这和 RMI 有很大的不同。RMI 是先排好序,让后训练模型去拟合数据。而 ALEX 是在模型拟合完数据后,将数据在按照模型的预测值插入到对应的地方,这大大降低了预测的错误率
  • ALEX 的 RMI 结构的目标不是产生同等大小的数据节点,而是产生 key 分布大致为线性的数据节点,以便线性模型能够准确地拟合。因此,ALEX 中的内部节点更加灵活。例如下图中节点 A 中 keys 的范围在 [0,1) 内,并有四个指针。ALEX 将 keys 范围 [0,1/4) 和 [1/2,1) 分配给数据节点(因为这些空间的 CDF 是线性的),并将 [1/4,1/2) 分配给另一个内部节点(因为CDF是非线性的,RMI需要对这个空间进行更多的划分)。另外,多个指针可以指向同一个子节点,便于插入;限制每个内部节点的指针数量总是 2 的幂,便于节点可以在不重新训练的情况下分裂

下面介绍 ALEX 的查询、插入、删除等步骤。

Lookups and Range Queries

查找时,从 RMI 的根节点开始,使用模型计算在哪一个位置,然后迭代地查询下一级的叶子节点,直到到达一个数据节点。在数据节点中使用模型来查询 key 在数组中的位置,如果预测失败,则进行 Exponential Search 以找到 key 的实际位置。如果找到了一个 key,读取对应值并返回记录,否则返回一个空记录。对于范围查询,首先找到第一个 key 的位置,该 key 不小于范围的起始值,然后扫描,直到到达范围的结束值,使用节点的 bitmap 跳过间隙,必要时跳到下一个数据节点。

Insert in non-full Data Node

插入逻辑与上述查找算法相同。在一个 non-full 的数据节点中,使用数据节点中的模型来预测插入位置。如果预测的位置不正确(不能保持有序),则做 Exponential Search 来找到正确的插入位置。如果插入位置为空,则直接插入,否则插入到最近的间隙中。Gapped Array实现了 O(logn) 插入时间。

Insert in full Data Node

Criteria for Node Fullness

ALEX 并不会让数据节点 100% 充满,因为在 Gapped Array 上的插入性能会随着间隙数量的减少而下降。需要在 Gapped Array 上引入了密度的下限和上限:dl, du ∈ (0, 1),约束dl < du。密度被定义为被元素填充的位置的百分比。如果下一次插入使得密度超过了 du,那么这个节点就是满的。默认情况下,我们设置 dl=0.6,du=0.8。相比之下,B+Tree 的节点通常有 dl=0.5 和 du=1。

Node Expansion Mechanism

扩展一个包含 N 个 key 的数据节点,需要创建一个具有 N/dl 槽的新的较大的 Gapped Array。然后对线性回归模型进行缩放或重新训练,然后使用缩放或重新训练的模型对这个节点中的所有元素进行基于模型的插入。新的数据节点的密度处于下限 dl。下图为一个数据节点扩展的例子,数据节点内的 Gapped Array 从左边的两个槽扩展到右边的四个槽。

Node Split Mechanism

  1. 水平分裂。有两种情况:(1) 如果待分裂的数据节点的父内部节点还没有达到最大的节点大小,父内部节点的指针数组可能有多余的指向待分裂数据节点的指针。如果有,则各让一半的指针指向两个新数据节点。否则,将父节点的指针数组的大小增加一倍,并为每个指针制作一个冗余的副本,来创建第二个指向分裂数据节点的指针,然后再分裂。下图(a)展示了一个不需要扩展父内部节点的侧向分裂的例子。(2) 如果父内部节点已经达到最大节点大小,那么我们可以选择拆分父内部节点,如下图(b)中所示。因为内部节点的大小为2的幂,所以总是可以拆分一个数据节点,不需要对拆分后的任何模型进行重新训练。分裂可以一直传播到根节点,就像在 B+Tree 中一样。
  2. 向下分裂。如下图(c)所示,向下分裂将一个数据节点转换为具有两个子数据节点的内部节点。两个子数据节点中的模型是根据它们各自的 key 来训练的。

Delete and update

要删除一个 key,需要先找到该 key 的位置,然后删除它和它对应的值。删除不会移动任何现有的 key,所以删除是一个严格意义上的比插入更简单的操作,不会导致模型的准确性下降。如果一个数据节点由于删除而达到了密度下限 dl,那么就缩小数据节点(与扩大数据节点相反),以避免空间利用率过低。此外还可以使用节点内的成本模型来确定两个数据节点是否应该合并在一起,然而为了简单起见,ALEX 开源代码中并没有实现这些合并操作。
key 更新是通过结合插入和删除来实现的;值更新是通过查找 key 并将新值写入来实现的。

参考

  1. ALEX: An Updatable Adaptive Learned Index

gRPC 基础

gRPC 概览

gRPC 是由 Google 开发并开源的一种语言中立的 RPC 框架,当前支持 C、Java 和 Go 语言,其中 C 版本支持 C、C++、Node.js、C# 等。在 gRPC 的客户端应用可以像调用本地方法一样直接调用另一台不同的机器上的服务端的方法。

简单使用 gRPC 的步骤如下,以 Go 语言为例:

  1. 写好 proto 文件,用 protoc 生成.pb.go文件
  2. 服务端定义 Server,创建 Function 实现接口 -> net.Listen -> grpc.NewServer() -> pb.RegisterXXXServer(server, &Server{}) -> server.Serve(listener)
  3. 客户端 grpc.Dial,创建一个 gRPC 连接 -> pb.NewXXXClient(conn),创建 client -> context.WithTimeout,设置超时时间 -> client.Function,调用接口 -> 如果是流式传输则循环读取数据

gRPC 概念详解

Unary RPC

客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。

Server streaming RPC

客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
server 需要向流中发送消息,例如:

1
2
3
4
5
6
7
8
for n := 0; n < 5; n++ {
err := server.Send(&pb.StreamResponse{
StreamValue: req.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
}

client 通过 grpc 调用获得的是一个流传输对象 stream,需要循环接收数据,例如:

1
2
3
4
5
6
7
8
9
10
11
12
for {
res, err := stream.Recv()
// 判断消息流是否已经结束
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// 打印返回值
log.Println(res.StreamValue)
}

Client streaming RPC

客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
server 使用 stream.Recv() 来循环接收数据流,SendAndClose 表示服务器已经接收消息结束,并发生一个正确的响应给客户端,例如:

1
2
3
4
5
6
7
8
9
10
11
for  {
res,err := stream.Recv()
// 接收消息结束,发送结果,并关闭
if err == io.EOF {
return stream.SendAndClose(&proto.UploadResponse{})
}
if err !=nil {
return err
}
fmt.Println(res)
}

client 发送数据完毕的时候需要调用 CloseAndRecv,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
for i := 1; i <= 10; i++ {
img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
images := &proto.StreamImageList{Image:img}
err := stream.Send(images)
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
}
// 发送完毕,关闭并获取服务端返回的消息
resp, err := stream.CloseAndRecv()

Bidirectional streaming RPC

两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。
server 在接收消息的同时发送消息,例如:

1
2
3
4
5
6
7
8
9
10
11
for {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
err = stream.Send(&proto.StreamSumData{Number: int32(i)})
if err != nil {
return err
}
i++
}

client 需要有一个执行断开连接的标识 CloseSend(),而 server 不需要,因为服务端断开连接是隐式的,我们只需要退出循环即可断开连接,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for i := 1; i <= 10; i++ {
err = stream.Send(&proto.StreamRequest{})
if err == io.EOF {
break
}
if err != nil {
return
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return
}
log.Printf("res number: %d", res.Number)
}
stream.CloseSend()

同步

Channel 提供一个与特定 gRPC server 的主机和端口建立的连接。Stub 就是在 Channel 的基础上创建而成的,通过 Stub 可以真正的调用 RPC 请求。

基于 CQ 异步

  • CQ:异步操作完成的通知队列
  • StartCall() + Finish():创建异步任务
  • CQ.next():获取完成的异步操作
  • Tag:标记异步动作的标识

多个线程可以操作同一个CQ。CQ.next() 不仅可以接收到当前处理的请求的完成事件,还可以接收到其他请求的事件。假设第一个请求正在等待它的回复数据传输完成时,一个新的请求到达了,CQ.next() 可以获得新请求产生的事件,并开始并行处理新请求,而不用等待第一个请求的传输完成。

基于回调异步

在 client 端发送单个请求,在调用 Function 时,除了传入 Request、 Reply 的指针之外,还需要传入一个接收 Status 的回调函数。
在 server 端 Function 返回的不是状态,而是 ServerUnaryReactor 指针,通过 CallbackServerContext 获得 reactor,调用 reactor 的 Finish 函数处理返回状态。

上下文

  • 在 client 端和 server 端之间传输一些自定义的 Metadata。
  • 类似于 HTTP 头,控制调用配置,如压缩、鉴权、超时。
  • 辅助可观测性,如 Trace ID。

gRPC 通信协议

gRPC 通信协议基于标准的 HTTP/2 设计,支持双向流、单 TCP 的多路复用(一个 HTTP 请求无需等待前一个 HTTP 请求返回结果就可以提前发起,多个请求可以共用同一个 HTTP 连接且互不影响)以及消息头压缩和服务端推送等特性,这些特性使得 gRPC 在移动端设备上更加省电和节省网络流量。

gRPC 序列化机制

Protocol Buffers 介绍

gRPC 序列化支持 Protocol Buffers。ProtoBuf 是一种轻便高效的数据结构序列化方法,保障了 gRPC 调用的高性能。它的优势在于:

  • ProtoBuf 序列化后的体积要比 json、XML 小很多,序列化/反序列化的速度更快。
  • 支持跨平台、多语言。
  • 使用简单,因为它提供了一套编译工具,可以自动生成序列化、反序列化的样板代码。

但是,ProtoBuf 是二进制协议,编码后二进制数据流可读性差,调试麻烦。

ProtoBuf 支持的标量值类型如下:

ProtoBuf 为什么快?

  • 因为每个字段都用 tag+value 这种方式连续存储的,tag 是编号,一般只占用一个字节,value 是字段的值,这样就没有冗余字符。
  • 另外,对于比较小的整数,ProtoBuf 中定义了 Varint 可变整型,可以不用 4 个字节去存。
  • 如果 value 是字符串类型的,从 tag 当中无法了解到 value 具体有多长,ProtoBuf 会在 tag 与 value 之间添加一个 leg 字段去记录字符串的长度,这样就可以不做字符串匹配操作,解析速度非常快。

IDL 文件定义

按照 Protocol Buffers 的语法在 proto 文件中定义 RPC 请求和响应的数据结构,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = "../helloworld";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}

其中,syntax proto3 表示使用 v3 版本的 Protocol Buffers,v3 和 v2 版本语法上有较多的变更, 使用的时候需要特别注意。go_package 表示生成代码的存放路径(包路径)。通过 message 关键字来定义数据结构,数据结构的语法为:

数据类型 字段名称 = Tag

message 是支持嵌套的,即 A message 引用 B message 作为自己的 field,它表示的就是对象聚合关系,即 A 对象聚合(引用)了 B 对象。

对于一些公共的数据结构,例如公共 Header,可以通过单独定义公共数据结构 proto 文件,然后导入的方式使用,示例如下:

import "other_protofile.proto";

导入也支持级联引用,即 a.proto 导入了 b.proto,b.proto 导入了 c.proto,则 a.proto 可以直接使用 c.proto 中定义的 message。

参考

  1. 「直播回放」腾讯工程师分享:gRPC基础概念详解
  2. gRPC 基础概念详解
  3. gRPC 官方文档中文版