// SubTopics which edge-client should be sub SubTopics = []string{ "$hw/events/upload/#", "$hw/events/device/+/state/update", "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", UploadTopic, "+/user/#", }
// OnSubMessageReceived msg received callback func OnSubMessageReceived(client MQTT.Client, msg MQTT.Message) { klog.Infof("OnSubMessageReceived receive msg from topic: %s", msg.Topic()) // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin // for other, send to hub // for "SYS/dis/upload_records", no need to base64 topic var target string var message *beehiveModel.Message if strings.HasPrefix(msg.Topic(), "$hw/events/device") || strings.HasPrefix(msg.Topic(), "$hw/events/node") { target = modules.TwinGroup resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic())) // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic> message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup, resource, messagepkg.OperationResponse).FillBody(string(msg.Payload())) } else { target = modules.HubGroup message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup, msg.Topic(), beehiveModel.UploadOperation).FillBody(string(msg.Payload())) }
klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, message.GetResource())) beehiveContext.SendToGroup(target, *message) }
InitPubClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// InitPubClient init pub client func (mq *Client) InitPubClient() { timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10) right := len(timeStr) if right > 10 { right = 10 } // if PubClientID is NOT set, we need to generate it by ourselves. if mq.PubClientID == "" { mq.PubClientID = fmt.Sprintf("hub-client-pub-%s", timeStr[0:right]) } pubOpts := util.HubClientInit(mq.MQTTUrl, mq.PubClientID, mq.Username, mq.Password) pubOpts.OnConnectionLost = onPubConnectionLost pubOpts.AutoReconnect = false mq.PubCli = MQTT.NewClient(pubOpts) util.LoopConnect(mq.PubClientID, mq.PubCli) klog.Info("finish hub-client pub") }
func (eb *eventbus) pubCloudMsgToEdge() { for { select { case <-beehiveContext.Done(): klog.Warning("EventBus PubCloudMsg To Edge stop") return default: } accessInfo, err := beehiveContext.Receive(eb.Name()) if err != nil { klog.Errorf("Fail to get a message from channel: %v", err) continue } operation := accessInfo.GetOperation() resource := accessInfo.GetResource() switch operation { case messagepkg.OperationSubscribe: eb.subscribe(resource) klog.Infof("Edge-hub-cli subscribe topic to %s", resource) case messagepkg.OperationUnsubscribe: eb.unsubscribe(resource) klog.Infof("Edge-hub-cli unsubscribe topic to %s", resource) case messagepkg.OperationMessage: body, ok := accessInfo.GetContent().(map[string]interface{}) if !ok { klog.Errorf("Message is not map type") continue } message := body["message"].(map[string]interface{}) topic := message["topic"].(string) payload, _ := json.Marshal(&message) eb.publish(topic, payload) case messagepkg.OperationPublish: topic := resource // cloud and edge will send different type of content, need to check payload, ok := accessInfo.GetContent().([]byte) if !ok { content, ok := accessInfo.GetContent().(string) if !ok { klog.Errorf("Message is not []byte or string") continue } payload = []byte(content) } eb.publish(topic, payload) case messagepkg.OperationGetResult: if resource != "auth_info" { klog.Info("Skip none auth_info get_result message") continue } topic := fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", eventconfig.Config.NodeName) payload, _ := json.Marshal(accessInfo.GetContent()) eb.publish(topic, payload) default: klog.Warningf("Action not found") } } }
func SyncDeviceFromSqlite(context *dtcontext.DTContext, deviceID string) error { klog.Infof("Sync device detail info from DB of device %s", deviceID) _, exist := context.GetDevice(deviceID) if !exist { var deviceMutex sync.Mutex context.DeviceMutex.Store(deviceID, &deviceMutex) }
func (dt *DeviceTwin) runDeviceTwin() { moduleNames := []string{dtcommon.MemModule, dtcommon.TwinModule, dtcommon.DeviceModule, dtcommon.CommModule} for _, v := range moduleNames { dt.RegisterDTModule(v) go dt.DTModules[v].Start() } go func() { for { select { case <-beehiveContext.Done(): klog.Warning("Stop DeviceTwin ModulesContext Receive loop") return default: } if msg, ok := beehiveContext.Receive("twin"); ok == nil { klog.Info("DeviceTwin receive msg") err := dt.distributeMsg(msg) if err != nil { klog.Warningf("distributeMsg failed: %v", err) } } } }()
for { select { case <-time.After((time.Duration)(60) * time.Second): //range to check whether has bug for dtmName := range dt.DTModules { health, ok := dt.DTContexts.ModulesHealth.Load(dtmName) if ok { now := time.Now().Unix() if now-health.(int64) > 60*2 { klog.Infof("%s health %v is old, and begin restart", dtmName, health) go dt.DTModules[dtmName].Start() } } } for _, v := range dt.HeartBeatToModule { v <- "ping" } case <-beehiveContext.Done(): for _, v := range dt.HeartBeatToModule { v <- "stop" } klog.Warning("Stop DeviceTwin ModulesHealth load loop") return } } }
// Meta metadata object type Meta struct { Key string `orm:"column(key); size(256); pk"` Type string `orm:"column(type); size(32)"` Value string `orm:"column(value); null; type(text)"` }
// MetaV2 record k8s api object type MetaV2 struct { // Key is the primary key of a line record, format like k8s obj key in etcd: // /Group/Version/Resources/Namespace/Name //0/1 /2 /3 /4 /5 // /core/v1/pods/{namespaces}/{name} normal obj // /core/v1/pods/{namespaces} List obj // /extensions/v1beta1/ingresses/{namespaces}/{name} normal obj // /storage.k8s.io/v1beta1/csidrivers/null/{name} cluster scope obj Key string `orm:"column(key); size(256); pk"` // GroupVersionResource are set buy gvr.String() like "/v1, Resource=endpoints" GroupVersionResource string `orm:"column(groupversionresource); size(256);"` // Namespace is the namespace of an api object, and set as metadata.namespace Namespace string `orm:"column(namespace); size(256)"` // Name is the name of api object, and set as metadata.name Name string `orm:"column(name); size(256)"` // ResourceVersion is the resource version of the obj, and set as metadata.resourceVersion ResourceVersion uint64 `orm:"column(resourceversion); size(256)"` // Value is the api object in json format // TODO: change to []byte Value string `orm:"column(value); null; type(text)"` }
func (m *metaManager) process(message model.Message) { operation := message.GetOperation() switch operation { case model.InsertOperation: m.processInsert(message) case model.UpdateOperation: m.processUpdate(message) case model.DeleteOperation: m.processDelete(message) case model.QueryOperation: m.processQuery(message) case model.ResponseOperation: m.processResponse(message) case messagepkg.OperationNodeConnection: m.processNodeConnection(message) case OperationMetaSync: m.processSync() case OperationFunctionAction: m.processFunctionAction(message) case OperationFunctionActionResult: m.processFunctionActionResult(message) case constants.CSIOperationTypeCreateVolume, constants.CSIOperationTypeDeleteVolume, constants.CSIOperationTypeControllerPublishVolume, constants.CSIOperationTypeControllerUnpublishVolume: m.processVolume(message) default: klog.Errorf("metamanager not supported operation: %v", operation) } }