Skip to content

Commit

Permalink
peer: support ipv6 and uds endpoint. (#11)
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds authored Apr 29, 2024
1 parent f2cda9e commit 068008d
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 21 deletions.
47 changes: 36 additions & 11 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

#include <map>
#include <ostream>
#include <regex>
#include <set>
#include <string>
#include <vector>

namespace braft {

typedef std::string GroupId;
Expand Down Expand Up @@ -78,22 +78,47 @@ struct PeerId {

int parse(const std::string& str) {
reset();
char ip_str[64];
int value = REPLICA;
if (2 > sscanf(str.c_str(), "%[^:]%*[:]%d%*[:]%d%*[:]%d", ip_str,
&addr.port, &idx, &value)) {
reset();
// peer_id format:
// endpoint:idx:role
// ^
// |- host:port
// |- ip:port
// |- [ipv6]:port
// |- unix:path/to/sock
// clang-format off
std::regex peerid_reg("((([^:]+)|(\\[.*\\])):[^:]+)(:(\\d)?)?(:(\\d+)?)?");
// ^ ^ ^ ^
// | | | |
// unix,host,ipv4 | idx(6)(opt) |
// ipv6 role(8)(opt)
// clang-format on
std::cmatch m;
auto ret = std::regex_match(str.c_str(), m, peerid_reg);
if (!ret || m.size() != 9) {
return -1;
}
role = (Role)value;
if (role > WITNESS) {

// Using str2endpoint to check endpoint is valid.
std::string enpoint_str = m[1];
if (butil::str2endpoint(enpoint_str.c_str(), &addr) != 0) {
reset();
return -1;
}
if (0 != butil::str2ip(ip_str, &addr.ip)) {
reset();
return -1;

if (m[6].matched) {
// Check idx.
idx = std::stoi(m[6]);
}

// Check role if it existed.
if (m[8].matched) {
role = static_cast<Role>(std::stoi(m[8]));
if (role > WITNESS) {
reset();
return -1;
}
}

return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "braft/snapshot_executor.h"
#include "braft/sync_point.h"
#include "braft/util.h"
#include "butil/endpoint.h"
#include "butil/logging.h"

namespace braft {
Expand Down Expand Up @@ -506,7 +507,8 @@ int NodeImpl::init(const NodeOptions& options) {
_options = options;

// check _server_id
if (butil::IP_ANY == _server_id.addr.ip) {
if (!butil::is_endpoint_extended(_server_id.addr) &&
butil::IP_ANY == _server_id.addr.ip) {
LOG(ERROR) << "Group " << _group_id
<< " Node can't started from IP_ANY";
return -1;
Expand Down
2 changes: 1 addition & 1 deletion src/braft/node_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ NodeManager::~NodeManager() {}

bool NodeManager::server_exists(butil::EndPoint addr) {
BAIDU_SCOPED_LOCK(_mutex);
if (addr.ip != butil::IP_ANY) {
if (!is_endpoint_extended(addr) && addr.ip != butil::IP_ANY) {
butil::EndPoint any_addr(butil::IP_ANY, addr.port);
if (_addr_set.find(any_addr) != _addr_set.end()) {
return true;
Expand Down
27 changes: 27 additions & 0 deletions test/test_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,33 @@ TEST_F(TestUsageSuits, PeerId) {

PeerId id3("1.2.3.4:1000:0");
LOG(INFO) << "id:" << id3;

// UDS format
PeerId id4;
id4.parse("unix:/path/to/sock:1:1");
LOG(INFO) << "id:" << id4;
ASSERT_EQ(id4.idx, 1);
ASSERT_EQ(id4.role, 1);

PeerId id5;
ASSERT_EQ(id5.parse("invalid:1:1"), -1);

// ipv6 format
PeerId id6;
id6.parse("[::1]:1");
ASSERT_EQ(id6.idx, 0);
ASSERT_EQ(id6.role, 0);
LOG(INFO) << "id:" << id6;

PeerId id7;
id7.parse("[::1]:1:1:1");
ASSERT_EQ(id7.idx, 1);
ASSERT_EQ(id7.role, 1);
LOG(INFO) << "id:" << id7;

PeerId id8;
ASSERT_EQ(id8.parse("[:::1:1"), -1);
ASSERT_EQ(id8.parse("::]:1:1"), -1);
}

TEST_F(TestUsageSuits, Configuration) {
Expand Down
32 changes: 32 additions & 0 deletions test/test_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sys/types.h>

#include <algorithm>
#include <string>
#include <vector>

#include "../test/util.h"
Expand Down Expand Up @@ -61,6 +62,7 @@ class NodeTest : public testing::TestWithParam<const char*> {
}
LOG(INFO) << "Start unitests: " << GetParam();
::system("rm -rf data");
::system("mkdir data");
ASSERT_EQ(0, braft::g_num_nodes.get_value());
}
void TearDown() {
Expand Down Expand Up @@ -114,6 +116,36 @@ TEST_P(NodeTest, Server) {
server2.Start("0.0.0.0:5007", NULL);
}

TEST_P(NodeTest, UDSNode) {
braft::PeerId peer0("unix:data/0.sock");
braft::PeerId peer1("unix:data/1.sock");
braft::PeerId peer2("unix:data/2.sock");
std::vector<braft::PeerId> peers = {peer0, peer1, peer2};
Cluster cluster("unittest", peers);

for (int i = 0; i < 3; i++) {
cluster.start(peers[i].addr);
}
cluster.wait_leader();
LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string();
}

TEST_P(NodeTest, IPV6Node) {
std::vector<braft::PeerId> peers;
braft::PeerId peer0("[::1]:5006");
braft::PeerId peer1("[::1]:5007");
braft::PeerId peer2("[::1]:5008");
peers = {peer0, peer1, peer2};

Cluster cluster("unittest", peers);
for (int i = 0; i < 3; i++) {
cluster.start(peers[i].addr);
}

cluster.wait_leader();
LOG(INFO) << "leader:" << cluster.leader()->leader_id().to_string();
}

TEST_P(NodeTest, SingleNode) {
brpc::Server server;
int ret = braft::add_service(&server, 5006);
Expand Down
24 changes: 16 additions & 8 deletions test/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
#define PUBLIC_RAFT_TEST_UTIL_H

#include <gflags/gflags.h>
#include <algorithm>

#include "_deps/brpc/src/src/butil/endpoint.h"
#include "brpc/server.h"
#include "butil/time.h"
#include "braft/enum.pb.h"
#include "braft/errno.pb.h"
Expand Down Expand Up @@ -252,9 +255,11 @@ class Cluster {
int snapshot_interval_s = 30,
braft::Closure* leader_start_closure = NULL, bool witness = false) {
if (_server_map[listen_addr] == NULL) {
brpc::ServerOptions server_options;
server_options.server_info_name = butil::endpoint2str(listen_addr).c_str();
brpc::Server* server = new brpc::Server();
if (braft::add_service(server, listen_addr) != 0
|| server->Start(listen_addr, NULL) != 0) {
|| server->Start(listen_addr, &server_options) != 0) {
LOG(ERROR) << "Fail to start raft service";
delete server;
return -1;
Expand All @@ -276,12 +281,15 @@ class Cluster {
}
options.fsm = fsm;
options.node_owns_fsm = true;
std::string endpoint_str = butil::endpoint2str(listen_addr).c_str();
butil::string_printf(&options.log_uri, "local://./data/%s/log",
butil::endpoint2str(listen_addr).c_str());
butil::string_printf(&options.raft_meta_uri, "local://./data/%s/raft_meta",
butil::endpoint2str(listen_addr).c_str());
butil::string_printf(&options.snapshot_uri, "local://./data/%s/snapshot",
butil::endpoint2str(listen_addr).c_str());
endpoint_str.c_str());
butil::string_printf(&options.raft_meta_uri,
"local://./data/%s/raft_meta",
endpoint_str.c_str());
butil::string_printf(&options.snapshot_uri,
"local://./data/%s/snapshot",
endpoint_str.c_str());

options.snapshot_throttle = &_throttle;

Expand Down Expand Up @@ -388,7 +396,7 @@ class Cluster {

// return true if there is a leader, false when reach timeout.
void wait_leader(int64_t timeout_ms = 100 * 1000 /*100 seconds*/) {
int64_t deadline = butil::timespec_to_microseconds(
int64_t deadline = butil::timespec_to_milliseconds(
butil::milliseconds_from_now(timeout_ms));

while (butil::gettimeofday_ms() < deadline) {
Expand Down Expand Up @@ -521,7 +529,7 @@ class Cluster {
braft::Node* node = NULL;
std::vector<braft::Node*> new_nodes;
for (size_t i = 0; i < _nodes.size(); i++) {
if (addr.port == _nodes[i]->node_id().peer_id.addr.port) {
if (addr == _nodes[i]->node_id().peer_id.addr) {
node = _nodes[i];
} else {
new_nodes.push_back(_nodes[i]);
Expand Down

0 comments on commit 068008d

Please sign in to comment.