-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathbindings.go
281 lines (236 loc) · 9.22 KB
/
bindings.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package amqprpc
import (
amqp "github.com/rabbitmq/amqp091-go"
)
// The default exchange types that are available in RabbitMQ.
const (
ExchangeTypeDirect = "direct"
ExchangeTypeTopic = "topic"
ExchangeTypeHeaders = "headers"
)
// The default exchanges that are available in RabbitMQ.
const (
DefaultExchangeNameDirect = "amq.direct"
DefaultExchangeNameTopic = "amq.topic"
DefaultExchangeNameHeaders = "amq.match"
)
// The different queue types that are available in RabbitMQ.
const (
QueueTypeClassic = "classic"
QueueTypeQuorum = "quorum"
)
// HandlerBinding holds information about how an exchange and a queue should be
// declared and bound. If the ExchangeName is not defined (an empty string), the
// queue will not be bound to the exchange but assumed to use the default match.
type HandlerBinding struct {
// QueueName is the name of the queue that the handler should be bound to.
QueueName string
// ExchangeName is the exchange that the queue should be bound to.
ExchangeName string
// RoutingKey is the routing key that the queue should be bound to.
RoutingKey string
// BindHeaders is the headers that the queue should be bound to.
BindHeaders amqp.Table
// Handler is the function that should be called when a message is
// received.
Handler HandlerFunc
// QueueDurable sets the durable flag. Should not be used together with
// QueueExclusive since that will fail in a future RabbitMQ version.
// Setting setting it to false not work with quorum queues.
QueueDurable bool
// Set the queue to be automatically deleted when the last consumer stops.
QueueAutoDelete bool
// SkipQueueDeclare will skip the queue declaration. This can be useful when
// you are migrating queue types or arguments and want to avoid the
// PRECONDITION_FAILED error.
SkipQueueDeclare bool
// QueueExclusive sets the exclusive flag when declaring the queue.
// Exclusive queues can only be used by the connection that created them.
// If exclusive is true, the queue will be deleted during a network failure
// making incoming requests fail before the reconnect and redeclare
// happens.
QueueExclusive bool
// QueueDeclareArgs sets any extra queue arguments.
QueueDeclareArgs amqp.Table
// AutoAck sets the auto-ack flag on the consumer.
AutoAck bool
// PrefetchCount sets the prefetch count for the consumer. This is only
// really usable when AutoAck is also set to false.
PrefetchCount int
// ExclusiveConsumer sets the exclusive flag when starting the consumer.
// This ensures that the bound handler is the only consumer of its queue.
// This works only with classic queues and setting this to true while using
// a quorum queue will silently allow multiple consumers.
ExclusiveConsumer bool
// ConsumerArgs sets any extra consumer arguments.
ConsumerArgs amqp.Table
}
// WithQueueName sets the name of the queue that the handler should be bound to.
func (b HandlerBinding) WithQueueName(name string) HandlerBinding {
b.QueueName = name
return b
}
// WithExchangeName sets the exchange that the queue should be bound to.
func (b HandlerBinding) WithExchangeName(name string) HandlerBinding {
b.ExchangeName = name
return b
}
// WithRoutingKey sets the routing key that the queue should be bound to.
func (b HandlerBinding) WithRoutingKey(key string) HandlerBinding {
b.RoutingKey = key
return b
}
// WithBindHeaders sets the headers that the queue should be bound to.
func (b HandlerBinding) WithBindHeaders(headers amqp.Table) HandlerBinding {
b.BindHeaders = headers
return b
}
// WithHandler sets the function that should be called when a message is received.
func (b HandlerBinding) WithHandler(handler HandlerFunc) HandlerBinding {
b.Handler = handler
return b
}
// WithQueueDurable sets the durable flag for the queue.
func (b HandlerBinding) WithQueueDurable(durable bool) HandlerBinding {
b.QueueDurable = durable
return b
}
// WithQueueAutoDelete sets the queue to be automatically deleted when the last consumer stops.
func (b HandlerBinding) WithQueueAutoDelete(autoDelete bool) HandlerBinding {
b.QueueAutoDelete = autoDelete
return b
}
// WithSkipQueueDeclare sets the flag to skip the queue declaration.
func (b HandlerBinding) WithSkipQueueDeclare(skip bool) HandlerBinding {
b.SkipQueueDeclare = skip
return b
}
// WithQueueExclusive sets the exclusive flag when declaring the queue.
// Exclusive queues can only be used by the connection that created them. If
// exclusive is true, the queue will be deleted during a network failure making
// incoming requests fail before the reconnect and redeclare happens.
func (b HandlerBinding) WithQueueExclusive(exclusive bool) HandlerBinding {
b.QueueExclusive = exclusive
return b
}
// WithQueueDeclareArgs sets extra queue arguments.
func (b HandlerBinding) WithQueueDeclareArgs(args amqp.Table) HandlerBinding {
b.QueueDeclareArgs = args
return b
}
// WithQueueDeclareArg sets one queue argument, this ensures that the queue
// default arguments are not overwritten.
func (b HandlerBinding) WithQueueDeclareArg(key string, val any) HandlerBinding {
b.QueueDeclareArgs[key] = val
return b
}
// WithAutoAck sets the auto-ack flag on the consumer.
func (b HandlerBinding) WithAutoAck(autoAck bool) HandlerBinding {
b.AutoAck = autoAck
return b
}
// WithPrefetchCount sets the prefetch count for the consumer.
func (b HandlerBinding) WithPrefetchCount(count int) HandlerBinding {
b.PrefetchCount = count
return b
}
// WithExclusiveConsumer sets the exclusive flag when starting the consumer.
// This ensures that the bound handler is the only consumer of its queue. This
// works only with classic queues and setting this to true while using a quorum
// queue will silently allow multiple consumers.
func (b HandlerBinding) WithExclusiveConsumer(exclusive bool) HandlerBinding {
b.ExclusiveConsumer = exclusive
return b
}
// WithConsumerArgs sets any extra consumer arguments.
func (b HandlerBinding) WithConsumerArgs(args amqp.Table) HandlerBinding {
b.ConsumerArgs = args
return b
}
// WithConsumerArg sets one extra consumer argument, this ensures that the
// consumer default arguments are not overwritten.
func (b HandlerBinding) WithConsumerArg(key string, val any) HandlerBinding {
b.ConsumerArgs[key] = val
return b
}
// CreateBinding returns a HandlerBinding with default values set.
func CreateBinding(queueName, exchangeName string, handler HandlerFunc) HandlerBinding {
return HandlerBinding{
QueueName: queueName,
ExchangeName: exchangeName,
BindHeaders: amqp.Table{},
Handler: handler,
// Must be true when using a quorum queue. And is a good default when
// using a cluster.
QueueDurable: true,
QueueDeclareArgs: amqp.Table{
// This is a good default queue for modern rabbitmq
// installations.
"x-queue-type": QueueTypeQuorum,
// Remove queues not used for 30 minutes. This is a good
// default instead of using the auto-delete flag since it gives
// the handler time to reconnect in case of a failure.
"x-expires": 30 * 60 * 1000,
},
// Default to auto ack.
AutoAck: true,
// Use a reasonable default value.
// https://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
// https://godoc.org/github.com/rabbitmq/amqp091-go#Channel.Qos
// In reality, this makes no difference unless AutoAck is set to false.
PrefetchCount: 10,
ConsumerArgs: amqp.Table{},
}
}
// DirectBinding returns a HandlerBinding to use for direct exchanges where each
// routing key will be mapped to one handler.
func DirectBinding(routingKey string, handler HandlerFunc) HandlerBinding {
// The queue name is the same as the routing key in a direct binding.
return CreateBinding(routingKey, DefaultExchangeNameDirect, handler).
WithRoutingKey(routingKey)
}
// TopicBinding returns a HandlerBinding to use for topic exchanges. The default
// exchange (amq.topic) will be used. The topic is matched on the routing key.
func TopicBinding(queueName, routingKey string, handler HandlerFunc) HandlerBinding {
return CreateBinding(queueName, DefaultExchangeNameTopic, handler).
WithRoutingKey(routingKey)
}
// HeadersBinding returns a HandlerBinding to use for header exchanges that
// will match on specific headers. The headers are specified as an amqp.Table.
// The default exchange amq.match will be used.
func HeadersBinding(queueName string, headers amqp.Table, handler HandlerFunc) HandlerBinding {
return CreateBinding(queueName, DefaultExchangeNameHeaders, handler).
WithBindHeaders(headers)
}
// ExchangeDeclareSettings is the settings that will be used when a handler
// is mapped to a fanout exchange and an exchange is declared.
type ExchangeDeclareSettings struct {
// Name is the name of the exchange.
Name string
// Type is the exchange type.
Type string
// Durable sets the durable flag. Durable exchanges survives server restart.
Durable bool
// AutoDelete sets the auto-delete flag, this ensures the exchange is
// deleted when it isn't bound to any more.
AutoDelete bool
// Args sets the arguments table used.
Args amqp.Table
}
func createExchanges(ch *amqp.Channel, exchanges []ExchangeDeclareSettings) error {
for _, e := range exchanges {
err := ch.ExchangeDeclare(
e.Name,
e.Type,
e.Durable,
e.AutoDelete,
false, // internal.
false, // no-wait.
e.Args,
)
if err != nil {
return err
}
}
return nil
}