From ae3c5dec99b6f20d860c945101d792c436301dc7 Mon Sep 17 00:00:00 2001 From: Manikumar Reddy Date: Fri, 29 Nov 2024 22:38:57 +0530 Subject: [PATCH] KAFKA-18013: Add support for duration based offset reset strategy to Kafka Consumer (#17972) Update AutoOffsetResetStrategy.java to support duration based reset strategy Update OffsetFetcher related classes and unit tests Reviewers: Andrew Schofield , Lianet Magrans --- .../clients/consumer/ConsumerConfig.java | 2 + .../kafka/clients/consumer/MockConsumer.java | 10 ++ .../internals/AutoOffsetResetStrategy.java | 106 ++++++++++++++---- .../consumer/internals/OffsetFetcher.java | 13 ++- .../internals/OffsetFetcherUtils.java | 39 +++---- .../internals/OffsetsRequestManager.java | 18 +-- .../consumer/internals/SubscriptionState.java | 2 +- .../clients/consumer/KafkaConsumerTest.java | 17 ++- .../clients/consumer/MockConsumerTest.java | 22 ++++ .../AutoOffsetResetStrategyTest.java | 83 ++++++++++---- .../consumer/internals/OffsetFetcherTest.java | 23 ++++ 11 files changed, 252 insertions(+), 83 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 779a9933fd745..fb4c05062bb9f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -173,6 +173,8 @@ public class ConsumerConfig extends AbstractConfig { "(e.g. because that data has been deleted): " + "
  • earliest: automatically reset the offset to the earliest offset" + "
  • latest: automatically reset the offset to the latest offset
  • " + + "
  • by_duration:: automatically reset the offset to a configured from the current timestamp. must be specified in ISO8601 format (PnDTnHnMn.nS). " + + "Negative duration is not allowed.
  • " + "
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • " + "
  • anything else: throw exception to the consumer.
" + "

Note that altering partition numbers while setting this config to latest may cause message delivery loss since " + diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index a900fac95aa81..817e8a63b2018 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -62,6 +62,7 @@ public class MockConsumer implements Consumer { private final SubscriptionState subscriptions; private final Map beginningOffsets; private final Map endOffsets; + private final Map durationResetOffsets; private final Map committed; private final Queue pollTasks; private final Set paused; @@ -104,6 +105,7 @@ private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) { this.closed = false; this.beginningOffsets = new HashMap<>(); this.endOffsets = new HashMap<>(); + this.durationResetOffsets = new HashMap<>(); this.pollTasks = new LinkedList<>(); this.pollException = null; this.wakeup = new AtomicBoolean(false); @@ -433,6 +435,10 @@ public synchronized void updateEndOffsets(final Map newOff endOffsets.putAll(newOffsets); } + public synchronized void updateDurationOffsets(final Map newOffsets) { + durationResetOffsets.putAll(newOffsets); + } + public void disableTelemetry() { telemetryDisabled = true; } @@ -610,6 +616,10 @@ private void resetOffsetPosition(TopicPartition tp) { offset = endOffsets.get(tp); if (offset == null) throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end"); + } else if (strategy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) { + offset = durationResetOffsets.get(tp); + if (offset == null) + throw new IllegalStateException("MockConsumer didn't have duration offset specified, but tried to seek to timestamp"); } else { throw new NoOffsetForPartitionException(tp); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java index a94eb4585a320..6eecc4e09d8b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java @@ -18,15 +18,19 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.utils.Utils; +import java.time.Duration; +import java.time.Instant; import java.util.Arrays; import java.util.Locale; import java.util.Objects; +import java.util.Optional; public class AutoOffsetResetStrategy { - private enum StrategyType { - LATEST, EARLIEST, NONE; + public enum StrategyType { + LATEST, EARLIEST, NONE, BY_DURATION; @Override public String toString() { @@ -39,30 +43,65 @@ public String toString() { public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE); private final StrategyType type; + private final Optional duration; private AutoOffsetResetStrategy(StrategyType type) { this.type = type; + this.duration = Optional.empty(); } - public static boolean isValid(String offsetStrategy) { - return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy); + private AutoOffsetResetStrategy(Duration duration) { + this.type = StrategyType.BY_DURATION; + this.duration = Optional.of(duration); } + /** + * Returns the AutoOffsetResetStrategy from the given string. + */ public static AutoOffsetResetStrategy fromString(String offsetStrategy) { - if (offsetStrategy == null || !isValid(offsetStrategy)) { - throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy); + if (offsetStrategy == null) { + throw new IllegalArgumentException("Auto offset reset strategy is null"); } - StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT)); - switch (type) { - case EARLIEST: - return EARLIEST; - case LATEST: - return LATEST; - case NONE: - return NONE; - default: - throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy); + + if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) { + throw new IllegalArgumentException("<:duration> part is missing in by_duration auto offset reset strategy."); + } + + if (Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) { + StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT)); + switch (type) { + case EARLIEST: + return EARLIEST; + case LATEST: + return LATEST; + case NONE: + return NONE; + default: + throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy); + } + } + + if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) { + String isoDuration = offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1); + try { + Duration duration = Duration.parse(isoDuration); + if (duration.isNegative()) { + throw new IllegalArgumentException("Negative duration is not supported in by_duration offset reset strategy."); + } + return new AutoOffsetResetStrategy(duration); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to parse duration string in by_duration offset reset strategy.", e); + } } + + throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy); + } + + /** + * Returns the offset reset strategy type. + */ + public StrategyType type() { + return type; } /** @@ -72,33 +111,54 @@ public String name() { return type.toString(); } + /** + * Return the timestamp to be used for the ListOffsetsRequest. + * @return the timestamp for the OffsetResetStrategy, + * if the strategy is EARLIEST or LATEST or duration is provided + * else return Optional.empty() + */ + public Optional timestamp() { + if (type == StrategyType.EARLIEST) + return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP); + else if (type == StrategyType.LATEST) + return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP); + else if (type == StrategyType.BY_DURATION && duration.isPresent()) { + Instant now = Instant.now(); + return Optional.of(now.minus(duration.get()).toEpochMilli()); + } else + return Optional.empty(); + } + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o; - return Objects.equals(type, that.type); + return type == that.type && Objects.equals(duration, that.duration); } @Override public int hashCode() { - return Objects.hashCode(type); + return Objects.hash(type, duration); } @Override public String toString() { return "AutoOffsetResetStrategy{" + - "type='" + type + '\'' + + "type=" + type + + (duration.map(value -> ", duration=" + value).orElse("")) + '}'; } public static class Validator implements ConfigDef.Validator { @Override public void ensureValid(String name, Object value) { - String strategy = (String) value; - if (!AutoOffsetResetStrategy.isValid(strategy)) { - throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " + - name + ": the value must be either 'earliest', 'latest', or 'none'."); + String offsetStrategy = (String) value; + try { + fromString(offsetStrategy); + } catch (Exception e) { + throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " + + name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:'."); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java index f7646bff9ede5..bb01510e906be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java @@ -101,12 +101,13 @@ public OffsetFetcher(LogContext logContext, * and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd(). */ public void resetPositionsIfNeeded() { - Map offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp(); + Map partitionAutoOffsetResetStrategyMap = + offsetFetcherUtils.getOffsetResetStrategyForPartitions(); - if (offsetResetTimestamps.isEmpty()) + if (partitionAutoOffsetResetStrategyMap.isEmpty()) return; - resetPositionsAsync(offsetResetTimestamps); + resetPositionsAsync(partitionAutoOffsetResetStrategyMap); } /** @@ -209,7 +210,9 @@ private Map beginningOrEndOffset(Collection partitionResetTimestamps) { + private void resetPositionsAsync(Map partitionAutoOffsetResetStrategyMap) { + Map partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get())); Map> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps, new HashSet<>()); for (Map.Entry> entry : timestampsToSearchByNode.entrySet()) { @@ -221,7 +224,7 @@ private void resetPositionsAsync(Map partitionResetTimesta future.addListener(new RequestFutureListener<>() { @Override public void onSuccess(ListOffsetResult result) { - offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result); + offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, partitionAutoOffsetResetStrategyMap); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java index 2e9d48ebda5d1..98afb02d6b9d0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.message.ListOffsetsResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.ListOffsetsRequest; import org.apache.kafka.common.requests.ListOffsetsResponse; import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; import org.apache.kafka.common.utils.LogContext; @@ -224,19 +223,22 @@ void validatePositionsOnMetadataChange() { } } - Map getOffsetResetTimestamp() { + /** + * get OffsetResetStrategy for all assigned partitions + */ + Map getOffsetResetStrategyForPartitions() { // Raise exception from previous offset fetch if there is one RuntimeException exception = cachedResetPositionsException.getAndSet(null); if (exception != null) throw exception; Set partitions = subscriptionState.partitionsNeedingReset(time.milliseconds()); - final Map offsetResetTimestamps = new HashMap<>(); + final Map partitionAutoOffsetResetStrategyMap = new HashMap<>(); for (final TopicPartition partition : partitions) { - offsetResetTimestamps.put(partition, offsetResetStrategyTimestamp(partition)); + partitionAutoOffsetResetStrategyMap.put(partition, offsetResetStrategyWithValidTimestamp(partition)); } - return offsetResetTimestamps; + return partitionAutoOffsetResetStrategyMap; } static Map buildListOffsetsResult( @@ -283,14 +285,13 @@ static Map buildOffsetsForTimeIntern return offsetsResults; } - private long offsetResetStrategyTimestamp(final TopicPartition partition) { + private AutoOffsetResetStrategy offsetResetStrategyWithValidTimestamp(final TopicPartition partition) { AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition); - if (strategy == AutoOffsetResetStrategy.EARLIEST) - return ListOffsetsRequest.EARLIEST_TIMESTAMP; - else if (strategy == AutoOffsetResetStrategy.LATEST) - return ListOffsetsRequest.LATEST_TIMESTAMP; - else + if (strategy.timestamp().isPresent()) { + return strategy; + } else { throw new NoOffsetForPartitionException(partition); + } } static Set topicsForPartitions(Collection partitions) { @@ -319,18 +320,9 @@ void updateSubscriptionState(Map resetTimestamps, - final ListOffsetResult result) { + final ListOffsetResult result, + final Map partitionAutoOffsetResetStrategyMap) { if (!result.partitionsToRetry.isEmpty()) { subscriptionState.requestFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs); metadata.requestUpdate(false); @@ -339,10 +331,9 @@ void onSuccessfulResponseForResettingPositions( for (Map.Entry fetchedOffset : result.fetchedOffsets.entrySet()) { TopicPartition partition = fetchedOffset.getKey(); ListOffsetData offsetData = fetchedOffset.getValue(); - ListOffsetsRequestData.ListOffsetsPartition requestedReset = resetTimestamps.get(partition); resetPositionIfNeeded( partition, - timestampToOffsetResetStrategy(requestedReset.timestamp()), + partitionAutoOffsetResetStrategyMap.get(partition), offsetData); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java index db847eae831a7..7870caec1ba07 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java @@ -472,20 +472,20 @@ private boolean canReusePendingOffsetFetchEvent(Set partitions) * this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException}) */ CompletableFuture resetPositionsIfNeeded() { - Map offsetResetTimestamps; + Map partitionAutoOffsetResetStrategyMap; try { - offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp(); + partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions(); } catch (Exception e) { CompletableFuture result = new CompletableFuture<>(); result.completeExceptionally(e); return result; } - if (offsetResetTimestamps.isEmpty()) + if (partitionAutoOffsetResetStrategyMap.isEmpty()) return CompletableFuture.completedFuture(null); - return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps); + return sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap); } /** @@ -652,12 +652,14 @@ private CompletableFuture buildListOffsetRequestToNode( * partitions. Use the retrieved offsets to reset positions in the subscription state. * This also adds the request to the list of unsentRequests. * - * @param timestampsToSearch the mapping between partitions and target time + * @param partitionAutoOffsetResetStrategyMap the mapping between partitions and AutoOffsetResetStrategy * @return A {@link CompletableFuture} which completes when the requests are * complete. */ private CompletableFuture sendListOffsetsRequestsAndResetPositions( - final Map timestampsToSearch) { + final Map partitionAutoOffsetResetStrategyMap) { + Map timestampsToSearch = partitionAutoOffsetResetStrategyMap.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get())); Map> timestampsToSearchByNode = groupListOffsetRequests(timestampsToSearch, Optional.empty()); @@ -677,8 +679,8 @@ private CompletableFuture sendListOffsetsRequestsAndResetPositions( partialResult.whenComplete((result, error) -> { if (error == null) { - offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, - result); + offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, + partitionAutoOffsetResetStrategyMap); } else { RuntimeException e; if (error instanceof RuntimeException) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 0df99301e9e97..f700b8706ca60 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -441,7 +441,7 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position log.debug("Skipping reset of partition {} since it is no longer assigned", tp); } else if (!state.awaitingReset()) { log.debug("Skipping reset of partition {} since reset is no longer needed", tp); - } else if (requestedResetStrategy != state.resetStrategy) { + } else if (requestedResetStrategy != null && !requestedResetStrategy.equals(state.resetStrategy)) { log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp); } else { log.info("Resetting offset for partition {} to position {}.", tp, position); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index e8a084ebd5417..36d15c0fe94ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1082,7 +1082,20 @@ public void testResetToCommittedOffset(GroupProtocol groupProtocol) { @ParameterizedTest @EnumSource(GroupProtocol.class) public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) { - SubscriptionState subscription = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.LATEST); + setUpConsumerWithAutoResetPolicy(groupProtocol, AutoOffsetResetStrategy.LATEST); + assertEquals(50L, consumer.position(tp0)); + } + + @ParameterizedTest + @EnumSource(GroupProtocol.class) + public void testResetUsingDurationBasedAutoResetPolicy(GroupProtocol groupProtocol) { + AutoOffsetResetStrategy durationStrategy = AutoOffsetResetStrategy.fromString("by_duration:PT1H"); + setUpConsumerWithAutoResetPolicy(groupProtocol, durationStrategy); + assertEquals(50L, consumer.position(tp0)); + } + + private void setUpConsumerWithAutoResetPolicy(GroupProtocol groupProtocol, AutoOffsetResetStrategy strategy) { + SubscriptionState subscription = new SubscriptionState(new LogContext(), strategy); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1100,8 +1113,6 @@ public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) { client.prepareResponse(listOffsetsResponse(Collections.singletonMap(tp0, 50L))); consumer.poll(Duration.ZERO); - - assertEquals(50L, consumer.position(tp0)); } @ParameterizedTest diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index 41c9f199d15e2..21cee3183bc69 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -117,6 +117,28 @@ public void endOffsetsShouldBeIdempotent() { assertEquals(11L, (long) consumer.endOffsets(Collections.singleton(partition)).get(partition)); } + @Test + public void testDurationBasedOffsetReset() { + MockConsumer consumer = new MockConsumer<>("by_duration:PT1H"); + consumer.subscribe(Collections.singleton("test")); + consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1))); + HashMap durationBasedOffsets = new HashMap<>(); + durationBasedOffsets.put(new TopicPartition("test", 0), 10L); + durationBasedOffsets.put(new TopicPartition("test", 1), 11L); + consumer.updateDurationOffsets(durationBasedOffsets); + ConsumerRecord rec1 = new ConsumerRecord<>("test", 0, 10L, 0L, TimestampType.CREATE_TIME, + 0, 0, "key1", "value1", new RecordHeaders(), Optional.empty()); + ConsumerRecord rec2 = new ConsumerRecord<>("test", 0, 11L, 0L, TimestampType.CREATE_TIME, + 0, 0, "key2", "value2", new RecordHeaders(), Optional.empty()); + consumer.addRecord(rec1); + consumer.addRecord(rec2); + ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); + Iterator> iter = records.iterator(); + assertEquals(rec1, iter.next()); + assertEquals(rec2, iter.next()); + assertFalse(iter.hasNext()); + } + @Test public void testRebalanceListener() { final List revoked = new ArrayList<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java index 780319d610f5d..25ff9073747e8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategyTest.java @@ -17,9 +17,14 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.requests.ListOffsetsRequest; import org.junit.jupiter.api.Test; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -29,26 +34,24 @@ public class AutoOffsetResetStrategyTest { - @Test - public void testIsValid() { - assertTrue(AutoOffsetResetStrategy.isValid("earliest")); - assertTrue(AutoOffsetResetStrategy.isValid("latest")); - assertTrue(AutoOffsetResetStrategy.isValid("none")); - assertFalse(AutoOffsetResetStrategy.isValid("invalid")); - assertFalse(AutoOffsetResetStrategy.isValid("LATEST")); - assertFalse(AutoOffsetResetStrategy.isValid("")); - assertFalse(AutoOffsetResetStrategy.isValid(null)); - } - @Test public void testFromString() { assertEquals(AutoOffsetResetStrategy.EARLIEST, AutoOffsetResetStrategy.fromString("earliest")); assertEquals(AutoOffsetResetStrategy.LATEST, AutoOffsetResetStrategy.fromString("latest")); assertEquals(AutoOffsetResetStrategy.NONE, AutoOffsetResetStrategy.fromString("none")); assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("invalid")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration:invalid")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration:-PT1H")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration:")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("by_duration")); assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("LATEST")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("EARLIEST")); + assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("NONE")); assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString("")); assertThrows(IllegalArgumentException.class, () -> AutoOffsetResetStrategy.fromString(null)); + + AutoOffsetResetStrategy strategy = AutoOffsetResetStrategy.fromString("by_duration:PT1H"); + assertEquals("by_duration", strategy.name()); } @Test @@ -57,21 +60,63 @@ public void testValidator() { assertDoesNotThrow(() -> validator.ensureValid("test", "earliest")); assertDoesNotThrow(() -> validator.ensureValid("test", "latest")); assertDoesNotThrow(() -> validator.ensureValid("test", "none")); + assertDoesNotThrow(() -> validator.ensureValid("test", "by_duration:PT1H")); assertThrows(ConfigException.class, () -> validator.ensureValid("test", "invalid")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:invalid")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:-PT1H")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration:")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "by_duration")); assertThrows(ConfigException.class, () -> validator.ensureValid("test", "LATEST")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "EARLIEST")); + assertThrows(ConfigException.class, () -> validator.ensureValid("test", "NONE")); assertThrows(ConfigException.class, () -> validator.ensureValid("test", "")); assertThrows(ConfigException.class, () -> validator.ensureValid("test", null)); } @Test public void testEqualsAndHashCode() { - AutoOffsetResetStrategy strategy1 = AutoOffsetResetStrategy.fromString("earliest"); - AutoOffsetResetStrategy strategy2 = AutoOffsetResetStrategy.fromString("earliest"); - AutoOffsetResetStrategy strategy3 = AutoOffsetResetStrategy.fromString("latest"); - - assertEquals(strategy1, strategy2); - assertNotEquals(strategy1, strategy3); - assertEquals(strategy1.hashCode(), strategy2.hashCode()); - assertNotEquals(strategy1.hashCode(), strategy3.hashCode()); + AutoOffsetResetStrategy earliest1 = AutoOffsetResetStrategy.fromString("earliest"); + AutoOffsetResetStrategy earliest2 = AutoOffsetResetStrategy.fromString("earliest"); + AutoOffsetResetStrategy latest1 = AutoOffsetResetStrategy.fromString("latest"); + + AutoOffsetResetStrategy duration1 = AutoOffsetResetStrategy.fromString("by_duration:P2D"); + AutoOffsetResetStrategy duration2 = AutoOffsetResetStrategy.fromString("by_duration:P2D"); + + assertEquals(earliest1, earliest2); + assertNotEquals(earliest1, latest1); + assertEquals(earliest1.hashCode(), earliest2.hashCode()); + assertNotEquals(earliest1.hashCode(), latest1.hashCode()); + + assertNotEquals(latest1, duration2); + assertEquals(duration1, duration2); + } + + @Test + public void testTimestamp() { + AutoOffsetResetStrategy earliest1 = AutoOffsetResetStrategy.fromString("earliest"); + AutoOffsetResetStrategy earliest2 = AutoOffsetResetStrategy.fromString("earliest"); + assertEquals(Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP), earliest1.timestamp()); + assertEquals(earliest1, earliest2); + + AutoOffsetResetStrategy latest1 = AutoOffsetResetStrategy.fromString("latest"); + AutoOffsetResetStrategy latest2 = AutoOffsetResetStrategy.fromString("latest"); + assertEquals(Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP), latest1.timestamp()); + assertEquals(latest1, latest2); + + AutoOffsetResetStrategy none1 = AutoOffsetResetStrategy.fromString("none"); + AutoOffsetResetStrategy none2 = AutoOffsetResetStrategy.fromString("none"); + assertFalse(none1.timestamp().isPresent()); + assertEquals(none1, none2); + + AutoOffsetResetStrategy byDuration1 = AutoOffsetResetStrategy.fromString("by_duration:PT1H"); + Optional timestamp = byDuration1.timestamp(); + assertTrue(timestamp.isPresent()); + assertTrue(timestamp.get() <= Instant.now().toEpochMilli() - Duration.ofHours(1).toMillis()); + + AutoOffsetResetStrategy byDuration2 = AutoOffsetResetStrategy.fromString("by_duration:PT1H"); + AutoOffsetResetStrategy byDuration3 = AutoOffsetResetStrategy.fromString("by_duration:PT2H"); + + assertEquals(byDuration1, byDuration2); + assertNotEquals(byDuration1, byDuration3); } } \ No newline at end of file diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java index d5ff67359c465..9a6c4d081bfae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java @@ -65,6 +65,7 @@ import org.junit.jupiter.api.Test; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -93,6 +94,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class OffsetFetcherTest { @@ -186,6 +189,26 @@ public void testUpdateFetchPositionResetToLatestOffset() { assertEquals(5, subscriptions.position(tp0).offset); } + @Test + public void testUpdateFetchPositionResetToDurationOffset() { + long timestamp = Instant.now().toEpochMilli(); + AutoOffsetResetStrategy durationStrategy = mock(AutoOffsetResetStrategy.class); + when(durationStrategy.timestamp()).thenReturn(Optional.of(timestamp)); + buildFetcher(durationStrategy); + assignFromUser(singleton(tp0)); + subscriptions.requestOffsetReset(tp0, durationStrategy); + + client.updateMetadata(initialUpdateResponse); + + client.prepareResponse(listOffsetRequestMatcher(timestamp), + listOffsetResponse(Errors.NONE, 1L, 5L)); + offsetFetcher.resetPositionsIfNeeded(); + consumerClient.pollNoWakeup(); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).offset); + } + /** * Make sure the client behaves appropriately when receiving an exception for unavailable offsets */