dragonboat: use a goroutine to handle all leader info update events
lni committed Jul 1, 2019
1 parent 23c54cc commit 4c64732
Showing 6 changed files with 138 additions and 64 deletions.
5 changes: 4 additions & 1 deletion config/config.go
@@ -233,7 +233,10 @@ type NodeHostConfig struct {
// EnableMetrics determines whether health metrics in Prometheus format should
// be enabled.
EnableMetrics bool
// RaftEventListener is the listener to get notified for certain Raft events.
// RaftEventListener is the listener for Raft events exposed to user space.
// NodeHost uses a single dedicated goroutine to invoke all RaftEventListener
// methods one by one. CPU intensive or IO related procedures that can cause
// long delays should be offloaded to worker goroutines managed by users.
RaftEventListener raftio.IRaftEventListener
}

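Per the comment above, LeaderUpdated should return quickly. A minimal sketch of a user-side listener that hands events off to its own worker goroutine (the MyListener type, package name, buffer size, and log output are illustrative assumptions, not part of this commit):

package example

import (
	"log"

	"github.com/lni/dragonboat/v3/raftio"
)

// MyListener satisfies raftio.IRaftEventListener while keeping the
// NodeHost notification goroutine unblocked.
type MyListener struct {
	events chan raftio.LeaderInfo
}

func NewMyListener() *MyListener {
	l := &MyListener{events: make(chan raftio.LeaderInfo, 128)}
	go func() {
		// slow or IO bound work runs here, on the user's own goroutine
		for info := range l.events {
			log.Printf("cluster %d node %d: leader %d at term %d",
				info.ClusterID, info.NodeID, info.LeaderID, info.Term)
		}
	}()
	return l
}

func (l *MyListener) LeaderUpdated(info raftio.LeaderInfo) {
	select {
	case l.events <- info:
	default:
		// drop rather than stall the notification goroutine
	}
}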
67 changes: 9 additions & 58 deletions event.go
@@ -17,12 +17,10 @@ package dragonboat
import (
"fmt"
"io"
"sync"
"sync/atomic"

"github.com/VictoriaMetrics/metrics"
"github.com/lni/dragonboat/v3/internal/server"
"github.com/lni/dragonboat/v3/internal/utils/syncutil"
"github.com/lni/dragonboat/v3/raftio"
)

@@ -38,50 +36,25 @@ type raftEventListener struct
nodeID uint64
leaderID *uint64
metrics bool
workCh chan struct{}
mu sync.Mutex
notifications []raftio.LeaderInfo
stopper *syncutil.Stopper
queue *leaderInfoQueue
isLeader *metrics.Gauge
campaignLaunched *metrics.Counter
campaignSkipped *metrics.Counter
snapshotRejected *metrics.Counter
replicationRejected *metrics.Counter
proposalDropped *metrics.Counter
readIndexDropped *metrics.Counter
userListener raftio.IRaftEventListener
}

func newRaftEventListener(clusterID uint64, nodeID uint64,
leaderID *uint64, useMetrics bool,
userListener raftio.IRaftEventListener) *raftEventListener {
queue *leaderInfoQueue) *raftEventListener {
el := &raftEventListener{
clusterID: clusterID,
nodeID: nodeID,
leaderID: leaderID,
metrics: useMetrics,
userListener: userListener,
stopper: syncutil.NewStopper(),
workCh: make(chan struct{}, 1),
notifications: make([]raftio.LeaderInfo, 0),
}
if userListener != nil {
el.stopper.RunWorker(func() {
for {
select {
case <-el.stopper.ShouldStop():
return
case <-el.workCh:
for {
v, ok := el.getLeaderInfo()
if !ok {
break
}
el.userListener.LeaderUpdated(v)
}
}
}
})
clusterID: clusterID,
nodeID: nodeID,
leaderID: leaderID,
metrics: useMetrics,
queue: queue,
}
if useMetrics {
label := fmt.Sprintf(`{clusterid="%d",nodeid="%d"}`, clusterID, nodeID)
@@ -117,23 +90,18 @@ func newRaftEventListener(clusterID uint64, nodeID uint64,
}

func (e *raftEventListener) stop() {
e.stopper.Stop()
}

func (e *raftEventListener) LeaderUpdated(info server.LeaderInfo) {
atomic.StoreUint64(e.leaderID, info.LeaderID)
if e.userListener != nil {
if e.queue != nil {
ui := raftio.LeaderInfo{
ClusterID: info.ClusterID,
NodeID: info.NodeID,
Term: info.Term,
LeaderID: info.LeaderID,
}
e.addLeaderInfo(ui)
select {
case e.workCh <- struct{}{}:
default:
}
e.queue.addLeaderInfo(ui)
}
}

@@ -172,20 +140,3 @@ func (e *raftEventListener) ReadIndexDropped(info server.ReadIndexInfo) {
e.readIndexDropped.Add(1)
}
}

func (e *raftEventListener) addLeaderInfo(li raftio.LeaderInfo) {
e.mu.Lock()
defer e.mu.Unlock()
e.notifications = append(e.notifications, li)
}

func (e *raftEventListener) getLeaderInfo() (raftio.LeaderInfo, bool) {
e.mu.Lock()
defer e.mu.Unlock()
if len(e.notifications) > 0 {
v := e.notifications[0]
e.notifications = e.notifications[1:]
return v, true
}
return raftio.LeaderInfo{}, false
}
4 changes: 2 additions & 2 deletions node.go
@@ -110,7 +110,7 @@ func newNode(raftAddress string,
dataStore rsm.IManagedStateMachine,
smType pb.StateMachineType,
engine engine,
raftEventListener raftio.IRaftEventListener,
liQueue *leaderInfoQueue,
getStreamConnection func(uint64, uint64) pb.IChunkSink,
handleSnapshotStatus func(uint64, uint64, bool),
sendMessage func(pb.Message),
@@ -171,7 +171,7 @@ func newNode(raftAddress string,
rn.taskQ = sm.TaskQ()
rn.sm = sm
rn.raftEvents = newRaftEventListener(config.ClusterID,
config.NodeID, &rn.leaderID, useMetrics, raftEventListener)
config.NodeID, &rn.leaderID, useMetrics, liQueue)
new, err := rn.startRaft(config, lr, peers, initialMember)
if err != nil {
return nil, err
30 changes: 27 additions & 3 deletions nodehost.go
@@ -243,8 +243,9 @@ type NodeHost struct {
execEngine *execEngine
logdb raftio.ILogDB
transport transport.ITransport
raftEvents raftio.IRaftEventListener
msgHandler *messageHandler
liQueue *leaderInfoQueue
userListener raftio.IRaftEventListener
transportLatency *sample
}

@@ -267,7 +268,7 @@ func NewNodeHost(nhConfig config.NodeHostConfig) (*NodeHost, error) {
duStopper: syncutil.NewStopper(),
nodes: transport.NewNodes(streamConnections),
transportLatency: newSample(),
raftEvents: nhConfig.RaftEventListener,
userListener: nhConfig.RaftEventListener,
}
nh.snapshotStatus = newSnapshotFeedback(nh.pushSnapshotStatus)
nh.msgHandler = newNodeHostMessageHandler(nh)
@@ -300,6 +301,12 @@ func NewNodeHost(nhConfig config.NodeHostConfig) (*NodeHost, error) {
nh.stopper.RunWorker(func() {
nh.tickWorkerMain()
})
if nhConfig.RaftEventListener != nil {
nh.liQueue = newLeaderInfoQueue()
nh.stopper.RunWorker(func() {
nh.handleLeaderUpdatedEvents()
})
}
nh.logNodeHostDetails()
return nh, nil
}
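For context, enabling this path from user code is a single field in the config. A hedged sketch, assuming the hypothetical NewMyListener from the config.go section above and placeholder directory, address, and RTT values:

func startNodeHost() (*dragonboat.NodeHost, error) {
	nhc := config.NodeHostConfig{
		NodeHostDir:       "/tmp/nodehost-demo", // placeholder
		RTTMillisecond:    200,                  // placeholder
		RaftAddress:       "localhost:63000",    // placeholder
		RaftEventListener: NewMyListener(),      // hypothetical listener sketched earlier
	}
	return dragonboat.NewNodeHost(nhc)
}

With RaftEventListener set, NewNodeHost creates the leaderInfoQueue and starts the handleLeaderUpdatedEvents worker shown below; leaving it nil skips both.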
@@ -1445,7 +1452,7 @@ func (nh *NodeHost) startCluster(nodes map[uint64]string,
createStateMachine(clusterID, nodeID, stopc),
smType,
nh.execEngine,
nh.raftEvents,
nh.liQueue,
getStreamConn,
handleSnapshotStatus,
nh.sendMessage,
@@ -1590,6 +1597,23 @@ func (nh *NodeHost) tickWorkerMain() {
server.RunTicker(time.Millisecond, tf, nh.stopper.ShouldStop(), nil)
}

func (nh *NodeHost) handleLeaderUpdatedEvents() {
for {
select {
case <-nh.stopper.ShouldStop():
return
case <-nh.liQueue.workReady():
for {
v, ok := nh.liQueue.getLeaderInfo()
if !ok {
break
}
nh.userListener.LeaderUpdated(v)
}
}
}
}

func (nh *NodeHost) getCurrentClusters(index uint64,
clusters []*node, queues map[uint64]*server.MessageQueue) (uint64,
[]*node, map[uint64]*server.MessageQueue) {
42 changes: 42 additions & 0 deletions queue.go
@@ -17,6 +17,7 @@ package dragonboat
import (
"sync"

"github.com/lni/dragonboat/v3/raftio"
pb "github.com/lni/dragonboat/v3/raftpb"
)

@@ -208,3 +209,44 @@ func (r *readyCluster) getReadyClusters() map[uint64]struct{} {
r.mu.Unlock()
return v
}

type leaderInfoQueue struct {
mu sync.Mutex
notifications []raftio.LeaderInfo
workCh chan struct{}
}

func newLeaderInfoQueue() *leaderInfoQueue {
q := &leaderInfoQueue{
workCh: make(chan struct{}, 1),
notifications: make([]raftio.LeaderInfo, 0),
}
return q
}

func (li *leaderInfoQueue) workReady() chan struct{} {
return li.workCh
}

func (li *leaderInfoQueue) addLeaderInfo(info raftio.LeaderInfo) {
func() {
li.mu.Lock()
defer li.mu.Unlock()
li.notifications = append(li.notifications, info)
}()
select {
case li.workCh <- struct{}{}:
default:
}
}

func (li *leaderInfoQueue) getLeaderInfo() (raftio.LeaderInfo, bool) {
li.mu.Lock()
defer li.mu.Unlock()
if len(li.notifications) > 0 {
v := li.notifications[0]
li.notifications = li.notifications[1:]
return v, true
}
return raftio.LeaderInfo{}, false
}
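A detail worth spelling out: workCh has capacity 1, so back-to-back addLeaderInfo calls coalesce into at most one pending wakeup, and the consumer in handleLeaderUpdatedEvents must drain the queue completely on each signal, which is exactly what its inner loop does. A self-contained sketch of the same coalescing pattern, runnable on its own (all names and values here are illustrative, not from the commit):

package main

import (
	"fmt"
	"sync"
)

// notifyQueue mirrors leaderInfoQueue: a mutex-guarded slice plus a
// capacity-1 wakeup channel that coalesces pending signals.
type notifyQueue struct {
	mu      sync.Mutex
	pending []int
	workCh  chan struct{}
}

func (q *notifyQueue) add(v int) {
	q.mu.Lock()
	q.pending = append(q.pending, v)
	q.mu.Unlock()
	select {
	case q.workCh <- struct{}{}: // wake the consumer
	default: // a wakeup is already pending, coalesce
	}
}

func (q *notifyQueue) get() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return 0, false
	}
	v := q.pending[0]
	q.pending = q.pending[1:]
	return v, true
}

func main() {
	q := &notifyQueue{workCh: make(chan struct{}, 1)}
	q.add(1)
	q.add(2)
	q.add(3) // three adds produce a single buffered wakeup
	<-q.workCh
	for {
		v, ok := q.get()
		if !ok {
			break
		}
		fmt.Println(v) // prints 1, 2, 3 in order
	}
}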
54 changes: 54 additions & 0 deletions queue_test.go
@@ -20,6 +20,7 @@ package dragonboat
import (
"testing"

"github.com/lni/dragonboat/v3/raftio"
"github.com/lni/dragonboat/v3/raftpb"
)

@@ -269,3 +270,56 @@ func TestReadyClusterCanBeReturnedAndCleared(t *testing.T) {
t.Errorf("cluster ready not set")
}
}

func TestLeaderInfoQueueCanBeCreated(t *testing.T) {
q := newLeaderInfoQueue()
if cap(q.workCh) != 1 {
t.Errorf("unexpected queue cap")
}
if len(q.workCh) != 0 {
t.Errorf("unexpected queue length")
}
if len(q.notifications) != 0 {
t.Errorf("unexpected notifications length")
}
ch := q.workReady()
if ch == nil {
t.Errorf("failed to return work ready chan")
}
}

func TestAddToLeaderInfoQueue(t *testing.T) {
q := newLeaderInfoQueue()
q.addLeaderInfo(raftio.LeaderInfo{})
q.addLeaderInfo(raftio.LeaderInfo{})
if len(q.workCh) != 1 {
t.Errorf("unexpected workCh len")
}
if len(q.notifications) != 2 {
t.Errorf("unexpected notifications len")
}
}

func TestGetFromLeaderInfoQueue(t *testing.T) {
q := newLeaderInfoQueue()
_, ok := q.getLeaderInfo()
if ok {
t.Errorf("unexpectedly returned leader info")
}
v1 := raftio.LeaderInfo{ClusterID: 101}
v2 := raftio.LeaderInfo{ClusterID: 2002}
q.addLeaderInfo(v1)
q.addLeaderInfo(v2)
rv1, ok1 := q.getLeaderInfo()
rv2, ok2 := q.getLeaderInfo()
_, ok3 := q.getLeaderInfo()
if !ok1 || rv1.ClusterID != v1.ClusterID {
t.Errorf("unexpected result")
}
if !ok2 || rv2.ClusterID != v2.ClusterID {
t.Errorf("unexpected result")
}
if ok3 {
t.Errorf("unexpectedly return third reader info rec")
}
}
