Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support set truncate_index manually in on_snapshot_save #457

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ void FSMCaller::do_snapshot_save(SaveSnapshotClosure* done) {
iter != conf_entry.old_conf.end(); ++iter) {
*meta.add_old_peers() = iter->to_string();
}
done->set_snapshot_index(last_applied_index);

SnapshotWriter* writer = done->start(meta);
if (!writer) {
Expand Down
11 changes: 11 additions & 0 deletions src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,17 @@ class SaveSnapshotClosure : public Closure {
public:
// TODO: comments
virtual SnapshotWriter* start(const SnapshotMeta& meta) = 0;

void set_snapshot_index(int64_t index){
_index = index;
}

int64_t snapshot_index() const {
return _index;
}

private:
int64_t _index;
};

class LoadSnapshotClosure : public Closure {
Expand Down
13 changes: 13 additions & 0 deletions src/braft/snapshot_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class SaveSnapshotDone : public SaveSnapshotClosure {
private:
static void* continue_run(void* arg);

void set_meta();

SnapshotExecutor* _se;
SnapshotWriter* _writer;
Closure* _done; // user done
Expand Down Expand Up @@ -308,6 +310,9 @@ SnapshotWriter* SaveSnapshotDone::start(const SnapshotMeta& meta) {
void* SaveSnapshotDone::continue_run(void* arg) {
SaveSnapshotDone* self = (SaveSnapshotDone*)arg;
std::unique_ptr<SaveSnapshotDone> self_guard(self);
if (self->status().ok()) {
self->set_meta();
}
// Must call on_snapshot_save_done to clear _saving_snapshot
int ret = self->_se->on_snapshot_save_done(
self->status(), self->_meta, self->_writer);
Expand All @@ -324,6 +329,14 @@ void* SaveSnapshotDone::continue_run(void* arg) {
return NULL;
}

void SaveSnapshotDone::set_meta(){
const int64_t last_include_index = snapshot_index();
_meta.set_last_included_index(last_include_index);
const int64_t last_include_term = _se->_log_manager->get_term(last_include_index);
CHECK(last_include_term > 0) << "last_included_index: " << last_include_index << " is out of range";
_meta.set_last_included_term(last_include_term);
}

void SaveSnapshotDone::Run() {
// Avoid blocking FSMCaller
// This continuation of snapshot saving is likely running inplace where the
Expand Down
82 changes: 80 additions & 2 deletions test/test_fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,12 @@ class DummySnapshoWriter : public braft::SnapshotWriter {
class MockSaveSnapshotClosure : public braft::SaveSnapshotClosure {
public:
MockSaveSnapshotClosure(braft::SnapshotWriter* writer,
braft::SnapshotMeta *expected_meta)
braft::SnapshotMeta *expected_meta,
braft::LogManager* lm)
: _start_times(0)
, _writer(writer)
, _expected_meta(expected_meta)
, _lm(lm)
{
}
~MockSaveSnapshotClosure() {}
Expand All @@ -222,10 +224,18 @@ class MockSaveSnapshotClosure : public braft::SaveSnapshotClosure {
++_start_times;
return _writer;
}

void set_meta(){
const int64_t last_include_index = snapshot_index();
_expected_meta->set_last_included_index(last_include_index);
const int64_t last_include_term = _lm->get_term(last_include_index);
_expected_meta->set_last_included_term(last_include_term);
}
private:
int _start_times;
braft::SnapshotWriter* _writer;
braft::SnapshotMeta* _expected_meta;
braft::LogManager* _lm;
};

class MockLoadSnapshotClosure : public braft::LoadSnapshotClosure {
Expand Down Expand Up @@ -253,13 +263,13 @@ TEST_F(FSMCallerTest, snapshot) {
snapshot_meta.set_last_included_term(0);
DummySnapshotReader dummy_reader(&snapshot_meta);
DummySnapshoWriter dummy_writer;
MockSaveSnapshotClosure save_snapshot_done(&dummy_writer, &snapshot_meta);
system("rm -rf ./data");
scoped_ptr<braft::ConfigurationManager> cm(
new braft::ConfigurationManager);
scoped_ptr<braft::SegmentLogStorage> storage(
new braft::SegmentLogStorage("./data"));
scoped_ptr<braft::LogManager> lm(new braft::LogManager());
MockSaveSnapshotClosure save_snapshot_done(&dummy_writer, &snapshot_meta, lm.get());
braft::LogManagerOptions log_opt;
log_opt.log_storage = storage.get();
log_opt.configuration_manager = cm.get();
Expand All @@ -286,3 +296,71 @@ TEST_F(FSMCallerTest, snapshot) {
ASSERT_EQ(1, load_snapshot_done._start_times);
}

TEST_F(FSMCallerTest, manually_set_snapshot_index) {
braft::SnapshotMeta snapshot_meta;
int64_t term_1_first_include_index = 1;
int64_t term_1_last_include_index = 10;
int64_t term_2_last_include_index = 20;
// in term 1
int64_t first_snapshot_truncate_index = (term_1_last_include_index - term_1_first_include_index) / 2;
// in term 2
int64_t second_snapshot_truncate_index = term_1_last_include_index+ (term_2_last_include_index - term_1_last_include_index) / 2;
int64_t frist_term = 1;
int64_t second_term = 2;

DummySnapshotReader dummy_reader(&snapshot_meta);
DummySnapshoWriter dummy_writer;
system("rm -rf ./data");
scoped_ptr<braft::ConfigurationManager> cm(
new braft::ConfigurationManager);
scoped_ptr<braft::SegmentLogStorage> storage(
new braft::SegmentLogStorage("./data"));
scoped_ptr<braft::LogManager> lm(new braft::LogManager());
MockSaveSnapshotClosure save_snapshot_done(&dummy_writer, &snapshot_meta, lm.get());
braft::LogManagerOptions log_opt;
log_opt.log_storage = storage.get();
log_opt.configuration_manager = cm.get();
ASSERT_EQ(0, lm->init(log_opt));
for (int i = term_1_first_include_index; i <= term_1_last_include_index; i++){
braft::LogEntry* entry = new braft::LogEntry;
entry->AddRef();
butil::StringPiece data = "test";
entry->data.append(data.data(), data.size());
entry->type = braft::ENTRY_TYPE_DATA;
entry->id = braft::LogId(i, frist_term);
SyncClosure sc;
std::vector<braft::LogEntry*> entries;
entries.push_back(entry);
lm->append_entries(&entries, &sc);
sc.join();
}
for (int i = term_1_last_include_index + 1; i <= term_2_last_include_index; i++){
braft::LogEntry* entry = new braft::LogEntry;
entry->AddRef();
butil::StringPiece data = "test";
entry->data.append(data.data(), data.size());
entry->type = braft::ENTRY_TYPE_DATA;
entry->id = braft::LogId(i, second_term);
SyncClosure sc;
std::vector<braft::LogEntry*> entries;
entries.push_back(entry);
lm->append_entries(&entries, &sc);
sc.join();
}
// LogManager
//
// Log: |___1-10______| |___11-20____|
// term1 term2

save_snapshot_done.set_snapshot_index(first_snapshot_truncate_index);
save_snapshot_done.set_meta();
ASSERT_EQ(snapshot_meta.last_included_index(), first_snapshot_truncate_index);
ASSERT_EQ(snapshot_meta.last_included_term(), frist_term);

save_snapshot_done.set_snapshot_index(second_snapshot_truncate_index);
save_snapshot_done.set_meta();
ASSERT_EQ(snapshot_meta.last_included_index(), second_snapshot_truncate_index);
ASSERT_EQ(snapshot_meta.last_included_term(), second_term);

}

35 changes: 35 additions & 0 deletions test/test_log_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,3 +681,38 @@ TEST_F(LogManagerTest, set_snapshot_and_get_log_term) {
ASSERT_EQ(1L, lm->get_term(N - 1));
LOG(INFO) << "Last_index=" << lm->last_log_index();
}

TEST_F(LogManagerTest, manually_set_snapshot_index) {
system("rm -rf ./data");
int64_t first_include_index = 1;
int64_t last_include_index = 20;
int64_t first_snapshot_truncate_index = 10;
int64_t second_snapshot_truncate_index = 15;
int64_t term = 1;
scoped_ptr<braft::ConfigurationManager> cm(
new braft::ConfigurationManager);
scoped_ptr<braft::SegmentLogStorage> storage(
new braft::SegmentLogStorage("./data"));
scoped_ptr<braft::LogManager> lm(new braft::LogManager());
braft::LogManagerOptions opt;
opt.log_storage = storage.get();
opt.configuration_manager = cm.get();
ASSERT_EQ(0, lm->init(opt));
for (int i = first_include_index; i <= last_include_index; ++i) {
append_entry(lm.get(), "test", i, term);
}
braft::SnapshotMeta first_snapshot_meta;
first_snapshot_meta.set_last_included_index(first_snapshot_truncate_index);
first_snapshot_meta.set_last_included_term(term);
//LogManager will truncate nothing in frist snapshot
lm->set_snapshot(&first_snapshot_meta);
ASSERT_EQ(first_include_index, lm->first_log_index());


braft::SnapshotMeta second_snapshot_meta;
second_snapshot_meta.set_last_included_index(second_snapshot_truncate_index);
second_snapshot_meta.set_last_included_term(term);
// Logmanager will truncate first snapshot.
lm->set_snapshot(&second_snapshot_meta);
ASSERT_EQ(first_snapshot_truncate_index + 1, lm->first_log_index());
}