From 699d56739ec80a8446fb40777ce542d17f4fe00d Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Mon, 28 Oct 2024 16:41:45 +0000 Subject: [PATCH 1/2] Perf[BMQIO]: remove unnecessary function copy Signed-off-by: Evgeny Malygin --- src/groups/bmq/bmqio/bmqio_ntcchannel.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp b/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp index a5099ce958..d9c8e8c10d 100644 --- a/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp +++ b/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp @@ -406,7 +406,6 @@ NtcRead::~NtcRead() BSLS_ASSERT_OPT(d_numNeeded == 0); BSLS_ASSERT_OPT(d_complete); BSLS_ASSERT_OPT(!d_timer_sp); - BSLS_ASSERT_OPT(!d_callback); } // MANIPULATORS @@ -447,8 +446,6 @@ void NtcRead::clear() d_numNeeded = 0; d_complete = true; - - d_callback = bmqio::Channel::ReadCallback(); } // ACCESSORS @@ -758,11 +755,19 @@ void NtcChannel::processReadQueueLowWatermark( int numNeeded = 0; { - bmqio::Channel::ReadCallback readCallback = read->callback(); + const bmqio::Channel::ReadCallback& readCallback = + read->callback(); bslmt::UnLockGuard unlock(&d_mutex); readCallback(bmqio::Status(), &numNeeded, &d_readCache); } + if (read->isComplete()) { + // It's possible that we encountered canceled or timeout event + // when we unlocked `d_mutex`, so the `read` pointer that we + // hold now might be pointing to NtcRead already removed from + // `d_readQueue`. There is nothing we can do. + continue; + } BMQIO_NTCCHANNEL_LOG_READ_CACHE_DRAINED(this, d_streamSocket_sp, @@ -1307,6 +1312,8 @@ void NtcChannel::cancelRead() void NtcChannel::close(const Status& status) { + // Executed from *ANY* thread + bslmt::LockGuard lock(&d_mutex); bsl::shared_ptr self = this->shared_from_this(); @@ -1319,6 +1326,10 @@ void NtcChannel::close(const Status& status) bsl::shared_ptr read; d_readQueue.pop(&read); + // This code assumes thread-safety of `ntci::Timer::close` because we + // are not in IO thread. + // `read->d_callback` can still be executed concurrently from IO + // thread. read->setComplete(); read->clear(); } From f0fcea3e6f7ba9c5ae7135172140a86d2648ee5d Mon Sep 17 00:00:00 2001 From: Evgeny Malygin Date: Wed, 30 Oct 2024 19:17:45 +0000 Subject: [PATCH 2/2] Refactor[BMQIO]: get rid of setComplete Signed-off-by: Evgeny Malygin --- src/groups/bmq/bmqio/bmqio_ntcchannel.cpp | 36 ++++------------------- src/groups/bmq/bmqio/bmqio_ntcchannel.h | 3 -- 2 files changed, 6 insertions(+), 33 deletions(-) diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp b/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp index d9c8e8c10d..fffdcb8c13 100644 --- a/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp +++ b/src/groups/bmq/bmqio/bmqio_ntcchannel.cpp @@ -426,17 +426,6 @@ void NtcRead::setTimer(const bsl::shared_ptr& timer) d_timer_sp = timer; } -void NtcRead::setComplete() -{ - if (d_timer_sp) { - d_timer_sp->close(); - d_timer_sp.reset(); - } - - d_numNeeded = 0; - d_complete = true; -} - void NtcRead::clear() { if (d_timer_sp) { @@ -631,14 +620,12 @@ void NtcChannel::processReadTimeout( d_readQueue.remove(read); - bool isComplete = read->isComplete(); - read->setComplete(); - if (d_state == e_STATE_CLOSED) { + read->clear(); return; } - if (isComplete) { + if (read->isComplete()) { return; } @@ -662,14 +649,12 @@ void NtcChannel::processReadCancelled( d_readQueue.remove(read); - bool isComplete = read->isComplete(); - read->setComplete(); - if (d_state == e_STATE_CLOSED) { + read->clear(); return; } - if (isComplete) { + if (read->isComplete()) { return; } @@ -708,7 +693,6 @@ void NtcChannel::processReadQueueLowWatermark( bsl::shared_ptr read = d_readQueue.front(); if (read->numNeeded() == 0 || read->isComplete()) { - read->setComplete(); read->clear(); d_readQueue.pop(); continue; @@ -777,7 +761,6 @@ void NtcChannel::processReadQueueLowWatermark( BMQIO_NTCCHANNEL_LOG_READ_COMPLETE(this, d_streamSocket_sp, read); - read->setComplete(); read->clear(); d_readQueue.remove(read); continue; @@ -807,10 +790,7 @@ void NtcChannel::processReadQueueLowWatermark( bsl::shared_ptr read; d_readQueue.pop(&read); - bool isComplete = read->isComplete(); - read->setComplete(); - - if (!isComplete) { + if (!read->isComplete()) { bmqio::Channel::ReadCallback readCallback = read->callback(); read->clear(); @@ -885,10 +865,7 @@ void NtcChannel::processShutdownReceive( bsl::shared_ptr read; d_readQueue.pop(&read); - bool isComplete = read->isComplete(); - read->setComplete(); - - if (!isComplete) { + if (!read->isComplete()) { bmqio::Channel::ReadCallback readCallback = read->callback(); read->clear(); @@ -1330,7 +1307,6 @@ void NtcChannel::close(const Status& status) // are not in IO thread. // `read->d_callback` can still be executed concurrently from IO // thread. - read->setComplete(); read->clear(); } diff --git a/src/groups/bmq/bmqio/bmqio_ntcchannel.h b/src/groups/bmq/bmqio/bmqio_ntcchannel.h index 0f28079d1d..a4072992a5 100644 --- a/src/groups/bmq/bmqio/bmqio_ntcchannel.h +++ b/src/groups/bmq/bmqio/bmqio_ntcchannel.h @@ -99,9 +99,6 @@ class NtcRead { /// Set the timer to the specified `timer`. void setTimer(const bsl::shared_ptr& timer); - /// Set the operation as completed. - void setComplete(); - /// Set the operation as completed and clear all resources. void clear();