Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add PacificA data replication consistency scheme #2994

Open
wants to merge 12 commits into
base: unstable
Choose a base branch
from
1 change: 1 addition & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class SlaveofCmd : public Cmd {
private:
std::string master_ip_;
int64_t master_port_ = -1;
bool is_consistency_cmd_ = false;
bool is_none_ = false;
void DoInitial() override;
void Clear() override {
Expand Down
3 changes: 2 additions & 1 deletion include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Binlog : public pstd::noncopyable {
void Unlock() { mutex_.unlock(); }

pstd::Status Put(const std::string& item);
pstd::Status Put(const std::string& item, LogOffset *cur_logoffset,std::string& binlog);
pstd::Status IsOpened();
pstd::Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset, uint32_t* term = nullptr, uint64_t* logic_id = nullptr);
/*
Expand All @@ -78,7 +79,7 @@ class Binlog : public pstd::noncopyable {
void Close();

private:
pstd::Status Put(const char* item, int len);
pstd::Status Put(const char* item, int len,LogOffset *cur_logoffset = nullptr, bool is_consistency = false);
pstd::Status EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, int* temp_pro_offset);
static pstd::Status AppendPadding(pstd::WritableFile* file, uint64_t* len);
void InitLogFile();
Expand Down
3 changes: 3 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ class CmdRes {
kTxnAbort,
kMultiKey,
kNoExists,
kConsistencyTimeout, // consistency time out
};

CmdRes() = default;
Expand Down Expand Up @@ -434,6 +435,8 @@ class CmdRes {
break;
case kNoExists:
return message_;
case kConsistencyTimeout:
return "-ERR consistency timeout\r\n";
default:
break;
}
Expand Down
93 changes: 92 additions & 1 deletion include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Context : public pstd::noncopyable {
void Reset(const LogOffset& offset);

std::shared_mutex rwlock_;
LogOffset applied_index_;
LogOffset applied_index_ = LogOffset();
SyncWindow applied_win_;

std::string ToString() {
Expand All @@ -52,11 +52,28 @@ class SyncProgress {
pstd::Status Update(const std::string& ip, int port, const LogOffset& start, const LogOffset& end,
LogOffset* committed_index);
int SlaveSize();
int SlaveBinlogStateSize() {
std::shared_lock l(rwlock_);
return slave_binlog_state_size;
}
void AddSlaveBinlogStateSize() {
std::lock_guard l(rwlock_);
slave_binlog_state_size++;
}
void SubSlaveBinlogStateSize() {
std::lock_guard l(rwlock_);
slave_binlog_state_size--;
}
void AddMatchIndex(const std::string& ip, int port, const LogOffset& offset) {
std::lock_guard l(rwlock_);
match_index_[ip + std::to_string(port)] = offset;
}

private:
std::shared_mutex rwlock_;
std::unordered_map<std::string, std::shared_ptr<SlaveNode>> slaves_;
std::unordered_map<std::string, LogOffset> match_index_;
int slave_binlog_state_size = 0;
};

class MemLog {
Expand Down Expand Up @@ -100,6 +117,34 @@ class MemLog {
LogOffset last_offset_;
};

class Log {
public:
struct LogItem {
LogItem(const LogOffset& _offset, std::shared_ptr<Cmd> _cmd_ptr, std::string _binlog)
: offset(_offset), cmd_ptr(std::move(_cmd_ptr)), binlog_(std::move(_binlog)) {}
LogOffset offset;
std::shared_ptr<Cmd> cmd_ptr;
std::string binlog_;
};

Log();
int Size();
void AppendLog(const LogItem& item);
LogOffset LastOffset();
LogOffset FirstOffset();
LogItem At(int index);
int FindOffset(const LogOffset& send_offset);
pstd::Status Truncate(const LogOffset& offset);
pstd::Status TruncateFrom(const LogOffset& offset);

private:
int FindLogIndex(const LogOffset& offset);
std::shared_mutex logs_mutex_;
std::vector<LogItem> logs_;
LogOffset last_index_ = LogOffset();
LogOffset first_index_ = LogOffset();
};

class ConsensusCoordinator {
public:
ConsensusCoordinator(const std::string& db_name);
Expand Down Expand Up @@ -199,5 +244,51 @@ class ConsensusCoordinator {
SyncProgress sync_pros_;
std::shared_ptr<StableLog> stable_logger_;
std::shared_ptr<MemLog> mem_logger_;

// pacificA
public:
void InitContext() { context_->Init(); }
bool checkFinished(const LogOffset& offset);
pstd::Status AppendEntries(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_logoffset);
void SetConsistency(bool is_consistency);
bool GetISConsistency();
pstd::Status SendBinlog(std::shared_ptr<SlaveNode> slave_ptr, std::string db_name);
pstd::Status Truncate(const LogOffset& offset);
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
pstd::Status UpdateCommittedID();
pstd::Status ApplyBinlog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ProcessCoordination();

LogOffset GetCommittedId() {
std::lock_guard l(committed_id_rwlock_);
return committed_id_;
}
LogOffset GetPreparedId() {
std::lock_guard l(prepared_id__rwlock_);
return prepared_id_;
}
void SetPreparedId(const LogOffset& offset) {
std::lock_guard l(prepared_id__rwlock_);
prepared_id_ = offset;
}
void SetCommittedId(const LogOffset& offset) {
std::lock_guard l(committed_id_rwlock_);
committed_id_ = offset;
context_->UpdateAppliedIndex(committed_id_);
}

private:
pstd::Status PersistAppendBinlog(const std::shared_ptr<Cmd>& cmd_ptr, LogOffset& cur_offset);

private:
std::shared_mutex is_consistency_rwlock_;
bool is_consistency_ = false;
std::shared_mutex committed_id_rwlock_;
LogOffset committed_id_ = LogOffset();
std::shared_mutex prepared_id__rwlock_;
LogOffset prepared_id_ = LogOffset();
std::shared_ptr<Log> logs_;
};

#endif // INCLUDE_PIKA_CONSENSUS_H_
6 changes: 5 additions & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,11 @@ enum SlaveState {
kSlaveNotSync = 0,
kSlaveDbSync = 1,
kSlaveBinlogSync = 2,
KCandidate = 3,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Update SlaveStateMsg array to include KCandidate state.

The KCandidate state was added to the SlaveState enum, but the corresponding debug message array SlaveStateMsg needs to be updated.

Apply this diff:

-const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
+const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};

Committable suggestion skipped: line range outside the PR's diff.

};

// debug only
const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync"};
const std::string SlaveStateMsg[] = {"SlaveNotSync", "SlaveDbSync", "SlaveBinlogSync", "Candidate"};

enum BinlogSyncState {
kNotSync = 0,
Expand Down Expand Up @@ -274,9 +275,12 @@ class RmNode : public Node {
struct WriteTask {
struct RmNode rm_node_;
struct BinlogChip binlog_chip_;
LogOffset committed_id_ = LogOffset();
LogOffset prev_offset_;
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset)
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset) {}
WriteTask(const RmNode& rm_node, const BinlogChip& binlog_chip, const LogOffset& prev_offset, const LogOffset& committed_id)
: rm_node_(rm_node), binlog_chip_(binlog_chip), prev_offset_(prev_offset), committed_id_(committed_id) {}
};

// slowlog define
Expand Down
24 changes: 24 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class SyncMasterDB : public SyncDB {
pstd::Status ConsensusProposeLog(const std::shared_ptr<Cmd>& cmd_ptr);
pstd::Status ConsensusProcessLeaderLog(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
LogOffset ConsensusCommittedIndex();

LogOffset ConsensusLastIndex();

std::shared_ptr<StableLog> StableLogger() { return coordinator_.StableLogger(); }
Expand All @@ -92,6 +93,27 @@ class SyncMasterDB : public SyncDB {
pstd::Mutex session_mu_;
int32_t session_id_ = 0;
ConsensusCoordinator coordinator_;

//pacificA public:
public:
void InitContext(){
coordinator_.InitContext();
}
bool checkFinished(const LogOffset& offset);
void SetConsistency(bool is_consistenct);
bool GetISConsistency();
pstd::Status ProcessCoordination();
void SetPreparedId(const LogOffset& offset);
void SetCommittedId(const LogOffset& offset);
LogOffset GetPreparedId();
LogOffset GetCommittedId();
pstd::Status AppendSlaveEntries(const std::shared_ptr<Cmd>& cmd_ptr, const BinlogItem& attribute);
pstd::Status AppendCandidateBinlog(const std::string& ip, int port, const LogOffset& offset);
pstd::Status UpdateCommittedID();
pstd::Status CommitAppLog(const LogOffset& master_committed_id);
pstd::Status Truncate(const LogOffset& offset);


};

class SyncSlaveDB : public SyncDB {
Expand Down Expand Up @@ -191,6 +213,8 @@ class PikaReplicaManager {

std::shared_mutex& GetDBLock() { return dbs_rw_; }

void BuildBinlogOffset(const LogOffset& offset, InnerMessage::BinlogOffset* boffset);

void DBLock() {
dbs_rw_.lock();
}
Expand Down
12 changes: 11 additions & 1 deletion include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class PikaServer : public pstd::noncopyable {
std::string master_ip();
int master_port();
int role();
int last_role();
bool leader_protected_mode();
void CheckLeaderProtectedMode();
bool readonly(const std::string& table);
Expand Down Expand Up @@ -152,6 +153,10 @@ class PikaServer : public pstd::noncopyable {
bool TryAddSlave(const std::string& ip, int64_t port, int fd, const std::vector<DBStruct>& table_structs);
pstd::Mutex slave_mutex_; // protect slaves_;
std::vector<SlaveItem> slaves_;
int slave_size() {
std::lock_guard l(slave_mutex_);
return slaves_.size();
}

/**
* Sotsmgrt use
Expand All @@ -163,7 +168,7 @@ class PikaServer : public pstd::noncopyable {
*/
void SyncError();
void RemoveMaster();
bool SetMaster(std::string& master_ip, int master_port);
bool SetMaster(std::string& master_ip, int master_port, bool is_consistency = false);

/*
* Slave State Machine
Expand All @@ -176,6 +181,8 @@ class PikaServer : public pstd::noncopyable {
void UpdateMetaSyncTimestamp();
void UpdateMetaSyncTimestampWithoutLock();
bool IsFirstMetaSync();
bool IsConsistency();
void SetConsistency(bool is_consistency);
void SetFirstMetaSync(bool v);

/*
Expand Down Expand Up @@ -514,6 +521,7 @@ class PikaServer : public pstd::noncopyable {
exec_stat_map.insert(std::make_pair(cmd_name, 0));
}
}

private:
/*
* TimingTask use
Expand Down Expand Up @@ -571,7 +579,9 @@ class PikaServer : public pstd::noncopyable {
std::string master_ip_;
int master_port_ = 0;
int repl_state_ = PIKA_REPL_NO_CONNECT;
bool is_consistency_ = false;
int role_ = PIKA_ROLE_SINGLE;
int last_role_ = PIKA_ROLE_SINGLE;
int last_meta_sync_timestamp_ = 0;
bool first_meta_sync_ = false;
bool force_full_sync_ = false;
Expand Down
5 changes: 3 additions & 2 deletions include/pika_slave_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ class SlaveNode : public RmNode {

BinlogSyncState b_state{kNotSync};
SyncWindow sync_win;
LogOffset sent_offset;
LogOffset acked_offset;
LogOffset sent_offset = LogOffset();
LogOffset acked_offset = LogOffset();
LogOffset target_offset = LogOffset();

std::string ToStringStatus();

Expand Down
7 changes: 6 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ static AuthResult AuthenticateUser(const std::string& cmdName, const std::string
* slaveof no one
* slaveof ip port
* slaveof ip port force
* slaveof ip port strong
*/
void SlaveofCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand All @@ -112,6 +113,7 @@ void SlaveofCmd::DoInitial() {
is_none_ = true;
return;
}

// self is master of A , want to slaveof B
if ((g_pika_server->role() & PIKA_ROLE_MASTER) != 0) {
res_.SetRes(CmdRes::kErrOther, "already master of others, invalid usage");
Expand All @@ -133,12 +135,15 @@ void SlaveofCmd::DoInitial() {
if (argv_.size() == 4) {
if (strcasecmp(argv_[3].data(), "force") == 0) {
g_pika_server->SetForceFullSync(true);
} else if (strcasecmp(argv_[3].data(), "strong") == 0) {
is_consistency_cmd_ = true; // 设置 is_consistency 为 true
} else {
res_.SetRes(CmdRes::kWrongNum, kCmdNameSlaveof);
}
}
}


void SlaveofCmd::Do() {
// Check if we are already connected to the specified master
if ((master_ip_ == "127.0.0.1" || g_pika_server->master_ip() == master_ip_) &&
Expand All @@ -160,7 +165,7 @@ void SlaveofCmd::Do() {
* the data synchronization was successful, but only changes the status of the
* slaveof executor to slave */

bool sm_ret = g_pika_server->SetMaster(master_ip_, static_cast<int32_t>(master_port_));
bool sm_ret = g_pika_server->SetMaster(master_ip_, static_cast<int32_t>(master_port_),is_consistency_cmd_);

if (sm_ret) {
res_.SetRes(CmdRes::kOk);
Expand Down
Loading
Loading