KubeEdge 边缘部分 EventBus&ServiceBus 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

EventBus 是一个与 MQTT 服务器 (mosquitto) 交互的 MQTT 客户端,为其他组件提供订阅和发布功能;ServiceBus 是一个运行在边缘的 HTTP 客户端。

EventBus

edge/pkg/eventbus/eventbus.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (eb *eventbus) Start() {
if eventconfig.Config.MqttMode >= v1alpha1.MqttModeBoth {
hub := &mqttBus.Client{
MQTTUrl: eventconfig.Config.MqttServerExternal,
SubClientID: eventconfig.Config.MqttSubClientID,
PubClientID: eventconfig.Config.MqttPubClientID,
Username: eventconfig.Config.MqttUsername,
Password: eventconfig.Config.MqttPassword,
}
mqttBus.MQTTHub = hub
hub.InitSubClient()
hub.InitPubClient()
klog.Infof("Init Sub And Pub Client for external mqtt broker %v successfully", eventconfig.Config.MqttServerExternal)
}

if eventconfig.Config.MqttMode <= v1alpha1.MqttModeBoth {
// launch an internal mqtt server only
mqttServer = mqttBus.NewMqttServer(
int(eventconfig.Config.MqttSessionQueueSize),
eventconfig.Config.MqttServerInternal,
eventconfig.Config.MqttRetain,
int(eventconfig.Config.MqttQOS))
mqttServer.InitInternalTopics()
err := mqttServer.Run()
if err != nil {
klog.Errorf("Launch internal mqtt broker failed, %s", err.Error())
os.Exit(1)
}
klog.Infof("Launch internal mqtt broker %v successfully", eventconfig.Config.MqttServerInternal)
}

eb.pubCloudMsgToEdge()
}

MqttMode 分 MqttModeInternal、MqttModeBoth 和 MqttModeExternal 三种。当 eventconfig.Config.MqttMode >= v1alpha1.MqttModeBoth 将 MQTT 代理启动在 eventbus 之外,eventbus 作为独立启动的 MQTT 代理的客户端与其交互;当 eventconfig.Config.MqttMode <= v1alpha1.MqttModeBoth 时,在 eventbus 内启动一个 MQTT 代理,负责与终端设备交互。

InitSubClient

InitSubClient 设置参数启动 subscribe 连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (mq *Client) InitSubClient() {
timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
right := len(timeStr)
if right > 10 {
right = 10
}
// if SubClientID is NOT set, we need to generate it by ourselves.
if mq.SubClientID == "" {
mq.SubClientID = fmt.Sprintf("hub-client-sub-%s", timeStr[0:right])
}
subOpts := util.HubClientInit(mq.MQTTUrl, mq.SubClientID, mq.Username, mq.Password)
subOpts.OnConnect = onSubConnect
subOpts.AutoReconnect = false
subOpts.OnConnectionLost = onSubConnectionLost
mq.SubCli = MQTT.NewClient(subOpts)
util.LoopConnect(mq.SubClientID, mq.SubCli)
klog.Info("finish hub-client sub")
}

onSubConnect 和 onSubConnectionLost 定义了当连接和失联时的处理逻辑。eventbus 订阅以下 topic:

1
2
3
4
5
6
7
8
9
// SubTopics which edge-client should be sub
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
UploadTopic,
"+/user/#",
}

当获得这些 topic 消息时,通过 mqtt 的 subscribe 方法回调 OnSubMessageReceived。该函数判断 topic,”hw/events/device” 和 “hw/events/node” 开头发送给 DeviceTwin 模块,其他信息发送给 EdgeHub 模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, msg MQTT.Message) {
klog.Infof("OnSubMessageReceived receive msg from topic: %s", msg.Topic())
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for "SYS/dis/upload_records", no need to base64 topic
var target string
var message *beehiveModel.Message
if strings.HasPrefix(msg.Topic(), "$hw/events/device") || strings.HasPrefix(msg.Topic(), "$hw/events/node") {
target = modules.TwinGroup
resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic()))
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup,
resource, messagepkg.OperationResponse).FillBody(string(msg.Payload()))
} else {
target = modules.HubGroup
message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup,
msg.Topic(), beehiveModel.UploadOperation).FillBody(string(msg.Payload()))
}

klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, message.GetResource()))
beehiveContext.SendToGroup(target, *message)
}

InitPubClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// InitPubClient init pub client
func (mq *Client) InitPubClient() {
timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10)
right := len(timeStr)
if right > 10 {
right = 10
}
// if PubClientID is NOT set, we need to generate it by ourselves.
if mq.PubClientID == "" {
mq.PubClientID = fmt.Sprintf("hub-client-pub-%s", timeStr[0:right])
}
pubOpts := util.HubClientInit(mq.MQTTUrl, mq.PubClientID, mq.Username, mq.Password)
pubOpts.OnConnectionLost = onPubConnectionLost
pubOpts.AutoReconnect = false
mq.PubCli = MQTT.NewClient(pubOpts)
util.LoopConnect(mq.PubClientID, mq.PubCli)
klog.Info("finish hub-client pub")
}

InitPubClient 创建了一个 MQTT client,然后调用 LoopConnect 每 5 秒钟连接一次 MQTT server,直到连接成功。如果失去连接,则通过 onPubConnectionLost 继续调用 InitPubClient。

pubCloudMsgToEdge

在启动/连接完 MQTT server 后,调用了 pubCloudMsgToEdge 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
func (eb *eventbus) pubCloudMsgToEdge() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EventBus PubCloudMsg To Edge stop")
return
default:
}
accessInfo, err := beehiveContext.Receive(eb.Name())
if err != nil {
klog.Errorf("Fail to get a message from channel: %v", err)
continue
}
operation := accessInfo.GetOperation()
resource := accessInfo.GetResource()
switch operation {
case messagepkg.OperationSubscribe:
eb.subscribe(resource)
klog.Infof("Edge-hub-cli subscribe topic to %s", resource)
case messagepkg.OperationUnsubscribe:
eb.unsubscribe(resource)
klog.Infof("Edge-hub-cli unsubscribe topic to %s", resource)
case messagepkg.OperationMessage:
body, ok := accessInfo.GetContent().(map[string]interface{})
if !ok {
klog.Errorf("Message is not map type")
continue
}
message := body["message"].(map[string]interface{})
topic := message["topic"].(string)
payload, _ := json.Marshal(&message)
eb.publish(topic, payload)
case messagepkg.OperationPublish:
topic := resource
// cloud and edge will send different type of content, need to check
payload, ok := accessInfo.GetContent().([]byte)
if !ok {
content, ok := accessInfo.GetContent().(string)
if !ok {
klog.Errorf("Message is not []byte or string")
continue
}
payload = []byte(content)
}
eb.publish(topic, payload)
case messagepkg.OperationGetResult:
if resource != "auth_info" {
klog.Info("Skip none auth_info get_result message")
continue
}
topic := fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", eventconfig.Config.NodeName)
payload, _ := json.Marshal(accessInfo.GetContent())
eb.publish(topic, payload)
default:
klog.Warningf("Action not found")
}
}
}

pubCloudMsgToEdge 执行以下操作:

  1. 从 beehive 获取消息
  2. 获取消息的 operation 和 resource
  3. 当动作为 subscribe 时从 MQTT 订阅 resource(topic) 消息;当动作为 unsubscribe 时从 MQTT 取消订阅 resource(topic) 消息
  4. 当动作为 message 时,将消息的 message 根据消息的 topic 发送给 MQTT broker,消息类型是一个 map
  5. 当动作为 publish 时,将消息发送给 MQTT broker,消息为一个字符串,topic 和 resource 一致
  6. 当动作为 getResult 时,resource 必须为 auth_info,然后发送消息到 “hw/events/node/eventconfig.Config.NodeName/authInfo/get/result” 这一个 topic

ServiceBus

edge/pkg/servicebus/servicebus.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func (sb *servicebus) Start() {
// no need to call TopicInit now, we have fixed topic
htc.Timeout = time.Second * 10
uc.Client = htc
if !dao.IsTableEmpty() {
if atomic.CompareAndSwapInt32(&inited, 0, 1) {
go server(c)
}
}
//Get message from channel
for {
select {
case <-beehiveContext.Done():
klog.Warning("servicebus stop")
return
default:
}
msg, err := beehiveContext.Receive(modules.ServiceBusModuleName)
if err != nil {
klog.Warningf("servicebus receive msg error %v", err)
continue
}

// build new message with required field & send message to servicebus
klog.V(4).Info("servicebus receive msg")
go processMessage(&msg)
}
}

ServiceBus 接受来自 beehive 的消息,然后启动一个 processMessage 协程基于消息中带的参数,将消息通过 REST-API 发送到本地 127.0.0.1 上的目标 APP。相当于一个客户端,而 APP 是一个 http Rest-API server,所有的操作和设备状态都需要客户端调用接口来下发和获取。ServiceBus 执行过程图如下:

参考

  1. kubeedge edgecore - EventBus源码分析
  2. 【KubeEdge】 ServiceBus分析

KubeEdge 边缘部分 DeviceTwin 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

DeviceTwin 负责存储设备状态(传感器的值等)并将设备状态同步到云,它还为应用程序提供查询接口。它由四个子模块组成(membership 模块,communication 模块,device 模块和 device twin 模块)。

DeviceTwin 注册

DeviceTwin 注册也调用了 InitDBTable,在 SQLite 数据库中初始化了三张表 Device,DeviceAttr 与 DeviceTwin:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//Device the struct of device
type Device struct {
ID string `orm:"column(id); size(64); pk"`
Name string `orm:"column(name); null; type(text)"`
Description string `orm:"column(description); null; type(text)"`
State string `orm:"column(state); null; type(text)"`
LastOnline string `orm:"column(last_online); null; type(text)"`
}

//DeviceAttr the struct of device attributes
type DeviceAttr struct {
ID int64 `orm:"column(id);size(64);auto;pk"`
DeviceID string `orm:"column(deviceid); null; type(text)"`
Name string `orm:"column(name);null;type(text)"`
Description string `orm:"column(description);null;type(text)"`
Value string `orm:"column(value);null;type(text)"`
Optional bool `orm:"column(optional);null;type(integer)"`
AttrType string `orm:"column(attr_type);null;type(text)"`
Metadata string `orm:"column(metadata);null;type(text)"`
}

//DeviceTwin the struct of device twin
type DeviceTwin struct {
ID int64 `orm:"column(id);size(64);auto;pk"`
DeviceID string `orm:"column(deviceid); null; type(text)"`
Name string `orm:"column(name);null;type(text)"`
Description string `orm:"column(description);null;type(text)"`
Expected string `orm:"column(expected);null;type(text)"`
Actual string `orm:"column(actual);null;type(text)"`
ExpectedMeta string `orm:"column(expected_meta);null;type(text)"`
ActualMeta string `orm:"column(actual_meta);null;type(text)"`
ExpectedVersion string `orm:"column(expected_version);null;type(text)"`
ActualVersion string `orm:"column(actual_version);null;type(text)"`
Optional bool `orm:"column(optional);null;type(integer)"`
AttrType string `orm:"column(attr_type);null;type(text)"`
Metadata string `orm:"column(metadata);null;type(text)"`
}

模块入口

edge/pkg/devicetwin/devicetwin.go:

1
2
3
4
5
6
7
8
9
10
11
// Start run the module
func (dt *DeviceTwin) Start() {
dtContexts, _ := dtcontext.InitDTContext()
dt.DTContexts = dtContexts
err := SyncSqlite(dt.DTContexts)
if err != nil {
klog.Errorf("Start DeviceTwin Failed, Sync Sqlite error:%v", err)
return
}
dt.runDeviceTwin()
}

主要就是 SyncSqlite 和 runDeviceTwin

SyncSqlite

SyncSqlite 最终会执行 SyncDeviceFromSqlite:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func SyncDeviceFromSqlite(context *dtcontext.DTContext, deviceID string) error {
klog.Infof("Sync device detail info from DB of device %s", deviceID)
_, exist := context.GetDevice(deviceID)
if !exist {
var deviceMutex sync.Mutex
context.DeviceMutex.Store(deviceID, &deviceMutex)
}

defer context.Unlock(deviceID)
context.Lock(deviceID)

devices, err := dtclient.QueryDevice("id", deviceID)
if err != nil {
klog.Errorf("query device failed: %v", err)
return err
}
if len(*devices) <= 0 {
return errors.New("Not found device from db")
}
device := (*devices)[0]

deviceAttr, err := dtclient.QueryDeviceAttr("deviceid", deviceID)
if err != nil {
klog.Errorf("query device attr failed: %v", err)
return err
}
attributes := make([]dtclient.DeviceAttr, 0)
attributes = append(attributes, *deviceAttr...)

deviceTwin, err := dtclient.QueryDeviceTwin("deviceid", deviceID)
if err != nil {
klog.Errorf("query device twin failed: %v", err)
return err
}
twins := make([]dtclient.DeviceTwin, 0)
twins = append(twins, *deviceTwin...)

context.DeviceList.Store(deviceID, &dttype.Device{
ID: deviceID,
Name: device.Name,
Description: device.Description,
State: device.State,
LastOnline: device.LastOnline,
Attributes: dttype.DeviceAttrToMsgAttr(attributes),
Twin: dttype.DeviceTwinToMsgTwin(twins)})

return nil
}

这段函数主要执行了以下操作:

  1. 检查设备是否在上下文中(设备列表存储在上下文中),如果不在则添加一个 deviceMutex 至上下文中
  2. 从数据库中查询设备
  3. 从数据库中查询设备属性
  4. 从数据库中查询 Device Twin
  5. 将设备、设备属性和 Device Twin 数据合并为一个结构,并将其存储在上下文中

runDeviceTwin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (dt *DeviceTwin) runDeviceTwin() {
moduleNames := []string{dtcommon.MemModule, dtcommon.TwinModule, dtcommon.DeviceModule, dtcommon.CommModule}
for _, v := range moduleNames {
dt.RegisterDTModule(v)
go dt.DTModules[v].Start()
}
go func() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("Stop DeviceTwin ModulesContext Receive loop")
return
default:
}
if msg, ok := beehiveContext.Receive("twin"); ok == nil {
klog.Info("DeviceTwin receive msg")
err := dt.distributeMsg(msg)
if err != nil {
klog.Warningf("distributeMsg failed: %v", err)
}
}
}
}()

for {
select {
case <-time.After((time.Duration)(60) * time.Second):
//range to check whether has bug
for dtmName := range dt.DTModules {
health, ok := dt.DTContexts.ModulesHealth.Load(dtmName)
if ok {
now := time.Now().Unix()
if now-health.(int64) > 60*2 {
klog.Infof("%s health %v is old, and begin restart", dtmName, health)
go dt.DTModules[dtmName].Start()
}
}
}
for _, v := range dt.HeartBeatToModule {
v <- "ping"
}
case <-beehiveContext.Done():
for _, v := range dt.HeartBeatToModule {
v <- "stop"
}
klog.Warning("Stop DeviceTwin ModulesHealth load loop")
return
}
}
}

runDeviceTwin 主要执行了以下操作:

  1. 启动 devicetwin 中四个的子模块,子模块代码在 edge/pkg/devicetwin/dtmanager 下
  2. 轮询接收消息,执行 distributeMsg。将收到的消息发送给 communication 模块,对消息进行分类,即消息是来自 EventBus、EdgeManager 还是 EdgeHub,并填充 ActionModuleMap,再将消息发送至对应的子模块
  3. 定期(默认60s)向子模块发送 “ping” 信息。每个子模块一旦收到 “ping” 信息,就会更新自己的时间戳。控制器检查每个模块的时间戳是否超过 2 分钟,如果超过则重新启动该子模块。

Membership 模块

Membership 模块的主要作用是为新设备添加提供资格,该模块将新设备与边缘节点绑定,并在边缘节点和边缘设备之间建立相应关系。它主要执行以下操作:

  1. 初始化 memActionCallBack,它的类型是 map[string]Callback,包含可执行的动作函数
  2. 接收消息
  3. 对于每条消息,都会调用相应动作函数
  4. 接收心跳信息,并向控制器发送心跳信号

以下是可由 Membership 模块执行的动作函数:

  • dealMembershipGet:从缓存中获取与特定边缘节点相关的设备信息
  • dealMembershipUpdated:更新节点的成员信息
  • dealMembershipDetail:提供了边缘节点的成员详细信息

Twin 模块

Twin 模块的主要作用是处理所有与 device twin 相关的操作。它可以执行诸如更新 device twin、获取 device twin 和同步 device twin 到云的操作。它执行的操作与 Membership 模块类似。

以下是可由 Twin 模块执行的动作函数:

  • dealTwinUpdate:更新一个特定设备的 device twin 信息
  • dealTwinGet:提供一个特定设备的 device twin 信息
  • dealTwinSync:将 device twin 信息同步到云端

Communication 模块

Communication 模块的主要作用是确保设备双胞胎和其他组件之间的通信功能。它主要执行以下操作:

  1. 初始化 memActionCallBack,它的类型是 map[string]Callback,包含可执行的动作函数
  2. 接收消息
  3. 对于每条消息,都会调用相应动作函数
  4. 确认消息中指定的动作是否完成,如果动作没有完成则重做该动作
  5. 接收心跳信息,并向控制器发送心跳信号

以下是可由 Communication 模块执行的动作函数:

  • dealSendToCloud:用于发送数据到 cloudhub。这个函数首先确保云边是连接的,然后将消息发送到 edgehub 模块,edgehub 将消息转发给云
  • dealSendToEdge:用于发送数据给边缘的其他模块。这个函数将收到的消息发送到 edgehub 模块,edgehub 将消息转发给其他模块
  • dealLifeCycle:检查是否连接到云并且 twin 的状态是否为断开,将状态改为连接并将节点的详细信息发送给 edgehub;如果未连接到云,就把 twin 的状态设置为断开
  • dealConfirm:检查消息的类型是否正确,然后从 ConfirmMap 中删除 msgID

Device 模块

Device 模块的主要作用是执行与设备有关的操作,如设备状态更新和设备属性更新。它执行的操作与 Membership 模块类似。

以下是可由 Device 模块执行的动作函数:

  • dealDeviceUpdated:处理的是当遇到设备属性更新时要执行的操作。更新设备属性,比如在数据库中增加属性、更新属性和删除属性
  • dealDeviceStateUpdate:处理的是当遇到设备状态更新时要执行的操作。更新设备的状态以及数据库中设备的最后在线时间

More

关于执行动作函数的流程以及 Device,DeviceAttr 与 DeviceTwin 这三张表中字段的描述请见 DeviceTwin

KubeEdge 边缘部分 MetaManager 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

MetaManager 是消息处理器,位于 Edged 和 Edgehub 之间,它负责向轻量级数据库 (SQLite) 持久化/检索元数据。

MetaManager 注册

和其他模块注册相比,metamanager 注册最大的不同就是它还调用了 initDBTable 在 SQLite 数据库中初始化了两张表 Meta 与 MetaV2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// Meta metadata object
type Meta struct {
Key string `orm:"column(key); size(256); pk"`
Type string `orm:"column(type); size(32)"`
Value string `orm:"column(value); null; type(text)"`
}

// MetaV2 record k8s api object
type MetaV2 struct {
// Key is the primary key of a line record, format like k8s obj key in etcd:
// /Group/Version/Resources/Namespace/Name
//0/1 /2 /3 /4 /5
// /core/v1/pods/{namespaces}/{name} normal obj
// /core/v1/pods/{namespaces} List obj
// /extensions/v1beta1/ingresses/{namespaces}/{name} normal obj
// /storage.k8s.io/v1beta1/csidrivers/null/{name} cluster scope obj
Key string `orm:"column(key); size(256); pk"`
// GroupVersionResource are set buy gvr.String() like "/v1, Resource=endpoints"
GroupVersionResource string `orm:"column(groupversionresource); size(256);"`
// Namespace is the namespace of an api object, and set as metadata.namespace
Namespace string `orm:"column(namespace); size(256)"`
// Name is the name of api object, and set as metadata.name
Name string `orm:"column(name); size(256)"`
// ResourceVersion is the resource version of the obj, and set as metadata.resourceVersion
ResourceVersion uint64 `orm:"column(resourceversion); size(256)"`
// Value is the api object in json format
// TODO: change to []byte
Value string `orm:"column(value); null; type(text)"`
}

模块入口

edge/pkg/metamanager/metamanager.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (m *metaManager) Start() {
if metaserverconfig.Config.Enable {
imitator.StorageInit()
go metaserver.NewMetaServer().Start(beehiveContext.Done())
}
go func() {
period := getSyncInterval()
timer := time.NewTimer(period)
for {
select {
case <-beehiveContext.Done():
klog.Warning("MetaManager stop")
return
case <-timer.C:
timer.Reset(period)
msg := model.NewMessage("").BuildRouter(MetaManagerModuleName, GroupResource, model.ResourceTypePodStatus, OperationMetaSync)
beehiveContext.Send(MetaManagerModuleName, *msg)
}
}
}()

m.runMetaManager()
}

启动时,开启两个协程,一个用于定时(默认60s)给自己发送消息通知进行边到云的 podstatus 数据同步(KubeEdge 实现了边缘自治,需要将数据同步到云端,网络断开后如果网络恢复,就能立刻将边端的状态进行反馈);另一个 runMetaManager 用于 edgehub 与 edged 的消息,然后调用 m.process(msg) 进行处理。

process 函数获取消息的操作的类型,然后根据信息操作类型对信息进行相应处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *metaManager) process(message model.Message) {
operation := message.GetOperation()
switch operation {
case model.InsertOperation:
m.processInsert(message)
case model.UpdateOperation:
m.processUpdate(message)
case model.DeleteOperation:
m.processDelete(message)
case model.QueryOperation:
m.processQuery(message)
case model.ResponseOperation:
m.processResponse(message)
case messagepkg.OperationNodeConnection:
m.processNodeConnection(message)
case OperationMetaSync:
m.processSync()
case OperationFunctionAction:
m.processFunctionAction(message)
case OperationFunctionActionResult:
m.processFunctionActionResult(message)
case constants.CSIOperationTypeCreateVolume,
constants.CSIOperationTypeDeleteVolume,
constants.CSIOperationTypeControllerPublishVolume,
constants.CSIOperationTypeControllerUnpublishVolume:
m.processVolume(message)
default:
klog.Errorf("metamanager not supported operation: %v", operation)
}
}

具体的处理函数 processInsert、processUpdate 等的具体过程不再分析,大致都是对数据库进行操作,然后再通知 edgehub 或 edged。