Skip to content

Commit

Permalink
Use SupportsPrefixOperations for Remove OrphanFile Procedure on Spark 3.5
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Jan 8, 2025
1 parent a191684 commit 74664ad
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -307,14 +308,20 @@ private Dataset<FileURI> actualFileIdentDS() {
@VisibleForTesting
Dataset<String> listWithPrefix() {
List<String> matchingFiles = Lists.newArrayList();
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
// listPrefix only returns files. so we additionally need to check parent folders for each file
// in following example file itself is not filtered out,
// but it should be excluded due to its parent folder: `_c2_trunc`
// "/data/_c2_trunc/file.txt"
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), true);

Iterator<org.apache.iceberg.io.FileInfo> iterator =
((SupportsPrefixOperations) table.io()).listPrefix(location).iterator();
while (iterator.hasNext()) {
org.apache.iceberg.io.FileInfo fileInfo = iterator.next();
if (fileInfo.createdAtMillis() < olderThanTimestamp
&& pathFilter.accept(new Path(fileInfo.location()))) {
// NOTE: check the path relative to table location. To avoid checking un necessary root
// folders
Path relativeFilePath = new Path(fileInfo.location().replace(location, ""));
if (fileInfo.createdAtMillis() < olderThanTimestamp && pathFilter.accept(relativeFilePath)) {
matchingFiles.add(fileInfo.location());
}
}
Expand All @@ -328,7 +335,8 @@ Dataset<String> listWithoutPrefix() {
List<String> matchingFiles = Lists.newArrayList();

Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs());
// don't check parent folders because it's already checked by recursive call
PathFilter pathFilter = PartitionAwareHiddenPathFilter.forSpecs(table.specs(), false);

// list at most MAX_DRIVER_LISTING_DEPTH levels and only dirs that have
// less than MAX_DRIVER_LISTING_DIRECT_SUB_DIRS direct sub dirs on the driver
Expand Down Expand Up @@ -612,21 +620,46 @@ private FileURI toFileURI(I input) {
static class PartitionAwareHiddenPathFilter implements PathFilter, Serializable {

private final Set<String> hiddenPathPartitionNames;
private final boolean checkParents;

/**
 * Creates a filter over the given set of hidden partition field names.
 *
 * @param hiddenPathPartitionNames partition field names whose folders look hidden (e.g.
 *     underscore-prefixed) but must still be accepted
 * @param checkParents whether {@link #accept} should also reject a path when any of its parent
 *     folders is hidden (needed for flat prefix listings, which return only files)
 */
PartitionAwareHiddenPathFilter(Set<String> hiddenPathPartitionNames, boolean checkParents) {
this.hiddenPathPartitionNames = hiddenPathPartitionNames;
this.checkParents = checkParents;
}

@Override
public boolean accept(Path path) {
// When parent checking is disabled, the decision depends on this path alone.
if (!checkParents) {
return doAccept(path);
}

// Otherwise the path is rejected outright when any ancestor folder is hidden;
// only then is the path itself evaluated.
return !hasHiddenPttParentFolder(path) && doAccept(path);
}

// A path is acceptable when it is a recognized hidden-partition path, or when it
// passes the generic hidden-path filter.
private boolean doAccept(Path path) {
if (isHiddenPartitionPath(path)) {
return true;
}
return HiddenPathFilter.get().accept(path);
}

/**
 * Walks up from the given path through its chain of parent folders and reports whether any
 * folder along the way (including the path itself) fails the acceptance check, i.e. is hidden.
 */
public boolean hasHiddenPttParentFolder(Path path) {
for (Path current = path; current != null; current = current.getParent()) {
if (!doAccept(current)) {
return true;
}
}
return false;
}

// True when the final path component starts with any known hidden partition field name.
private boolean isHiddenPartitionPath(Path path) {
String fileName = path.getName();
for (String partitionName : hiddenPathPartitionNames) {
if (fileName.startsWith(partitionName)) {
return true;
}
}
return false;
}

static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
static PathFilter forSpecs(Map<Integer, PartitionSpec> specs, boolean checkParents) {
if (specs == null) {
return HiddenPathFilter.get();
}
Expand All @@ -642,7 +675,7 @@ static PathFilter forSpecs(Map<Integer, PartitionSpec> specs) {
if (partitionNames.isEmpty()) {
return HiddenPathFilter.get();
} else {
return new PartitionAwareHiddenPathFilter(partitionNames);
return new PartitionAwareHiddenPathFilter(partitionNames, checkParents);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,14 +867,16 @@ public void testCompareToFileList() throws IOException {
.as("Invalid file should be present")
.isTrue();

DeleteOrphanFilesSparkAction action3 =
DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis());
assertThatDatasetsAreEqualIgnoringOrder(action3.listWithPrefix(), action3.listWithoutPrefix());
assertThatDatasetsAreEqualIgnoringOrder(
deleteOrphanFilesSparkAction.listWithPrefix(),
deleteOrphanFilesSparkAction.listWithoutPrefix());

DeleteOrphanFiles.Result result3 = action3.execute();
DeleteOrphanFiles.Result result3 = deleteOrphanFilesSparkAction.execute();
assertThat(result3.orphanFileLocations())
.as("Action should delete 1 file")
.isEqualTo(invalidFilePaths);
Expand All @@ -900,14 +902,16 @@ public void testCompareToFileList() throws IOException {
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFilesSparkAction action4 =
deleteOrphanFilesSparkAction =
actions
.deleteOrphanFiles(table)
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> {});
assertThatDatasetsAreEqualIgnoringOrder(action4.listWithPrefix(), action4.listWithoutPrefix());
assertThatDatasetsAreEqualIgnoringOrder(
deleteOrphanFilesSparkAction.listWithPrefix(),
deleteOrphanFilesSparkAction.listWithoutPrefix());

DeleteOrphanFiles.Result result4 = action4.execute();
DeleteOrphanFiles.Result result4 = deleteOrphanFilesSparkAction.execute();
assertThat(result4.orphanFileLocations()).as("Action should find nothing").isEmpty();
}

Expand Down

0 comments on commit 74664ad

Please sign in to comment.