Skip to content

Commit

Permalink
KAFKA-18013: Add support for duration based offset reset strategy to …
Browse files Browse the repository at this point in the history
…Kafka Consumer (apache#17972)

Update AutoOffsetResetStrategy.java to support duration based reset strategy
Update OffsetFetcher related classes and unit tests

Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans <[email protected]>
  • Loading branch information
omkreddy authored Nov 29, 2024
1 parent 6237325 commit ae3c5de
Show file tree
Hide file tree
Showing 11 changed files with 252 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
"(e.g. because that data has been deleted): " +
"<ul><li>earliest: automatically reset the offset to the earliest offset" +
"<li>latest: automatically reset the offset to the latest offset</li>" +
"<li>by_duration:<duration>: automatically reset the offset to a configured <duration> from the current timestamp. <duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
"Negative duration is not allowed.</li>" +
"<li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>" +
"<li>anything else: throw exception to the consumer.</li></ul>" +
"<p>Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private final SubscriptionState subscriptions;
private final Map<TopicPartition, Long> beginningOffsets;
private final Map<TopicPartition, Long> endOffsets;
private final Map<TopicPartition, Long> durationResetOffsets;
private final Map<TopicPartition, OffsetAndMetadata> committed;
private final Queue<Runnable> pollTasks;
private final Set<TopicPartition> paused;
Expand Down Expand Up @@ -104,6 +105,7 @@ private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) {
this.closed = false;
this.beginningOffsets = new HashMap<>();
this.endOffsets = new HashMap<>();
this.durationResetOffsets = new HashMap<>();
this.pollTasks = new LinkedList<>();
this.pollException = null;
this.wakeup = new AtomicBoolean(false);
Expand Down Expand Up @@ -433,6 +435,10 @@ public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOff
endOffsets.putAll(newOffsets);
}

public synchronized void updateDurationOffsets(final Map<TopicPartition, Long> newOffsets) {
durationResetOffsets.putAll(newOffsets);
}

public void disableTelemetry() {
telemetryDisabled = true;
}
Expand Down Expand Up @@ -610,6 +616,10 @@ private void resetOffsetPosition(TopicPartition tp) {
offset = endOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
} else if (strategy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
offset = durationResetOffsets.get(tp);
if (offset == null)
throw new IllegalStateException("MockConsumer didn't have duration offset specified, but tried to seek to timestamp");
} else {
throw new NoOffsetForPartitionException(tp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.utils.Utils;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

public class AutoOffsetResetStrategy {
private enum StrategyType {
LATEST, EARLIEST, NONE;
public enum StrategyType {
LATEST, EARLIEST, NONE, BY_DURATION;

@Override
public String toString() {
Expand All @@ -39,30 +43,65 @@ public String toString() {
public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE);

private final StrategyType type;
private final Optional<Duration> duration;

private AutoOffsetResetStrategy(StrategyType type) {
this.type = type;
this.duration = Optional.empty();
}

public static boolean isValid(String offsetStrategy) {
return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
private AutoOffsetResetStrategy(Duration duration) {
this.type = StrategyType.BY_DURATION;
this.duration = Optional.of(duration);
}

/**
* Returns the AutoOffsetResetStrategy from the given string.
*/
public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
if (offsetStrategy == null || !isValid(offsetStrategy)) {
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
if (offsetStrategy == null) {
throw new IllegalArgumentException("Auto offset reset strategy is null");
}
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);

if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
throw new IllegalArgumentException("<:duration> part is missing in by_duration auto offset reset strategy.");
}

if (Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) {
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
switch (type) {
case EARLIEST:
return EARLIEST;
case LATEST:
return LATEST;
case NONE:
return NONE;
default:
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}
}

if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
String isoDuration = offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
try {
Duration duration = Duration.parse(isoDuration);
if (duration.isNegative()) {
throw new IllegalArgumentException("Negative duration is not supported in by_duration offset reset strategy.");
}
return new AutoOffsetResetStrategy(duration);
} catch (Exception e) {
throw new IllegalArgumentException("Unable to parse duration string in by_duration offset reset strategy.", e);
}
}

throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
}

/**
* Returns the offset reset strategy type.
*/
public StrategyType type() {
return type;
}

/**
Expand All @@ -72,33 +111,54 @@ public String name() {
return type.toString();
}

/**
* Return the timestamp to be used for the ListOffsetsRequest.
* @return the timestamp for the OffsetResetStrategy,
* if the strategy is EARLIEST or LATEST or duration is provided
* else return Optional.empty()
*/
public Optional<Long> timestamp() {
if (type == StrategyType.EARLIEST)
return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
else if (type == StrategyType.LATEST)
return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
Instant now = Instant.now();
return Optional.of(now.minus(duration.get()).toEpochMilli());
} else
return Optional.empty();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
return Objects.equals(type, that.type);
return type == that.type && Objects.equals(duration, that.duration);
}

@Override
public int hashCode() {
return Objects.hashCode(type);
return Objects.hash(type, duration);
}

@Override
public String toString() {
return "AutoOffsetResetStrategy{" +
"type='" + type + '\'' +
"type=" + type +
(duration.map(value -> ", duration=" + value).orElse("")) +
'}';
}

public static class Validator implements ConfigDef.Validator {
@Override
public void ensureValid(String name, Object value) {
String strategy = (String) value;
if (!AutoOffsetResetStrategy.isValid(strategy)) {
throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " +
name + ": the value must be either 'earliest', 'latest', or 'none'.");
String offsetStrategy = (String) value;
try {
fromString(offsetStrategy);
} catch (Exception e) {
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,13 @@ public OffsetFetcher(LogContext logContext,
* and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
*/
public void resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap =
offsetFetcherUtils.getOffsetResetStrategyForPartitions();

if (offsetResetTimestamps.isEmpty())
if (partitionAutoOffsetResetStrategyMap.isEmpty())
return;

resetPositionsAsync(offsetResetTimestamps);
resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
}

/**
Expand Down Expand Up @@ -209,7 +210,9 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
}
}

private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
private void resetPositionsAsync(Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
Expand All @@ -221,7 +224,7 @@ private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimesta
future.addListener(new RequestFutureListener<>() {
@Override
public void onSuccess(ListOffsetResult result) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, partitionAutoOffsetResetStrategyMap);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -224,19 +223,22 @@ void validatePositionsOnMetadataChange() {
}
}

Map<TopicPartition, Long> getOffsetResetTimestamp() {
/**
* get OffsetResetStrategy for all assigned partitions
*/
Map<TopicPartition, AutoOffsetResetStrategy> getOffsetResetStrategyForPartitions() {
// Raise exception from previous offset fetch if there is one
RuntimeException exception = cachedResetPositionsException.getAndSet(null);
if (exception != null)
throw exception;

Set<TopicPartition> partitions = subscriptionState.partitionsNeedingReset(time.milliseconds());
final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap = new HashMap<>();
for (final TopicPartition partition : partitions) {
offsetResetTimestamps.put(partition, offsetResetStrategyTimestamp(partition));
partitionAutoOffsetResetStrategyMap.put(partition, offsetResetStrategyWithValidTimestamp(partition));
}

return offsetResetTimestamps;
return partitionAutoOffsetResetStrategyMap;
}

static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
Expand Down Expand Up @@ -283,14 +285,13 @@ static Map<TopicPartition, OffsetAndTimestampInternal> buildOffsetsForTimeIntern
return offsetsResults;
}

private long offsetResetStrategyTimestamp(final TopicPartition partition) {
private AutoOffsetResetStrategy offsetResetStrategyWithValidTimestamp(final TopicPartition partition) {
AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
if (strategy == AutoOffsetResetStrategy.EARLIEST)
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
else if (strategy == AutoOffsetResetStrategy.LATEST)
return ListOffsetsRequest.LATEST_TIMESTAMP;
else
if (strategy.timestamp().isPresent()) {
return strategy;
} else {
throw new NoOffsetForPartitionException(partition);
}
}

static Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
Expand Down Expand Up @@ -319,18 +320,9 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
}
}

static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
return AutoOffsetResetStrategy.EARLIEST;
else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
return AutoOffsetResetStrategy.LATEST;
else
return null;
}

void onSuccessfulResponseForResettingPositions(
final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
final ListOffsetResult result) {
final ListOffsetResult result,
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
if (!result.partitionsToRetry.isEmpty()) {
subscriptionState.requestFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
Expand All @@ -339,10 +331,9 @@ void onSuccessfulResponseForResettingPositions(
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
TopicPartition partition = fetchedOffset.getKey();
ListOffsetData offsetData = fetchedOffset.getValue();
ListOffsetsRequestData.ListOffsetsPartition requestedReset = resetTimestamps.get(partition);
resetPositionIfNeeded(
partition,
timestampToOffsetResetStrategy(requestedReset.timestamp()),
partitionAutoOffsetResetStrategyMap.get(partition),
offsetData);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,20 +472,20 @@ private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions)
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
*/
CompletableFuture<Void> resetPositionsIfNeeded() {
Map<TopicPartition, Long> offsetResetTimestamps;
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap;

try {
offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions();
} catch (Exception e) {
CompletableFuture<Void> result = new CompletableFuture<>();
result.completeExceptionally(e);
return result;
}

if (offsetResetTimestamps.isEmpty())
if (partitionAutoOffsetResetStrategyMap.isEmpty())
return CompletableFuture.completedFuture(null);

return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
return sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap);
}

/**
Expand Down Expand Up @@ -652,12 +652,14 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
* partitions. Use the retrieved offsets to reset positions in the subscription state.
* This also adds the request to the list of unsentRequests.
*
* @param timestampsToSearch the mapping between partitions and target time
* @param partitionAutoOffsetResetStrategyMap the mapping between partitions and AutoOffsetResetStrategy
* @return A {@link CompletableFuture} which completes when the requests are
* complete.
*/
private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
final Map<TopicPartition, Long> timestampsToSearch) {
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> timestampsToSearch = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(timestampsToSearch, Optional.empty());

Expand All @@ -677,8 +679,8 @@ private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(

partialResult.whenComplete((result, error) -> {
if (error == null) {
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
result);
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
partitionAutoOffsetResetStrategyMap);
} else {
RuntimeException e;
if (error instanceof RuntimeException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position
log.debug("Skipping reset of partition {} since it is no longer assigned", tp);
} else if (!state.awaitingReset()) {
log.debug("Skipping reset of partition {} since reset is no longer needed", tp);
} else if (requestedResetStrategy != state.resetStrategy) {
} else if (requestedResetStrategy != null && !requestedResetStrategy.equals(state.resetStrategy)) {
log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp);
} else {
log.info("Resetting offset for partition {} to position {}.", tp, position);
Expand Down
Loading

0 comments on commit ae3c5de

Please sign in to comment.