-
Notifications
You must be signed in to change notification settings - Fork 3
/
discovery.go
233 lines (204 loc) · 6.04 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package main
import (
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/hashicorp/consul/api"
)
// discoverServices spawns watches for services, adding more when new services
// are discovered. In GlobalMode it watches the whole catalog; otherwise it
// watches only the services registered on the local node. It blocks until
// shutdownCh fires, then stops every spawned watch and returns.
func discoverServices(nodeName string, config *Config, shutdownCh chan struct{}, client *api.Client) {
	if config.ServiceWatch == GlobalMode {
		log.Info("Discovering services from catalog")
	} else {
		log.Infof("Discovering services on local node (%s)", nodeName)
	}

	queryOpts := &api.QueryOptions{
		AllowStale: true,
		WaitTime:   watchWaitTime,
	}

	// Tracks services we've already started watches for; the bool marks
	// whether the service was seen in the most recent query (liveness flag).
	services := make(map[string]bool)

	// Per-watch stop channels so we can shut down individual watches
	stopCh := make(map[string]chan struct{})

	// Loop indefinitely to run the watch, doing repeated blocking queries to Consul
	for {
		// Check for shutdown event
		select {
		case <-shutdownCh:
			log.Infof("Shutting down service watches (count: %d)...", len(services))

			// Use a wait group to shut down all the watches at the same time
			var wg sync.WaitGroup
			for service := range services {
				wg.Add(1)
				ch := stopCh[service]
				go func() {
					defer wg.Done()
					// Each watch has two receivers on its stop channel,
					// so signal twice.
					ch <- struct{}{}
					ch <- struct{}{}
				}()
			}
			wg.Wait()

			log.Info("Finished shutting down service watches")
			<-shutdownCh
			return
		default:
		}

		var queryMeta *api.QueryMeta
		currentServices := make(map[string][]string)
		var err error

		// Watch either all services or just the local node's, depending on whether GlobalMode is set
		if config.ServiceWatch == GlobalMode {
			currentServices, queryMeta, err = client.Catalog().Services(queryOpts)
		} else {
			var node *api.CatalogNode
			node, queryMeta, err = client.Catalog().Node(nodeName, queryOpts)

			if err == nil {
				// Build the map of service:[tags]. A node can register the same
				// service more than once, so accumulate tags across entries.
				// (The previous logic had the branches inverted and overwrote
				// already-collected tags on a repeated service; `svc` also
				// avoids shadowing the `config` parameter.)
				for _, svc := range node.Services {
					currentServices[svc.Service] = append(currentServices[svc.Service], svc.Tags...)
				}
			}
		}

		if err != nil {
			log.Errorf("Error trying to watch services: %s, retrying in 10s...", err)
			time.Sleep(errorWaitTime)
			continue
		}

		// Update our WaitIndex for the next query
		queryOpts.WaitIndex = queryMeta.LastIndex

		// Reset the map so we can detect removed services
		for service := range services {
			services[service] = false
		}

		// Compare the new list of services with our stored one to see if we need to
		// spawn any new watches
		for service, tags := range currentServices {
			serviceConfig := config.serviceConfig(service)

			// If DistinctTags is specified, spawn a separate watch for each tag on the service
			if serviceConfig != nil && serviceConfig.DistinctTags {
				for _, tag := range tags {
					if _, ok := services[service+":"+tag]; !ok && !contains(serviceConfig.IgnoredTags, tag) {
						watchOpts := &WatchOptions{
							service: service,
							tag:     tag,
							config:  config,
							client:  client,
							stopCh:  make(chan struct{}),
						}
						stopCh[service+":"+tag] = watchOpts.stopCh
						log.Infof("Discovered new service: %s (tag: %s)", service, tag)
						go watch(watchOpts)
					}
					services[service+":"+tag] = true
				}
			} else {
				if _, ok := services[service]; !ok {
					watchOpts := &WatchOptions{
						service: service,
						config:  config,
						client:  client,
						stopCh:  make(chan struct{}),
					}
					stopCh[service] = watchOpts.stopCh
					log.Infof("Discovered new service: %s", service)
					go watch(watchOpts)
				}
				services[service] = true
			}
		}

		// Shut down watches for removed services
		for service, alive := range services {
			if !alive {
				log.Infof("Service %s left, removing", service)
				ch := stopCh[service]
				delete(services, service)
				delete(stopCh, service)
				go func() {
					ch <- struct{}{}
					ch <- struct{}{}
				}()
			}
		}
	}
}
// discoverNodes queries the catalog for nodes and starts watches for them.
// It blocks until shutdownCh fires, then stops every spawned watch and
// returns.
func discoverNodes(config *Config, shutdownCh chan struct{}, client *api.Client) {
	queryOpts := &api.QueryOptions{
		AllowStale: true,
		WaitTime:   watchWaitTime,
	}

	// Tracks nodes we've already started watches for; the bool marks whether
	// the node was seen in the most recent query (liveness flag).
	nodes := make(map[string]bool)

	// Per-watch stop channels so we can shut down individual watches
	stopCh := make(map[string]chan struct{})

	// Loop indefinitely to run the watch, doing repeated blocking queries to Consul
	for {
		// Check for shutdown event
		select {
		case <-shutdownCh:
			log.Infof("Shutting down node watches (count: %d)...", len(nodes))

			// Use a wait group to shut down all the watches at the same time
			var wg sync.WaitGroup
			for node := range nodes {
				wg.Add(1)
				ch := stopCh[node]
				go func() {
					defer wg.Done()
					// Each watch has two receivers on its stop channel,
					// so signal twice.
					ch <- struct{}{}
					ch <- struct{}{}
				}()
			}
			wg.Wait()

			log.Info("Finished shutting down node watches")
			<-shutdownCh
			return
		default:
		}

		currentNodes, queryMeta, err := client.Catalog().Nodes(queryOpts)
		if err != nil {
			log.Errorf("Error trying to watch node list: %s, retrying in 10s...", err)
			time.Sleep(errorWaitTime)
			continue
		}

		// Update our WaitIndex for the next query
		queryOpts.WaitIndex = queryMeta.LastIndex

		// Reset the map so we can detect removed nodes
		for node := range nodes {
			nodes[node] = false
		}

		// Compare the new list of nodes with our stored one to see if we need to
		// spawn any new watches
		for _, node := range currentNodes {
			nodeName := node.Node
			if _, ok := nodes[nodeName]; !ok {
				log.Infof("Discovered new node: %s", nodeName)
				opts := &WatchOptions{
					node:   nodeName,
					config: config,
					client: client,
					stopCh: make(chan struct{}),
				}
				stopCh[nodeName] = opts.stopCh
				go watch(opts)
			}
			nodes[nodeName] = true
		}

		// Shut down watches for removed nodes
		for node, alive := range nodes {
			if !alive {
				log.Infof("Node %s left, removing", node)
				ch := stopCh[node]
				delete(nodes, node)
				delete(stopCh, node)
				go func() {
					ch <- struct{}{}
					ch <- struct{}{}
				}()
			}
		}
	}
}