diff --git a/util/fibers/fiber_socket_test.cc b/util/fibers/fiber_socket_test.cc index 82d54f7d..2fc44b24 100644 --- a/util/fibers/fiber_socket_test.cc +++ b/util/fibers/fiber_socket_test.cc @@ -384,6 +384,27 @@ TEST_P(FiberSocketTest, NotEmpty) { proactor_->Await([&] { std::ignore = sock->Close(); }); } + +TEST_P(FiberSocketTest, OpenMany) { + bool use_uring = GetParam() == "uring"; + if (!use_uring) { + GTEST_SKIP() << "OpenMany requires iouring"; + return; + } + + proactor_->Await([&] { + for (unsigned i = 0; i < 10000; ++i) { + UringProactor* up = static_cast(proactor_.get()); + UringSocket sock(up); + auto ec = sock.Create(AF_INET); + ASSERT_FALSE(ec); + ec = sock.Close(); + ASSERT_FALSE(ec); + usleep(100); + } + }); +} + #endif } // namespace fb2 diff --git a/util/fibers/uring_proactor.cc b/util/fibers/uring_proactor.cc index 9615aab6..6842dcd9 100644 --- a/util/fibers/uring_proactor.cc +++ b/util/fibers/uring_proactor.cc @@ -19,10 +19,9 @@ #include "util/fibers/detail/scheduler.h" #include "util/fibers/uring_socket.h" -// TODO: we need to fix register_fds_ resize flow. -// Also we must ensure that there is no leakage of socket descriptors with enable_direct_fd enabled. +// We must ensure that there is no leakage of socket descriptors with enable_direct_fd enabled. // See AcceptServerTest.Shutdown to trigger direct fd resize. -ABSL_FLAG(bool, enable_direct_fd, false, "If true tries to register file descriptors"); +ABSL_FLAG(uint32_t, uring_direct_table_len, 0, "If positive create direct fd table of this length"); #define URING_CHECK(x) \ do { \ @@ -91,7 +90,7 @@ UringProactor::~UringProactor() { } } - if (direct_fd_) { + if (!register_fds_.empty()) { io_uring_unregister_files(&ring_); } io_uring_queue_exit(&ring_); @@ -117,13 +116,8 @@ void UringProactor::Init(unsigned pool_index, size_t ring_size, int wq_fd) { msgring_f_ = 0; poll_first_ = 0; - direct_fd_ = 0; buf_ring_f_ = 0; - if (kver.kernel > 5 || (kver.kernel == 5 && kver.major >= 15)) { - direct_fd_ = absl::GetFlag(FLAGS_enable_direct_fd); // failswitch to disable direct fds. - } - // If we setup flags that kernel does not recognize, it fails the setup call. if (kver.kernel > 5 || (kver.kernel == 5 && kver.major >= 19)) { params.flags |= IORING_SETUP_SUBMIT_ALL; @@ -132,6 +126,9 @@ void UringProactor::Init(unsigned pool_index, size_t ring_size, int wq_fd) { // io_uring_register_buf_ring is supported since 5.19. buf_ring_f_ = 1; + + // FLAGS_uring_direct_table_len is a failswitch to disable direct fds. + register_fds_.resize(absl::GetFlag(FLAGS_uring_direct_table_len), -1); } if (kver.kernel >= 6 && kver.major >= 1) { @@ -174,8 +171,7 @@ void UringProactor::Init(unsigned pool_index, size_t ring_size, int wq_fd) { int res = io_uring_register_ring_fd(&ring_); VLOG_IF(1, res < 0) << "io_uring_register_ring_fd failed: " << -res; - if (direct_fd_) { - register_fds_.resize(512, -1); + if (!register_fds_.empty()) { int res = io_uring_register_files(&ring_, register_fds_.data(), register_fds_.size()); CHECK_EQ(0, res); } @@ -524,34 +520,13 @@ void UringProactor::CancelPeriodicInternal(PeriodicItem* item) { } unsigned UringProactor::RegisterFd(int source_fd) { - if (!direct_fd_) + if (register_fds_.empty()) return kInvalidDirectFd; // TODO: to create a linked list from free fds. auto next = std::find(register_fds_.begin() + next_free_index_, register_fds_.end(), -1); - if (next == register_fds_.end()) { - size_t prev_sz = register_fds_.size(); - DCHECK_GT(prev_sz, 0u); - - // enlarge direct fds table. - register_fds_.resize(prev_sz * 2, -1); - register_fds_[prev_sz] = source_fd; // source fd will map to prev_sz index. - next_free_index_ = prev_sz + 1; - - // TODO: this does not work because it seems we need to unregister first - // to be able re-register. See - int res = io_uring_register_files(&ring_, register_fds_.data(), register_fds_.size()); - if (res < 0) { - LOG(ERROR) << "Error registering files: " << -res << " " << SafeErrorMessage(-res) << " " - << prev_sz; - register_fds_.resize(prev_sz); - next_free_index_ = prev_sz; - return kInvalidDirectFd; - } - ++direct_fds_cnt_; - - return prev_sz; - } + if (next == register_fds_.end()) // it is not possible to resize this table. + return kInvalidDirectFd; *next = source_fd; next_free_index_ = next - register_fds_.begin(); @@ -567,7 +542,6 @@ unsigned UringProactor::RegisterFd(int source_fd) { } int UringProactor::TranslateDirectFd(unsigned fixed_fd) const { - DCHECK(direct_fd_); DCHECK_LT(fixed_fd, register_fds_.size()); DCHECK_GE(register_fds_[fixed_fd], 0); @@ -575,9 +549,9 @@ int UringProactor::TranslateDirectFd(unsigned fixed_fd) const { } int UringProactor::UnregisterFd(unsigned fixed_fd) { - DCHECK(direct_fd_); + DCHECK(!register_fds_.empty()); - if (!direct_fd_) + if (register_fds_.empty()) return -1; DCHECK_LT(fixed_fd, register_fds_.size()); diff --git a/util/fibers/uring_proactor.h b/util/fibers/uring_proactor.h index 296f646f..8b41ae67 100644 --- a/util/fibers/uring_proactor.h +++ b/util/fibers/uring_proactor.h @@ -77,7 +77,7 @@ class UringProactor : public ProactorBase { } bool HasDirectFD() const { - return direct_fd_; + return !register_fds_.empty(); } int ring_fd() const { @@ -151,9 +151,8 @@ class UringProactor : public ProactorBase { uint8_t msgring_f_ : 1; uint8_t poll_first_ : 1; - uint8_t direct_fd_ : 1; uint8_t buf_ring_f_ : 1; - uint8_t : 4; + uint8_t : 5; EventCount sqe_avail_;