Fix for Kafka source not committing offsets issue #3231 (#3232)
Signed-off-by: Hardeep Singh <[email protected]>
hshardeesi authored Aug 24, 2023
1 parent a4df0bd commit 89e8f39
Showing 5 changed files with 186 additions and 66 deletions.
KafkaSourceCustomConsumer.java
@@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -82,6 +83,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private final KafkaTopicMetrics topicMetrics;
private long metricsUpdatedTime;
private final AtomicInteger numberOfAcksPending;
+private long numRecordsCommitted = 0;

public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
@@ -103,7 +105,6 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.acknowledgedOffsets = new ArrayList<>();
this.acknowledgementsTimeout = Duration.ofSeconds(Integer.MAX_VALUE);
-// If the timeout value is different from default value, then enable acknowledgements automatically.
this.acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled();
this.acknowledgementSetManager = acknowledgementSetManager;
this.partitionCommitTrackerMap = new HashMap<>();
@@ -120,10 +121,7 @@ private long getCurrentTimeNanos() {
return now.getEpochSecond()*1000000000+now.getNano();
}

-public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range<Long> offsetRange) {
-long min = offsetRange.getMinimum();
-long max = offsetRange.getMaximum();
-topicMetrics.getNumberOfRecordsCommitted().increment(max - min + 1);
+public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata) {
if (Objects.isNull(offsetAndMetadata)) {
return;
}
@@ -164,10 +162,10 @@ public <T> void consumeRecords() throws Exception {
}
iterateRecordPartitions(records, acknowledgementSet, offsets);
if (!acknowledgementsEnabled) {
-offsets.forEach((partition, offsetRange) ->
-updateOffsetsToCommit(partition,
-new OffsetAndMetadata(offsetRange.getOffsets().getMaximum() + 1),
-offsetRange.getOffsets()));
+offsets.forEach((partition, offsetRange) -> {
+updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getOffsets().getMaximum() + 1));
+numRecordsCommitted += offsetRange.getOffsets().getMaximum() - offsetRange.getOffsets().getMinimum() + 1;
+});
} else {
acknowledgementSet.complete();
numberOfAcksPending.incrementAndGet();
Expand Down Expand Up @@ -197,7 +195,8 @@ private void resetOffsets() {
try {
final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
if (Objects.isNull(offsetAndMetadata)) {
-consumer.seek(partition, 0L);
+LOG.info("Seeking partition {} to the beginning", partition);
+consumer.seekToBeginning(List.of(partition));
} else {
LOG.info("Seeking partition {} to {}", partition, offsetAndMetadata.offset());
consumer.seek(partition, offsetAndMetadata);
@@ -215,20 +214,16 @@ void processAcknowledgedOffsets() {

acknowledgedOffsets.forEach(offsets -> {
offsets.forEach((partition, offsetRange) -> {

if (getPartitionEpoch(partition) == offsetRange.getEpoch()) {
try {
int partitionId = partition.partition();
-if (!partitionCommitTrackerMap.containsKey(partitionId)) {
-OffsetAndMetadata committedOffsetAndMetadata = consumer.committed(partition);
-Long committedOffset = Objects.nonNull(committedOffsetAndMetadata) ? committedOffsetAndMetadata.offset() : null;
-LOG.info("Tracking offsets for partition{} starting with committedOffset {}", partition,
-committedOffset);
-partitionCommitTrackerMap.put(partitionId, new TopicPartitionCommitTracker(partition, committedOffset));
+if (partitionCommitTrackerMap.containsKey(partitionId)) {
+final OffsetAndMetadata offsetAndMetadata =
+partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange.getOffsets());
+updateOffsetsToCommit(partition, offsetAndMetadata);
+} else {
+LOG.error("Commit tracker not found for topic: {} partition: {}", partition.topic(), partitionId);
}
-OffsetAndMetadata offsetAndMetadata =
-partitionCommitTrackerMap.get(partitionId).addCompletedOffsets(offsetRange.getOffsets());
-updateOffsetsToCommit(partition, offsetAndMetadata, offsetRange.getOffsets());
} catch (Exception e) {
LOG.error("Failed committed offsets upon positive acknowledgement {}", partition, e);
}
@@ -238,6 +233,21 @@
acknowledgedOffsets.clear();
}

+private void updateCommitCountMetric(final TopicPartition topicPartition, final OffsetAndMetadata offsetAndMetadata) {
+if (acknowledgementsEnabled) {
+final TopicPartitionCommitTracker commitTracker = partitionCommitTrackerMap.get(topicPartition.partition());
+if (Objects.isNull(commitTracker)) {
+LOG.error("Commit tracker not found for topic: {} partition: {}",
+topicPartition.topic(), topicPartition.partition());
+return;
+}
+topicMetrics.getNumberOfRecordsCommitted().increment(commitTracker.getCommittedRecordCount());
+} else {
+topicMetrics.getNumberOfRecordsCommitted().increment(numRecordsCommitted);
+numRecordsCommitted = 0;
+}
+}
+
private void commitOffsets(boolean forceCommit) {
if (topicConfig.getAutoCommit()) {
return;
@@ -251,6 +261,8 @@ private void commitOffsets(boolean forceCommit) {
if (offsetsToCommit.isEmpty()) {
return;
}

+offsetsToCommit.forEach(((partition, offset) -> updateCommitCountMetric(partition, offset)));
try {
consumer.commitSync(offsetsToCommit);
} catch (Exception e) {
@@ -261,22 +273,26 @@
}
}

+@VisibleForTesting
Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {
return offsetsToCommit;
}

+@VisibleForTesting
+Long getNumRecordsCommitted() {
+return numRecordsCommitted;
+}
+
@Override
public void run() {
consumer.subscribe(Arrays.asList(topicName), this);
Set<TopicPartition> partitions = consumer.assignment();
-synchronized (ownedPartitionsEpoch) {
-final long currentEpoch = getCurrentTimeNanos();
-partitions.forEach((partition) -> {
-final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
-LOG.info("Starting consumer with topic partition ({}) offset {}", partition, offsetAndMetadata);
-ownedPartitionsEpoch.put(partition, currentEpoch);
-});
-}
+final long currentEpoch = getCurrentTimeNanos();
+partitions.forEach((partition) -> {
+final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
+LOG.info("Starting consumer with topic partition ({}) offset {}", partition, offsetAndMetadata);
+ownedPartitionsEpoch.put(partition, currentEpoch);
+});

boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
@@ -357,8 +373,8 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
Map<TopicPartition, CommitOffsetRange> offsets) throws Exception {
for (TopicPartition topicPartition : records.partitions()) {
final long partitionEpoch = getPartitionEpoch(topicPartition);
-if (partitionEpoch == 0) {
-LOG.info("Skipping partition {}, lost ownership", topicPartition);
+if (acknowledgementsEnabled && partitionEpoch == 0) {
+//ToDo: Add metric
continue;
}

@@ -389,6 +405,11 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
long firstOffset = partitionRecords.get(0).offset();
Range<Long> offsetRange = Range.between(firstOffset, lastOffset);
offsets.put(topicPartition, new CommitOffsetRange(offsetRange, partitionEpoch));

+if (acknowledgementsEnabled && !partitionCommitTrackerMap.containsKey(topicPartition.partition())) {
+partitionCommitTrackerMap.put(topicPartition.partition(),
+new TopicPartitionCommitTracker(topicPartition, firstOffset));
+}
}
}

@@ -404,19 +425,18 @@ public void shutdownConsumer(){
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
synchronized(this) {
final long epoch = getCurrentTimeNanos();

for (TopicPartition topicPartition : partitions) {
if (ownedPartitionsEpoch.containsKey(topicPartition)) {
LOG.info("Partition {} already owned", topicPartition);
continue;
}
-final OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
-LOG.info("Assigned new partition {}, committed offset: {}",topicPartition,
-Objects.isNull(offsetAndMetadata) ? 0 : offsetAndMetadata.offset());

+LOG.info("Assigned partition {}", topicPartition);
+partitionCommitTrackerMap.remove(topicPartition.partition());
ownedPartitionsEpoch.put(topicPartition, epoch);
}
}
+dumpTopicPartitionOffsets(partitions);
}

@Override
@@ -425,7 +445,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
commitOffsets(true);
for (TopicPartition topicPartition : partitions) {
if (!ownedPartitionsEpoch.containsKey(topicPartition)) {
-LOG.info("Partition {} not owned ", topicPartition);
+LOG.info("Partition {} not owned", topicPartition);
continue;
}
LOG.info("Revoked partition {}", topicPartition);
@@ -438,4 +458,26 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
private long getPartitionEpoch(final TopicPartition topicPartition) {
return ownedPartitionsEpoch.getOrDefault(topicPartition, 0L);
}

+final void dumpTopicPartitionOffsets(final Collection<TopicPartition> partitions) {
+try {
+final Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(new HashSet<>(partitions));
+final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
+final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
+for (TopicPartition topicPartition : partitions) {
+final OffsetAndMetadata offsetAndMetadata = committedOffsets.get(topicPartition);
+LOG.info("Partition {} offsets: beginningOffset: {}, endOffset: {}, committedOffset: {}",
+topicPartition, getTopicPartitionOffset(beginningOffsets, topicPartition),
+getTopicPartitionOffset(endOffsets, topicPartition),
+Objects.isNull(offsetAndMetadata) ? "-" : offsetAndMetadata.offset());
+}
+} catch (Exception e) {
+LOG.error("Failed to get offsets in onPartitionsAssigned callback", e);
+}
+}
+
+final String getTopicPartitionOffset(final Map<TopicPartition, Long> offsetMap, final TopicPartition topicPartition) {
+final Long offset = offsetMap.get(topicPartition);
+return Objects.isNull(offset) ? "-" : offset.toString();
+}
}
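
For readers less familiar with Kafka's manual offset management: the consumer above follows the standard poll-then-commit pattern, in which the offset passed to commitSync() is the last processed offset plus one (hence the getMaximum() + 1 in consumeRecords()). Below is a minimal, self-contained sketch of that pattern for illustration only — it is not part of this commit, and the broker address, group id, and topic name are placeholders.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ManualCommitSketch {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "manual-commit-sketch");    // placeholder group id
        props.put("enable.auto.commit", "false");         // offsets are committed explicitly below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));       // placeholder topic
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));

            final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                final List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                final long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // Kafka expects the *next* offset to be read, hence lastOffset + 1.
                offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            if (!offsetsToCommit.isEmpty()) {
                consumer.commitSync(offsetsToCommit);
            }
        }
    }
}
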
TopicPartitionCommitTracker.java
@@ -5,28 +5,50 @@

package org.opensearch.dataprepper.plugins.kafka.consumer;

-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-
import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

+import java.util.HashMap;
+import java.util.Map;
+
public class TopicPartitionCommitTracker {
+private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class);
private long committedOffset;
+private long committedRecordCount;
+private long initialOffset;
private final TopicPartition topicPartition;
private final Map<Long, Range<Long>> offsetMaxMap;
private final Map<Long, Range<Long>> offsetMinMap;

-public TopicPartitionCommitTracker(final TopicPartition topicPartition, Long committedOffset) {
+public TopicPartitionCommitTracker(final TopicPartition topicPartition, final Long initialOffset) {
this.topicPartition = topicPartition;
-this.committedOffset = Objects.nonNull(committedOffset) ? committedOffset-1 : -1L;
+this.initialOffset = initialOffset;
+LOG.info("Created commit tracker for partition: {}, initialOffset: {}", topicPartition, initialOffset);
+
+this.committedOffset = initialOffset-1L;
+this.committedRecordCount = 0;
this.offsetMaxMap = new HashMap<>();
this.offsetMinMap = new HashMap<>();
this.offsetMaxMap.put(this.committedOffset, Range.between(this.committedOffset, this.committedOffset));
}

+public long getInitialOffset() {
+return initialOffset;
+}
+
+public long getCommittedOffset() {
+return committedOffset;
+}
+
+public long getCommittedRecordCount() {
+long count = committedRecordCount;
+committedRecordCount = 0;
+return count;
+}
+
public TopicPartitionCommitTracker(final String topic, final int partition, Long committedOffset) {
this(new TopicPartition(topic, partition), committedOffset);
}
@@ -68,6 +90,7 @@ public OffsetAndMetadata addCompletedOffsets(final Range<Long> offsetRange) {
Long maxValue = offsetMinMap.get(committedOffset).getMaximum();
if (maxValue != committedOffset) {
offsetMinMap.remove(committedOffset);
+committedRecordCount += (maxValue - committedOffset);
committedOffset = maxValue;
offsetMaxMap.put(committedOffset, Range.between(committedOffset, committedOffset));
return new OffsetAndMetadata(committedOffset + 1);
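
To illustrate the contiguity logic of the reworked tracker, here is a hypothetical usage sketch based only on the API visible in this diff. The exact return value of addCompletedOffsets() when the committed offset cannot advance is an assumption (null is inferred from the Objects.isNull() check in updateOffsetsToCommit() above).

import org.apache.commons.lang3.Range;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.opensearch.dataprepper.plugins.kafka.consumer.TopicPartitionCommitTracker;

public class CommitTrackerSketch {
    public static void main(String[] args) {
        final TopicPartition partition = new TopicPartition("my-topic", 0); // placeholder topic
        // Tracking starts at offset 100, i.e. everything below 100 is treated as already committed.
        final TopicPartitionCommitTracker tracker = new TopicPartitionCommitTracker(partition, 100L);

        // Offsets 105-109 complete first; they are not yet contiguous with 100,
        // so the committed offset cannot advance (assumed to yield null here).
        final OffsetAndMetadata notYetCommittable = tracker.addCompletedOffsets(Range.between(105L, 109L));
        System.out.println(notYetCommittable);

        // Offsets 100-104 complete next; the two ranges merge into 100-109 and
        // the next offset to commit should become 110.
        final OffsetAndMetadata committable = tracker.addCompletedOffsets(Range.between(100L, 104L));
        System.out.println(committable.offset());              // 110, if the ranges merged as described
        System.out.println(tracker.getCommittedRecordCount()); // 10 records (100..109); the counter then resets to 0
    }
}
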
KafkaSource.java
@@ -62,7 +62,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -98,8 +97,8 @@ public class KafkaSource implements Source<Record<Event>> {
private static CachedSchemaRegistryClient schemaRegistryClient;
private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
private StringDeserializer stringDeserializer;
-private final Map<TopicConfig, ExecutorService> topicExecutorService;
-private final Map<TopicConfig, KafkaSourceCustomConsumer> topicConsumer;
+private final List<ExecutorService> allTopicExecutorServices;
+private final List<KafkaSourceCustomConsumer> allTopicConsumers;

@DataPrepperPluginConstructor
public KafkaSource(final KafkaSourceConfig sourceConfig,
@@ -111,9 +110,9 @@ public KafkaSource(final KafkaSourceConfig sourceConfig,
this.acknowledgementSetManager = acknowledgementSetManager;
this.pipelineName = pipelineDescription.getPipelineName();
this.stringDeserializer = new StringDeserializer();
-shutdownInProgress = new AtomicBoolean(false);
-topicExecutorService = new HashMap<>();
-topicConsumer = new HashMap<>();
+this.shutdownInProgress = new AtomicBoolean(false);
+this.allTopicExecutorServices = new ArrayList<>();
+this.allTopicConsumers = new ArrayList<>();
}

@Override
@@ -128,6 +127,8 @@ public void start(Buffer<Record<Event>> buffer) {
try {
int numWorkers = topic.getWorkers();
executorService = Executors.newFixedThreadPool(numWorkers);
+allTopicExecutorServices.add(executorService);
+
IntStream.range(0, numWorkers).forEach(index -> {
switch (schema) {
case JSON:
@@ -147,10 +148,9 @@
break;
}
consumer = new KafkaSourceCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, acknowledgementSetManager, topicMetrics);
-executorService.submit(consumer);
+allTopicConsumers.add(consumer);

-topicExecutorService.put(topic, executorService);
-topicConsumer.put(topic, consumer);
+executorService.submit(consumer);
});
} catch (Exception e) {
if (e instanceof BrokerNotAvailableException ||
@@ -168,18 +168,21 @@
@Override
public void stop() {
shutdownInProgress.set(true);
-LOG.info("Shutting down Consumers...");
-sourceConfig.getTopics().forEach(topic -> {
-stopConsumer(topicExecutorService.get(topic), topicConsumer.get(topic));
-});
-LOG.info("Consumer shutdown successfully...");
+final long shutdownWaitTime = calculateLongestThreadWaitingTime();
+
+LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size());
+allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime));
+
+LOG.info("Closing {} consumers", allTopicConsumers.size());
+allTopicConsumers.forEach(consumer -> consumer.closeConsumer());
+
+LOG.info("Kafka source shutdown successfully...");
}

-public void stopConsumer(final ExecutorService executorService, final KafkaSourceCustomConsumer consumer) {
+public void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) {
executorService.shutdown();
try {
-if (!executorService.awaitTermination(
-calculateLongestThreadWaitingTime(), TimeUnit.SECONDS)) {
+if (!executorService.awaitTermination(shutdownWaitTime, TimeUnit.SECONDS)) {
LOG.info("Consumer threads are waiting for shutting down...");
executorService.shutdownNow();
}
@@ -190,7 +193,6 @@ public void stopConsumer(final ExecutorService executorService, final KafkaSourc
Thread.currentThread().interrupt();
}
}
-consumer.closeConsumer();
}

private long calculateLongestThreadWaitingTime() {
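
The restructured stop() above uses the standard ExecutorService shutdown idiom — shutdown(), a bounded awaitTermination(), then shutdownNow() — and only closes the consumers once their worker threads have stopped. Below is a self-contained sketch of that idiom, independent of Data Prepper; the five-second wait merely stands in for calculateLongestThreadWaitingTime().

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    public static void main(String[] args) {
        final List<ExecutorService> executors = List.of(
                Executors.newFixedThreadPool(2), Executors.newFixedThreadPool(2));
        final long shutdownWaitTimeSeconds = 5L; // placeholder for calculateLongestThreadWaitingTime()

        for (ExecutorService executor : executors) {
            executor.shutdown(); // stop accepting new tasks and let running ones finish
            try {
                if (!executor.awaitTermination(shutdownWaitTimeSeconds, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // interrupt workers that did not finish in time
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        // Only after the worker threads are down would the Kafka consumers be closed,
        // mirroring allTopicConsumers.forEach(consumer -> consumer.closeConsumer()) above.
    }
}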