Better automatic offset reset for Kinesis ingestion #15338
Thanks, @AmatyaAvadhanula! Looks good overall. Left some comments on the logging and exception messages. Also, CI is failing for code coverage reasons - it seems like we don't have existing unit test coverage for this functionality in general?
  sendResetRequestAndWait(shardResetMap, toolbox);
}
catch (IOException e) {
  throw new ISE(e, "Exception while attempting to automatically reset sequences");
Same, perhaps include the shardResetMap keys in the exception?
Done
if (!shardResetMap.isEmpty()) {
  for (Map.Entry<StreamPartition<String>, String> partitionToReset : shardResetMap.entrySet()) {
    log.warn("Starting sequence number [%s] is no longer available for partition [%s]",
- log.warn("Starting sequence number [%s] is no longer available for partition [%s]",
+ log.warn("Starting sequenceNumber[%s] is no longer available for partition[%s]",
Done
    );
  }
  if (task.getTuningConfig().isResetOffsetAutomatically()) {
    log.info("Attempting to reset offsets for [%d] partitions.", shardResetMap.size());
Partitions that repeatedly fall off the stream and get reset automatically would indicate an underlying issue, so I think logging the partition keys shardResetMap.keys() would be useful.
Done
  }
} else {
  throw new ISE("Sequence numbers are unavailable but automatic offset reset is disabled.");
Collect the unavailable partitions above and include them in the exception (instead of an end user having to dig up logs)?
Done
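For readers following this thread, the suggestion above could look roughly like the sketch below. It is not the code that was merged: `IllegalStateException` stands in for Druid's `ISE`, partitions are plain strings instead of `StreamPartition<String>`, and the class and method names (`UnavailableSequenceMessageSketch`, `buildMessage`, `checkResetAllowed`) are hypothetical.

```java
import java.util.Map;
import java.util.TreeSet;

class UnavailableSequenceMessageSketch
{
  // Builds an error message that names every partition whose sequence number is unavailable.
  // Keys are sorted so the message is stable regardless of map iteration order.
  static String buildMessage(Map<String, String> partitionToSequenceResetMap)
  {
    return String.format(
        "Sequence numbers are unavailable for partitions%s but automatic offset reset is disabled.",
        new TreeSet<>(partitionToSequenceResetMap.keySet())
    );
  }

  // Throws when there are unavailable partitions and automatic reset is turned off.
  // The boolean parameter is a stand-in for the tuning config flag.
  static void checkResetAllowed(Map<String, String> partitionToSequenceResetMap, boolean resetOffsetAutomatically)
  {
    if (!partitionToSequenceResetMap.isEmpty() && !resetOffsetAutomatically) {
      throw new IllegalStateException(buildMessage(partitionToSequenceResetMap));
    }
  }
}
```

With the partition keys in the exception itself, an operator can see which shards are affected without searching the task logs.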
@abhishekrb19, thanks for the review.
LGTM. Left a couple of suggestions on variable naming. Thanks!
@@ -125,28 +125,42 @@ protected void possiblyResetDataSourceMetadata(
{
  if (!task.getTuningConfig().isSkipSequenceNumberAvailabilityCheck()) {
    final ConcurrentMap<String, String> currOffsets = getCurrentOffsets();
    final Map<StreamPartition<String>, String> shardResetMap = new HashMap<>();
nit: for clarity, maybe call this map partitionToSequenceResetMap since we refer to the keys and values of the map a few times?
Done
}

if (!shardResetMap.isEmpty()) {
  for (Map.Entry<StreamPartition<String>, String> partitionToReset : shardResetMap.entrySet()) {
nit: partitionToReset -> partitionToSequence
Done
Kinesis automatic reset currently attempts to reset the offsets in the metadata store for all the shards in a task, even if only one of them has fallen behind.
This PR aims to improve this behaviour by resetting the checkpoints only for those partitions whose sequence numbers are no longer available.
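
The selective reset described above can be illustrated with a minimal, self-contained sketch. It is not the PR's implementation: partitions are plain shard ID strings rather than `StreamPartition<String>`, and the `isAvailable` predicate is a hypothetical stand-in for the real check of whether a sequence number can still be read from the stream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

class SelectiveResetSketch
{
  // Returns only the partitions whose stored sequence number is no longer available.
  // Partitions that are still readable are left out, so their checkpoints stay untouched.
  static Map<String, String> findPartitionsToReset(
      Map<String, String> currentOffsets,
      BiPredicate<String, String> isAvailable
  )
  {
    final Map<String, String> partitionToSequenceResetMap = new HashMap<>();
    for (Map.Entry<String, String> partitionToSequence : currentOffsets.entrySet()) {
      if (!isAvailable.test(partitionToSequence.getKey(), partitionToSequence.getValue())) {
        partitionToSequenceResetMap.put(partitionToSequence.getKey(), partitionToSequence.getValue());
      }
    }
    return partitionToSequenceResetMap;
  }

  public static void main(String[] args)
  {
    final Map<String, String> currentOffsets = new HashMap<>();
    currentOffsets.put("shardId-000000000000", "100");
    currentOffsets.put("shardId-000000000001", "200");

    // Pretend that only the first shard's sequence number has expired from the stream.
    final Map<String, String> toReset = findPartitionsToReset(
        currentOffsets,
        (shard, sequence) -> !shard.equals("shardId-000000000000")
    );
    System.out.println("Partitions to reset: " + toReset.keySet());
  }
}
```

Only the entries in the returned map would be sent in the reset request; the remaining shards keep their existing checkpoints.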