Skip to content

Commit

Permalink
Fix for consumer commitSync()
Browse files Browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
Krishna Kondaka committed Aug 19, 2023
1 parent 6322389 commit 6d6d8f8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
import org.opensearch.dataprepper.plugins.kafka.util.KafkaTopicMetrics;
import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
Expand All @@ -55,6 +56,7 @@
 * A utility class that handles the core Kafka consumer operations.
*/
public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceListener {

private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceCustomConsumer.class);
private static final Long COMMIT_OFFSET_INTERVAL_MS = 300000L;
private static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 1;
Expand All @@ -79,6 +81,7 @@ public class KafkaSourceCustomConsumer implements Runnable, ConsumerRebalanceLis
private final Duration acknowledgementsTimeout;
private final KafkaTopicMetrics topicMetrics;
private long metricsUpdatedTime;
private final AtomicInteger numberOfAcksPending;

public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
Expand Down Expand Up @@ -108,6 +111,7 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer,
Duration bufferTimeout = Duration.ofSeconds(1);
this.bufferAccumulator = BufferAccumulator.create(buffer, DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE, bufferTimeout);
this.lastCommitTime = System.currentTimeMillis();
this.numberOfAcksPending = new AtomicInteger(0);
}

public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range<Long> offsetRange) {
Expand All @@ -125,6 +129,7 @@ public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAn
private AcknowledgementSet createAcknowledgementSet(Map<TopicPartition, Range<Long>> offsets) {
AcknowledgementSet acknowledgementSet =
acknowledgementSetManager.create((result) -> {
numberOfAcksPending.decrementAndGet();
if (result == true) {
topicMetrics.getNumberOfPositiveAcknowledgements().increment();
synchronized(acknowledgedOffsets) {
Expand Down Expand Up @@ -155,6 +160,7 @@ public <T> void consumeRecords() throws Exception {
updateOffsetsToCommit(partition, new OffsetAndMetadata(offsetRange.getMaximum() + 1), offsetRange));
} else {
acknowledgementSet.complete();
numberOfAcksPending.incrementAndGet();
}
}
} catch (AuthenticationException e) {
Expand Down Expand Up @@ -229,7 +235,7 @@ private void commitOffsets() {
return;
}
try {
consumer.commitSync();
consumer.commitSync(offsetsToCommit);
offsetsToCommit.clear();
lastCommitTime = currentTimeMillis;
} catch (CommitFailedException e) {
Expand All @@ -244,7 +250,12 @@ Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit() {

@Override
public void run() {
consumer.subscribe(Arrays.asList(topicName));
consumer.subscribe(Arrays.asList(topicName), this);
Set<TopicPartition> partitions = consumer.assignment();
partitions.forEach((partition) -> {
final OffsetAndMetadata offsetAndMetadata = consumer.committed(partition);
LOG.info("Starting consumer with topic partition ({}) offset {}", partition, offsetAndMetadata);
});
boolean retryingAfterException = false;
while (!shutdownInProgress.get()) {
try {
Expand All @@ -261,6 +272,21 @@ public void run() {
retryingAfterException = true;
}
}
LOG.info("Number of acks pending = {}", numberOfAcksPending.get());
long startTime = Instant.now().getEpochSecond();
long curTime = startTime;
long ackTimeoutSeconds = acknowledgementsTimeout.toSeconds();
long waitTime = ackTimeoutSeconds;
while (curTime - startTime < ackTimeoutSeconds ) {
try {
Thread.sleep(waitTime * 1000);
curTime = Instant.now().getEpochSecond();
} catch (Exception e) {
curTime = Instant.now().getEpochSecond();
waitTime = ackTimeoutSeconds - (curTime - startTime);
}
}
commitOffsets();
}

private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, int partition) {
Expand Down Expand Up @@ -353,16 +379,18 @@ public void closeConsumer(){
/**
 * Requests shutdown of this consumer's poll loop.
 * <p>
 * {@code wakeup()} makes any in-progress or future blocking call on the
 * underlying {@link KafkaConsumer} (e.g. {@code poll}) throw
 * {@code WakeupException}, allowing the run loop to observe shutdown promptly.
 * Safe to call from a thread other than the polling thread.
 */
public void shutdownConsumer(){
    consumer.wakeup();
}

/**
 * Rebalance callback invoked when partitions are (re)assigned to this consumer.
 * Queues every newly assigned partition in {@code partitionsToReset}
 * (presumably consumed by the poll loop to reset offsets — confirm against caller)
 * and logs each assignment.
 *
 * @param partitions the partitions newly assigned to this consumer
 */
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Take the monitor once for the whole batch instead of re-acquiring it
    // for every partition inside the loop.
    synchronized (partitionsToReset) {
        partitionsToReset.addAll(partitions);
    }
    for (TopicPartition topicPartition : partitions) {
        LOG.info("Assigned partition {}", topicPartition);
    }
}

/**
 * Rebalance callback invoked when partitions are revoked from this consumer.
 * Only logs each revoked partition; no state is mutated here.
 *
 * @param partitions the partitions being taken away from this consumer
 */
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    partitions.forEach(revoked -> LOG.info("Revoked partition {}", revoked));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,15 @@ public void stop() {
LOG.info("Shutting down Consumers...");
shutdownInProgress.set(true);
executorService.shutdown();
long ackTimeoutSeconds = sourceConfig.getAcknowledgementsTimeout().toSeconds();
boolean acknowledgementsEnabled = sourceConfig.getAcknowledgementsEnabled() || ackTimeoutSeconds != KafkaSourceConfig.DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT.toSeconds();
long threadWaitingTime = calculateLongestThreadWaitingTime();
if (acknowledgementsEnabled && threadWaitingTime < 2 * ackTimeoutSeconds) {
threadWaitingTime = 2 * ackTimeoutSeconds;
}
try {
if (!executorService.awaitTermination(
calculateLongestThreadWaitingTime(), TimeUnit.SECONDS)) {
threadWaitingTime, TimeUnit.SECONDS)) {
LOG.info("Consumer threads are waiting for shutting down...");
executorService.shutdownNow();
}
Expand All @@ -178,6 +184,7 @@ public void stop() {
Thread.currentThread().interrupt();
}
}
consumer.closeConsumer();
LOG.info("Consumer shutdown successfully...");
}

Expand Down

0 comments on commit 6d6d8f8

Please sign in to comment.