Skip to content

Commit

Permalink
Better automatic offset reset for Kinesis ingestion (#15338)
Browse files (browse the repository at this point in the history)
Better automatic offset reset for Kinesis ingestion
  • Loading branch information
AmatyaAvadhanula authored Dec 13, 2023
1 parent 4ec9a0a commit 48a96f5
Showing 1 changed file with 31 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
{
Expand Down Expand Up @@ -125,28 +125,42 @@ protected void possiblyResetDataSourceMetadata(
{
if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
final Map<StreamPartition<String>, String> partitionToSequenceResetMap = new HashMap<>();
for (final StreamPartition<String> streamPartition : assignment) {
String sequence = currOffsets.get(streamPartition.getPartitionId());
if (!recordSupplier.isOffsetAvailable(streamPartition, KinesisSequenceNumber.of(sequence))) {
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info("Attempting to reset sequences automatically for all partitions");
try {
sendResetRequestAndWait(
assignment.stream()
.collect(Collectors.toMap(x -> x, x -> currOffsets.get(x.getPartitionId()))),
toolbox
);
}
catch (IOException e) {
throw new ISE(e, "Exception while attempting to automatically reset sequences");
}
} else {
partitionToSequenceResetMap.put(streamPartition, sequence);
}
}

if (!partitionToSequenceResetMap.isEmpty()) {
for (Map.Entry<StreamPartition<String>, String> partitionToSequence : partitionToSequenceResetMap.entrySet()) {
log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s].",
partitionToSequence.getValue(),
partitionToSequence.getKey()
);
}
if (task.getTuningConfig().isResetOffsetAutomatically()) {
log.info(
"Attempting to reset offsets for [%d] partitions with ids[%s].",
partitionToSequenceResetMap.size(),
partitionToSequenceResetMap.keySet()
);
try {
sendResetRequestAndWait(partitionToSequenceResetMap, toolbox);
}
catch (IOException e) {
throw new ISE(
"Starting sequenceNumber [%s] is no longer available for partition [%s] and resetOffsetAutomatically is not enabled",
sequence,
streamPartition.getPartitionId()
e,
"Exception while attempting to automatically reset sequences for partitions[%s]",
partitionToSequenceResetMap.keySet()
);
}
} else {
throw new ISE(
"Automatic offset reset is disabled, but there are partitions with unavailable sequence numbers [%s].",
partitionToSequenceResetMap
);
}
}
}
Expand Down Expand Up @@ -191,5 +205,4 @@ protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(
return null;
}
}

}

0 comments on commit 48a96f5

Please sign in to comment.