These notes trace the code path SlateDB uses to read from the memtable, written down for future reference.
Point lookups
get
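Everything starts at get. It first builds getters, a list of three lookup futures (one for the memtables, one for L0, and one for the compacted SSTs), and then hands them to get_inner.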
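```rust
async fn get(&'a self) -> Result<Option<Bytes>, SlateDBError> {
    // One lookup future per data source: memtables, L0 SSTs, compacted SSTs.
    let getters: Vec<BoxFuture<'a, Result<Option<RowEntry>, SlateDBError>>> =
        vec![self.get_memtable(), self.get_l0(), self.get_compacted()];
    self.get_inner(getters).await
}
```

Start with get_memtable. It queries the memtables one at a time. If include_wal_memtables is set, it checks the current WAL memtable and then each immutable WAL memtable. If include_memtables is set, it then checks the active memtable and each immutable memtable. For an immutable table, .table() exposes the underlying skip-list table so that get(key, max_seq) can be called on it. find_map stops at the first table that returns an entry, and the function returns that entry immediately.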
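A simplified model of this lookup may help. The sketch below is not SlateDB code. ToyMemtable is a hypothetical stand-in that stores entries in a BTreeMap keyed by (user_key, Reverse(seq)), which assumes that versions of the same key sort newest-first, and a None value marks a tombstone. lookup walks the tables newest-first and, like the find_map in get_memtable, stops at the first table that holds a version with seq <= max_seq.

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;

/// Hypothetical stand-in for a memtable (not SlateDB's type). Keys are
/// (user_key, Reverse(seq)), so the versions of one key sort newest-first;
/// a `None` value marks a tombstone.
#[derive(Default)]
struct ToyMemtable {
    map: BTreeMap<(String, Reverse<u64>), Option<String>>,
}

impl ToyMemtable {
    fn put(&mut self, key: &str, seq: u64, value: Option<&str>) {
        self.map
            .insert((key.to_string(), Reverse(seq)), value.map(str::to_string));
    }

    /// Newest version of `key` with seq <= max_seq, tombstones included.
    fn get(&self, key: &str, max_seq: u64) -> Option<(u64, Option<String>)> {
        // Versions newer than max_seq sort before (key, Reverse(max_seq)),
        // so the first entry at or after it is the newest visible version.
        self.map
            .range((key.to_string(), Reverse(max_seq))..)
            .next()
            .filter(|((k, _), _)| k.as_str() == key)
            .map(|((_, Reverse(seq)), v)| (*seq, v.clone()))
    }
}

/// Tables ordered newest-first: the first one with a visible version wins,
/// mirroring the find_map in get_memtable.
fn lookup(tables: &[&ToyMemtable], key: &str, max_seq: u64) -> Option<(u64, Option<String>)> {
    tables.iter().find_map(|t| t.get(key, max_seq))
}
```

A tombstone (None) also ends the search. This matches get_memtable, which returns whatever entry it finds first and leaves deciding what a tombstone means to its caller.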
```rust
fn get_memtable(&'a self) -> BoxFuture<'a, Result<Option<RowEntry>, SlateDBError>> {
    async move {
        if self.include_wal_memtables {
            // The current WAL memtable, then each immutable WAL memtable.
            let maybe_val = std::iter::once(self.snapshot.wal())
                .chain(self.snapshot.imm_wal().iter().map(|imm| imm.table()))
                .find_map(|memtable| memtable.get(self.key, self.max_seq));
            if let Some(val) = maybe_val {
                return Ok(Some(val));
            }
        }
        if self.include_memtables {
            // The active memtable, then each immutable memtable.
            let maybe_val = std::iter::once(self.snapshot.memtable())
                .chain(self.snapshot.imm_memtable().iter().map(|imm| imm.table()))
                .find_map(|memtable| memtable.get(self.key, self.max_seq));
            if let Some(val) = maybe_val {
                return Ok(Some(val));
            }
        }
        // No memtable holds a visible entry for this key.
        Ok(None)
    }
    .boxed()
}
```

scan
scan with option
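scan_with_options first collects every memtable the scan needs. Depending on options.durability_filter, this means the WAL memtables, the regular memtables, or both, and again .table() is used to reach the table inside each immutable one. It then opens an ascending range iterator on each table and merges these into a single memtable iterator, mem_iter, with MergeIterator::new(memtable_iters). Finally it passes mem_iter, together with the L0 and sorted-run iterators, to DbIterator::new(), which builds one combined iterator.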
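MergeIterator is what turns several sorted iterators into one. To illustrate the idea, here is an eager k-way merge over a BinaryHeap. This is not SlateDB's implementation, which is async and lazy. The Entry type and merge function are hypothetical, and each input is assumed to be sorted by key ascending, then seq descending. The sketch also assumes that duplicate keys collapse to their highest-seq version, which is what the max_seq comment in DbIterator::new implies.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical entry type; every input is sorted by key asc, then seq desc.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Entry {
    key: String,
    seq: u64,
    value: String,
}

/// Eager k-way merge. The heap holds each source's current head; as a max-heap
/// over (Reverse(key), seq, ..) it pops the smallest key first and, for equal
/// keys, the highest seq. Only the first (newest) version of each key is kept.
fn merge(sources: Vec<Vec<Entry>>) -> Vec<Entry> {
    let mut iters: Vec<std::vec::IntoIter<Entry>> =
        sources.into_iter().map(Vec::into_iter).collect();
    let mut heap = BinaryHeap::new();
    // Seed the heap with the head of every source.
    for (src, it) in iters.iter_mut().enumerate() {
        if let Some(e) = it.next() {
            heap.push((Reverse(e.key), e.seq, src, e.value));
        }
    }
    let mut out: Vec<Entry> = Vec::new();
    while let Some((Reverse(key), seq, src, value)) = heap.pop() {
        // Refill the heap from the source whose head was just consumed.
        if let Some(e) = iters[src].next() {
            heap.push((Reverse(e.key), e.seq, src, e.value));
        }
        // Older versions of a key that was already emitted are dropped.
        if out.last().map_or(true, |last| last.key != key) {
            out.push(Entry { key, seq, value });
        }
    }
    out
}
```

With this design the heap only ever holds one entry per source, so each step costs O(log k) for k sources. This holds regardless of how many entries each source contains.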
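```rust
pub(crate) async fn scan_with_options<'a>(
    &'a self,
    range: BytesRange,
    options: &ScanOptions,
    snapshot: &(dyn ReadSnapshot + Sync),
) -> Result<DbIterator<'a>, SlateDBError> {
    // Collect every memtable this scan should see.
    let mut memtables = VecDeque::new();
    if self.include_wal_memtables(options.durability_filter) {
        memtables.push_back(Arc::clone(&snapshot.wal()));
        for imm_wal in snapshot.imm_wal() {
            memtables.push_back(imm_wal.table());
        }
    }
    if self.include_memtables(options.durability_filter) {
        memtables.push_back(Arc::clone(&snapshot.memtable()));
        for memtable in snapshot.imm_memtable() {
            memtables.push_back(memtable.table());
        }
    }
    // One ascending range iterator per memtable, merged into a single iterator.
    let memtable_iters = memtables
        .iter()
        .map(|t| t.range_ascending(range.clone()))
        .collect();
    let mem_iter = MergeIterator::new(memtable_iters).await?;
    // ... other code ...
    DbIterator::new(range.clone(), mem_iter, l0_iters, sr_iters).await
}
```

Next, a quick look at how the final iterator is assembled. Before any merging, the constructor applies a max_seq filter to every iterator. Entries whose seq is greater than max_seq are dropped, so each source only exposes versions visible at max_seq. The filtered iterators are merged within each group (memtable, L0, sorted runs), and the three group iterators are then merged once more into a single iterator. Note that this constructor takes the unmerged memtable iterators plus a max_seq argument, whereas the scan_with_options excerpt above passes an already merged mem_iter and no max_seq. The two excerpts appear to come from different versions of the code.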
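The example in the code comment can be checked directly. In the sketch below (plain Rust, not SlateDB code), entries are hypothetical (key, seq) pairs, and merge_dedup stands in for a merge that keeps only the newest version of each key. Filtering first keeps (key1, 96). Merging first keeps (key1, 110), which the filter then removes, so the scan would wrongly see nothing.

```rust
/// Hypothetical entry: (key, seq).
type Entry = (&'static str, u64);

/// Stand-in for a merge that keeps only the newest version of each key.
/// Sort-based for brevity: key ascending, then seq descending, then dedup.
fn merge_dedup(inputs: Vec<Vec<Entry>>) -> Vec<Entry> {
    let mut all: Vec<Entry> = inputs.into_iter().flatten().collect();
    all.sort_by(|a, b| a.0.cmp(b.0).then(b.1.cmp(&a.1)));
    // dedup_by keeps the first element of each run of equal keys: the highest seq.
    all.dedup_by(|later, earlier| later.0 == earlier.0);
    all
}

/// Drops entries newer than max_seq.
fn apply_max_seq(entries: Vec<Entry>, max_seq: u64) -> Vec<Entry> {
    entries.into_iter().filter(|e| e.1 <= max_seq).collect()
}

/// The order DbIterator::new uses: filter each input, then merge.
fn filter_then_merge(inputs: Vec<Vec<Entry>>, max_seq: u64) -> Vec<Entry> {
    merge_dedup(inputs.into_iter().map(|i| apply_max_seq(i, max_seq)).collect())
}

/// The order the comment warns against: merge, then filter.
fn merge_then_filter(inputs: Vec<Vec<Entry>>, max_seq: u64) -> Vec<Entry> {
    apply_max_seq(merge_dedup(inputs), max_seq)
}
```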
```rust
pub(crate) async fn new(
    range: BytesRange,
    mem_iters: impl IntoIterator<Item = MemTableIterator>,
    l0_iters: impl IntoIterator<Item = SstIterator<'a>>,
    sr_iters: impl IntoIterator<Item = SortedRunIterator<'a>>,
    max_seq: Option<u64>,
) -> Result<Self, SlateDBError> {
    let iters: [Box<dyn KeyValueIterator>; 3] = {
        // Apply the max_seq filter to all the iterators. Note that this filter must be
        // applied BEFORE merging the iterators.
        //
        // For example, suppose we have the following iterators:
        // - Iterator A with entries [(key1, seq=96), (key1, seq=110)]
        // - Iterator B with entries [(key1, seq=95)]
        //
        // If we filtered after merging with max_seq=100, we would lose the entry with
        // seq=96 from iterator A, even though seq=96 is the correct answer for this scan.
        let mem_iters = Self::apply_max_seq_filter(mem_iters, max_seq);
        let l0_iters = Self::apply_max_seq_filter(l0_iters, max_seq);
        let sr_iters = Self::apply_max_seq_filter(sr_iters, max_seq);
        // Merge each group concurrently.
        let (mem_iter, l0_iter, sr_iter) = tokio::join!(
            MergeIterator::new(mem_iters),
            MergeIterator::new(l0_iters),
            MergeIterator::new(sr_iters)
        );
        [Box::new(mem_iter?), Box::new(l0_iter?), Box::new(sr_iter?)]
    };
    // Merge the three group iterators into the final iterator.
    let iter = MergeIterator::new(iters).await?;
    Ok(DbIterator {
        range,
        iter,
        invalidated_error: None,
        last_key: None,
    })
}
```

range
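Back to how a memtable iterator is created: it comes from the memtable's range function, and range_ascending is a thin wrapper around it.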
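range accepts bounds on user keys, but the skip map is keyed by KVTableInternalKey. Its first step is therefore KVTableInternalKeyRange::from(range). The sketch below shows one way such a conversion can work. It assumes that internal keys order by user key ascending and then by seq descending, modelled here as the hypothetical InternalKey = (String, Reverse<u64>). Each user-key bound is mapped so that every version of a boundary key lands on the correct side of the range.

```rust
use std::cmp::Reverse;
use std::collections::BTreeMap;
use std::ops::{Bound, RangeBounds};

/// Hypothetical internal key: user key ascending, then seq descending.
type InternalKey = (String, Reverse<u64>);

fn lower(b: Bound<&String>) -> Bound<InternalKey> {
    match b {
        // The smallest internal key for `k` is its newest possible version.
        Bound::Included(k) => Bound::Included((k.clone(), Reverse(u64::MAX))),
        // Skip every version of `k` by starting after its oldest possible version.
        Bound::Excluded(k) => Bound::Excluded((k.clone(), Reverse(0))),
        Bound::Unbounded => Bound::Unbounded,
    }
}

fn upper(b: Bound<&String>) -> Bound<InternalKey> {
    match b {
        // Include every version of `k`, down to the oldest possible one.
        Bound::Included(k) => Bound::Included((k.clone(), Reverse(0))),
        // Stop before the newest possible version of `k`.
        Bound::Excluded(k) => Bound::Excluded((k.clone(), Reverse(u64::MAX))),
        Bound::Unbounded => Bound::Unbounded,
    }
}

/// Convert a user-key range into a range over internal keys.
fn to_internal<R: RangeBounds<String>>(r: R) -> (Bound<InternalKey>, Bound<InternalKey>) {
    (lower(r.start_bound()), upper(r.end_bound()))
}
```

A tuple of two Bounds implements RangeBounds, so the result can be passed straight to BTreeMap::range.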
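```rust
pub(crate) fn range_ascending<T: RangeBounds<Bytes>>(&self, range: T) -> MemTableIterator {
    self.range(range, IterationOrder::Ascending)
}

pub(crate) fn range<T: RangeBounds<Bytes>>(
    &self,
    range: T,
    ordering: IterationOrder,
) -> MemTableIterator {
    // Convert the user-key range into a range over internal keys.
    let internal_range = KVTableInternalKeyRange::from(range);
    let mut iterator = MemTableIteratorInnerBuilder {
        map: self.map.clone(),
        inner_builder: |map| map.range(internal_range),
        ordering,
        item: None,
    }
    .build();
    // Load the first entry into `item`.
    iterator.next_entry_sync();
    iterator
}

#[self_referencing]
pub(crate) struct MemTableIteratorInner<T: RangeBounds<KVTableInternalKey>> {
    map: Arc<SkipMap<KVTableInternalKey, RowEntry>>,
    /// `inner` is the Iterator impl of SkipMap, which is the underlying data structure of MemTable.
    #[borrows(map)]
    #[not_covariant]
    inner: Range<'this, KVTableInternalKey, T, KVTableInternalKey, RowEntry>,
    ordering: IterationOrder,
    /// The buffered entry that the next call to next_entry_sync returns.
    item: Option<RowEntry>,
}
```

build() is the constructor that the self-referencing crate (ouroboros, which provides the #[self_referencing] and #[borrows] attributes) generates for this struct. Right after building, range calls next_entry_sync once so that the first entry is loaded into item. Before that, KVTableInternalKeyRange::from(range) turns the user-key range into a range over internal keys. The field to focus on is inner. Its type is crossbeam_skiplist::map::Range, the iterator returned by the skip map's range(internal_range), which yields exactly the entries inside that range. Every next or next_back on a MemTableIterator is therefore delegated to the crossbeam_skiplist map iterator.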
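Together, the priming call in range and next_entry_sync give a one-item lookahead. item always holds the entry that the next call will return, and that entry is also what seek inspects. Below is a plain-Rust sketch of this pattern. Buffered and Order are hypothetical stand-ins for MemTableIterator and IterationOrder. The sketch uses std::mem::replace instead of clone, which is possible here because the struct has no self-reference.

```rust
/// Hypothetical stand-in for IterationOrder.
#[derive(Clone, Copy)]
enum Order {
    Ascending,
    Descending,
}

/// One-item lookahead over any double-ended iterator: `item` holds the entry
/// the next call to `next_entry` will return.
struct Buffered<I: DoubleEndedIterator> {
    inner: I,
    order: Order,
    item: Option<I::Item>,
}

impl<I: DoubleEndedIterator> Buffered<I> {
    fn new(inner: I, order: Order) -> Self {
        let mut it = Buffered { inner, order, item: None };
        // Prime the buffer, like the next_entry_sync call right after build().
        it.next_entry();
        it
    }

    /// Look at the buffered entry without advancing.
    fn peek(&self) -> Option<&I::Item> {
        self.item.as_ref()
    }

    /// Return the buffered entry and refill the buffer in the chosen direction.
    fn next_entry(&mut self) -> Option<I::Item> {
        let next = match self.order {
            Order::Ascending => self.inner.next(),
            Order::Descending => self.inner.next_back(),
        };
        // Hand back the old item and keep the new one, without cloning.
        std::mem::replace(&mut self.item, next)
    }
}
```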
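```rust
impl MemTableIterator {
    /// Returns the buffered entry and advances the underlying skip-map range.
    pub(crate) fn next_entry_sync(&mut self) -> Option<RowEntry> {
        // The entry to hand back is the one buffered by the previous call.
        let ans = self.borrow_item().clone();
        // Pull the following entry in the requested direction.
        let next_entry = match self.borrow_ordering() {
            IterationOrder::Ascending => self.with_inner_mut(|inner| inner.next()),
            IterationOrder::Descending => self.with_inner_mut(|inner| inner.next_back()),
        };
        // Buffer it for the next call.
        let cloned_entry = next_entry.map(|entry| entry.value().clone());
        self.with_item_mut(|item| *item = cloned_entry);
        ans
    }
}
```

Iterators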
At the core, every iterator implements the following trait:
```rust
#[async_trait]
pub trait KeyValueIterator: Send + Sync {
    /// Returns the next non-deleted key-value pair in the iterator.
    async fn next(&mut self) -> Result<Option<KeyValue>, SlateDBError> {
        loop {
            let entry = self.next_entry().await?;
            if let Some(kv) = entry {
                match kv.value {
                    ValueDeletable::Value(v) => {
                        return Ok(Some(KeyValue {
                            key: kv.key,
                            value: v,
                        }))
                    }
                    // Merge values are not handled yet: todo!() panics.
                    ValueDeletable::Merge(_) => todo!(),
                    // Deleted key: skip the tombstone and keep going.
                    ValueDeletable::Tombstone => continue,
                }
            } else {
                return Ok(None);
            }
        }
    }

    /// Returns the next entry in the iterator, which may be a key-value pair or
    /// a tombstone of a deleted key-value pair.
    async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError>;

    /// Seek to the next (inclusive) key
    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError>;
}
```

For example, MemTableIterator implements it as follows:
```rust
#[async_trait]
impl KeyValueIterator for MemTableIterator {
    async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        // The skip map is in memory, so the async method just wraps the sync one.
        Ok(self.next_entry_sync())
    }

    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        // Advance while the buffered entry's key is still below next_key.
        loop {
            let front = self.borrow_item().clone();
            if front.is_some_and(|record| record.key < next_key) {
                self.next_entry_sync();
            } else {
                return Ok(());
            }
        }
    }
}
```
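To tie the trait together: next is built on next_entry and skips tombstones, and the memtable seek advances while the buffered key is below the target. Below is a synchronous sketch of that contract. KvIter, Row, Value and VecIter are hypothetical stand-ins with no async, no Merge values and string keys. Also, seek is written here as a default method on top of a peek helper, rather than implemented per iterator as in SlateDB.

```rust
/// Hypothetical simplified value: a live value or a tombstone.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Put(String),
    Tombstone,
}

#[derive(Clone, Debug, PartialEq)]
struct Row {
    key: String,
    value: Value,
}

/// Synchronous sketch of the KeyValueIterator contract.
trait KvIter {
    /// Next entry, tombstones included (like next_entry).
    fn next_entry(&mut self) -> Option<Row>;
    /// Current entry without consuming it (like borrow_item in seek).
    fn peek(&self) -> Option<&Row>;

    /// Next live key/value pair; tombstones are skipped (like next).
    fn next_live(&mut self) -> Option<(String, String)> {
        while let Some(row) = self.next_entry() {
            if let Value::Put(v) = row.value {
                return Some((row.key, v));
            }
        }
        None
    }

    /// Advance until the current key is >= target (like MemTableIterator::seek).
    fn seek(&mut self, target: &str) {
        while self.peek().is_some_and(|r| r.key.as_str() < target) {
            self.next_entry();
        }
    }
}

/// Vec-backed stand-in for MemTableIterator; rows must be sorted by key.
struct VecIter {
    rows: Vec<Row>,
    pos: usize,
}

impl KvIter for VecIter {
    fn next_entry(&mut self) -> Option<Row> {
        let row = self.rows.get(self.pos).cloned();
        if row.is_some() {
            self.pos += 1;
        }
        row
    }

    fn peek(&self) -> Option<&Row> {
        self.rows.get(self.pos)
    }
}
```

Like the loop in MemTableIterator::seek, this seek takes time linear in the number of entries it skips.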