ISSUE 615: Changes to support "seekEpoch", "seekTimestamp" in Kafka Consumer step, removing "seek" from ConsumerCommonConfigs #619

Merged 10 commits on Feb 14, 2024
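
For orientation: the new setting is supplied through the consumer step's local config JSON, alongside the existing "seek" ("topic,partition,offset") option. Below is a minimal sketch of a consume step using it; the step layout, topic name, and assertion block are assumptions based on typical zerocode Kafka tests, and only the "seekToTimestamp" property name is defined by this PR:

{
    "name": "consume_from_timestamp",
    "url": "kafka-topic:demo-topic",
    "operation": "consume",
    "request": {
        "consumerLocalConfigs": {
            "recordType": "JSON",
            "commitSync": true,
            "seekToTimestamp": 1707868800000
        }
    },
    "assertions": {
        "size": 1
    }
}

The value is an epoch/Unix timestamp in milliseconds; per the validation added below, it must be non-negative and must not lie in the future.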
ConsumerLocalConfigs.java
@@ -23,6 +23,7 @@ public class ConsumerLocalConfigs {
private final String protoClassType;
private final Boolean cacheByTopic;
private final String filterByJsonPath;
private final Long seekToTimestamp;

@JsonCreator
public ConsumerLocalConfigs(
@@ -36,7 +37,8 @@ public ConsumerLocalConfigs(
@JsonProperty("pollingTime") Long pollingTime,
@JsonProperty("cacheByTopic") Boolean cacheByTopic,
@JsonProperty("filterByJsonPath") String filterByJsonPath,
@JsonProperty("seek") String seek,
@JsonProperty("seekToTimestamp") Long seekToTimestamp) {
this.recordType = recordType;
this.protoClassType= protobufMessageClassType;
this.fileDumpTo = fileDumpTo;
@@ -48,30 +50,32 @@ public ConsumerLocalConfigs(
this.cacheByTopic = cacheByTopic;
this.filterByJsonPath = filterByJsonPath;
this.seek = seek;
this.seekToTimestamp = seekToTimestamp;
}


public ConsumerLocalConfigs(
String recordType,
String fileDumpTo,
Boolean commitAsync,
Boolean commitSync,
Boolean showRecordsConsumed,
Integer maxNoOfRetryPollsOrTimeouts,
Long pollingTime,
Boolean cacheByTopic,
String filterByJsonPath,
String seek) {
this(recordType, null,
fileDumpTo,
commitAsync,
commitSync,
showRecordsConsumed,
maxNoOfRetryPollsOrTimeouts,
pollingTime,
cacheByTopic,
filterByJsonPath,
seek,
null);
}

public String getRecordType() {
@@ -119,6 +123,10 @@ public String getSeek() {
return seek;
}

public Long getSeekToTimestamp() {
return seekToTimestamp;
}

@JsonIgnore
public String[] getSeekTopicPartitionOffset() {
return seek.split(",");
@@ -139,7 +147,8 @@ public boolean equals(Object o) {
Objects.equals(pollingTime, that.pollingTime) &&
Objects.equals(filterByJsonPath, that.filterByJsonPath) &&
Objects.equals(cacheByTopic, that.cacheByTopic) &&
Objects.equals(seek, that.seek) &&
Objects.equals(seekToTimestamp, that.seekToTimestamp);
}

@Override
@@ -162,6 +171,7 @@ public String toString() {
", cacheByTopic=" + cacheByTopic +
", filterByJsonPath=" + filterByJsonPath +
", seek=" + seek +
", seekToTimestamp=" + seekToTimestamp +
'}';
}
}
KafkaConsumerHelper.java
@@ -19,20 +19,13 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.jayway.jsonpath.JsonPath;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -135,6 +128,14 @@ public static void validateLocalConfigs(ConsumerLocalConfigs localConfigs) {
validateCommitFlags(localCommitSync, localCommitAsync);

validateSeekConfig(localConfigs);
validateSeekToTimestamp(localConfigs);
}
}

private static void validateSeekToTimestamp(ConsumerLocalConfigs localConfigs) {
Long seekToTimestamp = localConfigs.getSeekToTimestamp();
if (Objects.nonNull(seekToTimestamp) && (seekToTimestamp > System.currentTimeMillis() || seekToTimestamp < 0L)) {
throw new RuntimeException("\n------> 'seekToTimestamp' is not a valid epoch/Unix timestamp");
}
}

@@ -163,7 +164,8 @@ public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consume
consumerCommon.getPollingTime(),
consumerCommon.getCacheByTopic(),
consumerCommon.getFilterByJsonPath(),
consumerCommon.getSeek(),
null);
}

// Handle recordType
@@ -210,6 +212,8 @@ public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consume
effectiveCommitSync = localCommitSync;
effectiveCommitAsync = localCommitAsync;
}
// This property doesn't make sense in a common context; it should always be picked from the local config.
Long effectiveSeekToTimestamp = consumerLocal.getSeekToTimestamp();

return new ConsumerLocalConfigs(
effectiveRecordType,
@@ -222,7 +226,8 @@ public static ConsumerLocalConfigs createEffective(ConsumerCommonConfigs consume
effectivePollingTime,
effectiveConsumerCacheByTopic,
filterByJsonPath,
effectiveSeek,
effectiveSeekToTimestamp);
}

public static ConsumerLocalConfigs readConsumerLocalTestProperties(String requestJsonWithConfigWrapped) {
@@ -378,30 +383,61 @@ public static void handleCommitSyncAsync(Consumer<Long, String> consumer,
// --------------------------------------------------------
}

public static void handleSeek(ConsumerLocalConfigs effectiveLocal, Consumer consumer, String topicName) {
String seek = effectiveLocal.getSeek();
if (!isEmpty(seek)) {
handleSeekByOffset(effectiveLocal, consumer);
} else if (Objects.nonNull(effectiveLocal.getSeekToTimestamp())) {
handleSeekByTimestamp(effectiveLocal, consumer, topicName);
}
}

private static void handleSeekByTimestamp(ConsumerLocalConfigs effectiveLocal, Consumer consumer, String topicName) {
if (Objects.nonNull(effectiveLocal.getSeekToTimestamp())) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topicName);

//fetch partitions on topic
List<TopicPartition> topicPartitions = partitionInfos.stream()
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());

//fetch offsets for each partition-timestamp pair
Map<TopicPartition, Long> topicPartitionTimestampMap = topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), ignore -> effectiveLocal.getSeekToTimestamp()));
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionTimestampMap);

//assign to fetched partitions
consumer.unsubscribe();
consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());

//seek to fetched offsets for partitions
for (Map.Entry<TopicPartition, OffsetAndTimestamp> topicOffsetEntry : topicPartitionOffsetAndTimestampMap.entrySet()) {
consumer.seek(topicOffsetEntry.getKey(), topicOffsetEntry.getValue().offset());
}
}
}

private static void handleSeekByOffset(ConsumerLocalConfigs effectiveLocal, Consumer consumer) {
String[] seekParts = effectiveLocal.getSeekTopicPartitionOffset();
String topic = seekParts[0];
int partition = parseInt(seekParts[1]);
long offset = parseLong(seekParts[2]);

TopicPartition topicPartition = new TopicPartition(topic, partition);
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(topicPartition);

consumer.unsubscribe();
consumer.assign(topicPartitions);

if (offset <= -1) {
consumer.seekToEnd(topicPartitions);
consumer.seek(topicPartition, consumer.position(topicPartition) + offset);
} else {
consumer.seek(topicPartition, offset);
}
}

private static void validateCommitFlags(Boolean commitSync, Boolean commitAsync) {
if ((commitSync != null && commitAsync != null) && commitSync == true && commitAsync == true) {
throw new RuntimeException("\n********* Both commitSync and commitAsync can not be true *********\n");
KafkaReceiver.java
@@ -25,7 +25,7 @@
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.getMaxTimeOuts;
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.getPollTime;
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.handleCommitSyncAsync;
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.handleSeek;
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.initialPollWaitingForConsumerGroupJoin;
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.prepareResult;
import static org.jsmart.zerocode.core.kafka.helper.KafkaConsumerHelper.readConsumerLocalTestProperties;
@@ -62,7 +62,7 @@ public String receive(String kafkaServers, String topicName, String requestJsonW

int noOfTimeOuts = 0;

handleSeek(effectiveLocal, consumer, topicName);

LOGGER.debug("initial polling to trigger ConsumerGroupJoin");

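For reference, the seek-by-timestamp flow above reduces to Kafka's Consumer.offsetsForTimes API. Here is a self-contained sketch of the same technique against a plain KafkaConsumer; the broker address, topic, and group id are placeholders, and the per-partition null check is an extra guard not present in the diff above:

import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestampSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "seek-by-timestamp-demo");  // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long seekToTimestamp = System.currentTimeMillis() - 60_000; // one minute ago

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Map every partition of the topic to the target timestamp.
            Map<TopicPartition, Long> query = consumer.partitionsFor("demo-topic").stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toMap(tp -> tp, tp -> seekToTimestamp));

            // Resolve, per partition, the earliest offset whose record timestamp is >= the target.
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);

            consumer.assign(offsets.keySet());
            offsets.forEach((tp, oat) -> {
                if (oat != null) { // null when no record at/after the timestamp exists
                    consumer.seek(tp, oat.offset());
                }
            });

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("p%d@%d: %s%n", r.partition(), r.offset(), r.value()));
        }
    }
}

One detail worth noting: offsetsForTimes maps a partition to null when no record at or after the requested timestamp exists, so a robust caller should skip (or seekToEnd) those partitions rather than dereference the entry.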