From 1bc91d1a86a5a5d4d114097b60ca5726b88874cc Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 4 Sep 2024 17:51:51 +0300 Subject: [PATCH] fix: SegmentPool when we allocate more than capacity Also, fix UringProactor::RequestBuffer/RegisterBuffers to handle the errors properly. Signed-off-by: Roman Gershman --- base/segment_pool.cc | 20 +++++++++++++------- util/fibers/uring_proactor.cc | 28 +++++++++++++++++++--------- util/fibers/uring_proactor.h | 3 ++- 3 files changed, 34 insertions(+), 17 deletions(-) diff --git a/base/segment_pool.cc b/base/segment_pool.cc index b395e21a..b63a7571 100644 --- a/base/segment_pool.cc +++ b/base/segment_pool.cc @@ -10,20 +10,26 @@ namespace base { -std::optional SegmentPool::Request(unsigned length) { - if (taken_.empty()) +using namespace std; + +optional SegmentPool::Request(unsigned length) { + // Invariant: elem.first + elem.second <= size_ for any elem in the queue. + if (taken_.empty()) { + if (length > size_) + return nullopt; return taken_.emplace_back(0, length).first; + } - // If possible, squeeze segment before first occupied segment + // If possible, squeeze segment before first occupied segment. if (size_t head_mark = taken_.front().first; length <= head_mark) return taken_.emplace_front(head_mark - length, length).first; - // Otherwise, try appending after last occupied segment + // Otherwise, try appending after last occupied segment. size_t tail_mark = taken_.back().first + taken_.back().second; - if (size_ - tail_mark >= length) + if (tail_mark + length <= size_) return taken_.emplace_back(tail_mark, length).first; - return std::nullopt; + return nullopt; } void SegmentPool::Return(unsigned offset) { @@ -32,7 +38,7 @@ void SegmentPool::Return(unsigned offset) { if (taken_.front().first == offset) // fast path it = taken_.begin(); else - it = std::lower_bound(taken_.begin(), taken_.end(), std::make_pair(offset, 0u)); + it = lower_bound(taken_.begin(), taken_.end(), make_pair(offset, 0u)); DCHECK(it != taken_.end()); it->second = 0; // clear length diff --git a/util/fibers/uring_proactor.cc b/util/fibers/uring_proactor.cc index 4bd36e19..ea03adac 100644 --- a/util/fibers/uring_proactor.cc +++ b/util/fibers/uring_proactor.cc @@ -321,23 +321,33 @@ int UringProactor::RegisterBuffers(size_t size) { // Use mmap to create the backing void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); - if (ptr == MAP_FAILED) + if (ptr == MAP_FAILED) { + LOG(ERROR) << "Could not mmap " << size << " bytes"; return -errno; + } + + iovec vec{ptr, size}; + int res = io_uring_register_buffers(&ring_, &vec, 1); + if (res < 0) { + LOG(ERROR) << "Error calling io_uring_register_buffers: " << SafeErrorMessage(-res); + munmap(ptr, size); + return res; + } buf_pool_.backing = reinterpret_cast(ptr); buf_pool_.segments.Grow(size / UringBuf::kAlign); - iovec vec{buf_pool_.backing, size}; - return io_uring_register_buffers(&ring_, &vec, 1); + return 0; } std::optional UringProactor::RequestBuffer(size_t size) { - DCHECK(buf_pool_.backing); - // We keep track not of bytes, but 4kb segments and round up - size_t segments = (size + UringBuf::kAlign - 1) / UringBuf::kAlign; - if (auto offset = buf_pool_.segments.Request(segments)) { - uint8_t* ptr = buf_pool_.backing + *offset * UringBuf::kAlign; - return UringBuf{{ptr, segments * UringBuf::kAlign}, 0}; + if (buf_pool_.backing) { + // We keep track not of bytes, but 4kb segments and round up + size_t segment_cnt = (size + UringBuf::kAlign - 1) / UringBuf::kAlign; + if (auto offset = buf_pool_.segments.Request(segment_cnt)) { + uint8_t* ptr = buf_pool_.backing + *offset * UringBuf::kAlign; + return UringBuf{{ptr, segment_cnt * UringBuf::kAlign}, 0}; + } } return std::nullopt; } diff --git a/util/fibers/uring_proactor.h b/util/fibers/uring_proactor.h index 8b41ae67..dc7026c2 100644 --- a/util/fibers/uring_proactor.h +++ b/util/fibers/uring_proactor.h @@ -99,7 +99,8 @@ class UringProactor : public ProactorBase { return IOURING; } - // Register buffer with given size and allocate backing, calls io_uring_register_buffers + // Register buffer with given size and allocate backing, calls io_uring_register_buffers. + // Returns 0 on success, -errno on error. int RegisterBuffers(size_t size); // Request buffer of given size, returns none if there's no space left in the backing.