diff --git a/src/dist/replication/lib/replica.h b/src/dist/replication/lib/replica.h index cc6d46f985..bfa1ad1422 100644 --- a/src/dist/replication/lib/replica.h +++ b/src/dist/replication/lib/replica.h @@ -234,6 +234,11 @@ class replica : public serverlet, public ref_counter, public replica_ba void notify_learn_completion(); error_code apply_learned_state_from_private_log(learn_state &state); + void prepare_durable_learn_state(decree learn_start_decree, + const learn_request &request, + learn_response &response, + remote_learner_state &learn_state); + ///////////////////////////////////////////////////////////////// // failure handling void handle_local_failure(error_code error); diff --git a/src/dist/replication/lib/replica_learn.cpp b/src/dist/replication/lib/replica_learn.cpp index da6f659d78..4daa701efb 100644 --- a/src/dist/replication/lib/replica_learn.cpp +++ b/src/dist/replication/lib/replica_learn.cpp @@ -401,91 +401,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) // learn delta state or checkpoint // in this case, the state on the PS is still incomplete else { - if (learn_start_decree > _app->last_durable_decree()) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 - ")", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - _app->last_durable_decree()); - _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state); - response.type = learn_type::LT_LOG; - } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because mutation_log::get_learn_state() returns true", - name(), - request.signature, - request.learner.to_string()); - response.type = learn_type::LT_LOG; - } else { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " - "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 - "), " - "and mutation_log::get_learn_state() returns false", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - _app->last_durable_decree()); - response.type = learn_type::LT_APP; - response.state = learn_state(); - } - - if (response.type == learn_type::LT_LOG) { - response.base_local_dir = _private_log->dir(); - if (response.state.files.size() > 0) { - auto &last_file = response.state.files.back(); - if (last_file == learner_state.last_learn_log_file) { - ddebug( - "%s: on_learn[%016" PRIx64 - "]: learner = %s, learn the same file %s repeatedly, hint to switch file", - name(), - request.signature, - request.learner.to_string(), - last_file.c_str()); - _private_log->hint_switch_file(); - } else { - learner_state.last_learn_log_file = last_file; - } - } - // it is safe to commit to last_committed_decree() now - response.state.to_decree_included = last_committed_decree(); - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, " - "learned_meta_size = %u, learned_file_count = %u, " - "to_decree_included = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); - } else { - ::dsn::error_code err = _app->get_checkpoint( - learn_start_decree, request.app_specific_learn_request, response.state); - - if (err != ERR_OK) { - response.err = ERR_GET_LEARN_STATE_FAILED; - derror("%s: on_learn[%016" PRIx64 - "]: learner = %s, get app checkpoint failed, error = %s", - name(), - request.signature, - request.learner.to_string(), - err.to_string()); - } else { - response.base_local_dir = _app->data_dir(); - ddebug( - "%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " - "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); - } - } + prepare_durable_learn_state(learn_start_decree, request, response, learner_state); } for (auto &file : response.state.files) { @@ -500,6 +416,94 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) } } +void replica::prepare_durable_learn_state(decree learn_start_decree, + const learn_request &request, + learn_response &response, + remote_learner_state &learner_state) +{ + if (learn_start_decree > _app->last_durable_decree()) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 ")", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + _app->last_durable_decree()); + _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state); + response.type = learn_type::LT_LOG; + } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because mutation_log::get_learn_state() returns true", + name(), + request.signature, + request.learner.to_string()); + response.type = learn_type::LT_LOG; + } else { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " + "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 "), " + "and mutation_log::get_learn_state() returns false", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + _app->last_durable_decree()); + response.type = learn_type::LT_APP; + response.state = learn_state(); + } + + if (response.type == learn_type::LT_LOG) { + response.base_local_dir = _private_log->dir(); + if (response.state.files.size() > 0) { + auto &last_file = response.state.files.back(); + if (last_file == learner_state.last_learn_log_file) { + ddebug("%s: on_learn[%016" PRIx64 + "]: learner = %s, learn the same file %s repeatedly, hint to switch file", + name(), + request.signature, + request.learner.to_string(), + last_file.c_str()); + _private_log->hint_switch_file(); + } else { + learner_state.last_learn_log_file = last_file; + } + } + // it is safe to commit to last_committed_decree() now + response.state.to_decree_included = last_committed_decree(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, " + "learned_meta_size = %u, learned_file_count = %u, " + "to_decree_included = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); + } else { + ::dsn::error_code err = _app->get_checkpoint( + learn_start_decree, request.app_specific_learn_request, response.state); + + if (err != ERR_OK) { + response.err = ERR_GET_LEARN_STATE_FAILED; + derror("%s: on_learn[%016" PRIx64 + "]: learner = %s, get app checkpoint failed, error = %s", + name(), + request.signature, + request.learner.to_string(), + err.to_string()); + } else { + response.base_local_dir = _app->data_dir(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " + "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); + } + } +} + void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp) { _checker.only_one_thread_access(); @@ -1470,5 +1474,5 @@ error_code replica::apply_learned_state_from_private_log(learn_state &state) return err; } -} -} // namespace +} // namespace replication +} // namespace dsn