From 35a168a8c349546aa4c6fbc7a98e4abc7393c11b Mon Sep 17 00:00:00 2001
From: Krishna Kondaka
Date: Wed, 30 Oct 2024 03:57:37 +0000
Subject: [PATCH] Addressed review comments

Signed-off-by: Krishna Kondaka
---
 .../kafka/consumer/KafkaCustomConsumer.java   | 29 ++++++++++++++++---
 .../kafka/util/KafkaTopicConsumerMetrics.java |  7 +++++
 .../consumer/KafkaCustomConsumerTest.java     | 24 +++++++++++++++
 3 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
index 8a64c42e3b..a84f800d8d 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java
@@ -99,6 +99,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
     private final LogRateLimiter errLogRateLimiter;
     private final ByteDecoder byteDecoder;
     private final long maxRetriesOnException;
+    private final Map<Integer, Long> partitionToLastReceivedTimestampMillis;
 
     public KafkaCustomConsumer(final KafkaConsumer consumer,
                                final AtomicBoolean shutdownInProgress,
@@ -122,6 +123,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
         this.pauseConsumePredicate = pauseConsumePredicate;
         this.topicMetrics.register(consumer);
         this.offsetsToCommit = new HashMap<>();
+        this.partitionToLastReceivedTimestampMillis = new HashMap<>();
         this.ownedPartitionsEpoch = new HashMap<>();
         this.metricsUpdatedTime = Instant.now().getEpochSecond();
         this.acknowledgedOffsets = new ArrayList<>();
@@ -142,6 +144,22 @@ KafkaTopicConsumerMetrics getTopicMetrics() {
         return topicMetrics;
     }
 
+    long getRecordTimeStamp(final ConsumerRecord consumerRecord, final long nowMs) {
+        final long timestamp = consumerRecord.timestamp();
+        int partition = consumerRecord.partition();
+        if (timestamp > nowMs) {
+            topicMetrics.getNumberOfInvalidTimeStamps().increment();
+            if (partitionToLastReceivedTimestampMillis.containsKey(partition)) {
+                return partitionToLastReceivedTimestampMillis.get(partition);
+            } else {
+                return nowMs;
+            }
+        } else {
+            partitionToLastReceivedTimestampMillis.put(partition, timestamp);
+            return timestamp;
+        }
+    }
+
     private long getCurrentTimeNanos() {
         Instant now = Instant.now();
         return now.getEpochSecond()*1000000000+now.getNano();
@@ -436,12 +454,13 @@ private Record getRecord(ConsumerRecord consumerRecord, in
             }
             eventMetadata.setAttribute("kafka_headers", headerData);
         }
-        eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp());
+        final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, now.toEpochMilli());
+        eventMetadata.setAttribute("kafka_timestamp", receivedTimeStamp);
         eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
         eventMetadata.setAttribute("kafka_topic", topicName);
         eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
-        eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
-        event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
+        eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
+        event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
         return new Record<>(event);
     }
 
@@ -511,7 +530,9 @@ private void iterateRecordPartitions(ConsumerRecords records, fin
                 if (schema == MessageFormat.BYTES) {
                     InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
                     if(byteDecoder != null) {
-                        byteDecoder.parse(inputStream, Instant.ofEpochMilli(consumerRecord.timestamp()), (record) -> {
+                        final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli());
+
+                        byteDecoder.parse(inputStream, Instant.ofEpochMilli(receivedTimeStamp), (record) -> {
                             processRecord(acknowledgementSet, record);
                         });
                     } else {
diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java
index aaa81b39b5..1fd03f8aff 100644
--- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java
+++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/util/KafkaTopicConsumerMetrics.java
@@ -22,6 +22,7 @@ public class KafkaTopicConsumerMetrics {
     static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse";
     static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors";
     static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows";
+    static final String NUMBER_OF_INVALID_TIMESTAMPS = "numberOfInvalidTimeStamps";
     static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
     static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
     static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
@@ -38,6 +39,7 @@ public class KafkaTopicConsumerMetrics {
     private final Counter numberOfDeserializationErrors;
     private final Counter numberOfBufferSizeOverflows;
     private final Counter numberOfPollAuthErrors;
+    private final Counter numberOfInvalidTimeStamps;
     private final Counter numberOfRecordsCommitted;
     private final Counter numberOfRecordsConsumed;
     private final Counter numberOfBytesConsumed;
@@ -53,6 +55,7 @@ public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics plu
         this.numberOfBytesConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BYTES_CONSUMED, topicNameInMetrics));
         this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED, topicNameInMetrics));
         this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE, topicNameInMetrics));
+        this.numberOfInvalidTimeStamps = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_INVALID_TIMESTAMPS, topicNameInMetrics));
         this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS, topicNameInMetrics));
         this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS, topicNameInMetrics));
         this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS, topicNameInMetrics));
@@ -151,6 +154,10 @@ public Counter getNumberOfNegativeAcknowledgements() {
         return numberOfNegativeAcknowledgements;
     }
 
+    public Counter getNumberOfInvalidTimeStamps() {
+        return numberOfInvalidTimeStamps;
+    }
+
     public Counter getNumberOfPositiveAcknowledgements() {
         return numberOfPositiveAcknowledgements;
     }
diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
index 8f0052afb5..208682e2c0 100644
--- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
+++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumerTest.java
@@ -40,6 +40,7 @@
 import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;
 
 import java.time.Duration;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -51,6 +52,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -138,6 +140,7 @@ public void setUp() {
         when(topicMetrics.getNumberOfBufferSizeOverflows()).thenReturn(overflowCounter);
         when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter);
         when(topicMetrics.getNumberOfDeserializationErrors()).thenReturn(counter);
+        when(topicMetrics.getNumberOfInvalidTimeStamps()).thenReturn(counter);
         when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
         when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
         when(topicConfig.getAutoCommit()).thenReturn(false);
@@ -196,6 +199,27 @@ private BlockingBuffer<Record<Event>> getBuffer() {
         return new BlockingBuffer<>(pluginSetting);
     }
 
+    @Test
+    public void testGetRecordTimeStamp() {
+        ConsumerRecord consumerRecord1 = mock(ConsumerRecord.class);
+        ConsumerRecord consumerRecord2 = mock(ConsumerRecord.class);
+        ConsumerRecord consumerRecord3 = mock(ConsumerRecord.class);
+        consumer = createObjectUnderTestWithMockBuffer("plaintext");
+        long nowMs = Instant.now().toEpochMilli();
+        long timestamp1 = nowMs - 5;
+        when(consumerRecord1.timestamp()).thenReturn(timestamp1);
+        when(consumerRecord1.partition()).thenReturn(1);
+        assertThat(consumer.getRecordTimeStamp(consumerRecord1, nowMs), equalTo(timestamp1));
+        long timestamp2 = nowMs + 5;
+        when(consumerRecord2.timestamp()).thenReturn(timestamp2);
+        when(consumerRecord2.partition()).thenReturn(1);
+        assertThat(consumer.getRecordTimeStamp(consumerRecord2, nowMs), equalTo(timestamp1));
+        long timestamp3 = nowMs + 10;
+        when(consumerRecord3.timestamp()).thenReturn(timestamp3);
+        when(consumerRecord3.partition()).thenReturn(2);
+        assertThat(consumer.getRecordTimeStamp(consumerRecord3, nowMs), equalTo(nowMs));
+    }
+
     @Test
     public void testBufferOverflowPauseResume() throws InterruptedException, Exception {
         when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofMillis(4000));
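
The fallback rule that the new getRecordTimeStamp() method encodes, and that
testGetRecordTimeStamp() above exercises, can be summarized with a minimal
self-contained sketch. The class and method names below are hypothetical,
invented purely for illustration, and the real implementation additionally
increments the numberOfInvalidTimeStamps counter on the fallback path:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch of the per-partition timestamp fallback introduced
    // by this patch; not part of the plugin.
    class TimestampFallbackSketch {
        // Last valid (non-future) timestamp observed per partition.
        private final Map<Integer, Long> lastValidByPartition = new HashMap<>();

        long resolve(final int partition, final long recordTimestampMs, final long nowMs) {
            if (recordTimestampMs > nowMs) {
                // The record claims a future timestamp: treat it as invalid and
                // fall back to the partition's last valid timestamp, or to "now"
                // if this partition has not yet produced a valid timestamp.
                return lastValidByPartition.getOrDefault(partition, nowMs);
            }
            // Valid timestamp: remember it for future fallbacks and use it as-is.
            lastValidByPartition.put(partition, recordTimestampMs);
            return recordTimestampMs;
        }

        public static void main(final String[] args) {
            final TimestampFallbackSketch sketch = new TimestampFallbackSketch();
            final long now = System.currentTimeMillis();
            System.out.println(sketch.resolve(1, now - 5, now));  // now - 5: valid, cached
            System.out.println(sketch.resolve(1, now + 5, now));  // now - 5: falls back to cache
            System.out.println(sketch.resolve(2, now + 10, now)); // now: partition 2 has no history
        }
    }

The three calls in main() mirror the assertions in testGetRecordTimeStamp(),
which is why that test feeds records from two partitions: it covers both the
cached fallback (partition 1) and the no-history fallback to nowMs (partition 2).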