// SubTopics lists the topics which edge-client should subscribe to.
// NOTE(review): this entry carries no `var` keyword of its own, so it
// presumably lives inside a parenthesized var block declared above — confirm.
SubTopics = []string{
	"$hw/events/upload/#",
	"$hw/events/device/+/state/update",
	"$hw/events/device/+/twin/+",
	"$hw/events/node/+/membership/get",
	UploadTopic,
	"+/user/#",
}
// OnSubMessageReceived msg received callback func OnSubMessageReceived(client MQTT.Client, msg MQTT.Message) { klog.Infof("OnSubMessageReceived receive msg from topic: %s", msg.Topic()) // for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin // for other, send to hub // for "SYS/dis/upload_records", no need to base64 topic var target string var message *beehiveModel.Message if strings.HasPrefix(msg.Topic(), "$hw/events/device") || strings.HasPrefix(msg.Topic(), "$hw/events/node") { target = modules.TwinGroup resource := base64.URLEncoding.EncodeToString([]byte(msg.Topic())) // routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic> message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup, resource, messagepkg.OperationResponse).FillBody(string(msg.Payload())) } else { target = modules.HubGroup message = beehiveModel.NewMessage("").BuildRouter(modules.BusGroup, modules.UserGroup, msg.Topic(), beehiveModel.UploadOperation).FillBody(string(msg.Payload())) }
klog.Info(fmt.Sprintf("Received msg from mqttserver, deliver to %s with resource %s", target, message.GetResource())) beehiveContext.SendToGroup(target, *message) }
InitPubClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// InitPubClient init pub client func (mq *Client) InitPubClient() { timeStr := strconv.FormatInt(time.Now().UnixNano()/1e6, 10) right := len(timeStr) if right > 10 { right = 10 } // if PubClientID is NOT set, we need to generate it by ourselves. if mq.PubClientID == "" { mq.PubClientID = fmt.Sprintf("hub-client-pub-%s", timeStr[0:right]) } pubOpts := util.HubClientInit(mq.MQTTUrl, mq.PubClientID, mq.Username, mq.Password) pubOpts.OnConnectionLost = onPubConnectionLost pubOpts.AutoReconnect = false mq.PubCli = MQTT.NewClient(pubOpts) util.LoopConnect(mq.PubClientID, mq.PubCli) klog.Info("finish hub-client pub") }
func (eb *eventbus) pubCloudMsgToEdge() { for { select { case <-beehiveContext.Done(): klog.Warning("EventBus PubCloudMsg To Edge stop") return default: } accessInfo, err := beehiveContext.Receive(eb.Name()) if err != nil { klog.Errorf("Fail to get a message from channel: %v", err) continue } operation := accessInfo.GetOperation() resource := accessInfo.GetResource() switch operation { case messagepkg.OperationSubscribe: eb.subscribe(resource) klog.Infof("Edge-hub-cli subscribe topic to %s", resource) case messagepkg.OperationUnsubscribe: eb.unsubscribe(resource) klog.Infof("Edge-hub-cli unsubscribe topic to %s", resource) case messagepkg.OperationMessage: body, ok := accessInfo.GetContent().(map[string]interface{}) if !ok { klog.Errorf("Message is not map type") continue } message := body["message"].(map[string]interface{}) topic := message["topic"].(string) payload, _ := json.Marshal(&message) eb.publish(topic, payload) case messagepkg.OperationPublish: topic := resource // cloud and edge will send different type of content, need to check payload, ok := accessInfo.GetContent().([]byte) if !ok { content, ok := accessInfo.GetContent().(string) if !ok { klog.Errorf("Message is not []byte or string") continue } payload = []byte(content) } eb.publish(topic, payload) case messagepkg.OperationGetResult: if resource != "auth_info" { klog.Info("Skip none auth_info get_result message") continue } topic := fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", eventconfig.Config.NodeName) payload, _ := json.Marshal(accessInfo.GetContent()) eb.publish(topic, payload) default: klog.Warningf("Action not found") } } }