In the MetaSync exchange, the master replies to the slave with `classic_mode`, `run_id`, `replication_id`, `db_name`, and `slot_num`. The slave uses this information to compare the master's DB structure against its own and to set its `replication_id`.
src/pika_repl_client_conn.cc
When the slave receives the master's MetaSyncResponse, it first parses the status code. If the code is kOther, it retries MetaSync; if it is anything other than kOk, it reports an error and sets `repl_state_` to PIKA_REPL_ERROR. Otherwise, it compares the DBStruct returned by the master against its own DBStruct (the comparison covers values such as db_name and slot_num). If they are inconsistent, it sets `repl_state_` to PIKA_REPL_ERROR, meaning master-slave replication cannot proceed. If the DB structures match, the slave inspects the ReplicationID returned by the master: if it is empty, the slave waits for the next request; if it is not empty, the slave checks whether it already has a ReplicationID of its own. If it does not, it adopts the master's ReplicationID as its own and sets `force_full_sync_` to true to force a full sync. It then calls `PrepareSlotTrySync` and `FinishMetaSync`, which sets `repl_state_` to PIKA_REPL_META_SYNC_DONE, completing the MetaSync flow.
```cpp
void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
  std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
  std::shared_ptr<net::PbConn> conn = task_arg->conn;
  std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
  ...
  const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync();

  std::vector<DBStruct> master_db_structs;
  for (int idx = 0; idx < meta_sync.dbs_info_size(); ++idx) {
    const InnerMessage::InnerResponse_MetaSync_DBInfo& db_info = meta_sync.dbs_info(idx);
    master_db_structs.push_back({db_info.db_name(), static_cast<uint32_t>(db_info.slot_num()), {0}});
  }

  std::vector<DBStruct> self_db_structs = g_pika_conf->db_structs();
  if (!PikaReplClientConn::IsDBStructConsistent(self_db_structs, master_db_structs)) {
    ...
    g_pika_server->SyncError();
    conn->NotifyClose();
    return;
  }

  if (meta_sync.replication_id() == "") {
    LOG(WARNING) << "Meta Sync Failed: the relicationid obtained from the server is null, keep sending MetaSync msg";
    return;
  }

  if (g_pika_conf->replication_id() != meta_sync.replication_id() && g_pika_conf->replication_id() != "") {
    LOG(WARNING) << "Meta Sync Failed: replicationid on both sides of the connection are inconsistent";
    g_pika_server->SyncError();
    conn->NotifyClose();
    return;
  }

  if (g_pika_conf->replication_id() != meta_sync.replication_id()) {
    ...
    g_pika_server->force_full_sync_ = true;
    g_pika_conf->SetReplicationID(meta_sync.replication_id());
    g_pika_conf->ConfigRewriteReplicationID();
  }

  g_pika_conf->SetWriteBinlog("yes");
  g_pika_server->PrepareSlotTrySync();
  g_pika_server->FinishMetaSync();
  LOG(INFO) << "Finish to handle meta sync response";
}
```
DBSync Summary
In DBSync, the master's reply to the slave carries `session_id`, `db_name`, and `slot_id`; at the same time, the master decides whether a bgsave operation needs to be performed.
src/pika_repl_client_conn.cc
After the slave receives the master's DBSync reply, it sets its own `session_id`, then stops the Rsync thread and changes the state to kWaitDBSync.
```cpp
void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
  ...
  if (response->code() != InnerMessage::kOk) {
    slave_slot->SetReplState(ReplState::kError);
    std::string reply = response->has_reply() ? response->reply() : "";
    LOG(WARNING) << "DBSync Failed: " << reply;
    return;
  }

  slave_slot->SetMasterSessionId(session_id);

  std::string slot_name = slave_slot->SlotName();
  slave_slot->StopRsync();
  slave_slot->SetReplState(ReplState::kWaitDBSync);
  LOG(INFO) << "Slot: " << slot_name << " Need Wait To Sync";
}
```
In the MetaRsync exchange, the master's reply to the slave includes `db_name`, `slot_id`, `reader_index`, `snapshot_uuid`, and `filename`.
RsyncFile
src/rsync_client.cc
Once the metadata of the remote files has been stored in `file_set_`, the `work_threads_` start calling `Copy` to pull the files from the remote side.
```cpp
void* RsyncClient::ThreadMain() {
  ...
  std::vector<std::set<std::string>> file_vec(GetParallelNum());
  int index = 0;
  for (const auto& file : file_set_) {
    file_vec[index++ % GetParallelNum()].insert(file);
  }

  for (int i = 0; i < GetParallelNum(); i++) {
    work_threads_[i] = std::move(std::thread(&RsyncClient::Copy, this, file_vec[i], i));
  }
  ...
  while (state_.load() == RUNNING) {
    uint64_t elapse = pstd::NowMicros() - start_time;
    if (elapse < kFlushIntervalUs) {
      int wait_for_us = kFlushIntervalUs - elapse;
      std::unique_lock<std::mutex> lock(mu_);
      cond_.wait_for(lock, std::chrono::microseconds(wait_for_us));
    }

    if (state_.load() != RUNNING) {
      break;
    }

    start_time = pstd::NowMicros();
    std::map<std::string, std::string> files_map;
    {
      std::lock_guard<std::mutex> guard(mu_);
      files_map.swap(meta_table_);
    }
    for (const auto& file : files_map) {
      meta_rep.append(file.first + ":" + file.second);
      meta_rep.append("\n");
    }
    ...
    if (finished_work_cnt_.load() == GetParallelNum()) {
      break;
    }
  }

  for (int i = 0; i < GetParallelNum(); i++) {
    work_threads_[i].join();
  }
  finished_work_cnt_.store(0);
  state_.store(STOP);
  ...
}
```
RsyncFile Summary
Here the master's FileRsync reply to the slave includes `db_name`, `slot_id`, `read_index`, `data`, `eof`, `checksum`, `filename`, `bytes_read`, and `offset`.
TrySync
src/pika_rm.cc
After all the remote files have been fetched, the Rsync thread is shut down and `TryUpdateMasterOffset` is called to update the offset.