diff --git a/docs/cn/witness.md b/docs/cn/witness.md new file mode 100644 index 00000000..889a8412 --- /dev/null +++ b/docs/cn/witness.md @@ -0,0 +1,27 @@ +witness 副本只作为仲裁者进行投票,不保存实际的业务数据。 +## 实现方案 +对于witness的实现,需要考虑部署方式。 对于2+1部署,如果不允许witness当选为主,那么当主节点异常宕机的时候,如果witness拥有比另外一个副本更新的entry,那么会导致选主失败,为了提高可用性,需要考虑允许witness短时间内允许当选为主,witness成为主以后再主动transfer leader给另一个副本。**通过允许witness临时成为主可以提高系统的可用性** 。 + +对于4+1 的部署方式,实现相对简单,只需要让witness不能当选为主即可,因为即便主节点故障,依然至少有一个副本拥有最新的entry从而可以当选为主。由于witness不能当选为主,因此在同步raft log的时候也可以不需要同步log data给witness。当4+1部署的时候,如果不允许witness当选为主,那么最多只能容忍一个节点故障,如果允许witness临时当选为主,那么可以容忍两个节点故障。允许witness当选为主时,实现 +则与2+1部署一致。 + +## 详细实现 +### witness不允许当选为主 +当witness不允许当选为主时,只需要在初始化Node的时候禁止election_timeout timer进行初始化即可,同时可以无需进行data复制。 + +### witness允许临时当选为主 +允许witness当选为主可以提升服务的可用性。具体实现为: +* 设置raft_enable_witness_to_leader flag为true,允许witness临时选举为主 +* election_timeout设置为正常节点的两倍,在主节点异常宕机的时候,允许witness发起选主,同时由于election_timeout比数据副本大,可以保证数据副本优先被选为主,只有数据副本选主失败时,witness才会主动发起选主。 +* witness当选为主时,禁止安装快照请求,避免从节点获取到空快照覆盖原有的业务数据 +* 新增witness副本时, witness向leader发送install snapshot请求,如果replicator本身是witness,则无需进行data文件的复制,只需复制最新的entry即可。 + +## witness 使用注意事项 +* 如果不允许witness当选为主时,相比原有raft方式部署,服务可用性会明显降低 +* 当允许witness临时当选为主时,极端情况下,可能导致从节点无法获取到最新的log entry从而导致数据丢失。 +例如: +``` +2+1的时候,日志为 [1, 8],某一时刻 replica1(leader) [1, 8] replica2 [1, 5] witness[4,8]。witness snapshot save,truncate 数据到 [7,8]。replica1(leader) 挂了,replica2 [1, 5] 和 witness 的数据接不上了, 此时会导致日志丢失。 +``` +用户在使用witness的时候,需要评估witness带来的可用性降低以及可能丢失部分最新数据的风险。 +如果业务无法接受数据丢失,可以自定义实现LogStorage, 只有半数以上副本拥有entry时,witness才能truncate 该entry之前的log。 diff --git a/src/braft/configuration.h b/src/braft/configuration.h index 21dae30d..310539ae 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -34,32 +34,53 @@ typedef std::string GroupId; // GroupId with version, format: {group_id}_{index} typedef std::string VersionedGroupId; +enum Role { + REPLICA = 0, + WITNESS = 1, +}; + // Represent a participant in a 
replicating group. struct PeerId { butil::EndPoint addr; // ip+port. int idx; // idx in same addr, default 0 + Role role = REPLICA; + + PeerId() : idx(0), role(REPLICA) {} + explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0), role(REPLICA) {} + PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_), role(REPLICA) {} + PeerId(butil::EndPoint addr_, int idx_, bool witness) : addr(addr_), idx(idx_) { + if (witness) { + this->role = WITNESS; + } + } - PeerId() : idx(0) {} - explicit PeerId(butil::EndPoint addr_) : addr(addr_), idx(0) {} - PeerId(butil::EndPoint addr_, int idx_) : addr(addr_), idx(idx_) {} /*intended implicit*/PeerId(const std::string& str) { CHECK_EQ(0, parse(str)); } - PeerId(const PeerId& id) : addr(id.addr), idx(id.idx) {} + PeerId(const PeerId& id) : addr(id.addr), idx(id.idx), role(id.role) {} void reset() { addr.ip = butil::IP_ANY; addr.port = 0; idx = 0; + role = REPLICA; } bool is_empty() const { return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0); } - + bool is_witness() const { + return role == WITNESS; + } int parse(const std::string& str) { reset(); char ip_str[64]; - if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d", ip_str, &addr.port, &idx)) { + int value = REPLICA; + if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, &addr.port, &idx, &value)) { + reset(); + return -1; + } + role = (Role)value; + if (role > WITNESS) { reset(); return -1; } @@ -72,7 +93,7 @@ struct PeerId { std::string to_string() const { char str[128]; - snprintf(str, sizeof(str), "%s:%d", butil::endpoint2str(addr).c_str(), idx); + snprintf(str, sizeof(str), "%s:%d:%d", butil::endpoint2str(addr).c_str(), idx, int(role)); return std::string(str); } @@ -96,7 +117,7 @@ inline bool operator!=(const PeerId& id1, const PeerId& id2) { } inline std::ostream& operator << (std::ostream& os, const PeerId& id) { - return os << id.addr << ':' << id.idx; + return os << id.addr << ':' << id.idx << ':' << int(id.role); } struct NodeId { 
diff --git a/src/braft/node.cpp b/src/braft/node.cpp index f5802f7f..b5ba39eb 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -67,6 +67,9 @@ BRPC_VALIDATE_GFLAG(raft_rpc_channel_connect_timeout_ms, brpc::PositiveInteger); DECLARE_bool(raft_enable_leader_lease); +DEFINE_bool(raft_enable_witness_to_leader, false, + "enable witness temporarily to become leader when leader down accidently"); + #ifndef UNIT_TEST static bvar::Adder g_num_nodes("raft_node_count"); #else @@ -253,6 +256,10 @@ int NodeImpl::init_snapshot_storage() { opt.init_term = _current_term; opt.filter_before_copy_remote = _options.filter_before_copy_remote; opt.usercode_in_pthread = _options.usercode_in_pthread; + // not need to copy data file when it is witness. + if (_options.witness) { + opt.copy_file = false; + } if (_options.snapshot_file_system_adaptor) { opt.file_system_adaptor = *_options.snapshot_file_system_adaptor; } @@ -498,9 +505,18 @@ int NodeImpl::init(const NodeOptions& options) { << ", did you forget to call braft::add_service()?"; return -1; } - - CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms)); - CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms)); + if (options.witness) { + // When this node is a witness, set the election_timeout to be twice + // of the normal replica to ensure that the normal replica has a higher + // priority and is selected as the master + if (FLAGS_raft_enable_witness_to_leader) { + CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms * 2)); + CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms * 2 + options.max_clock_drift_ms)); + } + } else { + CHECK_EQ(0, _election_timer.init(this, options.election_timeout_ms)); + CHECK_EQ(0, _vote_timer.init(this, options.election_timeout_ms + options.max_clock_drift_ms)); + } CHECK_EQ(0, _stepdown_timer.init(this, options.election_timeout_ms)); CHECK_EQ(0, _snapshot_timer.init(this, options.snapshot_interval_s * 1000)); @@ 
-524,7 +540,11 @@ int NodeImpl::init(const NodeOptions& options) { _fsm_caller = new FSMCaller(); _leader_lease.init(options.election_timeout_ms); - _follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms); + if (options.witness) { + _follower_lease.init(options.election_timeout_ms * 2, options.max_clock_drift_ms); + } else { + _follower_lease.init(options.election_timeout_ms, options.max_clock_drift_ms); + } // log storage and log manager init if (init_log_storage() != 0) { @@ -812,7 +832,7 @@ void NodeImpl::handle_stepdown_timeout() { << " state is " << state2str(_state); return; } - + check_witness(_conf.conf); int64_t now = butil::monotonic_time_ms(); check_dead_nodes(_conf.conf, now); if (!_conf.old_conf.empty()) { @@ -820,6 +840,18 @@ void NodeImpl::handle_stepdown_timeout() { } } +void NodeImpl::check_witness(const Configuration& conf) { + if (is_witness()) { + LOG(WARNING) << "node " << node_id() + << " term " << _current_term + << " steps down as it's a witness but become leader temporarily" + << " conf: " << conf; + butil::Status status; + status.set_error(ETRANSFERLEADERSHIP, "Witness becomes leader temporarily"); + step_down(_current_term, true, status); + } +} + void NodeImpl::unsafe_register_conf_change(const Configuration& old_conf, const Configuration& new_conf, Closure* done) { @@ -1302,9 +1334,14 @@ void NodeImpl::unsafe_reset_election_timeout_ms(int election_timeout_ms, _replicator_group.reset_heartbeat_interval( heartbeat_timeout(_options.election_timeout_ms)); _replicator_group.reset_election_timeout_interval(_options.election_timeout_ms); - _election_timer.reset(election_timeout_ms); - _leader_lease.reset_election_timeout_ms(election_timeout_ms); - _follower_lease.reset_election_timeout_ms(election_timeout_ms, _options.max_clock_drift_ms); + if (_options.witness && FLAGS_raft_enable_witness_to_leader) { + _election_timer.reset(election_timeout_ms * 2); + _follower_lease.reset_election_timeout_ms(election_timeout_ms * 2, 
_options.max_clock_drift_ms); + } else { + _election_timer.reset(election_timeout_ms); + _leader_lease.reset_election_timeout_ms(election_timeout_ms); + _follower_lease.reset_election_timeout_ms(election_timeout_ms, _options.max_clock_drift_ms); + } } void NodeImpl::on_error(const Error& e) { @@ -2569,7 +2606,6 @@ void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl, InstallSnapshotResponse* response, google::protobuf::Closure* done) { brpc::ClosureGuard done_guard(done); - if (_snapshot_executor == NULL) { cntl->SetFailed(EINVAL, "Not support snapshot"); return; diff --git a/src/braft/node.h b/src/braft/node.h index d8565f39..b9dd3e82 100644 --- a/src/braft/node.h +++ b/src/braft/node.h @@ -240,7 +240,7 @@ friend class VoteBallotCtx; int bootstrap(const BootstrapOptions& options); bool disable_cli() const { return _options.disable_cli; } - + bool is_witness() const { return _options.witness; } private: friend class butil::RefCountedThreadSafe; @@ -305,7 +305,7 @@ friend class butil::RefCountedThreadSafe; void* meta, bthread::TaskIterator& iter); void apply(LogEntryAndClosure tasks[], size_t size); void check_dead_nodes(const Configuration& conf, int64_t now_ms); - + void check_witness(const Configuration& conf); bool handle_out_of_order_append_entries(brpc::Controller* cntl, const AppendEntriesRequest* request, AppendEntriesResponse* response, diff --git a/src/braft/raft.h b/src/braft/raft.h index d9730c47..8a7f5d8e 100644 --- a/src/braft/raft.h +++ b/src/braft/raft.h @@ -589,6 +589,20 @@ struct NodeOptions { // Default: false bool disable_cli; + // If true, this node is a witness. + // 1. FLAGS_raft_enable_witness_to_leader = false + // It will never be elected as leader. So we don't need to init _vote_timer and _election_timer. + // 2. FLAGS_raft_enable_witness_to_leader = true + // It can be elected as leader, but should transfer leader to normal replica as soon as possible. + // + // Warning: + // 1. 
FLAGS_raft_enable_witness_to_leader = false + // When leader down and witness had newer log entry, it may cause leader election fail. + // 2. FLAGS_raft_enable_witness_to_leader = true + // When leader shutdown and witness was elected as leader, if follower delay over one snapshot, + // it may cause data lost because witness had truncated log entry before snapshot. + // Default: false + bool witness = false; // Construct a default instance NodeOptions(); diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp index 07b19d9d..65aea0df 100644 --- a/src/braft/replicator.cpp +++ b/src/braft/replicator.cpp @@ -47,6 +47,7 @@ DEFINE_int32(raft_retry_replicate_interval_ms, 1000, BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms, brpc::PositiveInteger); +DECLARE_bool(raft_enable_witness_to_leader); DECLARE_int64(raft_append_entry_high_lat_us); DECLARE_bool(raft_trace_append_entry_latency); @@ -628,8 +629,10 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) { } else { CHECK(entry->type != ENTRY_TYPE_CONFIGURATION) << "log_index=" << log_index; } - em->set_data_len(entry->data.length()); - data->append(entry->data); + if (!is_witness() || FLAGS_raft_enable_witness_to_leader) { + em->set_data_len(entry->data.length()); + data->append(entry->data); + } entry->Release(); return 0; } @@ -765,6 +768,10 @@ void Replicator::_wait_more_entries() { } void Replicator::_install_snapshot() { + NodeImpl *node_impl = _options.node; + if (node_impl->is_witness()) { + return _block(butil::gettimeofday_us(), EBUSY); + } if (_reader) { // follower's readonly mode change may cause two install_snapshot // one possible case is: @@ -1527,12 +1534,13 @@ int ReplicatorGroup::find_the_next_candidate( } const int64_t next_index = Replicator::get_next_index(iter->second.id); const int consecutive_error_times = Replicator::get_consecutive_error_times(iter->second.id); - if (consecutive_error_times == 0 && next_index > max_index) { + if (consecutive_error_times 
== 0 && next_index > max_index && !iter->first.is_witness()) { max_index = next_index; if (peer_id) { *peer_id = iter->first; } } + } if (max_index == 0) { return -1; diff --git a/src/braft/replicator.h b/src/braft/replicator.h index 7223900f..6ffd212a 100644 --- a/src/braft/replicator.h +++ b/src/braft/replicator.h @@ -216,6 +216,9 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator { } return true; } + bool is_witness() const { + return _options.peer_id.is_witness(); + } void _close_reader(); int64_t _last_rpc_send_timestamp() { return _options.replicator_status->last_rpc_send_timestamp.load(butil::memory_order_relaxed); diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp index 8bf7d173..7a97132a 100644 --- a/src/braft/snapshot.cpp +++ b/src/braft/snapshot.cpp @@ -575,7 +575,7 @@ SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) { } SnapshotCopier* LocalSnapshotStorage::start_to_copy_from(const std::string& uri) { - LocalSnapshotCopier* copier = new LocalSnapshotCopier(); + LocalSnapshotCopier* copier = new LocalSnapshotCopier(_copy_file); copier->_storage = this; copier->_filter_before_copy_remote = _filter_before_copy_remote; copier->_fs = _fs.get(); @@ -738,16 +738,19 @@ butil::Status LocalSnapshotStorage::gc_instance(const std::string& uri) const { // LocalSnapshotCopier LocalSnapshotCopier::LocalSnapshotCopier() - : _tid(INVALID_BTHREAD) + : LocalSnapshotCopier(true){} + +LocalSnapshotCopier::LocalSnapshotCopier(bool copy_file): + _tid(INVALID_BTHREAD) , _cancelled(false) , _filter_before_copy_remote(false) + , _copy_file(copy_file) , _fs(NULL) , _throttle(NULL) , _writer(NULL) , _storage(NULL) , _reader(NULL) - , _cur_session(NULL) -{} + , _cur_session(NULL){} LocalSnapshotCopier::~LocalSnapshotCopier() { CHECK(!_writer); @@ -769,6 +772,9 @@ void LocalSnapshotCopier::copy() { if (!ok()) { break; } + if (!_copy_file) { + break; + } std::vector files; _remote_snapshot.list_files(&files); for (size_t i = 0; i < files.size() && ok(); ++i) { diff 
--git a/src/braft/snapshot.h b/src/braft/snapshot.h index 448ff4cd..8d617d92 100644 --- a/src/braft/snapshot.h +++ b/src/braft/snapshot.h @@ -147,6 +147,7 @@ class LocalSnapshotCopier : public SnapshotCopier { friend class LocalSnapshotStorage; public: LocalSnapshotCopier(); + LocalSnapshotCopier(bool copy_file); ~LocalSnapshotCopier(); virtual void cancel(); virtual void join(); @@ -166,6 +167,7 @@ friend class LocalSnapshotStorage; bthread_t _tid; bool _cancelled; bool _filter_before_copy_remote; + bool _copy_file = true; FileSystemAdaptor* _fs; SnapshotThrottle* _throttle; LocalSnapshotWriter* _writer; @@ -204,6 +206,7 @@ friend class LocalSnapshotCopier; void set_server_addr(butil::EndPoint server_addr) { _addr = server_addr; } bool has_server_addr() { return _addr != butil::EndPoint(); } + void set_copy_file(bool copy_file) { _copy_file = copy_file; } private: SnapshotWriter* create(bool from_empty) WARN_UNUSED_RESULT; int destroy_snapshot(const std::string& path); @@ -217,6 +220,7 @@ friend class LocalSnapshotCopier; int64_t _last_snapshot_index; std::map _ref_map; butil::EndPoint _addr; + bool _copy_file = true; scoped_refptr _fs; scoped_refptr _snapshot_throttle; }; diff --git a/src/braft/snapshot_executor.cpp b/src/braft/snapshot_executor.cpp index 64403013..e33ece86 100644 --- a/src/braft/snapshot_executor.cpp +++ b/src/braft/snapshot_executor.cpp @@ -373,6 +373,9 @@ int SnapshotExecutor::init(const SnapshotExecutorOptions& options) { if (tmp != NULL && !tmp->has_server_addr()) { tmp->set_server_addr(options.addr); } + if (!options.copy_file) { + tmp->set_copy_file(false); + } SnapshotReader* reader = _snapshot_storage->open(); if (reader == NULL) { return 0; diff --git a/src/braft/snapshot_executor.h b/src/braft/snapshot_executor.h index 648cfa63..d5879505 100644 --- a/src/braft/snapshot_executor.h +++ b/src/braft/snapshot_executor.h @@ -44,6 +44,7 @@ struct SnapshotExecutorOptions { butil::EndPoint addr; bool filter_before_copy_remote; bool 
usercode_in_pthread; + bool copy_file = true; scoped_refptr file_system_adaptor; scoped_refptr snapshot_throttle; }; diff --git a/test/test_configuration.cpp b/test/test_configuration.cpp index aa8b17c9..9461128d 100644 --- a/test/test_configuration.cpp +++ b/test/test_configuration.cpp @@ -42,6 +42,18 @@ TEST_F(TestUsageSuits, PeerId) { LOG(INFO) << "id:" << id1.to_string(); LOG(INFO) << "id:" << id1; + ASSERT_EQ(0, id1.parse("1.1.1.1:1000:0:0")); + LOG(INFO) << "id:" << id1.to_string(); + LOG(INFO) << "id:" << id1; + ASSERT_FALSE(id1.is_witness()); + + ASSERT_EQ(0, id1.parse("1.1.1.1:1000:0:1")); + LOG(INFO) << "id:" << id1.to_string(); + LOG(INFO) << "id:" << id1; + ASSERT_TRUE(id1.is_witness()); + + ASSERT_EQ(-1, id1.parse("1.1.1.1:1000:0:2")); + ASSERT_EQ(0, id1.parse("1.1.1.1:1000")); LOG(INFO) << "id:" << id1.to_string(); LOG(INFO) << "id:" << id1; diff --git a/test/test_meta.cpp b/test/test_meta.cpp index 47433b74..903a645a 100644 --- a/test/test_meta.cpp +++ b/test/test_meta.cpp @@ -269,7 +269,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } { // _merged_impl already catch up data after Mixed first load @@ -278,7 +278,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = tmp->_merged_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } // test double write @@ -294,14 +294,14 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = tmp->_single_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", 
peer_bak.to_string()); term_bak = 0; peer_bak.reset(); st = tmp->_merged_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } delete storage; @@ -325,7 +325,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } // test merged stable storage alone { @@ -340,7 +340,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_upgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("4.4.4.4:4000:4", peer_bak.to_string()); + ASSERT_EQ("4.4.4.4:4000:4:0", peer_bak.to_string()); } delete storage; } @@ -454,7 +454,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } { // _single_impl already catch up data after Mixed first load @@ -463,7 +463,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = tmp->_single_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("2.2.2.2:2000:0", peer_bak.to_string()); + ASSERT_EQ("2.2.2.2:2000:0:0", peer_bak.to_string()); } // test double write @@ -479,14 +479,14 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = tmp->_single_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", 
peer_bak.to_string()); term_bak = 0; peer_bak.reset(); st = tmp->_merged_impl->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } delete storage; @@ -510,7 +510,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("3.3.3.3:3000:3", peer_bak.to_string()); + ASSERT_EQ("3.3.3.3:3000:3:0", peer_bak.to_string()); } // test single stable storage alone { @@ -525,7 +525,7 @@ TEST_F(TestUsageSuits, mixed_stable_storage_downgrade) { st = storage->get_term_and_votedfor(&term_bak, &peer_bak, v_group_id); ASSERT_TRUE(st.ok()); ASSERT_EQ(term, term_bak); - ASSERT_EQ("4.4.4.4:4000:4", peer_bak.to_string()); + ASSERT_EQ("4.4.4.4:4000:4:0", peer_bak.to_string()); } delete storage; } diff --git a/test/test_node.cpp b/test/test_node.cpp index fc5f14bb..407e16af 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -23,6 +23,8 @@ extern bvar::Adder g_num_nodes; DECLARE_int32(raft_max_parallel_append_entries_rpc_num); DECLARE_bool(raft_enable_append_entries_cache); DECLARE_int32(raft_max_append_entries_cache_size); +DECLARE_bool(raft_enable_witness_to_leader); + } using braft::raft_mutex_t; @@ -39,7 +41,7 @@ class NodeTest : public testing::TestWithParam { void SetUp() { g_dont_print_apply_log = false; //logging::FLAGS_v = 90; - GFLAGS_NS::SetCommandLineOption("minloglevel", "1"); + // GFLAGS_NS::SetCommandLineOption("minloglevel", "1"); GFLAGS_NS::SetCommandLineOption("crash_on_fatal_log", "true"); if (GetParam() == std::string("NoReplication")) { braft::FLAGS_raft_max_parallel_append_entries_rpc_num = 1; @@ -412,6 +414,122 @@ TEST_P(NodeTest, LeaderFail) { cluster.stop_all(); } +TEST_P(NodeTest, LeaderFailWithWitness) { + std::vector peers; + for (int i = 0; i < 3; i++) { + 
braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + if (i == 0) { + peer.role = braft::Role::WITNESS; + } + peers.push_back(peer); + } + + // start cluster + Cluster cluster("unittest", peers); + for (size_t i = 0; i < peers.size(); i++) { + ASSERT_EQ(0, cluster.start(peers[i].addr, false, 30, nullptr, peers[i].is_witness())); + } + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + + // apply something + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // stop leader + butil::EndPoint old_leader = leader->node_id().peer_id.addr; + LOG(WARNING) << "stop leader " << leader->node_id(); + cluster.stop(leader->node_id().peer_id.addr); + + // apply something when follower + std::vector nodes; + cluster.followers(&nodes); + cond.reset(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "follower apply: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, -1); + // node 0 is witness; + nodes[1]->apply(task); + } + cond.wait(); + + // elect new leader + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "elect new leader " << leader->node_id(); + + // apply something + cond.reset(10); + for (int i = 10; i < 20; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + 
cond.wait(); + + // old leader restart + ASSERT_EQ(0, cluster.start(old_leader)); + LOG(WARNING) << "restart old leader " << old_leader; + + // apply something + cond.reset(10); + for (int i = 20; i < 30; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // stop and clean old leader + LOG(WARNING) << "stop old leader " << old_leader; + cluster.stop(old_leader); + LOG(WARNING) << "clean old leader data " << old_leader; + cluster.clean(old_leader); + + sleep(2); + // restart old leader + ASSERT_EQ(0, cluster.start(old_leader)); + LOG(WARNING) << "restart old leader " << old_leader; + + cluster.ensure_same(); + + cluster.stop_all(); +} + TEST_P(NodeTest, JoinNode) { std::vector peers; braft::PeerId peer0; @@ -1807,6 +1925,84 @@ TEST_P(NodeTest, leader_transfer) { ASSERT_TRUE(cluster.ensure_same(5)); cluster.stop_all(); } +TEST_P(NodeTest, leader_witness_temporary_be_leader) { + FLAGS_raft_enable_witness_to_leader = true; + std::vector peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + if (i == 0) { + peer.role = braft::Role::WITNESS; + } + peers.push_back(peer); + } + // start cluster + Cluster cluster("unittest", peers, 5000); + for (size_t i = 0; i < peers.size(); i++) { + ASSERT_EQ(0, cluster.start(peers[i].addr, false, 30, nullptr, peers[i].is_witness())); + } + + // elect leader + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + std::vector nodes; + cluster.followers(&nodes); + + // stop follower so witness would had more entry logs than follower + braft::Node* follower_node = nodes[1]; + braft::PeerId follower = follower_node->node_id().peer_id; + 
cluster.stop(follower.addr); + // apply something + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + + // stop leader + butil::EndPoint old_leader = leader->node_id().peer_id.addr; + LOG(WARNING) << "stop leader " << leader->node_id(); + cluster.stop(leader->node_id().peer_id.addr); + + // old follower restart + ASSERT_EQ(0, cluster.start(follower.addr)); + LOG(WARNING) << "restart old follower " << follower.addr; + + // elect leader + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + // wait witness auto step_down and transfer leader. + while (true) { + if (leader->is_leader()) { + usleep(1000* 1000); + continue; + } + break; + } + cluster.wait_leader(); + leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + + cluster.start(old_leader); + LOG(WARNING) << "restart old leader " << old_leader; + cluster.ensure_same(); + + cluster.stop_all(); +} TEST_P(NodeTest, leader_transfer_before_log_is_compleleted) { std::vector peers; @@ -3381,6 +3577,7 @@ INSTANTIATE_TEST_CASE_P(NodeTestWithPipelineReplication, int main(int argc, char* argv[]) { ::testing::AddGlobalTestEnvironment(new TestEnvironment()); ::testing::InitGoogleTest(&argc, argv); + GFLAGS_NS::SetCommandLineOption("minloglevel", "1"); GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true); return RUN_ALL_TESTS(); } diff --git a/test/util.h b/test/util.h index e6984920..5cbcf241 100644 --- a/test/util.h +++ b/test/util.h @@ -17,6 +17,7 @@ #ifndef PUBLIC_RAFT_TEST_UTIL_H #define PUBLIC_RAFT_TEST_UTIL_H +#include #include "braft/node.h" #include "braft/enum.pb.h" #include "braft/errno.pb.h" @@ -25,20 
+26,27 @@ using namespace braft; bool g_dont_print_apply_log = false; - +namespace braft { +DECLARE_bool(raft_enable_witness_to_leader); +} class MockFSM : public braft::StateMachine { public: - MockFSM(const butil::EndPoint& address_) + MockFSM(const butil::EndPoint& address_): + MockFSM(address_,false) { + } + MockFSM(const butil::EndPoint& address_, bool witness) : address(address_) , applied_index(0) , snapshot_index(0) , _on_start_following_times(0) , _on_stop_following_times(0) + , _witness(witness) , _leader_term(-1) , _on_leader_start_closure(NULL) { pthread_mutex_init(&mutex, NULL); } + virtual ~MockFSM() { pthread_mutex_destroy(&mutex); } @@ -50,6 +58,7 @@ class MockFSM : public braft::StateMachine { int64_t snapshot_index; int64_t _on_start_following_times; int64_t _on_stop_following_times; + bool _witness = false; volatile int64_t _leader_term; braft::Closure* _on_leader_start_closure; @@ -82,6 +91,10 @@ class MockFSM : public braft::StateMachine { virtual void on_apply(braft::Iterator& iter) { for (; iter.valid(); iter.next()) { + if (_witness && !FLAGS_raft_enable_witness_to_leader) { + LOG(INFO) << "addr " << address << " skip witness apply " << iter.index(); + continue; + } LOG_IF(INFO, !g_dont_print_apply_log) << "addr " << address << " apply " << iter.index() << " data_size " << iter.data().size(); @@ -233,7 +246,7 @@ class Cluster { int start(const butil::EndPoint& listen_addr, bool empty_peers = false, int snapshot_interval_s = 30, - braft::Closure* leader_start_closure = NULL) { + braft::Closure* leader_start_closure = NULL, bool witness = false) { if (_server_map[listen_addr] == NULL) { brpc::Server* server = new brpc::Server(); if (braft::add_service(server, listen_addr) != 0 @@ -246,13 +259,14 @@ class Cluster { } braft::NodeOptions options; + options.witness = witness; options.election_timeout_ms = _election_timeout_ms; options.max_clock_drift_ms = _max_clock_drift_ms; options.snapshot_interval_s = snapshot_interval_s; if (!empty_peers) { 
options.initial_conf = braft::Configuration(_peers); } - MockFSM* fsm = new MockFSM(listen_addr); + MockFSM* fsm = new MockFSM(listen_addr, witness); if (leader_start_closure) { fsm->set_on_leader_start_closure(leader_start_closure); } @@ -270,14 +284,14 @@ class Cluster { options.catchup_margin = 2; - braft::Node* node = new braft::Node(_name, braft::PeerId(listen_addr, 0)); + braft::Node* node = new braft::Node(_name, braft::PeerId(listen_addr, 0, witness)); int ret = node->init(options); if (ret != 0) { LOG(WARNING) << "init_node failed, server: " << listen_addr; delete node; return ret; } else { - LOG(INFO) << "init node " << listen_addr; + LOG(INFO) << "init node " << listen_addr << " witness " << witness;; } { @@ -424,11 +438,22 @@ class Cluster { LOG(INFO) << "_fsms.size()=" << _fsms.size(); int nround = 0; - MockFSM* first = _fsms[0]; + MockFSM* first = nullptr; + // get first normal fsm when raft_enable_witness_to_leader false + for (size_t i = 1; i < _fsms.size(); i++) { + if (_fsms[i]->_witness && !FLAGS_raft_enable_witness_to_leader) { + continue; + } + first = _fsms[i]; + break; + } CHECK: first->lock(); - for (size_t i = 1; i < _fsms.size(); i++) { + for (size_t i = 0; i < _fsms.size(); i++) { MockFSM* fsm = _fsms[i]; + if ((fsm->_witness && !FLAGS_raft_enable_witness_to_leader) || fsm->address == first->address) { + continue; + } fsm->lock(); if (first->logs.size() != fsm->logs.size()) {