-
Notifications
You must be signed in to change notification settings - Fork 2
/
container_consumer.go
80 lines (75 loc) · 2.52 KB
/
container_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package kafkaclient
import "github.com/confluentinc/confluent-kafka-go/kafka"
// Consume create consumers based on per thread and directly consume messages from the Kafka broker.
func (c *Container) Consume(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) {
newConfig := c.cloneConfig(config)
newConfig[GoApplicationRebalanceEnable] = true
for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ {
var cons *Consumer
cons, err = c.NewConsumer(newConfig)
if err != nil {
return
}
err = cons.consume(args)
if err != nil {
return
}
consList = append(consList, cons)
}
return
}
// ConsumeEvent create consumers based on per thread and directly consume events from the Kafka broker.
func (c *Container) ConsumeEvent(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) {
for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ {
var cons *Consumer
cons, err = c.NewConsumer(config)
if err != nil {
return
}
err = cons.consumeEvent(args)
if err != nil {
return
}
consList = append(consList, cons)
}
return
}
// ConsumeBatch create consumers based on per thread and directly consume messages from the Kafka broker.
// ConsumeBatch is an improved version of Consume but polling in a batch manner.
func (c *Container) ConsumeBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) {
newConfig := c.cloneConfig(config)
newConfig[GoEventsChannelEnable] = true
newConfig[GoApplicationRebalanceEnable] = true
for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ {
var cons *Consumer
cons, err = c.NewConsumer(newConfig)
if err != nil {
return
}
err = cons.consumeBatch(args)
if err != nil {
return
}
consList = append(consList, cons)
}
return
}
// ConsumeEventBatch create consumers based on per thread and directly consume events from the Kafka broker.
// ConsumeEventBatch is an upgraded version of ConsumeEvent with the batch processing.
func (c *Container) ConsumeEventBatch(config kafka.ConfigMap, args ConsumeArgs) (consList []*Consumer, err error) {
newConfig := c.cloneConfig(config)
newConfig[GoEventsChannelEnable] = true
for numWorker := uint64(1); numWorker <= args.Workers; numWorker++ {
var cons *Consumer
cons, err = c.NewConsumer(newConfig)
if err != nil {
return
}
err = cons.consumeEventBatch(args)
if err != nil {
return
}
consList = append(consList, cons)
}
return
}