RocksDB Get 流程

本文对 RocksDB 6.7.3 版本的 Get 流程进行分析。

概述

(1) 获取当前的 SuperVersion。SuperVersion 用于管理 CF 的元数据,如当前版本号、内存中的 MemTable 和 Immutable MemTable、SST 文件信息等:

1
2
3
4
5
6
Struct SuperVersion {
MemTable* mem;
MemTableListVersion* imm;
Version* current;
...
}

(2) 从内存读: 尝试从第一步 SuperVersion 中引用的 MemTable 以及Immutable MemTable 中获取对应的值

(3) 从持久化设备读: 首先通过 Table cache 获取到文件的元数据,如布隆过滤器(Bloom Filters)和数据块索引(Indexes), 如果 Block cache 中缓存了 SST 的数据块,如果命中那就直接读取成功,否则便需要从 SST 中读取数据块并插入到 Block cache

源码分析

DBImpl::Get

1
2
3
4
5
6
7
8
Status DBImpl::Get(const ReadOptions& read_options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) {
GetImplOptions get_impl_options;
get_impl_options.column_family = column_family;
get_impl_options.value = value;
return GetImpl(read_options, key, get_impl_options);
}

Rocksdb 的 Get 接口 DBImpl::Get 其实现主要靠 DBImpl::GetImpl 函数调用。

DBImpl::GetImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
...
SuperVersion* sv = GetAndRefSuperVersion(cfd);
...
SequenceNumber snapshot;
if (read_options.snapshot != nullptr) {
if (get_impl_options.callback) {
snapshot = get_impl_options.
} else {
snapshot =
reinterpret_cast<const SnapshotImpl*>(read_options.snapshot)->number_;
}
} else {
....
snapshot = last_seq_same_as_publish_seq_
? versions_->LastSequence()
: versions_->LastPublishedSequence();
...
}
}
...
if (!skip_memtable) {
...
if (sv->mem->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
...
} else if ((s.ok() || s.IsMergeInProgress()) &&
sv->imm->Get(lkey, get_impl_options.value->GetSelf(), &s,
&merge_context, &max_covering_tombstone_seq,
read_options, get_impl_options.callback,
get_impl_options.is_blob_index)) {
...
}
...
}
if (!done) {
sv->current->Get(
read_options, lkey, get_impl_options.value, &s, &merge_context,
&max_covering_tombstone_seq,
get_impl_options.get_value ? get_impl_options.value_found : nullptr,
nullptr, nullptr,
get_impl_options.get_value ? get_impl_options.callback : nullptr,
get_impl_options.get_value ? get_impl_options.is_blob_index : nullptr,
get_impl_options.get_value);
...
}
...
}

DBImpl::GetImpl 获取 SuperVersion 的信息,如果用户未指定 snapshot,需要获取当前的 snapshot。读取时不对 key 加锁,能读到什么数据完全取决于 Options 传入的 snapshot。

SuperVersion 中按照数据的新旧程度排序 MemTable -> MemTableListVersion -> Version,依次按序查找,如果在新的数据中找到符合 snapshot 规则的结果,就可以立即返回,完成本次查找。

MemTable::Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
SequenceNumber* seq, const ReadOptions& read_opts,
ReadCallback* callback, bool* is_blob_index, bool do_merge) {
...

if (bloom_filter_ && !may_contain) {
// iter is null if prefix bloom says the key does not exist
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
*seq = kMaxSequenceNumber;
} else {
if (bloom_filter_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
is_blob_index, value, s, merge_context, seq,
&found_final_value, &merge_in_progress);
}

...
}

void MemTable::GetFromTable(const LookupKey& key,
SequenceNumber max_covering_tombstone_seq,
bool do_merge, ReadCallback* callback,
bool* is_blob_index, std::string* value, Status* s,
MergeContext* merge_context, SequenceNumber* seq,
bool* found_final_value, bool* merge_in_progress) {
...
table_->Get(key, &saver, SaveValue);
*seq = saver.seq;
}

利用 MemTableRep 的 Get 函数进行查找(以 SkipListRep 实现为例,在 skiplist 中进行查找,从 seek 到的位置开始向后遍历,遍历 entry 是否符合SaveValue 定义的规则)。SaveValue 函数查看当前 entry 是否还是当前查找的 key,如果不是则返回;查看当前 entry 的 snapshot 是否小于或等于需要查找的 snapshot,不符合则继续循环。如果 entry 的snapshot 符合上述条件,那么则跳出循环,返回查找结果。

MemTableListVersion::Get

1
2
3
4
5
6
7
8
9
bool Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq,
const ReadOptions& read_opts, ReadCallback* callback = nullptr,
bool* is_blob_index = nullptr) {
SequenceNumber seq;
return Get(key, value, s, merge_context, max_covering_tombstone_seq, &seq,
read_opts, callback, is_blob_index);
}

MemTableListVersion 用链表的形式保存了所有 Immutable memtable 的结构,查找时,按时间序依次查找于每一个 memtable,如果任何一个 memtable 查找到结果则立即返回,即返回最新的返回值。具体 memtable 查找见上述 MemTable::Get 接口。

Version::Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void Version::Get(const ReadOptions& read_options, const LookupKey& k,
PinnableSlice* value, Status* status,
MergeContext* merge_context,
SequenceNumber* max_covering_tombstone_seq, bool* value_found,
bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
bool* is_blob, bool do_merge) {
...
FilePicker fp(
storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile();

while (f != nullptr) {
...
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.prefix_extractor.get(),
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
fp.IsHitFileLastInLevel()),
fp.GetCurrentLevel());
...
f = fp.GetNextFile();
}
...
}

GetNextFile 函数会遍历所有的 level,然后再遍历每个 level 的所有的文件,这里会对 level 0 的文件做一个特殊处理,这是因为只有 level 0 的 SST 的 range 不是有序的,因此我们每次查找需要查找所有的文件,也就是会一个个的遍历;而在非 level 0,我们只需要按照二分查找来得到对应的文件即可,如果二分查找不存在,那么我就需要进入下一个 level 进行查找。

调用 TableCache::Get 遍历单个 SST 文件,如果查找到结果立即返回。

TableCache::Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k,
GetContext* get_context,
const SliceTransform* prefix_extractor,
HistogramImpl* file_read_hist, bool skip_filters,
int level) {
auto& fd = file_meta.fd;
std::string* row_cache_entry = nullptr;
bool done = false;
#ifndef ROCKSDB_LITE
IterKey row_cache_key;
std::string row_cache_entry_buffer;

// Check row cache if enabled. Since row cache does not currently store
// sequence numbers, we cannot use it if we need to fetch the sequence.
if (ioptions_.row_cache && !get_context->NeedToReadSequence()) {
auto user_key = ExtractUserKey(k);
CreateRowCacheKeyPrefix(options, fd, k, get_context, row_cache_key);
done = GetFromRowCache(user_key, row_cache_key, row_cache_key.Size(),
get_context);
if (!done) {
row_cache_entry = &row_cache_entry_buffer;
}
}
#endif // ROCKSDB_LITE
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (!done && s.ok()) {
if (t == nullptr) {
s = FindTable(
file_options_, internal_comparator, fd, &handle, prefix_extractor,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record_read_stats */, file_read_hist, skip_filters, level);
if (s.ok()) {
t = GetTableReaderFromHandle(handle);
}
}
SequenceNumber* max_covering_tombstone_seq =
get_context->max_covering_tombstone_seq();
if (s.ok() && max_covering_tombstone_seq != nullptr &&
!options.ignore_range_deletions) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
t->NewRangeTombstoneIterator(options));
if (range_del_iter != nullptr) {
*max_covering_tombstone_seq = std::max(
*max_covering_tombstone_seq,
range_del_iter->MaxCoveringTombstoneSeqnum(ExtractUserKey(k)));
}
}
if (s.ok()) {
get_context->SetReplayLog(row_cache_entry); // nullptr if no cache.
s = t->Get(options, k, get_context, prefix_extractor, skip_filters);
get_context->SetReplayLog(nullptr);
} else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) {
// Couldn't find Table in cache but treat as kFound if no_io set
get_context->MarkKeyMayExist();
s = Status::OK();
done = true;
}
}

#ifndef ROCKSDB_LITE
// Put the replay log in row cache only if something was found.
if (!done && s.ok() && row_cache_entry && !row_cache_entry->empty()) {
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
}
#endif // ROCKSDB_LITE

if (handle != nullptr) {
ReleaseHandle(handle);
}
return s;
}

Status TableCache::FindTable(const FileOptions& file_options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, Cache::Handle** handle,
const SliceTransform* prefix_extractor,
const bool no_io, bool record_read_stats,
HistogramImpl* file_read_hist, bool skip_filters,
int level,
bool prefetch_index_and_filter_in_cache) {
...
std::unique_ptr<TableReader> table_reader;
s = GetTableReader(file_options, internal_comparator, fd,
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, prefix_extractor,
skip_filters, level, prefetch_index_and_filter_in_cache);
if (!s.ok()) {
assert(table_reader == nullptr);
RecordTick(ioptions_.statistics, NO_FILE_ERRORS);
// We do not cache error results so that if the error is transient,
// or somebody repairs the file, we recover automatically.
} else {
s = cache_->Insert(key, table_reader.get(), 1, &DeleteEntry<TableReader>,
handle);
if (s.ok()) {
// Release ownership of table reader.
table_reader.release();
}
...
return s;
}

如果 row_cache 打开,首先它会计算 row cache 的 key,再在row cache 中进行一次查找,如果有对应的值则直接返回结果,否则则将会在对应的 SST 读取传递进来的 key。

调用 FindTable,进行对应 table_reader 的读取以及进行 Table cache。

接下来调用 t->Get,从 Block cache 或者 SST 中读取数据。

最后,如果 row_cache 打开,把读取的数据插入到 row cache 中。

BlockBasedTable::Get

1
2
3
4
5
6
7
8
for (iiter->Seek(key); iiter->Valid()&&!done; iiter->Next()) {
...
NewDataBlockIterator(&biter);
for(; biter.Valid; biter.Next()) {
...
get_context->SaveValue(biter->Value());
}
}

在 Table Cache 中,假设最终缓存的 table reader 是一个 BlockBasedTable 对象,调用 BlockBasedTable::Get。

首先,根据 Table 的元数据信息(布隆过滤器,数据块Index)查找 SST 内部的 Block。

调用 NewDataBlockIterator,若 Block 在 Block Cache 当中,直接返回对象地址,否则,发生磁盘IO,读取 SST 的 Block,构造 Block 对象并缓存其地址在 Block Cache 中。

找到 key 对应的 value,调用 get_context->SaveValue,直接将 Block 中的数据地址赋给用户传进来的 PinnableSlice* 中,减少了一次数据拷贝,并用引用计数避免 Block 被淘汰值被清除。

回顾

参考

  1. Rocksdb Source Code 6.7.3
  2. Rocksdb Code Analysis Get
  3. MySQL · RocksDB · 数据的读取(二)
  4. 使用PinnableSlice减少Get时的内存拷贝
Author

王亮

Posted on

2021-01-17

Licensed under