diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index d705cb227bcc6..baf08a4214b22 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -58,7 +58,7 @@ public UpgradeDowngrade(
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}
- public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
+ public static boolean needsUpgradeOrDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieTableVersion toWriteVersion) {
HoodieTableVersion fromTableVersion = metaClient.getTableConfig().getTableVersion();
// If table version is less than SIX, then we need to upgrade to SIX first before upgrading to any other version, irrespective of autoUpgrade flag
if (fromTableVersion.versionCode() < HoodieTableVersion.SIX.versionCode() && toWriteVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
@@ -76,6 +76,10 @@ public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
return toWriteVersion.versionCode() != fromTableVersion.versionCode();
}
+ public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
+ return needsUpgradeOrDowngrade(metaClient, config, toWriteVersion);
+ }
+
/**
* Perform Upgrade or Downgrade steps if required and updated table version if need be.
*
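The static needsUpgradeOrDowngrade overload introduced above lets callers that already hold a HoodieTableMetaClient and HoodieWriteConfig run the upgrade/downgrade check without constructing an UpgradeDowngrade instance; StreamerCheckpointUtils relies on exactly that further down in this patch. A minimal sketch of such a caller (the wrapper class and method names are illustrative, not part of the patch):

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;

final class UpgradeCheckSketch {
  // Returns true when the table still has to be migrated before it can be written at version EIGHT.
  static boolean mustMigrateFirst(HoodieTableMetaClient metaClient, HoodieWriteConfig config) {
    return UpgradeDowngrade.needsUpgradeOrDowngrade(metaClient, config, HoodieTableVersion.EIGHT);
  }
}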
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
index f1693a4019698..67248ba4adcdb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/Checkpoint.java
@@ -36,6 +36,11 @@ public abstract class Checkpoint implements Serializable {
// These are extra props to be written to the commit metadata
protected Map<String, String> extraProps = new HashMap<>();
+ public Checkpoint setCheckpointKey(String newKey) {
+ checkpointKey = newKey;
+ return this;
+ }
+
public String getCheckpointKey() {
return checkpointKey;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
index 2f49f3d2b1f94..15084a74ae46d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java
@@ -49,6 +49,16 @@ public class CheckpointUtils {
"org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource"
)));
+
+ public static final Set<String> HOODIE_INCREMENTAL_SOURCES;
+
+ static {
+ HashSet<String> hoodieIncSource = new HashSet<>(DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2);
+ hoodieIncSource.add("org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource");
+ hoodieIncSource.add("org.apache.hudi.utilities.sources.HoodieIncrSource");
+ HOODIE_INCREMENTAL_SOURCES = Collections.unmodifiableSet(hoodieIncSource);
+ }
+
public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
|| !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
@@ -61,6 +71,20 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}
+ public static Checkpoint buildCheckpointFromGeneralSource(
+ String sourceClassName, int writeTableVersion, String checkpointToResume) {
+ return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, sourceClassName)
+ ? new StreamerCheckpointV2(checkpointToResume) : new StreamerCheckpointV1(checkpointToResume);
+ }
+
+ // Whenever we create a checkpoint from the streamer config checkpoint override, we should use this
+ // function to build the checkpoint.
+ public static Checkpoint buildCheckpointFromConfigOverride(
+ String sourceClassName, int writeTableVersion, String checkpointToResume) {
+ return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, sourceClassName)
+ ? new UnresolvedStreamerCheckpointBasedOnCfg(checkpointToResume) : new StreamerCheckpointV1(checkpointToResume);
+ }
+
public static boolean shouldTargetCheckpointV2(int writeTableVersion, String sourceClassName) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
&& !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
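Taken together, the two new builders encode a single rule: for a general source on table version eight or above the checkpoint is a completion-time based StreamerCheckpointV2, on older tables it is a StreamerCheckpointV1, and a user-supplied override is additionally wrapped in UnresolvedStreamerCheckpointBasedOnCfg so the source can resolve its meaning later. A hedged sketch of a caller choosing between them (the class and variable names are illustrative, not part of the patch):

import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;

final class CheckpointBuilderSketch {
  // checkpointOverride is the raw --checkpoint value, or null when the user did not set one.
  static Checkpoint pick(String sourceClassName, String resumeFromCommit, String checkpointOverride) {
    int version = HoodieTableVersion.EIGHT.versionCode();
    if (checkpointOverride != null) {
      // User override: defer the request-time vs completion-time interpretation to the source.
      return CheckpointUtils.buildCheckpointFromConfigOverride(sourceClassName, version, checkpointOverride);
    }
    // Checkpoint derived from the general ingestion flow (e.g. previous commit metadata).
    return CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version, resumeFromCommit);
  }
}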
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java
new file mode 100644
index 0000000000000..829c985aeb10c
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/HoodieIncrSourceCheckpointValUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+/**
+ * Utility class for parsing checkpoint overrides configured for hoodie incremental sources and resolving
+ * them to the actual checkpoint version (V1 for request time based values, V2 for completion time based values).
+ */
+public class HoodieIncrSourceCheckpointValUtils {
+ public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
+ public static final String REQUEST_TIME_PREFIX = "resumeFromInstantRequestTime";
+ public static final String COMPLETION_TIME_PREFIX = "resumeFromInstantCompletionTime";
+
+ /**
+ * For hoodie incremental source ingestion, if the target table is version 8 or higher, the checkpoint
+ * key set by streamer config can be in either of the following formats:
+ * - resumeFromInstantRequestTime:[checkpoint value based on request time]
+ * - resumeFromInstantCompletionTime:[checkpoint value based on completion time]
+ *
+ * The {@link UnresolvedStreamerCheckpointBasedOnCfg} class itself captures the fact that the table is version 8
+ * or higher and that the checkpoint key comes from a streamer config override.
+ *
+ * When the checkpoint is consumed by individual data sources, we need to convert it to either vanilla
+ * checkpoint v1 (request time based) or checkpoint v2 (completion time based).
+ */
+ public static Checkpoint resolveToActualCheckpointVersion(UnresolvedStreamerCheckpointBasedOnCfg checkpoint) {
+ String[] parts = extractKeyValues(checkpoint);
+ switch (parts[0]) {
+ case REQUEST_TIME_PREFIX: {
+ return new StreamerCheckpointV1(checkpoint).setCheckpointKey(parts[1]);
+ }
+ case COMPLETION_TIME_PREFIX: {
+ return new StreamerCheckpointV2(checkpoint).setCheckpointKey(parts[1]);
+ }
+ default:
+ throw new IllegalArgumentException("Unknown event ordering mode " + parts[0]);
+ }
+ }
+
+ private static String[] extractKeyValues(UnresolvedStreamerCheckpointBasedOnCfg checkpoint) {
+ String checkpointKey = checkpoint.getCheckpointKey();
+ String[] parts = checkpointKey.split(RESET_CHECKPOINT_V2_SEPARATOR);
+ if (parts.length != 2
+ || (
+ !parts[0].trim().equals(REQUEST_TIME_PREFIX)
+ && !parts[0].trim().equals(COMPLETION_TIME_PREFIX)
+ )) {
+ throw new IllegalArgumentException(
+ "Illegal checkpoint key override `" + checkpointKey + "`. Valid format is either `resumeFromInstantRequestTime:` or "
+ + "`resumeFromInstantCompletionTime:`.");
+ }
+ parts[0] = parts[0].trim();
+ parts[1] = parts[1].trim();
+ return parts;
+ }
+}
\ No newline at end of file
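A short sketch of the resolution step a hoodie incremental source performs on such an override (the checkpoint value shown is illustrative):

import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.HoodieIncrSourceCheckpointValUtils;
import org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;

final class OverrideResolutionSketch {
  static Checkpoint resolve() {
    // A resumeFromInstantCompletionTime prefix resolves to a StreamerCheckpointV2 keyed by the value after the colon;
    // a resumeFromInstantRequestTime prefix would resolve to a StreamerCheckpointV1 instead.
    UnresolvedStreamerCheckpointBasedOnCfg override =
        new UnresolvedStreamerCheckpointBasedOnCfg("resumeFromInstantCompletionTime:20240101000000");
    return HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(override);
  }
}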
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/UnresolvedStreamerCheckpointBasedOnCfg.java b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/UnresolvedStreamerCheckpointBasedOnCfg.java
new file mode 100644
index 0000000000000..cd9485ae82b90
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/UnresolvedStreamerCheckpointBasedOnCfg.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+
+/**
+ * A special checkpoint v2 class that indicates its checkpoint key comes from streamer config checkpoint
+ * overrides.
+ *
+ * For hoodie incremental sources, the checkpoint override value can indicate either a request time based
+ * or a completion time based checkpoint. The class therefore serves as an indicator to data sources of
+ * interest that the key still needs to be parsed and resolved to either checkpoint v1 or v2.
+ *
+ * For all other data sources, it behaves exactly the same as checkpoint v2.
+ *
+ * To keep the checkpoint class design ignorant of which data source it serves, the class only indicates where
+ * the checkpoint key comes from.
+ */
+public class UnresolvedStreamerCheckpointBasedOnCfg extends StreamerCheckpointV2 {
+ public UnresolvedStreamerCheckpointBasedOnCfg(String key) {
+ super(key);
+ }
+
+ public UnresolvedStreamerCheckpointBasedOnCfg(Checkpoint checkpoint) {
+ super(checkpoint);
+ }
+
+ public UnresolvedStreamerCheckpointBasedOnCfg(HoodieCommitMetadata commitMetadata) {
+ super(commitMetadata);
+ }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
index 450627e473b82..a4b1be058799a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestCheckpointUtils.java
@@ -21,6 +21,7 @@
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -39,6 +40,7 @@
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
@@ -53,6 +55,9 @@ public class TestCheckpointUtils {
private HoodieTableMetaClient metaClient;
private HoodieActiveTimeline activeTimeline;
+ private static final String CHECKPOINT_TO_RESUME = "20240101000000";
+ private static final String GENERAL_SOURCE = "org.apache.hudi.utilities.sources.GeneralSource";
+
@BeforeEach
public void setUp() {
metaClient = mock(HoodieTableMetaClient.class);
@@ -207,7 +212,49 @@ public void testConvertCheckpointWithUseTransitionTime() {
"8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource, false",
"8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource, false"
})
- public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) {
- assertEquals(expected, CheckpointUtils.shouldTargetCheckpointV2(version, sourceClassName));
+ public void testTargetCheckpointV2(int version, String sourceClassName, boolean isV2Checkpoint) {
+ assertEquals(isV2Checkpoint, CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version, "ignored") instanceof StreamerCheckpointV2);
+ }
+
+ @Test
+ public void testBuildCheckpointFromGeneralSource() {
+ // Test V2 checkpoint creation (newer table version + general source)
+ Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromGeneralSource(
+ GENERAL_SOURCE,
+ HoodieTableVersion.EIGHT.versionCode(),
+ CHECKPOINT_TO_RESUME
+ );
+ assertInstanceOf(StreamerCheckpointV2.class, checkpoint1);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
+
+ // Test V1 checkpoint creation (older table version)
+ Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromGeneralSource(
+ GENERAL_SOURCE,
+ HoodieTableVersion.SEVEN.versionCode(),
+ CHECKPOINT_TO_RESUME
+ );
+ assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
+ }
+
+ @Test
+ public void testBuildCheckpointFromConfigOverride() {
+ // Test checkpoint from config creation (newer table version + general source)
+ Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromConfigOverride(
+ GENERAL_SOURCE,
+ HoodieTableVersion.EIGHT.versionCode(),
+ CHECKPOINT_TO_RESUME
+ );
+ assertInstanceOf(UnresolvedStreamerCheckpointBasedOnCfg.class, checkpoint1);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());
+
+ // Test V1 checkpoint creation (older table version)
+ Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromConfigOverride(
+ GENERAL_SOURCE,
+ HoodieTableVersion.SEVEN.versionCode(),
+ CHECKPOINT_TO_RESUME
+ );
+ assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
+ assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java
new file mode 100644
index 0000000000000..5e9f220d1135e
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/checkpoint/TestHoodieIncrSourceCheckpointValUtils.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.checkpoint;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieIncrSourceCheckpointValUtils {
+
+ @Test
+ public void testResolveToV1V2CheckpointWithRequestTime() {
+ String checkpoint = "20240301";
+ UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
+ when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantRequestTime:" + checkpoint);
+
+ Checkpoint result = HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint);
+
+ assertInstanceOf(StreamerCheckpointV1.class, result);
+ assertEquals(checkpoint, result.getCheckpointKey());
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithCompletionTime() {
+ String checkpoint = "20240302";
+ UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
+ when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantCompletionTime:" + checkpoint);
+
+ Checkpoint result = HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint);
+
+ assertInstanceOf(StreamerCheckpointV2.class, result);
+ assertEquals(checkpoint, result.getCheckpointKey());
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithInvalidPrefix() {
+ UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
+ when(mockCheckpoint.getCheckpointKey()).thenReturn("invalidPrefix:20240303");
+
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint)
+ );
+ assertTrue(exception.getMessage().contains("Illegal checkpoint key override"));
+ }
+
+ @Test
+ public void testResolveToV1V2CheckpointWithMalformedInput() {
+ UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
+ when(mockCheckpoint.getCheckpointKey()).thenReturn("malformedInput");
+
+ IllegalArgumentException exception = assertThrows(
+ IllegalArgumentException.class,
+ () -> HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint)
+ );
+ assertTrue(exception.getMessage().contains("Illegal checkpoint key override"));
+ }
+}
\ No newline at end of file
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 10a02b6511a42..ce1a407eaa9d5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -25,6 +25,7 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
+import org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.InstantRange;
@@ -69,6 +70,7 @@
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
import static org.apache.hudi.DataSourceReadOptions.START_COMMIT;
+import static org.apache.hudi.common.table.checkpoint.HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
@@ -183,8 +185,14 @@ public HoodieIncrSource(
@Override
protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
- // For Hudi incremental source, we'll let #fetchNextBatch to handle it since
- // it involves heavy lifting by loading the timeline for analysis
+ // The user might have overridden the checkpoint based on
+ // - instant request time: then we treat it as a V1 checkpoint.
+ // - completion time: then we treat it as a normal V2 checkpoint.
+ if (!lastCheckpoint.isEmpty() && lastCheckpoint.get() instanceof UnresolvedStreamerCheckpointBasedOnCfg) {
+ lastCheckpoint = Option.of(resolveToActualCheckpointVersion((UnresolvedStreamerCheckpointBasedOnCfg) lastCheckpoint.get()));
+ }
+ // For Hudi incremental source, we'll let #fetchNextBatch handle the request time to completion time
+ // based conversion, as it is a heavy operation.
return lastCheckpoint;
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 779981fbbef35..a6c3c0e1411da 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -41,6 +41,7 @@
import java.io.Serializable;
+import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.shouldTargetCheckpointV2;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
/**
@@ -99,6 +100,16 @@ protected InputBatch readFromCheckpoint(Option lastCheckpoint, lo
sourceLimit);
}
+ /**
+ * The second phase of checkpoint resolution - checkpoint version translation. It runs after the checkpoint
+ * value has been decided from the existing configurations in
+ * {@link org.apache.hudi.utilities.streamer.StreamerCheckpointUtils#resolveCheckpointToResumeFrom}.
+ *
+ * For most data sources there is no difference between checkpoint V1 and V2; the translation merely
+ * changes the wrapper class.
+ *
+ * Check child class method overrides to see special case handling.
+ */
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
if (lastCheckpoint.isEmpty()) {
@@ -128,6 +139,25 @@ protected Option translateCheckpoint(Option lastCheckpoi
throw new UnsupportedOperationException("Unsupported checkpoint type: " + lastCheckpoint.get());
}
+ public void assertCheckpointVersion(Option<Checkpoint> lastCheckpoint, Option<Checkpoint> lastCheckpointTranslated, Checkpoint checkpoint) {
+ if (checkpoint != null) {
+ boolean shouldBeV2Checkpoint = shouldTargetCheckpointV2(writeTableVersion, getClass().getName());
+ String errorMessage = String.format(
+ "Data source should return checkpoint version V%s. The checkpoint resumed in the iteration is %s, whose translated version is %s. "
+ + "The checkpoint returned after the iteration %s.",
+ shouldBeV2Checkpoint ? "2" : "1",
+ lastCheckpoint.isEmpty() ? "null" : lastCheckpointTranslated.get(),
+ lastCheckpointTranslated.isEmpty() ? "null" : lastCheckpointTranslated.get(),
+ checkpoint);
+ if (shouldBeV2Checkpoint && !(checkpoint instanceof StreamerCheckpointV2)) {
+ throw new IllegalStateException(errorMessage);
+ }
+ if (!shouldBeV2Checkpoint && !(checkpoint instanceof StreamerCheckpointV1)) {
+ throw new IllegalStateException(errorMessage);
+ }
+ }
+ }
+
/**
* Main API called by Hoodie Streamer to fetch records.
*
@@ -136,7 +166,9 @@ protected Option translateCheckpoint(Option lastCheckpoi
* @return
*/
public final InputBatch fetchNext(Option<Checkpoint> lastCheckpoint, long sourceLimit) {
- InputBatch batch = readFromCheckpoint(translateCheckpoint(lastCheckpoint), sourceLimit);
+ Option<Checkpoint> lastCheckpointTranslated = translateCheckpoint(lastCheckpoint);
+ InputBatch batch = readFromCheckpoint(lastCheckpointTranslated, sourceLimit);
+ assertCheckpointVersion(lastCheckpoint, lastCheckpointTranslated, batch.getCheckpointForNextBatch());
// If overriddenSchemaProvider is passed in CLI, use it
return overriddenSchemaProvider == null ? batch
: new InputBatch<>(batch.getBatch(), batch.getCheckpointForNextBatch(), overriddenSchemaProvider);
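assertCheckpointVersion enforces that the checkpoint class a source hands back matches what shouldTargetCheckpointV2 predicts for the table version and source class; UnresolvedStreamerCheckpointBasedOnCfg passes the V2 check because it extends StreamerCheckpointV2. The same contract, condensed into an isolated sketch (the class and method names are illustrative, not part of the patch):

import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;

final class CheckpointContractSketch {
  // True when the returned checkpoint's class lines up with the version the source is expected to emit.
  static boolean obeysContract(String sourceClassName, Checkpoint returned) {
    boolean expectV2 = CheckpointUtils.shouldTargetCheckpointV2(
        HoodieTableVersion.EIGHT.versionCode(), sourceClassName);
    return expectV2 ? returned instanceof StreamerCheckpointV2 : returned instanceof StreamerCheckpointV1;
  }
}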
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 074ad8743b501..cfe01dfde68a0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -392,7 +392,12 @@ public static class Config implements Serializable {
/**
* Resume Hudi Streamer from this checkpoint.
*/
- @Parameter(names = {"--checkpoint"}, description = "Resume Hudi Streamer from this checkpoint.")
+ @Parameter(names = {"--checkpoint"}, description = "Resume Hudi Streamer from this checkpoint. \nIf --source-class specifies a class "
+ + "that is a child class of org.apache.hudi.utilities.sources.HoodieIncrSource and the hoodie.table.version is 8 or higher, the "
+ + "format is expected to be either `resumeFromInstantRequestTime: ` or `resumeFromInstantCompletionTime: `. "
+ + "In table version 8 and above internally we only support completion time based commit ordering. If we use resumeFromInstantCompletionTime "
+ + "mode, the checkpoint will be reset to the instant with the corresponding completion time and resume. If we use resumeFromInstantRequestTime "
+ + "hudi first finds the instant, fetch the completion time of it and resume from the completion time.")
public String checkpoint = null;
@Parameter(names = {"--initial-checkpoint-provider"}, description = "subclass of "
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index f13e4c00bba8b..d78f49d617d48 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -47,8 +47,6 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -140,7 +138,7 @@
import static org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
-import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.shouldTargetCheckpointV2;
+import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromGeneralSource;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.config.HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE;
import static org.apache.hudi.config.HoodieClusteringConfig.INLINE_CLUSTERING;
@@ -545,7 +543,7 @@ private Option getLastPendingCompactionInstant(Option co
*/
public Pair readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
// Retrieve the previous round checkpoints, if any
- Option<Checkpoint> checkpointToResume = StreamerCheckpointUtils.getCheckpointToResumeFrom(commitsTimelineOpt, cfg, props);
+ Option<Checkpoint> checkpointToResume = StreamerCheckpointUtils.resolveCheckpointToResumeFrom(commitsTimelineOpt, cfg, props, metaClient);
LOG.info("Checkpoint to resume from : " + checkpointToResume);
int maxRetryCount = cfg.retryOnSourceFailures ? cfg.maxRetryCount : 1;
@@ -887,10 +885,8 @@ Map extractCheckpointMetadata(InputBatch inputBatch, TypedProper
}
// Otherwise create new checkpoint based on version
- Checkpoint checkpoint = shouldTargetCheckpointV2(versionCode, cfg.sourceClassName)
- ? new StreamerCheckpointV2((String) null)
- : new StreamerCheckpointV1((String) null);
-
+ Checkpoint checkpoint = buildCheckpointFromGeneralSource(cfg.sourceClassName, versionCode, null);
+
return checkpoint.getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
index c2e310ec81cfd..289c769eb4067 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamerCheckpointUtils.java
@@ -22,10 +22,10 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
-import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineLayout;
@@ -36,6 +36,7 @@
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.exception.HoodieStreamerException;
@@ -44,34 +45,89 @@
import java.io.IOException;
+import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.HOODIE_INCREMENTAL_SOURCES;
+import static org.apache.hudi.common.table.checkpoint.CheckpointUtils.buildCheckpointFromConfigOverride;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2.STREAMER_CHECKPOINT_RESET_KEY_V2;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.removeConfigFromProps;
+import static org.apache.hudi.table.upgrade.UpgradeDowngrade.needsUpgradeOrDowngrade;
public class StreamerCheckpointUtils {
private static final Logger LOG = LoggerFactory.getLogger(StreamerCheckpointUtils.class);
- public static Option<Checkpoint> getCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
- HoodieStreamer.Config streamerConfig,
- TypedProperties props) throws IOException {
+ /**
+ * The first phase of checkpoint resolution - read the checkpoint configs from 2 sources and resolve
+ * conflicts:
+ *
+ * <ul>
+ * <li> commit metadata from the last completed instant, which can record the last checkpoint from
+ * the previous streamer ingestion.
+ * <li> user checkpoint overrides specified in the writer config {@code streamerConfig}. Users might want to
+ * forcefully set the checkpoint to an arbitrary position or start from the very beginning.
+ * </ul>
+ *
+ * The 2 sources can have conflicts, and we need to decide which config should prevail.
+ *
+ * For the second phase of checkpoint resolution, please refer to
+ * {@link org.apache.hudi.utilities.sources.Source#translateCheckpoint} and child class overrides of that
+ * method.
+ */
+ public static Option<Checkpoint> resolveCheckpointToResumeFrom(Option<HoodieTimeline> commitsTimelineOpt,
+ HoodieStreamer.Config streamerConfig,
+ TypedProperties props,
+ HoodieTableMetaClient metaClient) throws IOException {
Option<Checkpoint> checkpoint = Option.empty();
+ assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource(metaClient, streamerConfig, props);
+ // If we have both streamer config and commits specifying what checkpoint to use, go with the
+ // checkpoint resolution logic to resolve conflicting configurations.
if (commitsTimelineOpt.isPresent()) {
- checkpoint = getCheckpointToResumeString(commitsTimelineOpt.get(), streamerConfig, props);
+ checkpoint = resolveCheckpointBetweenConfigAndPrevCommit(commitsTimelineOpt.get(), streamerConfig, props);
+ }
+ // If no checkpoint was resolved from the timeline, fall back to the streamer config override, if any.
+ checkpoint = useCkpFromOverrideConfigIfAny(streamerConfig, props, checkpoint);
+ return checkpoint;
+ }
+
+ /**
+ * Asserts that checkpoint override options are not used during table upgrade/downgrade operations.
+ * This validation is necessary because using checkpoint overrides during upgrade/downgrade operations
+ * is ambiguous as to whether the override should be interpreted as request time or completion time.
+ *
+ * @param metaClient The metadata client for the Hudi table
+ * @param streamerConfig The configuration for the Hudi streamer
+ * @param props The typed properties containing configuration settings
+ * @throws HoodieUpgradeDowngradeException if checkpoint override options are used during upgrade/downgrade
+ */
+ @VisibleForTesting
+ static void assertNoCheckpointOverrideDuringUpgradeForHoodieIncSource(HoodieTableMetaClient metaClient, HoodieStreamer.Config streamerConfig, TypedProperties props) {
+ boolean hasCheckpointOverride = !StringUtils.isNullOrEmpty(streamerConfig.checkpoint)
+ || !StringUtils.isNullOrEmpty(streamerConfig.ignoreCheckpoint);
+ boolean isHoodieIncSource = HOODIE_INCREMENTAL_SOURCES.contains(streamerConfig.sourceClassName);
+ if (hasCheckpointOverride && isHoodieIncSource) {
+ HoodieTableVersion writeTableVersion = HoodieTableVersion.fromVersionCode(ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION));
+ HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(streamerConfig.targetBasePath).withProps(props).build();
+ if (config.autoUpgrade() && needsUpgradeOrDowngrade(metaClient, config, writeTableVersion)) {
+ throw new HoodieUpgradeDowngradeException(
+ String.format("When upgrade/downgrade is happening, please avoid setting --checkpoint option and --ignore-checkpoint for your delta streamers."
+ + " Detected invalid streamer configuration:\n%s", streamerConfig));
+ }
}
+ }
+ private static Option<Checkpoint> useCkpFromOverrideConfigIfAny(
+ HoodieStreamer.Config streamerConfig, TypedProperties props, Option<Checkpoint> checkpoint) {
LOG.debug("Checkpoint from config: " + streamerConfig.checkpoint);
if (!checkpoint.isPresent() && streamerConfig.checkpoint != null) {
int writeTableVersion = ConfigUtils.getIntWithAltKeys(props, HoodieWriteConfig.WRITE_TABLE_VERSION);
- checkpoint = Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
- ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
+ checkpoint = Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
}
return checkpoint;
}
/**
* Process previous commit metadata and checkpoint configs set by user to determine the checkpoint to resume from.
+ * The function consults various checkpoint-related configurations and sets the right
+ * `org.apache.hudi.common.table.checkpoint.Checkpoint#checkpointKey` value in the returned object.
*
* @param commitsTimeline commits timeline of interest, including .commit and .deltacommit.
*
@@ -79,9 +135,9 @@ public static Option getCheckpointToResumeFrom(Option
- static Option<Checkpoint> getCheckpointToResumeString(HoodieTimeline commitsTimeline,
- HoodieStreamer.Config streamerConfig,
- TypedProperties props) throws IOException {
+ static Option<Checkpoint> resolveCheckpointBetweenConfigAndPrevCommit(HoodieTimeline commitsTimeline,
+ HoodieStreamer.Config streamerConfig,
+ TypedProperties props) throws IOException {
Option<Checkpoint> resumeCheckpoint = Option.empty();
// has deltacommit and this is a MOR table, then we should get checkpoint from .deltacommit
// if changing from mor to cow, before changing we must do a full compaction, so we can only consider .commit in such case
@@ -102,15 +158,13 @@ static Option getCheckpointToResumeString(HoodieTimeline commitsTime
HoodieCommitMetadata commitMetadata = commitMetadataOption.get();
Checkpoint checkpointFromCommit = CheckpointUtils.getCheckpoint(commitMetadata);
LOG.debug("Checkpoint reset from metadata: " + checkpointFromCommit.getCheckpointResetKey());
- if (streamerConfig.ignoreCheckpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointIgnoreKey())
- || !streamerConfig.ignoreCheckpoint.equals(checkpointFromCommit.getCheckpointIgnoreKey()))) {
+ if (ignoreCkpCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
// we ignore any existing checkpoint and start ingesting afresh
resumeCheckpoint = Option.empty();
- } else if (streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey())
- || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()))) {
- resumeCheckpoint = Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
- ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
- } else if (!StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey())) {
+ } else if (ckpOverrideCfgPrevailsOverCkpFromPrevCommit(streamerConfig, checkpointFromCommit)) {
+ resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(
+ streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
+ } else if (shouldUseCkpFromPrevCommit(checkpointFromCommit)) {
//if previous checkpoint is an empty string, skip resume use Option.empty()
resumeCheckpoint = Option.of(checkpointFromCommit);
} else if (compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
@@ -126,13 +180,26 @@ static Option getCheckpointToResumeString(HoodieTimeline commitsTime
}
} else if (streamerConfig.checkpoint != null) {
// getLatestCommitMetadataWithValidCheckpointInfo(commitTimelineOpt.get()) will never return a commit metadata w/o any checkpoint key set.
- resumeCheckpoint = Option.of(CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, streamerConfig.sourceClassName)
- ? new StreamerCheckpointV2(streamerConfig.checkpoint) : new StreamerCheckpointV1(streamerConfig.checkpoint));
+ resumeCheckpoint = Option.of(buildCheckpointFromConfigOverride(streamerConfig.sourceClassName, writeTableVersion, streamerConfig.checkpoint));
}
}
return resumeCheckpoint;
}
+ private static boolean shouldUseCkpFromPrevCommit(Checkpoint checkpointFromCommit) {
+ return !StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointKey());
+ }
+
+ private static boolean ckpOverrideCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config streamerConfig, Checkpoint checkpointFromCommit) {
+ return streamerConfig.checkpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointResetKey())
+ || !streamerConfig.checkpoint.equals(checkpointFromCommit.getCheckpointResetKey()));
+ }
+
+ private static boolean ignoreCkpCfgPrevailsOverCkpFromPrevCommit(HoodieStreamer.Config streamerConfig, Checkpoint checkpointFromCommit) {
+ return streamerConfig.ignoreCheckpoint != null && (StringUtils.isNullOrEmpty(checkpointFromCommit.getCheckpointIgnoreKey())
+ || !streamerConfig.ignoreCheckpoint.equals(checkpointFromCommit.getCheckpointIgnoreKey()));
+ }
+
public static Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLatestInstantAndCommitMetadataWithValidCheckpointInfo(HoodieTimeline timeline)
throws IOException {
return (Option<Pair<HoodieInstant, HoodieCommitMetadata>>) timeline.getReverseOrderedInstants().map(instant -> {
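The three predicates above spell out the precedence between the streamer overrides and the checkpoint recorded in the previous commit: a new --ignore-checkpoint value wins first, then a new --checkpoint value (wrapped as a config-override checkpoint), and only then the checkpoint carried by the last commit metadata. A condensed sketch of that ordering, leaving out the bootstrap and MOR/COW handling the real method also performs (names are illustrative, not part of the patch):

import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.Option;

final class PrecedenceSketch {
  static Option<Checkpoint> pick(boolean ignoreOverrideIsNew, boolean checkpointOverrideIsNew,
                                 Option<Checkpoint> overrideCkp, Option<Checkpoint> prevCommitCkp) {
    if (ignoreOverrideIsNew) {
      return Option.empty();   // ignore any existing checkpoint and start ingesting afresh
    }
    if (checkpointOverrideIsNew) {
      return overrideCkp;      // built via buildCheckpointFromConfigOverride upstream
    }
    return prevCommitCkp;      // fall back to the checkpoint from the last commit metadata
  }
}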
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java
index 88a24cf408556..257616ca8b772 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/CheckpointValidator.java
@@ -21,15 +21,18 @@
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
+import org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.util.Option;
import java.util.function.BiFunction;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-
/**
* Helper class to validate checkpoint options in test scenarios.
* Used by MockS3EventsHoodieIncrSource to validate checkpoint behavior.
@@ -40,6 +43,7 @@ public class CheckpointValidator {
/**
* Validation keys for checkpoint testing.
*/
+ public static final String VAL_CKP_INSTANCE_OF = "VAL_CKP_INSTANCE_OF_KEY";
public static final String VAL_CKP_KEY_EQ_VAL = "VAL_CKP_KEY_EQ_VAL_KEY";
public static final String VAL_CKP_RESET_KEY_EQUALS = "VAL_CKP_RESET_KEY";
public static final String VAL_CKP_RESET_KEY_IS_NULL = "VAL_CKP_RESET_KEY_IS_NULL";
@@ -47,6 +51,7 @@ public class CheckpointValidator {
public static final String VAL_CKP_IGNORE_KEY_IS_NULL = "VAL_CKP_IGNORE_KEY_IS_NULL";
public static final String VAL_NON_EMPTY_CKP_ALL_MEMBERS = "VAL_NON_EMPTY_CKP_ALL_MEMBERS";
public static final String VAL_EMPTY_CKP_KEY = "VAL_EMPTY_CKP_KEY";
+ public static final String VAL_NO_INGESTION_HAPPENS = "VAL_NO_INGESTION_HAPPENS_KEY";
public static final String VAL_NO_OP = "VAL_NO_OP";
/*
@@ -54,25 +59,49 @@ public class CheckpointValidator {
* */
private static final BiFunction