From 6d6d8f800e88a2c511c9470e37a2673ddc803f3f Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Sat, 19 Aug 2023 16:23:29 +0000 Subject: [PATCH] Fix for consumer commitSync() Signed-off-by: Krishna Kondaka --- .../consumer/KafkaSourceCustomConsumer.java | 38 ++++++++++++++++--- .../plugins/kafka/source/KafkaSource.java | 9 ++++- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 96767016c5..93daa22ec9 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -45,6 +45,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics; import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema; @@ -55,6 +56,7 @@ * * A utility class which will handle the core Kafka consumer operation. */ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class); private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L; private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1; @@ -79,6 +81,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis private final Duration acknowledgementsTimeout; private final KafkaTopicMetrics topicMetrics; private long metricsUpdatedTime; + private final AtomicInteger numberOfAcksPending; public KafkaSourceCustomConsumer(final KafkaConsumer consumer, final AtomicBoolean shutdownInProgress, @@ -108,6 +111,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, Duration bufferTimeout = Duration.ofSeconds(1); this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout); this.lastCommitTime = System.currentTimeMillis(); + this.numberOfAcksPending = new AtomicInteger(0); } public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range offsetRange) { @@ -125,6 +129,7 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn private AcknowledgementSet createAcknowledgementSet(Map> offsets) { AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create((result) -> { + numberOfAcksPending.decrementAndGet(); if (result == true) { topicMetrics.getNumberOfPositiveAcknowledgements().increment(); synchronized(acknowledgedOffsets) { @@ -155,6 +160,7 @@ public void consumeRecords() throws Exception { updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange)); } else { acknowledgementSet.complete(); + numberOfAcksPending.incrementAndGet(); } } } catch (AuthenticationException e) { @@ -229,7 +235,7 @@ private void commitOffsets() { return; } try { - consumer.commitSync(); + consumer.commitSync(offsetsToCommit); offsetsToCommit.clear(); lastCommitTime = currentTimeMillis; } catch (CommitFailedException e) { @@ -244,7 +250,12 @@ Map getOffsetsToCommit() { @Override public void run() { - consumer.subscribe(Arrays.asList(topicName)); + consumer.subscribe(Arrays.asList(topicName), this); + Set partitions = consumer.assignment(); + partitions.forEach((partition) -> { + final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition); + LOG.info("Starting consumer with topic partition ({}) offset {}", partition, offsetAndMetadata); + }); boolean retryingAfterException = false; while (!shutdownInProgress.get()) { try { @@ -261,6 +272,21 @@ public void run() { retryingAfterException = true; } } + LOG.info("Number of acks pending = {}", numberOfAcksPending.get()); + long startTime = Instant.now().getEpochSecond(); + long curTime = startTime; + long ackTimeoutSeconds = acknowledgementsTimeout.toSeconds(); + long waitTime = ackTimeoutSeconds; + while (curTime - startTime < ackTimeoutSeconds ) { + try { + Thread.sleep(waitTime * 1000); + curTime = Instant.now().getEpochSecond(); + } catch (Exception e) { + curTime = Instant.now().getEpochSecond(); + waitTime = ackTimeoutSeconds - (curTime - startTime); + } + } + commitOffsets(); } private Record getRecord(ConsumerRecord consumerRecord, int partition) { @@ -353,16 +379,18 @@ public void closeConsumer(){ public void shutdownConsumer(){ consumer.wakeup(); } + @Override public void onPartitionsAssigned(Collection partitions) { for (TopicPartition topicPartition : partitions) { - synchronized(partitionsToReset) { - partitionsToReset.add(topicPartition); - } + LOG.info("Assigned partition {}",topicPartition); } } @Override public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + LOG.info("Revoked partition {}", topicPartition); + } } } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index ea021f8474..f8c2340241 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -165,9 +165,15 @@ public void stop() { LOG.info("Shutting down Consumers..."); shutdownInProgress.set(true); executorService.shutdown(); + long ackTimeoutSeconds = sourceConfig.getAcknowledgementsTimeout().toSeconds(); + boolean acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || ackTimeoutSeconds != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT.toSeconds(); + long threadWaitingTime = calculateLongestThreadWaitingTime(); + if (acknowledgementsEnabled && threadWaitingTime < 2 * ackTimeoutSeconds) { + threadWaitingTime = 2 * ackTimeoutSeconds; + } try { if (!executorService.awaitTermination( - calculateLongestThreadWaitingTime(), TimeUnit.SECONDS)) { + threadWaitingTime, TimeUnit.SECONDS)) { LOG.info("Consumer threads are waiting for shutting down..."); executorService.shutdownNow(); } @@ -178,6 +184,7 @@ public void stop() { Thread.currentThread().interrupt(); } } + consumer.closeConsumer(); LOG.info("Consumer shutdown successfully..."); }