Merge branch 'master' into upgrade_wiremock_and_json-path
a1shadows authored Mar 18, 2024
2 parents cc4ac93 + 5e440fa commit 422a231
Showing 9 changed files with 462 additions and 53 deletions.
@@ -10,15 +10,11 @@
import com.google.protobuf.MessageOrBuilder;
import com.google.protobuf.util.JsonFormat;
import com.jayway.jsonpath.JsonPath;
import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNumeric;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -27,21 +23,13 @@
import org.jsmart.zerocode.core.di.provider.GsonSerDeProvider;
import org.jsmart.zerocode.core.di.provider.ObjectMapperProvider;
import org.jsmart.zerocode.core.kafka.KafkaConstants;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.AVRO;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.DEFAULT_POLLING_TIME_MILLI_SEC;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.JSON;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.MAX_NO_OF_RETRY_POLLS_OR_TIME_OUTS;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.PROTO;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.RAW;
import static org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigs;
import org.jsmart.zerocode.core.kafka.consume.ConsumerLocalConfigsWrap;
import org.jsmart.zerocode.core.kafka.consume.SeekTimestamp;
import org.jsmart.zerocode.core.kafka.receive.ConsumerCommonConfigs;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecord;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerJsonRecords;
import org.jsmart.zerocode.core.kafka.receive.message.ConsumerRawRecords;
import static org.jsmart.zerocode.core.utils.SmartUtils.prettyPrintJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -54,7 +42,6 @@
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
@@ -67,6 +54,20 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNumeric;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.AVRO;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.DEFAULT_POLLING_TIME_MILLI_SEC;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.JSON;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.MAX_NO_OF_RETRY_POLLS_OR_TIME_OUTS;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.PROTO;
import static org.jsmart.zerocode.core.kafka.KafkaConstants.RAW;
import static org.jsmart.zerocode.core.kafka.common.KafkaCommonUtils.resolveValuePlaceHolders;
import static org.jsmart.zerocode.core.utils.SmartUtils.prettyPrintJson;

public class KafkaConsumerHelper {
public static final String CONSUMER = "CONSUMER";
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerHelper.class);
@@ -88,7 +89,6 @@ public static Consumer createConsumer(String bootStrapServers, String consumerPr
resolveValuePlaceHolders(properties);

final Consumer consumer = new KafkaConsumer(properties);
consumer.subscribe(Collections.singletonList(topic));

if (consumerToBeCached) {
consumerCacheByTopicMap.forEach((xTopic, xConsumer) -> {
@@ -414,8 +414,7 @@ public static void handleCommitSyncAsync(Consumer<Long, String> consumer,
}

public static void handleSeek(ConsumerLocalConfigs effectiveLocal, Consumer consumer, String topicName) {
String seek = effectiveLocal.getSeek();
if (!isEmpty(seek)) {
if (!isEmpty(effectiveLocal.getSeek())) {
handleSeekByOffset(effectiveLocal, consumer);
} else if (!isEmpty(effectiveLocal.getSeekEpoch())) {
handleSeekByEpoch(Long.parseLong(effectiveLocal.getSeekEpoch()), consumer, topicName);
@@ -451,20 +450,35 @@ private static void handleSeekByEpoch(Long epoch, Consumer consumer, String topi
.collect(Collectors.toMap(Function.identity(), ignore -> epoch));
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = consumer.offsetsForTimes(topicPartitionTimestampMap);

//removing partitions that are null, since we will only subscribe to partitions that have messages after the given timestamp
//by default partitions with no valid offset mapped to timestamp will also be returned by the method with value null. We will skip these
topicPartitionOffsetAndTimestampMap = topicPartitionOffsetAndTimestampMap.entrySet()
.stream()
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

//assign to fetched partitions
consumer.unsubscribe();
consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
if (consumer.assignment().isEmpty()) {
consumer.assign(topicPartitionOffsetAndTimestampMap.keySet());
}

//seek to end for partitions that have no offset/timestamp >= seekEpoch
List<TopicPartition> noSeekPartitions = topicPartitionOffsetAndTimestampMap.entrySet().stream()
.filter(tp -> tp.getValue() == null)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
if (!noSeekPartitions.isEmpty()) {
consumer.seekToEnd(noSeekPartitions);
//commit the latest offsets so that they are skipped and only new messages consumed
Map<TopicPartition, OffsetAndMetadata> partitionLatestOffsetsToCommit =
noSeekPartitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(consumer.position(tp) + 1)));
LOGGER.debug("==> Committing the following : " + partitionLatestOffsetsToCommit);
consumer.commitSync(partitionLatestOffsetsToCommit);
}


//seek to fetched offsets for partitions
for (Map.Entry<TopicPartition, OffsetAndTimestamp> topicOffsetEntry : topicPartitionOffsetAndTimestampMap.entrySet()) {
consumer.seek(topicOffsetEntry.getKey(), topicOffsetEntry.getValue().offset());
if (Objects.nonNull(topicOffsetEntry.getValue())) {
//seek to offset only if it is more than current offset position(for retry poll scenarios)
if (consumer.position(topicOffsetEntry.getKey()) < topicOffsetEntry.getValue().offset())
consumer.seek(topicOffsetEntry.getKey(), topicOffsetEntry.getValue().offset());
LOGGER.debug("==> Seeking to " + topicOffsetEntry);
}
}
}
}
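For orientation, here is a minimal, self-contained sketch of the epoch-seek flow implemented in the hunk above: query offsetsForTimes for every partition of the topic, seek the matched partitions to the returned offsets, and fast-forward partitions that have no record at or after the epoch. The class and method names (EpochSeekSketch, seekToEpoch) are illustrative and not part of the commit.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class EpochSeekSketch {
    static void seekToEpoch(Consumer<String, String> consumer, String topic, long epochMs) {
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(p -> new TopicPartition(topic, p.partition()))
                .collect(Collectors.toList());
        consumer.assign(partitions); // manual assignment; seek() only works on assigned partitions

        Map<TopicPartition, Long> query = partitions.stream()
                .collect(Collectors.toMap(Function.identity(), tp -> epochMs));
        // partitions with no record at/after epochMs come back mapped to null
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);

        offsets.forEach((tp, oat) -> {
            if (oat != null) {
                consumer.seek(tp, oat.offset());                   // first record at/after epochMs
            } else {
                consumer.seekToEnd(Collections.singletonList(tp)); // nothing newer: read only future records
            }
        });
    }
}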
@@ -479,7 +493,6 @@ private static void handleSeekByOffset(ConsumerLocalConfigs effectiveLocal, Cons
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(topicPartition);

consumer.unsubscribe();
consumer.assign(topicPartitions);

if (offset <= -1) {
@@ -5,6 +5,7 @@
import com.google.inject.name.Named;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
@@ -63,6 +64,10 @@ public String receive(String kafkaServers, String topicName, String requestJsonW
int noOfTimeOuts = 0;

handleSeek(effectiveLocal, consumer, topicName);
//subscribe to topic if seek not used
if (consumer.assignment().isEmpty()) {
consumer.subscribe(Collections.singletonList(topicName));
}

LOGGER.debug("initial polling to trigger ConsumerGroupJoin");

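The receive() change above relies on a Kafka client rule worth calling out: a consumer uses either group subscription (subscribe) or manual partition assignment (assign), never both on the same instance. Seeking assigns partitions manually, so the code only subscribes when no seek left an assignment behind. A rough sketch of that decision follows, under an illustrative helper name (attachConsumer) that is not from the commit.

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class AttachSketch {
    static void attachConsumer(Consumer<String, String> consumer, String topic, boolean seekRequested) {
        if (seekRequested) {
            // manual assignment path, comparable to what handleSeek() does before seeking
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(topic, p.partition()))
                    .collect(Collectors.toList());
            consumer.assign(partitions);
        }
        if (consumer.assignment().isEmpty()) {
            // no seek configured: fall back to normal consumer-group subscription
            consumer.subscribe(Collections.singletonList(topic));
        }
    }
}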
6 changes: 4 additions & 2 deletions docker/compose/kafka-schema-registry.yml
@@ -30,10 +30,12 @@ services:
"
# rather than giving sleep 15 use this
# to block init container to wait for Kafka broker to be ready
kafka-topics --bootstrap-server kafka:9092 --list
kafka-topics --bootstrap-server kafka:29092 --list
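# (kafka:29092 is assumed here to be the broker's internal, inter-container listener;
#  adjust if the kafka service in this compose file advertises a different port)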
# create init topics
kafka-topics --create --partitions 3 --bootstrap-server kafka:9092 --topic demo-seekTime-multi-partition
kafka-topics --create --partitions 3 --bootstrap-server kafka:29092 --topic demo-seekTime-multi-partition-1
kafka-topics --create --partitions 3 --bootstrap-server kafka:29092 --topic demo-seekTime-multi-partition-2
kafka-topics --bootstrap-server kafka:29092 --list
"
@@ -27,9 +27,34 @@ public void testKafkaConsume_seekOffsetLatest() throws Exception {
}

@Test
@Scenario("kafka/consume/test_kafka_consume_seek_epoch_and_timestamp.json")
public void testKafkaConsume_seekEpochAndTimestamp() {
@Scenario("kafka/consume/test_kafka_consume_seek_epoch_multi_partition.json")
public void testKafkaConsume_seekEpoch_multi_partition() {

}


@Test
@Scenario("kafka/consume/test_kafka_consume_seek_epoch_retry.json")
public void testKafkaConsume_seekEpoch_retry() {

}

@Test
@Scenario("kafka/consume/test_kafka_consume_seek_epoch_continue_from_last_offset.json")
public void testKafkaConsume_seekEpoch_continue_consumption() {

}

@Test
@Scenario("kafka/consume/test_kafka_consume_seek_timestamp_multi_partition.json")
public void testKafkaConsume_seekTimestamp_multi_partition() {

}
@Test
@Scenario("kafka/consume/test_kafka_consume_seek_timestamp_continue_from_last_offset.json")
public void testKafkaConsume_seekTimestamp_continue_consumption() {

}


}
@@ -0,0 +1,109 @@
{
"scenarioName": "Consume message after epoch and continue consumption on second consume",
"steps": [
{
"name": "load_kafka_before_timestamp",
"url": "kafka-topic:demo-seekEpoch-1",
"operation": "PRODUCE",
"request": {
"records": [
{
"key": "${RANDOM.NUMBER}",
"value": "Before Timestamp 1"
},
{
"key": "${RANDOM.NUMBER}",
"value": "Before Timestamp 2"
}
]
},
"assertions": {
"status": "Ok"
}
},
{
"name": "load_timestamp_and_epoch",
"url": "org.jsmart.zerocode.zerocodejavaexec.utils.ExampleUtils",
"operation": "seekTimestampToEpoch",
"request": {
"timestamp": "${LOCAL.DATETIME.NOW:yyyy-MM-dd'T'HH:mm:ss.SSS}",
"format": "yyyy-MM-dd'T'HH:mm:ss.SSS"
},
"assertions": {}
},
{
"name": "load_kafka_after_timestamp",
"url": "kafka-topic:demo-seekEpoch-1",
"operation": "PRODUCE",
"request": {
"records": [
{
"key": "${RANDOM.NUMBER}",
"value": "After Timestamp 1"
},
{
"key": "${RANDOM.NUMBER}",
"value": "After Timestamp 2"
},
{
"key": "${RANDOM.NUMBER}",
"value": "After Timestamp 3"
}
]
},
"assertions": {
"status": "Ok"
}
},
{
"name": "consume_seekEpoch",
"url": "kafka-topic:demo-seekEpoch-1",
"operation": "CONSUME",
"request": {
"consumerLocalConfigs": {
"seekEpoch": "${$.load_timestamp_and_epoch.response}",
"commitSync": true,
"recordType": "RAW",
"showRecordsConsumed": true,
"maxNoOfRetryPollsOrTimeouts": 1
}
},
"sort": {
"key": "value",
"order": "natural",
"path": "$.records"
},
"verify": {
"records": [
{
"value": "After Timestamp 1"
},
{
"value": "After Timestamp 2"
}
]
}
},
{
"name": "consume_seekEpoch_continue_from_last_offset",
"url": "kafka-topic:demo-seekEpoch-1",
"operation": "CONSUME",
"request": {
"consumerLocalConfigs": {
"seekEpoch": "${$.load_timestamp_and_epoch.response}",
"commitSync": true,
"recordType": "RAW",
"showRecordsConsumed": true,
"maxNoOfRetryPollsOrTimeouts": 1
}
},
"verify": {
"records": [
{
"value": "After Timestamp 3"
}
]
}
}
]
}
