Author: ninehills
Labels: blog done
Created: 2018-02-28T09:36:26Z
Link and comments: ninehills#62
1 Raft 节点的角色,可以使用Go常量
const (
Follower = iota // 0
Candidate // 1
Leader // 2
)
2 使用Buffered Channel进行异步通信,比如等待心跳包结果等情况,我定义了如下Channel
chanHeartbeat chan bool // 收到心跳
chanWinVote chan bool // 赢得选举
chanGrantVote chan bool // 获得选举票
chanApply chan ApplyMsg // 用来commit的channel
在Make()
中需要初始化Channel为Buffered Channel
rf.chanWinVote = make(chan bool, 10)
rf.chanGrantVote = make(chan bool, 10)
rf.chanHeartbeat = make(chan bool, 10)
3 Raft struct大部分成员都是论文的Figure 2的内容
4 发送broadcastRequestVote后,需要进行voteCount 计数,以确定是否赢得Vote,所以 Raft struct需要增加voteCount成员
Raft有如下成员来进行加锁
mu sync.Mutex // Lock to protect shared access to this peer's state
使用方法一般有两种,一种是在明确的边界区进行
rf.mu.Lock()
......
rf.mu.Unlock()
还有一种是在函数头
rf.mu.Lock()
defer rf.mu.Unlock()
........
加锁时要小心加锁,以确保不会发生死锁,能不加锁就不加锁。此外测试时使用go test -race
来判断是否发生冲突
一般用在异步触发,比如广播requestVote、AppendEntries或者异步Commit上
论文中所有的协议都需要准确实现,否则很容易出错(在互联网上找到的实现中,大部分都通不过测试),几个容易出错的地方如下,都debug了很久
// RequestVote中判断日志是否up-to-date,应该如下实现
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
(args.LastLogTerm > rf.logs[rf.getLastIndex()].Term ||
(args.LastLogTerm == rf.logs[rf.getLastIndex()].Term && args.LastLogIndex >= rf.getLastIndex())) {
// Raft determines which of two logs is more up-to-date by comparing the
// index and term of the last entries in the logs. If the logs have last
// entries with different terms, then the log with the later term is
// more up-to-date. If the logs end with the same term, then whichever
// log is longer is more up-to-date.
reply.VoteGranted = true
rf.votedFor = args.CandidateId
} else {
reply.VoteGranted = false
}
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
/* Receiver implementation:
1. Reply false if term < currentTerm (§5.1)
2. Reply false if log doesn’t contain an entry at prevLogIndex
whose term matches prevLogTerm (§5.3)
3. If an existing entry conflicts with a new one (same index
but different terms), delete the existing entry and all that
follow it (§5.3)
4. Append any new entries not already in the log
5. If leaderCommit > commitIndex, set commitIndex =
min(leaderCommit, index of last new entry) */
rf.mu.Lock()
defer rf.mu.Unlock()
rf.chanHeartbeat <- true
if args.Term > rf.currentTerm {
rf.log("AppendEntries Term Bigger, convert to follower")
rf.role = Follower
rf.currentTerm = args.Term
rf.votedFor = -1
}
DPrintf("1 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
reply.Term = rf.currentTerm
reply.NextTryIndex = -1 //默认为-1,代表index--,此处为简化逻辑,除日志不匹配的其他情况都统一--
if args.Term < rf.currentTerm {
reply.Success = false
} else if len(rf.logs) <= args.PrevLogIndex || rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if len(rf.logs) <= args.PrevLogIndex{
DPrintf("logsLen(%d) prevLogIndex(%d)",
len(rf.logs), args.PrevLogIndex)
} else {
DPrintf("logsLen(%d) prevLogIndex(%d) logsTerm(%d) prevLogTerm(%d)",
len(rf.logs), args.PrevLogIndex, rf.logs[args.PrevLogIndex].Term, args.PrevLogTerm)
}
if len(rf.logs) > args.PrevLogIndex {
// 如果日志内容不匹配,找到同Term中最早的Index,直接回退到那个Index
term := rf.logs[args.PrevLogIndex].Term
for reply.NextTryIndex = args.PrevLogIndex - 1;
reply.NextTryIndex > 0 && rf.logs[reply.NextTryIndex].Term == term; reply.NextTryIndex-- {}
reply.NextTryIndex++
} else {
// 如果日志长度不匹配,则以当前的日志长度为准
reply.NextTryIndex = rf.getLastIndex() + 1
}
reply.Success = false
} else {
// If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it.
// ------------------------
// The if here is crucial. If the follower has all the entries the leader sent,
// the follower MUST NOT truncate its log. Any elements following the entries
// sent by the leader MUST be kept. This is because we could be receiving an
// outdated AppendEntries RPC from the leader, and truncating the log would
// mean “taking back” entries that we may have already told the leader that
// we have in our log.
// ------- wrong code -------
// preLogs := rf.logs[:args.PrevLogIndex + 1]
// preLogs = append(preLogs, args.Entries...)
// rf.log(fmt.Sprintf("%d", preLogs))
// rf.logs = preLogs
// --------------------------
rf.log(fmt.Sprintf("Log append: rfLogs(%d) entries(%d) preLogIndex(%d)",
rf.logs, args.Entries, args.PrevLogIndex))
for i := 0; i < len(args.Entries); i++ {
if i + args.PrevLogIndex + 2 > len(rf.logs) {
// 如果Entries 长过了 rf.logs,那么直接append
rf.logs = append(rf.logs, args.Entries[i])
} else if rf.logs[i + args.PrevLogIndex + 1].Term != args.Entries[i].Term {
rf.logs = rf.logs[:i + args.PrevLogIndex + 1]
rf.logs = append(rf.logs, args.Entries[i])
}
}
rf.log(fmt.Sprintf("Log append result: rfLogs(%d)", rf.logs))
if args.LeaderCommit > rf.commitIndex {
if args.LeaderCommit > rf.getLastIndex() {
rf.commitIndex = rf.getLastIndex()
} else {
rf.commitIndex = args.LeaderCommit
}
if rf.commitIndex > rf.lastApplied {
go rf.commit()
}
}
reply.Success = true
}
DPrintf("2 commitIndex %d lastApplied %d LastIndex %d", rf.commitIndex, rf.lastApplied, rf.getLastIndex())
rf.log(fmt.Sprintf("reply AppendEntries from %d: %t", args.LeaderId, reply.Success))
rf.persist()
}
// 还有 sendAppendEntries 返回成功后的处理
if reply.Success {
// If successful: update nextIndex and matchIndex for follower (§5.3)
// 此处注意不能简单更新为 rf.getLastIndex(),因为sendAppendEntries是异步的,send的时候最新的index有可能已经变了
rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = rf.matchIndex[server] + 1
可以给Raft struce定义一个log方法,打印一些基本信息,如Term等,这样就不用每次都拼接。例如
func (rf *Raft) log(message string) {
// 封装日志方法
if Debug > 0 {
log.Printf("node(%d)role(%d)term(%d)commitIndex(%d)logs(%d): %s",
rf.me, rf.role, rf.currentTerm, rf.commitIndex, rf.logs, message)
}
}
当然这里图省事用的string,可以换成format,参见utils.go下的DPrintf的实现
测试随机性比较大,一次测试通过不能代表没问题,建议跑多轮测试都没有问题才算通过
// 选举超时,Paper中写 150–300ms 随机,测试环境是1000ms
rand.Seed(time.Now().UnixNano())
electionTimeout := func() time.Duration {
return time.Duration(500 + rand.Intn(150)) * time.Millisecond
}
// broadcastTime << electionTimeout << MTBF(平均无故障工作时间)
// << 代表数量级的差别
heartBeatInterval := time.Duration(50) * time.Millisecond
for {
rf.log("loop start")
rf.mu.Lock()
role := rf.role
rf.mu.Unlock()
switch role {
case Follower:
// Respond to RPCs from candidates and leaders
// If election timeout elapses without receiving AppendEntries RPC from current leader
// or granting vote to candidate: convert to candidate
select {
case <-rf.chanHeartbeat:
case <-rf.chanGrantVote:
case <-time.After(electionTimeout()):
rf.mu.Lock()
rf.log("follower no heartbeat/vote and timeout, convert to candidate")
rf.role = Candidate
rf.mu.Unlock()
}
case Candidate:
// On conversion to candidate, start election:
// - Increment currentTerm
// - Vote for self
// - Reset election timer
// - Send RequestVote RPCs to all other servers
// If votes received from majority of servers: become leader
// If AppendEntries RPC received from new leader: convert to follower
// If election timeout elapses: start new election
rf.mu.Lock()
rf.currentTerm++
rf.votedFor = rf.me
rf.voteCount = 1 // 自己算一票
rf.persist()
rf.mu.Unlock()
go rf.broadcastRequestVote()
select {
case <-rf.chanWinVote:
rf.mu.Lock()
if len(rf.chanHeartbeat) > 0 {
<-rf.chanHeartbeat
}
rf.log("win vote, convert to leader")
if rf.role != Leader {
// 有一种情况是 chanWinVote 和 chanHeartbeat同时收到消息,这时候Go会随机选择一个case
// 但是 chanWinVote中的数据还在,会干扰到下次选举
// 临时的解决办法是在配置chanWinVote的时候同时设定role,此处二次验证
rf.log("chanWinVote has outdated message, ignore it.")
continue
}
// 初始化leader的变量
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))
for i := range rf.peers {
rf.nextIndex[i] = rf.getLastIndex() + 1
rf.matchIndex[i] = 0
}
rf.mu.Unlock()
case <-rf.chanHeartbeat:
rf.mu.Lock()
rf.log("received heartbeat, convert to follower")
rf.role = Follower
rf.mu.Unlock()
case <-time.After(electionTimeout()):
rf.log("election timeout, skip")
}
case Leader:
// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server;
// repeat during idle periods to prevent election timeouts (§5.2)
// If command received from client: append entry to local log,
// respond after entry applied to state machine (§5.3)
// If last log index ≥ nextIndex for a follower:
// send AppendEntries RPC with log entries starting at nextIndex
// - If successful: update nextIndex and matchIndex for follower (§5.3)
// - If AppendEntries fails because of log inconsistency:
// decrement nextIndex and retry (§5.3)
// If there exists an N such that N > commitIndex, a majority
// of matchIndex[i] ≥ N, and log[N].term == currentTerm:
// set commitIndex = N (§5.3, §5.4).
rf.broadcastAppendEntries()
time.Sleep(heartBeatInterval)
}
}
在完成的过程中以及完成后,因为没有人帮忙判卷,看了一些现在互联网已经有的实现作为参考,总的来说,都只是为了完成而完成,注释也好、代码组织形式也好都不是很重视。可能过上几个月就忘记了以前为什么这么写的原因,这点对于Raft协议这种实现的稍微有点差池就出大问题的,非常重要。