Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
refactor: move primary's learning preparation of durable states into …
Browse files Browse the repository at this point in the history
…another function
  • Loading branch information
neverchanje committed Feb 28, 2021
1 parent 81ddcf1 commit 3c92c29
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 92 deletions.
5 changes: 5 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
void notify_learn_completion();
error_code apply_learned_state_from_private_log(learn_state &state);

void prepare_durable_learn_state(decree learn_start_decree,
const learn_request &request,
learn_response &response,
remote_learner_state &learn_state);

// Gets the position where this round of the learning process should begin.
// This method is called on primary-side.
// TODO(wutao1): mark it const
Expand Down
181 changes: 89 additions & 92 deletions src/replica/replica_learn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,98 +493,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
// learn delta state or checkpoint
// in this case, the state on the PS is still incomplete
else {
if (learn_start_decree > _app->last_durable_decree()) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64
")",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
_app->last_durable_decree());
_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state);
response.type = learn_type::LT_LOG;
} else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because mutation_log::get_learn_state() returns true",
name(),
request.signature,
request.learner.to_string());
response.type = learn_type::LT_LOG;
} else if (learn_start_decree < request.last_committed_decree_in_app + 1) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because learn_start_decree steps back for duplication",
name(),
request.signature,
request.learner.to_string());
response.type = learn_type::LT_LOG;
} else {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, "
"beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64
"), "
"and mutation_log::get_learn_state() returns false",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
_app->last_durable_decree());
response.type = learn_type::LT_APP;
response.state = learn_state();
}

if (response.type == learn_type::LT_LOG) {
response.base_local_dir = _private_log->dir();
if (response.state.files.size() > 0) {
auto &last_file = response.state.files.back();
if (last_file == learner_state.last_learn_log_file) {
ddebug(
"%s: on_learn[%016" PRIx64
"]: learner = %s, learn the same file %s repeatedly, hint to switch file",
name(),
request.signature,
request.learner.to_string(),
last_file.c_str());
_private_log->hint_switch_file();
} else {
learner_state.last_learn_log_file = last_file;
}
}
// it is safe to commit to last_committed_decree() now
response.state.to_decree_included = last_committed_decree();
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, "
"learned_meta_size = %u, learned_file_count = %u, "
"to_decree_included = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
response.state.meta.length(),
static_cast<uint32_t>(response.state.files.size()),
response.state.to_decree_included);
} else {
::dsn::error_code err = _app->get_checkpoint(
learn_start_decree, request.app_specific_learn_request, response.state);

if (err != ERR_OK) {
response.err = ERR_GET_LEARN_STATE_FAILED;
derror("%s: on_learn[%016" PRIx64
"]: learner = %s, get app checkpoint failed, error = %s",
name(),
request.signature,
request.learner.to_string(),
err.to_string());
} else {
response.base_local_dir = _app->data_dir();
ddebug(
"%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, "
"learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
response.state.meta.length(),
static_cast<uint32_t>(response.state.files.size()),
response.state.to_decree_included);
}
}
prepare_durable_learn_state(learn_start_decree, request, response, learner_state);
}

for (auto &file : response.state.files) {
Expand All @@ -599,6 +508,94 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request)
}
}

void replica::prepare_durable_learn_state(decree learn_start_decree,
const learn_request &request,
learn_response &response,
remote_learner_state &learner_state)
{
if (learn_start_decree > _app->last_durable_decree()) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 ")",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
_app->last_durable_decree());
_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state);
response.type = learn_type::LT_LOG;
} else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, "
"because mutation_log::get_learn_state() returns true",
name(),
request.signature,
request.learner.to_string());
response.type = learn_type::LT_LOG;
} else {
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, "
"beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 "), "
"and mutation_log::get_learn_state() returns false",
name(),
request.signature,
request.learner.to_string(),
learn_start_decree,
_app->last_durable_decree());
response.type = learn_type::LT_APP;
response.state = learn_state();
}

if (response.type == learn_type::LT_LOG) {
response.base_local_dir = _private_log->dir();
if (response.state.files.size() > 0) {
auto &last_file = response.state.files.back();
if (last_file == learner_state.last_learn_log_file) {
ddebug("%s: on_learn[%016" PRIx64
"]: learner = %s, learn the same file %s repeatedly, hint to switch file",
name(),
request.signature,
request.learner.to_string(),
last_file.c_str());
_private_log->hint_switch_file();
} else {
learner_state.last_learn_log_file = last_file;
}
}
// it is safe to commit to last_committed_decree() now
response.state.to_decree_included = last_committed_decree();
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, "
"learned_meta_size = %u, learned_file_count = %u, "
"to_decree_included = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
response.state.meta.length(),
static_cast<uint32_t>(response.state.files.size()),
response.state.to_decree_included);
} else {
::dsn::error_code err = _app->get_checkpoint(
learn_start_decree, request.app_specific_learn_request, response.state);

if (err != ERR_OK) {
response.err = ERR_GET_LEARN_STATE_FAILED;
derror("%s: on_learn[%016" PRIx64
"]: learner = %s, get app checkpoint failed, error = %s",
name(),
request.signature,
request.learner.to_string(),
err.to_string());
} else {
response.base_local_dir = _app->data_dir();
ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, "
"learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64,
name(),
request.signature,
request.learner.to_string(),
response.state.meta.length(),
static_cast<uint32_t>(response.state.files.size()),
response.state.to_decree_included);
}
}
}

void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp)
{
_checker.only_one_thread_access();
Expand Down

0 comments on commit 3c92c29

Please sign in to comment.