RocksDB WriteImpl Flow

This article analyzes the WriteImpl flow in RocksDB 6.7.3.

Overview

RocksDB's write path is implemented mainly in DBImpl::WriteImpl and proceeds in three steps:

  • Add the WriteBatch to a queue, where multiple WriteBatches are grouped into one WriteGroup
  • Write the log records for everything in that WriteGroup to the WAL file
  • Write the one or more records in each WriteBatch of that WriteGroup into the in-memory Memtable

Each WriteBatch represents the commit of one transaction and can contain multiple operations; key/value records are added to a WriteBatch through calls such as WriteBatch::Put/Delete.
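For example, a commit through this path looks roughly like the following (a minimal sketch; the DB path and keys are made up):

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rocksdb_write_demo", &db);
  assert(s.ok());

  // One WriteBatch == one atomic commit that can carry many operations.
  rocksdb::WriteBatch batch;
  batch.Put("k1", "v1");
  batch.Put("k2", "v2");
  batch.Delete("k3");

  // DBImpl::WriteImpl runs underneath this call.
  s = db->Write(rocksdb::WriteOptions(), &batch);
  assert(s.ok());

  delete db;
  return 0;
}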

Source Code Analysis

WriteThread::JoinBatchGroup

static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup");
void WriteThread::JoinBatchGroup(Writer* w) {
  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w);
  assert(w->batch != nullptr);

  bool linked_as_leader = LinkOne(w, &newest_writer_);

  if (linked_as_leader) {
    SetState(w, STATE_GROUP_LEADER);
  }

  TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);

  if (!linked_as_leader) {
    /**
     * Wait until:
     * 1) An existing leader picks us as the new leader when it finishes
     * 2) An existing leader picks us as its follower and
     *    2.1) finishes the memtable writes on our behalf
     *    2.2) or tells us to finish the memtable writes in parallel
     * 3) (pipelined write) An existing leader picks us as its follower and
     *    finishes book-keeping and WAL write for us, enqueues us as pending
     *    memtable writer, and
     *    3.1) we become memtable writer group leader, or
     *    3.2) an existing memtable writer group leader tells us to finish
     *         memtable writes in parallel.
     */
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w);
    AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
                      STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
               &jbg_ctx);
    TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w);
  }
}

Every transaction commit request produces a WriteBatch object. After entering WriteImpl, each thread first calls JoinBatchGroup to join the queue. The core of the queue is the LinkOne function, which uses lock-free CAS to chain the requests of multiple threads into a list:

bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
  assert(newest_writer != nullptr);
  assert(w->state == STATE_INIT);
  Writer* writers = newest_writer->load(std::memory_order_relaxed);
  while (true) {
    // If write stall in effect, and w->no_slowdown is not true,
    // block here until stall is cleared. If it's true, then return
    // immediately
    if (writers == &write_stall_dummy_) {
      if (w->no_slowdown) {
        w->status = Status::Incomplete("Write stall");
        SetState(w, STATE_COMPLETED);
        return false;
      }
      // Since no_slowdown is false, wait here to be notified of the write
      // stall clearing
      {
        MutexLock lock(&stall_mu_);
        writers = newest_writer->load(std::memory_order_relaxed);
        if (writers == &write_stall_dummy_) {
          stall_cv_.Wait();
          // Load newest_writers_ again since it may have changed
          writers = newest_writer->load(std::memory_order_relaxed);
          continue;
        }
      }
    }
    w->link_older = writers;
    if (newest_writer->compare_exchange_weak(writers, w)) {
      return (writers == nullptr);
    }
  }
}

The write_group forms a linked list: each writer is inserted at the head (newest_writer_), and a writer that finds link_older empty at insertion time becomes the Leader of the write_group (in other words, the tail of the list is the Leader).

In JoinBatchGroup, a writer that is not the Leader (below, a non-Leader writer is called a Follower) calls AwaitState and waits to be woken up.

Note: because context switches under a condition variable are expensive, RocksDB also optimizes AwaitState by splitting pthread_cond_wait into three phases; this article does not cover that optimization in detail.
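Roughly, the three phases are: spin, yield, then actually block. A hypothetical, self-contained illustration (names and constants are ours, not RocksDB's):

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical illustration only, not the 6.7.3 code. The waking side must
// set `state` and call cv.notify_all() while holding mu.
uint8_t await_state(std::atomic<uint8_t>& state, uint8_t goal_mask,
                    std::mutex& mu, std::condition_variable& cv) {
  // Phase 1: spin briefly; the goal state is often reached within ~1 us.
  for (int i = 0; i < 200; ++i) {
    uint8_t s = state.load(std::memory_order_acquire);
    if (s & goal_mask) return s;
  }
  // Phase 2: yield the CPU for a bounded time before paying for a real
  // block (RocksDB tunes this duration adaptively per call site).
  for (int i = 0; i < 1000; ++i) {
    std::this_thread::yield();
    uint8_t s = state.load(std::memory_order_acquire);
    if (s & goal_mask) return s;
  }
  // Phase 3: fall back to the expensive condition-variable wait.
  std::unique_lock<std::mutex> lock(mu);
  cv.wait(lock, [&] {
    return (state.load(std::memory_order_acquire) & goal_mask) != 0;
  });
  return state.load(std::memory_order_acquire);
}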

WriteImpl: Writing the WAL

if (w.state == WriteThread::STATE_GROUP_LEADER) {
  ...

  last_batch_group_size_ =
      write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group);
  const SequenceNumber current_sequence =
      write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
  ...

  if (w.status.ok() && !write_options.disableWAL) {
    PERF_TIMER_GUARD(write_wal_time);
    stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
    RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
    if (wal_write_group.size > 1) {
      stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                        wal_write_group.size - 1);
      RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
    }
    w.status = WriteToWAL(wal_write_group, log_writer, log_used,
                          need_log_sync, need_log_dir_sync, current_sequence);
  }

  ...

  write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}

The writer that becomes Leader is responsible for writing the WAL for the whole batch. Before writing the WAL, it first calls EnterAsBatchGroupLeader:

size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
                                            WriteGroup* write_group) {
  assert(leader->link_older == nullptr);
  assert(leader->batch != nullptr);
  assert(write_group != nullptr);

  size_t size = WriteBatchInternal::ByteSize(leader->batch);

  // Allow the group to grow up to a maximum size, but if the
  // original write is small, limit the growth so we do not slow
  // down the small write too much.
  size_t max_size = max_write_batch_group_size_bytes;
  const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
  if (size <= min_batch_size_bytes) {
    max_size = size + min_batch_size_bytes;
  }

  leader->write_group = write_group;
  write_group->leader = leader;
  write_group->last_writer = leader;
  write_group->size = 1;
  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);

  // This is safe regardless of any db mutex status of the caller. Previous
  // calls to ExitAsGroupLeader either didn't call CreateMissingNewerLinks
  // (they emptied the list and then we added ourself as leader) or had to
  // explicitly wake us up (the list was non-empty when we added ourself,
  // so we have already received our MarkJoined).
  CreateMissingNewerLinks(newest_writer);

  // Tricky. Iteration start (leader) is exclusive and finish
  // (newest_writer) is inclusive. Iteration goes from old to new.
  Writer* w = leader;
  while (w != newest_writer) {
    w = w->link_newer;

    if (w->sync && !leader->sync) {
      // Do not include a sync write into a batch handled by a non-sync write.
      break;
    }

    if (w->no_slowdown != leader->no_slowdown) {
      // Do not mix writes that are ok with delays with the ones that
      // request fail on delays.
      break;
    }

    if (w->disable_wal != leader->disable_wal) {
      // Do not mix writes that enable WAL with the ones whose
      // WAL disabled.
      break;
    }

    if (w->batch == nullptr) {
      // Do not include those writes with nullptr batch. Those are not writes,
      // those are something else. They want to be alone
      break;
    }

    if (w->callback != nullptr && !w->callback->AllowWriteBatching()) {
      // don't batch writes that don't want to be batched
      break;
    }

    auto batch_size = WriteBatchInternal::ByteSize(w->batch);
    if (size + batch_size > max_size) {
      // Do not make batch too big
      break;
    }

    w->write_group = write_group;
    size += batch_size;
    write_group->last_writer = w;
    write_group->size++;
  }
  TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
  return size;
}

Here, CreateMissingNewerLinks turns the list into a doubly linked list so that the batch can be written sequentially starting from the Leader. Once those backward (newer) links are created, the Leader computes how many write requests can be batched together and updates the write_group's total batch size and writer count; when EnterAsBatchGroupLeader drains the queue, it takes every writer present at that moment in one pass.
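CreateMissingNewerLinks itself just walks backward from the newest writer and repairs the link_newer pointers (as in the 6.7.3 source):

void WriteThread::CreateMissingNewerLinks(Writer* head) {
  while (true) {
    Writer* next = head->link_older;
    if (next == nullptr || next->link_newer != nullptr) {
      assert(next == nullptr || next->link_newer == head);
      break;
    }
    // Turn the CAS-built singly linked list into a doubly linked one.
    next->link_newer = head;
    head = next;
  }
}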

After that comes the WAL write. WriteToWAL calls MergeBatch, which builds a merged_batch from the write_group containing exactly the records that should go to the WAL. merged_batch is then written to the WAL by WriteToWAL, which also decides, depending on whether sync was requested, whether to flush the WAL to disk.

Note: there is an optimization when building merged_batch. If the group holds exactly one write request and that request needs to go to the WAL, merged_batch reuses that request's WriteBatch directly; otherwise a shared tmp_batch_ object is reused to avoid allocating WriteBatch objects repeatedly. After the WAL write, tmp_batch_ is cleared if it was used.
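A condensed sketch of that choice, simplified from the 6.7.3 MergeBatch (callback-failure checks and the WAL-termination-point condition are omitted here):

// Condensed sketch; not the verbatim 6.7.3 code.
WriteBatch* MergeBatchSketch(const WriteThread::WriteGroup& write_group,
                             WriteBatch* tmp_batch, size_t* write_with_wal) {
  *write_with_wal = 0;
  if (write_group.size == 1 && write_group.leader->ShouldWriteToWAL()) {
    // A single WAL-bound request: reuse its batch directly.
    *write_with_wal = 1;
    return write_group.leader->batch;
  }
  // Otherwise flatten every WAL-bound batch into the reusable tmp_batch_.
  for (auto writer : write_group) {
    if (writer->ShouldWriteToWAL()) {
      WriteBatchInternal::Append(tmp_batch, writer->batch, /*wal_only=*/true);
      (*write_with_wal)++;
    }
  }
  return tmp_batch;
}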

Finally, the Leader calls ExitAsBatchGroupLeader, which determines whether this Leader becomes the STATE_MEMTABLE_WRITER_LEADER (the number of MEMTABLE_WRITER_LEADERs is at most the number of GROUP_LEADERs), and the memtable write phase follows.

WriteImpl: Writing the Memtable

WriteThread::WriteGroup memtable_write_group;
if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) {
  PERF_TIMER_GUARD(write_memtable_time);
  assert(w.ShouldWriteToMemtable());
  write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group);
  if (memtable_write_group.size > 1 &&
      immutable_db_options_.allow_concurrent_memtable_write) {
    write_thread_.LaunchParallelMemTableWriters(&memtable_write_group);
  } else {
    memtable_write_group.status = WriteBatchInternal::InsertInto(
        memtable_write_group, w.sequence, column_family_memtables_.get(),
        &flush_scheduler_, &trim_history_scheduler_,
        write_options.ignore_missing_column_families, 0 /*log_number*/, this,
        false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_);
    versions_->SetLastSequence(memtable_write_group.last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, memtable_write_group);
  }
}

if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
  assert(w.ShouldWriteToMemtable());
  ColumnFamilyMemTablesImpl column_family_memtables(
      versions_->GetColumnFamilySet());
  w.status = WriteBatchInternal::InsertInto(
      &w, w.sequence, &column_family_memtables, &flush_scheduler_,
      &trim_history_scheduler_, write_options.ignore_missing_column_families,
      0 /*log_number*/, this, true /*concurrent_memtable_writes*/,
      false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/,
      write_options.memtable_insert_hint_per_batch);
  if (write_thread_.CompleteParallelMemTableWriter(&w)) {
    MemTableInsertStatusCheck(w.status);
    versions_->SetLastSequence(w.write_group->last_sequence);
    write_thread_.ExitAsMemTableWriter(&w, *w.write_group);
  }
}

RocksDB has an allow_concurrent_memtable_write option. When it is enabled, the memtable can be written concurrently (the memtable supports concurrent writes, but the WAL does not, because the WAL is an append-only file and its writers must be serialized). The analysis below therefore splits into the serial and the parallel memtable write paths.
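For reference, this is how the relevant options look when opening a DB (in 6.7.3 both options shown here already default to true, so setting them is only for illustration):

#include "rocksdb/db.h"

rocksdb::Options options;
options.create_if_missing = true;
// Let followers apply their own batches to the memtable in parallel
// (requires a memtable implementation that supports it, e.g. skiplist).
options.allow_concurrent_memtable_write = true;
// Usually paired with this, so waiting writers spin/yield before blocking.
options.enable_write_thread_adaptive_yield = true;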

Serial Memtable Write

The Leader calls InsertInto, which iterates over the write_group and applies both the Leader's and the Followers' WriteBatches. It then calls ExitAsMemTableWriter, which sets every Follower's state to STATE_COMPLETED and wakes them, and finally sets the Leader's own state to STATE_COMPLETED.

Parallel Memtable Write

LaunchParallelMemTableWriters walks the write_group, sets both the Leader and the Followers to STATE_PARALLEL_MEMTABLE_WRITER, and wakes the waiting threads. Every writer then applies its own WriteBatch to the MemTable by calling InsertInto. After finishing its MemTable write, each writer calls CompleteParallelMemTableWriter. That function decrements the group's count of running tasks; when the count reaches zero, all threads have finished, and the last writer calls ExitAsMemTableWriter to set the Leader's state to STATE_COMPLETED. Otherwise the writer waits for the remaining write tasks in the group to finish.
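The core of that hand-off is a shared running counter; this is essentially what the 6.7.3 source does (slightly condensed):

static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter");

// Returns true iff this writer is the last to finish and must do exit duties.
bool WriteThread::CompleteParallelMemTableWriter(Writer* w) {
  auto* write_group = w->write_group;
  if (!w->status.ok()) {
    std::lock_guard<std::mutex> guard(write_group->leader->StateMutex());
    write_group->status = w->status;
  }
  if (write_group->running-- > 1) {
    // Not the last one: park until the whole group completes.
    AwaitState(w, STATE_COMPLETED, &cpmtw_ctx);
    return false;
  }
  // Last finisher: the caller proceeds to ExitAsMemTableWriter.
  w->status = write_group->status;
  return true;
}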

Whether the write was serial or parallel, one task remains after the MemTable write completes: between the moment newest_writer_ was taken from the queue and now, many new write requests may have arrived, so the last thread to finish the batch must appoint a new Leader at the tail of the backlog of pending write requests, which takes over the Leader role and continues the batched commit. Both the serial and the parallel path end in ExitAsMemTableWriter, and that is exactly where this hand-off happens.

Note: under high concurrency, the average latency a Follower spends in AwaitState is roughly twice the WAL write latency. After newest_writer_ is taken, newly arriving write requests first have to wait for the current Leader to finish its write flow, and then wait for the next Leader, the one belonging to the same write_group as those requests, to finish writing the WAL before they can be woken up.

References

  1. Rocksdb Source Code 6.7.3
  2. rocksdb写流程DBImpl::WriteImpl()源代码分析
  3. RocksDB写入流程
  4. RocksDB 写流程分析

A Look at RocksDB's LRUCache from the Row Cache Get Path

This article briefly introduces the LRUCache in RocksDB 6.7.3.

Row Cache

The Row Cache caches, for a looked-up key, the corresponding value from the SST. If row_cache is enabled, TableCache::Get calls CreateRowCacheKeyPrefix and GetFromRowCache to build the row cache key (fd_number + seq_no + user_key). GetFromRowCache calls row_cache->Lookup to obtain the cached row_handle and points found_row_cache_entry at the cached value; by exploiting the Cleanable class, the final result can be produced while saving one copy of the value.
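For context, a minimal way to enable the row cache (the capacity value here is arbitrary):

#include "rocksdb/cache.h"
#include "rocksdb/options.h"

rocksdb::Options options;
// Cache point-lookup results keyed by (fd_number, seq_no, user_key).
options.row_cache = rocksdb::NewLRUCache(64 << 20 /* 64 MiB, arbitrary */);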

Status TableCache::Get(const ReadOptions& options,
                       const InternalKeyComparator& internal_comparator,
                       const FileMetaData& file_meta, const Slice& k,
                       GetContext* get_context,
                       const SliceTransform* prefix_extractor,
                       HistogramImpl* file_read_hist, bool skip_filters,
                       int level) {
  ...
  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
    auto user_key = ExtractUserKey(k);
    CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
    done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
                           get_context);
    if (!done) {
      row_cache_entry = &row_cache_entry_buffer;
    }
  }
  ...
}

void TableCache::CreateRowCacheKeyPrefix(const ReadOptions& options,
                                         const FileDescriptor& fd,
                                         const Slice& internal_key,
                                         GetContext* get_context,
                                         IterKey& row_cache_key) {
  uint64_t fd_number = fd.GetNumber();
  uint64_t seq_no = 0;
  ...
  AppendVarint64(&row_cache_key, fd_number);
  AppendVarint64(&row_cache_key, seq_no);
}

bool TableCache::GetFromRowCache(const Slice& user_key, IterKey& row_cache_key,
                                 size_t prefix_size, GetContext* get_context) {
  bool found = false;

  row_cache_key.TrimAppend(prefix_size, user_key.data(), user_key.size());
  if (auto row_handle =
          ioptions_.row_cache->Lookup(row_cache_key.GetUserKey())) {
    // Cleanable routine to release the cache entry
    Cleanable value_pinner;
    auto release_cache_entry_func = [](void* cache_to_clean,
                                       void* cache_handle) {
      ((Cache*)cache_to_clean)->Release((Cache::Handle*)cache_handle);
    };
    auto found_row_cache_entry = static_cast<const std::string*>(
        ioptions_.row_cache->Value(row_handle));
    // If it comes here value is located on the cache.
    // found_row_cache_entry points to the value on cache,
    // and value_pinner has cleanup procedure for the cached entry.
    // After replayGetContextLog() returns, get_context.pinnable_slice_
    // will point to cache entry buffer (or a copy based on that) and
    // cleanup routine under value_pinner will be delegated to
    // get_context.pinnable_slice_. Cache entry is released when
    // get_context.pinnable_slice_ is reset.
    value_pinner.RegisterCleanup(release_cache_entry_func,
                                 ioptions_.row_cache.get(), row_handle);
    replayGetContextLog(*found_row_cache_entry, user_key, get_context,
                        &value_pinner);
    RecordTick(ioptions_.statistics, ROW_CACHE_HIT);
    found = true;
  } else {
    RecordTick(ioptions_.statistics, ROW_CACHE_MISS);
  }
  return found;
}

The LRUCache Classes

  • Cache
    Defines the cache interface, including Insert, Lookup, Release, and other operations.

  • ShardedCache
    Supports sharding the cache into 2^num_shard_bits shards of equal capacity. A key is routed to a shard by the high num_shard_bits bits of its hash (see the sketch after this list).

  • LRUCache
    Implements ShardedCache and maintains an array of LRUCacheShard; one shard is one bucket.

  • CacheShard
    Defines the interface of a single shard, including Insert, Lookup, Release, and other operations; after shard selection, Cache calls are forwarded to the corresponding operation of the chosen shard.

  • LRUCacheShard
    Implements CacheShard and maintains an LRU list plus a hash table, both made of LRUHandle entries, to implement the LRU policy.

  • LRUHandle
    The unit that stores a key and a value; it carries forward and backward pointers so entries can form the doubly linked circular list used as the LRU list.

  • LRUHandleTable
    The hash table implementation. It buckets keys once more and tries to keep about one element per bucket; the elements are LRUHandles. It provides Lookup, Insert, and Remove.
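For concreteness, the shard selection mentioned above boils down to a single shift; a self-contained sketch that mirrors ShardedCache::Shard in 6.7.3:

#include <cstdint>

// There are 2^num_shard_bits shards; the top num_shard_bits bits of the
// 32-bit hash pick the shard (mirrors ShardedCache::Shard).
inline uint32_t Shard(uint32_t hash, uint32_t num_shard_bits) {
  return (num_shard_bits > 0) ? (hash >> (32 - num_shard_bits)) : 0;
}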

Lookup

In GetFromRowCache, row_cache->Lookup actually resolves to ShardedCache::Lookup:

Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* /*stats*/) {
  uint32_t hash = HashSlice(key);
  return GetShard(Shard(hash))->Lookup(key, hash);
}

It hashes the key, picks the shard from the high num_shard_bits bits of the hash, and then calls LRUCacheShard::Lookup:

Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
  MutexLock l(&mutex_);
  LRUHandle* e = table_.Lookup(key, hash);
  if (e != nullptr) {
    assert(e->InCache());
    if (!e->HasRefs()) {
      // The entry is in LRU since it's in hash and has no external references
      LRU_Remove(e);
    }
    e->Ref();
    e->SetHit();
  }
  return reinterpret_cast<Cache::Handle*>(e);
}

LRUCacheShard::Lookup in turn calls LRUHandleTable::Lookup. In FindPointer, the hash selects a bucket; as long as the pointer is non-null and the current entry's hash or key does not match, the chain is followed downward until the entry is found:

LRUHandle* LRUHandleTable::Lookup(const Slice& key, uint32_t hash) {
  return *FindPointer(key, hash);
}

LRUHandle** LRUHandleTable::FindPointer(const Slice& key, uint32_t hash) {
  LRUHandle** ptr = &list_[hash & (length_ - 1)];
  while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
    ptr = &(*ptr)->next_hash;
  }
  return ptr;
}

Summary

An LRUCache is simply a collection of LRUCacheShards; each LRUCacheShard maintains an LRUHandle list and a hash table, and LRUHandleTable implements the hash table with separate chaining. Tracing the cache's Lookup call chain shows that the implementation is remarkably concise.

References

  1. Rocksdb Source Code 6.7.3
  2. RocksDB. LRUCache源码分析
  3. RocksDB中的LRUCache

RocksDB Get Flow

This article analyzes the Get flow in RocksDB 6.7.3.

Overview

(1) Obtain the current SuperVersion. The SuperVersion manages a column family's metadata, such as the current version number, the in-memory MemTable and Immutable MemTables, and SST file information:

struct SuperVersion {
  MemTable* mem;
  MemTableListVersion* imm;
  Version* current;
  ...
};

(2) Read from memory: try to fetch the value from the MemTable and the Immutable MemTables referenced by the SuperVersion obtained in step (1).

(3) Read from persistent storage: first obtain the file's metadata, such as Bloom filters and data-block indexes, through the Table cache. If the Block cache holds the SST's data block and the lookup hits, the read is served directly; otherwise the data block must be read from the SST and inserted into the Block cache.

Source Code Analysis

DBImpl::Get

Status DBImpl::Get(const ReadOptions& read_options,
                   ColumnFamilyHandle* column_family, const Slice& key,
                   PinnableSlice* value) {
  GetImplOptions get_impl_options;
  get_impl_options.column_family = column_family;
  get_impl_options.value = value;
  return GetImpl(read_options, key, get_impl_options);
}

RocksDB's Get entry point, DBImpl::Get, delegates its work mainly to the DBImpl::GetImpl call.

DBImpl::GetImpl

Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
                       ...
  SuperVersion* sv = GetAndRefSuperVersion(cfd);
  ...
  SequenceNumber snapshot;
  if (read_options.snapshot != nullptr) {
    if (get_impl_options.callback) {
      snapshot = get_impl_options.callback->max_visible_seq();
    } else {
      snapshot =
          reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
    }
  } else {
    ...
    snapshot = last_seq_same_as_publish_seq_
                   ? versions_->LastSequence()
                   : versions_->LastPublishedSequence();
    ...
  }
  ...
  if (!skip_memtable) {
    ...
    if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
                     &merge_context, &max_covering_tombstone_seq,
                     read_options, get_impl_options.callback,
                     get_impl_options.is_blob_index)) {
      ...
    } else if ((s.ok() || s.IsMergeInProgress()) &&
               sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
                            &merge_context, &max_covering_tombstone_seq,
                            read_options, get_impl_options.callback,
                            get_impl_options.is_blob_index)) {
      ...
    }
    ...
  }
  if (!done) {
    sv->current->Get(
        read_options, lkey, get_impl_options.value, &s, &merge_context,
        &max_covering_tombstone_seq,
        get_impl_options.get_value ? get_impl_options.value_found : nullptr,
        nullptr, nullptr,
        get_impl_options.get_value ? get_impl_options.callback : nullptr,
        get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
        get_impl_options.get_value);
    ...
  }
  ...
}

DBImpl::GetImpl fetches the SuperVersion information and, if the user did not specify a snapshot, obtains the current one. Reads take no lock on the key; what data a read can see is determined entirely by the snapshot passed in through the ReadOptions.

Within a SuperVersion the data is ordered from newest to oldest: MemTable -> MemTableListVersion -> Version. The lookup proceeds in that order, and as soon as newer data yields a result that satisfies the snapshot rule, it can be returned immediately and the lookup is complete.

MemTable::Get

bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
                   MergeContext* merge_context,
                   SequenceNumber* max_covering_tombstone_seq,
                   SequenceNumber* seq, const ReadOptions& read_opts,
                   ReadCallback* callback, bool* is_blob_index, bool do_merge) {
  ...

  if (bloom_filter_ && !may_contain) {
    // iter is null if prefix bloom says the key does not exist
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
    *seq = kMaxSequenceNumber;
  } else {
    if (bloom_filter_) {
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
    }
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
                 is_blob_index, value, s, merge_context, seq,
                 &found_final_value, &merge_in_progress);
  }

  ...
}

void MemTable::GetFromTable(const LookupKey& key,
                            SequenceNumber max_covering_tombstone_seq,
                            bool do_merge, ReadCallback* callback,
                            bool* is_blob_index, std::string* value, Status* s,
                            MergeContext* merge_context, SequenceNumber* seq,
                            bool* found_final_value, bool* merge_in_progress) {
  ...
  table_->Get(key, &saver, SaveValue);
  *seq = saver.seq;
}

The lookup is delegated to MemTableRep's Get function (taking the SkipListRep implementation as an example: it seeks in the skiplist, then iterates forward from the seek position, checking each entry against the rules defined by SaveValue). SaveValue first checks whether the current entry still belongs to the key being looked up, and returns if not; it then checks whether the entry's sequence number is less than or equal to the snapshot being read, and continues the loop if not. Once an entry satisfies both conditions, the loop exits and the result is returned.
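The visibility check relies on the 8-byte trailer of the internal key, which packs the sequence number and the value type as seq << 8 | type; a minimal sketch of the decode:

#include <cstdint>

// The internal-key trailer packs (sequence << 8 | value_type).
inline void UnpackSequenceAndType(uint64_t tag, uint64_t* seq, uint8_t* type) {
  *seq = tag >> 8;                             // high 56 bits: sequence number
  *type = static_cast<uint8_t>(tag & 0xff);    // low 8 bits: kTypeValue, ...
}

// An entry is visible to a read at snapshot `snap` iff entry_seq <= snap.
inline bool VisibleAtSnapshot(uint64_t entry_seq, uint64_t snap) {
  return entry_seq <= snap;
}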

MemTableListVersion::Get

bool Get(const LookupKey& key, std::string* value, Status* s,
         MergeContext* merge_context,
         SequenceNumber* max_covering_tombstone_seq,
         const ReadOptions& read_opts, ReadCallback* callback = nullptr,
         bool* is_blob_index = nullptr) {
  SequenceNumber seq;
  return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
             read_opts, callback, is_blob_index);
}

MemTableListVersion keeps all the Immutable memtables in a list. A lookup searches the memtables one by one in time order, and as soon as any memtable yields a result it returns immediately, i.e., it returns the newest value. The per-memtable lookup is the MemTable::Get interface described above.
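Internally this is just a loop over the list; a condensed sketch of MemTableListVersion::GetFromList (parameters trimmed relative to 6.7.3):

// memlist_ is ordered newest -> oldest; the first memtable that resolves
// the key wins.
bool MemTableListVersion::GetFromList(
    std::list<MemTable*>* list, const LookupKey& key, std::string* value,
    Status* s, MergeContext* merge_context,
    SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
    const ReadOptions& read_opts) {
  *seq = kMaxSequenceNumber;
  for (auto& memtable : *list) {
    SequenceNumber current_seq = kMaxSequenceNumber;
    bool done = memtable->Get(key, value, s, merge_context,
                              max_covering_tombstone_seq, &current_seq,
                              read_opts);
    if (*seq == kMaxSequenceNumber) {
      *seq = current_seq;  // remember the seq of the first match
    }
    if (done) {
      return true;
    }
  }
  return false;
}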

Version::Get

void Version::Get(const ReadOptions& read_options, const LookupKey& k,
                  PinnableSlice* value, Status* status,
                  MergeContext* merge_context,
                  SequenceNumber* max_covering_tombstone_seq, bool* value_found,
                  bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
                  bool* is_blob, bool do_merge) {
  ...
  FilePicker fp(
      storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
      storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
      user_comparator(), internal_comparator());
  FdWithKeyRange* f = fp.GetNextFile();

  while (f != nullptr) {
    ...
    *status = table_cache_->Get(
        read_options, *internal_comparator(), *f->file_metadata, ikey,
        &get_context, mutable_cf_options_.prefix_extractor.get(),
        cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
        IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
                        fp.IsHitFileLastInLevel()),
        fp.GetCurrentLevel());
    ...
    f = fp.GetNextFile();
  }
  ...
}

GetNextFile iterates over all levels, and within each level over its files. Level 0 gets special treatment: only at level 0 can the SSTs have overlapping key ranges, so every file must be examined, one by one. At the other levels the files are sorted and non-overlapping, so a binary search locates the single candidate file; if no file's range covers the key, the search moves on to the next level.
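The visiting order can be reproduced with a toy model (FileRange and PickFiles are hypothetical, for illustration only; the real logic lives in FilePicker):

#include <algorithm>
#include <string>
#include <vector>

// Hypothetical file metadata for illustration; not RocksDB's FdWithKeyRange.
struct FileRange {
  std::string smallest, largest;
};

// Returns the files that may contain user_key, in the order FilePicker
// would visit them: all overlapping L0 files first, then at most one file
// per deeper level located by binary search on the sorted, disjoint ranges.
std::vector<const FileRange*> PickFiles(
    const std::vector<std::vector<FileRange>>& levels,
    const std::string& user_key) {
  std::vector<const FileRange*> out;
  for (size_t level = 0; level < levels.size(); ++level) {
    const auto& files = levels[level];
    if (level == 0) {
      // L0 ranges overlap, so every covering file is a candidate.
      for (const auto& f : files) {
        if (user_key >= f.smallest && user_key <= f.largest) out.push_back(&f);
      }
    } else {
      // Sorted, non-overlapping ranges: binary search for the one candidate.
      auto it = std::lower_bound(
          files.begin(), files.end(), user_key,
          [](const FileRange& f, const std::string& k) { return f.largest < k; });
      if (it != files.end() && user_key >= it->smallest) out.push_back(&*it);
    }
  }
  return out;
}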

For each candidate file, TableCache::Get searches that single SST; as soon as a result is found it returns immediately.

TableCache::Get

Status TableCache::Get(const ReadOptions& options,
                       const InternalKeyComparator& internal_comparator,
                       const FileMetaData& file_meta, const Slice& k,
                       GetContext* get_context,
                       const SliceTransform* prefix_extractor,
                       HistogramImpl* file_read_hist, bool skip_filters,
                       int level) {
  auto& fd = file_meta.fd;
  std::string* row_cache_entry = nullptr;
  bool done = false;
#ifndef ROCKSDB_LITE
  IterKey row_cache_key;
  std::string row_cache_entry_buffer;

  // Check row cache if enabled. Since row cache does not currently store
  // sequence numbers, we cannot use it if we need to fetch the sequence.
  if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
    auto user_key = ExtractUserKey(k);
    CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
    done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
                           get_context);
    if (!done) {
      row_cache_entry = &row_cache_entry_buffer;
    }
  }
#endif  // ROCKSDB_LITE
  Status s;
  TableReader* t = fd.table_reader;
  Cache::Handle* handle = nullptr;
  if (!done && s.ok()) {
    if (t == nullptr) {
      s = FindTable(
          file_options_, internal_comparator, fd, &handle, prefix_extractor,
          options.read_tier == kBlockCacheTier /* no_io */,
          true /* record_read_stats */, file_read_hist, skip_filters, level);
      if (s.ok()) {
        t = GetTableReaderFromHandle(handle);
      }
    }
    SequenceNumber* max_covering_tombstone_seq =
        get_context->max_covering_tombstone_seq();
    if (s.ok() && max_covering_tombstone_seq != nullptr &&
        !options.ignore_range_deletions) {
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
          t->NewRangeTombstoneIterator(options));
      if (range_del_iter != nullptr) {
        *max_covering_tombstone_seq = std::max(
            *max_covering_tombstone_seq,
            range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
      }
    }
    if (s.ok()) {
      get_context->SetReplayLog(row_cache_entry);  // nullptr if no cache.
      s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
      get_context->SetReplayLog(nullptr);
    } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
      // Couldn't find Table in cache but treat as kFound if no_io set
      get_context->MarkKeyMayExist();
      s = Status::OK();
      done = true;
    }
  }

#ifndef ROCKSDB_LITE
  // Put the replay log in row cache only if something was found.
  if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
    size_t charge =
        row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
    void* row_ptr = new std::string(std::move(*row_cache_entry));
    ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
                                &DeleteEntry<std::string>);
  }
#endif  // ROCKSDB_LITE

  if (handle != nullptr) {
    ReleaseHandle(handle);
  }
  return s;
}

Status TableCache::FindTable(const FileOptions& file_options,
                             const InternalKeyComparator& internal_comparator,
                             const FileDescriptor& fd, Cache::Handle** handle,
                             const SliceTransform* prefix_extractor,
                             const bool no_io, bool record_read_stats,
                             HistogramImpl* file_read_hist, bool skip_filters,
                             int level,
                             bool prefetch_index_and_filter_in_cache) {
  ...
  std::unique_ptr<TableReader> table_reader;
  s = GetTableReader(file_options, internal_comparator, fd,
                     false /* sequential mode */, record_read_stats,
                     file_read_hist, &table_reader, prefix_extractor,
                     skip_filters, level, prefetch_index_and_filter_in_cache);
  if (!s.ok()) {
    assert(table_reader == nullptr);
    RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
    // We do not cache error results so that if the error is transient,
    // or somebody repairs the file, we recover automatically.
  } else {
    s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
                       handle);
    if (s.ok()) {
      // Release ownership of table reader.
      table_reader.release();
    }
  }
  ...
  return s;
}

If row_cache is enabled, the row cache key is computed first and a row cache lookup is performed; on a hit the result is returned directly, otherwise the key will be looked up in the corresponding SST.

FindTable is called to load the corresponding table_reader and to cache it in the Table cache.

Next, t->Get reads the data, either from the Block cache or from the SST.

Finally, if row_cache is enabled, the data just read is inserted into the row cache.

BlockBasedTable::Get

for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
  ...
  NewDataBlockIterator(&biter);
  for (; biter.Valid(); biter.Next()) {
    ...
    get_context->SaveValue(biter.value());
  }
}

In the Table Cache, assuming the table reader ultimately cached is a BlockBasedTable object, BlockBasedTable::Get is called.

First, the table's metadata (Bloom filter, data-block index) is used to locate the target Block inside the SST.

NewDataBlockIterator is then called: if the Block is already in the Block Cache, its address is returned directly; otherwise a disk IO occurs, the Block is read from the SST, and a Block object is constructed with its address cached in the Block Cache.

Once the key's value is found, get_context->SaveValue hands the address of the data inside the Block directly to the user-supplied PinnableSlice*, saving one data copy, and reference counting prevents the Block from being evicted while the value is still referenced.
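On the API side, this zero-copy path is what a PinnableSlice read gives you; for instance (DB path and key are made up):

#include <cassert>
#include <iostream>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rocksdb_get_demo", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key1", "value1");
  assert(s.ok());

  // PinnableSlice can pin the block-cache buffer directly instead of
  // copying the value into a std::string.
  rocksdb::PinnableSlice value;
  s = db->Get(rocksdb::ReadOptions(), db->DefaultColumnFamily(), "key1",
              &value);
  if (s.ok()) {
    std::cout << value.ToString() << std::endl;
  }
  value.Reset();  // releases the pinned cache entry

  delete db;
  return 0;
}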

References

  1. Rocksdb Source Code 6.7.3
  2. Rocksdb Code Analysis Get
  3. MySQL · RocksDB · 数据的读取(二)
  4. 使用PinnableSlice减少Get时的内存拷贝