-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8917] Checkpoint reset handling #12718
[HUDI-8917] Checkpoint reset handling #12718
Conversation
d8ddad2
to
c248ae8
Compare
3d4b06c
to
f6bd3d7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hey @Davis-Zhang-Onehouse :
I am thinking of simplifying this. lets connect f2f and see if we can work out something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 high level feedback
// - instant request time: Then we will treat it as a V1 checkpoint. | ||
// - completion time: We will treat it as a normal V2 checkpoint. | ||
if (lastCheckpoint.get() instanceof StreamerCheckpointFromCfgCkp) { | ||
lastCheckpoint = Option.of(resolveToV1V2Checkpoint((StreamerCheckpointFromCfgCkp) lastCheckpoint.get())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets move this resolution (V1 or V2 checkpoint) to translatecheckpoint() override method in HoodieIncrSource.
So, by the time we reach fetchNextBatch, its going to be either of StreamerCheckpointV2 or StreamerCheckpointV1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sending over the reviews so far
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
Show resolved
Hide resolved
hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
Show resolved
Hide resolved
...rc/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java
Outdated
Show resolved
Hide resolved
...rc/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java
Outdated
Show resolved
Hide resolved
...mmon/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointFromCfgCkp.java
Outdated
Show resolved
Hide resolved
|
||
@VisibleForTesting | ||
static void assertNoCheckpointOverrideDuringUpgrade(HoodieTableMetaClient metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
java docs please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, github integrated with copilot, I will cut feature request to GH :)
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
Show resolved
Hide resolved
if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config, writeTableVersion)) { | ||
throw new HoodieUpgradeDowngradeException( | ||
String.format("When upgrade/downgrade is happening, please avoid setting --checkpoint option and --ignore-checkpoint for your delta streamers." | ||
+ " Detected invalid streamer configuration:\n%s", streamerConfig)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you know if we are logging each individual entries here. or it just the hash of the object
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's full content, check the toString method of the class
} | ||
} | ||
|
||
private static Option<Checkpoint> useCkpFromOverrideConfigIfAny( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to abbreviate "checkpoint", we can use "ckpt" instead of "ckp"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will leave it to future follow up
return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey()); | ||
} | ||
|
||
private static boolean ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config streamerConfig, Checkpoint checkpointFromCommit) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above. "ckpt" instead of "ckp"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will leave it to future follow up
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we enhance the documentation for StreamerConfig.checkpoint to call out the expected format.
// Assert the table is upgraded. | ||
metaClient = getHoodieMetaClientWithTableVersion(storageConf(), basePath(), "8"); | ||
assertEquals(HoodieTableVersion.EIGHT, metaClient.getTableConfig().getTableVersion()); | ||
assertEquals(TimelineLayoutVersion.LAYOUT_VERSION_2, metaClient.getTableConfig().getTimelineLayoutVersion().get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this validation code blocks (L 288 to 294) to common method and reuse across
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will have a follow up PR for code maintainability items. Will create jira.
fcb8a13
to
61d176b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sending over feedback
No functional change, just move code to smaller functions, add docs.
1. During upgrade/downgrade, no user checkpoint override via streamer config. 2. When checkpoint is set via streamer config for hoodie incremental source, it will specify request time based or completion time based. Added relevant checkpoint handling logic accordingly.
All places that we build checkpoint from streamer config instead of using checkpoint from the previous commit metadata, we should call buildCheckpointFromConfigOverride to preserve this information.
unit test for HoodieIncrSourceCheckpointValUtils and buildCheckpointFromGeneralSource.
Unit test for assertNoCheckpointOverrideDuringUpgrade
The commit covers the main code path for reset checkpoint handling for upgrade e2e.
If a data source in returns checkpoint version that mismatch the target version (version 8 target ckp v2, 6 targets ckp v1), it should error out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
few more minor comments
public static final Set<String> HOODIE_INCREMENTAL_SOURCES; | ||
|
||
static { | ||
HashSet<String> tmp = new HashSet<>(DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets not name it as "tmp"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also lets do
public static final Set<String> DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2 = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(
"org.apache.hudi.utilities.sources.S3EventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.GcsEventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource"
)));
and
public static Set<String> HOODIE_INCR_SOURCES = new HashSet<>(DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2);
HOODIE_INCR_SOURCES.add("org.apache.hudi.utilities.sources.HoodieIncrSource");
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applied with slight change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
61d176b
to
4b281fe
Compare
Change Logs
Please start reviewing from commit with title "basic code refactor". All previous commits comes from #12688. Or alternatively, you can review this.
The change is about how we handle checkpoint override configurations for delta streamer with hoodie incremental source during auto upgrade:
for request time based checkpoint reset. Internally delta streamer for hoodie incremental source would convert this to the completion time of the same instant and start ingestion following completion time based ordering. Alternatively, user can reset checkpoint based on completion time using:
Please note, the config only controls the way of how checkpoint is overridden, delta streamer always follow the completion time based handing for version 8 target hoodie tables.
Impact
Avoid corner case that user/hudi confuse checkpoint semantics during auto upgrade.
Risk level (write none, low medium or high below)
none.
Documentation Update
After hoodie incremental source can support completion time based checkpoint, we should update the doc about these rules. As of now, the hoodie incremental sources are still using request time based handling in 1.x.
Contributor's checklist