-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathnode.go
164 lines (137 loc) · 4.07 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package dht
import (
"fmt"
"log"
"net"
"net/http"
"time"
"github.com/hashicorp/consul/api"
"github.com/stvp/rendezvous"
)
const (
checkInterval = 5 * time.Second
pollWait = time.Second
)
func newCheckListenerAndServer() (listener net.Listener, server *http.Server, err error) {
listener, err = net.Listen("tcp", "127.0.0.1:0")
if err != nil {
return listener, nil, err
}
server = &http.Server{
ReadTimeout: time.Second,
WriteTimeout: time.Second,
Handler: http.HandlerFunc(func(resp http.ResponseWriter, req *http.Request) {
fmt.Fprintf(resp, "OK")
}),
}
// When the listener is closed, this goroutine returns.
go server.Serve(listener)
return listener, server, err
}
// Node is a single node in a distributed hash table, coordinated using
// services registered in Consul. Key membership is determined using rendezvous
// hashing to ensure even distribution of keys and minimal key membership
// changes when a Node fails or otherwise leaves the hash table.
//
// Errors encountered when making blocking GET requests to the Consul agent API
// are logged using the log package.
type Node struct {
// Consul
serviceName string
serviceID string
consul *api.Client
// HTTP health check server
checkURL string
checkListener net.Listener
checkServer *http.Server
// Hash table
hashTable *rendezvous.Table
waitIndex uint64
// Graceful shutdown
stop chan bool
}
// Join creates a new Node and adds it to the distributed hash table specified
// by the given name. The given id should be unique among all Nodes in the hash
// table.
func Join(name, id string) (node *Node, err error) {
node = &Node{
serviceName: name,
serviceID: id,
stop: make(chan bool),
}
node.consul, err = api.NewClient(api.DefaultConfig())
if err != nil {
return nil, fmt.Errorf("dht: can't create Consul API client: %s", err)
}
node.checkListener, node.checkServer, err = newCheckListenerAndServer()
if err != nil {
return nil, fmt.Errorf("dht: can't start HTTP server: %s", err)
}
err = node.register()
if err != nil {
return nil, fmt.Errorf("dht: can't register %s service: %s", node.serviceName, err)
}
err = node.update()
if err != nil {
return nil, fmt.Errorf("dht: can't fetch %s services list: %s", node.serviceName, err)
}
go node.poll()
return node, nil
}
func (n *Node) register() (err error) {
err = n.consul.Agent().ServiceRegister(&api.AgentServiceRegistration{
Name: n.serviceName,
ID: n.serviceID,
Check: &api.AgentServiceCheck{
HTTP: fmt.Sprintf("http://%s", n.checkListener.Addr().String()),
Interval: checkInterval.String(),
},
})
return err
}
func (n *Node) poll() {
var err error
for {
select {
case <-n.stop:
return
case <-time.After(pollWait):
err = n.update()
if err != nil {
log.Printf("[dht %s %s] error: %s", n.serviceName, n.serviceID, err)
}
}
}
}
// update blocks until the service list changes or until the Consul agent's
// timeout is reached (10 minutes by default).
func (n *Node) update() (err error) {
opts := &api.QueryOptions{WaitIndex: n.waitIndex}
serviceEntries, meta, err := n.consul.Health().Service(n.serviceName, "", true, opts)
if err != nil {
return err
}
ids := make([]string, len(serviceEntries))
for i, entry := range serviceEntries {
ids[i] = entry.Service.ID
}
n.hashTable = rendezvous.New(ids)
n.waitIndex = meta.LastIndex
return nil
}
// Member returns true if the given key belongs to this Node in the distributed
// hash table.
func (n *Node) Member(key string) bool {
return n.hashTable.Get(key) == n.serviceID
}
// Leave removes the Node from the distributed hash table by de-registering it
// from Consul. Once Leave is called, the Node should be discarded. An error is
// returned if the Node is unable to successfully deregister itself from
// Consul. In that case, Consul's health check for the Node will fail and
// require manual cleanup.
func (n *Node) Leave() (err error) {
close(n.stop) // stop polling for state
err = n.consul.Agent().ServiceDeregister(n.serviceID)
n.checkListener.Close() // stop the health check http server
return err
}