Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SegmentPool when we allocate more than capacity #312

Merged
merged 1 commit into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions base/segment_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,26 @@

namespace base {

std::optional<unsigned> SegmentPool::Request(unsigned length) {
if (taken_.empty())
using namespace std;

optional<unsigned> SegmentPool::Request(unsigned length) {
// Invariant: elem.first + elem.second <= size_ for any elem in the queue.
if (taken_.empty()) {
if (length > size_)
return nullopt;
return taken_.emplace_back(0, length).first;
}

// If possible, squeeze segment before first occupied segment
// If possible, squeeze segment before first occupied segment.
if (size_t head_mark = taken_.front().first; length <= head_mark)
return taken_.emplace_front(head_mark - length, length).first;

// Otherwise, try appending after last occupied segment
// Otherwise, try appending after last occupied segment.
size_t tail_mark = taken_.back().first + taken_.back().second;
if (size_ - tail_mark >= length)
if (tail_mark + length <= size_)
return taken_.emplace_back(tail_mark, length).first;

return std::nullopt;
return nullopt;
}

void SegmentPool::Return(unsigned offset) {
Expand All @@ -32,7 +38,7 @@ void SegmentPool::Return(unsigned offset) {
if (taken_.front().first == offset) // fast path
it = taken_.begin();
else
it = std::lower_bound(taken_.begin(), taken_.end(), std::make_pair(offset, 0u));
it = lower_bound(taken_.begin(), taken_.end(), make_pair(offset, 0u));
DCHECK(it != taken_.end());

it->second = 0; // clear length
Expand Down
28 changes: 19 additions & 9 deletions util/fibers/uring_proactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -321,23 +321,33 @@ int UringProactor::RegisterBuffers(size_t size) {

// Use mmap to create the backing
void* ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (ptr == MAP_FAILED)
if (ptr == MAP_FAILED) {
LOG(ERROR) << "Could not mmap " << size << " bytes";
return -errno;
}

iovec vec{ptr, size};
int res = io_uring_register_buffers(&ring_, &vec, 1);
if (res < 0) {
LOG(ERROR) << "Error calling io_uring_register_buffers: " << SafeErrorMessage(-res);
munmap(ptr, size);
return res;
}

buf_pool_.backing = reinterpret_cast<uint8_t*>(ptr);
buf_pool_.segments.Grow(size / UringBuf::kAlign);

iovec vec{buf_pool_.backing, size};
return io_uring_register_buffers(&ring_, &vec, 1);
return 0;
}

std::optional<UringBuf> UringProactor::RequestBuffer(size_t size) {
DCHECK(buf_pool_.backing);
// We keep track not of bytes, but 4kb segments and round up
size_t segments = (size + UringBuf::kAlign - 1) / UringBuf::kAlign;
if (auto offset = buf_pool_.segments.Request(segments)) {
uint8_t* ptr = buf_pool_.backing + *offset * UringBuf::kAlign;
return UringBuf{{ptr, segments * UringBuf::kAlign}, 0};
if (buf_pool_.backing) {
// We keep track not of bytes, but 4kb segments and round up
size_t segment_cnt = (size + UringBuf::kAlign - 1) / UringBuf::kAlign;
if (auto offset = buf_pool_.segments.Request(segment_cnt)) {
uint8_t* ptr = buf_pool_.backing + *offset * UringBuf::kAlign;
return UringBuf{{ptr, segment_cnt * UringBuf::kAlign}, 0};
}
}
return std::nullopt;
}
Expand Down
3 changes: 2 additions & 1 deletion util/fibers/uring_proactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ class UringProactor : public ProactorBase {
return IOURING;
}

// Register buffer with given size and allocate backing, calls io_uring_register_buffers
// Register buffer with given size and allocate backing, calls io_uring_register_buffers.
// Returns 0 on success, -errno on error.
int RegisterBuffers(size_t size);

// Request buffer of given size, returns none if there's no space left in the backing.
Expand Down
Loading