diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index e093139b71..2604f4404d 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -121,6 +121,10 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.errLogRateLimiter = new LogRateLimiter(2, System.currentTimeMillis()); } + KafkaTopicMetrics getTopicMetrics() { + return topicMetrics; + } + private long getCurrentTimeNanos() { Instant now = Instant.now(); return now.getEpochSecond()*1000000000+now.getNano(); @@ -501,4 +505,4 @@ final String getTopicPartitionOffset(final Map offsetMap, final Long offset = offsetMap.get(topicPartition); return Objects.isNull(offset) ? "-" : offset.toString(); } -} \ No newline at end of file +} diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index 43312becfa..1e61146e55 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -49,6 +49,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import static org.awaitility.Awaitility.await; + import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -97,16 +99,25 @@ public class KafkaSourceCustomConsumerTest { private final int testPartition = 0; private final int testJsonPartition = 1; private Counter counter; + @Mock + private Counter posCounter; + @Mock + private Counter negCounter; + private Duration delayTime; + private double posCount; + private double negCount; @BeforeEach public void setUp() { + delayTime = Duration.ofMillis(10); kafkaConsumer = mock(KafkaConsumer.class); topicMetrics = mock(KafkaTopicMetrics.class); counter = mock(Counter.class); + posCounter = mock(Counter.class); + negCounter = mock(Counter.class); topicConfig = mock(TopicConfig.class); - when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(counter); - when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); - when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(posCounter); + when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(negCounter); when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter); when(topicMetrics.getNumberOfDeserializationErrors()).thenReturn(counter); when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); @@ -114,7 +125,16 @@ public void setUp() { when(topicConfig.getAutoCommit()).thenReturn(false); when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null); - doAnswer((i)-> {return null;}).when(counter).increment(); + doAnswer((i)-> { + posCount += 1.0; + return null; + }).when(posCounter).increment(); + doAnswer((i)-> { + negCount += 1.0; + return null; + }).when(negCounter).increment(); + doAnswer((i)-> {return posCount;}).when(posCounter).count(); + doAnswer((i)-> {return negCount;}).when(negCounter).count(); callbackExecutor = Executors.newFixedThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000)); @@ -207,9 +227,10 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted event.getEventHandle().release(true); } // Wait for acknowledgement callback function to run - try { - Thread.sleep(100); - } catch (Exception e){} + await().atMost(delayTime.plusMillis(5000)) + .until(() -> { + return consumer.getTopicMetrics().getNumberOfPositiveAcknowledgements().count() == 1.0; + }); consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit(); @@ -254,9 +275,10 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int event.getEventHandle().release(false); } // Wait for acknowledgement callback function to run - try { - Thread.sleep(100); - } catch (Exception e){} + await().atMost(delayTime.plusMillis(5000)) + .until(() -> { + return consumer.getTopicMetrics().getNumberOfNegativeAcknowledgements().count() == 1.0; + }); consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit();