io_uring是Linux提供的一个异步I/O接口,于2019年引入Linux内核。本文主要介绍io_uring相关知识,以rocksdb中MultiRead作为例子。
io_uring的实现主要在fs/io_uring.c中;rocksdb中MultiRead的代码在该commit中。
MultiRead提供了一个同步batch读的接口,而其使用了io_uring,内部请求实现了异步。这种方法进行了批量读写,实现了并行I/O,避免了每个请求的上下文切换开销。
接口
如下图是MultiRead函数接口:
IOStatus PosixRandomAccessFile::MultiRead(FSReadRequest* reqs, size_t num_reqs, const IOOptions& options, IODebugContext* dbg);可以发现MultiRead一次传入num_reqs个FSReadRequest。
FSReadRequest类型如下所示,将其中注释用GPT翻译成了中文:
struct FSReadRequest {
// 输入参数,表示文件的偏移量(以字节为单位)。
uint64_t offset;
// 输入参数,表示要读取的长度(以字节为单位)。`result` 仅在到达文件末尾(或 `status` 不为 OK)时返回更少的字节。
size_t len;
// 一个缓冲区,MultiRead() 可以选择将数据放入其中。它可以忽略这个缓冲区并分配自己的缓冲区。
// scratch 的生命周期将持续到 I/O 完成。
//
// 在异步读取的情况下,它是一个输出参数,并将在回调被调用之前保持。Scratch 由 RocksDB 分配,并将传递给底层文件系统。
char* scratch;
// 输出参数,由 MultiRead() 设置,指向数据缓冲区的起始位置。
//
// 当 FSReadRequest::scratch 被提供时,这应该指向 FSReadRequest::scratch。当 FSSupportedOps::kFSBuffer 被启用且 FSReadRequest::scratch 为 nullptr 时,这指向由文件系统分配的数据缓冲区的起始位置。
//
// 警告:即使使用了 FSSupportedOps::kFSBuffer 优化,您仍然必须使用 result.data() 来获取实际读取数据的起始位置。
// 不要将 FSReadRequest::fs_scratch 视为指向有效数据缓冲区起始位置的 char*。
Slice result;
// 输出参数,由底层文件系统设置,表示读取请求的状态。
IOStatus status;
// fs_scratch 是指向由底层文件系统分配的任意对象的唯一指针。
//
// 通过让文件系统直接使用由文件系统分配的缓冲区,而不是让文件系统花费 CPU 周期将数据复制到 FSReadRequest::scratch 提供的缓冲区,RocksDB 可以提高效率。
//
// 此优化适用于 MultiReads(同步和异步)与非直接 I/O,当满足以下条件时:
// 1. 文件系统重写了 SupportedOps() API 并设置了 FSSupportedOps::kFSBuffer。
// 2. FSReadRequest::scratch 被设置为 nullptr。
//
// RocksDB 将:
// 1. 重用文件系统分配的缓冲区。
// 2. 拥有由 fs_scratch 管理的对象的所有权。
// 3. 处理从 FSAllocationPtr 调用自定义删除器函数。
//
// 警告:不要假设 fs_scratch 指向读取返回的实际 char* 数据的起始位置。正如类型签名所示,fs_scratch 是指向任意任意数据类型的指针。使用 result.data() 获取有效的实际数据起始位置。有关更多上下文,请参见 https://github.com/facebook/rocksdb/pull/13189。
FSAllocationPtr fs_scratch;
};可以发现class的参数主要为
offset:读取文件起始的偏移量。len:读取内容的长度。scratch:可以选择配置的缓冲区,在如io_uring这种异步读取的情况下,读取的内容会先被放在缓冲区里。result:封装好的Slice类型,最终的返回结果。status:读取的结果,成功/失败。
流程
对齐
首先是判断设置的I/O类型,先判断是否使用了direct I/O:
if (use_direct_io()) {
for (size_t i = 0; i < num_reqs; i++) {
assert(IsSectorAligned(reqs[i].offset, GetRequiredBufferAlignment()));
assert(IsSectorAligned(reqs[i].len, GetRequiredBufferAlignment()));
assert(IsSectorAligned(reqs[i].scratch, GetRequiredBufferAlignment()));
}
}可以发现主要是判断了使用direct I/O情况下,每个读取请求中读取数据量大小和缓存位置与扇区对齐的情况,使用了IsSectorAligned这个函数:
inline bool IsSectorAligned(const size_t off, size_t sector_size) {
assert((sector_size & (sector_size - 1)) == 0);
return (off & (sector_size - 1)) == 0;
}传入了一个offset偏移量,和sector_size扇区大小,然后先assert扇区大小是否是2的幂;然后判断偏移量是否是扇区大小的倍数。
这里对齐的意义是指每个偏移量的大小,位置起点都需要是扇区的倍数。
初始化
后面使用#if defined判断是否使用了io_uring。先对io_uring
进行一个初始化:
struct io_uring* iu = nullptr;
if (thread_local_io_urings_) {
iu = static_cast<struct io_uring*>(thread_local_io_urings_->Get());
if (iu == nullptr) {
iu = CreateIOUring();
if (iu != nullptr) {
thread_local_io_urings_->Reset(iu);
}
}
}
// Init failed, platform doesn't support io_uring. Fall back to
// serialized reads
if (iu == nullptr) {
return FSRandomAccessFile::MultiRead(reqs, num_reqs, options, dbg);
}这里先检查线程是否已经持有了一个该线程独有的thread_local的io_uring,如果没有则尝试初始化一个。如果初始化失败则说明平台不支持io_uring,则调用普通的FSRandomAccessFile::MultiRead接口。
然后是对于请求的初始化:
struct WrappedReadRequest {
FSReadRequest* req;
struct iovec iov;
size_t finished_len;
explicit WrappedReadRequest(FSReadRequest* r) : req(r), finished_len(0) {}
};
autovector<WrappedReadRequest, 32> req_wraps;
autovector<WrappedReadRequest*, 4> incomplete_rq_list;
std::unordered_set<WrappedReadRequest*> wrap_cache;
for (size_t i = 0; i < num_reqs; i++) {
req_wraps.emplace_back(&reqs[i]);
}先定义了一个Wrap的结构体,把req封装了了一下,加入两个新字段:
struct iovec_iov:把scratch封装了的缓冲区。finished_len:该请求已经完成读的长度。
之后声明了三个容器,用于存储当前的请求过程,都使用了autovector:
-
req_wraps:存储最初的包装后的WrappedReadRequest请求。 -
incomplete_rq_list:存储已经开始处理,但未完全处理好的请求(例如,当io_uring无法一次性处理所有的请求)。 -
wrap_cache:缓存本轮循环中的请求,一轮循环结束清空。
请求处理
size_t reqs_off = 0;
while (num_reqs > reqs_off || !incomplete_rq_list.empty()) {
size_t this_reqs = (num_reqs - reqs_off) + incomplete_rq_list.size();
// If requests exceed depth, split it into batches
if (this_reqs > kIoUringDepth) {
this_reqs = kIoUringDepth;
}
...如果还剩余未处理/处理了部分的请求,则继续处理。
this_reqs表示当前剩余的请求数,显然io_uring队列有大小限制,于是每次最多处理kIoUringDepth个请求。
然后是处理请求部分:
for (size_t i = 0; i < this_reqs; i++) {
WrappedReadRequest* rep_to_submit;
if (i < incomplete_rq_list.size()) {
rep_to_submit = incomplete_rq_list[i];
} else {
rep_to_submit = &req_wraps[reqs_off++];
}
// 设置 I/O 向量
rep_to_submit->iov.iov_base = rep_to_submit->req->scratch + rep_to_submit->finished_len;
rep_to_submit->iov.iov_len = rep_to_submit->req->len - rep_to_submit->finished_len;
// 获取提交队列入口并准备读取请求
struct io_uring_sqe* sqe = io_uring_get_sqe(iu);
io_uring_prep_readv(sqe, fd_, &rep_to_submit->iov, 1,
rep_to_submit->req->offset + rep_to_submit->finished_len);
io_uring_sqe_set_data(sqe, rep_to_submit);
wrap_cache.emplace(rep_to_submit);
}可以发现每次优先处理已经处理了部分的请求。
然后对缓冲区scratch进行包装,将新的缓冲区开始位置设置为scratch+finished_len,长度设置为len-finished_len。
接着从使用io_uring_get_sqe取出队头空闲的sqe项,然后用io_uring_prep_readv传入之前配置的文件操作符、缓冲区、缓冲区偏移量快速填充sqe。
最后使用io_uring_sqe_set_data,对user_data字段进行填充,用处是I/O完成后生成对应的cqe,可以直接从cqe中取出该user_data。io_uring允许直接操作user_data字段,但是更好的方式是使用提供的接口。
将所有本轮需要提交的sqe全部提交后,之后是获得返回的cqe并处理:
ssize_t ret =
io_uring_submit_and_wait(iu, static_cast<unsigned int>(this_reqs));
if (static_cast<size_t>(ret) != this_reqs) {
fprintf(stderr, "ret = %ld this_reqs: %ld\n", (long)ret, (long)this_reqs);
// If error happens and we submitted fewer than expected, it is an
// exception case and we don't retry here. We should still consume
// what is is submitted in the ring.
for (ssize_t i = 0; i < ret; i++) {
struct io_uring_cqe* cqe = nullptr;
io_uring_wait_cqe(iu, &cqe);
if (cqe != nullptr) {
io_uring_cqe_seen(iu, cqe);
}
}
return IOStatus::IOError("io_uring_submit_and_wait() requested " +
std::to_string(this_reqs) + " but returned " +
std::to_string(ret));
}io_uring_submit_and_wait提交了当前队列中所有sqe,然后等待有this_req个事件提交成功。
如果返回值告诉成功提交的事件数目ret不等于this_req,那么说明失败了,于是调用io_uring_wait_cqe等待ret个cqe完成,再调用io_uring_cqe_seen把这些事件标记为consumed,返回错误。
之后考虑正常的情况:
for (size_t i = 0; i < this_reqs; i++) {
struct io_uring_cqe* cqe = nullptr;
WrappedReadRequest* req_wrap;
// We could use the peek variant here, but this seems safer in terms
// of our initial wait not reaping all completions
ret = io_uring_wait_cqe(iu, &cqe);
TEST_SYNC_POINT_CALLBACK(
"PosixRandomAccessFile::MultiRead:io_uring_wait_cqe:return", &ret);
if (ret) {
ios = IOStatus::IOError("io_uring_wait_cqe() returns " +
std::to_string(ret));
if (cqe != nullptr) {
io_uring_cqe_seen(iu, cqe);
}
continue;
}
req_wrap = static_cast<WrappedReadRequest*>(io_uring_cqe_get_data(cqe));
// Reset cqe data to catch any stray reuse of it
static_cast<struct io_uring_cqe*>(cqe)->user_data = 0xd5d5d5d5d5d5d5d5;
// Check that we got a valid unique cqe data
auto wrap_check = wrap_cache.find(req_wrap);
if (wrap_check == wrap_cache.end()) {
fprintf(stderr,
"PosixRandomAccessFile::MultiRead: "
"Bad cqe data from IO uring - %p\n",
req_wrap);
port::PrintStack();
ios = IOStatus::IOError("io_uring_cqe_get_data() returned " +
std::to_string((uint64_t)req_wrap));
continue;
}
wrap_cache.erase(wrap_check);
FSReadRequest* req = req_wrap->req;
size_t bytes_read = 0;
bool read_again = false;
UpdateResult(cqe, filename_, req->len, req_wrap->iov.iov_len,
false /*async_read*/, use_direct_io(),
GetRequiredBufferAlignment(), req_wrap->finished_len, req,
bytes_read, read_again);
int32_t res = cqe->res;
if (res >= 0) {
if (bytes_read == 0) {
if (read_again) {
Slice tmp_slice;
req->status =
Read(req->offset + req_wrap->finished_len,
req->len - req_wrap->finished_len, options, &tmp_slice,
req->scratch + req_wrap->finished_len, dbg);
req->result =
Slice(req->scratch, req_wrap->finished_len + tmp_slice.size());
}
// else It means EOF so no need to do anything.
} else if (bytes_read < req_wrap->iov.iov_len) {
incomplete_rq_list.push_back(req_wrap);
}
}
io_uring_cqe_seen(iu, cqe);
}
wrap_cache.clear();先枚举每个cqe,调用io_uring_wait_cqe等待一个cqe完成。如果该cqe获取失败,则标记consume然后继续看下一个cqe。
成功获取cqe后,先通过io_uring_cqe_get_data获取之前存进对应sqe的user_data,也就是一个WrappedReadRequest,然后把该user_data标记已使用,然后到之前的cache中去寻找是否还存在,决定是否报错。
然后考虑通过cqe获取输出,调用UpdateResult。
if (cqe->res < 0) {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req failed", file_name, cqe->res);
} else {
bytes_read = static_cast<size_t>(cqe->res);
TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
if (bytes_read == iov_len) {
req->result = Slice(req->scratch, req->len);
req->status = IOStatus::OK();
} else if (bytes_read == 0) {
/// cqe->res == 0 can means EOF, or can mean partial results. See
// comment
// https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
// Fall back to pread in this case.
if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
// Bytes reads don't fill sectors. Should only happen at the end
// of the file.
req->result = Slice(req->scratch, finished_len);
req->status = IOStatus::OK();
} else {
if (async_read) {
// No bytes read. It can means EOF. In case of partial results, it's
// caller responsibility to call read/readasync again.
req->result = Slice(req->scratch, 0);
req->status = IOStatus::OK();
} else {
read_again = true;
}
}
} else if (bytes_read < iov_len) {
assert(bytes_read > 0);
if (async_read) {
req->result = Slice(req->scratch, bytes_read);
req->status = IOStatus::OK();
} else {
assert(bytes_read + finished_len < len);
finished_len += bytes_read;
}
} else {
req->result = Slice(req->scratch, 0);
req->status = IOError("Req returned more bytes than requested", file_name,
cqe->res);
}
}cqe->res存储了当前cqe读取的位数。
分以下情况处理:
cqe->res==iov_len:表示全部读取成功,读取的量和分配的缓冲区大小一致,于是直接把scratch中的值Slice化赋给req->result。cqe->res==0:分以下两种情况- 当前读到了
EOF。 - 当前读取了
0。
- 当前读到了
由于MultiRead中io_uring设置的是同步读,那么对于读取了0的情况则需要重试,给read_again置成true。这里重试则会调用Read()函数,尝试直接读取。
cqe_res<iov_len:说明读取了部分,如果是异步读则返回true并把当前读取的部分放到result里,否则在finished_len中加上。对于这种还有剩余的情况,MultiRead中把该req放入incomplete_rq_list中等待之后继续读取。
最后,则把当前处理完的cqe消费掉,继续循环。
总而言之,MultiRead实现了一个同步API+异步实现的模式,利用I/O的并行能力增强了性能。