-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.js
36 lines (29 loc) · 870 Bytes
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
const kafka = require('./client')
const group = process.argv[2]
async function init() {
/**
* Consumer will subscribe to a particular kafka topic and then get those messages whenever a producer sends messages
* to that topic
*/
// Define a consumer
console.log('Creating consumer...')
const consumer = kafka.consumer({
groupId: group
})
console.log('Consumer created.')
// Connecting the consumer
console.log('Connecting consumer...')
await consumer.connect()
console.log('Consumer connected.')
// Subscribe to the topic
await consumer.subscribe({
topics: ['rider-updates']
})
// Consume message
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log(`Group: ${group}: TOPIC:${topic} ; PARTITION:${partition} ; MESSAGE: ${message.value.toString()}`)
}
})
}
init()