ALEX: An Updatable Adaptive Learned Index

概述

ALEX 是一个可更新的内存型学习索引。对比 B+Tree 和 Learned Index,ALEX 的目标是:(1)插入时间与 B+Tree 接近,(2)查找时间应该比 B+Tree 和 Learned Index 快,(3)索引存储空间应该比 B+Tree 和 Learned Index 小,(4)数据存储空间(叶子节点)应该与 B+Tree 相当。

ALEX 的设计如下:

  • ALEX 动态调整 RMI 的形状和高度,节点可以进行扩展和分割
  • ALEX 使用 Exponential Search 来寻找叶子层的 key,以纠正 RMI 的错误预测,这比 Binary Search 效果更好
  • ALEX 在数据节点上使用 Gapped Array(GA),将数据插入在自己预测的地方。这和 RMI 有很大的不同。RMI 是先排好序,让后训练模型去拟合数据。而 ALEX 是在模型拟合完数据后,将数据在按照模型的预测值插入到对应的地方,这大大降低了预测的错误率
  • ALEX 的 RMI 结构的目标不是产生同等大小的数据节点,而是产生 key 分布大致为线性的数据节点,以便线性模型能够准确地拟合。因此,ALEX 中的内部节点更加灵活。例如下图中节点 A 中 keys 的范围在 [0,1) 内,并有四个指针。ALEX 将 keys 范围 [0,1/4) 和 [1/2,1) 分配给数据节点(因为这些空间的 CDF 是线性的),并将 [1/4,1/2) 分配给另一个内部节点(因为CDF是非线性的,RMI需要对这个空间进行更多的划分)。另外,多个指针可以指向同一个子节点,便于插入;限制每个内部节点的指针数量总是 2 的幂,便于节点可以在不重新训练的情况下分裂

下面介绍 ALEX 的查询、插入、删除等步骤。

Lookups and Range Queries

查找时,从 RMI 的根节点开始,使用模型计算在哪一个位置,然后迭代地查询下一级的叶子节点,直到到达一个数据节点。在数据节点中使用模型来查询 key 在数组中的位置,如果预测失败,则进行 Exponential Search 以找到 key 的实际位置。如果找到了一个 key,读取对应值并返回记录,否则返回一个空记录。对于范围查询,首先找到第一个 key 的位置,该 key 不小于范围的起始值,然后扫描,直到到达范围的结束值,使用节点的 bitmap 跳过间隙,必要时跳到下一个数据节点。

Insert in non-full Data Node

插入逻辑与上述查找算法相同。在一个 non-full 的数据节点中,使用数据节点中的模型来预测插入位置。如果预测的位置不正确(不能保持有序),则做 Exponential Search 来找到正确的插入位置。如果插入位置为空,则直接插入,否则插入到最近的间隙中。Gapped Array实现了 O(logn) 插入时间。

Insert in full Data Node

Criteria for Node Fullness

ALEX 并不会让数据节点 100% 充满,因为在 Gapped Array 上的插入性能会随着间隙数量的减少而下降。需要在 Gapped Array 上引入了密度的下限和上限:dl, du ∈ (0, 1),约束dl < du。密度被定义为被元素填充的位置的百分比。如果下一次插入使得密度超过了 du,那么这个节点就是满的。默认情况下,我们设置 dl=0.6,du=0.8。相比之下,B+Tree 的节点通常有 dl=0.5 和 du=1。

Node Expansion Mechanism

扩展一个包含 N 个 key 的数据节点,需要创建一个具有 N/dl 槽的新的较大的 Gapped Array。然后对线性回归模型进行缩放或重新训练,然后使用缩放或重新训练的模型对这个节点中的所有元素进行基于模型的插入。新的数据节点的密度处于下限 dl。下图为一个数据节点扩展的例子,数据节点内的 Gapped Array 从左边的两个槽扩展到右边的四个槽。

Node Split Mechanism

  1. 水平分裂。有两种情况:(1) 如果待分裂的数据节点的父内部节点还没有达到最大的节点大小,父内部节点的指针数组可能有多余的指向待分裂数据节点的指针。如果有,则各让一半的指针指向两个新数据节点。否则,将父节点的指针数组的大小增加一倍,并为每个指针制作一个冗余的副本,来创建第二个指向分裂数据节点的指针,然后再分裂。下图(a)展示了一个不需要扩展父内部节点的侧向分裂的例子。(2) 如果父内部节点已经达到最大节点大小,那么我们可以选择拆分父内部节点,如下图(b)中所示。因为内部节点的大小为2的幂,所以总是可以拆分一个数据节点,不需要对拆分后的任何模型进行重新训练。分裂可以一直传播到根节点,就像在 B+Tree 中一样。
  2. 向下分裂。如下图(c)所示,向下分裂将一个数据节点转换为具有两个子数据节点的内部节点。两个子数据节点中的模型是根据它们各自的 key 来训练的。

Delete and update

要删除一个 key,需要先找到该 key 的位置,然后删除它和它对应的值。删除不会移动任何现有的 key,所以删除是一个严格意义上的比插入更简单的操作,不会导致模型的准确性下降。如果一个数据节点由于删除而达到了密度下限 dl,那么就缩小数据节点(与扩大数据节点相反),以避免空间利用率过低。此外还可以使用节点内的成本模型来确定两个数据节点是否应该合并在一起,然而为了简单起见,ALEX 开源代码中并没有实现这些合并操作。
key 更新是通过结合插入和删除来实现的;值更新是通过查找 key 并将新值写入来实现的。

参考

  1. ALEX: An Updatable Adaptive Learned Index

gRPC Introduction

gRPC Overview

gRPC is a language-neutral RPC framework developed and open-sourced by Google, currently supporting C, Java, and Go languages. The C version supports languages such as C, C++, Node.js, C#. In gRPC, client applications can directly call methods on a server on a different machine as if they were calling local methods.

The simple steps to use gRPC are as follows, taking Go language as an example:

  1. Write the proto file and use protoc to generate the .pb.go file.
  2. On the server side, define a Server, create a Function to implement the interface -> net.Listen -> grpc.NewServer() -> pb.RegisterXXXServer(server, &Server{}) -> server.Serve(listener).
  3. On the client side, grpc.Dial to create a gRPC connection -> pb.NewXXXClient(conn) to create a client -> context.WithTimeout to set a timeout -> call the interface with client.Function -> If it is stream transmission, loop to read data.

gRPC Concepts

Stream

Unary RPC

The client sends a request to the server and gets a response from the server, just like a normal function call.

Server streaming RPC

The client sends a request to the server and can get a data stream to read a series of messages. The client reads from the returned data stream until there are no more messages.
The server needs to send messages into the stream, for example:

1
2
3
4
5
6
7
8
for n := 0; n < 5; n++ {
err := server.Send(&pb.StreamResponse{
StreamValue: req.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
}

The client gets a stream transmission object ‘stream’ through the grpc call and needs to loop to receive data, for example:

1
2
3
4
5
6
7
8
9
10
11
12
for {
res, err := stream.Recv()
// Determine whether the message stream has ended
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// Print the return value
log.Println(res.StreamValue)
}

Client streaming RPC

The client writes and sends a series of messages to the server using a provided data stream. Once the client finishes writing messages, it waits for the server to read these messages and return a response.

The server uses stream.Recv() to loop to receive the data stream, and SendAndClose indicates that the server has finished receiving messages and sends a correct response to the client, for example:

1
2
3
4
5
6
7
8
9
10
11
for  {
res,err := stream.Recv()
// Message reception ends, send the result, and close
if err == io.EOF {
return stream.SendAndClose(&proto.UploadResponse{})
}
if err !=nil {
return err
}
fmt.Println(res)
}

The client needs to call CloseAndRecv when it has finished sending data, for example:

1
2
3
4
5
6
7
8
9
10
11
12
13
for i := 1; i <= 10; i++ {
img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
images := &proto.StreamImageList{Image:img}
err := stream.Send(images)
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
}
// Finish sending, close and get the message returned by the server
resp, err := stream.CloseAndRecv()

Bidirectional streaming RPC

Both sides can separately send a series of messages via a read-write data stream. These two streams operate independently, so the client and server can read and write in any order they wish, for example: the server can wait for all client messages before writing a response, or it can read a message then write a message, or use other combinations of reading and writing.

The server sends messages while receiving messages, for example:

1
2
3
4
5
6
7
8
9
10
11
for {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
err = stream.Send(&proto.StreamSumData{Number: int32(i)})
if err != nil {
return err
}
i++
}

The client needs a flag to disconnect, CloseSend(), but the server doesn’t need it because the server disconnects implicitly. We just need to exit the loop to disconnect, for example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for i := 1; i <= 10; i++ {
err = stream.Send(&proto.StreamRequest{})
if err == io.EOF {
break
}
if err != nil {
return
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return
}
log.Printf("res number: %d", res.Number)
}
stream.CloseSend()

Synchronous

A Channel provides a connection established with the host and port of a specific gRPC server. A Stub is created based on the Channel, and the RPC request can be actually called through the Stub.

Asynchronous based on CQ

  • CQ: Notification queue for completed asynchronous operations
  • StartCall() + Finish(): Create asynchronous tasks
  • CQ.next(): Get completed asynchronous operations
  • Tag: Identifiers marking asynchronous actions

Multiple threads can operate on the same CQ. CQ.next() can receive not only the completion events of the current request being processed but also the events of other requests. Suppose the first request is waiting for its reply data transmission to complete, and a new request arrives. CQ.next() can get the events generated by the new request and start processing the new request in parallel without waiting for the first request’s transmission to complete.

Asynchronous based on Callback

On the client side, send a single request, when calling the Function, in addition to passing the pointers of Request and Reply, a callback function receiving Status is also needed.

On the server side, the Function doesn’t return a status, but a ServerUnaryReactor pointer. Get the reactor through CallbackServerContext and call the Finish function of the reactor to handle the return status.

Context

  • Transfers some custom Metadata between the client and server.
  • Similar to HTTP headers, it controls call configurations such as compression, authentication, and timeout.
  • Assists observability, such as Trace ID.

gRPC Communication Protocol

The gRPC communication protocol is based on standard HTTP/2 design, supports bidirectional streams, multiplexing of single TCP (an HTTP request can be initiated in advance without waiting for the result of the previous HTTP request, and multiple requests can share the same HTTP connection without interfering with each other) and features such as message header compression and server push. These features make gRPC more power-saving and network traffic-saving on mobile devices.

gRPC Serialization Mechanism

Introduction to Protocol Buffers

gRPC serialization supports Protocol Buffers. ProtoBuf is a lightweight and efficient data structure serialization method, ensuring the high performance of gRPC calls. Its advantages include:

  • The volume of ProtoBuf after serialization is much smaller than JSON, XML, and the speed of serialization/deserialization is faster.
  • Supports cross-platform and multi-language.
  • Easy to use because it provides a set of compilation tools that can automatically generate serialization and deserialization boilerplate code.

However, ProtoBuf is a binary protocol, the readability of the encoded binary data stream is poor, and debugging is troublesome.

The scalar value types supported by ProtoBuf are as follows:

Why is ProtoBuf fast?

  • Because each field is stored continuously in the form of tag+value, the tag is a number, usually only occupying one byte, and the value is the value of the field, so there are no redundant characters.
  • In addition, for relatively small integers, ProtoBuf defines a Varint variable integer, which does not need to be stored in 4 bytes.
  • If the value is of string type and the specific length of the value cannot be known from the tag, ProtoBuf will add a leg field between the tag and the value to record the length of the string, so that string matching operations do not need to be performed, and the parsing speed is very fast.

Definition of IDL file

Define the data structure of RPC requests and responses in the proto file according to the syntax of Protocol Buffers, for example:

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = "../helloworld";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}

Here, syntax proto3 indicates the use of version 3 of Protocol Buffers. There are many changes in syntax between v3 and v2, so pay special attention when using it. go_package indicates the storage path of the generated code (package path). The data structure is defined by the message keyword, and the syntax of the data structure is:

Data_type Field_name = Tag

The message supports nesting, that is, A message references B message as its own field, which represents the aggregation relationship of objects, that is, A object aggregates (references) B object.

For some common data structures, such as common Header, you can define the proto file of the common data structure separately, and then import it for use, for example:

import "other_protofile.proto";

Import also supports cascading references, that is, a.proto imports b.proto, b.proto imports c.proto, then a.proto can directly use the message defined in c.proto.

References

  1. https://www.zhihu.com/zvideo/1427014658797027328
  2. https://zhuanlan.zhihu.com/p/389328756
  3. http://doc.oschina.net/grpc

gRPC 基础

gRPC 概览

gRPC 是由 Google 开发并开源的一种语言中立的 RPC 框架,当前支持 C、Java 和 Go 语言,其中 C 版本支持 C、C++、Node.js、C# 等。在 gRPC 的客户端应用可以像调用本地方法一样直接调用另一台不同的机器上的服务端的方法。

简单使用 gRPC 的步骤如下,以 Go 语言为例:

  1. 写好 proto 文件,用 protoc 生成.pb.go文件
  2. 服务端定义 Server,创建 Function 实现接口 -> net.Listen -> grpc.NewServer() -> pb.RegisterXXXServer(server, &Server{}) -> server.Serve(listener)
  3. 客户端 grpc.Dial,创建一个 gRPC 连接 -> pb.NewXXXClient(conn),创建 client -> context.WithTimeout,设置超时时间 -> client.Function,调用接口 -> 如果是流式传输则循环读取数据

gRPC 概念详解

Unary RPC

客户端发送一个请求给服务端,从服务端获取一个应答,就像一次普通的函数调用。

Server streaming RPC

客户端发送一个请求给服务端,可获取一个数据流用来读取一系列消息。客户端从返回的数据流里一直读取直到没有更多消息为止。
server 需要向流中发送消息,例如:

1
2
3
4
5
6
7
8
for n := 0; n < 5; n++ {
err := server.Send(&pb.StreamResponse{
StreamValue: req.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
}

client 通过 grpc 调用获得的是一个流传输对象 stream,需要循环接收数据,例如:

1
2
3
4
5
6
7
8
9
10
11
12
for {
res, err := stream.Recv()
// 判断消息流是否已经结束
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// 打印返回值
log.Println(res.StreamValue)
}

Client streaming RPC

客户端用提供的一个数据流写入并发送一系列消息给服务端。一旦客户端完成消息写入,就等待服务端读取这些消息并返回应答。
server 使用 stream.Recv() 来循环接收数据流,SendAndClose 表示服务器已经接收消息结束,并发生一个正确的响应给客户端,例如:

1
2
3
4
5
6
7
8
9
10
11
for  {
res,err := stream.Recv()
// 接收消息结束,发送结果,并关闭
if err == io.EOF {
return stream.SendAndClose(&proto.UploadResponse{})
}
if err !=nil {
return err
}
fmt.Println(res)
}

client 发送数据完毕的时候需要调用 CloseAndRecv,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
for i := 1; i <= 10; i++ {
img := &proto.Image{FileName:"image"+strconv.Itoa(i),File:"file data"}
images := &proto.StreamImageList{Image:img}
err := stream.Send(images)
if err != nil {
ctx.JSON(map[string]string{
"err": err.Error(),
})
return
}
}
// 发送完毕,关闭并获取服务端返回的消息
resp, err := stream.CloseAndRecv()

Bidirectional streaming RPC

两边都可以分别通过一个读写数据流来发送一系列消息。这两个数据流操作是相互独立的,所以客户端和服务端能按其希望的任意顺序读写,例如:服务端可以在写应答前等待所有的客户端消息,或者它可以先读一个消息再写一个消息,或者是读写相结合的其他方式。
server 在接收消息的同时发送消息,例如:

1
2
3
4
5
6
7
8
9
10
11
for {
res, err := stream.Recv()
if err == io.EOF {
return nil
}
err = stream.Send(&proto.StreamSumData{Number: int32(i)})
if err != nil {
return err
}
i++
}

client 需要有一个执行断开连接的标识 CloseSend(),而 server 不需要,因为服务端断开连接是隐式的,我们只需要退出循环即可断开连接,例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
for i := 1; i <= 10; i++ {
err = stream.Send(&proto.StreamRequest{})
if err == io.EOF {
break
}
if err != nil {
return
}
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return
}
log.Printf("res number: %d", res.Number)
}
stream.CloseSend()

同步

Channel 提供一个与特定 gRPC server 的主机和端口建立的连接。Stub 就是在 Channel 的基础上创建而成的,通过 Stub 可以真正的调用 RPC 请求。

基于 CQ 异步

  • CQ:异步操作完成的通知队列
  • StartCall() + Finish():创建异步任务
  • CQ.next():获取完成的异步操作
  • Tag:标记异步动作的标识

多个线程可以操作同一个CQ。CQ.next() 不仅可以接收到当前处理的请求的完成事件,还可以接收到其他请求的事件。假设第一个请求正在等待它的回复数据传输完成时,一个新的请求到达了,CQ.next() 可以获得新请求产生的事件,并开始并行处理新请求,而不用等待第一个请求的传输完成。

基于回调异步

在 client 端发送单个请求,在调用 Function 时,除了传入 Request、 Reply 的指针之外,还需要传入一个接收 Status 的回调函数。
在 server 端 Function 返回的不是状态,而是 ServerUnaryReactor 指针,通过 CallbackServerContext 获得 reactor,调用 reactor 的 Finish 函数处理返回状态。

上下文

  • 在 client 端和 server 端之间传输一些自定义的 Metadata。
  • 类似于 HTTP 头,控制调用配置,如压缩、鉴权、超时。
  • 辅助可观测性,如 Trace ID。

gRPC 通信协议

gRPC 通信协议基于标准的 HTTP/2 设计,支持双向流、单 TCP 的多路复用(一个 HTTP 请求无需等待前一个 HTTP 请求返回结果就可以提前发起,多个请求可以共用同一个 HTTP 连接且互不影响)以及消息头压缩和服务端推送等特性,这些特性使得 gRPC 在移动端设备上更加省电和节省网络流量。

gRPC 序列化机制

Protocol Buffers 介绍

gRPC 序列化支持 Protocol Buffers。ProtoBuf 是一种轻便高效的数据结构序列化方法,保障了 gRPC 调用的高性能。它的优势在于:

  • ProtoBuf 序列化后的体积要比 json、XML 小很多,序列化/反序列化的速度更快。
  • 支持跨平台、多语言。
  • 使用简单,因为它提供了一套编译工具,可以自动生成序列化、反序列化的样板代码。

但是,ProtoBuf 是二进制协议,编码后二进制数据流可读性差,调试麻烦。

ProtoBuf 支持的标量值类型如下:

ProtoBuf 为什么快?

  • 因为每个字段都用 tag+value 这种方式连续存储的,tag 是编号,一般只占用一个字节,value 是字段的值,这样就没有冗余字符。
  • 另外,对于比较小的整数,ProtoBuf 中定义了 Varint 可变整型,可以不用 4 个字节去存。
  • 如果 value 是字符串类型的,从 tag 当中无法了解到 value 具体有多长,ProtoBuf 会在 tag 与 value 之间添加一个 leg 字段去记录字符串的长度,这样就可以不做字符串匹配操作,解析速度非常快。

IDL 文件定义

按照 Protocol Buffers 的语法在 proto 文件中定义 RPC 请求和响应的数据结构,示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = "../helloworld";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}

其中,syntax proto3 表示使用 v3 版本的 Protocol Buffers,v3 和 v2 版本语法上有较多的变更, 使用的时候需要特别注意。go_package 表示生成代码的存放路径(包路径)。通过 message 关键字来定义数据结构,数据结构的语法为:

数据类型 字段名称 = Tag

message 是支持嵌套的,即 A message 引用 B message 作为自己的 field,它表示的就是对象聚合关系,即 A 对象聚合(引用)了 B 对象。

对于一些公共的数据结构,例如公共 Header,可以通过单独定义公共数据结构 proto 文件,然后导入的方式使用,示例如下:

import "other_protofile.proto";

导入也支持级联引用,即 a.proto 导入了 b.proto,b.proto 导入了 c.proto,则 a.proto 可以直接使用 c.proto 中定义的 message。

参考

  1. 「直播回放」腾讯工程师分享:gRPC基础概念详解
  2. gRPC 基础概念详解
  3. gRPC 官方文档中文版