From 52bce87237376583ca8f97082178efd0d683fdbf Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 28 Jan 2021 22:20:19 +0100 Subject: [PATCH 1/3] Don't commit when seeking if autoCommit is disabled Fixes #395 --- docs/Consuming.md | 11 +++++++ src/consumer/__tests__/seek.spec.js | 47 +++++++++++++++++++++++++++++ src/consumer/consumerGroup.js | 3 ++ src/consumer/index.js | 4 ++- src/consumer/offsetManager/index.js | 14 +++++++++ 5 files changed, 78 insertions(+), 1 deletion(-) diff --git a/docs/Consuming.md b/docs/Consuming.md index 3564ff597..a6fa47a28 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -329,6 +329,17 @@ consumer.seek({ topic: 'example', partition: 0, offset: 12384 }) Upon seeking to an offset, any messages in active batches are marked as stale and discarded, making sure the next message read for the partition is from the offset sought to. Make sure to check `isStale()` before processing a message using [the `eachBatch` interface](#each-batch) of `consumer.run`. +By default, the consumer will commit the offset seeked. To disable this, set the [`autoCommit`](#auto-commit) option to `false` on the consumer. + +```javascript +consumer.run({ + autoCommit: false, + eachMessage: async ({ topic, message }) => true +}) +// This will now only resolve the previous offset, not commit it +consumer.seek({ topic: 'example', partition: 0, offset: 12384 }) +``` + ## Custom partition assigner It's possible to configure the strategy the consumer will use to distribute partitions amongst the consumer group. KafkaJS has a round robin assigner configured by default. diff --git a/src/consumer/__tests__/seek.spec.js b/src/consumer/__tests__/seek.spec.js index 20101f504..fa5c8385a 100644 --- a/src/consumer/__tests__/seek.spec.js +++ b/src/consumer/__tests__/seek.spec.js @@ -1,3 +1,4 @@ +const createAdmin = require('../../admin') const createProducer = require('../../producer') const createConsumer = require('../index') const { KafkaJSNonRetriableError } = require('../../errors') @@ -163,5 +164,51 @@ describe('Consumer', () => { }, ]) }) + + describe('When "autoCommit" is false', () => { + let admin + + beforeEach(() => { + admin = createAdmin({ logger: newLogger(), cluster }) + }) + + afterEach(async () => { + admin && (await admin.disconnect()) + }) + + it('should not commit the offset', async () => { + await Promise.all([consumer, producer, admin].map(client => client.connect())) + + await producer.send({ + acks: 1, + topic: topicName, + messages: [1, 2, 3].map(n => ({ key: `key-${n}`, value: `value-${n}` })), + }) + await consumer.subscribe({ topic: topicName, fromBeginning: true }) + + const messagesConsumed = [] + consumer.run({ + autoCommit: false, + eachMessage: async event => messagesConsumed.push(event), + }) + consumer.seek({ topic: topicName, partition: 0, offset: 2 }) + + await waitForConsumerToJoinGroup(consumer) + await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([ + { + topic: topicName, + partition: 0, + message: expect.objectContaining({ offset: '2' }), + }, + ]) + + await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([ + expect.objectContaining({ + partition: 0, + offset: '-1', + }), + ]) + }) + }) }) }) diff --git a/src/consumer/consumerGroup.js b/src/consumer/consumerGroup.js index 1279aef5b..bb7d4eb4d 100644 --- a/src/consumer/consumerGroup.js +++ b/src/consumer/consumerGroup.js @@ -55,6 +55,7 @@ module.exports = class ConsumerGroup { minBytes, maxBytes, maxWaitTimeInMs, + autoCommit, autoCommitInterval, autoCommitThreshold, isolationLevel, @@ -77,6 +78,7 @@ module.exports = class ConsumerGroup { this.minBytes = minBytes this.maxBytes = maxBytes this.maxWaitTime = maxWaitTimeInMs + this.autoCommit = autoCommit this.autoCommitInterval = autoCommitInterval this.autoCommitThreshold = autoCommitThreshold this.isolationLevel = isolationLevel @@ -274,6 +276,7 @@ module.exports = class ConsumerGroup { }), {} ), + autoCommit: this.autoCommit, autoCommitInterval: this.autoCommitInterval, autoCommitThreshold: this.autoCommitThreshold, coordinator, diff --git a/src/consumer/index.js b/src/consumer/index.js index 141ed4cdf..54be70e15 100644 --- a/src/consumer/index.js +++ b/src/consumer/index.js @@ -82,7 +82,7 @@ module.exports = ({ ) } - const createConsumerGroup = ({ autoCommitInterval, autoCommitThreshold }) => { + const createConsumerGroup = ({ autoCommit, autoCommitInterval, autoCommitThreshold }) => { return new ConsumerGroup({ logger: rootLogger, topics: keys(topics), @@ -98,6 +98,7 @@ module.exports = ({ maxBytes, maxWaitTimeInMs, instrumentationEmitter, + autoCommit, autoCommitInterval, autoCommitThreshold, isolationLevel, @@ -218,6 +219,7 @@ module.exports = ({ } consumerGroup = createConsumerGroup({ + autoCommit, autoCommitInterval, autoCommitThreshold, }) diff --git a/src/consumer/offsetManager/index.js b/src/consumer/offsetManager/index.js index 9c5326a2e..06c49c349 100644 --- a/src/consumer/offsetManager/index.js +++ b/src/consumer/offsetManager/index.js @@ -18,6 +18,7 @@ module.exports = class OffsetManager { * @param {import("../../../types").Cluster} options.cluster * @param {import("../../../types").Broker} options.coordinator * @param {import("../../../types").IMemberAssignment} options.memberAssignment + * @param {boolean} options.autoCommit * @param {number | null} options.autoCommitInterval * @param {number | null} options.autoCommitThreshold * @param {{[topic: string]: { fromBeginning: boolean }}} options.topicConfigurations @@ -30,6 +31,7 @@ module.exports = class OffsetManager { cluster, coordinator, memberAssignment, + autoCommit, autoCommitInterval, autoCommitThreshold, topicConfigurations, @@ -54,6 +56,7 @@ module.exports = class OffsetManager { this.generationId = generationId this.memberId = memberId + this.autoCommit = autoCommit this.autoCommitInterval = autoCommitInterval this.autoCommitThreshold = autoCommitThreshold this.lastCommit = Date.now() @@ -166,6 +169,17 @@ module.exports = class OffsetManager { return } + if (!this.autoCommit) { + this.resolveOffset({ + topic, + partition, + offset: Long.fromValue(offset) + .subtract(1) + .toString(), + }) + return + } + const { groupId, generationId, memberId } = this const coordinator = await this.getCoordinator() From 9514382fa4b2ffb3a7f6736f80d1c6e48de6c39d Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 28 Jan 2021 22:20:48 +0100 Subject: [PATCH 2/3] Don't commit offsets after fetch when autoCommit is false Related to #507 --- src/consumer/runner.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index d0717a732..93b4e71ff 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -167,7 +167,7 @@ module.exports = class Runner extends EventEmitter { this.consumerGroup.resolveOffset({ topic, partition, offset: message.offset }) await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval }) - await this.consumerGroup.commitOffsetsIfNecessary() + await this.autoCommitOffsetsIfNecessary() } } From 18c78d1e98a39b84984bb5f19d0a90f0c2c6be86 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Fri, 29 Jan 2021 19:32:57 +0100 Subject: [PATCH 3/3] Test that consumer can seek back to an already resolved offset --- src/consumer/__tests__/seek.spec.js | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/consumer/__tests__/seek.spec.js b/src/consumer/__tests__/seek.spec.js index fa5c8385a..25c619f9d 100644 --- a/src/consumer/__tests__/seek.spec.js +++ b/src/consumer/__tests__/seek.spec.js @@ -186,7 +186,7 @@ describe('Consumer', () => { }) await consumer.subscribe({ topic: topicName, fromBeginning: true }) - const messagesConsumed = [] + let messagesConsumed = [] consumer.run({ autoCommit: false, eachMessage: async event => messagesConsumed.push(event), @@ -208,6 +208,22 @@ describe('Consumer', () => { offset: '-1', }), ]) + + messagesConsumed = [] + consumer.seek({ topic: topicName, partition: 0, offset: 1 }) + + await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ + { + topic: topicName, + partition: 0, + message: expect.objectContaining({ offset: '1' }), + }, + { + topic: topicName, + partition: 0, + message: expect.objectContaining({ offset: '2' }), + }, + ]) }) }) })