func (a *cloudHub) Start() { if !cache.WaitForCacheSync(beehiveContext.Done(), a.informersSyncedFuncs...) { klog.Errorf("unable to sync caches for objectSyncController") os.Exit(1) }
// start dispatch message from the cloud to edge node go a.messageq.DispatchMessage()
// check whether the certificates exist in the local directory, // and then check whether certificates exist in the secret, generate if they don't exist if err := httpserver.PrepareAllCerts(); err != nil { klog.Exit(err) } // TODO: Will improve in the future DoneTLSTunnelCerts <- true close(DoneTLSTunnelCerts)
// HttpServer mainly used to issue certificates for the edge go httpserver.StartHTTPServer()
servers.StartCloudHub(a.messageq)
if hubconfig.Config.UnixSocket.Enable { // The uds server is only used to communicate with csi driver from kubeedge on cloud. // It is not used to communicate between cloud and edge. go udsserver.StartServer(hubconfig.Config.UnixSocket.Address) } }
func (q *ChannelMessageQueue) DispatchMessage() { for { select { case <-beehiveContext.Done(): klog.Warning("Cloudhub channel eventqueue dispatch message loop stopped") return default: } msg, err := beehiveContext.Receive(model.SrcCloudHub) klog.V(4).Infof("[cloudhub] dispatchMessage to edge: %+v", msg) if err != nil { klog.Info("receive not Message format message") continue } nodeID, err := GetNodeID(&msg) if nodeID == "" || err != nil { klog.Warning("node id is not found in the message") continue } if isListResource(&msg) { q.addListMessageToQueue(nodeID, &msg) } else { q.addMessageToQueue(nodeID, &msg) } } }
StartCloudHub
StartCloudHub 的代码如下:
func StartCloudHub(messageq *channelq.ChannelMessageQueue) { handler.InitHandler(messageq) // start websocket server if hubconfig.Config.WebSocket.Enable { go startWebsocketServer() } // start quic server if hubconfig.Config.Quic.Enable { go startQuicServer() } }