From ce2ca6af2de9d18733cd053e49b06d0fd284d3a9 Mon Sep 17 00:00:00 2001 From: LiYiChao Date: Tue, 29 Nov 2022 21:47:36 +0800 Subject: [PATCH] raft arbiter support. Signed-off-by: Yichao Li Signed-off-by: Li Wang --- src/braft/fsm_caller.cpp | 25 ++++++++++++++++++++----- src/braft/log_manager.cpp | 4 +++- src/braft/log_manager.h | 5 +++++ src/braft/node.cpp | 37 ++++++++++++++++++++++--------------- src/braft/node.h | 3 +++ src/braft/raft.cpp | 2 +- src/braft/raft.h | 8 ++++++++ src/braft/raft.proto | 1 + src/braft/replicator.cpp | 14 ++++++++++++++ src/braft/replicator.h | 3 +++ 10 files changed, 80 insertions(+), 22 deletions(-) diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp index 98913eea..8e310635 100644 --- a/src/braft/fsm_caller.cpp +++ b/src/braft/fsm_caller.cpp @@ -298,7 +298,11 @@ void FSMCaller::do_committed(int64_t committed_index) { continue; } Iterator iter(&iter_impl); - _fsm->on_apply(iter); + if (!_node->arbiter()) { + _fsm->on_apply(iter); + } else { + for (;iter.valid();iter.next()) {} + } LOG_IF(ERROR, iter.valid()) << "Node " << _node->node_id() << " Iterator is still valid, did you return before iterator " @@ -353,7 +357,11 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) { return; } - _fsm->on_snapshot_save(writer, done); + if (!_node->arbiter()) { + _fsm->on_snapshot_save(writer, done); + } else { + done->Run(); + } return; } @@ -402,7 +410,10 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) { return done->Run(); } - ret = _fsm->on_snapshot_load(reader); + if (!_node->arbiter()) { + ret = _fsm->on_snapshot_load(reader); + } + if (ret != 0) { done->status().set_error(ret, "StateMachine on_snapshot_load failed"); done->Run(); @@ -454,12 +465,16 @@ int FSMCaller::on_leader_start(int64_t term, int64_t lease_epoch) { } void FSMCaller::do_leader_stop(const butil::Status& status) { - _fsm->on_leader_stop(status); + if (!_node->arbiter()) { + _fsm->on_leader_stop(status); + } } void 
FSMCaller::do_leader_start(const LeaderStartContext& leader_start_context) { _node->leader_lease_start(leader_start_context.lease_epoch); - _fsm->on_leader_start(leader_start_context.term); + if (!_node->arbiter()) { + _fsm->on_leader_start(leader_start_context.term); + } } int FSMCaller::on_start_following(const LeaderChangeContext& start_following_context) { diff --git a/src/braft/log_manager.cpp b/src/braft/log_manager.cpp index cf077751..9b3e2ad4 100644 --- a/src/braft/log_manager.cpp +++ b/src/braft/log_manager.cpp @@ -71,6 +71,7 @@ LogManager::LogManager() , _next_wait_id(0) , _first_log_index(0) , _last_log_index(0) + , _complete_index(std::numeric_limits<int64_t>::max()) { CHECK_EQ(0, start_disk_thread()); } @@ -276,7 +277,8 @@ int LogManager::truncate_prefix(const int64_t first_index_kept, _last_log_index = first_index_kept - 1; } _config_manager->truncate_prefix(first_index_kept); - TruncatePrefixClosure* c = new TruncatePrefixClosure(first_index_kept); + TruncatePrefixClosure* c = new TruncatePrefixClosure( + std::min(first_index_kept, _complete_index.load(butil::memory_order_relaxed))); const int rc = bthread::execution_queue_execute(_disk_queue, c); lck.unlock(); for (size_t i = 0; i < saved_logs_in_memory.size(); ++i) { diff --git a/src/braft/log_manager.h b/src/braft/log_manager.h index fcd52dc3..17d68a02 100644 --- a/src/braft/log_manager.h +++ b/src/braft/log_manager.h @@ -149,6 +149,10 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager { // Get the internal status of LogManager.
void get_status(LogManagerStatus* status); + void set_complete_index(int64_t index) { + _complete_index.store(index, butil::memory_order_relaxed); + } + private: friend class AppendBatcher; struct WaitMeta { @@ -218,6 +222,7 @@ friend class AppendBatcher; int64_t _last_log_index; // the last snapshot's log_id LogId _last_snapshot_id; + butil::atomic<int64_t> _complete_index; // the virtual first log, for finding next_index of replicator, which // can avoid install_snapshot too often in extreme case where a follower's // install_snapshot is slower than leader's save_snapshot diff --git a/src/braft/node.cpp b/src/braft/node.cpp index f5802f7f..a209cb5c 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -96,6 +96,13 @@ class ConfigurationChangeDone : public Closure { if (_leader_start) { _node->leader_lease_start(_lease_epoch); _node->_options.fsm->on_leader_start(_term); + if (_node->arbiter()) { + // todo: handle errors + CHECK(!_node->transfer_leadership_to(ANY_PEER)) << "Arbiter " << _node->node_id() + << " fail to transfer leader to others"; + CHECK(!_node->is_leader()) << "Arbiter " << _node->node_id() + << " is still leader after transfer_leadership_to ANY_PEER"; + } } } delete this; @@ -671,6 +678,10 @@ int NodeImpl::execute_applying_tasks( } void NodeImpl::apply(const Task& task) { + if (arbiter()) { + task.done->status().set_error(EPERM, "Node is arbiter"); + return run_closure_in_bthread(task.done); + } LogEntry* entry = new LogEntry; entry->AddRef(); entry->data.swap(*task.data); @@ -1751,6 +1762,10 @@ void NodeImpl::request_peers_to_vote(const std::set<PeerId>& peers, } } +int64_t NodeImpl::complete_index() { + return _replicator_group.complete_index(); +} + // in lock void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate, const butil::Status& status) { @@ -1878,21 +1893,6 @@ void NodeImpl::check_step_down(const int64_t request_term, const PeerId& server_ } } -class LeaderStartClosure : public Closure { -public: - LeaderStartClosure(StateMachine*
fsm, int64_t term) : _fsm(fsm), _term(term) {} - ~LeaderStartClosure() {} - void Run() { - if (status().ok()) { - _fsm->on_leader_start(_term); - } - delete this; - } -private: - StateMachine* _fsm; - int64_t _term; -}; - // in lock void NodeImpl::become_leader() { CHECK(_state == STATE_CANDIDATE); @@ -2483,6 +2483,9 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl, _ballot_box->set_last_committed_index( std::min(request->committed_index(), prev_log_index)); + if (arbiter()) { + _log_manager->set_complete_index(request->complete_index()); + } return; } @@ -3394,6 +3397,10 @@ bool NodeImpl::is_leader_lease_valid() { void NodeImpl::get_leader_lease_status(LeaderLeaseStatus* lease_status) { // Fast path for leader to lease check + if (arbiter()) { + lease_status->state = LEASE_EXPIRED; + return; + } LeaderLease::LeaseInfo internal_info; _leader_lease.get_lease_info(&internal_info); switch (internal_info.state) { diff --git a/src/braft/node.h b/src/braft/node.h index d8565f39..17b235c8 100644 --- a/src/braft/node.h +++ b/src/braft/node.h @@ -241,6 +241,9 @@ friend class VoteBallotCtx; bool disable_cli() const { return _options.disable_cli; } + bool arbiter() { return _options.arbiter;} + int64_t complete_index(); + private: friend class butil::RefCountedThreadSafe<NodeImpl>; diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp index 6069f706..2ed8d839 100644 --- a/src/braft/raft.cpp +++ b/src/braft/raft.cpp @@ -155,7 +155,7 @@ PeerId Node::leader_id() { } bool Node::is_leader() { - return _impl->is_leader(); + return !_impl->arbiter() && _impl->is_leader(); } bool Node::is_leader_lease_valid() { diff --git a/src/braft/raft.h b/src/braft/raft.h index ef9cead8..38f1d7f7 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -588,6 +588,13 @@ struct NodeOptions { // Default: false bool disable_cli; + // If true, this node will not have a copy of data and only participates in elections + // from user's viewpoint this node will never become leader, + //
on_apply/on_snapshot_save/on_snapshot_load/on_leader_start/on_leader_stop etc will not be called + // todo: avoid installing snapshot for arbiter + // Default: false + bool arbiter; + // Construct a default instance NodeOptions(); @@ -609,6 +616,7 @@ inline NodeOptions::NodeOptions() , snapshot_file_system_adaptor(NULL) , snapshot_throttle(NULL) , disable_cli(false) + , arbiter(false) {} inline int NodeOptions::get_catchup_timeout_ms() { diff --git a/src/braft/raft.proto b/src/braft/raft.proto index b2df8e99..e36f0497 100644 --- a/src/braft/raft.proto +++ b/src/braft/raft.proto @@ -48,6 +48,7 @@ message AppendEntriesRequest { required int64 prev_log_index = 6; repeated EntryMeta entries = 7; required int64 committed_index = 8; + optional int64 complete_index = 9; }; message AppendEntriesResponse { diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index d64d8385..ce9df91c 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -560,6 +560,7 @@ void Replicator::_send_empty_entries(bool is_heartbeat) { _heartbeat_counter++; // set RPC timeout for heartbeat, how long should timeout be is waiting to be optimized. 
cntl->set_timeout_ms(*_options.election_timeout_ms / 2); + request->set_complete_index(_options.node->complete_index()); } else { _st.st = APPENDING_ENTRIES; _st.first_log_index = _next_index; @@ -758,6 +759,10 @@ void Replicator::_wait_more_entries() { } void Replicator::_install_snapshot() { + CHECK(!_options.node->arbiter()) << "node " << _options.group_id << ":" << _options.server_id + << " refuse to send InstallSnapshotRequest to " << _options.peer_id + << " because I am arbiter"; + if (_reader) { // follower's readonly mode change may cause two install_snapshot // one possible case is: @@ -1569,4 +1574,13 @@ bool ReplicatorGroup::readonly(const PeerId& peer) const { return Replicator::readonly(rid); } +int64_t ReplicatorGroup::complete_index() const { + int64_t rst = std::numeric_limits<int64_t>::max(); + for (std::map<PeerId, ReplicatorIdAndStatus>::const_iterator + iter = _rmap.begin(); iter != _rmap.end(); ++iter) { + rst = std::min(rst, Replicator::get_next_index(iter->second.id)); + } + return rst; +} + } // namespace braft diff --git a/src/braft/replicator.h b/src/braft/replicator.h index 7223900f..c6c31a58 100644 --- a/src/braft/replicator.h +++ b/src/braft/replicator.h @@ -358,6 +358,9 @@ class ReplicatorGroup { // Check if a replicator is in readonly bool readonly(const PeerId& peer) const; + // all log index before `complete_index()` have been persisted by all peers + int64_t complete_index() const; + private: int _add_replicator(const PeerId& peer, ReplicatorId *rid);