-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgraft_cluster.go
146 lines (116 loc) · 4.07 KB
/
graft_cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package graft
import (
"context"
"sync"
"sync/atomic"
"time"
"graft/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// This file models an actual cluster of graft instances and provides abstractions for interfacing with all of them.

// unknownLeader is the value a cluster's currentLeader is initialised to when
// the cluster is created, before any leader has been recorded.
const unknownLeader = 0
// machineID identifies a single machine in the cluster.
type machineID = int64

// lazyClient yields the gRPC client for one machine each time it is called.
type lazyClient func() pb.GraftClient

// cluster is this machine's view of the other machines in the cluster.
type cluster struct {
	// machines maps every registered machine to a function that yields its
	// gRPC client.
	machines map[machineID]lazyClient

	// machineCancellationFuncs maps every registered machine to the cancel
	// function of the most recent request context registered for it.
	// Equivalent to: map[machineID]context.CancelFunc
	machineCancellationFuncs sync.Map

	// currentLeader is the ID of the machine recorded as the cluster leader;
	// connectToCluster initialises it to unknownLeader.
	currentLeader machineID
}
// connectToCluster builds the cluster view for the machine identified by
// thisMachinesID. Every other machine listed in config.clusterConfig gets a
// lazily-connecting gRPC client and a no-op cancel function, so a cancel
// function is always registered for each member. The local machine itself is
// skipped, and the leader starts out as unknownLeader.
func connectToCluster(config graftConfig, thisMachinesID machineID) *cluster {
	members := &cluster{
		machines:                 make(map[machineID]lazyClient),
		machineCancellationFuncs: sync.Map{},
		currentLeader:            unknownLeader,
	}

	for id, addr := range config.clusterConfig {
		if id != thisMachinesID {
			members.machines[id] = connectToMachine(addr)
			members.machineCancellationFuncs.Store(id, context.CancelFunc(func() {}))
		}
	}

	return members
}
// appendEntryForMember sends an AppendEntries RPC to a single cluster member,
// superseding any request to that member that is still in flight.
//
// The cancel function registered for the previous request to machineID is
// invoked first; a fresh context with a 100ms deadline is then registered in
// its place so that a later call for the same machine can abort this one.
//
// It returns the member's response, or nil when machineID is not a registered
// member of the cluster or when the RPC fails for any reason (including
// cancellation and timeout). currentTerm is currently unused.
func (c *cluster) appendEntryForMember(machineID machineID, entry *pb.AppendEntriesArgs, currentTerm int64) *pb.AppendEntriesResponse {
	// An unregistered machine has no client; calling the nil lazyClient would
	// panic, so treat it like any other failed request.
	clusterMachine := c.machines[machineID]
	if clusterMachine == nil {
		return nil
	}

	// Cancel any outbound request. Both the lookup and the type assertion are
	// checked so a missing entry cannot cause a nil type-assertion panic.
	if previous, found := c.machineCancellationFuncs.Load(machineID); found {
		if cancelPrevious, ok := previous.(context.CancelFunc); ok {
			cancelPrevious()
		}
	}

	// A single context carries both the timeout and the externally visible
	// cancel function, so it is fully released when this call returns.
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	c.machineCancellationFuncs.Store(machineID, cancel)
	defer cancel()

	result, err := clusterMachine().AppendEntries(ctx, entry)
	if err != nil {
		return nil
	}
	return result
}
// requestVote asks every registered member of the cluster for its vote in
// parallel and blocks until each request has completed, failed, or hit its
// 100ms timeout.
//
// It returns the number of members that granted their vote and the highest
// CurrentTerm reported by any member that answered successfully. The term is
// -1 when no member answered.
func (c *cluster) requestVote(voteRequest *pb.RequestVoteArgs) (int, int) {
	var (
		termGuard sync.Mutex
		pending   sync.WaitGroup
		granted   int32
	)
	highestTerm := -1

	for _, lazy := range c.machines {
		// Resolve the client on the calling goroutine, before the poll for
		// this member is started.
		member := lazy()

		pending.Add(1)
		go func() {
			ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
			defer cancel()
			defer pending.Done()

			reply, err := member.RequestVote(ctx, voteRequest)
			if err != nil {
				// Members that fail or time out contribute neither a vote
				// nor a term.
				return
			}

			if reply.VoteGranted {
				atomic.AddInt32(&granted, 1)
			}

			termGuard.Lock()
			defer termGuard.Unlock()
			if reply.CurrentTerm > int64(highestTerm) {
				highestTerm = int(reply.CurrentTerm)
			}
		}()
	}

	pending.Wait()
	return int(granted), highestTerm
}
// pushOperationToLeader forwards a serialized operation to the machine
// recorded as the current cluster leader without waiting for the outcome.
//
// The AddLogEntry RPC runs on its own goroutine under a 100ms timeout and its
// response and error are discarded, so delivery is best-effort. When no client
// is registered for currentLeader (for example while it is still
// unknownLeader and no machine has that ID) the operation is dropped rather
// than invoking a nil client, which would panic on the background goroutine.
func (c *cluster) pushOperationToLeader(serializedOperation string) {
	// Resolve the leader on the calling goroutine so the background goroutine
	// never reads c.currentLeader.
	leader := c.machines[c.currentLeader]
	if leader == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	go func() {
		// Deferred so the context is released even if the call panics.
		defer cancel()
		// Best-effort push: the result is intentionally ignored.
		leader().AddLogEntry(ctx, &pb.AddLogEntryArgs{
			Operation: serializedOperation,
		})
	}()
}
// clusterSize reports how many machines this cluster holds clients for, i.e.
// the number of entries in c.machines.
func (c *cluster) clusterSize() int {
	return len(c.machines)
}
// connectToMachine returns a lazyClient for the machine at addr. No connection
// is attempted until the returned function is first called; that first call
// dials addr with insecure transport credentials, and every call returns the
// client created by that single dial attempt. A dial error panics.
func connectToMachine(addr string) lazyClient {
	var (
		dialOnce sync.Once
		client   pb.GraftClient
	)

	dial := func() {
		conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			panic(err)
		}
		// TODO: close the conn eventually
		client = pb.NewGraftClient(conn)
	}

	return func() pb.GraftClient {
		dialOnce.Do(dial)
		return client
	}
}