Skip to content

Commit

Permalink
log_entry: replace peers pointer with object (#8)
Browse files Browse the repository at this point in the history
details: #8
  • Loading branch information
ehds authored Apr 28, 2024
1 parent be6e5f3 commit e9d26a8
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 88 deletions.
2 changes: 2 additions & 0 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ struct PeerId {
bool is_empty() const {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
}

bool is_witness() const { return role == WITNESS; }

int parse(const std::string& str) {
reset();
char ip_str[64];
Expand Down
8 changes: 4 additions & 4 deletions src/braft/configuration_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@

namespace braft {

int ConfigurationManager::add(const ConfigurationEntry& entry) {
int ConfigurationManager::add(ConfigurationEntry&& entry) {
if (!_configurations.empty()) {
if (_configurations.back().id.index >= entry.id.index) {
CHECK(false) << "Did you forget to call truncate_suffix before "
" the last log index goes back";
return -1;
}
}
_configurations.push_back(entry);
_configurations.push_back(std::move(entry));
return 0;
}

Expand All @@ -44,9 +44,9 @@ void ConfigurationManager::truncate_suffix(const int64_t last_index_kept) {
}
}

void ConfigurationManager::set_snapshot(const ConfigurationEntry& entry) {
void ConfigurationManager::set_snapshot(ConfigurationEntry&& entry) {
CHECK_GE(entry.id, _snapshot.id);
_snapshot = entry;
_snapshot = std::move(entry);
}

void ConfigurationManager::get(int64_t last_included_index,
Expand Down
18 changes: 13 additions & 5 deletions src/braft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ struct ConfigurationEntry {
ConfigurationEntry() {}
ConfigurationEntry(const LogEntry& entry) {
id = entry.id;
conf = *(entry.peers);
if (entry.old_peers) {
old_conf = *(entry.old_peers);
conf = (entry.peers);
if (!entry.old_peers.empty()) {
old_conf = entry.old_peers;
}
}

ConfigurationEntry(LogEntry&& entry) {
id = entry.id;
conf = std::move(entry.peers);
if (!entry.old_peers.empty()) {
old_conf = std::move(entry.old_peers);
}
}

Expand All @@ -55,15 +63,15 @@ class ConfigurationManager {
~ConfigurationManager() {}

// add new configuration at index
int add(const ConfigurationEntry& entry);
int add(ConfigurationEntry&& entry);

// [1, first_index_kept) are being discarded
void truncate_prefix(int64_t first_index_kept);

// (last_index_kept, infinity) are being discarded
void truncate_suffix(int64_t last_index_kept);

void set_snapshot(const ConfigurationEntry& snapshot);
void set_snapshot(ConfigurationEntry&& snapshot);

void get(int64_t last_included_index, ConfigurationEntry* entry);

Expand Down
4 changes: 2 additions & 2 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ void FSMCaller::do_committed(int64_t committed_index) {
for (; iter_impl.is_good();) {
if (iter_impl.entry()->type != ENTRY_TYPE_DATA) {
if (iter_impl.entry()->type == ENTRY_TYPE_CONFIGURATION) {
if (iter_impl.entry()->old_peers == NULL) {
if (!iter_impl.entry()->old_peers.empty()) {
// Joint stage is not supposed to be noticeable by end
// users.
_fsm->on_configuration_committed(
Configuration(*iter_impl.entry()->peers),
Configuration(iter_impl.entry()->peers),
iter_impl.entry()->id.index);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/braft/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
entry->id.term = header.term;
butil::Status status = parse_configuration_meta(data, entry);
if (status.ok()) {
ConfigurationEntry conf_entry(*entry);
configuration_manager->add(conf_entry);
configuration_manager->add({std::move(*entry)});
} else {
LOG(ERROR) << "fail to parse configuration meta, path: "
<< _path << " entry_off " << entry_off;
Expand Down
26 changes: 9 additions & 17 deletions src/braft/log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,9 @@ namespace braft {

bvar::Adder<int64_t> g_nentries("raft_num_log_entries");

LogEntry::LogEntry() : type(ENTRY_TYPE_UNKNOWN), peers(NULL), old_peers(NULL) {
g_nentries << 1;
}
LogEntry::LogEntry() : type(ENTRY_TYPE_UNKNOWN) { g_nentries << 1; }

LogEntry::~LogEntry() {
g_nentries << -1;
delete peers;
delete old_peers;
}
LogEntry::~LogEntry() { g_nentries << -1; }

butil::Status parse_configuration_meta(const butil::IOBuf& data,
LogEntry* entry) {
Expand All @@ -41,14 +35,12 @@ butil::Status parse_configuration_meta(const butil::IOBuf& data,
status.set_error(EINVAL, "Fail to parse ConfigurationPBMeta");
return status;
}
entry->peers = new std::vector<PeerId>;
for (int j = 0; j < meta.peers_size(); ++j) {
entry->peers->push_back(PeerId(meta.peers(j)));
entry->peers.push_back(PeerId(meta.peers(j)));
}
if (meta.old_peers_size() > 0) {
entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < meta.old_peers_size(); i++) {
entry->old_peers->push_back(PeerId(meta.old_peers(i)));
entry->old_peers.push_back(PeerId(meta.old_peers(i)));
}
}
return status;
Expand All @@ -58,12 +50,12 @@ butil::Status serialize_configuration_meta(const LogEntry* entry,
butil::IOBuf& data) {
butil::Status status;
ConfigurationPBMeta meta;
for (size_t i = 0; i < entry->peers->size(); ++i) {
meta.add_peers((*(entry->peers))[i].to_string());
for (size_t i = 0; i < entry->peers.size(); ++i) {
meta.add_peers((entry->peers)[i].to_string());
}
if (entry->old_peers) {
for (size_t i = 0; i < entry->old_peers->size(); ++i) {
meta.add_old_peers((*(entry->old_peers))[i].to_string());
if (!entry->old_peers.empty()) {
for (size_t i = 0; i < entry->old_peers.size(); ++i) {
meta.add_old_peers((entry->old_peers)[i].to_string());
}
}
butil::IOBufAsZeroCopyOutputStream wrapper(&data);
Expand Down
4 changes: 2 additions & 2 deletions src/braft/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ struct LogEntry : public butil::RefCountedThreadSafe<LogEntry> {
public:
EntryType type; // log type
LogId id;
std::vector<PeerId>* peers; // peers
std::vector<PeerId>* old_peers; // peers
std::vector<PeerId> peers; // peers
std::vector<PeerId> old_peers; // peers
butil::IOBuf data;

LogEntry();
Expand Down
5 changes: 2 additions & 3 deletions src/braft/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ void LogManager::append_entries(std::vector<LogEntry*>* entries,
// Add ref for disk_thread
(*entries)[i]->AddRef();
if ((*entries)[i]->type == ENTRY_TYPE_CONFIGURATION) {
ConfigurationEntry conf_entry(*((*entries)[i]));
_config_manager->add(conf_entry);
_config_manager->add({*((*entries)[i])});
}
}

Expand Down Expand Up @@ -631,7 +630,7 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
entry.id = LogId(meta->last_included_index(), meta->last_included_term());
entry.conf = conf;
entry.old_conf = old_conf;
_config_manager->set_snapshot(entry);
_config_manager->set_snapshot(std::move(entry));
int64_t term = unsafe_get_term(meta->last_included_index());

const LogId last_but_one_snapshot_id = _last_snapshot_id;
Expand Down
15 changes: 5 additions & 10 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ int NodeImpl::bootstrap(const BootstrapOptions& options) {
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
options.group_conf.list_peers(entry->peers);
options.group_conf.list_peers(&(entry->peers));

std::vector<LogEntry*> entries;
entries.push_back(entry);
Expand Down Expand Up @@ -2141,11 +2140,9 @@ void NodeImpl::unsafe_apply_configuration(const Configuration& new_conf,
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
new_conf.list_peers(entry->peers);
new_conf.list_peers(&(entry->peers));
if (old_conf) {
entry->old_peers = new std::vector<PeerId>;
old_conf->list_peers(entry->old_peers);
old_conf->list_peers(&(entry->old_peers));
}
ConfigurationChangeDone* configuration_change_done =
new ConfigurationChangeDone(this, _current_term, leader_start,
Expand Down Expand Up @@ -2593,15 +2590,13 @@ void NodeImpl::handle_append_entries_request(
log_entry->id.index = index;
log_entry->type = (EntryType)entry.type();
if (entry.peers_size() > 0) {
log_entry->peers = new std::vector<PeerId>;
for (int i = 0; i < entry.peers_size(); i++) {
log_entry->peers->push_back(entry.peers(i));
log_entry->peers.push_back(entry.peers(i));
}
CHECK_EQ(log_entry->type, ENTRY_TYPE_CONFIGURATION);
if (entry.old_peers_size() > 0) {
log_entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < entry.old_peers_size(); i++) {
log_entry->old_peers->push_back(entry.old_peers(i));
log_entry->old_peers.push_back(entry.old_peers(i));
}
}
} else {
Expand Down
22 changes: 14 additions & 8 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include <butil/unique_ptr.h> // std::unique_ptr
#include <gflags/gflags.h> // DEFINE_int32

#include <algorithm>
#include <random>

#include "braft/ballot_box.h" // BallotBox
#include "braft/log_entry.h" // LogEntry
#include "braft/node.h" // NodeImpl
Expand Down Expand Up @@ -627,14 +630,14 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf* data) {
}
em->set_term(entry->id.term);
em->set_type(entry->type);
if (entry->peers != NULL) {
CHECK(!entry->peers->empty()) << "log_index=" << log_index;
for (size_t i = 0; i < entry->peers->size(); ++i) {
em->add_peers((*entry->peers)[i].to_string());
if (!entry->peers.empty()) {
CHECK(!entry->peers.empty()) << "log_index=" << log_index;
for (size_t i = 0; i < entry->peers.size(); ++i) {
em->add_peers((entry->peers)[i].to_string());
}
if (entry->old_peers != NULL) {
for (size_t i = 0; i < entry->old_peers->size(); ++i) {
em->add_old_peers((*entry->old_peers)[i].to_string());
if (!entry->old_peers.empty()) {
for (size_t i = 0; i < entry->old_peers.size(); ++i) {
em->add_old_peers((entry->old_peers)[i].to_string());
}
}
} else {
Expand Down Expand Up @@ -1572,7 +1575,10 @@ int ReplicatorGroup::find_the_next_candidate(PeerId* peer_id,
iter != _rmap.end(); ++iter) {
peers.emplace_back(peerInfo(iter->first, iter->second));
}
std::random_shuffle(peers.begin(), peers.end());

std::random_device rd;
std::mt19937 g(rd());
std::shuffle(peers.begin(), peers.end(), g);
for (auto iter = peers.begin(); iter != peers.end(); ++iter) {
if (!conf.contains(iter->peer_id)) {
continue;
Expand Down
1 change: 1 addition & 0 deletions test/test_ballot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <optional>

#include "braft/ballot.h"
#include "bthread/countdown_event.h"
#include "common.h"

class BallotTest : public testing::Test {};
Expand Down
7 changes: 4 additions & 3 deletions test/test_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ TEST_F(TestUsageSuits, ConfigurationManager) {
peers.emplace_back("1.1.1.1:1000:2");
entry.conf = peers;
entry.id = {8, 1};
conf_manager.add(entry);
conf_manager.add(ConfigurationEntry{entry});

ASSERT_EQ(LogId(8, 1), conf_manager.last_configuration().id);

conf_manager.get(10, &it1);
Expand All @@ -146,11 +147,11 @@ TEST_F(TestUsageSuits, ConfigurationManager) {

entry.id = LogId(10, 1);
entry.conf = peers;
conf_manager.add(entry);
conf_manager.add(ConfigurationEntry{entry});
peers.emplace_back("1.1.1.1:1000:3");
entry.id = LogId(20, 1);
entry.conf = peers;
conf_manager.add(entry);
conf_manager.add(ConfigurationEntry{entry});
ASSERT_EQ(LogId(20, 1), conf_manager.last_configuration().id);

conf_manager.truncate_prefix(15);
Expand Down
Loading

0 comments on commit e9d26a8

Please sign in to comment.