Skip to content

Commit

Permalink
Addressed review comments
Browse files: browse the repository at this point in the history
Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka committed Oct 30, 2024
1 parent e59917d commit 35a168a
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class KafkaCustomConsumer implements Runnable, ConsumerRebalanceListener
private final LogRateLimiter errLogRateLimiter;
private final ByteDecoder byteDecoder;
private final long maxRetriesOnException;
private final Map<Integer, Long> partitionToLastReceivedTimestampMillis;

public KafkaCustomConsumer(final KafkaConsumer consumer,
final AtomicBoolean shutdownInProgress,
Expand All @@ -122,6 +123,7 @@ public KafkaCustomConsumer(final KafkaConsumer consumer,
this.pauseConsumePredicate = pauseConsumePredicate;
this.topicMetrics.register(consumer);
this.offsetsToCommit = new HashMap<>();
this.partitionToLastReceivedTimestampMillis = new HashMap<>();
this.ownedPartitionsEpoch = new HashMap<>();
this.metricsUpdatedTime = Instant.now().getEpochSecond();
this.acknowledgedOffsets = new ArrayList<>();
Expand All @@ -142,6 +144,22 @@ KafkaTopicConsumerMetrics getTopicMetrics() {
return topicMetrics;
}

/**
 * Picks the timestamp to report for a consumed record, rejecting timestamps that lie
 * after {@code nowMs}.
 *
 * <p>A record whose timestamp is not greater than {@code nowMs} is accepted: its timestamp
 * is remembered as the latest one seen for the record's partition and returned unchanged.
 * A record whose timestamp is greater than {@code nowMs} bumps the invalid-timestamp counter
 * and yields the timestamp last remembered for that partition, or {@code nowMs} when nothing
 * has been remembered for the partition yet.
 *
 * @param consumerRecord the record whose timestamp and partition are inspected
 * @param nowMs          the reference "current time" in epoch milliseconds supplied by the caller
 * @param <T>            the record value type
 * @return the timestamp, in epoch milliseconds, to use for this record
 */
<T> long getRecordTimeStamp(final ConsumerRecord<String, T> consumerRecord, final long nowMs) {
    final long recordTimestampMs = consumerRecord.timestamp();
    final int partitionId = consumerRecord.partition();

    // Accepted timestamp: remember it so later out-of-range records on the same partition can reuse it.
    if (recordTimestampMs <= nowMs) {
        partitionToLastReceivedTimestampMillis.put(partitionId, recordTimestampMs);
        return recordTimestampMs;
    }

    // Timestamp is ahead of the reference time: count it and fall back.
    topicMetrics.getNumberOfInvalidTimeStamps().increment();
    return partitionToLastReceivedTimestampMillis.getOrDefault(partitionId, nowMs);
}

private long getCurrentTimeNanos() {
Instant now = Instant.now();
return now.getEpochSecond()*1000000000+now.getNano();
Expand Down Expand Up @@ -436,12 +454,13 @@ private <T> Record<Event> getRecord(ConsumerRecord<String, T> consumerRecord, in
}
eventMetadata.setAttribute("kafka_headers", headerData);
}
eventMetadata.setAttribute("kafka_timestamp", consumerRecord.timestamp());
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, now.toEpochMilli());
eventMetadata.setAttribute("kafka_timestamp", receivedTimeStamp);
eventMetadata.setAttribute("kafka_timestamp_type", consumerRecord.timestampType().toString());
eventMetadata.setAttribute("kafka_topic", topicName);
eventMetadata.setAttribute("kafka_partition", String.valueOf(partition));
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(consumerRecord.timestamp()));
eventMetadata.setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));
event.getEventHandle().setExternalOriginationTime(Instant.ofEpochMilli(receivedTimeStamp));

return new Record<Event>(event);
}
Expand Down Expand Up @@ -511,7 +530,9 @@ private <T> void iterateRecordPartitions(ConsumerRecords<String, T> records, fin
if (schema == MessageFormat.BYTES) {
InputStream inputStream = new ByteArrayInputStream((byte[])consumerRecord.value());
if(byteDecoder != null) {
byteDecoder.parse(inputStream, Instant.ofEpochMilli(consumerRecord.timestamp()), (record) -> {
final long receivedTimeStamp = getRecordTimeStamp(consumerRecord, Instant.now().toEpochMilli());

byteDecoder.parse(inputStream, Instant.ofEpochMilli(receivedTimeStamp), (record) -> {
processRecord(acknowledgementSet, record);
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class KafkaTopicConsumerMetrics {
static final String NUMBER_OF_RECORDS_FAILED_TO_PARSE = "numberOfRecordsFailedToParse";
static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors";
static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows";
static final String NUMBER_OF_INVALID_TIMESTAMPS = "numberOfInvalidTimeStamps";
static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
Expand All @@ -38,6 +39,7 @@ public class KafkaTopicConsumerMetrics {
private final Counter numberOfDeserializationErrors;
private final Counter numberOfBufferSizeOverflows;
private final Counter numberOfPollAuthErrors;
private final Counter numberOfInvalidTimeStamps;
private final Counter numberOfRecordsCommitted;
private final Counter numberOfRecordsConsumed;
private final Counter numberOfBytesConsumed;
Expand All @@ -53,6 +55,7 @@ public KafkaTopicConsumerMetrics(final String topicName, final PluginMetrics plu
this.numberOfBytesConsumed = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BYTES_CONSUMED, topicNameInMetrics));
this.numberOfRecordsCommitted = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_COMMITTED, topicNameInMetrics));
this.numberOfRecordsFailedToParse = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_RECORDS_FAILED_TO_PARSE, topicNameInMetrics));
this.numberOfInvalidTimeStamps = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_INVALID_TIMESTAMPS, topicNameInMetrics));
this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS, topicNameInMetrics));
this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS, topicNameInMetrics));
this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS, topicNameInMetrics));
Expand Down Expand Up @@ -151,6 +154,10 @@ public Counter getNumberOfNegativeAcknowledgements() {
return numberOfNegativeAcknowledgements;
}

/**
 * Exposes the counter stored in {@code numberOfInvalidTimeStamps}.
 *
 * @return the invalid-timestamp counter held by this metrics object
 */
public Counter getNumberOfInvalidTimeStamps() {
    return this.numberOfInvalidTimeStamps;
}

/**
 * Exposes the counter stored in {@code numberOfPositiveAcknowledgements}.
 *
 * <p>NOTE(review): presumably counts positively acknowledged batches/records; the place where
 * it is incremented is not visible here, so confirm against the callers.
 *
 * @return the positive-acknowledgement counter held by this metrics object
 */
public Counter getNumberOfPositiveAcknowledgements() {
    return this.numberOfPositiveAcknowledgements;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -51,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -138,6 +140,7 @@ public void setUp() {
when(topicMetrics.getNumberOfBufferSizeOverflows()).thenReturn(overflowCounter);
when(topicMetrics.getNumberOfRecordsCommitted()).thenReturn(counter);
when(topicMetrics.getNumberOfDeserializationErrors()).thenReturn(counter);
when(topicMetrics.getNumberOfInvalidTimeStamps()).thenReturn(counter);
when(topicConfig.getThreadWaitingTime()).thenReturn(Duration.ofSeconds(1));
when(topicConfig.getSerdeFormat()).thenReturn(MessageFormat.PLAINTEXT);
when(topicConfig.getAutoCommit()).thenReturn(false);
Expand Down Expand Up @@ -196,6 +199,27 @@ private BlockingBuffer<Record<Event>> getBuffer() {
return new BlockingBuffer<>(pluginSetting);
}

/**
 * Exercises {@code getRecordTimeStamp} for three cases against a fixed reference time:
 * a timestamp in the past, a future timestamp on a partition that already has a remembered
 * timestamp, and a future timestamp on a partition with nothing remembered.
 */
@Test
public void testGetRecordTimeStamp() {
    consumer = createObjectUnderTestWithMockBuffer("plaintext");
    final long nowMs = Instant.now().toEpochMilli();

    // Case 1: timestamp not after "now" on partition 1 is returned as-is (and remembered).
    final long pastTimestamp = nowMs - 5;
    final ConsumerRecord<String, Object> pastRecord = mock(ConsumerRecord.class);
    when(pastRecord.timestamp()).thenReturn(pastTimestamp);
    when(pastRecord.partition()).thenReturn(1);
    assertThat(consumer.getRecordTimeStamp(pastRecord, nowMs), equalTo(pastTimestamp));

    // Case 2: future timestamp on partition 1 falls back to the timestamp remembered in case 1.
    final long futureTimestampKnownPartition = nowMs + 5;
    final ConsumerRecord<String, Object> futureRecordKnownPartition = mock(ConsumerRecord.class);
    when(futureRecordKnownPartition.timestamp()).thenReturn(futureTimestampKnownPartition);
    when(futureRecordKnownPartition.partition()).thenReturn(1);
    assertThat(consumer.getRecordTimeStamp(futureRecordKnownPartition, nowMs), equalTo(pastTimestamp));

    // Case 3: future timestamp on partition 2, which has no remembered timestamp, falls back to "now".
    final long futureTimestampNewPartition = nowMs + 10;
    final ConsumerRecord<String, Object> futureRecordNewPartition = mock(ConsumerRecord.class);
    when(futureRecordNewPartition.timestamp()).thenReturn(futureTimestampNewPartition);
    when(futureRecordNewPartition.partition()).thenReturn(2);
    assertThat(consumer.getRecordTimeStamp(futureRecordNewPartition, nowMs), equalTo(nowMs));
}

@Test
public void testBufferOverflowPauseResume() throws InterruptedException, Exception {
when(topicConfig.getMaxPollInterval()).thenReturn(Duration.ofMillis(4000));
Expand Down

0 comments on commit 35a168a

Please sign in to comment.