Skip to content

Commit

Permalink
timer: support join inflight task
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds committed Apr 24, 2024
1 parent 406ce3d commit d33cbbf
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
9 changes: 5 additions & 4 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1030,10 +1030,11 @@ void NodeImpl::shutdown(Closure* done) {
_state = STATE_SHUTTING;

// Destroy all the timer
_election_timer.destroy();
_vote_timer.destroy();
_stepdown_timer.destroy();
_snapshot_timer.destroy();
_election_timer.destroy(true /*wait*/);
_vote_timer.destroy(true /*wait*/);
_stepdown_timer.destroy(true /*wait*/);
_snapshot_timer.destroy(true /*wait*/);


// stop replicator and fsm_caller wait
if (_log_manager) {
Expand Down
15 changes: 13 additions & 2 deletions src/braft/repeated_timer_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ RepeatedTimerTask::RepeatedTimerTask()
_stopped(true),
_running(false),
_destroyed(true),
_invoking(false) {}
_invoking(false),
_finish_event(0) {}

RepeatedTimerTask::~RepeatedTimerTask() {
CHECK(!_running) << "Is still running";
Expand Down Expand Up @@ -71,6 +72,7 @@ void RepeatedTimerTask::on_timedout() {
lck.unlock();
on_destroy();
}
_finish_event.signal();
return;
}
return schedule(lck);
Expand All @@ -94,6 +96,7 @@ void RepeatedTimerTask::start() {
// is still running, in which case on_timedout would invoke
// schedule as it would not see _stopped
_running = true;
_finish_event.reset(1);
schedule(lck);
}

Expand Down Expand Up @@ -156,7 +159,7 @@ void RepeatedTimerTask::reset(int timeout_ms) {
// else on_timedout would invoke schdule
}

void RepeatedTimerTask::destroy() {
void RepeatedTimerTask::destroy(bool wait_infight_task) {
std::unique_lock<raft_mutex_t> lck(_mutex);
BRAFT_RETURN_IF(_destroyed);
_destroyed = true;
Expand All @@ -175,6 +178,13 @@ void RepeatedTimerTask::destroy() {
on_destroy();
return;
}

if (wait_infight_task) {
_finish_event.wait();
CHECK(!_running);
return;
}

CHECK(_running);
}

Expand Down Expand Up @@ -208,3 +218,4 @@ void RepeatedTimerTask::describe(std::ostream& os, bool use_html) {
}

} // namespace braft

6 changes: 5 additions & 1 deletion src/braft/repeated_timer_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <bthread/unstable.h>

#include "braft/macros.h"
#include "bthread/countdown_event.h"

namespace braft {

Expand Down Expand Up @@ -50,7 +51,7 @@ class RepeatedTimerTask {
void reset(int timeout_ms);

// Destroy the timer
void destroy();
void destroy(bool wait_inflight_task);

// Describe the current status of timer
void describe(std::ostream& os, bool use_html);
Expand Down Expand Up @@ -78,8 +79,11 @@ class RepeatedTimerTask {
bool _running;
bool _destroyed;
bool _invoking;
bthread::CountdownEvent _finish_event;

};

} // namespace braft

#endif // BRAFT_REPEATED_TIMER_TASK_H

0 comments on commit d33cbbf

Please sign in to comment.