Optimization of Common Table Expressions in MPP Database Systems 概述

背景

论文基于 Orca 对非递归的 CTE 进行了形式化表达和优化,贡献总结如下:

  • 在查询中使用 CTE 的上下文中优化 CTE
  • 对于查询中的每个 CTE 引用,CTE 不会每次重新优化,仅在需要的时候才进行,例如下推 fitlers 或者 sort 操作
  • 基于 cost 来决定是否对 CTE 进行内联
  • 减少 plan 的搜索空间,加速查询执行,包括下推 predicates 到 CTE,如果 CTE 被引用一次则始终内联,消除掉没有被引用的 CTE
  • 避免死锁,保证 CTE producer 在 CTE consumer 之前执行

REPRESENTATION OF CTEs

  1. CTEProducer:一个 CTE 定义对应一个 CTEProducer
  2. CTEConsumer:query 中引用 CTE 的地方
  3. CTEAnchor:query 中定义 CTE 的 node,CTE 只能被该 CTEAnchor 的子树中引用
  4. Sequence:按序执行它的孩子节点,先执行左节点,再执行右节点,并把右节点做为返回值

对于此查询:

1
2
3
WITH v AS (SELECT i_brand FROM item WHERE i_color = ’red’)
SELECT * FROM v as v1, v as v2
WHERE v1.i_brand = v2.i_brand;

它的 Logical representation 如下所示:

它的 Execution plans 如下所示:

PLAN ENUMERATION

CTE 是否内联需要取决于 cost,因此需要枚举出 CTE 在不同引用地方内联前后的计划代价。Orca 中定义了 Memo,下图是初始的逻辑查询在 Memo 中的结构,每个编号就是一个 Memo Group:

Transformation Rules

Transformation Rules 可以看做 Memo Group 一个输入(或者一个函数),Memo Group 根据这个规则展开产生另一些 expression 放在同一个 Memo Group 中。对于每个 CTE,我们生成内联或不内联 CTE 的备选方案。

  • 第一条规则应用于 CTEAnchor 运算符。它在 Group 0 中生成一个 Sequence 操作符,这样序列的左节点就是表示 CTE 定义的整个 Plan Tree —— 根据需要创建尽可能多的新 Group (Group 4、5和6) —— 序列的右节点就是 CTEAnchor (Group 1) 的原始节点。

  • 第二条规则也应用于 CTEAnchor,生成 Group 0 中的 NoOp 运算符,其孩子节点是 CTEAnchor 的孩子节点(Group 1)。

  • 第三条规则应用于 CTEConsumer 运算符,生成 CTE 定义的副本,该副本与 CTEConsumer 属于同一 Group。例如,Group 2 中的 CTEConsumer,添加了 CTE 定义 Select 操作符,并将其子操作符 TableScan 添加到新Group(Group 7)。

该方法产生的 Plan Tree 的组合中并不都是有效的,比如:

a 和 b 都没有 CTEProducer;c 有一个 CTEProducer,没有 CTEConsumer;d 中的 Plan 是有效的,但是只有一个 CTEConsumer 对应于包含的 CTEProducer,是一个失败的 Plan。

通过 Memo 的机制来表达不同的 Plan,基于 cost 选择是否内联。在一个 query 中,CTE 可能有的内联,而有的不内联。内联的好处是能进行普通 query 的优化,比如:下推 predicates,distribution,sorting 等。例如:

CTEConsumer 上游有 predicate: i_color=’red’,Orca在默认情况下会将谓词下推,使其从表达式 c 变为表达式 d。

Avoiding Invalid Plans

上述产生的 Plan Tree 会很多,所以需要裁剪掉一些无效的 Plan,例如,使用了 CTEConsumer 却没有 CTEProducer。裁剪算法如下:

CTESpec 表示一个 CTE 的属性对(id, type),比如:(1, ‘c’),cteid = 1,type 是 CTEConsumer。该算法简单来说就是遍历 Tree,检查 CTEConsumer 和 CTEProduct 是否配对。具体描述如下:

  1. 先计算自身的 CTESpec;
  2. 遍历所有子节点:
    1. 计算对于该子节点的 CTESpec 的 Request,输入是:前面兄弟节点以及父节点的 specList,来自父节点的 reqParent,得到该子节点应该满足的 reqChild;
    2. 子节点调用该函数 DeriveCTEs(child, reqChild),递归返回子节点的有效的 CTESpecs,即 specChild;
    3. 把子节点 DeriveCTE 返回的 specChild 追加到 specList。如果发现有一对 CTEProducer 和 CTEConsumer就从 specList 中去除掉。
  3. 对比遍历所有子节点后得到的 specList 与传入的 reqParent 是否 match。如果匹配,则返回当前的 specList。

Optimizations Across Consumers

上述算法可以枚举出所有 CTE 是否内联的 Plan,另外还有一些其他优化 CTE 的方法。

Predicate Push-down

1
2
3
4
5
6
WITH v as (SELECT i_brand, i_color FROM item
WHERE i_current_price < 50)
SELECT * FROM v v1, v v2
WHERE v1.i_brand = v2.i_brand
AND v1.i_color = ’red’
AND v2.i_color = ’blue’;

把一个 CTEProducer 对应所有的 CTEConsumer 的 predicates,下推到该 CTEProducer 上,条件通过 OR 组合起来,减少物化的数据量。(注意:因为下推到 CTEProducer 的 predicate 是通过 OR 连接的,因此 CTEConsumer 仍然需要执行原来的 predicate。)

Always Inlining Single-use CTEs

1
2
3
4
WITH v as (SELECT i_color FROM item 
WHERE i_current_price < 50)
SELECT * FROM v
WHERE v.i_color = ’red’;

如果只有一个 CTEConsumer,则始终内联 CTE。

Elimination of unused CTEs

1
2
3
4
WITH v as (SELECT i_color FROM item 
WHERE i_current_price < 50)
SELECT * FROM item
WHERE item.i_color = ’red’;

CTE v 在上述 query 中没有被使用,这种情况可以消除 CTE。另外,对于如下 query:

1
2
3
4
5
6
WITH v as (SELECT i_current_price p FROM item
WHERE i_current_price < 50),
w as (SELECT v1.p FROM v as v1, v as v2
WHERE v1.p < v2.p)
SELECT * FROM item
WHERE item.i_color = ’red’;

CTE v 被引用了两次,而 CTE w 从未被引用。因此,我们可以消除 w 的定义。并且,这样做去掉了对 v 的唯一引用,这意味着我们还可以消除 v 的定义。

CONTEXTUALIZED OPTIMIZATION

对 CTE 是否内联进行枚举之后,Plan 中不同的 CTEConsumer 可能使用不同的优化方案(内联或不内联、下推等)。

Enforcing Physical Properties

Orca 通过 top-down 发送处理 Memo Group 中的优化请求来优化候选计划。优化请求是一组表达式要满足的 Physical Properties 上的要求,包括 sort order, distribution, rewindability, CTEs 和 data partitioning 等,也可以没有(即 ANY)。下图以 distribution 为例子,CTE 需要在不同的上下文中满足不同的 Physical Properties。

  1. Sequence 算子对 CTEProducer 发射 ANY 的 prop 请求,返回 Hashed(i_sk) 的 prop(表 item 按 i_sk 这一列进行哈希分布);
  2. 上述的 prop 发送到右子树中(结合自身 prop 和父节点的 prop),右子树中的 HashJoin 节点的连接条件需要子节点的数据基于 i_brand 哈希分布,发送请求到 group 2 和 group 3 的 CTEConsumer 中,而 CTEConsumer 并不满足 i_brand 哈希分布的要求,而父节点又需要此 prop,这时就需要在两个 CTEConsumer 分别添加 Redistribute 的算子,把数据按 i_brand 进行哈希,这样才能满足 HashJoin 的要求。

与 (a) 相比,(b) 中可以一开始就要求 CTE 按 i_brand 哈希分布,CTEProducer 会发现数据分布不满足要求,然后就可以在 group 5 中添加 Redistribute 的算子,CTEProducer 返回 Hashed(i_brand),这样 CTEConsumer 就不需要加上 Redistribute 的算子,最终得到一个最优的计划(CTEProducer 只需要计算一遍并保存数据,两个 CTEConsumer 意味着需要读取两遍数据)。

Cost Estimation

CTEProducer 和 CTEConsumer 的 cost 分开计算:

  • CTEProducer 的 cost 是 CTE 自身的 cost,加上物化写磁盘的 cost
  • CTEConsumer 的 cost 是读取物化结果的 cost,类似 scan 算子

参考

  1. El-Helw A, Raghavan V, Soliman M A, et al. Optimization of common table expressions in mpp database systems[J]. Proceedings of the VLDB Endowment, 2015, 8(12): 1704-1715.
  2. 《Optimization of Common Table Expressions in MPP Database Systems》论文导读

Vectorization vs. Compilation in Query Execution

当代 CPU 特性

超标量流水线与乱序执行

CPU指令的执行可以分为5个阶段:取指令、指令译码、执行指令、访存取数、结果写回。

流水线:一套控制单元可以同时执行多条指令,不需要等到上一条指令执行完就可以执行下一条指令。

超标量:一个 CPU 核有多套控制单元,因此可以有多条 pipeline 并发执行。CPU 还会维护一个乱序执行的指令窗口,窗口中的无数据依赖的指令就可以被取来并发执行。并发指令越多越好,因为这样指令之间没有依赖,并发流水线的执行会更加的流畅。

分支预测

遇到 if/switch 这种判断跳转的指令时会产生分支预测,分支预测系统会决定流水线接下来是载入紧挨着判断指令的下一条指令,还是载入跳转到另一个地址的指令。如果 CPU 的预测是正确的,那么判断指令结果出来的那一刻,真正需要执行的指令已经执行到尾声了,这时候只需要继续执行即可;如果CPU的预测是错误的,那么会把执行到尾声的错误指令全部清空,恢复到从未执行过的状态,然后再执行正确的指令。

程序分支越少或者是分支预测成功率越高,对流水线的执行就越有利,因为如果预测失败了,是要丢弃当前 pipeline 的所有指令重新 flush,这个过程往往会消耗掉十几个 CPU 周期。

多级存储与数据预取

当数据在寄存器,cache 或者内存中,CPU 取数据的速度并不是在一个个数量级上的。CPU 取指令/数据的时候并不是直接从内存中取的,通常 CPU 和内存中会有多级缓存,分别为 L1,L2,L3 cache,其中 L1 cache 又可以分为 L1-data cache,L1-instruction cache。先从 cache 中取数据,若不存在,才访问内存。访问内存的时候会同时把访问数据相邻的一些数据一起加载进 cache 中。

预取指的是若数据存在线性访问的模式,CPU会主动把后续的内存块预先加载进cache中。

SIMD

单指令多数据流,对于数据密集型的程序来说,可能会需要对大量不同的数据进行相同的运算。SIMD 引入了一组大容量的寄存器,比如 128 位,256 位。可以将这多个数据按次序同时放到一个寄存器。同时,CPU 新增了处理这种大容量寄存器的指令,可以在一个指令周期内完成多个数据的运算。

早期解释执行模型

大多数的 query 解释器模型都是使用基于迭代器的火山模型,如下图所示。每个算子看成一个 iterator,iterator 会提供一个 next 方法,每个 next 方法只会产生一个 tuple,可以理解为一行数据。查询执行的时候,查询树自顶向下调用 next 接口,数据则自底向上被拉取处理,层层计算返回结果。所以火山模型属于 pull 模型。

Volcano 模型简单灵活,且这种设计不用占用过多的内存。火山模型将更多的内存资源用于磁盘 IO 的缓存设计而没有优化 CPU 的执行效率,这在当时的硬件基础上是很自然的权衡。但是现在 CPU 的硬件环境与大数据场景下,性能表现却差强人意。主要有如下几点原因:

  1. 时间都花在了query plan上,而不是计算上
    next 函数实现为虚函数,调用虚函数的时候要去查虚函数表,编译器无法对虚函数进行 inline 优化。同时会带来分支预测的开销,导致一次错误的 CPU 分支预测,需要多花费十几个 CPU 周期的开销。

  2. CPU cache利用率低
    next 方法一次只返回一个元组,元组通常采用行存储,如果仅需访问其中某个字段但是每次都将整行数据填入 CPU cache,将导致那些不会被访问的字段也放在了 Cache 中,使得 cache 利用率非常低。

编译执行

编译执行指的是运行时期的代码生成生成技术。在执行过程中生成编译执行代码,避免过多的虚函数调用和解析执行,因为在执行之初我们是知道关系代数的 Schema 信息。在具备 Schema 信息的情况下,事先生成好的代码,可以有效减少很多执行分支预测开销。

上图右边的代码非常紧凑,有效消除了字段个数,字段大小,字段类型,对于数据量特别多的处理场景,可以大大减少CPU开销,提高性能。

编译执行以数据为中心,消灭了火山模型中的大量虚函数调用开销。甚至使大部分指令执行,可以直接从寄存器取数,极大提高了执行效率。

在 Java 中通过 JIT 来实现,在 C++ 中通过 LLVM 来实现 codegen,对于 OLAP 这种运行时间较长的 query 来说,通常编译的时间是可以忽略的。

向量化执行

向量可以理解为按列组织的一组数据,连续存储的一列数据,在内存中可以表示为一个向量。

向量模型和火山模型的本质区别就在于,数据处理的基本单元不再是按行组织的 tuple,而是按列组织的多个向量,我们常说的一个 chunk 其实就是多个 vector 的集合,就是多个列的意思。

向量化执行好处是:由于每次 next 都是处理一批数据,那么大大减少了虚函数调用的次数,分支预测的成功概率会提升,减少了分支预测的开销,并且充分发挥 SIMD 指令并行计算的优势;还可以和列式存储有效结合在一起,减少数据额外转换的 overhead。

向量化和编译执行比较

向量化执行的主要访存开销在于像 join 这种算子的物化开销,物化就是从寄存器把数据读到内存中。而编译执行,tuple 可以一直留在寄存器中,一个 operator 处理完后,给另外一个 operator 继续处理。除非遇到不得不物化的情况。

向量化执行模型的循环较短,并发度高,可以同时有更多的指令等待取数。编译执行循环内部会包含多个 operator 的运算,这些有依赖关系的指令占据了大部分的乱序执行窗口,并发度低。

参考

  1. Sompolski, J. , M. Zukowski , and P. A. Boncz . “Vectorization vs. Compilation in Query Execution.” International Workshop on Data Management on New Hardware ACM, 2011.
  2. S. Wanderman-Milne and N. Li, “Runtime Code Generation in Cloudera Impala,” IEEE Data Eng. Bull., vol. 37, no. 1, pp. 31–37, 2014.
  3. 向量化与编译执行浅析

TiDB 架构

TiDB是支持MySQL语法的开源分布式混合事务/分析处理(HTAP)数据库。TiDB 可以提供水平可扩展性、强一致性和高可用性。它主要由 PingCAP 公司开发和支持,并在 Apache 2.0 下授权。TiDB 从 Google 的 Spanner 和 F1 论文中汲取了最初的设计灵感。

HTAP 是 Hybrid Transactional / Analytical Processing 的缩写。这个词汇在 2014 年由 Gartner 提出。传统意义上,数据库往往专为交易或者分析场景设计,因而数据平台往往需要被切分为 TP 和 AP 两个部分,而数据需要从交易库复制到分析型数据库以便快速响应分析查询。而新型的 HTAP 数据库则可以同时承担交易和分析两种智能,这大大简化了数据平台的建设,也能让用户使用更新鲜的数据进行分析。作为一款优秀的 HTAP 数据数据库,TiDB 除了优异的交易处理能力,也具备了良好的分析能力。

TiDB在整体架构基本是参考 Google Spanner 和 F1 的设计,上分两层为 TiDB 和 TiKV。 TiDB 对应的是 Google F1,是一层无状态的 SQL Layer,兼容绝大多数 MySQL 语法,对外暴露 MySQL 网络协议,负责解析用户的 SQL 语句,生成分布式的 Query Plan,翻译成底层 Key Value 操作发送给 TiKV,TiKV 是真正的存储数据的地方,对应的是 Google Spanner,是一个分布式 Key Value 数据库,支持弹性水平扩展,自动的灾难恢复和故障转移(高可用),以及 ACID 跨行事务。值得一提的是 TiKV 并不像 HBase 或者 BigTable 那样依赖底层的分布式文件系统,在性能和灵活性上能更好,这个对于在线业务来说是非常重要。

  • TiDB Server:SQL 层,对外暴露 MySQL 协议的连接 endpoint,负责接受客户端的连接,执行 SQL 解析和优化,最终生成分布式执行计划。TiDB 层本身是无状态的,实践中可以启动多个 TiDB 实例,通过负载均衡组件(如 LVS、HAProxy 或 F5)对外提供统一的接入地址,客户端的连接可以均匀地分摊在多个 TiDB 实例上以达到负载均衡的效果。TiDB Server 本身并不存储数据,只是解析 SQL,将实际的数据读取请求转发给底层的存储节点 TiKV(或 TiFlash)。
  • PD (Placement Driver) Server:整个 TiDB 集群的元信息管理模块,负责存储每个 TiKV 节点实时的数据分布情况和集群的整体拓扑结构,提供 TiDB Dashboard 管控界面,并为分布式事务分配事务 ID。PD 不仅存储元信息,同时还会根据 TiKV 节点实时上报的数据分布状态,下发数据调度命令给具体的 TiKV 节点,可以说是整个集群的“大脑”。此外,PD 本身也是由至少 3 个节点构成,拥有高可用的能力。建议部署奇数个 PD 节点。
  • 存储节点
    • TiKV Server:负责存储数据,从外部看 TiKV 是一个分布式的提供事务的 Key-Value 存储引擎。存储数据的基本单位是 Region,每个 Region 负责存储一个 Key Range(从 StartKey 到 EndKey 的左闭右开区间)的数据,每个 TiKV 节点会负责多个 Region。TiKV 的 API 在 KV 键值对层面提供对分布式事务的原生支持,默认提供了 SI (Snapshot Isolation) 的隔离级别,这也是 TiDB 在 SQL 层面支持分布式事务的核心。TiDB 的 SQL 层做完 SQL 解析后,会将 SQL 的执行计划转换为对 TiKV API 的实际调用。所以,数据都存储在 TiKV 中。另外,TiKV 中的数据都会自动维护多副本(默认为三副本),天然支持高可用和自动故障转移。
    • TiFlash:TiFlash 是一类特殊的存储节点。和普通 TiKV 节点不一样的是,在 TiFlash 内部,数据是以列式的形式进行存储,主要的功能是为分析型的场景加速。

TiKV 基于 RocksDB,采用了 Raft 协议来实现分布式的一致性。TiKV 的系统架构如下图所示:

优点:

  • 纯分布式架构,拥有良好的扩展性,支持弹性的扩缩容
  • 支持 SQL,对外暴露 MySQL 的网络协议,并兼容大多数 MySQL 的语法,在大多数场景下可以直接替换 MySQL
  • 默认支持高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务透明
  • 支持 ACID 事务,对于一些有强一致需求的场景友好,例如:银行转账
  • 具有丰富的工具链生态,覆盖数据迁移、同步、备份等多种场景
  • 智能的行列混合模式,TiDB 可经由优化器自主选择行列。这套选择的逻辑与选择索引类似:优化器根据统计信息估算读取数据的规模,并对比选择列存与行存访问开销,做出最优选择

缺点:

  • 虽然兼容MySQL,但是不支持存储过程,触发器,自定义函数,窗口功能有限
  • 不适用数据量小的场景,专门为大数据量设计

Online, Asynchronous Schema Change in F1

背景

分布式数据库 Schema 变更时,由于 Server 获取 Schema 元数据的时机不是同步的,不可避免地会使同一时刻一些 Server 上的 Schema 是旧的,如下图所示。而若变更时禁止 DML 让所有的 Server 都暂停服务,对于大规模分布式数据库,基本没法做到,因为 Schema 变更操作需要花费大量时间,而数据库需要保证 24 小时在线。

例如,增加一个索引 E,Schema 从 S1 变为 S2,有两个节点 A 和 B 分别使用 S1 和 S2:

  1. B 添加一行数据,由于它按照 Index E 已经创建完成的 Schema,它会插入两个 KV,RowKV 和 IndexKV
  2. A 删除该行数据,由于它按照 Index E 未创建的 Schema,它只会删除 RowKV,IndexKV 就成了孤儿,破坏了数据的完整性

论文思路

论文提出了一种 Schema 演进的协议,协议有两个特点:

  1. Online——Schema 变更期间,所有 Server 仍然可以读写全部数据
  2. Asynchronous——允许不同 Server 在不同时间点开始使用新版本 Schema

论文把从 Schema 0 到 Schema 1 的突变,替换为一系列相互兼容的小状态变化:

  • 任意两个相邻的小状态(版本)都是兼容的
  • 只有一个 Server 负责 DDL 的执行,其他 Server 只是定期刷新状态(拉取 Schema)
  • 每次 Schema 版本变化间隔不小于一个 Lease 时间,任意时刻,集群中 Server 至多存在两个版本的 Schema。也就是说所有 Server 使用的 Schema 状态都相邻,都是兼容的,经过一系列小状态的转换,就可以实现 Schema 0 到 Schema 1 的变更

schema elements

  • 包括 tables,columns,indexes,constraints,和 optimistic locks
  • 每个 schema element 都有一个与之关联的 state

states

  1. Absent 状态
  • 完全不感知该 schema element,任何 DML 都不会涉及该 schema element
  1. Delete Only 状态
  • Select 语句不能使用该 schema element
  • Delete 语句在删除时,如果​该 schema element​ 对应的条目存在,要一并删除
  • Insert 语句在插入​时,不允许插入该 schema element​ 对应的条目
  • Update 语句在修改时,只允许删除既存的该 schema element​ 对应的条目,但不能插入新的该 schema element​ 对应的条目
  1. Write Only 状态
  • Select 语句不能使用该 schema element
  • 其他 DML 语句可以正常使用该 schema element、修改该 schema element​ 对应的条目
  1. Reorg
  • 不是一种 schema 状态,而是发生在 write-only 状态之后的一系列操作,保证在索引变为 public 之前所有旧数据的 schema element 都被正确地生成
  • reorg 要做的就是取到当前时刻的 snapshot,为每条数据补写对应的 schema element 条目即可。当然 reorg 开始之后数据可能发生变更,这种情况下底层 Spanner 提供的一致性能保证 reorg 的写入操作要么失败(说明新数据已提前写入),要么被新数据覆盖
  1. Public 状态
  • 该 schema element 正常工作,所有 DML 都正常使用该 schema element

状态兼容说明

破坏一致性(兼容性)的场景有两种:

  • orphan data anomaly:数据库中包含了按照当前 schema 下不应存在的 KV
  • integrity anomaly:数据库中缺少当前 schema 下应该存在的 KV
  1. 为什么 “Absent” 和 “Delete Only” 能够兼容
  • Absent 状态的 Server 不知道该 schema element 因此不需要该 schema element,不会产生该 schema element 的条目
  • Delete Only 状态的 Server 知道该 schema element(非 public)但也不需要该 schema element,不会产生该 schema element 的条目
  1. 为什么 “Delete Only” 和 “Write Only” 能够兼容
  • Delete Only 状态和 Write Only 状态的 Server 都知道该 schema element(非 public)但都不需要该 schema element
  1. 为什么 “Write Only” 和 “Public” 能够兼容
  • Write Only 状态的 Server 在该 schema element 的所有已经完整的情况下(通过 Reorg),可以与 Public 兼容
  1. 为什么 “Absent” 和 “Write Only” 不兼容
  • 因为 Write Only 会产生新的条目,破坏了 Absent 的条件
  1. 为什么 “Delete Only” 和 “Public” 不兼容
  • 因为 Public 有要求所有历史数据有完整的 schema element,Delete Only 状态下并不具备
  1. 通俗点举例
  • 假设在增加一个索引 E 的过程中,有如下执行顺序:1)Server A 插入一行 x;2) Server B 删除了行 x;3)Server A 查询 y;4) Server B 查询 y:
    (1) A 为 Delete Only 状态,B 为 Absent 状态:A 插入了一个 KV(RowKV),B 将 RowKV 删除,A 和 B 在查询时都不会用到 Index E,是兼容的
    (2) A 为 Write Only 状态,B 为 Delete Only 状态:A 插入了两个 KV(RowKV 和 IndexKV),B 将 RowKV 和 IndexKV 删除,A 和 B 在查询时都不会用到 Index E,是兼容的
    (3) A 为 Public 状态,B 为 Write Only 状态:A 插入了两个 KV(RowKV 和 IndexKV),B 将 RowKV 和 IndexKV 删除;查询时,A 会用到 Index E,B 虽然不会用到 Index E,但数据库中存在 A 的 schema 下应该存在的 IndexKV,所以是兼容的
    (4) A 为 Write Only 状态,B 为 Absent 状态:A 插入了两个 KV(RowKV 和 IndexKV),B 感知不到 IndexKV,因此只会删除 RowKV,这一行的 IndexKV 就成了孤儿数据,所以不兼容
    (5) A 为 Public 状态,B 为 Delete Only 状态:A 会用到 Index E,B 不会用到 Index E,并且数据库中也不存在 A 的 schema 下的 IndexKV,所以不兼容

参考

  1. Ian Rae, Eric Rollins, Jeff Shute, Sukhdeep Sodhi and Radek Vingralek, Online, Asynchronous Schema Change in F1, VLDB 2013.

Google Percolator

背景

Percolator 事务模型是 Google 内部用于 Web 索引更新的业务提出的分布式事务协议,构建在 BigTable 之上,总体来说就是一个经过优化的二阶段提交的实现。使用基于 Percolator 的增量处理系统代替原有的批处理索引系统后,Google 在处理同样数据量的文档时,将文档的平均搜索延时降低了50%。

2PC

传统的 2PC 简单描述一下就是两步:

  1. 发起事务:事务管理器会发出 Prepare 请求,要求参与者记录日志,进行资源的检查和锁定
  2. 确认/取消事务:当请求得到所有参与者的成功确认后,事务管理器会发出 Commit 请求,执行真正的操作;如果第一步中只要有一个执行者返回失败,则取消事务

这样会有两个问题,一个就是单点故障:如果事务管理器发生故障,数据库会一直阻塞下去。尤其是在第二阶段发生故障的话,所有参与者还都处于锁定事务资源的状态中,从而无法继续完成事务操作;另一个就是存在数据不一致的情况:在第二阶段,当事务管理器向参与者发送 Commit 请求之后,发生了局部网络异常,导致只有部分参与者接收到请求,但是其他参与者未接到请求所以无法提交事务,整个系统就会出现数据不一致性的现象。

Percolator 事务流程

Percolator 事务是一个经过优化的 2PC 的实现,进行了一个二级锁的优化,也分为两个阶段:预写(Pre-write)和提交(Commit)。另外,所有启用了 Percolator 事务的表中,每一个 Column Family 都会预先增加两个列,分别是:

  • lock:存储事务过程中的锁信息
  • write:存储当前行可见(最近一次提交)的版本号

另外,为了简化场景,假设存储用户数据的列只有一个,名为 data。

Pre-write

  1. 客户端从 TSO 获取时间戳,记为 start_ts,并向 Percolator Worker 发起 Pre-write 请求。

  2. 在该事务包含的所有写操作中选取一个作为主(primary)操作,其余的作为次(secondary)操作。主操作将作为整个事务的互斥点,标记事务的状态。

  3. 先预写主操作,成功后再预写次操作。在预写过程中,对每一个写操作都要执行检查:

    • 检查写入的行对应的 lock 列是否有锁,如果有,说明其他事务正在写,直接取消整个事务
    • 检查写入的行对应的 write 列版本号是否晚于 start_ts,如果是,说明有版本冲突,直接取消整个事务
  4. 检查通过后,以 start_ts 作为版本号将数据写入 data 列,对操作行加锁,即更新 lock 列的锁信息:主操作行的 lock 直接标为primary,次操作行的 lock 则标为主操作行的键和列名。不更新write列,亦即此时写入的数据仍然不可见。

Commit

  1. 客户端从 TSO 获取时间戳,记为 commit_ts,并向 Percolator Worker 发起 Commit 请求。

  2. 检查主操作行对应的 lock 列所在的 primary 标记是否存在,如果不存在则失败,取消事务;如果存在则继续。

  3. 以 commit_ts 作为版本号,将 start_ts 更新到 write 列中。也就是说在本阶段完成后,预写阶段写入的数据将会可见。

  4. 对该行解锁,即删除 lock 列的 primary 信息。

  5. 若步骤 1~4 均成功,说明主操作行成功,代表整个事务实际上已经提交。接下来更新每个 secondary 即可,即重复步骤3、4的更新 write 列和清除 lock 列操作。secondary 的 commit 是可以异步进行的,只是在异步提交进行的过程中,如果此时有读请求,可能会需要做一下锁的清理工作。

案例

银行转账,Bob 向 Joe 转账 7 元。该事务于 start_ts=7 开始,commit_ts=8 结束,Key 为 Bob 和 Joe 的行可能在不同的分片上。具体过程如下:

  1. 首先查询 write 列获取最新时间戳数据,获取到 data@5,然后从 data 列里面获取时间戳为 5 的数据,初始状态下,Bob 的帐户下有 10,Joe 的帐户下有 2。
  1. 事务开始,获取 start_ts=7 作为当前事务的开始时间戳,将 Bob 行选为本事务的 primary,通过写入 lock 列锁定 Bob 的帐户,同时将数据 7:$3 写入到 data 列。
  1. 同样,使用 start_ts=7,将 Joe 改变后的余额写入到 data 列,当前操作作为 secondary,因此在 lock 列写入 7:Primary@Bob.bal(当失败时,能够快速定位到 primary 操作,并根据其状态异步清理)。
  1. 事务带着当前时间戳 commit_ts=8 进入 Commit 阶段:删除 primary 所在的 lock,并在 write 列中写入以提交时间戳作为版本号指向数据存储的一个指针 data@7。至此,读请求过来时将看到 Bob 的余额为 3。
  1. 同样,使用 commit_ts=8,依次在 secondary 操作项中写入 write 列并清理锁。
  1. 至此,整个当前 Percolator 事务已完成。

对比

相比 2PC 存在的问题,来看看 Percolator 事务模型有哪些改进。

  1. 单点故障
    Percolator 通过日志和异步线程的方式弱化了这个问题。一是,Percolator 引入的异步线程可以在事务管理器宕机后,回滚各个分片上的事务,提供了善后手段,不会让分片上被占用的资源无法释放。二是,事务管理器可以用记录日志的方式使自身无状态化,日志通过共识算法同时保存在系统的多个节点上。这样,事务管理器宕机后,可以在其他节点启动新的事务管理器,基于日志恢复事务操作。

  2. 数据不一致
    2PC 的一致性问题主要缘自第二阶段,不能确保事务管理器与多个参与者的通讯始终正常。但在 Percolator 的第二阶段,事务管理器只需要与 primary 操作所在的一个节点通讯,这个 Commit 操作本身就是原子的。所以,事务的状态自然也是原子的,一致性问题被完美解决了。

Snapshot Isolation

传统关系型数据库中定义的隔离级别有4种(RU、RC、RR、S),而 Percolator 事务模型提供的隔离级别是快照隔离(Snapshot Isolation, SI),它也是与 MVCC 相辅相成的。SI的优点是:

  • 对于读操作,保证能够从时间戳/版本号指定的稳定快照获取,不会发生幻读
  • 对于写操作,保证在多个事务并发写同一条记录时,最多只有一个会提交成功

如图,基于快照隔离的事务,开始于 start timestamp(图内为小空格),结束于 commit timestamp(图内为小黑球)。本例包含以下信息:

  1. txn_2 不能看到 txn_1 的提交信息,因为 txn_2 的开始时间戳 start timestamp 小于 txn_1 的提交时间戳 commit timestamp
  2. txn_3 可以看到 txn_2 和 txn_1 的提交信息
  3. txn_1 和 txn_2 并发执行:如果它们对同一条记录进行写入,至少有一个会失败

参考

  1. Peng D, Dabek F, Inc G . “Large-scale Incremental Processing Using Distributed Transactions and Notifications” (2010).

Learning to Optimize Join Queries With Deep Reinforcement Learning

Background

传统的多表 join 算法采用的是动态规划:

  1. 从初始 query graph 开始
  2. 找到 cost 最少的 join
  3. 更新 query graph 直到只剩下一个节点。但这种贪心策略并不会保证一定会选到合适的 join order,因为它只直观表示了每个 join 的短期cost。例如:

动态规划的结果 cost 为 140,而最优解的 cost 为 110。

Learning to Optimize Join Queries With Deep Reinforcement Learning 论文中将 join 问题表示为马尔可夫决策过程(MDP),然后构建了一个使用深度 Q 网络(DQN)的优化器,用来有效地优化 join 顺序。

Method

将连接排序表示为 MDP:

  • 状态G:a query graph
  • 动作c:a join
  • 下一个状态G’:join后的query graph
  • 奖励J(c):join的估算成本

用 Q-Learning 算法来解决 join 顺序 MDP。在 Q-Learning 中最关键的是得到 Q 函数 Q(G,c),它可以知道当前 query graph 中进行每个 join 的长期cost。如果我们可以访问真正的 Q(G,c),就可以对传统的动态规划进行改进:

  1. 从初始 query graph 开始
  2. 找到 Q(G,c) 值最小的 join
  3. 更新 query graph 直到只剩下一个节点,从而得到最优的 join order。实际上我们无法访问真正的 Q 函数,因此,我们需要训练一个神经网络,它接收 (G,c) 作为输入,并输出估算的 Q(G,c)。在论文中算法如下,给定query:

状态 G 和动作 c 的特征化

  1. query graph 中的每个关系的所有属性放入集合 A-G 中;join 左侧的所有属性放入集合 A-L 中;join 右侧的所有属性放入集合 A-R 中。并使用 1-hot 向量来编码。对于该例子,论文中表示如下:
  1. 对于查询中的每个选择,我们可以获得 selectivity ∈ [0,1](用来估计选择后存在的元组占选择前总元组的比例),我们需要根据 selectivity 的值去更新向量。例如:
  1. 还可以根据在物理计划中选择的具体 join 算法去产生新的 1-hot 向量(例如,IndexJoin 为 [1 0], HashJoin 为 [0 1]),与原向量进行串联,如下:

根据在论文 2.5 节的假设:

在这里我们就可以知道 query graph 特征 fG 为 fG = AG,join 决策特征 fc 为 fc = A-L ⊕ A-R,对于一个特定的元组(G,c)特征化为 fG ⊕ fc。

模型训练

DQ 使用多层感知机(MLP)神经网络来表示 Q 函数。它以(G,c)的最终特征化 fG ⊕ fc 作为输入。在实验中发现,两层的 MLP 可以提供最佳表现。模型使用随机梯度下降算法进行训练。

执行

训练后,在原有基础上再对多表 join 算法进行改进:

  1. 从初始 query graph 开始
  2. 使每个 join 特征化
  3. 找到模型估计的 Q 值最小的 join(即神经网络的输出)
  4. 更新 query graph 直到只剩下一个节点,从而得到最优的 join order。在执行过程中,还可以对 DQ 进行进一步微调。

Experiment Result

论文中使用了 Join Order Benchmark(JOB) 来评估 DQ。这个数据库由来自 IMDB 的 21 个表组成,并提供了 33 个查询模板和 113 个查询。查询中的连接关系大小范围为 5 到 15 个。当连接关系的数量不超过 10 个时,DQ 从穷举中收集训练数据。

将 DQ 与几个启发式优化器(QuickPick 和 KBZ)以及经典动态规划(left-deep、right-deep、zig-zag)进行比较。对每个优化器生成的计划进行评分,并与最优计划(通过穷举获得)进行比较。
此外,论文中设计了 3 个成本模型:

  1. Cost Model 1(Index Mostly):模拟内存数据库并鼓励使用索引连接
  2. Cost Model 2(Hybrid Hash):仅考虑具有内存预算的散列连接和嵌套循环连接
  3. Cost Model 3(Hash Reuse):考虑重用已构建的散列表

进行了 4 轮交叉验证后,确保仅对未出现在训练工作负载中的查询进行 DQ 评估(对于每种情况,论文中在 80 个查询上训练并测试其中的 33 个)。计算查询的平均次优性,即“成本(算法计划)/ 成本(最佳计划)”,这个数字越低越好。例如,对 Const Model 1,DQ 平均距离最佳计划 1.32 倍。结果如下:

在所有成本模型中,DQ 在没有指数结构的先验知识的前提下可以与最优解决方案一比高下。对于固定的动态规划,情况并非如此:例如,left-deep 在 CM1 中产生了良好的计划,但在 CM2 和 CM3 中效果没有那么好。同样,right-deep在 CM1 中没有竞争力,但如果使用 CM2 或 CM3,right-deep 不是最差的。需要注意的是,基于学习的优化器比手动设计的算法更强大,可以适应工作负载、数据或成本模型的变化。

此外,DQ 以比传统动态规划快得多的速度产生了良好的计划:

对于最大的连接(15),DQ 的速度是穷举的 10000 倍,比 zig-zag 快 1000 倍,比 left-deep 和 right-deep 快 10 倍。

Future work

本文研究中存在的不足:

  1. 奖励值(即J(c))依赖于数据库系统的代价模型,当代价估计错误时,算法的 join 计划无法达到最优
  2. 需要大量的 query 进行训练,估计的 Q 函数的值才能趋向稳定

可以拓展的思路:
同样使用强化学习,参考于 SkinnerDB: Regret-Bounded Query Evaluation via Reinforcement Learning,一条 join query 的执行框架如下:

查询时,Pre-Processor 首先通过一元谓词过滤基表,接着由 Join Processor 生成查询的连接顺序与执行结果,最后调用 Post-Processor 对结果进行分组、聚合与排序等操作。

Join Processor 包括 4 部分,Join Processor 将每个连接操作分为多个时间片,每个时间片首先由 Learning Optimizer 选择连接顺序,选中的连接顺序由特定的 Join Executor 执行,每次执行固定时长,并将执行结果加入结果集中。Progress Tracker 跟踪被处理的数据,最后由 Reward Calcultor 计算连接顺序的得分。当所有数据被连接后,完成连接操作。学习优化器使用强化学习领域的上限置信区间算法(UCT),在每个时间片中根据连接顺序的枚举空间生成搜索树,并选择一条路径。UCT 算法的特点即不依赖任何具体示例的参数设置,能够适用于较大的搜索空间。

该算法不需要任何查询上下文及 Cardinality 估计模型。