From ff2622222c9b0f091a37f466779cbfdc56df8344 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 8 Sep 2024 08:39:01 +0300 Subject: [PATCH] fix: reset kScheduleRemote when we pull the queue (#311) Before, we reset kScheduleRemote bit right after we pushed into the remote queue. But if this thread is slow, it is possible that the notified fiber will wake up before we reset the bit, and then it will go sleeping again and then another notifier will try to awake it and will check fail at line scheduler.cc:238 when it checks that the bit is set. If we reset it upon waking up, the fiber will reset it before suspending, so this situation can not happen. In addition, we print the stacktrace when we reach unexpected state. Signed-off-by: Roman Gershman --- util/fibers/detail/scheduler.cc | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/util/fibers/detail/scheduler.cc b/util/fibers/detail/scheduler.cc index 00a5bc08..1f07e1cf 100644 --- a/util/fibers/detail/scheduler.cc +++ b/util/fibers/detail/scheduler.cc @@ -235,7 +235,7 @@ void Scheduler::ScheduleFromRemote(FiberInterface* cntx) { // This should not happen as ScheduleFromRemote should be called under a WaitQueue lock. if ((cntx->flags_.fetch_or(FiberInterface::kScheduleRemote, memory_order_acquire) & FiberInterface::kScheduleRemote) != 0) { - LOG(DFATAL) << "Already scheduled remotely " << cntx->name(); + LOG(DFATAL) << "Already scheduled remotely " << cntx->name() << " " << cntx->DEBUG_use_count(); return; } @@ -253,10 +253,6 @@ void Scheduler::ScheduleFromRemote(FiberInterface* cntx) { cntx->DEBUG_remote_epoch = remote_epoch_.fetch_add(1, memory_order_relaxed); remote_ready_queue_.Push(cntx); - - // clear the bit after we pushed to the queue. - cntx->flags_.fetch_and(~FiberInterface::kScheduleRemote, memory_order_release); - DVLOG(2) << "ScheduleFromRemote " << cntx->name() << " " << cntx->use_count_.load(); if (custom_policy_) { @@ -342,7 +338,7 @@ bool Scheduler::ProcessRemoteReady(FiberInterface* active) { << iteration << " remote_empty: " << qempty << ", current_epoch: " << epoch << ", push_epoch: " << active->DEBUG_remote_epoch << ", next:" << (uint64_t)next; - + LOG(ERROR) << "Stacktrace: " << GetStacktrace(); if (next != (FiberInterface*)FiberInterface::kRemoteFree) { if (iteration < 100) { // Work around the inconsistency by retrying. @@ -363,6 +359,9 @@ bool Scheduler::ProcessRemoteReady(FiberInterface* active) { fi->remote_next_.store((FiberInterface*)FiberInterface::kRemoteFree, memory_order_relaxed); fi->DEBUG_remote_epoch = 0; + // clear the bit after we pulled from the queue. + fi->flags_.fetch_and(~FiberInterface::kScheduleRemote, memory_order_release); + DVLOG(2) << "Pulled " << fi->name() << " " << fi->DEBUG_use_count(); DCHECK(fi->scheduler_ == this);