Memtable的写入
对于Rocksdb中后台flush到l0的过程,后台线程会PickUp合适的ImmutableMemtables刷到l0,然后删除对应的WAL。
为什么说是挑选?因为Rocksdb-3.0中引入了cf(Column Family),每个cf有自己的memtable,但是所有cf共享一个WAL,因此显然当我们把一个cf刷到磁盘后,不能将WAL删除,因此需要考虑WAL的生命周期的问题,只有所有memtable都被删除后,对应的WAL才能被刷盘。
所以这里可以发现,cf的个数不能很多,因为显然的,如果cf的个数过多,会导致WAL很难被刷到磁盘里,会占用大量的资源,因此可以考虑使用多个Rocksdb解决该问题。
WriteBatch
为什么总结这篇文章是因为看到了一个问题:WriteBatch如何保证跨cf的原子性写入?
原子性写入的意义是,一次批量的写入,要么全部成功要么全部失败,显然这是满足WriteBatch的定义。由于所有cf是共享一个WAL,所以问题转化为将一个WriteBatch的结构体写入一个磁盘文件WAL,是否是原子的。
这里存在不原子的风险即将一块数据写入文件系统,这里有两种想法
- 一个
WriteBatch大小和扇区大小一致,利用文件系统本身的原子性实现原子性。 - 加一个
Checksum,如果数据损坏则Checksum不一致
还有一个问题是,如果不开启WAL,怎么保证WriteBatch跨cf的原子性写入?这里提到的方法是
- 原子性写入
MANIFEST,对于MANIFEST开启O_DIRECT和fsync强制刷盘,即使得元数据是原子性写入的
本文主要抄自此处。
WriteBatch准备工作
首先看下WriteBatch内部存储的格式
|sequnce|count|kv_1|kv_2|...|kv_count|前12个字节存储了一个WriteBatch的元信息
sequnce:8个字节,记录了当前WriteBatch是Rocksdb创建以来第几个WriteBatch。count:4个字节,记录了当前WriteBatch有几个kv对。
然后看kv对是如何存储的,这里序列化成Record,分为有无cf
default cf:
|KTypeValue|
|key_size|key_bytes|
|value_length|value_bytes|
specify cf:
|kTypeColumnFamilyValue|column_family_id|
|key_size|key_bytes|
|value_length|value_bytes|TypeValue:1个字节,表示是put还是delete,以及是否指定了cf。column family id:4个字节,如果是指定了cf才会有这个字段。
WriteBatch有大小限制,如果当前写入超过了WriteBatch写入大小限制,那么新建一个WriteBatch。
WriteThread::Writer
WriteBatch实际是封装到WriteThread::Writer里
struct WriteThread::Writer {
WriteBatch* batch;
//.. too many other fields ..
std::atomic<uint8_t> state;
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;
std::aligned_storage<sizeof(std::condition_variable)>::type state_cv_bytes;
Writer* link_older; // this 之前之前写入的 writers
Writer* link_newer; // this 之后写入的 writers
//... methods
};可以发现Writer实际上是一个双向链表,多个Writer用链表连接了起来。
WriteThread::WriteGroup
显然一次只能写入一个Writer是很慢的,这样所有Writer串行写入,那么其他Writer需要阻塞,显然效率低。于是使用了一个WriteGroup统一写入。
struct WriteThread::WriteGroup {
Writer* leader = nullptr;
Writer* last_writer = nullptr;
//... other fields
struct Iterator {
Writer* writer;
Writer* last_writer;
explicit Iterator(Writer* w, Writer* last)
: writer(w), last_writer(last) {}
Writer* operator*() const { return writer; }
Iterator& operator++() {
assert(writer != nullptr);
if (writer == last_writer) {
writer = nullptr;
} else {
writer = writer->link_newer;
}
return *this;
}
bool operator!=(const Iterator& other) const {
return writer != other.writer;
}
};
Iterator begin() const { return Iterator(leader, last_writer); }
Iterator end() const { return Iterator(nullptr, nullptr); }
};可以发现WriteGroup对多个Writer进行了一个封装
- 记录了
WriteGroup对应Writer链表的head和tail,且链表是按照时间顺序排序的。 - 对链表封装了
Iterator。
WriteThread::State
自然对于Writer需要维护当前Writer的状态
enum WriteThread::State : uint8_t {
STATE_INIT = 1,
STATE_GROUP_LEADER = 2,
STATE_MEMTABLE_WRITER_LEADER = 4,
STATE_PARALLEL_MEMTABLE_WRITER = 8,
STATE_COMPLETED = 16,
STATE_LOCKED_WAITING = 32,
};Writer创立后先INIT,然后经过WriteThread::JoinBatch
- 要么成为本次写入流程的
leader,即State::STATE_GROUP_LEADER状态,然后组建自己的writer_group,代替writer_group中所有writers完成写入,所有的writers状态都变成State::STATE_COMPLETED。 - 要么加入一个已经选出
leader但是尚未执行的writer_group成为follower,让该leader代替自己执行完本次写入,完成后自己状态即State::STATE_COMPLETED。 - 要么阻塞等待之前的
WriteGroup写入完成。
JoinGroup
写入的流程为JoinGroup->选出GroupLeader->WAL->Memtable->Exit。
这里JoinGroup相当于一个锁,只有leader才能执行写入WAL和Memtable操作,其他Writer阻塞等待。
现在考虑如何JoinGroup,其实很简单,主要考虑WriteThread中newest_writer_,即最新的Write
- 新插入一个
Writer对象w后,尝试让newest_writer_指向w,如果当前WriteStall了则等待后再指向 - 如果
newest_writer_==NULL则直接进入后续流程 - 否则认为当前已经存在
leader,当前writer所需做的就是等待leader写完所有的writer,即等待w->state变成STATE_GROUP_LEADER或STATE_COMPLETED,即当前writer变成了leader或者被leader做完了
具体如下
void WriteThread::JoinBatchGroup(Writer* w) {
assert(w->batch != nullptr);
// 1. 让 nnewest_writer_ 指向 w
bool linked_as_leader = LinkOne(w, &newest_writer_);
if (linked_as_leader) {
SetState(w, STATE_GROUP_LEADER);
}
if (!linked_as_leader) {
// 2. 阻塞等待 w->state & mask != 0
AwaitState(w,
STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER |
STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED,
&jbg_ctx);
}
}这里WriteStall还有一种额外配置
Writer::no_slowdown == false,通过条件变量stall_cv_等待WriteStall结束被唤醒Writer::no_slowdown == true,直接返回Status::Incomplete,类似非阻塞行为
bool WriteThread::LinkOne(Writer* w, std::atomic<Writer*>* newest_writer) {
assert(newest_writer != nullptr);
assert(w->state == STATE_INIT);
Writer* writers = newest_writer->load(std::memory_order_relaxed);
while (true) {
assert(writers != w);
if (writers == &write_stall_dummy_) {
if (w->no_slowdown) {
w->status = Status::Incomplete("Write stall");
SetState(w, STATE_COMPLETED);
return false;
}
{
MutexLock lock(&stall_mu_);
writers = newest_writer->load(std::memory_order_relaxed);
if (writers == &write_stall_dummy_) {
stall_cv_.Wait();
// Load newest_writers_ again since it may have changed
writers = newest_writer->load(std::memory_order_relaxed);
continue;
}
}
}
w->link_older = writers;
if (newest_writer->compare_exchange_weak(writers, w)) {
return (writers == nullptr);
}
}
}EnterAsBatchGroupLeader
然后看leader是如何处理写的,可以发现传入leader后,不断向后枚举其他Writer,直到WriteBatch达到大小上限
size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
WriteGroup* write_group) {
assert(leader->link_older == nullptr);
assert(leader->batch != nullptr);
assert(write_group != nullptr);
size_t size = WriteBatchInternal::ByteSize(leader->batch);
// 1. write_group 大小限制
size_t max_size = max_write_batch_group_size_bytes;
const uint64_t min_batch_size_bytes = max_write_batch_group_size_bytes / 8;
if (size <= min_batch_size_bytes) {
max_size = size + min_batch_size_bytes;
}
// 2. 初始化 write_group
leader->write_group = write_group;
write_group->leader = leader;
write_group->last_writer = leader;
write_group->size = 1;
Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
// 3. 补全 [leader-writer, newest_writer] 丢失的 prev 指针
CreateMissingNewerLinks(newest_writer);
// 4. 封装到 write_group
Writer* w = leader;
while (w != newest_writer) {
assert(w->link_newer);
w = w->link_newer;
// ... other break conditions
if (size + WriteBatchInternal::ByteSize(w->batch) > max_size) {
break;
}
w->write_group = write_group; // 设置 w 属于当前 write_group
size += batch_size; // 这个 write_group 的数据量大小
write_group->last_writer = w; // 更新 last_writer
write_group->size++; // writer_group 中 writers 的个数
}
return size;
}ExitAsBatchGroupLeader
接着看leader完成后如何退出。首先需要选出下一个leader,方法是看newest_writer_是否出现了新的writer,如果出现了新的则将当前WriteBatch下一个Writer变成leader。
然后将当前完成的Writer修改,所有状态变成STATE_COMPLETED。
void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group,
Status& status) {
Writer* leader = write_group.leader;
Writer* last_writer = write_group.last_writer;
assert(leader->link_older == nullptr);
if (enable_pipelined_write_) {
// ...
} else {
Writer* head = newest_writer_.load(std::memory_order_acquire);
if (head != last_writer ||
!newest_writer_.compare_exchange_strong(head, nullptr)) {
// 存在新插入的 writer
assert(head != last_writer);
// 1. 补全 [last_writer, head] 的 prev指针
CreateMissingNewerLinks(head);
assert(last_writer->link_newer != nullptr);
assert(last_writer->link_newer->link_older == last_writer);
// 2. 断开链表
last_writer->link_newer->link_older = nullptr;
// 3. 设置新的 leader-writer
SetState(last_writer->link_newer, STATE_GROUP_LEADER);
}
// 4. 将 [leader, last_writer] 区间的 writers 状态设置为 STATE_COMPLETED
// 即解除阻塞,让 follower-writer 返回应用层
while (last_writer != leader) {
assert(last_writer);
last_writer->status = status;
// 需要先获取 next指针,再更改状态为 STATE_COMPLETED
// 因为先更改 STATE_COMPLETED 很可能导致 last_writer 就被正在阻塞的线程销毁了
// 再获取 next 指针就会触发 coredump
auto next = last_writer->link_older;
SetState(last_writer, STATE_COMPLETED); // 解除阻塞
last_writer = next;
}
}
}Pipeline优化
优化的地方主要是,原来所有WriteGroup都是串行的,即WAL1->MEM1->WAL2->MEM2...。显然这样有浪费,因为如果一个WriteGroup已经写入了WAL,那么可以认为数据已经持久化了,因此写MEM的时候可以和下一个WriteGroup写WAL并行。
Memtable并发写
目前只有skiplist支持并发写。
流程
- 完成当前
write_group0的WAL写入流程 - 通知
write_group1开启WAL写入流程,即write_group1无需等待write_group0完成MemTable写入流程才开启自己的WAL写入流程 write_group0的WAL写入流程完成后,需要启动write_group0的并发写MemTable流程
由于后两个任务都需要等待第一个任务完成,因此三个任务的分界点就可以设置在 WrriteThread::ExitAsBatchGroupLeader 函数中: T1 在写 WAL 期间需要整个 RocksDB 只有一个leader-writer,在 T1 任务结束后就可以不再担任leader角色。此时有两件事需要做
- 挑选出下一个
WriteGroup中的leader-writer,让 T2 任务可以pipeline执行 - 开启当前
WriteGroup并发写入MemTable流程