diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp index 98913eea..8be576b6 100644 --- a/src/braft/fsm_caller.cpp +++ b/src/braft/fsm_caller.cpp @@ -496,6 +496,10 @@ void FSMCaller::do_stop_following(const LeaderChangeContext& stop_following_cont _fsm->on_stop_following(stop_following_context); } +void FSMCaller::on_pre_send_snapshot(const PeerId& peer_id) { + _fsm->on_pre_send_snapshot(peer_id); +} + void FSMCaller::describe(std::ostream &os, bool use_html) { const char* newline = (use_html) ? "
" : "\n"; TaskType cur_task = _cur_task; diff --git a/src/braft/fsm_caller.h b/src/braft/fsm_caller.h index 897a50f9..6ad1c84e 100644 --- a/src/braft/fsm_caller.h +++ b/src/braft/fsm_caller.h @@ -116,6 +116,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller { int on_leader_start(int64_t term, int64_t lease_epoch); int on_start_following(const LeaderChangeContext& start_following_context); int on_stop_following(const LeaderChangeContext& stop_following_context); + void on_pre_send_snapshot(const PeerId& peer_id); BRAFT_MOCK int on_error(const Error& e); int64_t last_applied_index() const { return _last_applied_index.load(butil::memory_order_relaxed); diff --git a/src/braft/log_manager.cpp b/src/braft/log_manager.cpp index cf077751..cc0d14a3 100644 --- a/src/braft/log_manager.cpp +++ b/src/braft/log_manager.cpp @@ -668,6 +668,10 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) { // We have last snapshot index _virtual_first_log_id = last_but_one_snapshot_id; truncate_prefix(last_but_one_snapshot_id.index + 1, lck); + } else { + // after restart, no followers, we can truncate log safely + _virtual_first_log_id = _last_snapshot_id; + truncate_prefix(meta->last_included_index() + 1, lck); } return; } else { diff --git a/src/braft/node.cpp b/src/braft/node.cpp index b5ba39eb..30e67a34 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -1365,6 +1365,11 @@ void NodeImpl::on_error(const Error& e) { lck.unlock(); } +void NodeImpl::pre_send_snapshot(const PeerId& peer_id) { + _fsm_caller->on_pre_send_snapshot(peer_id); +} + + void NodeImpl::handle_vote_timeout() { std::unique_lock lck(_mutex); diff --git a/src/braft/node.h b/src/braft/node.h index b9dd3e82..e6230f02 100644 --- a/src/braft/node.h +++ b/src/braft/node.h @@ -241,6 +241,9 @@ friend class VoteBallotCtx; bool disable_cli() const { return _options.disable_cli; } bool is_witness() const { return _options.witness; } + + // Called when leader start to send snapshot to remote peer + void pre_send_snapshot(const PeerId& peer_id); private: friend class butil::RefCountedThreadSafe; diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp index 6069f706..c7c66ef6 100644 --- a/src/braft/raft.cpp +++ b/src/braft/raft.cpp @@ -313,6 +313,7 @@ void StateMachine::on_configuration_committed(const Configuration& conf, int64_t void StateMachine::on_stop_following(const LeaderChangeContext&) {} void StateMachine::on_start_following(const LeaderChangeContext&) {} +void StateMachine::on_pre_send_snapshot(const PeerId& peer_id) {} BootstrapOptions::BootstrapOptions() : last_log_index(0) diff --git a/src/braft/raft.h b/src/braft/raft.h index cb80163a..335e5e58 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -265,6 +265,11 @@ class StateMachine { // the very leader whom the follower starts to follow. // User can reset the node's information as it starts to follow some leader. virtual void on_start_following(const ::braft::LeaderChangeContext& ctx); + + // Invoked when the leader start to send snapshot to |peer_id| + // Default: Do nothing + virtual void on_pre_send_snapshot(const PeerId& peer_id); + }; enum State { diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index f2e2bb5c..faec0977 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -789,6 +789,8 @@ void Replicator::_install_snapshot() { add_one_more_task(true)) { return _block(butil::gettimeofday_us(), EBUSY); } + + node_impl->pre_send_snapshot(_options.peer_id); // pre-set replicator state to INSTALLING_SNAPSHOT, so replicator could be // blocked if something is wrong, such as throttled for a period of time