Skip to content

Commit

Permalink
Add check of older_than when RemoveOrphanFiles
Browse files Browse the repository at this point in the history
  • Loading branch information
hongli.wwj authored and wwj6591812 committed Dec 26, 2024
1 parent 49a5bd2 commit fd12a7e
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
Expand All @@ -33,6 +34,7 @@
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand Down Expand Up @@ -377,9 +379,18 @@ public static SerializableConsumer<Path> createFileCleaner(
}

/**
 * Resolves the user-supplied {@code olderThan} argument into an epoch-millisecond threshold.
 *
 * <p>When {@code olderThan} is null or whitespace-only, the threshold defaults to one day before
 * the current system time. Otherwise the string is parsed via {@link
 * DateTimeUtils#parseTimestampData} (precision argument 3, JVM default time zone) and must lie
 * strictly before the current system time.
 *
 * @param olderThan timestamp string, or null/blank to use the one-day default
 * @return the threshold in epoch milliseconds
 * @throws IllegalArgumentException if the parsed timestamp is not strictly before now
 */
public static long olderThanMillis(@Nullable String olderThan) {
    if (isNullOrWhitespaceOnly(olderThan)) {
        return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    }

    Timestamp parsedTimestampData =
            DateTimeUtils.parseTimestampData(olderThan, 3, TimeZone.getDefault());
    // Read the clock after parsing so the comparison uses the latest possible "now".
    long nowMillis = System.currentTimeMillis();
    // A threshold at or beyond "now" is rejected: files that are still being written and are
    // not yet referenced by any snapshot would otherwise be treated as orphans.
    Preconditions.checkArgument(
            parsedTimestampData.compareTo(Timestamp.fromEpochMillis(nowMillis)) < 0,
            "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up.");

    return parsedTimestampData.getMillisecond();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.operation;

import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link OrphanFilesClean#olderThanMillis(String)}. */
public class OrphanFilesCleanTest {

    /** Exact message expected when {@code olderThan} is not strictly before the current time. */
    private static final String OLDER_THAN_NOT_IN_PAST_MESSAGE =
            "The arg olderThan must be less than now, because dataFiles that are currently being written and not referenced by snapshots will be mistakenly cleaned up.";

    @Test
    public void testOlderThanMillis() {
        // null olderThan: the threshold defaults to one day before the current time.
        // Bracket the call with two clock reads so the assertion cannot flake.
        long oneDayMillis = TimeUnit.DAYS.toMillis(1);
        long before = System.currentTimeMillis();
        long defaultThreshold = OrphanFilesClean.olderThanMillis(null);
        long after = System.currentTimeMillis();
        assertThat(defaultThreshold).isBetween(before - oneDayMillis, after - oneDayMillis);

        // olderThan in the past: accepted, and the returned threshold is before now.
        long pastThreshold = OrphanFilesClean.olderThanMillis("2024-12-21 23:00:00");
        assertThat(pastThreshold).isLessThan(System.currentTimeMillis());

        // olderThan in the future: rejected, since in-flight files could be deleted.
        assertThatThrownBy(() -> OrphanFilesClean.olderThanMillis("3024-12-21 23:00:00"))
                .isInstanceOf(IllegalArgumentException.class)
                .hasMessage(OLDER_THAN_NOT_IN_PAST_MESSAGE);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -124,17 +125,20 @@ public void testRunWithoutException() throws Exception {
CloseableIterator<Row> withoutOlderThanCollect = executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String withDryRun =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)",
database, tableName);
"CALL sys.remove_orphan_files('%s.%s', '%s', true)",
database, tableName, olderThan);
ImmutableList<Row> actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun));
assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));

String withOlderThan =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
database, tableName);
"CALL sys.remove_orphan_files('%s.%s', '%s')",
database, tableName, olderThan);
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));

assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
Expand Down Expand Up @@ -178,17 +182,19 @@ public void testRemoveDatabaseOrphanFilesITCase() throws Exception {
CloseableIterator<Row> withParallelismCollect = executeSQL(withParallelism);
assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0"));

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String withDryRun =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)",
database, "*");
"CALL sys.remove_orphan_files('%s.%s', '%s', true)",
database, "*", olderThan);
ImmutableList<Row> actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun));
assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));

String withOlderThan =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
database, "*");
"CALL sys.remove_orphan_files('%s.%s', '%s')", database, "*", olderThan);
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));

assertThat(actualDeleteFile).containsOnly(Row.of("4"));
Expand Down Expand Up @@ -237,10 +243,13 @@ public void testCleanWithBranch() throws Exception {
false,
true);
}

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String procedure =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
database, "*");
"CALL sys.remove_orphan_files('%s.%s', '%s')", database, "*", olderThan);
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure));
assertThat(actualDeleteFile).containsOnly(Row.of("4"));
}
Expand Down Expand Up @@ -272,26 +281,29 @@ public void testRunWithMode() throws Exception {
CloseableIterator<Row> withoutOlderThanCollect = executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String withLocalMode =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'local')",
database, tableName);
"CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'local')",
database, tableName, olderThan);
ImmutableList<Row> actualLocalRunDeleteFile =
ImmutableList.copyOf(executeSQL(withLocalMode));
assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));

String withDistributedMode =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'distributed')",
database, tableName);
"CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'distributed')",
database, tableName, olderThan);
ImmutableList<Row> actualDistributedRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDistributedMode));
assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));

String withInvalidMode =
String.format(
"CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'unknown')",
database, tableName);
"CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'unknown')",
database, tableName, olderThan);
assertThatCode(() -> executeSQL(withInvalidMode))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Unknown mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DateTimeUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -129,23 +130,28 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception {
CloseableIterator<Row> withoutOlderThanCollect = executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String withDryRun =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true)"
: "CALL sys.remove_orphan_files('%s.%s', '%s', true)",
database,
tableName);
tableName,
olderThan);
ImmutableList<Row> actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun));
assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));

String withOlderThan =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')"
: "CALL sys.remove_orphan_files('%s.%s', '%s')",
database,
tableName);
tableName,
olderThan);
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));

assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
Expand Down Expand Up @@ -195,23 +201,28 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws
CloseableIterator<Row> withParallelismCollect = executeSQL(withParallelism);
assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0"));

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String withDryRun =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true)",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true)"
: "CALL sys.remove_orphan_files('%s.%s', '%s', true)",
database,
"*");
"*",
olderThan);
ImmutableList<Row> actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun));
assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));

String withOlderThan =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')"
: "CALL sys.remove_orphan_files('%s.%s', '%s')",
database,
"*");
"*",
olderThan);
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));

assertThat(actualDeleteFile).containsOnly(Row.of("4"));
Expand Down Expand Up @@ -261,13 +272,18 @@ public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
false,
true);
}

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String procedure =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59')",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s')"
: "CALL sys.remove_orphan_files('%s.%s', '%s')",
database,
"*");
"*",
olderThan);
ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure));
assertThat(actualDeleteFile).containsOnly(Row.of("4"));
}
Expand Down Expand Up @@ -305,35 +321,41 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception {
CloseableIterator<Row> withoutOlderThanCollect = executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));

String olderThan =
DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3);
String withLocalMode =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'local')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'local')",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 'local')"
: "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'local')",
database,
tableName);
tableName,
olderThan);
ImmutableList<Row> actualLocalRunDeleteFile =
ImmutableList.copyOf(executeSQL(withLocalMode));
assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));

String withDistributedMode =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'distributed')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'distributed')",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 'distributed')"
: "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'distributed')",
database,
tableName);
tableName,
olderThan);
ImmutableList<Row> actualDistributedRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDistributedMode));
assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));

String withInvalidMode =
String.format(
isNamedArgument
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism => 5, mode => 'unknown')"
: "CALL sys.remove_orphan_files('%s.%s', '2999-12-31 23:59:59', true, 5, 'unknown')",
? "CALL sys.remove_orphan_files(`table` => '%s.%s', older_than => '%s', dry_run => true, parallelism => 5, mode => 'unknown')"
: "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'unknown')",
database,
tableName);
tableName,
olderThan);
assertThatCode(() -> executeSQL(withInvalidMode))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Unknown mode");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark.procedure

import org.apache.paimon.data.Timestamp
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.utils.DateTimeUtils
Expand Down Expand Up @@ -61,6 +62,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
TimeUnit.SECONDS.toMillis(1)),
3)

System.out.println("orphanFile2ModTime is : " + orphanFile2ModTime);
System.out.println("older_than1 is : " + older_than1)
System.out.println("in ut Timestamp.now() is : " + Timestamp.now)

checkAnswer(
spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
Row(1, 1) :: Nil)
Expand Down

0 comments on commit fd12a7e

Please sign in to comment.