diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java index 96767016c5..7f5226d90e 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumer.java @@ -110,6 +110,10 @@ public KafkaSourceCustomConsumer(final KafkaConsumer consumer, this.lastCommitTime = System.currentTimeMillis(); } + KafkaTopicMetrics getTopicMetrics() { + return topicMetrics; + } + public void updateOffsetsToCommit(final TopicPartition partition, final OffsetAndMetadata offsetAndMetadata, Range offsetRange) { long min = offsetRange.getMinimum(); long max = offsetRange.getMaximum(); diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java index 47080515d3..8f313eae54 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaSourceCustomConsumerTest.java @@ -51,10 +51,13 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.time.Duration; + +import static org.awaitility.Awaitility.await; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class KafkaSourceCustomConsumerTest { @@ -95,23 +98,35 @@ public class KafkaSourceCustomConsumerTest { private final int testPartition = 0; private final int testJsonPartition = 1; private Counter counter; + @Mock + private Counter posCounter; + @Mock + private Counter negCounter; + private Duration delayTime; + private double posCount; + private double negCount; @BeforeEach public void setUp() { + delayTime = Duration.ofMillis(10); kafkaConsumer = mock(KafkaConsumer.class); topicMetrics = mock(KafkaTopicMetrics.class); counter = mock(Counter.class); + posCounter = mock(Counter.class); + negCounter = mock(Counter.class); topicConfig = mock(TopicConfig.class); - when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(counter); - when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); - when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(counter); + when(topicMetrics.getNumberOfPositiveAcknowledgements()).thenReturn(posCounter); + when(topicMetrics.getNumberOfNegativeAcknowledgements()).thenReturn(negCounter); when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter); when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1)); when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT); when(topicConfig.getAutoCommit()).thenReturn(false); when(kafkaConsumer.committed(any(TopicPartition.class))).thenReturn(null); - doAnswer((i)-> {return null;}).when(counter).increment(); + doAnswer((i)-> {posCount += 1.0; return null;}).when(posCounter).increment(); + doAnswer((i)-> {negCount += 1.0; return null;}).when(negCounter).increment(); + doAnswer((i)-> {return posCount;}).when(posCounter).count(); + doAnswer((i)-> {return negCount;}).when(negCounter).count(); callbackExecutor = Executors.newFixedThreadPool(2); acknowledgementSetManager = new DefaultAcknowledgementSetManager(callbackExecutor, Duration.ofMillis(2000)); @@ -201,9 +216,10 @@ public void testPlainTextConsumeRecordsWithAcknowledgements() throws Interrupted event.getEventHandle().release(true); } // Wait for acknowledgement callback function to run - try { - Thread.sleep(10000); - } catch (Exception e){} + await().atMost(delayTime.plusMillis(5000)) + .until(() -> { + return consumer.getTopicMetrics().getNumberOfPositiveAcknowledgements().count() == 1.0; + }); consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit(); @@ -245,9 +261,10 @@ public void testPlainTextConsumeRecordsWithNegativeAcknowledgements() throws Int event.getEventHandle().release(false); } // Wait for acknowledgement callback function to run - try { - Thread.sleep(10000); - } catch (Exception e){} + await().atMost(delayTime.plusMillis(5000)) + .until(() -> { + return consumer.getTopicMetrics().getNumberOfNegativeAcknowledgements().count() == 1.0; + }); consumer.processAcknowledgedOffsets(); offsetsToCommit = consumer.getOffsetsToCommit();