Skip to content

Commit

Permalink
Merge pull request #436 from lhsoft/recover_log_from_corrupt
Browse files Browse the repository at this point in the history
recover log from data corrupt
  • Loading branch information
PFZheng authored Feb 21, 2024
2 parents ae887a0 + 7fd7b80 commit b37c610
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/braft/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ BRPC_VALIDATE_GFLAG(raft_max_segment_size, brpc::PositiveInteger);
DEFINE_bool(raft_sync_segments, false, "call fsync when a segment is closed");
BRPC_VALIDATE_GFLAG(raft_sync_segments, ::brpc::PassValidate);

DEFINE_bool(raft_recover_log_from_corrupt, false, "recover log by truncating corrupted log");
BRPC_VALIDATE_GFLAG(raft_recover_log_from_corrupt, ::brpc::PassValidate);

static bvar::LatencyRecorder g_open_segment_latency("raft_open_segment");
static bvar::LatencyRecorder g_segment_append_entry_latency("raft_segment_append_entry");
static bvar::LatencyRecorder g_sync_segment_latency("raft_sync_segment");
Expand Down Expand Up @@ -284,6 +287,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
int64_t file_size = st_buf.st_size;
int64_t entry_off = 0;
int64_t actual_last_index = _first_index - 1;
bool is_entry_corrupted = false;
for (int64_t i = _first_index; entry_off < file_size; i++) {
EntryHeader header;
const int rc = _load_entry(entry_off, &header, NULL, ENTRY_HEADER_SIZE);
Expand All @@ -292,6 +296,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
break;
}
if (rc < 0) {
is_entry_corrupted = true;
ret = rc;
break;
}
Expand Down Expand Up @@ -345,7 +350,12 @@ int Segment::load(ConfigurationManager* configuration_manager) {
}

if (ret != 0) {
return ret;
if (!_is_open || !FLAGS_raft_recover_log_from_corrupt || !is_entry_corrupted) {
return ret;
// Truncate the log to the last normal index
} else {
ret = 0;
}
}

if (_is_open) {
Expand Down
92 changes: 92 additions & 0 deletions test/test_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

namespace braft {
DECLARE_bool(raft_trace_append_entry_latency);
DECLARE_bool(raft_recover_log_from_corrupt);
}

class LogStorageTest : public testing::Test {
Expand Down Expand Up @@ -503,6 +504,18 @@ int truncate_uninterrupted(const char* filename, off_t length) {
return rc;
}

int append_corrupted_data(const char* filename) {
char header_buf[200];
memset(header_buf, 1, 200);
FILE* fp = std::fopen(filename, "a");
if (fp == NULL) {
return -1;
}
int ret = std::fputs(header_buf, fp);
std::fclose(fp);
return ret;
}

TEST_F(LogStorageTest, data_lost) {
::system("rm -rf data");
braft::LogStorage* storage = new braft::SegmentLogStorage("./data");
Expand Down Expand Up @@ -1281,3 +1294,82 @@ TEST_F(LogStorageTest, append_close_load_append_with_io_metric) {
delete storage;
delete configuration_manager;
}

TEST_F(LogStorageTest, data_corrupt) {
::system("rm -rf data");
braft::LogStorage* storage = new braft::SegmentLogStorage("./data");
braft::ConfigurationManager* configuration_manager = new braft::ConfigurationManager;
ASSERT_EQ(0, storage->init(configuration_manager));

// append entry
for (int i = 0; i < 100000; i++) {
std::vector<braft::LogEntry*> entries;
for (int j = 0; j < 5; j++) {
int64_t index = 5*i + j + 1;
braft::LogEntry* entry = new braft::LogEntry();
entry->type = braft::ENTRY_TYPE_DATA;
entry->id.term = 1;
entry->id.index = index;

char data_buf[128];
snprintf(data_buf, sizeof(data_buf), "hello, world: %ld", index);
entry->data.append(data_buf);
entries.push_back(entry);
}

ASSERT_EQ(5, storage->append_entries(entries, NULL));

for (size_t j = 0; j < entries.size(); j++) {
delete entries[j];
}
}

delete storage;
delete configuration_manager;

// reinit
storage = new braft::SegmentLogStorage("./data");
configuration_manager = new braft::ConfigurationManager;
ASSERT_EQ(0, storage->init(configuration_manager));

ASSERT_EQ(storage->first_log_index(), 1);
ASSERT_EQ(storage->last_log_index(), 100000*5);

delete storage;
delete configuration_manager;

// last segment data corrupt
butil::DirReaderPosix dir_reader1("./data");
ASSERT_TRUE(dir_reader1.IsValid());
while (dir_reader1.Next()) {
int64_t first_index = 0;
int match = sscanf(dir_reader1.name(), "log_inprogress_%020ld",
&first_index);
std::string path;
butil::string_appendf(&path, "./data/%s", dir_reader1.name());
if (match == 1) {
ASSERT_LE(0, append_corrupted_data(path.c_str()));
}
}


storage = new braft::SegmentLogStorage("./data");
configuration_manager = new braft::ConfigurationManager;
ASSERT_NE(0, storage->init(configuration_manager));

delete storage;
delete configuration_manager;

braft::FLAGS_raft_recover_log_from_corrupt = true;

storage = new braft::SegmentLogStorage("./data");
configuration_manager = new braft::ConfigurationManager;
ASSERT_EQ(0, storage->init(configuration_manager));

ASSERT_EQ(storage->first_log_index(), 1);
ASSERT_EQ(storage->last_log_index(), 100000*5);

delete storage;
delete configuration_manager;
}

0 comments on commit b37c610

Please sign in to comment.