Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log_entry: replace peers pointer with object #8

Merged
merged 1 commit into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/braft/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ struct PeerId {
bool is_empty() const {
return (addr.ip == butil::IP_ANY && addr.port == 0 && idx == 0);
}

bool is_witness() const { return role == WITNESS; }

int parse(const std::string& str) {
reset();
char ip_str[64];
Expand Down
8 changes: 4 additions & 4 deletions src/braft/configuration_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@

namespace braft {

int ConfigurationManager::add(const ConfigurationEntry& entry) {
int ConfigurationManager::add(ConfigurationEntry&& entry) {
if (!_configurations.empty()) {
if (_configurations.back().id.index >= entry.id.index) {
CHECK(false) << "Did you forget to call truncate_suffix before "
" the last log index goes back";
return -1;
}
}
_configurations.push_back(entry);
_configurations.push_back(std::move(entry));
return 0;
}

Expand All @@ -44,9 +44,9 @@ void ConfigurationManager::truncate_suffix(const int64_t last_index_kept) {
}
}

void ConfigurationManager::set_snapshot(const ConfigurationEntry& entry) {
void ConfigurationManager::set_snapshot(ConfigurationEntry&& entry) {
CHECK_GE(entry.id, _snapshot.id);
_snapshot = entry;
_snapshot = std::move(entry);
}

void ConfigurationManager::get(int64_t last_included_index,
Expand Down
18 changes: 13 additions & 5 deletions src/braft/configuration_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,17 @@ struct ConfigurationEntry {
ConfigurationEntry() {}
ConfigurationEntry(const LogEntry& entry) {
id = entry.id;
conf = *(entry.peers);
if (entry.old_peers) {
old_conf = *(entry.old_peers);
conf = (entry.peers);
if (!entry.old_peers.empty()) {
old_conf = entry.old_peers;
}
}

ConfigurationEntry(LogEntry&& entry) {
id = entry.id;
conf = std::move(entry.peers);
if (!entry.old_peers.empty()) {
old_conf = std::move(entry.old_peers);
}
}

Expand All @@ -55,15 +63,15 @@ class ConfigurationManager {
~ConfigurationManager() {}

// add new configuration at index
int add(const ConfigurationEntry& entry);
int add(ConfigurationEntry&& entry);

// [1, first_index_kept) are being discarded
void truncate_prefix(int64_t first_index_kept);

// (last_index_kept, infinity) are being discarded
void truncate_suffix(int64_t last_index_kept);

void set_snapshot(const ConfigurationEntry& snapshot);
void set_snapshot(ConfigurationEntry&& snapshot);

void get(int64_t last_included_index, ConfigurationEntry* entry);

Expand Down
4 changes: 2 additions & 2 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,11 @@ void FSMCaller::do_committed(int64_t committed_index) {
for (; iter_impl.is_good();) {
if (iter_impl.entry()->type != ENTRY_TYPE_DATA) {
if (iter_impl.entry()->type == ENTRY_TYPE_CONFIGURATION) {
if (iter_impl.entry()->old_peers == NULL) {
if (!iter_impl.entry()->old_peers.empty()) {
// Joint stage is not supposed to be noticeable by end
// users.
_fsm->on_configuration_committed(
Configuration(*iter_impl.entry()->peers),
Configuration(iter_impl.entry()->peers),
iter_impl.entry()->id.index);
}
}
Expand Down
3 changes: 1 addition & 2 deletions src/braft/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
entry->id.term = header.term;
butil::Status status = parse_configuration_meta(data, entry);
if (status.ok()) {
ConfigurationEntry conf_entry(*entry);
configuration_manager->add(conf_entry);
configuration_manager->add({std::move(*entry)});
} else {
LOG(ERROR) << "fail to parse configuration meta, path: "
<< _path << " entry_off " << entry_off;
Expand Down
26 changes: 9 additions & 17 deletions src/braft/log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,9 @@ namespace braft {

bvar::Adder<int64_t> g_nentries("raft_num_log_entries");

LogEntry::LogEntry() : type(ENTRY_TYPE_UNKNOWN), peers(NULL), old_peers(NULL) {
g_nentries << 1;
}
LogEntry::LogEntry() : type(ENTRY_TYPE_UNKNOWN) { g_nentries << 1; }

LogEntry::~LogEntry() {
g_nentries << -1;
delete peers;
delete old_peers;
}
LogEntry::~LogEntry() { g_nentries << -1; }

butil::Status parse_configuration_meta(const butil::IOBuf& data,
LogEntry* entry) {
Expand All @@ -41,14 +35,12 @@ butil::Status parse_configuration_meta(const butil::IOBuf& data,
status.set_error(EINVAL, "Fail to parse ConfigurationPBMeta");
return status;
}
entry->peers = new std::vector<PeerId>;
for (int j = 0; j < meta.peers_size(); ++j) {
entry->peers->push_back(PeerId(meta.peers(j)));
entry->peers.push_back(PeerId(meta.peers(j)));
}
if (meta.old_peers_size() > 0) {
entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < meta.old_peers_size(); i++) {
entry->old_peers->push_back(PeerId(meta.old_peers(i)));
entry->old_peers.push_back(PeerId(meta.old_peers(i)));
}
}
return status;
Expand All @@ -58,12 +50,12 @@ butil::Status serialize_configuration_meta(const LogEntry* entry,
butil::IOBuf& data) {
butil::Status status;
ConfigurationPBMeta meta;
for (size_t i = 0; i < entry->peers->size(); ++i) {
meta.add_peers((*(entry->peers))[i].to_string());
for (size_t i = 0; i < entry->peers.size(); ++i) {
meta.add_peers((entry->peers)[i].to_string());
}
if (entry->old_peers) {
for (size_t i = 0; i < entry->old_peers->size(); ++i) {
meta.add_old_peers((*(entry->old_peers))[i].to_string());
if (!entry->old_peers.empty()) {
for (size_t i = 0; i < entry->old_peers.size(); ++i) {
meta.add_old_peers((entry->old_peers)[i].to_string());
}
}
butil::IOBufAsZeroCopyOutputStream wrapper(&data);
Expand Down
4 changes: 2 additions & 2 deletions src/braft/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ struct LogEntry : public butil::RefCountedThreadSafe<LogEntry> {
public:
EntryType type; // log type
LogId id;
std::vector<PeerId>* peers; // peers
std::vector<PeerId>* old_peers; // peers
std::vector<PeerId> peers; // peers
std::vector<PeerId> old_peers; // peers
butil::IOBuf data;

LogEntry();
Expand Down
5 changes: 2 additions & 3 deletions src/braft/log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,7 @@ void LogManager::append_entries(std::vector<LogEntry*>* entries,
// Add ref for disk_thread
(*entries)[i]->AddRef();
if ((*entries)[i]->type == ENTRY_TYPE_CONFIGURATION) {
ConfigurationEntry conf_entry(*((*entries)[i]));
_config_manager->add(conf_entry);
_config_manager->add({*((*entries)[i])});
}
}

Expand Down Expand Up @@ -631,7 +630,7 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
entry.id = LogId(meta->last_included_index(), meta->last_included_term());
entry.conf = conf;
entry.old_conf = old_conf;
_config_manager->set_snapshot(entry);
_config_manager->set_snapshot(std::move(entry));
int64_t term = unsafe_get_term(meta->last_included_index());

const LogId last_but_one_snapshot_id = _last_snapshot_id;
Expand Down
15 changes: 5 additions & 10 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,7 @@ int NodeImpl::bootstrap(const BootstrapOptions& options) {
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
options.group_conf.list_peers(entry->peers);
options.group_conf.list_peers(&(entry->peers));

std::vector<LogEntry*> entries;
entries.push_back(entry);
Expand Down Expand Up @@ -2141,11 +2140,9 @@ void NodeImpl::unsafe_apply_configuration(const Configuration& new_conf,
entry->AddRef();
entry->id.term = _current_term;
entry->type = ENTRY_TYPE_CONFIGURATION;
entry->peers = new std::vector<PeerId>;
new_conf.list_peers(entry->peers);
new_conf.list_peers(&(entry->peers));
if (old_conf) {
entry->old_peers = new std::vector<PeerId>;
old_conf->list_peers(entry->old_peers);
old_conf->list_peers(&(entry->old_peers));
}
ConfigurationChangeDone* configuration_change_done =
new ConfigurationChangeDone(this, _current_term, leader_start,
Expand Down Expand Up @@ -2593,15 +2590,13 @@ void NodeImpl::handle_append_entries_request(
log_entry->id.index = index;
log_entry->type = (EntryType)entry.type();
if (entry.peers_size() > 0) {
log_entry->peers = new std::vector<PeerId>;
for (int i = 0; i < entry.peers_size(); i++) {
log_entry->peers->push_back(entry.peers(i));
log_entry->peers.push_back(entry.peers(i));
}
CHECK_EQ(log_entry->type, ENTRY_TYPE_CONFIGURATION);
if (entry.old_peers_size() > 0) {
log_entry->old_peers = new std::vector<PeerId>;
for (int i = 0; i < entry.old_peers_size(); i++) {
log_entry->old_peers->push_back(entry.old_peers(i));
log_entry->old_peers.push_back(entry.old_peers(i));
}
}
} else {
Expand Down
22 changes: 14 additions & 8 deletions src/braft/replicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include <butil/unique_ptr.h> // std::unique_ptr
#include <gflags/gflags.h> // DEFINE_int32

#include <algorithm>
#include <random>

#include "braft/ballot_box.h" // BallotBox
#include "braft/log_entry.h" // LogEntry
#include "braft/node.h" // NodeImpl
Expand Down Expand Up @@ -627,14 +630,14 @@ int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf* data) {
}
em->set_term(entry->id.term);
em->set_type(entry->type);
if (entry->peers != NULL) {
CHECK(!entry->peers->empty()) << "log_index=" << log_index;
for (size_t i = 0; i < entry->peers->size(); ++i) {
em->add_peers((*entry->peers)[i].to_string());
if (!entry->peers.empty()) {
CHECK(!entry->peers.empty()) << "log_index=" << log_index;
for (size_t i = 0; i < entry->peers.size(); ++i) {
em->add_peers((entry->peers)[i].to_string());
}
if (entry->old_peers != NULL) {
for (size_t i = 0; i < entry->old_peers->size(); ++i) {
em->add_old_peers((*entry->old_peers)[i].to_string());
if (!entry->old_peers.empty()) {
for (size_t i = 0; i < entry->old_peers.size(); ++i) {
em->add_old_peers((entry->old_peers)[i].to_string());
}
}
} else {
Expand Down Expand Up @@ -1572,7 +1575,10 @@ int ReplicatorGroup::find_the_next_candidate(PeerId* peer_id,
iter != _rmap.end(); ++iter) {
peers.emplace_back(peerInfo(iter->first, iter->second));
}
std::random_shuffle(peers.begin(), peers.end());

std::random_device rd;
std::mt19937 g(rd());
std::shuffle(peers.begin(), peers.end(), g);
for (auto iter = peers.begin(); iter != peers.end(); ++iter) {
if (!conf.contains(iter->peer_id)) {
continue;
Expand Down
1 change: 1 addition & 0 deletions test/test_ballot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <optional>

#include "braft/ballot.h"
#include "bthread/countdown_event.h"
#include "common.h"

class BallotTest : public testing::Test {};
Expand Down
7 changes: 4 additions & 3 deletions test/test_configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ TEST_F(TestUsageSuits, ConfigurationManager) {
peers.emplace_back("1.1.1.1:1000:2");
entry.conf = peers;
entry.id = {8, 1};
conf_manager.add(entry);
conf_manager.add(ConfigurationEntry{entry});

ASSERT_EQ(LogId(8, 1), conf_manager.last_configuration().id);

conf_manager.get(10, &it1);
Expand All @@ -146,11 +147,11 @@ TEST_F(TestUsageSuits, ConfigurationManager) {

entry.id = LogId(10, 1);
entry.conf = peers;
conf_manager.add(entry);
conf_manager.add(ConfigurationEntry{entry});
peers.emplace_back("1.1.1.1:1000:3");
entry.id = LogId(20, 1);
entry.conf = peers;
conf_manager.add(entry);
conf_manager.add(ConfigurationEntry{entry});
ASSERT_EQ(LogId(20, 1), conf_manager.last_configuration().id);

conf_manager.truncate_prefix(15);
Expand Down
Loading
Loading