diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 274cdd52fe14..fc2e1200f08d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -20,6 +20,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.data.Timestamp; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -33,6 +34,7 @@ import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SerializableConsumer; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -377,9 +379,18 @@ public static SerializableConsumer createFileCleaner( } public static long olderThanMillis(@Nullable String olderThan) { - return isNullOrWhitespaceOnly(olderThan) - ? System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1) - : DateTimeUtils.parseTimestampData(olderThan, 3, TimeZone.getDefault()) - .getMillisecond(); + if (isNullOrWhitespaceOnly(olderThan)) { + return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1); + } else { + Timestamp parsedTimestampData = + DateTimeUtils.parseTimestampData(olderThan, 3, TimeZone.getDefault()); + Preconditions.checkArgument( + parsedTimestampData.compareTo( + Timestamp.fromEpochMillis(System.currentTimeMillis())) + < 0, + "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up."); + + return parsedTimestampData.getMillisecond(); + } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java new file mode 100644 index 000000000000..97ba35b2c07b --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Utils for {@link OrphanFilesClean}. */ +public class OrphanFilesCleanTest { + + @Test + public void testOlderThanMillis() { + // normal olderThan + OrphanFilesClean.olderThanMillis(null); + OrphanFilesClean.olderThanMillis("2024-12-21 23:00:00"); + + // non normal olderThan + assertThatThrownBy(() -> OrphanFilesClean.olderThanMillis("3024-12-21 23:00:00")) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage( + "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up."); + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java index a168c3785c7c..5a9cd0fe2d9a 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java @@ -37,6 +37,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -124,17 +125,20 @@ public void testRunWithoutException() throws Exception { CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String withDryRun = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)", - database, tableName); + "CALL sys.remove_orphan_files('%s.%s', '%s', true)", + database, tableName, olderThan); ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2")); String withOlderThan = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", - database, tableName); + "CALL sys.remove_orphan_files('%s.%s', '%s')", + database, tableName, olderThan); ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2")); @@ -178,17 +182,19 @@ public void testRemoveDatabaseOrphanFilesITCase() throws Exception { CloseableIterator withParallelismCollect = executeSQL(withParallelism); assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0")); + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String withDryRun = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)", - database, "*"); + "CALL sys.remove_orphan_files('%s.%s', '%s', true)", + database, "*", olderThan); ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4")); String withOlderThan = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", - database, "*"); + "CALL sys.remove_orphan_files('%s.%s', '%s')", database, "*", olderThan); ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); assertThat(actualDeleteFile).containsOnly(Row.of("4")); @@ -237,10 +243,13 @@ public void testCleanWithBranch() throws Exception { false, true); } + + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String procedure = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", - database, "*"); + "CALL sys.remove_orphan_files('%s.%s', '%s')", database, "*", olderThan); ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure)); assertThat(actualDeleteFile).containsOnly(Row.of("4")); } @@ -272,26 +281,29 @@ public void testRunWithMode() throws Exception { CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String withLocalMode = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'local')", - database, tableName); + "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'local')", + database, tableName, olderThan); ImmutableList actualLocalRunDeleteFile = ImmutableList.copyOf(executeSQL(withLocalMode)); assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2")); String withDistributedMode = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'distributed')", - database, tableName); + "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'distributed')", + database, tableName, olderThan); ImmutableList actualDistributedRunDeleteFile = ImmutableList.copyOf(executeSQL(withDistributedMode)); assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2")); String withInvalidMode = String.format( - "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'unknown')", - database, tableName); + "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'unknown')", + database, tableName, olderThan); assertThatCode(() -> executeSQL(withInvalidMode)) .isInstanceOf(RuntimeException.class) .hasMessageContaining("Unknown mode"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java index 77f3be2f0c76..2828101114a1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java @@ -35,6 +35,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -129,23 +130,28 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String withDryRun = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true)" + : "CALL sys.remove_orphan_files('%s.%s', '%s', true)", database, - tableName); + tableName, + olderThan); ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2")); String withOlderThan = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59')" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')" + : "CALL sys.remove_orphan_files('%s.%s', '%s')", database, - tableName); + tableName, + olderThan); ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2")); @@ -195,23 +201,28 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws CloseableIterator withParallelismCollect = executeSQL(withParallelism); assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0")); + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String withDryRun = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true)" + : "CALL sys.remove_orphan_files('%s.%s', '%s', true)", database, - "*"); + "*", + olderThan); ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4")); String withOlderThan = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59')" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')" + : "CALL sys.remove_orphan_files('%s.%s', '%s')", database, - "*"); + "*", + olderThan); ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); assertThat(actualDeleteFile).containsOnly(Row.of("4")); @@ -261,13 +272,18 @@ public void testCleanWithBranch(boolean isNamedArgument) throws Exception { false, true); } + + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String procedure = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59')" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')" + : "CALL sys.remove_orphan_files('%s.%s', '%s')", database, - "*"); + "*", + olderThan); ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure)); assertThat(actualDeleteFile).containsOnly(Row.of("4")); } @@ -305,13 +321,17 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception { CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); + String olderThan = + DateTimeUtils.formatLocalDateTime( + DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); String withLocalMode = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'local')" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'local')", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 'local')" + : "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'local')", database, - tableName); + tableName, + olderThan); ImmutableList actualLocalRunDeleteFile = ImmutableList.copyOf(executeSQL(withLocalMode)); assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2")); @@ -319,10 +339,11 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception { String withDistributedMode = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'distributed')" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'distributed')", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 'distributed')" + : "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'distributed')", database, - tableName); + tableName, + olderThan); ImmutableList actualDistributedRunDeleteFile = ImmutableList.copyOf(executeSQL(withDistributedMode)); assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2")); @@ -330,10 +351,11 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception { String withInvalidMode = String.format( isNamedArgument - ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'unknown')" - : "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'unknown')", + ? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 'unknown')" + : "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'unknown')", database, - tableName); + tableName, + olderThan); assertThatCode(() -> executeSQL(withInvalidMode)) .isInstanceOf(RuntimeException.class) .hasMessageContaining("Unknown mode"); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala index 3ffe7fba264f..b1bb3124e336 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -18,6 +18,7 @@ package org.apache.paimon.spark.procedure +import org.apache.paimon.data.Timestamp import org.apache.paimon.fs.Path import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.utils.DateTimeUtils @@ -61,6 +62,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { TimeUnit.SECONDS.toMillis(1)), 3) + System.out.println("orphanFile2ModTime is : " + orphanFile2ModTime); + System.out.println("older_than1 is : " + older_than1) + System.out.println("in ut Timestamp.now() is : " + Timestamp.now) + checkAnswer( spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"), Row(1, 1) :: Nil)