TDengine 部署并与 Kuiper 交互

安装和启动

Ubuntu 系统使用 apt-get 工具从官方仓库安装

1
2
3
4
5
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | sudo tee /etc/apt/sources.list.d/tdengine-stable.list
sudo apt-get update
apt-cache policy tdengine
sudo apt-get install tdengine

启动

1
systemctl start taosd

执行 TDengine 客户端程序,只要在 Linux 终端执行 taos 即可

1
taos

与 Kuiper 交互

安装 TDengine 插件,注意当前 TDengine 客户端版本为 2.4.0.12

1
curl -d "{\"name\":\"tdengine\",\"file\":\"https://packages.emqx.io/kuiper-plugins/1.4.3/debian/sinks/tdengine_amd64.zip\",\"shellParas\": [\"2.4.0.12\"]}" http://127.0.0.1:9081/plugins/sinks

进入 TDengine 客户端,创建 test 用户

1
create user test pass 'test';

切换 test 用户,创建数据库和数据表

1
2
3
4
5
6
create database test;

create stable sensordata (time timestamp,temperature float,humidity float) tags (location binary(64));
create table bedroom_sensordata using sensordata tags("bedroom");
create table balcony_sensordata using sensordata tags("balcony");
create table toilet_sensordata using sensordata tags("toilet");

创建 device1、device2、device3 三个 stream,分别接收MQTT test/bedroom、test/balcony、test/toilet 主题消息

1
2
3
kuiper create stream device1 '(temperature float, humidity float) WITH (FORMAT="JSON", DATASOURCE="test/bedroom")'
kuiper create stream device2 '(temperature float, humidity float) WITH (FORMAT="JSON", DATASOURCE="test/balcony")'
kuiper create stream device3 '(temperature float, humidity float) WITH (FORMAT="JSON", DATASOURCE="test/toilet")'

编写 demoRule1,demoRule2,demoRule3 规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# demoRule1
{
"sql": "SELECT temperature,humidity FROM device1 WHERE isNull(temperature,humidity) = false",
"actions": [
{
"tdengine": {
"provideTs": false,
"tsFieldName": "time",
"port": 0,
"ip": "127.0.0.1",
"user": "test",
"password": "test",
"database": "test",
"table": "bedroom_sensordata",
"fields": [
"temperature",
"humidity"
]
}
},
{
"log": {}
}
]
}
# demoRule2
{
"sql": "SELECT temperature,humidity FROM device2 WHERE isNull(temperature,humidity) = false",
"actions": [
{
"tdengine": {
"provideTs": false,
"tsFieldName": "time",
"port": 0,
"ip": "127.0.0.1",
"user": "test",
"password": "test",
"database": "test",
"table": "balcony_sensordata",
"fields": [
"temperature",
"humidity"
]
}
},
{
"log": {}
}
]
}
# demoRule3
{
"sql": "SELECT temperature,humidity FROM device3 WHERE isNull(temperature,humidity) = false",
"actions": [
{
"tdengine": {
"provideTs": false,
"tsFieldName": "time",
"port": 0,
"ip": "127.0.0.1",
"user": "test",
"password": "test",
"database": "test",
"table": "toilet_sensordata",
"fields": [
"temperature",
"humidity"
]
}
},
{
"log": {}
}
]
}

创建 demoRule 规则

1
2
3
kuiper create rule demoRule1 -f demoRule1
kuiper create rule demoRule2 -f demoRule2
kuiper create rule demoRule3 -f demoRule3

kuiper show rules,查看规则是否处于运行状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Connecting to 127.0.0.1:20498... 
[
{
"id": "demoRule1",
"status": "Running"
},
{
"id": "demoRule2",
"status": "Running"
},
{
"id": "demoRule3",
"status": "Running"
}
]

用 Go 编写测试代码(见附录),向 MQTT Broker 发送温度和湿度数据。一段时间过后,在 TDengine 客户端查看接收到的数据

附录

Go 发送消息测试完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package main

import (
"fmt"
"log"
"math/rand"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}

func genPair() (float64, float64) {
t := (200.0 + float64(rand.Intn(120))) / 10.0
h := (500.0 + float64(rand.Intn(350))) / 10.0
return t, h
}

func genLocation() string {
locations := []string{"bedroom", "balcony", "toilet"}
i := rand.Intn(3)
return locations[i]
}

func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("mqtt://175.178.160.127:1883")

opts.SetKeepAlive(60 * time.Second)
opts.SetDefaultPublishHandler(f)
opts.SetPingTimeout(1 * time.Second)

c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}

// pub msg loop
for {
t, h := genPair()
payload := fmt.Sprintf("{\"temperature\":%f, \"humidity\":%f}", t, h)
token := c.Publish("test/"+genLocation(), 0, false, payload)
token.Wait()
// wait 10s
time.Sleep(10 * time.Second)
}
}

Ubuntu 安装使用 Kuiper

下载和安装

通过 https://github.com/lf-edge/ekuiper/releases 获取安装包

1
2
wget https://github.com/lf-edge/ekuiper/releases/download/1.4.3/kuiper-1.4.3-linux-amd64.deb
sudo dpkg -i kuiper-1.4.3-linux-amd64.deb

启动 eKuiper 服务器

1
sudo systemctl start kuiper

运行第一个规则流

定义输入流

创建一个名为 demo 的流,该流使用 DATASOURCE 属性中指定的 MQTT test 主题。

1
kuiper create stream demo '(temperature float, humidity bigint) WITH (FORMAT="JSON", DATASOURCE="test")'

MQTT 源将通过 tcp://localhost:1883 连接到 MQTT 消息服务器,如果 MQTT 消息服务器位于别的位置,请在etc/mqtt_source.yaml中进行修改。

1
2
3
4
default:
qos: 1
sharedsubscription: true
servers: [tcp://127.0.0.1:1883]

使用 kuiper show streams 命令来查看是否创建了 demo 流。

1
kuiper show streams

通过查询工具测试流

通过 kuiper query 命令对其进行测试

1
2
3
kuiper query

kuiper > select count(*), avg(humidity) as avg_hum, max(humidity) as max_hum from demo where temperature > 30 group by TUMBLINGWINDOW(ss, 5);

编写规则

rule 由三部分组成:

  • 规则名称:它必须是唯一的
  • sql:针对规则运行的查询
  • 动作:规则的输出动作

myRule 文件的内容。对于在1分钟内滚动时间窗口中的平均温度大于30的事件,它将打印到日志中。

1
2
3
4
5
6
{
"sql": "SELECT temperature from demo where temperature > 30",
"actions": [{
"log": {}
}]
}

运行 kuiper rule 命令来创建 ruleDemo 规则

1
kuiper create rule ruleDemo -f myRule

测试规则

使用 MQTT 客户端将消息发布到 test 主题即可。消息应为 json 格式

1
2
mosquitto_pub -h 192.168.181.97 -t "test" -m "{\"temperature\":31.2, \"humidity\": 77}"
mosquitto_pub -h 192.168.181.97 -t "test" -m "{\"temperature\":29, \"humidity\": 80}"

查看日志

1
tail -f /var/log/kuiper/stream.log

管理规则

开启规则

1
kuiper start rule ruleDemo

暂停规则

1
kuiper stop rule ruleDemo

删除规则

1
kuiper drop rule ruleDemo

KubeEdge 边缘部分 Edged 简析

本文基于 commit 9a7e140b42abb4bf6bcabada67e3568f73964278。

概述

Edged 是运行在边缘节点的代理(轻量化的 kubelet),用于管理容器化的应用程序。Edged 内部模块如图所示:

代码入口

Edged 的注册和启动过程代码在 edge/pkg/edged/edged.go 中。

Register 调用了 newEdged,newEdged 做了以下事情:

  1. 初始化 pod status 管理器
  2. 初始化 edged 的 livenessManager、readinessManager、startupManager
  3. 创建并启动作为 grpc 服务器运行的 docker shim
  4. 初始化运行时服务 runtimeService 和镜像服务 imageService
  5. 初始化容器生命周期管理器 clcm
  6. 初始化容器日志管理器 logManager
  7. 初始化通用容器运行时服务 containerRuntime
  8. 创建运行时缓存 runtimeCache
  9. 初始化 edged 的镜像存放地 Provider
  10. 初始化镜像垃圾回收管理器 imageGCManager
  11. 初始化容器垃圾回收器 containerGCManager
  12. 初始化 edged 的 server

Start 做了以下事情:

  1. 初始化 edged 的 volume plugin 管理器 volumePluginMgr
  2. 初始化 edged 节点的模块
  3. 新建配置管理器configMapManager
  4. 初始化并启动 volume 管理器 volumeManager
  5. 启动 edged 的探针管理器 probeManager
  6. 启动 pod 状态管理器 statusManager 和 pod 生命周期事件生成器 pleg
  7. 启动 pod 增加和删除消息队列
  8. 启动 pod 监听事件循环
  9. 启动 edged 的 http server
  10. 启动镜像和容器的垃圾回收服务
  11. 初始化和启动 edged 的插件服务
  12. 在 clcm 中启动 CPU 管理器
  13. 最后调用 syncPod,启动与 pod 进行事件同步的服务

edged 与容器运行时

edged 与容器运行时(container runtime)的调用关系可以总结为下图:

可以看出 edged 首先启动作为 grpc 服务器运行的 docker shim,然后 edged 通过调用 docker shim 的 grpc server,來实现与容器运行时(container runtime)的交互,最后 docker shim 的 grpc server 将 edged 具体操作传递给容器运行时。

edged 如何实现边缘自治

首先看 edged 启动时调用的 syncPod,它向 metamanager 发送一条请求(QueryOperation 类型消息),来请求数据库中现有的 pod 信息。然后开始循环接收消息,后面对消息的类型进行判断,类型有 pod、configmap、secret、以及 volume:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
func (e *edged) syncPod() {
time.Sleep(10 * time.Second)

//when starting, send msg to metamanager once to get existing pods
info := model.NewMessage("").BuildRouter(e.Name(), e.Group(), e.namespace+"/"+model.ResourceTypePod,
model.QueryOperation)
beehiveContext.Send(metamanager.MetaManagerModuleName, *info)
for {
......
result, err := beehiveContext.Receive(e.Name())
......
switch resType {
case model.ResourceTypePod:
if op == model.ResponseOperation && resID == "" && result.GetSource() == metamanager.MetaManagerModuleName {
err := e.handlePodListFromMetaManager(content)
if err != nil {
klog.Errorf("handle podList failed: %v", err)
continue
}
e.setInitPodReady(true)
} else if op == model.ResponseOperation && resID == "" && result.GetSource() == EdgeController {
err := e.handlePodListFromEdgeController(content)
if err != nil {
klog.Errorf("handle controllerPodList failed: %v", err)
continue
}
e.setInitPodReady(true)
} else {
err := e.handlePod(op, content)
if err != nil {
klog.Errorf("handle pod failed: %v", err)
continue
}
}
case model.ResourceTypeConfigmap:
......
case model.ResourceTypeSecret:
......
case constants.CSIResourceTypeVolume:
......
default:
......
}
}
}

这里重点关心pod,消息需要通过 result.GetSource() 字段判断来源,可能是 MetaManager 来的,也有可能是 EdgeController 来的。在断网环境下只有可能是 MetaManager 发送的。

handlePodListFromMetaManager 遍历收到的消息中的 pod 内容,调用 addPod 将 pod 全部加入 podAdditionQueue 队列,再调用 updatePodStatus 删除或更新 pod,将 pod status 更新到数据库中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (e *edged) handlePodListFromMetaManager(content []byte) (err error) {
var lists []string
err = json.Unmarshal([]byte(content), &lists)
if err != nil {
return err
}

for _, list := range lists {
var pod v1.Pod
err = json.Unmarshal([]byte(list), &pod)
if err != nil {
return err
}
if filterPodByNodeName(&pod, e.nodeName) {
e.addPod(&pod)
if err = e.updatePodStatus(&pod); err != nil {
klog.Errorf("handlePodListFromMetaManager: update pod %s status error", pod.Name)
return err
}
}
}

return nil
}

另外 edged 启动时会调用 podAddWorkerRun,它会在后台不断从 podAdditionQueue 中 get,后面就和 kubelet 一样开始创建容器。

More

关于 Edged 部分内部模块执行的流程图请见 Edged