MultiGet的意义是一次传入多个Key,然后返回多个Value,相比逐个Key进行点查,希望能够提高查询效率。
流程
DBImpl::MultiGetImpl
这个函数是执行查询的核心函数。
首先标记剩余的Key的数量,然后循环不断分批去查
size_t keys_left = num_keys;
Status s;
uint64_t curr_value_size = 0;
while (keys_left) {...}循环先检查当前是否超时,如果超时则将状态置为Status::TimedOut()并跳出循环
if (read_options.deadline.count() &&
immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
s = Status::TimedOut();
break;
}然后计算当前查询一批Key的批的大小,最大不能超过MAX_BATCH_SIZE,同时创建CONTEXT和RANGE,用于跟踪上下文以及当前累计查询的Value的总大小
size_t batch_size = (keys_left > MultiGetContext::MAX_BATCH_SIZE)
? MultiGetContext::MAX_BATCH_SIZE
: keys_left;
MultiGetContext ctx(sorted_keys, start_key + num_keys - keys_left,
batch_size, snapshot, read_options, GetFileSystem(),
stats_);
MultiGetRange range = ctx.GetMultiGetRange();
range.AddValueSize(curr_value_size);然后为本批查询做准备:先将lookup_current置为true(默认需要查询SST),并从keys_left中减去本批的大小,再把本批每个Key的merge_context清空,初始状态设置为OK
bool lookup_current = true;
keys_left -= batch_size;
for (auto mget_iter = range.begin(); mget_iter != range.end(); ++mget_iter) {
mget_iter->merge_context.Clear();
*mget_iter->s = Status::OK();
}首先看memtable,如果read_tier为kPersistedTier(仅查询已持久化的数据),且当前存在未持久化的数据,则跳过memtable。如果不需要跳过,则通过super_version调用memtable的MultiGet查询。
具体逻辑为:
- 先检查mem
- 如果有未查询到的则看imm
- 如果所有Key都查到了则把lookup_current设为false,跳过查询SST
- 如果不能跳过SST则记录mem中的miss数量
bool skip_memtable =
(read_options.read_tier == kPersistedTier &&
has_unpersisted_data_.load(std::memory_order_relaxed));
if (!skip_memtable) {
super_version->mem->MultiGet(read_options, &range, callback,
false /* immutable_memtable */);
if (!range.empty()) {
super_version->imm->MultiGet(read_options, &range, callback);
}
if (!range.empty()) {
uint64_t left = range.KeysLeft();
RecordTick(stats_, MEMTABLE_MISS, left);
} else {
lookup_current = false;
}
}然后如果不跳过SST,则在SST里查询
if (lookup_current) {
PERF_TIMER_GUARD(get_from_output_files_time);
super_version->current->MultiGet(read_options, &range, callback);
}然后检查当前已经收集的Value大小,如果超过阈值read_options.value_size_soft_limit则返回Status::Aborted()
curr_value_size = range.GetValueSize();
if (curr_value_size > read_options.value_size_soft_limit) {
s = Status::Aborted();
break;
}最后再进行统计,统计当前已经收集到的Key的状态
- 如果一个Key的合并次数大于merge_threshold,那么标记成Status::OkMergeOperandThresholdExceeded状态
- 统计所有读取的字节数及读取成功个数
for (size_t i = start_key; i < start_key + num_keys - keys_left; ++i) {
KeyContext* key = (*sorted_keys)[i];
assert(key);
assert(key->s);
if (key->s->ok()) {
const auto& merge_threshold = read_options.merge_operand_count_threshold;
if (merge_threshold.has_value() &&
key->merge_context.GetNumOperands() > merge_threshold) {
*(key->s) = Status::OkMergeOperandThresholdExceeded();
}
if (key->value) {
bytes_read += key->value->size();
} else {
assert(key->columns);
bytes_read += key->columns->serialized_size();
}
num_found++;
}
}最后将错误状态传递,如果之前因超时或Value总大小超限而提前退出(此时keys_left不为0),则将该状态传递给所有尚未处理的Key
if (keys_left) {
assert(s.IsTimedOut() || s.IsAborted());
for (size_t i = start_key + num_keys - keys_left; i < start_key + num_keys;
++i) {
KeyContext* key = (*sorted_keys)[i];
*key->s = s;
}
}放上一个AI的总结
- 批处理机制:不是一次处理所有键,而是分批处理,每批最多处理 MAX_BATCH_SIZE 个键。
- 查询层次:遵循 RocksDB 的存储层次,先查内存表,再查不可变内存表,最后查 SST 文件。
- 提前退出策略:
- 超过截止时间时退出
- 累积值大小超过软限制时退出
- 性能监控:详细的性能计时和统计收集,用于监控和优化。
- 实时限制:通过监控累积值大小,防止返回过大的数据集。
- 错误处理:保证所有键都有明确的状态,即使是未处理的键。
MemTable::MultiGet
首先判断是否需要考虑范围删除:如果read_options设置了ignore_range_deletions,或者当前memtable的范围删除表为空(is_range_del_table_empty_),则视为没有范围删除
bool no_range_del = read_options.ignore_range_deletions || is_range_del_table_empty_.load(std::memory_order_relaxed);如果有范围删除,则不能用bloom filter。如果mem配置了bloom filter且没有范围删除,则先用bloom filter筛一遍。这里有个小技巧使用了std::array<>,由于是分配在栈上,因此性能较好。
bloom filter在这里有两种用法,一种是对完整的Key过滤(whole key),一种是对前缀过滤(此时只有落在prefix_extractor_定义域内的Key才参与过滤)。然后将这些Key丢进bloom filter筛一遍,分别统计命中和未命中的次数,并将未命中的Key从temp_range里跳过,减少后续枚举次数。
MultiGetRange temp_range(*range, range->begin(), range->end());
if (bloom_filter_ && no_range_del) {
bool whole_key =
!prefix_extractor_ || moptions_.memtable_whole_key_filtering;
std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys;
std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match;
std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes;
int num_keys = 0;
for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
if (whole_key) {
bloom_keys[num_keys] = iter->ukey_without_ts;
range_indexes[num_keys++] = iter.index();
} else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
bloom_keys[num_keys] =
prefix_extractor_->Transform(iter->ukey_without_ts);
range_indexes[num_keys++] = iter.index();
}
}
bloom_filter_->MayContain(num_keys, bloom_keys.data(), may_match.data());
for (int i = 0; i < num_keys; ++i) {
if (!may_match[i]) {
temp_range.SkipIndex(range_indexes[i]);
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
} else {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
}
}然后考虑删除的情况(待补充)
if (!no_range_del) {
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
NewRangeTombstoneIteratorInternal(
read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
immutable_memtable));
SequenceNumber covering_seq = range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
if (covering_seq > iter->max_covering_tombstone_seq) {
iter->max_covering_tombstone_seq = covering_seq;
if (iter->timestamp) {
// Will be overwritten in SaveValue() if there is a point key with
// a higher seqno.
iter->timestamp->assign(range_del_iter->timestamp().data(), range_del_iter->timestamp().size());
}
}
}从mem里获取Value
GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
callback, &iter->is_blob_index,
iter->value ? iter->value->GetSelf() : nullptr, iter->columns,
iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq,
&found_final_value, &merge_in_progress);检查获取的Value的状态,如果正在合并当中则标记为Status::MergeInProgress
if (!found_final_value && merge_in_progress) {
if (iter->s->ok()) {
*(iter->s) = Status::MergeInProgress();
} else {
assert(iter->s->IsMergeInProgress());
}
}最后对当前的Value获取结果进行处理,无非两种情况
- 读取成功,found_final_value = true
- 读取失败,!iter->s->ok() && !iter->s->IsMergeInProgress(),表示读取不成功且不是因为正在合并
无论以上哪种情况,都进行处理,首先将Value的大小累加进当前获取Value大小总和,然后标记当前Key查询完成。如果当前Value大小总和超过了read_options.value_size_soft_limit,那么这次读取需要终止,于是进行状态传播,将range中剩余的所有Key标记成Status::Aborted(),然后跳出循环。
if (found_final_value || (!iter->s->ok() && !iter->s->IsMergeInProgress())) {
// `found_final_value` should be set if an error/corruption occurs.
// The check on iter->s is just there in case GetFromTable() did not
// set `found_final_value` properly.
assert(found_final_value);
if (iter->value) {
iter->value->PinSelf();
range->AddValueSize(iter->value->size());
} else {
assert(iter->columns);
range->AddValueSize(iter->columns->serialized_size());
}
range->MarkKeyDone(iter);
RecordTick(moptions_.statistics, MEMTABLE_HIT);
if (range->GetValueSize() > read_options.value_size_soft_limit) {
// Set all remaining keys in range to Abort
for (auto range_iter = range->begin(); range_iter != range->end(); ++range_iter) {
range->MarkKeyDone(range_iter);
*(range_iter->s) = Status::Aborted();
}
break;
}
}