Skip to content

Commit

Permalink
log_entry: replace peers pointer with object
Browse files Browse the repository at this point in the history
  • Loading branch information
ehds committed Apr 28, 2024
1 parent be6e5f3 commit 679a5b9
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 69 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ SET(CPACK_DEBIAN_PACKAGE_MAINTAINER "braft authors") #required

INCLUDE(CPack)

list(APPEND CMAKE_INCLUDE_PATH "/Users/ehds/Programs/protobuf-3.17.3/install/include")
list(APPEND CMAKE_LIBRARY_PATH "/Users/ehds/Programs/protobuf-3.17.3/install/lib")
#option(EXAMPLE_LINK_SO "Whether examples are linked dynamically" OFF)
option(BRPC_WITH_GLOG "With glog" OFF)
option(WITH_DEBUG_SYMBOLS "With debug symbols" ON)
Expand Down
2 changes: 2 additions & 0 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ struct PeerId {
bool is_empty() const {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
}

bool is_witness() const { return role == WITNESS; }

int parse(const std::string& str) {
reset();
char ip_str[64];
Expand Down
14 changes: 11 additions & 3 deletions src/braft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ struct ConfigurationEntry {
ConfigurationEntry() {}
ConfigurationEntry(const LogEntry& entry) {
id = entry.id;
conf = *(entry.peers);
if (entry.old_peers) {
old_conf = *(entry.old_peers);
conf = (entry.peers);
if (!entry.old_peers.empty()) {
old_conf = entry.old_peers;
}
}

ConfigurationEntry(LogEntry&& entry) {
id = entry.id;
conf = std::move(entry.peers);
if (!entry.old_peers.empty()) {
old_conf = std::move(entry.old_peers);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ void FSMCaller::do_committed(int64_t committed_index) {
for (; iter_impl.is_good();) {
if (iter_impl.entry()->type != ENTRY_TYPE_DATA) {
if (iter_impl.entry()->type == ENTRY_TYPE_CONFIGURATION) {
if (iter_impl.entry()->old_peers == NULL) {
if (!iter_impl.entry()->old_peers.empty()) {
// Joint stage is not supposed to be noticeable by end
// users.
_fsm->on_configuration_committed(
Configuration(*iter_impl.entry()->peers),
Configuration(iter_impl.entry()->peers),
iter_impl.entry()->id.index);
}
}
Expand Down
20 changes: 8 additions & 12 deletions src/braft/log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ namespace braft {

bvar::Adder<int64_t> g_nentries("raft_num_log_entries");

LogEntry::LogEntry() : type(ENTRY_TYPE_UNKNOWN), peers(NULL), old_peers(NULL) {
LogEntry::LogEntry() : type(ENTRY_TYPE_UNKNOWN) {
g_nentries << 1;
}

LogEntry::~LogEntry() {
g_nentries << -1;
delete peers;
delete old_peers;
}

butil::Status parse_configuration_meta(const butil::IOBuf& data,
Expand All @@ -41,14 +39,12 @@ butil::Status parse_configuration_meta(const butil::IOBuf& data,
status.set_error(EINVAL, "Fail to parse ConfigurationPBMeta");
return status;
}
entry->peers = new std::vector<PeerId>;
for (int j = 0; j < meta.peers_size(); ++j) {
entry->peers->push_back(PeerId(meta.peers(j)));
entry->peers.push_back(PeerId(meta.peers(j)));
}
if (meta.old_peers_size() > 0) {
entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < meta.old_peers_size(); i++) {
entry->old_peers->push_back(PeerId(meta.old_peers(i)));
entry->old_peers.push_back(PeerId(meta.old_peers(i)));
}
}
return status;
Expand All @@ -58,12 +54,12 @@ butil::Status serialize_configuration_meta(const LogEntry* entry,
butil::IOBuf& data) {
butil::Status status;
ConfigurationPBMeta meta;
for (size_t i = 0; i < entry->peers->size(); ++i) {
meta.add_peers((*(entry->peers))[i].to_string());
for (size_t i = 0; i < entry->peers.size(); ++i) {
meta.add_peers((entry->peers)[i].to_string());
}
if (entry->old_peers) {
for (size_t i = 0; i < entry->old_peers->size(); ++i) {
meta.add_old_peers((*(entry->old_peers))[i].to_string());
if (!entry->old_peers.empty()) {
for (size_t i = 0; i < entry->old_peers.size(); ++i) {
meta.add_old_peers((entry->old_peers)[i].to_string());
}
}
butil::IOBufAsZeroCopyOutputStream wrapper(&data);
Expand Down
4 changes: 2 additions & 2 deletions src/braft/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ struct LogEntry : public butil::RefCountedThreadSafe<LogEntry> {
public:
EntryType type; // log type
LogId id;
std::vector<PeerId>* peers; // peers
std::vector<PeerId>* old_peers; // peers
std::vector<PeerId> peers; // peers
std::vector<PeerId> old_peers; // peers
butil::IOBuf data;

LogEntry();
Expand Down
15 changes: 5 additions & 10 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ int NodeImpl::bootstrap(const BootstrapOptions& options) {
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
options.group_conf.list_peers(entry->peers);
options.group_conf.list_peers(&(entry->peers));

std::vector<LogEntry*> entries;
entries.push_back(entry);
Expand Down Expand Up @@ -2141,11 +2140,9 @@ void NodeImpl::unsafe_apply_configuration(const Configuration& new_conf,
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
new_conf.list_peers(entry->peers);
new_conf.list_peers(&(entry->peers));
if (old_conf) {
entry->old_peers = new std::vector<PeerId>;
old_conf->list_peers(entry->old_peers);
old_conf->list_peers(&(entry->old_peers));
}
ConfigurationChangeDone* configuration_change_done =
new ConfigurationChangeDone(this, _current_term, leader_start,
Expand Down Expand Up @@ -2593,15 +2590,13 @@ void NodeImpl::handle_append_entries_request(
log_entry->id.index = index;
log_entry->type = (EntryType)entry.type();
if (entry.peers_size() > 0) {
log_entry->peers = new std::vector<PeerId>;
for (int i = 0; i < entry.peers_size(); i++) {
log_entry->peers->push_back(entry.peers(i));
log_entry->peers.push_back(entry.peers(i));
}
CHECK_EQ(log_entry->type, ENTRY_TYPE_CONFIGURATION);
if (entry.old_peers_size() > 0) {
log_entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < entry.old_peers_size(); i++) {
log_entry->old_peers->push_back(entry.old_peers(i));
log_entry->old_peers.push_back(entry.old_peers(i));
}
}
} else {
Expand Down
21 changes: 13 additions & 8 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <butil/time.h> // butil::gettimeofday_us
#include <butil/unique_ptr.h> // std::unique_ptr
#include <gflags/gflags.h> // DEFINE_int32
#include <algorithm>
#include <random>

#include "braft/ballot_box.h" // BallotBox
#include "braft/log_entry.h" // LogEntry
Expand Down Expand Up @@ -627,14 +629,14 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf* data) {
}
em->set_term(entry->id.term);
em->set_type(entry->type);
if (entry->peers != NULL) {
CHECK(!entry->peers->empty()) << "log_index=" << log_index;
for (size_t i = 0; i < entry->peers->size(); ++i) {
em->add_peers((*entry->peers)[i].to_string());
if (!entry->peers.empty()) {
CHECK(!entry->peers.empty()) << "log_index=" << log_index;
for (size_t i = 0; i < entry->peers.size(); ++i) {
em->add_peers((entry->peers)[i].to_string());
}
if (entry->old_peers != NULL) {
for (size_t i = 0; i < entry->old_peers->size(); ++i) {
em->add_old_peers((*entry->old_peers)[i].to_string());
if (!entry->old_peers.empty()) {
for (size_t i = 0; i < entry->old_peers.size(); ++i) {
em->add_old_peers((entry->old_peers)[i].to_string());
}
}
} else {
Expand Down Expand Up @@ -1572,7 +1574,10 @@ int ReplicatorGroup::find_the_next_candidate(PeerId* peer_id,
iter != _rmap.end(); ++iter) {
peers.emplace_back(peerInfo(iter->first, iter->second));
}
std::random_shuffle(peers.begin(), peers.end());

std::random_device rd;
std::mt19937 g(rd());
std::shuffle(peers.begin(), peers.end(), g);
for (auto iter = peers.begin(); iter != peers.end(); ++iter) {
if (!conf.contains(iter->peer_id)) {
continue;
Expand Down
1 change: 1 addition & 0 deletions test/test_ballot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <optional>

#include "braft/ballot.h"
#include "bthread/countdown_event.h"
#include "common.h"

class BallotTest : public testing::Test {};
Expand Down
46 changes: 21 additions & 25 deletions test/test_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,10 +815,9 @@ TEST_F(LogStorageTest, configuration) {
entry.type = braft::ENTRY_TYPE_CONFIGURATION;
entry.id.term = 1;
entry.id.index = 2;
entry.peers = new std::vector<braft::PeerId>;
entry.peers->push_back(braft::PeerId("1.1.1.1:1000:0"));
entry.peers->push_back(braft::PeerId("1.1.1.1:2000:0"));
entry.peers->push_back(braft::PeerId("1.1.1.1:3000:0"));
entry.peers.push_back(braft::PeerId("1.1.1.1:1000:0"));
entry.peers.push_back(braft::PeerId("1.1.1.1:2000:0"));
entry.peers.push_back(braft::PeerId("1.1.1.1:3000:0"));
storage->append_entry(&entry);
}

Expand Down Expand Up @@ -851,9 +850,8 @@ TEST_F(LogStorageTest, configuration) {
entry.type = braft::ENTRY_TYPE_CONFIGURATION;
entry.id.term = 1;
entry.id.index = index;
entry.peers = new std::vector<braft::PeerId>;
entry.peers->push_back(braft::PeerId("1.1.1.1:1000:0"));
entry.peers->push_back(braft::PeerId("1.1.1.1:2000:0"));
entry.peers.push_back(braft::PeerId("1.1.1.1:1000:0"));
entry.peers.push_back(braft::PeerId("1.1.1.1:2000:0"));
storage->append_entry(&entry);
}

Expand Down Expand Up @@ -1109,14 +1107,12 @@ TEST_F(LogStorageTest, joint_configuration) {
for (int i = 1; i <= 20; ++i) {
scoped_refptr<braft::LogEntry> entry = new braft::LogEntry;
entry->id = braft::LogId(i, 1);
entry->peers = new std::vector<braft::PeerId>;
entry->type = braft::ENTRY_TYPE_CONFIGURATION;
for (int j = 0; j < 3; ++j) {
entry->peers->push_back("127.0.0.1:" + std::to_string(i + j));
entry->peers.push_back("127.0.0.1:" + std::to_string(i + j));
}
entry->old_peers = new std::vector<braft::PeerId>;
for (int j = 1; j <= 3; ++j) {
entry->old_peers->push_back("127.0.0.1:" + std::to_string(i + j));
entry->old_peers.push_back("127.0.0.1:" + std::to_string(i + j));
}
ASSERT_EQ(0, log_storage->append_entry(entry));
}
Expand All @@ -1125,8 +1121,8 @@ TEST_F(LogStorageTest, joint_configuration) {
braft::LogEntry* entry = log_storage->get_entry(i);
ASSERT_TRUE(entry != NULL);
ASSERT_EQ(entry->type, braft::ENTRY_TYPE_CONFIGURATION);
ASSERT_TRUE(entry->peers != NULL);
ASSERT_TRUE(entry->old_peers != NULL);
ASSERT_TRUE(!entry->peers.empty());
ASSERT_TRUE(!entry->old_peers.empty());
braft::Configuration conf;
for (int j = 0; j < 3; ++j) {
conf.add_peer("127.0.0.1:" + std::to_string(i + j));
Expand All @@ -1135,10 +1131,10 @@ TEST_F(LogStorageTest, joint_configuration) {
for (int j = 1; j <= 3; ++j) {
old_conf.add_peer("127.0.0.1:" + std::to_string(i + j));
}
ASSERT_TRUE(conf.equals(*entry->peers))
<< conf << " xxxx " << braft::Configuration(*entry->peers);
ASSERT_TRUE(conf.equals(entry->peers))
<< conf << " xxxx " << braft::Configuration(entry->peers);

ASSERT_TRUE(old_conf.equals(*entry->old_peers));
ASSERT_TRUE(old_conf.equals(entry->old_peers));
entry->Release();
}

Expand All @@ -1149,8 +1145,8 @@ TEST_F(LogStorageTest, joint_configuration) {
braft::LogEntry* entry = log_storage->get_entry(i);
ASSERT_TRUE(entry != NULL);
ASSERT_EQ(entry->type, braft::ENTRY_TYPE_CONFIGURATION);
ASSERT_TRUE(entry->peers != NULL);
ASSERT_TRUE(entry->old_peers != NULL);
ASSERT_TRUE(!entry->peers.empty());
ASSERT_TRUE(!entry->old_peers.empty());
braft::Configuration conf;
for (int j = 0; j < 3; ++j) {
conf.add_peer("127.0.0.1:" + std::to_string(i + j));
Expand All @@ -1159,19 +1155,19 @@ TEST_F(LogStorageTest, joint_configuration) {
for (int j = 1; j <= 3; ++j) {
old_conf.add_peer("127.0.0.1:" + std::to_string(i + j));
}
ASSERT_TRUE(conf.equals(*entry->peers))
<< conf << " xxxx " << braft::Configuration(*entry->peers);
ASSERT_TRUE(conf.equals(entry->peers))
<< conf << " xxxx " << braft::Configuration(entry->peers);

ASSERT_TRUE(old_conf.equals(*entry->old_peers));
ASSERT_TRUE(old_conf.equals(entry->old_peers));
entry->Release();
}

for (int i = 1; i <= 20; ++i) {
braft::LogEntry* entry = log_storage->get_entry(i);
ASSERT_TRUE(entry != NULL);
ASSERT_EQ(entry->type, braft::ENTRY_TYPE_CONFIGURATION);
ASSERT_TRUE(entry->peers != NULL);
ASSERT_TRUE(entry->old_peers != NULL);
ASSERT_TRUE(!entry->peers.empty());
ASSERT_TRUE(!entry->old_peers.empty());
ASSERT_EQ(1, entry->id.term);
braft::Configuration conf;
for (int j = 0; j < 3; ++j) {
Expand All @@ -1181,8 +1177,8 @@ TEST_F(LogStorageTest, joint_configuration) {
for (int j = 1; j <= 3; ++j) {
old_conf.add_peer("127.0.0.1:" + std::to_string(i + j));
}
ASSERT_TRUE(conf.equals(*entry->peers));
ASSERT_TRUE(old_conf.equals(*entry->old_peers));
ASSERT_TRUE(conf.equals(entry->peers));
ASSERT_TRUE(old_conf.equals(entry->old_peers));
entry->Release();
}

Expand Down
2 changes: 1 addition & 1 deletion test/test_log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ TEST_F(TestUsageSuits, LogEntry) {
peers.push_back(braft::PeerId("1.2.3.4:2000"));
peers.push_back(braft::PeerId("1.2.3.4:3000"));
entry->type = braft::ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<braft::PeerId>(peers);
entry->peers = std::move(peers);

entry->AddRef();
entry->Release();
Expand Down
11 changes: 5 additions & 6 deletions test/test_log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,9 @@ TEST_F(LogManagerTest, configuration_changes) {
braft::LogEntry* entry = new braft::LogEntry;
entry->AddRef();
entry->type = braft::ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<braft::PeerId>(peers);
entry->peers = peers;
if (peers.size() > 1u) {
entry->old_peers = new std::vector<braft::PeerId>(
peers.begin() + 1, peers.end());
entry->old_peers = {peers.begin() + 1, peers.end()};
}
entry->AddRef();
entry->id = braft::LogId(i + 1, 1);
Expand Down Expand Up @@ -198,7 +197,7 @@ TEST_F(LogManagerTest, truncate_suffix_also_revert_configuration) {
braft::LogEntry* entry = new braft::LogEntry;
entry->AddRef();
entry->type = braft::ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<braft::PeerId>(peers);
entry->peers = std::move(peers);
entry->AddRef();
entry->id = braft::LogId(i + 1, 1);
saved_entries[i] = entry;
Expand Down Expand Up @@ -379,7 +378,7 @@ TEST_F(LogManagerTest, pipelined_append) {
entry->AddRef();
entry->type = braft::ENTRY_TYPE_CONFIGURATION;
entry->id = braft::LogId(N, 1);
entry->peers = new std::vector<braft::PeerId>(peers);
entry->peers = std::move(peers);
entries0.push_back(entry);
}
SyncClosure sc0;
Expand Down Expand Up @@ -410,7 +409,7 @@ TEST_F(LogManagerTest, pipelined_append) {
entry->AddRef();
entry->type = braft::ENTRY_TYPE_CONFIGURATION;
entry->id = braft::LogId(N, 2);
entry->peers = new std::vector<braft::PeerId>(peers);
entry->peers = std::move(peers);
entries1.push_back(entry);
}
SyncClosure sc1;
Expand Down

0 comments on commit 679a5b9

Please sign in to comment.