Skip to content

Commit

Permalink
dragonboat,raft: dropped requests now cause ErrClusterNotReady
Browse files Browse the repository at this point in the history
  • Loading branch information
lni committed Jun 27, 2019
1 parent 6019fd3 commit 5c22480
Show file tree
Hide file tree
Showing 15 changed files with 296 additions and 5 deletions.
2 changes: 2 additions & 0 deletions binding/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func SelectOnRequestStateForMembershipChange(rsoid uint64) int {
err = dragonboat.ErrClusterClosed
} else if r.Rejected() {
err = dragonboat.ErrRejected
} else if r.Dropped() {
err = dragonboat.ErrClusterNotReady
} else {
panic("unknown code")
}
Expand Down
1 change: 1 addition & 0 deletions binding/include/dragonboat/binding.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ enum ErrorCode
ErrResultBufferTooSmall = -15,
ErrRejected = -16,
ErrInvalidClusterSettings = -17,
ErrClusterNotReady = -18,
};

// CompleteHandlerType is the type of complete handler. CompleteHandlerCPP is
Expand Down
1 change: 1 addition & 0 deletions binding/include/dragonboat/dragonboat.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ class Status
static const int ErrResultBufferTooSmall;
static const int ErrRejected;
static const int ErrInvalidClusterSettings;
static const int ErrClusterNotReady;
private:
int code_;
};
Expand Down
2 changes: 2 additions & 0 deletions binding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func cboolToBool(v C.Bool) bool {
func getErrorCode(err error) int {
if err == nil {
return int(C.StatusOK)
} else if err == dragonboat.ErrClusterNotReady {
return int(C.ErrClusterNotReady)
} else if err == dragonboat.ErrClusterNotFound {
return int(C.ErrClusterNotFound)
} else if err == dragonboat.ErrClusterAlreadyExist {
Expand Down
2 changes: 2 additions & 0 deletions execengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,8 @@ func (s *execEngine) execNodes(workerID uint64,
node := nodes[ud.ClusterID]
node.sendReplicateMessages(ud)
node.processReadyToRead(ud)
node.processDroppedEntries(ud)
node.processDroppedReadIndexes(ud)
}
p.step.end()
p.recordEntryCount(nodeUpdates)
Expand Down
3 changes: 2 additions & 1 deletion internal/drummer/client/nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ func (dc *DrummerClient) handleAddDeleteRequest(ctx context.Context,
if !v.Completed() &&
!v.Timeout() &&
!v.Terminated() &&
!v.Rejected() {
!v.Rejected() &&
!v.Dropped() {
plog.Panicf("unknown result code: %v", v)
}
if v.Completed() && req.Change.Type == pb.Request_DELETE {
Expand Down
2 changes: 2 additions & 0 deletions internal/drummer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ func waitDrummerRequestResult(ctx context.Context,
return nil, dragonboat.ErrTimeout
} else if r.Terminated() {
return nil, dragonboat.ErrClusterClosed
} else if r.Dropped() {
return nil, dragonboat.ErrClusterNotReady
}
plog.Panicf("unknown v code")
case <-ctx.Done():
Expand Down
11 changes: 11 additions & 0 deletions internal/raft/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,17 @@ func (rc *Peer) HasUpdate(moreEntriesToApply bool) bool {
if len(r.readyToRead) != 0 {
return true
}
if len(r.droppedEntries) > 0 || len(r.droppedReadIndexes) > 0 {
return true
}
return false
}

// Commit commits the Update state to mark it as processed.
func (rc *Peer) Commit(ud pb.Update) {
rc.raft.msgs = nil
rc.raft.droppedEntries = nil
rc.raft.droppedReadIndexes = nil
if !pb.IsEmptyState(ud.State) {
rc.prevState = ud.State
}
Expand Down Expand Up @@ -376,6 +381,12 @@ func getUpdate(r *raft,
if len(r.readyToRead) > 0 {
ud.ReadyToReads = r.readyToRead
}
if len(r.droppedEntries) > 0 {
ud.DroppedEntries = r.droppedEntries
}
if len(r.droppedReadIndexes) > 0 {
ud.DroppedReadIndexes = r.droppedReadIndexes
}
ud.UpdateCommit = getUpdateCommit(ud)
return ud
}
19 changes: 19 additions & 0 deletions internal/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ type raft struct {
pendingConfigChange bool
readIndex *readIndex
readyToRead []pb.ReadyToRead
droppedEntries []pb.Entry
droppedReadIndexes []pb.SystemCtx
checkQuorum bool
tickCount uint64
electionTick uint64
Expand Down Expand Up @@ -232,6 +234,7 @@ func newRaft(c *config.Config, logdb ILogDB) *raft {
nodeID: c.NodeID,
leaderID: NoLeader,
msgs: make([]pb.Message, 0),
droppedEntries: make([]pb.Entry, 0),
log: newEntryLog(logdb, rl),
remotes: make(map[uint64]*remote),
observers: make(map[uint64]*remote),
Expand Down Expand Up @@ -1390,6 +1393,7 @@ func (r *raft) handleLeaderPropose(m pb.Message) {
if r.hasPendingConfigChange() {
plog.Warningf("%s dropped a config change, one is pending",
r.describe())
r.reportDroppedConfigChange(m.Entries[i])
m.Entries[i] = pb.Entry{Type: pb.ApplicationEntry}
}
r.setPendingConfigChange()
Expand Down Expand Up @@ -1710,6 +1714,11 @@ func (r *raft) handleCandidatePropose(m pb.Message) {
// handleCandidateReadIndex handles a read index request received by the
// candidate handler. The request cannot be served as there is no leader, so
// it is dropped with a warning and reported as dropped.
func (r *raft) handleCandidateReadIndex(m pb.Message) {
	plog.Warningf("%s dropping read index request, no leader", r.describe())
	// reportDroppedReadIndex already appends the request's SystemCtx
	// (m.Hint, m.HintHigh) to r.droppedReadIndexes. Appending it here as well
	// would record the same dropped request twice in the next Update.
	r.reportDroppedReadIndex(m)
}

// when any of the following three methods
Expand Down Expand Up @@ -1754,7 +1763,12 @@ func (r *raft) handleCandidateRequestVoteResp(m pb.Message) {
}
}

// reportDroppedConfigChange records the dropped config change entry e by
// appending it to r.droppedEntries.
func (r *raft) reportDroppedConfigChange(e pb.Entry) {
r.droppedEntries = append(r.droppedEntries, e)
}

func (r *raft) reportDroppedProposal(m pb.Message) {
r.droppedEntries = append(r.droppedEntries, newEntrySlice(m.Entries)...)
if r.events != nil {
info := server.ProposalInfo{
ClusterID: r.clusterID,
Expand All @@ -1766,6 +1780,11 @@ func (r *raft) reportDroppedProposal(m pb.Message) {
}

func (r *raft) reportDroppedReadIndex(m pb.Message) {
sysctx := pb.SystemCtx{
Low: m.Hint,
High: m.HintHigh,
}
r.droppedReadIndexes = append(r.droppedReadIndexes, sysctx)
if r.events != nil {
info := server.ReadIndexInfo{
ClusterID: r.clusterID,
Expand Down
18 changes: 18 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,24 @@ func (n *node) getUpdate() (pb.Update, bool) {
return pb.Update{}, false
}

// processDroppedReadIndexes notifies n.pendingReadIndexes about every read
// index context listed in ud.DroppedReadIndexes, in order.
func (n *node) processDroppedReadIndexes(ud pb.Update) {
	droppedCtxs := ud.DroppedReadIndexes
	for i := range droppedCtxs {
		n.pendingReadIndexes.dropped(droppedCtxs[i])
	}
}

// processDroppedEntries walks ud.DroppedEntries and notifies the matching
// pending request tracker for each entry: application entries are reported to
// n.pendingProposals and config change entries to n.pendingConfigChange. Any
// other entry type causes a panic.
func (n *node) processDroppedEntries(ud pb.Update) {
	for _, entry := range ud.DroppedEntries {
		switch entry.Type {
		case pb.ApplicationEntry:
			n.pendingProposals.dropped(entry.ClientID, entry.SeriesID, entry.Key)
		case pb.ConfigChangeEntry:
			n.pendingConfigChange.dropped(entry.Key)
		default:
			panic("unknown dropped entry type")
		}
	}
}

func (n *node) processReadyToRead(ud pb.Update) {
if len(ud.ReadyToReads) > 0 {
n.pendingReadIndexes.addReadyToRead(ud.ReadyToReads)
Expand Down
4 changes: 4 additions & 0 deletions nodehost.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,8 @@ func (nh *NodeHost) linearizableRead(ctx context.Context,
return f(node)
} else if s.Terminated() {
return nil, ErrClusterClosed
} else if s.Dropped() {
return nil, ErrClusterNotReady
}
panic("unknown completedc code")
case <-ctx.Done():
Expand Down Expand Up @@ -1782,6 +1784,8 @@ func checkRequestState(ctx context.Context,
return sm.Result{}, ErrTimeout
} else if r.Terminated() {
return sm.Result{}, ErrClusterClosed
} else if r.Dropped() {
return sm.Result{}, ErrClusterNotReady
}
plog.Panicf("unknown v code %v", r)
case <-ctx.Done():
Expand Down
52 changes: 52 additions & 0 deletions nodehost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2919,6 +2919,58 @@ func TestLeaderInfoIsCorrectlyReported(t *testing.T) {
singleNodeHostTest(t, tf)
}

// TestDroppedRequestsAreReported checks that proposals, membership change
// requests and linearizable reads all return ErrClusterNotReady once the
// cluster reports that it has no leader.
func TestDroppedRequestsAreReported(t *testing.T) {
	const (
		clusterID = 2
		maxPolls  = 1000
		attempts  = 10
	)
	tf := func(t *testing.T, nh *NodeHost) {
		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
		defer cancel()
		// Add a second node at an address that is presumably unreachable, then
		// wait for the cluster to report that it no longer has a leader.
		if err := nh.SyncRequestAddNode(ctx, clusterID, 2, "noidea:8080", 0); err != nil {
			t.Fatalf("failed to add node %v", err)
		}
		for poll := 0; poll < maxPolls; poll++ {
			leaderID, ok, err := nh.GetLeaderID(clusterID)
			if err != nil {
				t.Fatalf("failed to get leader id %v", err)
			}
			if !ok && leaderID == 0 {
				break
			}
			time.Sleep(10 * time.Millisecond)
			if poll == maxPolls-1 {
				t.Fatalf("leader failed to step down")
			}
		}
		// expectNotReady issues the given request several times under one
		// long-lived context and flags every result other than
		// ErrClusterNotReady.
		expectNotReady := func(request func(rctx context.Context) error) {
			nctx, ncancel := context.WithTimeout(context.Background(), 30*time.Minute)
			defer ncancel()
			for i := 0; i < attempts; i++ {
				if err := request(nctx); err != ErrClusterNotReady {
					t.Errorf("failed to get ErrClusterNotReady, got %v", err)
				}
			}
		}
		cs := nh.GetNoOPSession(clusterID)
		expectNotReady(func(rctx context.Context) error {
			_, err := nh.SyncPropose(rctx, cs, make([]byte, 1))
			return err
		})
		expectNotReady(func(rctx context.Context) error {
			return nh.SyncRequestAddNode(rctx, clusterID, 3, "noidea:8080", 0)
		})
		expectNotReady(func(rctx context.Context) error {
			_, err := nh.SyncRead(rctx, clusterID, nil)
			return err
		})
	}
	singleNodeHostTest(t, tf)
}

// testRaftEventListener is a test helper type holding a slice of
// raftio.LeaderInfo values.
// NOTE(review): presumably it acts as a raft event listener that collects the
// leader info it is notified about; its methods are not visible here, confirm.
type testRaftEventListener struct {
// received is the list of raftio.LeaderInfo values kept by the listener.
received []raftio.LeaderInfo
}
Expand Down
8 changes: 7 additions & 1 deletion raftpb/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ type Update struct {
// UpdateCommit contains info on how the Update instance can be committed
// to actually progress the state of raft.
UpdateCommit UpdateCommit
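// DroppedEntries is a list of entries that were dropped, for example
// proposals dropped when no leader is available or config changes dropped
// because another config change is pending.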
DroppedEntries []Entry
// DroppedReadIndexes is a list of read index requests dropped when no leader
// is available.
DroppedReadIndexes []SystemCtx
}

// HasUpdate returns a boolean value indicating whether the returned Update
Expand All @@ -103,7 +108,8 @@ func (ud *Update) HasUpdate() bool {
len(ud.EntriesToSave) > 0 ||
len(ud.CommittedEntries) > 0 ||
len(ud.Messages) > 0 ||
len(ud.ReadyToReads) != 0
len(ud.ReadyToReads) > 0 ||
len(ud.DroppedEntries) > 0
}

// IsEmptyState returns a boolean flag indicating whether the given State is
Expand Down
Loading

0 comments on commit 5c22480

Please sign in to comment.