Skip to content

Commit

Permalink
[HUDI-8917] Checkpoint reset handling (#12718)
Browse files Browse the repository at this point in the history
1. During upgrade/downgrade, no user checkpoint override via streamer
   config.
2. When checkpoint is set via streamer config for hoodie incremental
   source, it will specify request time based or completion time based.
Added relevant checkpoint handling logic accordingly.
  • Loading branch information
Davis-Zhang-Onehouse authored Jan 29, 2025
1 parent 5a13ce1 commit 0a0f3bb
Show file tree
Hide file tree
Showing 18 changed files with 1,102 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public UpgradeDowngrade(
this.upgradeDowngradeHelper = upgradeDowngradeHelper;
}

public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
public static boolean needsUpgradeOrDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieTableVersion toWriteVersion) {
HoodieTableVersion fromTableVersion = metaClient.getTableConfig().getTableVersion();
// If table version is less than SIX, then we need to upgrade to SIX first before upgrading to any other version, irrespective of autoUpgrade flag
if (fromTableVersion.versionCode() < HoodieTableVersion.SIX.versionCode() && toWriteVersion.versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
Expand All @@ -76,6 +76,10 @@ public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
return toWriteVersion.versionCode() != fromTableVersion.versionCode();
}

public boolean needsUpgradeOrDowngrade(HoodieTableVersion toWriteVersion) {
return needsUpgradeOrDowngrade(metaClient, config, toWriteVersion);
}

/**
* Perform Upgrade or Downgrade steps if required and updated table version if need be.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public abstract class Checkpoint implements Serializable {
// These are extra props to be written to the commit metadata
protected Map<String, String> extraProps = new HashMap<>();

public Checkpoint setCheckpointKey(String newKey) {
checkpointKey = newKey;
return this;
}

public String getCheckpointKey() {
return checkpointKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public class CheckpointUtils {
"org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource",
"org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource"
)));

public static final Set<String> HOODIE_INCREMENTAL_SOURCES;

static {
HashSet<String> hoodieIncSource = new HashSet<>(DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2);
hoodieIncSource.add("org.apache.hudi.utilities.sources.MockGeneralHoodieIncrSource");
hoodieIncSource.add("org.apache.hudi.utilities.sources.HoodieIncrSource");
HOODIE_INCREMENTAL_SOURCES = Collections.unmodifiableSet(hoodieIncSource);
}

public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_KEY_V2))
|| !StringUtils.isNullOrEmpty(commitMetadata.getMetadata(STREAMER_CHECKPOINT_RESET_KEY_V2))) {
Expand All @@ -61,6 +71,20 @@ public static Checkpoint getCheckpoint(HoodieCommitMetadata commitMetadata) {
throw new HoodieException("Checkpoint is not found in the commit metadata: " + commitMetadata.getExtraMetadata());
}

public static Checkpoint buildCheckpointFromGeneralSource(
String sourceClassName, int writeTableVersion, String checkpointToResume) {
return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, sourceClassName)
? new StreamerCheckpointV2(checkpointToResume) : new StreamerCheckpointV1(checkpointToResume);
}

// Whenever we create checkpoint from streamer config checkpoint override, we should use this function
// to build checkpoints.
public static Checkpoint buildCheckpointFromConfigOverride(
String sourceClassName, int writeTableVersion, String checkpointToResume) {
return CheckpointUtils.shouldTargetCheckpointV2(writeTableVersion, sourceClassName)
? new UnresolvedStreamerCheckpointBasedOnCfg(checkpointToResume) : new StreamerCheckpointV1(checkpointToResume);
}

public static boolean shouldTargetCheckpointV2(int writeTableVersion, String sourceClassName) {
return writeTableVersion >= HoodieTableVersion.EIGHT.versionCode()
&& !DATASOURCES_NOT_SUPPORTED_WITH_CKPT_V2.contains(sourceClassName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.table.checkpoint;

/**
* Utility class providing methods to check if a string starts with specific resume-related prefixes.
*/
public class HoodieIncrSourceCheckpointValUtils {
public static final String RESET_CHECKPOINT_V2_SEPARATOR = ":";
public static final String REQUEST_TIME_PREFIX = "resumeFromInstantRequestTime";
public static final String COMPLETION_TIME_PREFIX = "resumeFromInstantCompletionTime";

/**
* For hoodie incremental source ingestion, if the target table is version 8 or higher, the checkpoint
* key set by streamer config can be in either of the following format:
* - resumeFromInstantRequestTime:[checkpoint value based on request time]
* - resumeFromInstantCompletionTime:[checkpoint value based on completion time]
*
* StreamerCheckpointV2FromCfgCkp class itself captured the fact that this is version 8 and higher, plus
* the checkpoint source is from streamer config override.
*
* When the checkpoint is consumed by individual data sources, we need to convert them to either vanilla
* checkpoint v1 (request time based) or checkpoint v2 (completion time based).
*/
public static Checkpoint resolveToActualCheckpointVersion(UnresolvedStreamerCheckpointBasedOnCfg checkpoint) {
String[] parts = extractKeyValues(checkpoint);
switch (parts[0]) {
case REQUEST_TIME_PREFIX: {
return new StreamerCheckpointV1(checkpoint).setCheckpointKey(parts[1]);
}
case COMPLETION_TIME_PREFIX: {
return new StreamerCheckpointV2(checkpoint).setCheckpointKey(parts[1]);
}
default:
throw new IllegalArgumentException("Unknown event ordering mode " + parts[0]);
}
}

private static String [] extractKeyValues(UnresolvedStreamerCheckpointBasedOnCfg checkpoint) {
String checkpointKey = checkpoint.getCheckpointKey();
String[] parts = checkpointKey.split(RESET_CHECKPOINT_V2_SEPARATOR);
if (parts.length != 2
|| (
!parts[0].trim().equals(REQUEST_TIME_PREFIX)
&& !parts[0].trim().equals(COMPLETION_TIME_PREFIX)
)) {
throw new IllegalArgumentException(
"Illegal checkpoint key override `" + checkpointKey + "`. Valid format is either `resumeFromInstantRequestTime:<checkpoint value>` or "
+ "`resumeFromInstantCompletionTime:<checkpoint value>`.");
}
parts[0] = parts[0].trim();
parts[1] = parts[1].trim();
return parts;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.table.checkpoint;

import org.apache.hudi.common.model.HoodieCommitMetadata;

/**
* A special checkpoint v2 class that indicates its checkpoint key comes from streamer config checkpoint
* overrides.
*
* For hoodie incremental source, based on the content of the checkpoint override value, it can indicate
* either request time based checkpoint or completion time based. So the class serves as an indicator to
* data sources of interest that it needs to be further parsed and resolved to either checkpoint v1 or v2.
*
* For all the other data sources, it behaves exactly the same as checkpoint v2.
*
* To keep the checkpoint class design ignorant of which data source it serves, the class only indicates where
* the checkpoint key comes from.
* */
public class UnresolvedStreamerCheckpointBasedOnCfg extends StreamerCheckpointV2 {
public UnresolvedStreamerCheckpointBasedOnCfg(String key) {
super(key);
}

public UnresolvedStreamerCheckpointBasedOnCfg(Checkpoint checkpoint) {
super(checkpoint);
}

public UnresolvedStreamerCheckpointBasedOnCfg(HoodieCommitMetadata commitMetadata) {
super(commitMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
Expand All @@ -39,6 +40,7 @@
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.FAIL;
import static org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling.USE_TRANSITION_TIME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -53,6 +55,9 @@ public class TestCheckpointUtils {
private HoodieTableMetaClient metaClient;
private HoodieActiveTimeline activeTimeline;

private static final String CHECKPOINT_TO_RESUME = "20240101000000";
private static final String GENERAL_SOURCE = "org.apache.hudi.utilities.sources.GeneralSource";

@BeforeEach
public void setUp() {
metaClient = mock(HoodieTableMetaClient.class);
Expand Down Expand Up @@ -207,7 +212,49 @@ public void testConvertCheckpointWithUseTransitionTime() {
"8, org.apache.hudi.utilities.sources.MockS3EventsHoodieIncrSource, false",
"8, org.apache.hudi.utilities.sources.MockGcsEventsHoodieIncrSource, false"
})
public void testTargetCheckpointV2(int version, String sourceClassName, boolean expected) {
assertEquals(expected, CheckpointUtils.shouldTargetCheckpointV2(version, sourceClassName));
public void testTargetCheckpointV2(int version, String sourceClassName, boolean isV2Checkpoint) {
assertEquals(isV2Checkpoint, CheckpointUtils.buildCheckpointFromGeneralSource(sourceClassName, version, "ignored") instanceof StreamerCheckpointV2);
}

@Test
public void testBuildCheckpointFromGeneralSource() {
// Test V2 checkpoint creation (newer table version + general source)
Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromGeneralSource(
GENERAL_SOURCE,
HoodieTableVersion.EIGHT.versionCode(),
CHECKPOINT_TO_RESUME
);
assertInstanceOf(StreamerCheckpointV2.class, checkpoint1);
assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());

// Test V1 checkpoint creation (older table version)
Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromGeneralSource(
GENERAL_SOURCE,
HoodieTableVersion.SEVEN.versionCode(),
CHECKPOINT_TO_RESUME
);
assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
}

@Test
public void testBuildCheckpointFromConfigOverride() {
// Test checkpoint from config creation (newer table version + general source)
Checkpoint checkpoint1 = CheckpointUtils.buildCheckpointFromConfigOverride(
GENERAL_SOURCE,
HoodieTableVersion.EIGHT.versionCode(),
CHECKPOINT_TO_RESUME
);
assertInstanceOf(UnresolvedStreamerCheckpointBasedOnCfg.class, checkpoint1);
assertEquals(CHECKPOINT_TO_RESUME, checkpoint1.getCheckpointKey());

// Test V1 checkpoint creation (older table version)
Checkpoint checkpoint2 = CheckpointUtils.buildCheckpointFromConfigOverride(
GENERAL_SOURCE,
HoodieTableVersion.SEVEN.versionCode(),
CHECKPOINT_TO_RESUME
);
assertInstanceOf(StreamerCheckpointV1.class, checkpoint2);
assertEquals(CHECKPOINT_TO_RESUME, checkpoint2.getCheckpointKey());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.table.checkpoint;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestHoodieIncrSourceCheckpointValUtils {

@Test
public void testResolveToV1V2CheckpointWithRequestTime() {
String checkpoint = "20240301";
UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantRequestTime:" + checkpoint);

Checkpoint result = HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint);

assertInstanceOf(StreamerCheckpointV1.class, result);
assertEquals(checkpoint, result.getCheckpointKey());
}

@Test
public void testResolveToV1V2CheckpointWithCompletionTime() {
String checkpoint = "20240302";
UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
when(mockCheckpoint.getCheckpointKey()).thenReturn("resumeFromInstantCompletionTime:" + checkpoint);

Checkpoint result = HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint);

assertInstanceOf(StreamerCheckpointV2.class, result);
assertEquals(checkpoint, result.getCheckpointKey());
}

@Test
public void testResolveToV1V2CheckpointWithInvalidPrefix() {
UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
when(mockCheckpoint.getCheckpointKey()).thenReturn("invalidPrefix:20240303");

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint)
);
assertTrue(exception.getMessage().contains("Illegal checkpoint key override"));
}

@Test
public void testResolveToV1V2CheckpointWithMalformedInput() {
UnresolvedStreamerCheckpointBasedOnCfg mockCheckpoint = mock(UnresolvedStreamerCheckpointBasedOnCfg.class);
when(mockCheckpoint.getCheckpointKey()).thenReturn("malformedInput");

IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion(mockCheckpoint)
);
assertTrue(exception.getMessage().contains("Illegal checkpoint key override"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.table.checkpoint.CheckpointUtils;
import org.apache.hudi.common.table.checkpoint.UnresolvedStreamerCheckpointBasedOnCfg;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
import org.apache.hudi.common.table.log.InstantRange;
Expand Down Expand Up @@ -69,6 +70,7 @@
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE;
import static org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
import static org.apache.hudi.DataSourceReadOptions.START_COMMIT;
import static org.apache.hudi.common.table.checkpoint.HoodieIncrSourceCheckpointValUtils.resolveToActualCheckpointVersion;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
Expand Down Expand Up @@ -183,8 +185,14 @@ public HoodieIncrSource(

@Override
protected Option<Checkpoint> translateCheckpoint(Option<Checkpoint> lastCheckpoint) {
// For Hudi incremental source, we'll let #fetchNextBatch to handle it since
// it involves heavy lifting by loading the timeline for analysis
// User might override checkpoint based on
// - instant request time: Then we will treat it as a V1 checkpoint.
// - completion time: We will treat it as a normal V2 checkpoint.
if (!lastCheckpoint.isEmpty() && lastCheckpoint.get() instanceof UnresolvedStreamerCheckpointBasedOnCfg) {
lastCheckpoint = Option.of(resolveToActualCheckpointVersion((UnresolvedStreamerCheckpointBasedOnCfg) lastCheckpoint.get()));
}
// For Hudi incremental source, we'll let #fetchNextBatch to handle request time to completion time
// based conversion as it is heavy operation.
return lastCheckpoint;
}

Expand Down
Loading

0 comments on commit 0a0f3bb

Please sign in to comment.