fix: reset kScheduleRemote when we pull the queue (#311)
Before this change, we reset the kScheduleRemote bit right after pushing the fiber into the remote queue.
But if the notifying thread is slow, the notified fiber can wake up before the bit is reset
and then go back to sleep. Another notifier that then tries to wake it fails the check at
scheduler.cc:238, which finds the bit still set.
If the bit is reset when the fiber is woken up instead, it is always cleared before the fiber suspends again, so this situation cannot happen.
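
The interleaving described above can be replayed deterministically in a single thread. The following is a minimal sketch, not the real scheduler: `Fiber`, `RemoteQueue`, `NotifyRemote`, `Pull` and `SecondNotifySucceeds` are hypothetical stand-ins, and only the `fetch_or`/`fetch_and` pattern on the kScheduleRemote bit is taken from the diff.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-ins for illustration; not the real FiberInterface or queue.
constexpr uint32_t kScheduleRemote = 1u << 0;

struct Fiber {
  std::atomic<uint32_t> flags{0};
};

using RemoteQueue = std::deque<Fiber*>;

// Notifier side: set the bit, then push. Returns false if the bit was already
// set, which models the "Already scheduled remotely" check.
bool NotifyRemote(Fiber* f, RemoteQueue* q) {
  uint32_t prev = f->flags.fetch_or(kScheduleRemote, std::memory_order_acquire);
  if ((prev & kScheduleRemote) != 0) return false;
  q->push_back(f);
  return true;
}

void ClearBit(Fiber* f) {
  f->flags.fetch_and(~kScheduleRemote, std::memory_order_release);
}

// Owner side: pull the next fiber. With clear_on_pull the bit is reset here
// (the new behaviour); without it the notifier is expected to reset it later.
Fiber* Pull(RemoteQueue* q, bool clear_on_pull) {
  assert(!q->empty());
  Fiber* f = q->front();
  q->pop_front();
  if (clear_on_pull) ClearBit(f);
  return f;
}

// Replays: notifier A pushes, the fiber is pulled, runs and suspends again,
// then notifier B arrives before slow notifier A has reached its own clear.
bool SecondNotifySucceeds(bool clear_on_pull) {
  Fiber fiber;
  RemoteQueue queue;
  bool first = NotifyRemote(&fiber, &queue);   // notifier A
  assert(first);
  (void)first;
  Pull(&queue, clear_on_pull);                 // fiber wakes, then sleeps again
  bool second = NotifyRemote(&fiber, &queue);  // notifier B
  if (!clear_on_pull) ClearBit(&fiber);        // A's late clear (old behaviour)
  return second;
}
```

Calling `SecondNotifySucceeds(false)` models the old ordering, where the second notifier finds the bit still set. `SecondNotifySucceeds(true)` models clearing at pull time, where the second notification goes through.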

In addition, we print the stack trace when we reach an unexpected state.

Signed-off-by: Roman Gershman <[email protected]>
romange authored Sep 8, 2024
1 parent b535a02 commit ff26222
Showing 1 changed file with 5 additions and 6 deletions.
11 changes: 5 additions & 6 deletions util/fibers/detail/scheduler.cc
@@ -235,7 +235,7 @@ void Scheduler::ScheduleFromRemote(FiberInterface* cntx) {
// This should not happen as ScheduleFromRemote should be called under a WaitQueue lock.
if ((cntx->flags_.fetch_or(FiberInterface::kScheduleRemote, memory_order_acquire) &
FiberInterface::kScheduleRemote) != 0) {
-LOG(DFATAL) << "Already scheduled remotely " << cntx->name();
+LOG(DFATAL) << "Already scheduled remotely " << cntx->name() << " " << cntx->DEBUG_use_count();
return;
}

@@ -253,10 +253,6 @@ void Scheduler::ScheduleFromRemote(FiberInterface* cntx) {
cntx->DEBUG_remote_epoch = remote_epoch_.fetch_add(1, memory_order_relaxed);
remote_ready_queue_.Push(cntx);


-// clear the bit after we pushed to the queue.
-cntx->flags_.fetch_and(~FiberInterface::kScheduleRemote, memory_order_release);

DVLOG(2) << "ScheduleFromRemote " << cntx->name() << " " << cntx->use_count_.load();

if (custom_policy_) {
@@ -342,7 +338,7 @@ bool Scheduler::ProcessRemoteReady(FiberInterface* active) {
<< iteration << " remote_empty: " << qempty << ", current_epoch: " << epoch
<< ", push_epoch: " << active->DEBUG_remote_epoch
<< ", next:" << (uint64_t)next;

+LOG(ERROR) << "Stacktrace: " << GetStacktrace();
if (next != (FiberInterface*)FiberInterface::kRemoteFree) {
if (iteration < 100) {
// Work around the inconsistency by retrying.
@@ -363,6 +359,9 @@ bool Scheduler::ProcessRemoteReady(FiberInterface* active) {
fi->remote_next_.store((FiberInterface*)FiberInterface::kRemoteFree, memory_order_relaxed);
fi->DEBUG_remote_epoch = 0;

+// clear the bit after we pulled from the queue.
+fi->flags_.fetch_and(~FiberInterface::kScheduleRemote, memory_order_release);

DVLOG(2) << "Pulled " << fi->name() << " " << fi->DEBUG_use_count();

DCHECK(fi->scheduler_ == this);