diff --git a/go.mod b/go.mod index 1e8a3a4041..4e2c7917c5 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,10 @@ module github.com/ethereum/go-ethereum go 1.15 -// Quorum - Replace Go modules that use modifications done by us -replace github.com/coreos/etcd => github.com/Consensys/etcd v3.3.13-quorum197+incompatible +// Quorum - Replaced Go modules which required for etcd +replace github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.6 + +replace google.golang.org/grpc/naming => google.golang.org/grpc v1.23.0 // End Quorum @@ -22,7 +24,7 @@ require ( github.com/cespare/cp v0.1.0 github.com/cloudflare/cloudflare-go v0.14.0 github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f - github.com/coreos/etcd v3.3.20+incompatible + github.com/coreos/etcd v3.3.27+incompatible github.com/coreos/go-semver v0.3.0 // indirect github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect @@ -31,6 +33,7 @@ require ( github.com/dlclark/regexp2 v1.7.0 // indirect github.com/docker/docker v20.10.12+incompatible github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498 + github.com/dustin/go-humanize v1.0.0 // indirect github.com/eapache/channels v1.1.0 github.com/eapache/queue v1.1.0 // indirect github.com/edsrzf/mmap-go v1.0.0 diff --git a/go.sum b/go.sum index 13f1803377..89e177ed6c 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/ConsenSys/quorum-qlight-token-manager-plugin-sdk-go v0.0.0-20220427130631-ecd75caa6e73 h1:+rKpO1dxIETamILitp6JT+muHgPEAbIAZOEhdHbVA9k= github.com/ConsenSys/quorum-qlight-token-manager-plugin-sdk-go v0.0.0-20220427130631-ecd75caa6e73/go.mod h1:95u0d9ysJQnH2rUdT30jUGiNfocnw6294Cw3sOWa1KI= -github.com/Consensys/etcd v3.3.13-quorum197+incompatible h1:ZBM9sH4QEufgaShSyNNhffuZv6Zhl5kyD2b/NHViByM= -github.com/Consensys/etcd v3.3.13-quorum197+incompatible/go.mod h1:wz4o/jwsTgMkSZUY9DmwVEIL3b2JX3t+tCDdy/J5ilY= github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= @@ -112,6 +110,8 @@ github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWH github.com/consensys/bavard v0.1.8-0.20210406032232-f3452dc9b572/go.mod h1:Bpd0/3mZuaj6Sj+PqrmIquiOKy397AKGThQPaGzNXAQ= github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f h1:C43yEtQ6NIf4ftFXD/V55gnGFgPbMQobd//YlnLjUJ8= github.com/consensys/gnark-crypto v0.4.1-0.20210426202927-39ac3d4b3f1f/go.mod h1:815PAHg3wvysy0SyIqanF8gZ0Y1wjk/hrDHD/iT88+Q= +github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA= +github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf h1:iW4rZ826su+pqaw19uhpSCzhj44qo35pNgKFGqzDKkU= @@ -135,6 +135,8 @@ github.com/docker/docker v20.10.12+incompatible h1:CEeNmFM0QZIsJCZKMkZx0ZcahTiew github.com/docker/docker v20.10.12+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498 h1:Y9vTBSsV4hSwPSj4bacAU/eSnV3dAxVpepaghAdhGoQ= github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498/go.mod h1:Mw6PkjjMXWbTj+nnj4s3QPXq1jaT0s5pC0iFD4+BOAA= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/channels v1.1.0 h1:F1taHcn7/F0i8DYqKXJnyhJcVpp2kgFcNePxXtnyu4k= github.com/eapache/channels v1.1.0/go.mod h1:jMm2qB5Ubtg9zLd+inMZd2/NUvXgzmWXsDaLyQIGfH0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= @@ -458,8 +460,11 @@ go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= diff --git a/raft/constants.go b/raft/constants.go index 012df25d12..8528b04415 100644 --- a/raft/constants.go +++ b/raft/constants.go @@ -1,8 +1,6 @@ package raft -import ( - etcdRaft "github.com/coreos/etcd/raft" -) +import etcd "github.com/ethereum/go-ethereum/raft/etcd/raft" const ( //protocolName = "raft" @@ -10,7 +8,7 @@ const ( //raftMsg = 0x00 - minterRole = etcdRaft.LEADER + minterRole = etcd.LEADER //verifierRole = etcdRaft.NOT_LEADER // Raft's ticker interval @@ -26,8 +24,6 @@ const ( snapshotPeriod = 250 //peerUrlKeyPrefix = "peerUrl-" - - chainExtensionMessage = "Successfully extended chain" ) var ( diff --git a/raft/etcd/raft/log.go b/raft/etcd/raft/log.go new file mode 100644 index 0000000000..515c308cd4 --- /dev/null +++ b/raft/etcd/raft/log.go @@ -0,0 +1,358 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "fmt" + "log" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +type raftLog struct { + // storage contains all stable entries since the last snapshot. + storage Storage + + // unstable contains all unstable entries and snapshot. + // they will be saved into storage. + unstable unstable + + // committed is the highest log position that is known to be in + // stable storage on a quorum of nodes. + committed uint64 + // applied is the highest log position that the application has + // been instructed to apply to its state machine. + // Invariant: applied <= committed + applied uint64 + + logger Logger +} + +// newLog returns log using the given storage. It recovers the log to the state +// that it just commits and applies the latest snapshot. +func newLog(storage Storage, logger Logger) *raftLog { + if storage == nil { + log.Panic("storage must not be nil") + } + log := &raftLog{ + storage: storage, + logger: logger, + } + firstIndex, err := storage.FirstIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + lastIndex, err := storage.LastIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + log.unstable.offset = lastIndex + 1 + log.unstable.logger = logger + // Initialize our committed and applied pointers to the time of the last compaction. + log.committed = firstIndex - 1 + log.applied = firstIndex - 1 + + return log +} + +func (l *raftLog) String() string { + return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries)) +} + +// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, +// it returns (last index of new entries, true). +func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { + if l.matchTerm(index, logTerm) { + lastnewi = index + uint64(len(ents)) + ci := l.findConflict(ents) + switch { + case ci == 0: + case ci <= l.committed: + l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) + default: + offset := index + 1 + l.append(ents[ci-offset:]...) + } + l.commitTo(min(committed, lastnewi)) + return lastnewi, true + } + return 0, false +} + +func (l *raftLog) append(ents ...pb.Entry) uint64 { + if len(ents) == 0 { + return l.lastIndex() + } + if after := ents[0].Index - 1; after < l.committed { + l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) + } + l.unstable.truncateAndAppend(ents) + return l.lastIndex() +} + +// findConflict finds the index of the conflict. +// It returns the first pair of conflicting entries between the existing +// entries and the given entries, if there are any. +// If there is no conflicting entries, and the existing entries contains +// all the given entries, zero will be returned. +// If there is no conflicting entries, but the given entries contains new +// entries, the index of the first new entry will be returned. +// An entry is considered to be conflicting if it has the same index but +// a different term. +// The first entry MUST have an index equal to the argument 'from'. +// The index of the given entries MUST be continuously increasing. +func (l *raftLog) findConflict(ents []pb.Entry) uint64 { + for _, ne := range ents { + if !l.matchTerm(ne.Index, ne.Term) { + if ne.Index <= l.lastIndex() { + l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", + ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) + } + return ne.Index + } + } + return 0 +} + +func (l *raftLog) unstableEntries() []pb.Entry { + if len(l.unstable.entries) == 0 { + return nil + } + return l.unstable.entries +} + +// nextEnts returns all the available entries for execution. +// If applied is smaller than the index of snapshot, it returns all committed +// entries after the index of snapshot. +func (l *raftLog) nextEnts() (ents []pb.Entry) { + off := max(l.applied+1, l.firstIndex()) + if l.committed+1 > off { + ents, err := l.slice(off, l.committed+1, noLimit) + if err != nil { + l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) + } + return ents + } + return nil +} + +// hasNextEnts returns if there is any available entries for execution. This +// is a fast check without heavy raftLog.slice() in raftLog.nextEnts(). +func (l *raftLog) hasNextEnts() bool { + off := max(l.applied+1, l.firstIndex()) + return l.committed+1 > off +} + +func (l *raftLog) snapshot() (pb.Snapshot, error) { + if l.unstable.snapshot != nil { + return *l.unstable.snapshot, nil + } + return l.storage.Snapshot() +} + +func (l *raftLog) firstIndex() uint64 { + if i, ok := l.unstable.maybeFirstIndex(); ok { + return i + } + index, err := l.storage.FirstIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + return index +} + +func (l *raftLog) lastIndex() uint64 { + if i, ok := l.unstable.maybeLastIndex(); ok { + return i + } + i, err := l.storage.LastIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + return i +} + +func (l *raftLog) commitTo(tocommit uint64) { + // never decrease commit + if l.committed < tocommit { + if l.lastIndex() < tocommit { + l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex()) + } + l.committed = tocommit + } +} + +func (l *raftLog) appliedTo(i uint64) { + if i == 0 { + return + } + if l.committed < i || i < l.applied { + l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) + } + l.applied = i +} + +func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } + +func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } + +func (l *raftLog) lastTerm() uint64 { + t, err := l.term(l.lastIndex()) + if err != nil { + l.logger.Panicf("unexpected error when getting the last term (%v)", err) + } + return t +} + +func (l *raftLog) term(i uint64) (uint64, error) { + // the valid term range is [index of dummy entry, last index] + dummyIndex := l.firstIndex() - 1 + if i < dummyIndex || i > l.lastIndex() { + // TODO: return an error instead? + return 0, nil + } + + if t, ok := l.unstable.maybeTerm(i); ok { + return t, nil + } + + t, err := l.storage.Term(i) + if err == nil { + return t, nil + } + if err == ErrCompacted || err == ErrUnavailable { + return 0, err + } + panic(err) // TODO(bdarnell) +} + +func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { + if i > l.lastIndex() { + return nil, nil + } + return l.slice(i, l.lastIndex()+1, maxsize) +} + +// allEntries returns all entries in the log. +func (l *raftLog) allEntries() []pb.Entry { + ents, err := l.entries(l.firstIndex(), noLimit) + if err == nil { + return ents + } + if err == ErrCompacted { // try again if there was a racing compaction + return l.allEntries() + } + // TODO (xiangli): handle error? + panic(err) +} + +// isUpToDate determines if the given (lastIndex,term) log is more up-to-date +// by comparing the index and term of the last entries in the existing logs. +// If the logs have last entries with different terms, then the log with the +// later term is more up-to-date. If the logs end with the same term, then +// whichever log has the larger lastIndex is more up-to-date. If the logs are +// the same, the given log is up-to-date. +func (l *raftLog) isUpToDate(lasti, term uint64) bool { + return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) +} + +func (l *raftLog) matchTerm(i, term uint64) bool { + t, err := l.term(i) + if err != nil { + return false + } + return t == term +} + +func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { + if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { + l.commitTo(maxIndex) + return true + } + return false +} + +func (l *raftLog) restore(s pb.Snapshot) { + l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) + l.committed = s.Metadata.Index + l.unstable.restore(s) +} + +// slice returns a slice of log entries from lo through hi-1, inclusive. +func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { + err := l.mustCheckOutOfBounds(lo, hi) + if err != nil { + return nil, err + } + if lo == hi { + return nil, nil + } + var ents []pb.Entry + if lo < l.unstable.offset { + storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) + if err == ErrCompacted { + return nil, err + } else if err == ErrUnavailable { + l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) + } else if err != nil { + panic(err) // TODO(bdarnell) + } + + // check if ents has reached the size limitation + if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo { + return storedEnts, nil + } + + ents = storedEnts + } + if hi > l.unstable.offset { + unstable := l.unstable.slice(max(lo, l.unstable.offset), hi) + if len(ents) > 0 { + ents = append([]pb.Entry{}, ents...) + ents = append(ents, unstable...) + } else { + ents = unstable + } + } + return limitSize(ents, maxSize), nil +} + +// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) +func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { + if lo > hi { + l.logger.Panicf("invalid slice %d > %d", lo, hi) + } + fi := l.firstIndex() + if lo < fi { + return ErrCompacted + } + + length := l.lastIndex() + 1 - fi + if lo < fi || hi > fi+length { + l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) + } + return nil +} + +func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { + if err == nil { + return t + } + if err == ErrCompacted { + return 0 + } + l.logger.Panicf("unexpected error (%v)", err) + return 0 +} diff --git a/raft/etcd/raft/log_unstable.go b/raft/etcd/raft/log_unstable.go new file mode 100644 index 0000000000..f95ecdc672 --- /dev/null +++ b/raft/etcd/raft/log_unstable.go @@ -0,0 +1,159 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import pb "github.com/coreos/etcd/raft/raftpb" + +// unstable.entries[i] has raft log position i+unstable.offset. +// Note that unstable.offset may be less than the highest log +// position in storage; this means that the next write to storage +// might need to truncate the log before persisting unstable.entries. +type unstable struct { + // the incoming unstable snapshot, if any. + snapshot *pb.Snapshot + // all entries that have not yet been written to storage. + entries []pb.Entry + offset uint64 + + logger Logger +} + +// maybeFirstIndex returns the index of the first possible entry in entries +// if it has a snapshot. +func (u *unstable) maybeFirstIndex() (uint64, bool) { + if u.snapshot != nil { + return u.snapshot.Metadata.Index + 1, true + } + return 0, false +} + +// maybeLastIndex returns the last index if it has at least one +// unstable entry or snapshot. +func (u *unstable) maybeLastIndex() (uint64, bool) { + if l := len(u.entries); l != 0 { + return u.offset + uint64(l) - 1, true + } + if u.snapshot != nil { + return u.snapshot.Metadata.Index, true + } + return 0, false +} + +// maybeTerm returns the term of the entry at index i, if there +// is any. +func (u *unstable) maybeTerm(i uint64) (uint64, bool) { + if i < u.offset { + if u.snapshot == nil { + return 0, false + } + if u.snapshot.Metadata.Index == i { + return u.snapshot.Metadata.Term, true + } + return 0, false + } + + last, ok := u.maybeLastIndex() + if !ok { + return 0, false + } + if i > last { + return 0, false + } + return u.entries[i-u.offset].Term, true +} + +func (u *unstable) stableTo(i, t uint64) { + gt, ok := u.maybeTerm(i) + if !ok { + return + } + // if i < offset, term is matched with the snapshot + // only update the unstable entries if term is matched with + // an unstable entry. + if gt == t && i >= u.offset { + u.entries = u.entries[i+1-u.offset:] + u.offset = i + 1 + u.shrinkEntriesArray() + } +} + +// shrinkEntriesArray discards the underlying array used by the entries slice +// if most of it isn't being used. This avoids holding references to a bunch of +// potentially large entries that aren't needed anymore. Simply clearing the +// entries wouldn't be safe because clients might still be using them. +func (u *unstable) shrinkEntriesArray() { + // We replace the array if we're using less than half of the space in + // it. This number is fairly arbitrary, chosen as an attempt to balance + // memory usage vs number of allocations. It could probably be improved + // with some focused tuning. + const lenMultiple = 2 + if len(u.entries) == 0 { + u.entries = nil + } else if len(u.entries)*lenMultiple < cap(u.entries) { + newEntries := make([]pb.Entry, len(u.entries)) + copy(newEntries, u.entries) + u.entries = newEntries + } +} + +func (u *unstable) stableSnapTo(i uint64) { + if u.snapshot != nil && u.snapshot.Metadata.Index == i { + u.snapshot = nil + } +} + +func (u *unstable) restore(s pb.Snapshot) { + u.offset = s.Metadata.Index + 1 + u.entries = nil + u.snapshot = &s +} + +func (u *unstable) truncateAndAppend(ents []pb.Entry) { + after := ents[0].Index + switch { + case after == u.offset+uint64(len(u.entries)): + // after is the next index in the u.entries + // directly append + u.entries = append(u.entries, ents...) + case after <= u.offset: + u.logger.Infof("replace the unstable entries from index %d", after) + // The log is being truncated to before our current offset + // portion, so set the offset and replace the entries + u.offset = after + u.entries = ents + default: + // truncate to after and copy to u.entries + // then append + u.logger.Infof("truncate the unstable entries before index %d", after) + u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...) + u.entries = append(u.entries, ents...) + } +} + +func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { + u.mustCheckOutOfBounds(lo, hi) + return u.entries[lo-u.offset : hi-u.offset] +} + +// u.offset <= lo <= hi <= u.offset+len(u.offset) +func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) { + if lo > hi { + u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi) + } + upper := u.offset + uint64(len(u.entries)) + if lo < u.offset || hi > upper { + u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper) + } +} diff --git a/raft/etcd/raft/logger.go b/raft/etcd/raft/logger.go new file mode 100644 index 0000000000..4840cf30d8 --- /dev/null +++ b/raft/etcd/raft/logger.go @@ -0,0 +1,124 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "fmt" + "log" + "os" +) + +type Logger interface { + Debug(v ...interface{}) + Debugf(format string, v ...interface{}) + + Error(v ...interface{}) + Errorf(format string, v ...interface{}) + + Info(v ...interface{}) + Infof(format string, v ...interface{}) + + Warning(v ...interface{}) + Warningf(format string, v ...interface{}) + + Fatal(v ...interface{}) + Fatalf(format string, v ...interface{}) + + Panic(v ...interface{}) + Panicf(format string, v ...interface{}) +} + +func SetLogger(l Logger) { raftLogger = l } + +var ( + defaultLogger = &DefaultLogger{Logger: log.New(os.Stderr, "raft", log.Ldate|log.Lmicroseconds)} + raftLogger = Logger(defaultLogger) +) + +const ( + calldepth = 2 +) + +// DefaultLogger is a default implementation of the Logger interface. +type DefaultLogger struct { + *log.Logger + debug bool +} + +func (l *DefaultLogger) EnableTimestamps() { + l.SetFlags(l.Flags() | log.Ldate | log.Ltime) +} + +func (l *DefaultLogger) EnableDebug() { + l.debug = true +} + +func (l *DefaultLogger) Debug(v ...interface{}) { + if l.debug { + l.Output(calldepth, header("DEBUG", fmt.Sprint(v...))) + } +} + +func (l *DefaultLogger) Debugf(format string, v ...interface{}) { + if l.debug { + l.Output(calldepth, header("DEBUG", fmt.Sprintf(format, v...))) + } +} + +func (l *DefaultLogger) Info(v ...interface{}) { + l.Output(calldepth, header("INFO", fmt.Sprint(v...))) +} + +func (l *DefaultLogger) Infof(format string, v ...interface{}) { + l.Output(calldepth, header("INFO", fmt.Sprintf(format, v...))) +} + +func (l *DefaultLogger) Error(v ...interface{}) { + l.Output(calldepth, header("ERROR", fmt.Sprint(v...))) +} + +func (l *DefaultLogger) Errorf(format string, v ...interface{}) { + l.Output(calldepth, header("ERROR", fmt.Sprintf(format, v...))) +} + +func (l *DefaultLogger) Warning(v ...interface{}) { + l.Output(calldepth, header("WARN", fmt.Sprint(v...))) +} + +func (l *DefaultLogger) Warningf(format string, v ...interface{}) { + l.Output(calldepth, header("WARN", fmt.Sprintf(format, v...))) +} + +func (l *DefaultLogger) Fatal(v ...interface{}) { + l.Output(calldepth, header("FATAL", fmt.Sprint(v...))) + os.Exit(1) +} + +func (l *DefaultLogger) Fatalf(format string, v ...interface{}) { + l.Output(calldepth, header("FATAL", fmt.Sprintf(format, v...))) + os.Exit(1) +} + +func (l *DefaultLogger) Panic(v ...interface{}) { + l.Logger.Panic(v...) +} + +func (l *DefaultLogger) Panicf(format string, v ...interface{}) { + l.Logger.Panicf(format, v...) +} + +func header(lvl, msg string) string { + return fmt.Sprintf("%s: %s", lvl, msg) +} diff --git a/raft/etcd/raft/node.go b/raft/etcd/raft/node.go new file mode 100644 index 0000000000..4c2dd3829d --- /dev/null +++ b/raft/etcd/raft/node.go @@ -0,0 +1,601 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "context" + "errors" + + etcdRaft "github.com/coreos/etcd/raft" + pb "github.com/coreos/etcd/raft/raftpb" + "github.com/eapache/channels" +) + +const ( + LEADER = 1 + NOT_LEADER = 2 +) + +var ( + emptyState = pb.HardState{} + + // ErrStopped is returned by methods on Nodes that have been stopped. + ErrStopped = errors.New("raft: stopped") +) + +// SoftState provides state that is useful for logging and debugging. +// The state is volatile and does not need to be persisted to the WAL. +type SoftState struct { + Lead uint64 // must use atomic operations to access; keep 64-bit aligned. + RaftState StateType +} + +func (a *SoftState) equal(b *SoftState) bool { + return a.Lead == b.Lead && a.RaftState == b.RaftState +} + +// Ready encapsulates the entries and messages that are ready to read, +// be saved to stable storage, committed or sent to other peers. +// All fields in Ready are read-only. +type Ready struct { + // The current volatile state of a Node. + // SoftState will be nil if there is no update. + // It is not required to consume or store SoftState. + *SoftState + + // The current state of a Node to be saved to stable storage BEFORE + // Messages are sent. + // HardState will be equal to empty state if there is no update. + pb.HardState + + // ReadStates can be used for node to serve linearizable read requests locally + // when its applied index is greater than the index in ReadState. + // Note that the readState will be returned when raft receives msgReadIndex. + // The returned is only valid for the request that requested to read. + ReadStates []ReadState + + // Entries specifies entries to be saved to stable storage BEFORE + // Messages are sent. + Entries []pb.Entry + + // Snapshot specifies the snapshot to be saved to stable storage. + Snapshot pb.Snapshot + + // CommittedEntries specifies entries to be committed to a + // store/state-machine. These have previously been committed to stable + // store. + CommittedEntries []pb.Entry + + // Messages specifies outbound messages to be sent AFTER Entries are + // committed to stable storage. + // If it contains a MsgSnap message, the application MUST report back to raft + // when the snapshot has been received or has failed by calling ReportSnapshot. + Messages []pb.Message + + // MustSync indicates whether the HardState and Entries must be synchronously + // written to disk or if an asynchronous write is permissible. + MustSync bool +} + +func isHardStateEqual(a, b pb.HardState) bool { + return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit +} + +// IsEmptyHardState returns true if the given HardState is empty. +func IsEmptyHardState(st pb.HardState) bool { + return isHardStateEqual(st, emptyState) +} + +// IsEmptySnap returns true if the given Snapshot is empty. +func IsEmptySnap(sp pb.Snapshot) bool { + return sp.Metadata.Index == 0 +} + +func (rd Ready) containsUpdates() bool { + return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) || + !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 || + len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 +} + +// Node represents a node in a raft cluster. +type Node interface { + // Tick increments the internal logical clock for the Node by a single tick. Election + // timeouts and heartbeat timeouts are in units of ticks. + Tick() + // Campaign causes the Node to transition to candidate state and start campaigning to become leader. + Campaign(ctx context.Context) error + // Propose proposes that data be appended to the log. + Propose(ctx context.Context, data []byte) error + // ProposeConfChange proposes config change. + // At most one ConfChange can be in the process of going through consensus. + // Application needs to call ApplyConfChange when applying EntryConfChange type entry. + ProposeConfChange(ctx context.Context, cc pb.ConfChange) error + // Step advances the state machine using the given message. ctx.Err() will be returned, if any. + Step(ctx context.Context, msg pb.Message) error + + // Ready returns a channel that returns the current point-in-time state. + // Users of the Node must call Advance after retrieving the state returned by Ready. + // + // NOTE: No committed entries from the next Ready may be applied until all committed entries + // and snapshots from the previous one have finished. + Ready() <-chan Ready + + // Advance notifies the Node that the application has saved progress up to the last Ready. + // It prepares the node to return the next available Ready. + // + // The application should generally call Advance after it applies the entries in last Ready. + // + // However, as an optimization, the application may call Advance while it is applying the + // commands. For example. when the last Ready contains a snapshot, the application might take + // a long time to apply the snapshot data. To continue receiving Ready without blocking raft + // progress, it can call Advance before finishing applying the last ready. + Advance() + // ApplyConfChange applies config change to the local node. + // Returns an opaque ConfState protobuf which must be recorded + // in snapshots. Will never return nil; it returns a pointer only + // to match MemoryStorage.Compact. + ApplyConfChange(cc pb.ConfChange) *pb.ConfState + + // TransferLeadership attempts to transfer leadership to the given transferee. + TransferLeadership(ctx context.Context, lead, transferee uint64) + + // ReadIndex request a read state. The read state will be set in the ready. + // Read state has a read index. Once the application advances further than the read + // index, any linearizable read requests issued before the read request can be + // processed safely. The read state will have the same rctx attached. + ReadIndex(ctx context.Context, rctx []byte) error + + // Status returns the current status of the raft state machine. + Status() Status + // ReportUnreachable reports the given node is not reachable for the last send. + ReportUnreachable(id uint64) + // ReportSnapshot reports the status of the sent snapshot. + ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) + // Stop performs any necessary termination of the Node. + Stop() + + // Report when the node's role in the cluster changes, as either LEADER or + // NOT_LEADER + RoleChan() *channels.RingChannel +} + +type Peer struct { + ID uint64 + Context []byte +} + +// StartNode returns a new Node given configuration and a list of raft peers. +// It appends a ConfChangeAddNode entry for each given peer to the initial log. +func StartNode(c *Config, peers []Peer) Node { + r := newRaft(c) + // become the follower at term 1 and apply initial configuration + // entries of term 1 + r.becomeFollower(1, None) + for _, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} + d, err := cc.Marshal() + if err != nil { + panic("unexpected marshal error") + } + e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} + r.raftLog.append(e) + } + // Mark these initial entries as committed. + // TODO(bdarnell): These entries are still unstable; do we need to preserve + // the invariant that committed < unstable? + r.raftLog.committed = r.raftLog.lastIndex() + // Now apply them, mainly so that the application can call Campaign + // immediately after StartNode in tests. Note that these nodes will + // be added to raft twice: here and when the application's Ready + // loop calls ApplyConfChange. The calls to addNode must come after + // all calls to raftLog.append so progress.next is set after these + // bootstrapping entries (it is an error if we try to append these + // entries since they have already been committed). + // We do not set raftLog.applied so the application will be able + // to observe all conf changes via Ready.CommittedEntries. + for _, peer := range peers { + r.addNode(peer.ID) + } + + n := newNode() + n.logger = c.Logger + go n.run(r) + return &n +} + +// RestartNode is similar to StartNode but does not take a list of peers. +// The current membership of the cluster will be restored from the Storage. +// If the caller has an existing state machine, pass in the last log index that +// has been applied to it; otherwise use zero. +func RestartNode(c *Config) Node { + r := newRaft(c) + + n := newNode() + n.logger = c.Logger + go n.run(r) + return &n +} + +type msgWithResult struct { + m pb.Message + result chan error +} + +// node is the canonical implementation of the Node interface +type node struct { + propc chan msgWithResult + recvc chan pb.Message + confc chan pb.ConfChange + confstatec chan pb.ConfState + readyc chan Ready + advancec chan struct{} + tickc chan struct{} + done chan struct{} + stop chan struct{} + status chan chan Status + + // we use a ring channel (of size 1) because we only want the node's latest + // role + rolec *channels.RingChannel + + logger Logger +} + +func newNode() node { + return node{ + propc: make(chan msgWithResult), + recvc: make(chan pb.Message), + confc: make(chan pb.ConfChange), + confstatec: make(chan pb.ConfState), + readyc: make(chan Ready), + advancec: make(chan struct{}), + // make tickc a buffered chan, so raft node can buffer some ticks when the node + // is busy processing raft messages. Raft node will resume process buffered + // ticks when it becomes idle. + tickc: make(chan struct{}, 128), + done: make(chan struct{}), + stop: make(chan struct{}), + status: make(chan chan Status), + rolec: channels.NewRingChannel(1), + } +} + +func (n *node) Stop() { + select { + case n.stop <- struct{}{}: + // Not already stopped, so trigger it + case <-n.done: + // Node has already been stopped - no need to do anything + return + } + // Block until the stop has been acknowledged by run() + <-n.done +} + +func (n *node) RoleChan() *channels.RingChannel { + return n.rolec +} + +func (n *node) run(r *raft) { + var propc chan msgWithResult + var readyc chan Ready + var advancec chan struct{} + var prevLastUnstablei, prevLastUnstablet uint64 + var havePrevLastUnstablei bool + var prevSnapi uint64 + var rd Ready + + lead := None + prevSoftSt := r.softState() + prevHardSt := emptyState + + for { + if advancec != nil { + readyc = nil + } else { + rd = newReady(r, prevSoftSt, prevHardSt) + if rd.containsUpdates() { + readyc = n.readyc + } else { + readyc = nil + } + } + + if lead != r.lead { + if r.hasLeader() { + if lead == None { + r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term) + } else { + r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term) + } + propc = n.propc + } else { + r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term) + propc = nil + } + lead = r.lead + + var role int + if lead == r.id { + role = LEADER + } else { + role = NOT_LEADER + } + n.rolec.In() <- role + } + + select { + // TODO: maybe buffer the config propose if there exists one (the way + // described in raft dissertation) + // Currently it is dropped in Step silently. + case pm := <-propc: + m := pm.m + m.From = r.id + err := r.Step(m) + if pm.result != nil { + pm.result <- err + close(pm.result) + } + case m := <-n.recvc: + // filter out response message from unknown From. + if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + r.Step(m) // raft never returns an error + } + case cc := <-n.confc: + if cc.NodeID == None { + r.resetPendingConf() + select { + case n.confstatec <- pb.ConfState{Nodes: r.voters(), Learners: r.learners()}: + case <-n.done: + } + break + } + switch cc.Type { + case pb.ConfChangeAddNode: + r.addNode(cc.NodeID) + case pb.ConfChangeAddLearnerNode: + r.addLearner(cc.NodeID) + case pb.ConfChangeRemoveNode: + // block incoming proposal when local node is + // removed + if cc.NodeID == r.id { + propc = nil + } + r.removeNode(cc.NodeID) + case pb.ConfChangeUpdateNode: + r.resetPendingConf() + default: + panic("unexpected conf type") + } + select { + case n.confstatec <- pb.ConfState{Nodes: r.voters(), Learners: r.learners()}: + case <-n.done: + } + case <-n.tickc: + r.tick() + case readyc <- rd: + if rd.SoftState != nil { + prevSoftSt = rd.SoftState + } + if len(rd.Entries) > 0 { + prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index + prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term + havePrevLastUnstablei = true + } + if !IsEmptyHardState(rd.HardState) { + prevHardSt = rd.HardState + } + if !IsEmptySnap(rd.Snapshot) { + prevSnapi = rd.Snapshot.Metadata.Index + } + + r.msgs = nil + r.readStates = nil + advancec = n.advancec + case <-advancec: + if prevHardSt.Commit != 0 { + r.raftLog.appliedTo(prevHardSt.Commit) + } + if havePrevLastUnstablei { + r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) + havePrevLastUnstablei = false + } + r.raftLog.stableSnapTo(prevSnapi) + advancec = nil + case c := <-n.status: + c <- getStatus(r) + case <-n.stop: + close(n.done) + return + } + } +} + +// Tick increments the internal logical clock for this Node. Election timeouts +// and heartbeat timeouts are in units of ticks. +func (n *node) Tick() { + select { + case n.tickc <- struct{}{}: + case <-n.done: + default: + n.logger.Warningf("A tick missed to fire. Node blocks too long!") + } +} + +func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) } + +func (n *node) Propose(ctx context.Context, data []byte) error { + return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) +} + +func (n *node) Step(ctx context.Context, m pb.Message) error { + // ignore unexpected local messages receiving over network + if IsLocalMsg(m.Type) { + // TODO: return an error? + return nil + } + return n.step(ctx, m) +} + +func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error { + data, err := cc.Marshal() + if err != nil { + return err + } + return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}}) +} + +func (n *node) step(ctx context.Context, m pb.Message) error { + return n.stepWithWaitOption(ctx, m, false) +} + +func (n *node) stepWait(ctx context.Context, m pb.Message) error { + return n.stepWithWaitOption(ctx, m, true) +} + +// Step advances the state machine using msgs. The ctx.Err() will be returned, +// if any. +func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error { + if m.Type != pb.MsgProp { + select { + case n.recvc <- m: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + } + ch := n.propc + pm := msgWithResult{m: m} + if wait { + pm.result = make(chan error, 1) + } + select { + case ch <- pm: + if !wait { + return nil + } + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + select { + case rsp := <-pm.result: + if rsp != nil { + return rsp + } + case <-ctx.Done(): + return ctx.Err() + case <-n.done: + return ErrStopped + } + return nil +} + +func (n *node) Ready() <-chan Ready { return n.readyc } + +func (n *node) Advance() { + select { + case n.advancec <- struct{}{}: + case <-n.done: + } +} + +func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { + var cs pb.ConfState + select { + case n.confc <- cc: + case <-n.done: + } + select { + case cs = <-n.confstatec: + case <-n.done: + } + return &cs +} + +func (n *node) Status() Status { + c := make(chan Status) + select { + case n.status <- c: + return <-c + case <-n.done: + return Status{} + } +} + +func (n *node) ReportUnreachable(id uint64) { + select { + case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}: + case <-n.done: + } +} + +func (n *node) ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) { + rej := status == etcdRaft.SnapshotFailure + + select { + case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}: + case <-n.done: + } +} + +func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) { + select { + // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership + case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}: + case <-n.done: + case <-ctx.Done(): + } +} + +func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { + return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) +} + +func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { + rd := Ready{ + Entries: r.raftLog.unstableEntries(), + CommittedEntries: r.raftLog.nextEnts(), + Messages: r.msgs, + } + if softSt := r.softState(); !softSt.equal(prevSoftSt) { + rd.SoftState = softSt + } + if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { + rd.HardState = hardSt + } + if r.raftLog.unstable.snapshot != nil { + rd.Snapshot = *r.raftLog.unstable.snapshot + } + if len(r.readStates) != 0 { + rd.ReadStates = r.readStates + } + rd.MustSync = MustSync(rd.HardState, prevHardSt, len(rd.Entries)) + return rd +} + +// MustSync returns true if the hard state and count of Raft entries indicate +// that a synchronous write to persistent storage is required. +func MustSync(st, prevst pb.HardState, entsnum int) bool { + // Persistent state on all servers: + // (Updated on stable storage before responding to RPCs) + // currentTerm + // votedFor + // log entries[] + return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term +} diff --git a/raft/etcd/raft/progress.go b/raft/etcd/raft/progress.go new file mode 100644 index 0000000000..2fd99b31a5 --- /dev/null +++ b/raft/etcd/raft/progress.go @@ -0,0 +1,284 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import "fmt" + +const ( + ProgressStateProbe ProgressStateType = iota + ProgressStateReplicate + ProgressStateSnapshot +) + +type ProgressStateType uint64 + +var prstmap = [...]string{ + "ProgressStateProbe", + "ProgressStateReplicate", + "ProgressStateSnapshot", +} + +func (st ProgressStateType) String() string { return prstmap[uint64(st)] } + +// Progress represents a follower’s progress in the view of the leader. Leader maintains +// progresses of all followers, and sends entries to the follower based on its progress. +type Progress struct { + Match, Next uint64 + // State defines how the leader should interact with the follower. + // + // When in ProgressStateProbe, leader sends at most one replication message + // per heartbeat interval. It also probes actual progress of the follower. + // + // When in ProgressStateReplicate, leader optimistically increases next + // to the latest entry sent after sending replication message. This is + // an optimized state for fast replicating log entries to the follower. + // + // When in ProgressStateSnapshot, leader should have sent out snapshot + // before and stops sending any replication message. + State ProgressStateType + + // Paused is used in ProgressStateProbe. + // When Paused is true, raft should pause sending replication message to this peer. + Paused bool + // PendingSnapshot is used in ProgressStateSnapshot. + // If there is a pending snapshot, the pendingSnapshot will be set to the + // index of the snapshot. If pendingSnapshot is set, the replication process of + // this Progress will be paused. raft will not resend snapshot until the pending one + // is reported to be failed. + PendingSnapshot uint64 + + // RecentActive is true if the progress is recently active. Receiving any messages + // from the corresponding follower indicates the progress is active. + // RecentActive can be reset to false after an election timeout. + RecentActive bool + + // inflights is a sliding window for the inflight messages. + // Each inflight message contains one or more log entries. + // The max number of entries per message is defined in raft config as MaxSizePerMsg. + // Thus inflight effectively limits both the number of inflight messages + // and the bandwidth each Progress can use. + // When inflights is full, no more message should be sent. + // When a leader sends out a message, the index of the last + // entry should be added to inflights. The index MUST be added + // into inflights in order. + // When a leader receives a reply, the previous inflights should + // be freed by calling inflights.freeTo with the index of the last + // received entry. + ins *inflights + + // IsLearner is true if this progress is tracked for a learner. + IsLearner bool +} + +func (pr *Progress) resetState(state ProgressStateType) { + pr.Paused = false + pr.PendingSnapshot = 0 + pr.State = state + pr.ins.reset() +} + +func (pr *Progress) becomeProbe() { + // If the original state is ProgressStateSnapshot, progress knows that + // the pending snapshot has been sent to this peer successfully, then + // probes from pendingSnapshot + 1. + if pr.State == ProgressStateSnapshot { + pendingSnapshot := pr.PendingSnapshot + pr.resetState(ProgressStateProbe) + pr.Next = max(pr.Match+1, pendingSnapshot+1) + } else { + pr.resetState(ProgressStateProbe) + pr.Next = pr.Match + 1 + } +} + +func (pr *Progress) becomeReplicate() { + pr.resetState(ProgressStateReplicate) + pr.Next = pr.Match + 1 +} + +func (pr *Progress) becomeSnapshot(snapshoti uint64) { + pr.resetState(ProgressStateSnapshot) + pr.PendingSnapshot = snapshoti +} + +// maybeUpdate returns false if the given n index comes from an outdated message. +// Otherwise it updates the progress and returns true. +func (pr *Progress) maybeUpdate(n uint64) bool { + var updated bool + if pr.Match < n { + pr.Match = n + updated = true + pr.resume() + } + if pr.Next < n+1 { + pr.Next = n + 1 + } + return updated +} + +func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 } + +// maybeDecrTo returns false if the given to index comes from an out of order message. +// Otherwise it decreases the progress next index to min(rejected, last) and returns true. +func (pr *Progress) maybeDecrTo(rejected, last uint64) bool { + if pr.State == ProgressStateReplicate { + // the rejection must be stale if the progress has matched and "rejected" + // is smaller than "match". + if rejected <= pr.Match { + return false + } + // directly decrease next to match + 1 + pr.Next = pr.Match + 1 + return true + } + + // the rejection must be stale if "rejected" does not match next - 1 + if pr.Next-1 != rejected { + return false + } + + if pr.Next = min(rejected, last+1); pr.Next < 1 { + pr.Next = 1 + } + pr.resume() + return true +} + +func (pr *Progress) pause() { pr.Paused = true } +func (pr *Progress) resume() { pr.Paused = false } + +// IsPaused returns whether sending log entries to this node has been +// paused. A node may be paused because it has rejected recent +// MsgApps, is currently waiting for a snapshot, or has reached the +// MaxInflightMsgs limit. +func (pr *Progress) IsPaused() bool { + switch pr.State { + case ProgressStateProbe: + return pr.Paused + case ProgressStateReplicate: + return pr.ins.full() + case ProgressStateSnapshot: + return true + default: + panic("unexpected state") + } +} + +func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 } + +// needSnapshotAbort returns true if snapshot progress's Match +// is equal or higher than the pendingSnapshot. +func (pr *Progress) needSnapshotAbort() bool { + return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot +} + +func (pr *Progress) String() string { + return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot) +} + +type inflights struct { + // the starting index in the buffer + start int + // number of inflights in the buffer + count int + + // the size of the buffer + size int + + // buffer contains the index of the last entry + // inside one message. + buffer []uint64 +} + +func newInflights(size int) *inflights { + return &inflights{ + size: size, + } +} + +// add adds an inflight into inflights +func (in *inflights) add(inflight uint64) { + if in.full() { + panic("cannot add into a full inflights") + } + next := in.start + in.count + size := in.size + if next >= size { + next -= size + } + if next >= len(in.buffer) { + in.growBuf() + } + in.buffer[next] = inflight + in.count++ +} + +// grow the inflight buffer by doubling up to inflights.size. We grow on demand +// instead of preallocating to inflights.size to handle systems which have +// thousands of Raft groups per process. +func (in *inflights) growBuf() { + newSize := len(in.buffer) * 2 + if newSize == 0 { + newSize = 1 + } else if newSize > in.size { + newSize = in.size + } + newBuffer := make([]uint64, newSize) + copy(newBuffer, in.buffer) + in.buffer = newBuffer +} + +// freeTo frees the inflights smaller or equal to the given `to` flight. +func (in *inflights) freeTo(to uint64) { + if in.count == 0 || to < in.buffer[in.start] { + // out of the left side of the window + return + } + + idx := in.start + var i int + for i = 0; i < in.count; i++ { + if to < in.buffer[idx] { // found the first large inflight + break + } + + // increase index and maybe rotate + size := in.size + if idx++; idx >= size { + idx -= size + } + } + // free i inflights and set new start index + in.count -= i + in.start = idx + if in.count == 0 { + // inflights is empty, reset the start index so that we don't grow the + // buffer unnecessarily. + in.start = 0 + } +} + +func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) } + +// full returns true if the inflights is full. +func (in *inflights) full() bool { + return in.count == in.size +} + +// resets frees all inflights. +func (in *inflights) reset() { + in.count = 0 + in.start = 0 +} diff --git a/raft/etcd/raft/raft.go b/raft/etcd/raft/raft.go new file mode 100644 index 0000000000..8f39574b84 --- /dev/null +++ b/raft/etcd/raft/raft.go @@ -0,0 +1,1437 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "bytes" + "errors" + "fmt" + "math" + "math/rand" + "sort" + "strings" + "sync" + "time" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +// None is a placeholder node ID used when there is no leader. +const None uint64 = 0 +const noLimit = math.MaxUint64 + +// Possible values for StateType. +const ( + StateFollower StateType = iota + StateCandidate + StateLeader + StatePreCandidate +) + +type ReadOnlyOption int + +const ( + // ReadOnlySafe guarantees the linearizability of the read only request by + // communicating with the quorum. It is the default and suggested option. + ReadOnlySafe ReadOnlyOption = iota + // ReadOnlyLeaseBased ensures linearizability of the read only request by + // relying on the leader lease. It can be affected by clock drift. + // If the clock drift is unbounded, leader might keep the lease longer than it + // should (clock can move backward/pause without any bound). ReadIndex is not safe + // in that case. + ReadOnlyLeaseBased +) + +// Possible values for CampaignType +const ( + // campaignPreElection represents the first phase of a normal election when + // Config.PreVote is true. + campaignPreElection CampaignType = "CampaignPreElection" + // campaignElection represents a normal (time-based) election (the second phase + // of the election when Config.PreVote is true). + campaignElection CampaignType = "CampaignElection" + // campaignTransfer represents the type of leader transfer + campaignTransfer CampaignType = "CampaignTransfer" +) + +// ErrProposalDropped is returned when the proposal is ignored by some cases, +// so that the proposer can be notified and fail fast. +var ErrProposalDropped = errors.New("raft proposal dropped") + +// lockedRand is a small wrapper around rand.Rand to provide +// synchronization. Only the methods needed by the code are exposed +// (e.g. Intn). +type lockedRand struct { + mu sync.Mutex + rand *rand.Rand +} + +func (r *lockedRand) Intn(n int) int { + r.mu.Lock() + v := r.rand.Intn(n) + r.mu.Unlock() + return v +} + +var globalRand = &lockedRand{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), +} + +// CampaignType represents the type of campaigning +// the reason we use the type of string instead of uint64 +// is because it's simpler to compare and fill in raft entries +type CampaignType string + +// StateType represents the role of a node in a cluster. +type StateType uint64 + +var stmap = [...]string{ + "StateFollower", + "StateCandidate", + "StateLeader", + "StatePreCandidate", +} + +func (st StateType) String() string { + return stmap[uint64(st)] +} + +// Config contains the parameters to start a raft. +type Config struct { + // ID is the identity of the local raft. ID cannot be 0. + ID uint64 + + // peers contains the IDs of all nodes (including self) in the raft cluster. It + // should only be set when starting a new raft cluster. Restarting raft from + // previous configuration will panic if peers is set. peer is private and only + // used for testing right now. + peers []uint64 + + // learners contains the IDs of all leaner nodes (including self if the local node is a leaner) in the raft cluster. + // learners only receives entries from the leader node. It does not vote or promote itself. + learners []uint64 + + // ElectionTick is the number of Node.Tick invocations that must pass between + // elections. That is, if a follower does not receive any message from the + // leader of current term before ElectionTick has elapsed, it will become + // candidate and start an election. ElectionTick must be greater than + // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid + // unnecessary leader switching. + ElectionTick int + // HeartbeatTick is the number of Node.Tick invocations that must pass between + // heartbeats. That is, a leader sends heartbeat messages to maintain its + // leadership every HeartbeatTick ticks. + HeartbeatTick int + + // Storage is the storage for raft. raft generates entries and states to be + // stored in storage. raft reads the persisted entries and states out of + // Storage when it needs. raft reads out the previous state and configuration + // out of storage when restarting. + Storage Storage + // Applied is the last applied index. It should only be set when restarting + // raft. raft will not return entries to the application smaller or equal to + // Applied. If Applied is unset when restarting, raft might return previous + // applied entries. This is a very application dependent configuration. + Applied uint64 + + // MaxSizePerMsg limits the max size of each append message. Smaller value + // lowers the raft recovery cost(initial probing and message lost during normal + // operation). On the other side, it might affect the throughput during normal + // replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per + // message. + MaxSizePerMsg uint64 + // MaxInflightMsgs limits the max number of in-flight append messages during + // optimistic replication phase. The application transportation layer usually + // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid + // overflowing that sending buffer. TODO (xiangli): feedback to application to + // limit the proposal rate? + MaxInflightMsgs int + + // CheckQuorum specifies if the leader should check quorum activity. Leader + // steps down when quorum is not active for an electionTimeout. + CheckQuorum bool + + // PreVote enables the Pre-Vote algorithm described in raft thesis section + // 9.6. This prevents disruption when a node that has been partitioned away + // rejoins the cluster. + PreVote bool + + // ReadOnlyOption specifies how the read only request is processed. + // + // ReadOnlySafe guarantees the linearizability of the read only request by + // communicating with the quorum. It is the default and suggested option. + // + // ReadOnlyLeaseBased ensures linearizability of the read only request by + // relying on the leader lease. It can be affected by clock drift. + // If the clock drift is unbounded, leader might keep the lease longer than it + // should (clock can move backward/pause without any bound). ReadIndex is not safe + // in that case. + // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased. + ReadOnlyOption ReadOnlyOption + + // Logger is the logger used for raft log. For multinode which can host + // multiple raft group, each raft group can have its own logger + Logger Logger + + // DisableProposalForwarding set to true means that followers will drop + // proposals, rather than forwarding them to the leader. One use case for + // this feature would be in a situation where the Raft leader is used to + // compute the data of a proposal, for example, adding a timestamp from a + // hybrid logical clock to data in a monotonically increasing way. Forwarding + // should be disabled to prevent a follower with an innaccurate hybrid + // logical clock from assigning the timestamp and then forwarding the data + // to the leader. + DisableProposalForwarding bool +} + +func (c *Config) validate() error { + if c.ID == None { + return errors.New("cannot use none as id") + } + + if c.HeartbeatTick <= 0 { + return errors.New("heartbeat tick must be greater than 0") + } + + if c.ElectionTick <= c.HeartbeatTick { + return errors.New("election tick must be greater than heartbeat tick") + } + + if c.Storage == nil { + return errors.New("storage cannot be nil") + } + + if c.MaxInflightMsgs <= 0 { + return errors.New("max inflight messages must be greater than 0") + } + + if c.Logger == nil { + c.Logger = raftLogger + } + + if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum { + return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased") + } + + return nil +} + +type raft struct { + id uint64 + + Term uint64 + Vote uint64 + + readStates []ReadState + + // the log + raftLog *raftLog + + maxInflight int + maxMsgSize uint64 + prs map[uint64]*Progress + learnerPrs map[uint64]*Progress + + state StateType + + // isLearner is true if the local raft node is a learner. + isLearner bool + + votes map[uint64]bool + + msgs []pb.Message + + // the leader id + lead uint64 + // leadTransferee is id of the leader transfer target when its value is not zero. + // Follow the procedure defined in raft thesis 3.10. + leadTransferee uint64 + // New configuration is ignored if there exists unapplied configuration. + pendingConf bool + + readOnly *readOnly + + // number of ticks since it reached last electionTimeout when it is leader + // or candidate. + // number of ticks since it reached last electionTimeout or received a + // valid message from current leader when it is a follower. + electionElapsed int + + // number of ticks since it reached last heartbeatTimeout. + // only leader keeps heartbeatElapsed. + heartbeatElapsed int + + checkQuorum bool + preVote bool + + heartbeatTimeout int + electionTimeout int + // randomizedElectionTimeout is a random number between + // [electiontimeout, 2 * electiontimeout - 1]. It gets reset + // when raft changes its state to follower or candidate. + randomizedElectionTimeout int + disableProposalForwarding bool + + tick func() + step stepFunc + + logger Logger +} + +func newRaft(c *Config) *raft { + if err := c.validate(); err != nil { + panic(err.Error()) + } + raftlog := newLog(c.Storage, c.Logger) + hs, cs, err := c.Storage.InitialState() + + c.Logger.Info("newRaft storage", "hardState", hs, " confState", cs) + + if err != nil { + panic(err) // TODO(bdarnell) + } + peers := c.peers + learners := c.learners + + c.Logger.Info("newRaft ", "config.peers", cs.Nodes, " config.learners", cs.Learners) + + if len(cs.Nodes) > 0 || len(cs.Learners) > 0 { + if len(peers) > 0 || len(learners) > 0 { + // TODO(bdarnell): the peers argument is always nil except in + // tests; the argument should be removed and these tests should be + // updated to specify their nodes through a snapshot. + panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)") + } + peers = cs.Nodes + learners = cs.Learners + } + r := &raft{ + id: c.ID, + lead: None, + isLearner: false, + raftLog: raftlog, + maxMsgSize: c.MaxSizePerMsg, + maxInflight: c.MaxInflightMsgs, + prs: make(map[uint64]*Progress), + learnerPrs: make(map[uint64]*Progress), + electionTimeout: c.ElectionTick, + heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, + checkQuorum: c.CheckQuorum, + preVote: c.PreVote, + readOnly: newReadOnly(c.ReadOnlyOption), + disableProposalForwarding: c.DisableProposalForwarding, + } + for _, p := range peers { + r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)} + } + for _, p := range learners { + if _, ok := r.prs[p]; ok { + panic(fmt.Sprintf("node %x is in both learner and peer list", p)) + } + r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true} + if r.id == p { + r.isLearner = true + r.logger.Infof("raft %d is a learner", r.id) + } + } + + if !isHardStateEqual(hs, emptyState) { + r.loadState(hs) + } + if c.Applied > 0 { + raftlog.appliedTo(c.Applied) + } + r.becomeFollower(r.Term, None) + + var nodesStrs []string + for _, n := range r.nodes() { + nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) + } + + r.logger.Infof("newRaft %x learner: %v [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", + r.id, r.isLearner, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) + return r +} + +func (r *raft) hasLeader() bool { return r.lead != None } + +func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} } + +func (r *raft) hardState() pb.HardState { + return pb.HardState{ + Term: r.Term, + Vote: r.Vote, + Commit: r.raftLog.committed, + } +} + +func (r *raft) quorum() int { return len(r.prs)/2 + 1 } + +func (r *raft) nodes() []uint64 { + nodes := make([]uint64, 0, len(r.prs)+len(r.learnerPrs)) + for id := range r.prs { + nodes = append(nodes, id) + } + for id := range r.learnerPrs { + nodes = append(nodes, id) + } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +func (r *raft) voters() []uint64 { + nodes := make([]uint64, 0, len(r.prs)) + for id := range r.prs { + nodes = append(nodes, id) + } + + sort.Sort(uint64Slice(nodes)) + return nodes +} + +// TODO (Amal): review +func (r *raft) learners() []uint64 { + nodes := make([]uint64, 0, len(r.learnerPrs)) + + for id := range r.learnerPrs { + nodes = append(nodes, id) + } + sort.Sort(uint64Slice(nodes)) + return nodes +} + +// send persists state to stable storage and then sends to its mailbox. +func (r *raft) send(m pb.Message) { + m.From = r.id + if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp { + if m.Term == 0 { + // All {pre-,}campaign messages need to have the term set when + // sending. + // - MsgVote: m.Term is the term the node is campaigning for, + // non-zero as we increment the term when campaigning. + // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was + // granted, non-zero for the same reason MsgVote is + // - MsgPreVote: m.Term is the term the node will campaign, + // non-zero as we use m.Term to indicate the next term we'll be + // campaigning for + // - MsgPreVoteResp: m.Term is the term received in the original + // MsgPreVote if the pre-vote was granted, non-zero for the + // same reasons MsgPreVote is + panic(fmt.Sprintf("term should be set when sending %s", m.Type)) + } + } else { + if m.Term != 0 { + panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term)) + } + // do not attach term to MsgProp, MsgReadIndex + // proposals are a way to forward to the leader and + // should be treated as local message. + // MsgReadIndex is also forwarded to leader. + if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex { + m.Term = r.Term + } + } + r.msgs = append(r.msgs, m) +} + +func (r *raft) getProgress(id uint64) *Progress { + if pr, ok := r.prs[id]; ok { + return pr + } + + return r.learnerPrs[id] +} + +// sendAppend sends RPC, with entries to the given peer. +func (r *raft) sendAppend(to uint64) { + pr := r.getProgress(to) + if pr.IsPaused() { + return + } + m := pb.Message{} + m.To = to + + term, errt := r.raftLog.term(pr.Next - 1) + ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + + if errt != nil || erre != nil { // send snapshot if we failed to get term or entries + if !pr.RecentActive { + r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) + return + } + + m.Type = pb.MsgSnap + snapshot, err := r.raftLog.snapshot() + if err != nil { + if err == ErrSnapshotTemporarilyUnavailable { + r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) + return + } + panic(err) // TODO(bdarnell) + } + if IsEmptySnap(snapshot) { + panic("need non-empty snapshot") + } + m.Snapshot = snapshot + sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term + r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) + pr.becomeSnapshot(sindex) + r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) + } else { + m.Type = pb.MsgApp + m.Index = pr.Next - 1 + m.LogTerm = term + m.Entries = ents + m.Commit = r.raftLog.committed + if n := len(m.Entries); n != 0 { + switch pr.State { + // optimistically increase the next when in ProgressStateReplicate + case ProgressStateReplicate: + last := m.Entries[n-1].Index + pr.optimisticUpdate(last) + pr.ins.add(last) + case ProgressStateProbe: + pr.pause() + default: + r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) + } + } + } + r.send(m) +} + +// sendHeartbeat sends an empty MsgApp +func (r *raft) sendHeartbeat(to uint64, ctx []byte) { + // Attach the commit as min(to.matched, r.committed). + // When the leader sends out heartbeat message, + // the receiver(follower) might not be matched with the leader + // or it might not have all the committed entries. + // The leader MUST NOT forward the follower's commit to + // an unmatched index. + commit := min(r.getProgress(to).Match, r.raftLog.committed) + m := pb.Message{ + To: to, + Type: pb.MsgHeartbeat, + Commit: commit, + Context: ctx, + } + + r.send(m) +} + +func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) { + for id, pr := range r.prs { + f(id, pr) + } + + for id, pr := range r.learnerPrs { + f(id, pr) + } +} + +// bcastAppend sends RPC, with entries to all peers that are not up-to-date +// according to the progress recorded in r.prs. +func (r *raft) bcastAppend() { + r.forEachProgress(func(id uint64, _ *Progress) { + if id == r.id { + return + } + + r.sendAppend(id) + }) +} + +// bcastHeartbeat sends RPC, without entries to all the peers. +func (r *raft) bcastHeartbeat() { + lastCtx := r.readOnly.lastPendingRequestCtx() + if len(lastCtx) == 0 { + r.bcastHeartbeatWithCtx(nil) + } else { + r.bcastHeartbeatWithCtx([]byte(lastCtx)) + } +} + +func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { + r.forEachProgress(func(id uint64, _ *Progress) { + if id == r.id { + return + } + r.sendHeartbeat(id, ctx) + }) +} + +// maybeCommit attempts to advance the commit index. Returns true if +// the commit index changed (in which case the caller should call +// r.bcastAppend). +func (r *raft) maybeCommit() bool { + // TODO(bmizerany): optimize.. Currently naive + mis := make(uint64Slice, 0, len(r.prs)) + for _, p := range r.prs { + mis = append(mis, p.Match) + } + sort.Sort(sort.Reverse(mis)) + mci := mis[r.quorum()-1] + return r.raftLog.maybeCommit(mci, r.Term) +} + +func (r *raft) reset(term uint64) { + if r.Term != term { + r.Term = term + r.Vote = None + } + r.lead = None + + r.electionElapsed = 0 + r.heartbeatElapsed = 0 + r.resetRandomizedElectionTimeout() + + r.abortLeaderTransfer() + + r.votes = make(map[uint64]bool) + r.forEachProgress(func(id uint64, pr *Progress) { + *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner} + if id == r.id { + pr.Match = r.raftLog.lastIndex() + } + }) + + r.pendingConf = false + r.readOnly = newReadOnly(r.readOnly.option) +} + +func (r *raft) appendEntry(es ...pb.Entry) { + li := r.raftLog.lastIndex() + for i := range es { + es[i].Term = r.Term + es[i].Index = li + 1 + uint64(i) + } + r.raftLog.append(es...) + r.getProgress(r.id).maybeUpdate(r.raftLog.lastIndex()) + // Regardless of maybeCommit's return, our caller will call bcastAppend. + r.maybeCommit() +} + +// tickElection is run by followers and candidates after r.electionTimeout. +func (r *raft) tickElection() { + r.electionElapsed++ + + if r.promotable() && r.pastElectionTimeout() { + r.electionElapsed = 0 + r.Step(pb.Message{From: r.id, Type: pb.MsgHup}) + } +} + +// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. +func (r *raft) tickHeartbeat() { + r.heartbeatElapsed++ + r.electionElapsed++ + + if r.electionElapsed >= r.electionTimeout { + r.electionElapsed = 0 + if r.checkQuorum { + r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum}) + } + // If current leader cannot transfer leadership in electionTimeout, it becomes leader again. + if r.state == StateLeader && r.leadTransferee != None { + r.abortLeaderTransfer() + } + } + + if r.state != StateLeader { + return + } + + if r.heartbeatElapsed >= r.heartbeatTimeout { + r.heartbeatElapsed = 0 + r.Step(pb.Message{From: r.id, Type: pb.MsgBeat}) + } +} + +func (r *raft) becomeFollower(term uint64, lead uint64) { + r.step = stepFollower + r.reset(term) + r.tick = r.tickElection + r.lead = lead + r.state = StateFollower + r.logger.Infof("%x became follower at term %d", r.id, r.Term) +} + +func (r *raft) becomeCandidate() { + // TODO(xiangli) remove the panic when the raft implementation is stable + if r.state == StateLeader { + panic("invalid transition [leader -> candidate]") + } + r.step = stepCandidate + r.reset(r.Term + 1) + r.tick = r.tickElection + r.Vote = r.id + r.state = StateCandidate + r.logger.Infof("%x became candidate at term %d", r.id, r.Term) +} + +func (r *raft) becomePreCandidate() { + // TODO(xiangli) remove the panic when the raft implementation is stable + if r.state == StateLeader { + panic("invalid transition [leader -> pre-candidate]") + } + // Becoming a pre-candidate changes our step functions and state, + // but doesn't change anything else. In particular it does not increase + // r.Term or change r.Vote. + r.step = stepCandidate + r.votes = make(map[uint64]bool) + r.tick = r.tickElection + r.lead = None + r.state = StatePreCandidate + r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term) +} + +func (r *raft) becomeLeader() { + // TODO(xiangli) remove the panic when the raft implementation is stable + if r.state == StateFollower { + panic("invalid transition [follower -> leader]") + } + r.step = stepLeader + r.reset(r.Term) + r.tick = r.tickHeartbeat + r.lead = r.id + r.state = StateLeader + ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit) + if err != nil { + r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) + } + + nconf := numOfPendingConf(ents) + if nconf > 1 { + panic("unexpected multiple uncommitted config entry") + } + if nconf == 1 { + r.pendingConf = true + } + + r.appendEntry(pb.Entry{Data: nil}) + r.logger.Infof("%x became leader at term %d", r.id, r.Term) +} + +func (r *raft) campaign(t CampaignType) { + var term uint64 + var voteMsg pb.MessageType + if t == campaignPreElection { + r.becomePreCandidate() + voteMsg = pb.MsgPreVote + // PreVote RPCs are sent for the next term before we've incremented r.Term. + term = r.Term + 1 + } else { + r.becomeCandidate() + voteMsg = pb.MsgVote + term = r.Term + } + if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) { + // We won the election after voting for ourselves (which must mean that + // this is a single-node cluster). Advance to the next state. + if t == campaignPreElection { + r.campaign(campaignElection) + } else { + r.becomeLeader() + } + return + } + for id := range r.prs { + if id == r.id { + continue + } + r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term) + + var ctx []byte + if t == campaignTransfer { + ctx = []byte(t) + } + r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx}) + } +} + +func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) { + if v { + r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term) + } else { + r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term) + } + if _, ok := r.votes[id]; !ok { + r.votes[id] = v + } + for _, vv := range r.votes { + if vv { + granted++ + } + } + return granted +} + +func (r *raft) Step(m pb.Message) error { + // Handle the message term, which may result in our stepping down to a follower. + switch { + case m.Term == 0: + // local message + case m.Term > r.Term: + if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote { + force := bytes.Equal(m.Context, []byte(campaignTransfer)) + inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout + if !force && inLease { + // If a server receives a RequestVote request within the minimum election timeout + // of hearing from a current leader, it does not update its term or grant its vote + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed) + return nil + } + } + switch { + case m.Type == pb.MsgPreVote: + // Never change our term in response to a PreVote + case m.Type == pb.MsgPreVoteResp && !m.Reject: + // We send pre-vote requests with a term in our future. If the + // pre-vote is granted, we will increment our term when we get a + // quorum. If it is not, the term comes from the node that + // rejected our vote so we should become a follower at the new + // term. + default: + r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]", + r.id, r.Term, m.Type, m.From, m.Term) + if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap { + r.becomeFollower(m.Term, m.From) + } else { + r.becomeFollower(m.Term, None) + } + } + + case m.Term < r.Term: + if r.checkQuorum && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) { + // We have received messages from a leader at a lower term. It is possible + // that these messages were simply delayed in the network, but this could + // also mean that this node has advanced its term number during a network + // partition, and it is now unable to either win an election or to rejoin + // the majority on the old term. If checkQuorum is false, this will be + // handled by incrementing term numbers in response to MsgVote with a + // higher term, but if checkQuorum is true we may not advance the term on + // MsgVote and must generate other messages to advance the term. The net + // result of these two features is to minimize the disruption caused by + // nodes that have been removed from the cluster's configuration: a + // removed node will send MsgVotes (or MsgPreVotes) which will be ignored, + // but it will not receive MsgApp or MsgHeartbeat, so it will not create + // disruptive term increases + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp}) + } else { + // ignore other cases + r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", + r.id, r.Term, m.Type, m.From, m.Term) + } + return nil + } + + switch m.Type { + case pb.MsgHup: + if r.state != StateLeader { + ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) + if err != nil { + r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) + } + if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { + r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) + return nil + } + + r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) + if r.preVote { + r.campaign(campaignPreElection) + } else { + r.campaign(campaignElection) + } + } else { + r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) + } + + case pb.MsgVote, pb.MsgPreVote: + if r.isLearner { + // TODO: learner may need to vote, in case of node down when confchange. + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + return nil + } + // The m.Term > r.Term clause is for MsgPreVote. For MsgVote m.Term should + // always equal r.Term. + if (r.Vote == None || m.Term > r.Term || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + // When responding to Msg{Pre,}Vote messages we include the term + // from the message, not the local term. To see why consider the + // case where a single node was previously partitioned away and + // it's local term is now of date. If we include the local term + // (recall that for pre-votes we don't update the local term), the + // (pre-)campaigning node on the other end will proceed to ignore + // the message (it ignores all out of date messages). + // The term in the original message and current local term are the + // same in the case of regular votes, but different for pre-votes. + r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)}) + if m.Type == pb.MsgVote { + // Only record real votes. + r.electionElapsed = 0 + r.Vote = m.From + } + } else { + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d", + r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term) + r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true}) + } + + default: + err := r.step(r, m) + if err != nil { + return err + } + } + return nil +} + +type stepFunc func(r *raft, m pb.Message) error + +func stepLeader(r *raft, m pb.Message) error { + // These message types do not require any progress for m.From. + switch m.Type { + case pb.MsgBeat: + r.bcastHeartbeat() + return nil + case pb.MsgCheckQuorum: + if !r.checkQuorumActive() { + r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id) + r.becomeFollower(r.Term, None) + } + return nil + case pb.MsgProp: + if len(m.Entries) == 0 { + r.logger.Panicf("%x stepped empty MsgProp", r.id) + } + if _, ok := r.prs[r.id]; !ok { + // If we are not currently a member of the range (i.e. this node + // was removed from the configuration while serving as leader), + // drop any new proposals. + return ErrProposalDropped + } + if r.leadTransferee != None { + r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) + return ErrProposalDropped + } + + for i, e := range m.Entries { + if e.Type == pb.EntryConfChange { + if r.pendingConf { + r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String()) + m.Entries[i] = pb.Entry{Type: pb.EntryNormal} + } + r.pendingConf = true + } + } + r.appendEntry(m.Entries...) + r.bcastAppend() + return nil + case pb.MsgReadIndex: + if r.quorum() > 1 { + if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { + // Reject read only request when this leader has not committed any log entry at its term. + return nil + } + + // thinking: use an interally defined context instead of the user given context. + // We can express this in terms of the term and index instead of a user-supplied value. + // This would allow multiple reads to piggyback on the same message. + switch r.readOnly.option { + case ReadOnlySafe: + r.readOnly.addRequest(r.raftLog.committed, m) + r.bcastHeartbeatWithCtx(m.Entries[0].Data) + case ReadOnlyLeaseBased: + ri := r.raftLog.committed + if m.From == None || m.From == r.id { // from local member + r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) + } else { + r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) + } + } + } else { + r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) + } + + return nil + } + + // All other message types require a progress for m.From (pr). + pr := r.getProgress(m.From) + if pr == nil { + r.logger.Debugf("%x no progress available for %x", r.id, m.From) + return nil + } + switch m.Type { + case pb.MsgAppResp: + pr.RecentActive = true + + if m.Reject { + r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d", + r.id, m.RejectHint, m.From, m.Index) + if pr.maybeDecrTo(m.Index, m.RejectHint) { + r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) + if pr.State == ProgressStateReplicate { + pr.becomeProbe() + } + r.sendAppend(m.From) + } + } else { + oldPaused := pr.IsPaused() + if pr.maybeUpdate(m.Index) { + switch { + case pr.State == ProgressStateProbe: + pr.becomeReplicate() + case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): + r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + pr.becomeProbe() + case pr.State == ProgressStateReplicate: + pr.ins.freeTo(m.Index) + } + + if r.maybeCommit() { + r.bcastAppend() + } else if oldPaused { + // update() reset the wait state on this node. If we had delayed sending + // an update before, send it now. + r.sendAppend(m.From) + } + // Transfer leadership is in progress. + if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { + r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From) + r.sendTimeoutNow(m.From) + } + } + } + case pb.MsgHeartbeatResp: + pr.RecentActive = true + pr.resume() + + // free one slot for the full inflights window to allow progress. + if pr.State == ProgressStateReplicate && pr.ins.full() { + pr.ins.freeFirstOne() + } + if pr.Match < r.raftLog.lastIndex() { + r.sendAppend(m.From) + } + + if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { + return nil + } + + ackCount := r.readOnly.recvAck(m) + if ackCount < r.quorum() { + return nil + } + + rss := r.readOnly.advance(m) + for _, rs := range rss { + req := rs.req + if req.From == None || req.From == r.id { // from local member + r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data}) + } else { + r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries}) + } + } + case pb.MsgSnapStatus: + if pr.State != ProgressStateSnapshot { + return nil + } + if !m.Reject { + pr.becomeProbe() + r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + } else { + pr.snapshotFailure() + pr.becomeProbe() + r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + } + // If snapshot finish, wait for the msgAppResp from the remote node before sending + // out the next msgApp. + // If snapshot failure, wait for a heartbeat interval before next try + pr.pause() + case pb.MsgUnreachable: + // During optimistic replication, if the remote becomes unreachable, + // there is huge probability that a MsgApp is lost. + if pr.State == ProgressStateReplicate { + pr.becomeProbe() + } + r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) + case pb.MsgTransferLeader: + if pr.IsLearner { + r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id) + return nil + } + leadTransferee := m.From + lastLeadTransferee := r.leadTransferee + if lastLeadTransferee != None { + if lastLeadTransferee == leadTransferee { + r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x", + r.id, r.Term, leadTransferee, leadTransferee) + return nil + } + r.abortLeaderTransfer() + r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee) + } + if leadTransferee == r.id { + r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id) + return nil + } + // Transfer leadership to third party. + r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee) + // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed. + r.electionElapsed = 0 + r.leadTransferee = leadTransferee + if pr.Match == r.raftLog.lastIndex() { + r.sendTimeoutNow(leadTransferee) + r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) + } else { + r.sendAppend(leadTransferee) + } + } + + return nil +} + +// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is +// whether they respond to MsgVoteResp or MsgPreVoteResp. +func stepCandidate(r *raft, m pb.Message) error { + // Only handle vote responses corresponding to our candidacy (while in + // StateCandidate, we may get stale MsgPreVoteResp messages in this term from + // our pre-candidate state). + var myVoteRespType pb.MessageType + if r.state == StatePreCandidate { + myVoteRespType = pb.MsgPreVoteResp + } else { + myVoteRespType = pb.MsgVoteResp + } + switch m.Type { + case pb.MsgProp: + r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) + return ErrProposalDropped + case pb.MsgApp: + r.becomeFollower(r.Term, m.From) + r.handleAppendEntries(m) + case pb.MsgHeartbeat: + r.becomeFollower(r.Term, m.From) + r.handleHeartbeat(m) + case pb.MsgSnap: + r.becomeFollower(m.Term, m.From) + r.handleSnapshot(m) + case myVoteRespType: + gr := r.poll(m.From, m.Type, !m.Reject) + r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr) + switch r.quorum() { + case gr: + if r.state == StatePreCandidate { + r.campaign(campaignElection) + } else { + r.becomeLeader() + r.bcastAppend() + } + case len(r.votes) - gr: + r.becomeFollower(r.Term, None) + } + case pb.MsgTimeoutNow: + r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From) + } + + return nil +} + +func stepFollower(r *raft, m pb.Message) error { + switch m.Type { + case pb.MsgProp: + if r.lead == None { + r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) + return ErrProposalDropped + } else if r.disableProposalForwarding { + r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term) + return ErrProposalDropped + } + m.To = r.lead + r.send(m) + case pb.MsgApp: + r.electionElapsed = 0 + r.lead = m.From + r.handleAppendEntries(m) + case pb.MsgHeartbeat: + r.electionElapsed = 0 + r.lead = m.From + r.handleHeartbeat(m) + case pb.MsgSnap: + r.electionElapsed = 0 + r.lead = m.From + r.handleSnapshot(m) + case pb.MsgTransferLeader: + if r.lead == None { + r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term) + return nil + } + m.To = r.lead + r.send(m) + case pb.MsgTimeoutNow: + if r.promotable() { + r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From) + // Leadership transfers never use pre-vote even if r.preVote is true; we + // know we are not recovering from a partition so there is no need for the + // extra round trip. + r.campaign(campaignTransfer) + } else { + r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From) + } + case pb.MsgReadIndex: + if r.lead == None { + r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term) + return nil + } + m.To = r.lead + r.send(m) + case pb.MsgReadIndexResp: + if len(m.Entries) != 1 { + r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries)) + return nil + } + r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data}) + } + + return nil +} + +func (r *raft) handleAppendEntries(m pb.Message) { + if m.Index < r.raftLog.committed { + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) + return + } + + if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) + } else { + r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", + r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) + } +} + +func (r *raft) handleHeartbeat(m pb.Message) { + r.raftLog.commitTo(m.Commit) + r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context}) +} + +func (r *raft) handleSnapshot(m pb.Message) { + sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term + if r.restore(m.Snapshot) { + r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, sindex, sterm) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) + } else { + r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, sindex, sterm) + r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) + } +} + +// restore recovers the state machine from a snapshot. It restores the log and the +// configuration of state machine. +func (r *raft) restore(s pb.Snapshot) bool { + if s.Metadata.Index <= r.raftLog.committed { + return false + } + if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + r.raftLog.commitTo(s.Metadata.Index) + return false + } + + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", + r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) + + r.raftLog.restore(s) + r.prs = make(map[uint64]*Progress) + r.learnerPrs = make(map[uint64]*Progress) + r.restoreNode(s.Metadata.ConfState.Nodes, false) + r.restoreNode(s.Metadata.ConfState.Learners, true) + return true +} + +func (r *raft) restoreNode(nodes []uint64, isLearner bool) { + for _, n := range nodes { + match, next := uint64(0), r.raftLog.lastIndex()+1 + if n == r.id { + match = next - 1 + r.isLearner = isLearner + } + r.setProgress(n, match, next, isLearner) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n)) + } +} + +// promotable indicates whether state machine can be promoted to leader, +// which is true when its own id is in progress list. +func (r *raft) promotable() bool { + _, ok := r.prs[r.id] + return ok +} + +func (r *raft) addNode(id uint64) { + r.addNodeOrLearnerNode(id, false) +} + +func (r *raft) addLearner(id uint64) { + r.addNodeOrLearnerNode(id, true) +} + +func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { + r.pendingConf = false + pr := r.getProgress(id) + if pr == nil { + r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) + } else { + if isLearner && !pr.IsLearner { + // can only change Learner to Voter + r.logger.Infof("%x ignored addLeaner: do not support changing %x from raft peer to learner.", r.id, id) + return + } + + if isLearner == pr.IsLearner { + // Ignore any redundant addNode calls (which can happen because the + // initial bootstrapping entries are applied twice). + return + } + + // change Learner to Voter, use origin Learner progress + delete(r.learnerPrs, id) + pr.IsLearner = false + r.prs[id] = pr + } + + if r.id == id { + r.isLearner = isLearner + } + + // When a node is first added, we should mark it as recently active. + // Otherwise, CheckQuorum may cause us to step down if it is invoked + // before the added node has a chance to communicate with us. + pr = r.getProgress(id) + pr.RecentActive = true +} + +func (r *raft) removeNode(id uint64) { + r.delProgress(id) + r.pendingConf = false + + // do not try to commit or abort transferring if there is no nodes in the cluster. + if len(r.prs) == 0 && len(r.learnerPrs) == 0 { + return + } + + // The quorum size is now smaller, so see if any pending entries can + // be committed. + if r.maybeCommit() { + r.bcastAppend() + } + // If the removed node is the leadTransferee, then abort the leadership transferring. + if r.state == StateLeader && r.leadTransferee == id { + r.abortLeaderTransfer() + } +} + +func (r *raft) resetPendingConf() { r.pendingConf = false } + +func (r *raft) setProgress(id, match, next uint64, isLearner bool) { + if !isLearner { + delete(r.learnerPrs, id) + r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)} + return + } + + if _, ok := r.prs[id]; ok { + panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id)) + } + r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true} +} + +func (r *raft) delProgress(id uint64) { + delete(r.prs, id) + delete(r.learnerPrs, id) +} + +func (r *raft) loadState(state pb.HardState) { + if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { + r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) + } + r.raftLog.committed = state.Commit + r.Term = state.Term + r.Vote = state.Vote +} + +// pastElectionTimeout returns true iff r.electionElapsed is greater +// than or equal to the randomized election timeout in +// [electiontimeout, 2 * electiontimeout - 1]. +func (r *raft) pastElectionTimeout() bool { + return r.electionElapsed >= r.randomizedElectionTimeout +} + +func (r *raft) resetRandomizedElectionTimeout() { + r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout) +} + +// checkQuorumActive returns true if the quorum is active from +// the view of the local raft state machine. Otherwise, it returns +// false. +// checkQuorumActive also resets all RecentActive to false. +func (r *raft) checkQuorumActive() bool { + var act int + + r.forEachProgress(func(id uint64, pr *Progress) { + if id == r.id { // self is always active + act++ + return + } + + if pr.RecentActive && !pr.IsLearner { + act++ + } + + pr.RecentActive = false + }) + + return act >= r.quorum() +} + +func (r *raft) sendTimeoutNow(to uint64) { + r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow}) +} + +func (r *raft) abortLeaderTransfer() { + r.leadTransferee = None +} + +func numOfPendingConf(ents []pb.Entry) int { + n := 0 + for i := range ents { + if ents[i].Type == pb.EntryConfChange { + n++ + } + } + return n +} diff --git a/raft/etcd/raft/rawnode.go b/raft/etcd/raft/rawnode.go new file mode 100644 index 0000000000..1ce67a0603 --- /dev/null +++ b/raft/etcd/raft/rawnode.go @@ -0,0 +1,267 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "errors" + + etcdRaft "github.com/coreos/etcd/raft" + pb "github.com/coreos/etcd/raft/raftpb" +) + +// ErrStepLocalMsg is returned when try to step a local raft message +var ErrStepLocalMsg = errors.New("raft: cannot step raft local message") + +// ErrStepPeerNotFound is returned when try to step a response message +// but there is no peer found in raft.prs for that node. +var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found") + +// RawNode is a thread-unsafe Node. +// The methods of this struct correspond to the methods of Node and are described +// more fully there. +type RawNode struct { + raft *raft + prevSoftSt *SoftState + prevHardSt pb.HardState +} + +func (rn *RawNode) newReady() Ready { + return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt) +} + +func (rn *RawNode) commitReady(rd Ready) { + if rd.SoftState != nil { + rn.prevSoftSt = rd.SoftState + } + if !IsEmptyHardState(rd.HardState) { + rn.prevHardSt = rd.HardState + } + if rn.prevHardSt.Commit != 0 { + // In most cases, prevHardSt and rd.HardState will be the same + // because when there are new entries to apply we just sent a + // HardState with an updated Commit value. However, on initial + // startup the two are different because we don't send a HardState + // until something changes, but we do send any un-applied but + // committed entries (and previously-committed entries may be + // incorporated into the snapshot, even if rd.CommittedEntries is + // empty). Therefore we mark all committed entries as applied + // whether they were included in rd.HardState or not. + rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit) + } + if len(rd.Entries) > 0 { + e := rd.Entries[len(rd.Entries)-1] + rn.raft.raftLog.stableTo(e.Index, e.Term) + } + if !IsEmptySnap(rd.Snapshot) { + rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index) + } + if len(rd.ReadStates) != 0 { + rn.raft.readStates = nil + } +} + +// NewRawNode returns a new RawNode given configuration and a list of raft peers. +func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { + if config.ID == 0 { + panic("config.ID must not be zero") + } + r := newRaft(config) + rn := &RawNode{ + raft: r, + } + lastIndex, err := config.Storage.LastIndex() + if err != nil { + panic(err) // TODO(bdarnell) + } + // If the log is empty, this is a new RawNode (like StartNode); otherwise it's + // restoring an existing RawNode (like RestartNode). + // TODO(bdarnell): rethink RawNode initialization and whether the application needs + // to be able to tell us when it expects the RawNode to exist. + if lastIndex == 0 { + r.becomeFollower(1, None) + ents := make([]pb.Entry, len(peers)) + for i, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context} + data, err := cc.Marshal() + if err != nil { + panic("unexpected marshal error") + } + + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data} + } + r.raftLog.append(ents...) + r.raftLog.committed = uint64(len(ents)) + for _, peer := range peers { + r.addNode(peer.ID) + } + } + + // Set the initial hard and soft states after performing all initialization. + rn.prevSoftSt = r.softState() + if lastIndex == 0 { + rn.prevHardSt = emptyState + } else { + rn.prevHardSt = r.hardState() + } + + return rn, nil +} + +// Tick advances the internal logical clock by a single tick. +func (rn *RawNode) Tick() { + rn.raft.tick() +} + +// TickQuiesced advances the internal logical clock by a single tick without +// performing any other state machine processing. It allows the caller to avoid +// periodic heartbeats and elections when all of the peers in a Raft group are +// known to be at the same state. Expected usage is to periodically invoke Tick +// or TickQuiesced depending on whether the group is "active" or "quiesced". +// +// WARNING: Be very careful about using this method as it subverts the Raft +// state machine. You should probably be using Tick instead. +func (rn *RawNode) TickQuiesced() { + rn.raft.electionElapsed++ +} + +// Campaign causes this RawNode to transition to candidate state. +func (rn *RawNode) Campaign() error { + return rn.raft.Step(pb.Message{ + Type: pb.MsgHup, + }) +} + +// Propose proposes data be appended to the raft log. +func (rn *RawNode) Propose(data []byte) error { + return rn.raft.Step(pb.Message{ + Type: pb.MsgProp, + From: rn.raft.id, + Entries: []pb.Entry{ + {Data: data}, + }}) +} + +// ProposeConfChange proposes a config change. +func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error { + data, err := cc.Marshal() + if err != nil { + return err + } + return rn.raft.Step(pb.Message{ + Type: pb.MsgProp, + Entries: []pb.Entry{ + {Type: pb.EntryConfChange, Data: data}, + }, + }) +} + +// ApplyConfChange applies a config change to the local node. +func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { + if cc.NodeID == None { + rn.raft.resetPendingConf() + return &pb.ConfState{Nodes: rn.raft.voters(), Learners: rn.raft.learners()} + } + switch cc.Type { + case pb.ConfChangeAddNode: + rn.raft.addNode(cc.NodeID) + case pb.ConfChangeAddLearnerNode: + rn.raft.addLearner(cc.NodeID) + case pb.ConfChangeRemoveNode: + rn.raft.removeNode(cc.NodeID) + case pb.ConfChangeUpdateNode: + rn.raft.resetPendingConf() + default: + panic("unexpected conf type") + } + return &pb.ConfState{Nodes: rn.raft.voters(), Learners: rn.raft.learners()} +} + +// Step advances the state machine using the given message. +func (rn *RawNode) Step(m pb.Message) error { + // ignore unexpected local messages receiving over network + if IsLocalMsg(m.Type) { + return ErrStepLocalMsg + } + if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) { + return rn.raft.Step(m) + } + return ErrStepPeerNotFound +} + +// Ready returns the current point-in-time state of this RawNode. +func (rn *RawNode) Ready() Ready { + rd := rn.newReady() + rn.raft.msgs = nil + return rd +} + +// HasReady called when RawNode user need to check if any Ready pending. +// Checking logic in this method should be consistent with Ready.containsUpdates(). +func (rn *RawNode) HasReady() bool { + r := rn.raft + if !r.softState().equal(rn.prevSoftSt) { + return true + } + if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) { + return true + } + if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) { + return true + } + if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() { + return true + } + if len(r.readStates) != 0 { + return true + } + return false +} + +// Advance notifies the RawNode that the application has applied and saved progress in the +// last Ready results. +func (rn *RawNode) Advance(rd Ready) { + rn.commitReady(rd) +} + +// Status returns the current status of the given group. +func (rn *RawNode) Status() *Status { + status := getStatus(rn.raft) + return &status +} + +// ReportUnreachable reports the given node is not reachable for the last send. +func (rn *RawNode) ReportUnreachable(id uint64) { + _ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id}) +} + +// ReportSnapshot reports the status of the sent snapshot. +func (rn *RawNode) ReportSnapshot(id uint64, status etcdRaft.SnapshotStatus) { + rej := status == etcdRaft.SnapshotFailure + + _ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}) +} + +// TransferLeader tries to transfer leadership to the given transferee. +func (rn *RawNode) TransferLeader(transferee uint64) { + _ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee}) +} + +// ReadIndex requests a read state. The read state will be set in ready. +// Read State has a read index. Once the application advances further than the read +// index, any linearizable read requests issued before the read request can be +// processed safely. The read state will have the same rctx attached. +func (rn *RawNode) ReadIndex(rctx []byte) { + _ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) +} diff --git a/raft/etcd/raft/read_only.go b/raft/etcd/raft/read_only.go new file mode 100644 index 0000000000..c4f7db9a54 --- /dev/null +++ b/raft/etcd/raft/read_only.go @@ -0,0 +1,118 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import pb "github.com/coreos/etcd/raft/raftpb" + +// ReadState provides state for read only query. +// It's caller's responsibility to call ReadIndex first before getting +// this state from ready, it's also caller's duty to differentiate if this +// state is what it requests through RequestCtx, eg. given a unique id as +// RequestCtx +type ReadState struct { + Index uint64 + RequestCtx []byte +} + +type readIndexStatus struct { + req pb.Message + index uint64 + acks map[uint64]struct{} +} + +type readOnly struct { + option ReadOnlyOption + pendingReadIndex map[string]*readIndexStatus + readIndexQueue []string +} + +func newReadOnly(option ReadOnlyOption) *readOnly { + return &readOnly{ + option: option, + pendingReadIndex: make(map[string]*readIndexStatus), + } +} + +// addRequest adds a read only reuqest into readonly struct. +// `index` is the commit index of the raft state machine when it received +// the read only request. +// `m` is the original read only request message from the local or remote node. +func (ro *readOnly) addRequest(index uint64, m pb.Message) { + ctx := string(m.Entries[0].Data) + if _, ok := ro.pendingReadIndex[ctx]; ok { + return + } + ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})} + ro.readIndexQueue = append(ro.readIndexQueue, ctx) +} + +// recvAck notifies the readonly struct that the raft state machine received +// an acknowledgment of the heartbeat that attached with the read only request +// context. +func (ro *readOnly) recvAck(m pb.Message) int { + rs, ok := ro.pendingReadIndex[string(m.Context)] + if !ok { + return 0 + } + + rs.acks[m.From] = struct{}{} + // add one to include an ack from local node + return len(rs.acks) + 1 +} + +// advance advances the read only request queue kept by the readonly struct. +// It dequeues the requests until it finds the read only request that has +// the same context as the given `m`. +func (ro *readOnly) advance(m pb.Message) []*readIndexStatus { + var ( + i int + found bool + ) + + ctx := string(m.Context) + rss := []*readIndexStatus{} + + for _, okctx := range ro.readIndexQueue { + i++ + rs, ok := ro.pendingReadIndex[okctx] + if !ok { + panic("cannot find corresponding read state from pending map") + } + rss = append(rss, rs) + if okctx == ctx { + found = true + break + } + } + + if found { + ro.readIndexQueue = ro.readIndexQueue[i:] + for _, rs := range rss { + delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) + } + return rss + } + + return nil +} + +// lastPendingRequestCtx returns the context of the last pending read only +// request in readonly struct. +func (ro *readOnly) lastPendingRequestCtx() string { + if len(ro.readIndexQueue) == 0 { + return "" + } + return ro.readIndexQueue[len(ro.readIndexQueue)-1] +} diff --git a/raft/etcd/raft/status.go b/raft/etcd/raft/status.go new file mode 100644 index 0000000000..ed2d037da1 --- /dev/null +++ b/raft/etcd/raft/status.go @@ -0,0 +1,88 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "fmt" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +type Status struct { + ID uint64 + + pb.HardState + SoftState + + Applied uint64 + Progress map[uint64]Progress + + LeadTransferee uint64 +} + +// getStatus gets a copy of the current raft status. +func getStatus(r *raft) Status { + s := Status{ + ID: r.id, + LeadTransferee: r.leadTransferee, + } + + s.HardState = r.hardState() + s.SoftState = *r.softState() + + s.Applied = r.raftLog.applied + + if s.RaftState == StateLeader { + s.Progress = make(map[uint64]Progress) + for id, p := range r.prs { + s.Progress[id] = *p + } + + for id, p := range r.learnerPrs { + s.Progress[id] = *p + } + } + + return s +} + +// MarshalJSON translates the raft status into JSON. +// TODO: try to simplify this by introducing ID type into raft +func (s Status) MarshalJSON() ([]byte, error) { + j := fmt.Sprintf(`{"id":"%x","term":%d,"vote":"%x","commit":%d,"lead":"%x","raftState":%q,"applied":%d,"progress":{`, + s.ID, s.Term, s.Vote, s.Commit, s.Lead, s.RaftState, s.Applied) + + if len(s.Progress) == 0 { + j += "}," + } else { + for k, v := range s.Progress { + subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"state":%q},`, k, v.Match, v.Next, v.State) + j += subj + } + // remove the trailing "," + j = j[:len(j)-1] + "}," + } + + j += fmt.Sprintf(`"leadtransferee":"%x"}`, s.LeadTransferee) + return []byte(j), nil +} + +func (s Status) String() string { + b, err := s.MarshalJSON() + if err != nil { + raftLogger.Panicf("unexpected error: %v", err) + } + return string(b) +} diff --git a/raft/etcd/raft/storage.go b/raft/etcd/raft/storage.go new file mode 100644 index 0000000000..d8cf9c16ce --- /dev/null +++ b/raft/etcd/raft/storage.go @@ -0,0 +1,271 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "errors" + "sync" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +// ErrCompacted is returned by Storage.Entries/Compact when a requested +// index is unavailable because it predates the last snapshot. +var ErrCompacted = errors.New("requested index is unavailable due to compaction") + +// ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested +// index is older than the existing snapshot. +var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot") + +// ErrUnavailable is returned by Storage interface when the requested log entries +// are unavailable. +var ErrUnavailable = errors.New("requested entry at index is unavailable") + +// ErrSnapshotTemporarilyUnavailable is returned by the Storage interface when the required +// snapshot is temporarily unavailable. +var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable") + +// Storage is an interface that may be implemented by the application +// to retrieve log entries from storage. +// +// If any Storage method returns an error, the raft instance will +// become inoperable and refuse to participate in elections; the +// application is responsible for cleanup and recovery in this case. +type Storage interface { + // InitialState returns the saved HardState and ConfState information. + InitialState() (pb.HardState, pb.ConfState, error) + // Entries returns a slice of log entries in the range [lo,hi). + // MaxSize limits the total size of the log entries returned, but + // Entries returns at least one entry if any. + Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) + // Term returns the term of entry i, which must be in the range + // [FirstIndex()-1, LastIndex()]. The term of the entry before + // FirstIndex is retained for matching purposes even though the + // rest of that entry may not be available. + Term(i uint64) (uint64, error) + // LastIndex returns the index of the last entry in the log. + LastIndex() (uint64, error) + // FirstIndex returns the index of the first log entry that is + // possibly available via Entries (older entries have been incorporated + // into the latest Snapshot; if storage only contains the dummy entry the + // first log entry is not available). + FirstIndex() (uint64, error) + // Snapshot returns the most recent snapshot. + // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable, + // so raft state machine could know that Storage needs some time to prepare + // snapshot and call Snapshot later. + Snapshot() (pb.Snapshot, error) +} + +// MemoryStorage implements the Storage interface backed by an +// in-memory array. +type MemoryStorage struct { + // Protects access to all fields. Most methods of MemoryStorage are + // run on the raft goroutine, but Append() is run on an application + // goroutine. + sync.Mutex + + hardState pb.HardState + snapshot pb.Snapshot + // ents[i] has raft log position i+snapshot.Metadata.Index + ents []pb.Entry +} + +// NewMemoryStorage creates an empty MemoryStorage. +func NewMemoryStorage() *MemoryStorage { + return &MemoryStorage{ + // When starting from scratch populate the list with a dummy entry at term zero. + ents: make([]pb.Entry, 1), + } +} + +// InitialState implements the Storage interface. +func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) { + return ms.hardState, ms.snapshot.Metadata.ConfState, nil +} + +// SetHardState saves the current HardState. +func (ms *MemoryStorage) SetHardState(st pb.HardState) error { + ms.Lock() + defer ms.Unlock() + ms.hardState = st + return nil +} + +// Entries implements the Storage interface. +func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) { + ms.Lock() + defer ms.Unlock() + offset := ms.ents[0].Index + if lo <= offset { + return nil, ErrCompacted + } + if hi > ms.lastIndex()+1 { + raftLogger.Panicf("entries' hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex()) + } + // only contains dummy entries. + if len(ms.ents) == 1 { + return nil, ErrUnavailable + } + + ents := ms.ents[lo-offset : hi-offset] + return limitSize(ents, maxSize), nil +} + +// Term implements the Storage interface. +func (ms *MemoryStorage) Term(i uint64) (uint64, error) { + ms.Lock() + defer ms.Unlock() + offset := ms.ents[0].Index + if i < offset { + return 0, ErrCompacted + } + if int(i-offset) >= len(ms.ents) { + return 0, ErrUnavailable + } + return ms.ents[i-offset].Term, nil +} + +// LastIndex implements the Storage interface. +func (ms *MemoryStorage) LastIndex() (uint64, error) { + ms.Lock() + defer ms.Unlock() + return ms.lastIndex(), nil +} + +func (ms *MemoryStorage) lastIndex() uint64 { + return ms.ents[0].Index + uint64(len(ms.ents)) - 1 +} + +// FirstIndex implements the Storage interface. +func (ms *MemoryStorage) FirstIndex() (uint64, error) { + ms.Lock() + defer ms.Unlock() + return ms.firstIndex(), nil +} + +func (ms *MemoryStorage) firstIndex() uint64 { + return ms.ents[0].Index + 1 +} + +// Snapshot implements the Storage interface. +func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) { + ms.Lock() + defer ms.Unlock() + return ms.snapshot, nil +} + +// ApplySnapshot overwrites the contents of this Storage object with +// those of the given snapshot. +func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error { + ms.Lock() + defer ms.Unlock() + + //handle check for old snapshot being applied + msIndex := ms.snapshot.Metadata.Index + snapIndex := snap.Metadata.Index + if msIndex >= snapIndex { + return ErrSnapOutOfDate + } + + ms.snapshot = snap + ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}} + return nil +} + +// CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and +// can be used to reconstruct the state at that point. +// If any configuration changes have been made since the last compaction, +// the result of the last ApplyConfChange must be passed in. +func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) { + ms.Lock() + defer ms.Unlock() + if i <= ms.snapshot.Metadata.Index { + return pb.Snapshot{}, ErrSnapOutOfDate + } + + offset := ms.ents[0].Index + if i > ms.lastIndex() { + raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex()) + } + + ms.snapshot.Metadata.Index = i + ms.snapshot.Metadata.Term = ms.ents[i-offset].Term + if cs != nil { + ms.snapshot.Metadata.ConfState = *cs + } + ms.snapshot.Data = data + return ms.snapshot, nil +} + +// Compact discards all log entries prior to compactIndex. +// It is the application's responsibility to not attempt to compact an index +// greater than raftLog.applied. +func (ms *MemoryStorage) Compact(compactIndex uint64) error { + ms.Lock() + defer ms.Unlock() + offset := ms.ents[0].Index + if compactIndex <= offset { + return ErrCompacted + } + if compactIndex > ms.lastIndex() { + raftLogger.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex()) + } + + i := compactIndex - offset + ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i) + ents[0].Index = ms.ents[i].Index + ents[0].Term = ms.ents[i].Term + ents = append(ents, ms.ents[i+1:]...) + ms.ents = ents + return nil +} + +// Append the new entries to storage. +// TODO (xiangli): ensure the entries are continuous and +// entries[0].Index > ms.entries[0].Index +func (ms *MemoryStorage) Append(entries []pb.Entry) error { + if len(entries) == 0 { + return nil + } + + ms.Lock() + defer ms.Unlock() + + first := ms.firstIndex() + last := entries[0].Index + uint64(len(entries)) - 1 + + // shortcut if there is no new entry. + if last < first { + return nil + } + // truncate compacted entries + if first > entries[0].Index { + entries = entries[first-entries[0].Index:] + } + + offset := entries[0].Index - ms.ents[0].Index + switch { + case uint64(len(ms.ents)) > offset: + ms.ents = append([]pb.Entry{}, ms.ents[:offset]...) + ms.ents = append(ms.ents, entries...) + case uint64(len(ms.ents)) == offset: + ms.ents = append(ms.ents, entries...) + default: + raftLogger.Panicf("missing log entry [last: %d, append at: %d]", + ms.lastIndex(), entries[0].Index) + } + return nil +} diff --git a/raft/etcd/raft/util.go b/raft/etcd/raft/util.go new file mode 100644 index 0000000000..db32c4dce8 --- /dev/null +++ b/raft/etcd/raft/util.go @@ -0,0 +1,129 @@ +// Copyright 2015 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package etcd + +import ( + "bytes" + "fmt" + + pb "github.com/coreos/etcd/raft/raftpb" +) + +func (st StateType) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%q", st.String())), nil +} + +// uint64Slice implements sort interface +type uint64Slice []uint64 + +func (p uint64Slice) Len() int { return len(p) } +func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func min(a, b uint64) uint64 { + if a > b { + return b + } + return a +} + +func max(a, b uint64) uint64 { + if a > b { + return a + } + return b +} + +func IsLocalMsg(msgt pb.MessageType) bool { + return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable || + msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum +} + +func IsResponseMsg(msgt pb.MessageType) bool { + return msgt == pb.MsgAppResp || msgt == pb.MsgVoteResp || msgt == pb.MsgHeartbeatResp || msgt == pb.MsgUnreachable || msgt == pb.MsgPreVoteResp +} + +// voteResponseType maps vote and prevote message types to their corresponding responses. +func voteRespMsgType(msgt pb.MessageType) pb.MessageType { + switch msgt { + case pb.MsgVote: + return pb.MsgVoteResp + case pb.MsgPreVote: + return pb.MsgPreVoteResp + default: + panic(fmt.Sprintf("not a vote message: %s", msgt)) + } +} + +// EntryFormatter can be implemented by the application to provide human-readable formatting +// of entry data. Nil is a valid EntryFormatter and will use a default format. +type EntryFormatter func([]byte) string + +// DescribeMessage returns a concise human-readable description of a +// Message for debugging. +func DescribeMessage(m pb.Message, f EntryFormatter) string { + var buf bytes.Buffer + fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index) + if m.Reject { + fmt.Fprintf(&buf, " Rejected") + if m.RejectHint != 0 { + fmt.Fprintf(&buf, "(Hint:%d)", m.RejectHint) + } + } + if m.Commit != 0 { + fmt.Fprintf(&buf, " Commit:%d", m.Commit) + } + if len(m.Entries) > 0 { + fmt.Fprintf(&buf, " Entries:[") + for i, e := range m.Entries { + if i != 0 { + buf.WriteString(", ") + } + buf.WriteString(DescribeEntry(e, f)) + } + fmt.Fprintf(&buf, "]") + } + if !IsEmptySnap(m.Snapshot) { + fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot) + } + return buf.String() +} + +// DescribeEntry returns a concise human-readable description of an +// Entry for debugging. +func DescribeEntry(e pb.Entry, f EntryFormatter) string { + var formatted string + if e.Type == pb.EntryNormal && f != nil { + formatted = f(e.Data) + } else { + formatted = fmt.Sprintf("%q", e.Data) + } + return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) +} + +func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { + if len(ents) == 0 { + return ents + } + size := ents[0].Size() + var limit int + for limit = 1; limit < len(ents); limit++ { + size += ents[limit].Size() + if uint64(size) > maxSize { + break + } + } + return ents[:limit] +} diff --git a/raft/handler.go b/raft/handler.go index 81b85ba1aa..4439f15bc1 100644 --- a/raft/handler.go +++ b/raft/handler.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" mapset "github.com/deckarep/golang-set" + etcd "github.com/ethereum/go-ethereum/raft/etcd/raft" "github.com/syndtr/goleveldb/leveldb" "github.com/ethereum/go-ethereum/core" @@ -75,7 +76,7 @@ type ProtocolManager struct { confChangeProposalC chan raftpb.ConfChange // for config changes from js console to raft // Raft transport - unsafeRawNode etcdRaft.Node + unsafeRawNode etcd.Node transport *rafthttp.Transport httpstopc chan struct{} httpdonec chan struct{} @@ -90,8 +91,8 @@ type ProtocolManager struct { wal *wal.WAL // Storage - quorumRaftDb *leveldb.DB // Persistent storage for last-applied raft index - raftStorage *etcdRaft.MemoryStorage // Volatile raft storage + quorumRaftDb *leveldb.DB // Persistent storage for last-applied raft index + raftStorage *etcd.MemoryStorage // Volatile raft storage } var errNoLeaderElected = errors.New("no leader is currently elected") @@ -108,7 +109,7 @@ func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockCh manager := &ProtocolManager{ bootstrapNodes: bootstrapNodes, peers: make(map[uint16]*Peer), - leader: uint16(etcdRaft.None), + leader: uint16(etcd.None), removedPeers: mapset.NewSet(), joinExisting: joinExisting, blockchain: blockchain, @@ -123,7 +124,7 @@ func NewProtocolManager(raftId uint16, raftPort uint16, blockchain *core.BlockCh raftId: raftId, raftPort: raftPort, quitSync: make(chan struct{}), - raftStorage: etcdRaft.NewMemoryStorage(), + raftStorage: etcd.NewMemoryStorage(), minter: minter, downloader: downloader, useDns: useDns, @@ -236,7 +237,7 @@ func (pm *ProtocolManager) NodeInfo() *RaftNodeInfo { // `raft.Node`, `pm.unsafeRawNode`, to us. This re-entrance through a separate // thread will cause a nil pointer dereference. To work around this, this // getter method should be used instead of reading `pm.unsafeRawNode` directly. -func (pm *ProtocolManager) rawNode() etcdRaft.Node { +func (pm *ProtocolManager) rawNode() etcd.Node { for pm.unsafeRawNode == nil { time.Sleep(100 * time.Millisecond) } @@ -545,7 +546,7 @@ func (pm *ProtocolManager) startRaft() { // bugs" enablePreVote := true - raftConfig := &etcdRaft.Config{ + raftConfig := &etcd.Config{ Applied: lastAppliedIndex, ID: uint64(pm.raftId), ElectionTick: 10, // NOTE: cockroach sets this to 15 @@ -582,10 +583,10 @@ func (pm *ProtocolManager) startRaft() { if walExisted { log.Info("remounting an existing raft log; connecting to peers.") - pm.unsafeRawNode = etcdRaft.RestartNode(raftConfig) + pm.unsafeRawNode = etcd.RestartNode(raftConfig) } else if pm.joinExisting { log.Info("newly joining an existing cluster; waiting for connections.") - pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, nil) + pm.unsafeRawNode = etcd.StartNode(raftConfig, nil) } else { if numPeers := len(pm.bootstrapNodes); numPeers == 0 { panic("exiting due to empty raft peers list") @@ -604,7 +605,7 @@ func (pm *ProtocolManager) startRaft() { for _, peerAddress := range peerAddresses { pm.addPeer(peerAddress) } - pm.unsafeRawNode = etcdRaft.StartNode(raftConfig, raftPeers) + pm.unsafeRawNode = etcd.StartNode(raftConfig, raftPeers) } log.Info("raft node started") go pm.serveRaft() @@ -735,15 +736,13 @@ func (pm *ProtocolManager) serveLocalProposals() { return } - size, r, err := rlp.EncodeToReader(block) + data, err := rlp.EncodeToBytes(block) if err != nil { - panic(fmt.Sprintf("error: failed to send RLP-encoded block: %s", err.Error())) + panic(fmt.Sprintf("error: failed to send RLP-encoded block: %v", err)) } - var buffer = make([]byte, uint32(size)) - r.Read(buffer) // blocks until accepted by the raft state machine - pm.rawNode().Propose(context.TODO(), buffer) + pm.rawNode().Propose(context.TODO(), data) case cc, ok := <-pm.confChangeProposalC: if !ok { log.Info("error: read from confChangeProposalC failed") @@ -860,7 +859,7 @@ func (pm *ProtocolManager) eventLoop() { pm.updateLeader(rd.SoftState.Lead) } - if snap := rd.Snapshot; !etcdRaft.IsEmptySnap(snap) { + if snap := rd.Snapshot; !etcd.IsEmptySnap(snap) { pm.saveRaftSnapshot(snap) pm.applyRaftSnapshot(snap) pm.advanceAppliedIndex(snap.Metadata.Index) @@ -996,9 +995,9 @@ func (pm *ProtocolManager) eventLoop() { } } -func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcdRaft.Peer, peerAddresses []*Address, localAddress *Address) { +func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcd.Peer, peerAddresses []*Address, localAddress *Address) { initialNodes := pm.bootstrapNodes - raftPeers = make([]etcdRaft.Peer, len(initialNodes)) // Entire cluster + raftPeers = make([]etcd.Peer, len(initialNodes)) // Entire cluster peerAddresses = make([]*Address, len(initialNodes)-1) // Cluster without *this* node peersSeen := 0 @@ -1008,7 +1007,7 @@ func (pm *ProtocolManager) makeInitialRaftPeers() (raftPeers []etcdRaft.Peer, pe // requiring the use of static peers for the initial set, and load them from e.g. another JSON file which // contains pairs of enodes and raft ports, or we can get this initial peer list from commandline flags. address := newAddress(raftId, node.RaftPort(), node, pm.useDns) - raftPeers[i] = etcdRaft.Peer{ + raftPeers[i] = etcd.Peer{ ID: uint64(raftId), Context: address.toBytes(), } diff --git a/raft/minter.go b/raft/minter.go index bdddedf854..9df80915e0 100644 --- a/raft/minter.go +++ b/raft/minter.go @@ -18,13 +18,13 @@ package raft import ( "fmt" + "strconv" "sync" "sync/atomic" "time" "github.com/eapache/channels" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" @@ -226,9 +226,9 @@ func throttle(rate time.Duration, f func()) func() { // This function spins continuously, blocking until a block should be created // (via requestMinting()). This is throttled by `minter.blockTime`: // -// 1. A block is guaranteed to be minted within `blockTime` of being -// requested. -// 2. We never mint a block more frequently than `blockTime`. +// 1. A block is guaranteed to be minted within `blockTime` of being +// requested. +// 2. We never mint a block more frequently than `blockTime`. func (minter *minter) mintingLoop() { throttledMintNewBlock := throttle(minter.blockTime, func() { if atomic.LoadInt32(&minter.minting) == 1 { @@ -447,10 +447,9 @@ func (minter *minter) buildExtraSeal(headerHash common.Hash) []byte { } //build the extraSeal struct - raftIdString := hexutil.EncodeUint64(uint64(minter.eth.raftProtocolManager.raftId)) - + raftId := uint64(minter.eth.raftProtocolManager.raftId) extra := extraSeal{ - RaftId: []byte(raftIdString[2:]), //remove the 0x prefix + RaftId: strconv.AppendUint(make([]byte, 0, 8), raftId, 16), Signature: sig, } diff --git a/raft/snapshot.go b/raft/snapshot.go index e2e1bd17a5..2ce313a2c0 100644 --- a/raft/snapshot.go +++ b/raft/snapshot.go @@ -333,7 +333,7 @@ func (pm *ProtocolManager) applyRaftSnapshot(raftSnapshot raftpb.Snapshot) { pm.syncBlockchainUntil(latestBlockHash) pm.logNewlyAcceptedTransactions(preSyncHead) - log.Info(chainExtensionMessage, "hash", pm.blockchain.CurrentBlock().Hash()) + log.Info("Successfully extended chain", "hash", pm.blockchain.CurrentBlock().Hash()) } else { // added for permissions changes to indicate node sync up has started core.SetSyncStatus() @@ -356,13 +356,13 @@ func (pm *ProtocolManager) syncBlockchainUntil(hash common.Hash) { pm.mu.RUnlock() for { - for peerId, peer := range peerMap { - log.Info("synchronizing with peer", "peer id", peerId, "hash", hash) + for _, peer := range peerMap { + nodeId := peer.p2pNode.ID().String() - peerId := peer.p2pNode.ID().String() + log.Info("synchronizing blockchain with peer", "peer", nodeId, "hash", hash) - if err := pm.downloader.Synchronise(peerId, hash, big.NewInt(0), downloader.BoundedFullSync); err != nil { - log.Info("failed to synchronize with peer", "peer id", peerId) + if err := pm.downloader.Synchronise(nodeId, hash, big.NewInt(0), downloader.BoundedFullSync); err != nil { + log.Info("failed to synchronize blockchain with peer", "peer", nodeId) time.Sleep(500 * time.Millisecond) } else {