RocksDB WriteImpl Flow

This article analyzes the WriteImpl flow in RocksDB 6.7.3.

Overview

The RocksDB write path is implemented mainly in DBImpl::WriteImpl, and the process breaks down into three main steps:

  • Add the WriteBatch to a queue, where several WriteBatches are combined into one WriteGroup
  • Write the log records of all WriteBatches in that WriteGroup to the WAL file
  • Write the record(s) contained in every WriteBatch of that WriteGroup into the in-memory Memtable

Each WriteBatch represents the commit of one transaction and may contain multiple operations; multiple key/value records are added to a WriteBatch by calling operations such as WriteBatch::Put/Delete.
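
As a quick illustration of that public API, the minimal sketch below commits several operations atomically through one WriteBatch (the db pointer, keys, and values are placeholders):

#include <cassert>

#include <rocksdb/db.h>
#include <rocksdb/write_batch.h>

// Assumes `db` is an already opened rocksdb::DB*.
void CommitExample(rocksdb::DB* db) {
  rocksdb::WriteBatch batch;
  batch.Put("key1", "value1");
  batch.Put("key2", "value2");
  batch.Delete("stale_key");

  // One Write() call commits the whole batch atomically; internally it
  // goes through DBImpl::WriteImpl as described below.
  rocksdb::WriteOptions wopts;
  wopts.sync = false;  // set to true to fsync the WAL on commit
  rocksdb::Status s = db->Write(wopts, &batch);
  assert(s.ok());
}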

Source Code Analysis

WriteThread::JoinBatchGroup

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait util:
     * 1) An existing leader pick us as the new leader when it finishes
     * 2) An existing leader pick us as its follewer and
     * 2.1) finishes the memtable writes on our behalf
     * 2.2) Or tell us to finish the memtable writes in pralallel
     * 3) (pipelined write) An existing leader pick us as its follower and
     *    finish book-keeping and WAL write for us, enqueue us as pending
     *    memtable writer, and
     * 3.1) we become memtable writer group leader, or
     * 3.2) an existing memtable writer group leader tell us to finish memtable
     *      writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

Every transaction commit request produces a WriteBatch object. After entering WriteImpl, each thread first calls JoinBatchGroup to join the queue. The core of this queue is the LinkOne function, which links the requests of multiple threads into a list with a lock-free CAS:

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If write stall in effect, and w->no_slowdown is not true,
    // block here until stall is cleared. If its true, then return
    // immediately
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

The write_group list is chained through link_older pointers, newest first: newest_writer_ → wN → … → w2 → w1 (Leader).

Each writer is inserted at the head of the list. If a writer sees that its link_older is null at insertion time, it becomes the Leader of the write_group; in other words, the tail of the list is the Leader.

In JoinBatchGroup, if a writer is not the Leader (non-Leader writers are referred to as Followers below), it calls AwaitState and waits to be woken up.

PS: because context switches on a condition variable are expensive, RocksDB also optimizes AwaitState by splitting pthread_cond_wait into three steps; that optimization is not described in detail in this article.

WriteImpl: Writing the WAL

if (w.state == WriteThread::STATE_GROUP_LEADER) {
  ...

  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
  const SequenceNumber current_sequence =
      write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
  ...

  if (w.status.ok() && !write_options.disableWAL) {
    PERF_TIMER_GUARD(write_wal_time);
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
    RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
    if (wal_write_group.size > 1) {
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                        wal_write_group.size - 1);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
    }
    w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                          need_log_sync, need_log_dir_sync, current_sequence);
  }

  ...

  write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}

The writer that became the Leader is responsible for writing the WAL for the whole group. Before writing the WAL, it first calls EnterAsBatchGroupLeader:

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes,
      // those are something else. They want to be alone
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // dont batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

Here, CreateMissingNewerLinks fills in the missing link_newer pointers, turning the chain into a doubly linked list so that the group can be walked in order starting from the Leader. Once this reverse (newer-direction) chain has been built, the Leader works out how many write requests can be batched together and updates the write_group's accumulated batch size, writer count, and related fields. Note that EnterAsBatchGroupLeader drains the queue: it takes every writer present at that moment in one pass, subject to the compatibility and size checks shown above.
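
For reference, CreateMissingNewerLinks is short: it walks backwards from the newest writer through link_older and fills in the missing link_newer pointers until it reaches a writer that is already linked. The listing below follows the 6.7.3 implementation closely, but treat it as a close sketch rather than an exact quote:

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      // Either we reached the oldest writer (the leader), or the rest of the
      // chain was already linked by a previous call.
      break;
    }
    next->link_newer = head;  // fill in the reverse pointer
    head = next;
  }
}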

Once grouping is complete, the WAL-writing phase begins. WriteToWAL is called; inside it, the MergeBatch function builds a merged_batch from the write_group, and this merged_batch holds exactly the content that should be written to the WAL. WriteToWAL then appends the merged_batch to the WAL and, depending on whether sync was requested, flushes the WAL to disk.

PS: there is an optimization when the merged_batch is built. If the group contains exactly one write request and that request needs the WAL, the merged_batch simply reuses that request's WriteBatch; otherwise the content is appended into a reusable tmp_batch_ object, avoiding the repeated construction of WriteBatch objects. After the WAL write, if tmp_batch_ was used, it is cleared.
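
A condensed paraphrase of that selection logic is shown below. It is reconstructed for illustration and is not an exact quote of the 6.7.3 code, although the pieces it relies on (Writer::ShouldWriteToWAL, WriteBatchInternal::Append, WriteBatch::Clear, and the tmp_batch_ scratch member) are real RocksDB internals:

// Condensed sketch of the merged_batch selection inside WriteToWAL/MergeBatch.
WriteBatch* merged_batch = nullptr;
if (write_group.size == 1 && write_group.leader->ShouldWriteToWAL()) {
  // Only one WAL-bound write in the group: reuse its batch directly.
  merged_batch = write_group.leader->batch;
} else {
  // Otherwise append every WAL-bound batch into the reusable scratch batch,
  // so no new WriteBatch object has to be allocated per group.
  merged_batch = &tmp_batch_;
  for (auto writer : write_group) {
    if (writer->ShouldWriteToWAL()) {
      WriteBatchInternal::Append(merged_batch, writer->batch,
                                 /*wal_only=*/true);
    }
  }
}
// ... merged_batch is written (and optionally synced) to the WAL here ...
if (merged_batch == &tmp_batch_) {
  tmp_batch_.Clear();  // leave the scratch batch empty for the next group
}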

Finally, ExitAsBatchGroupLeader is called. This function decides whether the Leader becomes a STATE_MEMTABLE_WRITER_LEADER (so the number of memtable-writer leaders is at most the number of group leaders), which leads into the memtable-writing phase.

WriteImpl: Writing the Memtable

WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
  PERF_TIMER_GUARD(write_memtable_time);
  assert(w.ShouldWriteToMemtable());
  write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
  if (memtable_write_group.size > 1 &&
      immutable_db_options_.allow_concurrent_memtable_write) {
    write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
  } else {
    memtable_write_group.status = WriteBatchInternal::InsertInto(
        memtable_write_group, w.sequence, column_family_memtables_.get(),
        &flush_scheduler_, &trim_history_scheduler_,
        write_options.ignore_missing_column_families, 0 /*log_number*/, this,
        false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
    versions_->SetLastSequence(memtable_write_group.last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
  }
}

if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  assert(w.ShouldWriteToMemtable());
  ColumnFamilyMemTablesImpl column_family_memtables(
      versions_->GetColumnFamilySet());
  w.status = WriteBatchInternal::InsertInto(
      &w, w.sequence, &column_family_memtables, &flush_scheduler_,
      &trim_history_scheduler_, write_options.ignore_missing_column_families,
      0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
      false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
      write_options.memtable_insert_hint_per_batch);
  if (write_thread_.CompleteParallelMemTableWriter(&w)) {
    MemTableInsertStatusCheck(w.status);
    versions_->SetLastSequence(w.write_group->last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
  }
}

RocksDB has an allow_concurrent_memtable_write option. When it is enabled, the memtable can be written concurrently (the memtable supports concurrent writes, but the WAL does not: the WAL is an append-only file, so writes to it must be serialized). The analysis below therefore covers the serial and the parallel memtable write paths separately.
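
For reference, this flag lives on Options/DBOptions and is set when opening the database. A minimal sketch using the public API follows; the database path is a placeholder, and only the default skiplist-based memtable supports concurrent inserts:

#include <cassert>

#include <rocksdb/db.h>
#include <rocksdb/options.h>

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Let the writers of one group insert into the memtable concurrently
  // (only the default skiplist memtable supports this).
  options.allow_concurrent_memtable_write = true;
  // A related knob on the same write path: pipelined write lets the WAL
  // writes and memtable writes of different groups overlap.
  options.enable_pipelined_write = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/write_impl_demo", &db);
  assert(s.ok());
  delete db;
  return 0;
}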

Serial Memtable Write

The Leader calls InsertInto, which iterates over the write_group and inserts both its own WriteBatch and those of the Followers. It then calls ExitAsMemTableWriter, which sets every Follower's state to STATE_COMPLETED to wake them up, and finally sets the Leader's own state to STATE_COMPLETED.

Parallel Memtable Write

LaunchParallelMemTableWriters is called; it walks the write_group, sets both the Leader and the Followers to STATE_PARALLEL_MEMTABLE_WRITER, and wakes the waiting threads. Every writer then calls InsertInto to write its own WriteBatch into the MemTable. After finishing its MemTable write, each writer calls CompleteParallelMemTableWriter. That function decrements the number of running tasks in the write_group; when the count reaches zero, all threads have finished, and ExitAsMemTableWriter is called to set the Leader's state to STATE_COMPLETED. Otherwise the writer goes back to waiting until the remaining writers are done, as sketched below.
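
The "last one out" logic boils down to an atomic running counter on the WriteGroup. The sketch below is reconstructed from the 6.7.3 source and should be read as a close approximation rather than an exact quote:

// Reconstructed sketch of WriteThread::CompleteParallelMemTableWriter.
// Returns true only to the last writer of the group, which then performs
// the exit duties (ExitAsMemTableWriter).
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  WriteGroup* write_group = w->write_group;
  if (!w->status.ok()) {
    // Record the first failure on the group under the leader's state mutex.
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }

  if (write_group->running-- > 1) {
    // Not the last writer: sleep until the last one marks us completed.
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // We are the last writer standing; hand the group status back to the caller.
  w->status = write_group->status;
  return true;
}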

Whether the memtable is written serially or in parallel, one task remains after the MemTable write completes: between the moment newest_writer_ was read when the queue was drained and the current point in time, many new write requests may have arrived. The last thread of the batch to finish must therefore appoint a new Leader for the backlog of queued write requests, letting it take over the Leader role and continue the group commits. As can be seen, both the serial and the parallel paths end by calling ExitAsMemTableWriter, and it is exactly in that function that this hand-off is performed.
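
The sketch below illustrates the hand-off pattern; it is a simplification and ignores the separate pending-memtable-writer queue that the real ExitAsMemTableWriter also has to manage:

// Simplified illustration of the leader hand-off done when a group retires.
Writer* head = newest_writer_.load(std::memory_order_acquire);
if (head != last_writer ||
    !newest_writer_.compare_exchange_strong(head, nullptr)) {
  // New writers queued up behind this group while it was being processed
  // (a failed CAS also refreshes `head` with the current newest writer).
  CreateMissingNewerLinks(head);                  // make link_newer walkable
  Writer* next_leader = last_writer->link_newer;  // oldest of the newcomers
  next_leader->link_older = nullptr;              // detach the finished group
  SetState(next_leader, STATE_GROUP_LEADER);      // hand leadership over
}
// Otherwise the queue is now empty, and the next arriving writer will see a
// null head in LinkOne and become leader on its own.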

PS: under high concurrency, the average time a Follower waits in AwaitState is roughly twice the WAL write latency. After newest_writer_ has been read, more write requests may keep arriving; those requests first have to wait for the current Leader to finish its write flow, and then for the next Leader, i.e. the Leader of the write_group they belong to, to finish writing the WAL before they are woken up.

Recap

In short, a write passes through three phases: the writer joins the queue via JoinBatchGroup; the group Leader batches the WAL write through EnterAsBatchGroupLeader, WriteToWAL, and ExitAsBatchGroupLeader; and the memtable is then written either serially by the memtable-writer Leader or in parallel by all writers, with the last thread to finish handing leadership over to any newly queued writers.

References

  1. RocksDB source code, version 6.7.3
  2. Source code analysis of the RocksDB write path: DBImpl::WriteImpl()
  3. The RocksDB write flow
  4. An analysis of the RocksDB write flow