diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 4a58464f7308..c776e3f2f8b2 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -181,6 +181,10 @@ private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue setKafkaProducer() } countDownSentEvents.await(); + kafkaEmitter.close(); + Assert.assertEquals(0, kafkaEmitter.getMetricLostCount()); Assert.assertEquals(0, kafkaEmitter.getAlertLostCount()); Assert.assertEquals(0, kafkaEmitter.getSegmentMetadataLostCount());