From 068008d8e22e55858484b0b41dcfb90bf8aa3f92 Mon Sep 17 00:00:00 2001 From: Dongsheng He Date: Mon, 29 Apr 2024 17:27:05 +0800 Subject: [PATCH] peer: support ipv6 and uds endpoint. (#11) brpc endpoint: https://github.com/apache/brpc/blob/master/docs/cn/endpoint.md details: https://github.com/ehds/mbraft/pull/11 --- src/braft/configuration.h | 47 ++++++++++++++++++++++++++++--------- src/braft/node.cpp | 4 +++- src/braft/node_manager.cpp | 2 +- test/test_configuration.cpp | 27 +++++++++++++++++++++ test/test_node.cpp | 32 +++++++++++++++++++++++++ test/util.h | 24 ++++++++++++------- 6 files changed, 115 insertions(+), 21 deletions(-) diff --git a/src/braft/configuration.h b/src/braft/configuration.h index dcfb8ce..cc11399 100644 --- a/src/braft/configuration.h +++ b/src/braft/configuration.h @@ -25,10 +25,10 @@ #include #include +#include #include #include #include - namespace braft { typedef std::string GroupId; @@ -78,22 +78,47 @@ struct PeerId { int parse(const std::string& str) { reset(); - char ip_str[64]; - int value = REPLICA; - if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str, - &addr.port, &idx, &value)) { - reset(); + // peer_id format: + // endpoint:idx:role + // ^ + // |- host:port + // |- ip:port + // |- [ipv6]:port + // |- unix:path/to/sock + // clang-format off + std::regex peerid_reg("((([^:]+)|(\\[.*\\])):[^:]+)(:(\\d)?)?(:(\\d+)?)?"); + // ^ ^ ^ ^ + // | | | | + // unix,host,ipv4 | idx(6)(opt) | + // ipv6 role(8)(opt) + // clang-format on + std::cmatch m; + auto ret = std::regex_match(str.c_str(), m, peerid_reg); + if (!ret || m.size() != 9) { return -1; } - role = (Role)value; - if (role > WITNESS) { + + // Using str2endpoint to check endpoint is valid. + std::string enpoint_str = m[1]; + if (butil::str2endpoint(enpoint_str.c_str(), &addr) != 0) { reset(); return -1; } - if (0 != butil::str2ip(ip_str, &addr.ip)) { - reset(); - return -1; + + if (m[6].matched) { + // Check idx. + idx = std::stoi(m[6]); } + + // Check role if it existed. + if (m[8].matched) { + role = static_cast(std::stoi(m[8])); + if (role > WITNESS) { + reset(); + return -1; + } + } + return 0; } diff --git a/src/braft/node.cpp b/src/braft/node.cpp index 228661a..53a7f98 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -37,6 +37,7 @@ #include "braft/snapshot_executor.h" #include "braft/sync_point.h" #include "braft/util.h" +#include "butil/endpoint.h" #include "butil/logging.h" namespace braft { @@ -506,7 +507,8 @@ int NodeImpl::init(const NodeOptions& options) { _options = options; // check _server_id - if (butil::IP_ANY == _server_id.addr.ip) { + if (!butil::is_endpoint_extended(_server_id.addr) && + butil::IP_ANY == _server_id.addr.ip) { LOG(ERROR) << "Group " << _group_id << " Node can't started from IP_ANY"; return -1; diff --git a/src/braft/node_manager.cpp b/src/braft/node_manager.cpp index 9e193a3..1e802f1 100644 --- a/src/braft/node_manager.cpp +++ b/src/braft/node_manager.cpp @@ -29,7 +29,7 @@ NodeManager::~NodeManager() {} bool NodeManager::server_exists(butil::EndPoint addr) { BAIDU_SCOPED_LOCK(_mutex); - if (addr.ip != butil::IP_ANY) { + if (!is_endpoint_extended(addr) && addr.ip != butil::IP_ANY) { butil::EndPoint any_addr(butil::IP_ANY, addr.port); if (_addr_set.find(any_addr) != _addr_set.end()) { return true; diff --git a/test/test_configuration.cpp b/test/test_configuration.cpp index a8eee49..569eadf 100644 --- a/test/test_configuration.cpp +++ b/test/test_configuration.cpp @@ -69,6 +69,33 @@ TEST_F(TestUsageSuits, PeerId) { PeerId id3("1.2.3.4:1000:0"); LOG(INFO) << "id:" << id3; + + // UDS format + PeerId id4; + id4.parse("unix:/path/to/sock:1:1"); + LOG(INFO) << "id:" << id4; + ASSERT_EQ(id4.idx, 1); + ASSERT_EQ(id4.role, 1); + + PeerId id5; + ASSERT_EQ(id5.parse("invalid:1:1"), -1); + + // ipv6 format + PeerId id6; + id6.parse("[::1]:1"); + ASSERT_EQ(id6.idx, 0); + ASSERT_EQ(id6.role, 0); + LOG(INFO) << "id:" << id6; + + PeerId id7; + id7.parse("[::1]:1:1:1"); + ASSERT_EQ(id7.idx, 1); + ASSERT_EQ(id7.role, 1); + LOG(INFO) << "id:" << id7; + + PeerId id8; + ASSERT_EQ(id8.parse("[:::1:1"), -1); + ASSERT_EQ(id8.parse("::]:1:1"), -1); } TEST_F(TestUsageSuits, Configuration) { diff --git a/test/test_node.cpp b/test/test_node.cpp index 11668d4..40a2bcf 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -18,6 +18,7 @@ #include #include +#include #include #include "../test/util.h" @@ -61,6 +62,7 @@ class NodeTest : public testing::TestWithParam { } LOG(INFO) << "Start unitests: " << GetParam(); ::system("rm -rf data"); + ::system("mkdir data"); ASSERT_EQ(0, braft::g_num_nodes.get_value()); } void TearDown() { @@ -114,6 +116,36 @@ TEST_P(NodeTest, Server) { server2.Start("0.0.0.0:5007", NULL); } +TEST_P(NodeTest, UDSNode) { + braft::PeerId peer0("unix:data/0.sock"); + braft::PeerId peer1("unix:data/1.sock"); + braft::PeerId peer2("unix:data/2.sock"); + std::vector peers = {peer0, peer1, peer2}; + Cluster cluster("unittest", peers); + + for (int i = 0; i < 3; i++) { + cluster.start(peers[i].addr); + } + cluster.wait_leader(); + LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string(); +} + +TEST_P(NodeTest, IPV6Node) { + std::vector peers; + braft::PeerId peer0("[::1]:5006"); + braft::PeerId peer1("[::1]:5007"); + braft::PeerId peer2("[::1]:5008"); + peers = {peer0, peer1, peer2}; + + Cluster cluster("unittest", peers); + for (int i = 0; i < 3; i++) { + cluster.start(peers[i].addr); + } + + cluster.wait_leader(); + LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string(); +} + TEST_P(NodeTest, SingleNode) { brpc::Server server; int ret = braft::add_service(&server, 5006); diff --git a/test/util.h b/test/util.h index e468153..58e5c14 100644 --- a/test/util.h +++ b/test/util.h @@ -18,7 +18,10 @@ #define PUBLIC_RAFT_TEST_UTIL_H #include +#include +#include "_deps/brpc/src/src/butil/endpoint.h" +#include "brpc/server.h" #include "butil/time.h" #include "braft/enum.pb.h" #include "braft/errno.pb.h" @@ -252,9 +255,11 @@ class Cluster { int snapshot_interval_s = 30, braft::Closure* leader_start_closure = NULL, bool witness = false) { if (_server_map[listen_addr] == NULL) { + brpc::ServerOptions server_options; + server_options.server_info_name = butil::endpoint2str(listen_addr).c_str(); brpc::Server* server = new brpc::Server(); if (braft::add_service(server, listen_addr) != 0 - || server->Start(listen_addr, NULL) != 0) { + || server->Start(listen_addr, &server_options) != 0) { LOG(ERROR) << "Fail to start raft service"; delete server; return -1; @@ -276,12 +281,15 @@ class Cluster { } options.fsm = fsm; options.node_owns_fsm = true; + std::string endpoint_str = butil::endpoint2str(listen_addr).c_str(); butil::string_printf(&options.log_uri, "local://./data/%s/log", - butil::endpoint2str(listen_addr).c_str()); - butil::string_printf(&options.raft_meta_uri, "local://./data/%s/raft_meta", - butil::endpoint2str(listen_addr).c_str()); - butil::string_printf(&options.snapshot_uri, "local://./data/%s/snapshot", - butil::endpoint2str(listen_addr).c_str()); + endpoint_str.c_str()); + butil::string_printf(&options.raft_meta_uri, + "local://./data/%s/raft_meta", + endpoint_str.c_str()); + butil::string_printf(&options.snapshot_uri, + "local://./data/%s/snapshot", + endpoint_str.c_str()); options.snapshot_throttle = &_throttle; @@ -388,7 +396,7 @@ class Cluster { // return true if there is a leader, false when reach timeout. void wait_leader(int64_t timeout_ms = 100 * 1000 /*100 seconds*/) { - int64_t deadline = butil::timespec_to_microseconds( + int64_t deadline = butil::timespec_to_milliseconds( butil::milliseconds_from_now(timeout_ms)); while (butil::gettimeofday_ms() < deadline) { @@ -521,7 +529,7 @@ class Cluster { braft::Node* node = NULL; std::vector new_nodes; for (size_t i = 0; i < _nodes.size(); i++) { - if (addr.port == _nodes[i]->node_id().peer_id.addr.port) { + if (addr == _nodes[i]->node_id().peer_id.addr) { node = _nodes[i]; } else { new_nodes.push_back(_nodes[i]);