Skip to content

Commit

Permalink
Spark: Ensure partition stats files are considered for GC procedures (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat authored Jan 18, 2024
1 parent b6cefe5 commit 02836ea
Show file tree
Hide file tree
Showing 13 changed files with 586 additions and 92 deletions.
42 changes: 40 additions & 2 deletions core/src/main/java/org/apache/iceberg/ReachableFileUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,13 @@ public static List<String> manifestListLocations(Table table, Set<Long> snapshot
}

/**
* Returns locations of statistics files in a table.
* Returns locations of all statistics files in a table.
*
* @param table table for which statistics files needs to be listed
* @return the location of statistics files
*/
public static List<String> statisticsFilesLocations(Table table) {
return statisticsFilesLocations(table, statisticsFile -> true);
return statisticsFilesLocationsForSnapshots(table, null);
}

/**
Expand All @@ -148,12 +148,50 @@ public static List<String> statisticsFilesLocations(Table table) {
* @param table table for which statistics files needs to be listed
* @param predicate predicate for filtering the statistics files
* @return the location of statistics files
* @deprecated since 1.5.0, will be removed in 1.6.0; use the {@code
* statisticsFilesLocationsForSnapshots(table, snapshotIds)} instead.
*/
@Deprecated
public static List<String> statisticsFilesLocations(
Table table, Predicate<StatisticsFile> predicate) {
return table.statisticsFiles().stream()
.filter(predicate)
.map(StatisticsFile::path)
.collect(Collectors.toList());
}

/**
* Returns locations of all statistics files for a table matching the given snapshot IDs.
*
* @param table table for which statistics files needs to be listed
* @param snapshotIds ids of snapshots for which statistics files will be returned. If null,
* statistics files for all the snapshots will be returned.
* @return the location of statistics files
*/
public static List<String> statisticsFilesLocationsForSnapshots(
Table table, Set<Long> snapshotIds) {
List<String> statsFileLocations = Lists.newArrayList();

Predicate<StatisticsFile> statsFilePredicate;
Predicate<PartitionStatisticsFile> partitionStatsFilePredicate;
if (snapshotIds == null) {
statsFilePredicate = file -> true;
partitionStatsFilePredicate = file -> true;
} else {
statsFilePredicate = file -> snapshotIds.contains(file.snapshotId());
partitionStatsFilePredicate = file -> snapshotIds.contains(file.snapshotId());
}

table.statisticsFiles().stream()
.filter(statsFilePredicate)
.map(StatisticsFile::path)
.forEach(statsFileLocations::add);

table.partitionStatisticsFiles().stream()
.filter(partitionStatsFilePredicate)
.map(PartitionStatisticsFile::path)
.forEach(statsFileLocations::add);

return statsFileLocations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;
import org.apache.iceberg.ImmutableGenericPartitionStatisticsFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.PositionOutputStream;

public class ProcedureUtil {

private ProcedureUtil() {}

static PartitionStatisticsFile writePartitionStatsFile(
long snapshotId, String statsLocation, FileIO fileIO) {
PositionOutputStream positionOutputStream;
try {
positionOutputStream = fileIO.newOutputFile(statsLocation).create();
positionOutputStream.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
}

return ImmutableGenericPartitionStatisticsFile.builder()
.snapshotId(snapshotId)
.fileSizeInBytes(42L)
.path(statsLocation)
.build();
}

static String statsFileLocation(String tableLocation) {
String statsFileName = "stats-file-" + UUID.randomUUID();
return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.GenericBlobMetadata;
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -458,7 +457,7 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
String statsFileLocation1 = statsFileLocation(table.location());
String statsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile1 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
Expand All @@ -469,7 +468,7 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception {

sql("INSERT INTO %s SELECT 20, 'def'", tableName);
table.refresh();
String statsFileLocation2 = statsFileLocation(table.location());
String statsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
StatisticsFile statisticsFile2 =
writeStatsFile(
table.currentSnapshot().snapshotId(),
Expand All @@ -488,18 +487,9 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception {
Assertions.assertThat(output.get(0)[5]).as("should be 1 deleted statistics file").isEqualTo(1L);

table.refresh();
List<StatisticsFile> statsWithSnapshotId1 =
table.statisticsFiles().stream()
.filter(statisticsFile -> statisticsFile.snapshotId() == statisticsFile1.snapshotId())
.collect(Collectors.toList());
Assertions.assertThat(statsWithSnapshotId1)
.as(
"Statistics file entry in TableMetadata should be deleted for the snapshot %s",
statisticsFile1.snapshotId())
.isEmpty();
Assertions.assertThat(table.statisticsFiles())
.as(
"Statistics file entry in TableMetadata should be present for the snapshot %s",
"Statistics file entry in TableMetadata should be present only for the snapshot %s",
statisticsFile2.snapshotId())
.extracting(StatisticsFile::snapshotId)
.containsExactly(statisticsFile2.snapshotId());
Expand All @@ -513,7 +503,58 @@ public void testExpireSnapshotsWithStatisticFiles() throws Exception {
.exists();
}

private StatisticsFile writeStatsFile(
@Test
public void testExpireSnapshotsWithPartitionStatisticFiles() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
sql("INSERT INTO TABLE %s VALUES (10, 'abc')", tableName);
Table table = validationCatalog.loadTable(tableIdent);
String partitionStatsFileLocation1 = ProcedureUtil.statsFileLocation(table.location());
PartitionStatisticsFile partitionStatisticsFile1 =
ProcedureUtil.writePartitionStatsFile(
table.currentSnapshot().snapshotId(), partitionStatsFileLocation1, table.io());
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile1).commit();

sql("INSERT INTO %s SELECT 20, 'def'", tableName);
table.refresh();
String partitionStatsFileLocation2 = ProcedureUtil.statsFileLocation(table.location());
PartitionStatisticsFile partitionStatisticsFile2 =
ProcedureUtil.writePartitionStatsFile(
table.currentSnapshot().snapshotId(), partitionStatsFileLocation2, table.io());
table.updatePartitionStatistics().setPartitionStatistics(partitionStatisticsFile2).commit();

waitUntilAfter(table.currentSnapshot().timestampMillis());

Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));
List<Object[]> output =
sql(
"CALL %s.system.expire_snapshots(older_than => TIMESTAMP '%s',table => '%s')",
catalogName, currentTimestamp, tableIdent);
Assertions.assertThat(output.get(0)[5])
.as("should be 1 deleted partition statistics file")
.isEqualTo(1L);

table.refresh();
Assertions.assertThat(table.partitionStatisticsFiles())
.as(
"partition statistics file entry in TableMetadata should be present only for the snapshot %s",
partitionStatisticsFile2.snapshotId())
.extracting(PartitionStatisticsFile::snapshotId)
.containsExactly(partitionStatisticsFile2.snapshotId());

Assertions.assertThat(new File(partitionStatsFileLocation1))
.as(
"partition statistics file should not exist for snapshot %s",
partitionStatisticsFile1.snapshotId())
.doesNotExist();

Assertions.assertThat(new File(partitionStatsFileLocation2))
.as(
"partition statistics file should exist for snapshot %s",
partitionStatisticsFile2.snapshotId())
.exists();
}

private static StatisticsFile writeStatsFile(
long snapshotId, long snapshotSequenceNumber, String statsLocation, FileIO fileIO)
throws IOException {
try (PuffinWriter puffinWriter = Puffin.write(fileIO.newOutputFile(statsLocation)).build()) {
Expand All @@ -536,9 +577,4 @@ private StatisticsFile writeStatsFile(
.collect(ImmutableList.toImmutableList()));
}
}

private String statsFileLocation(String tableLocation) {
String statsFileName = "stats-file-" + UUID.randomUUID();
return tableLocation.replaceFirst("file:", "") + "/metadata/" + statsFileName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.iceberg.GenericStatisticsFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
Expand Down Expand Up @@ -541,6 +542,75 @@ public void testRemoveOrphanFilesWithStatisticFiles() throws Exception {
Assertions.assertThat(statsLocation.exists()).as("stats file should be deleted").isFalse();
}

@Test
public void testRemoveOrphanFilesWithPartitionStatisticFiles() throws Exception {
sql(
"CREATE TABLE %s USING iceberg "
+ "TBLPROPERTIES('format-version'='2') "
+ "AS SELECT 10 int, 'abc' data",
tableName);
Table table = Spark3Util.loadIcebergTable(spark, tableName);

String partitionStatsLocation = ProcedureUtil.statsFileLocation(table.location());
PartitionStatisticsFile partitionStatisticsFile =
ProcedureUtil.writePartitionStatsFile(
table.currentSnapshot().snapshotId(), partitionStatsLocation, table.io());

commitPartitionStatsTxn(table, partitionStatisticsFile);

// wait to ensure files are old enough
waitUntilAfter(System.currentTimeMillis());
Timestamp currentTimestamp = Timestamp.from(Instant.ofEpochMilli(System.currentTimeMillis()));

List<Object[]> output =
sql(
"CALL %s.system.remove_orphan_files("
+ "table => '%s',"
+ "older_than => TIMESTAMP '%s')",
catalogName, tableIdent, currentTimestamp);
Assertions.assertThat(output).as("Should be no orphan files").isEmpty();

Assertions.assertThat(new File(partitionStatsLocation))
.as("partition stats file should exist")
.exists();

removePartitionStatsTxn(table, partitionStatisticsFile);

output =
sql(
"CALL %s.system.remove_orphan_files("
+ "table => '%s',"
+ "older_than => TIMESTAMP '%s')",
catalogName, tableIdent, currentTimestamp);
Assertions.assertThat(output).as("Should be orphan files").hasSize(1);
Assertions.assertThat(Iterables.getOnlyElement(output))
.as("Deleted files")
.containsExactly("file:" + partitionStatsLocation);
Assertions.assertThat(new File(partitionStatsLocation))
.as("partition stats file should be deleted")
.doesNotExist();
}

private static void removePartitionStatsTxn(
Table table, PartitionStatisticsFile partitionStatisticsFile) {
Transaction transaction = table.newTransaction();
transaction
.updatePartitionStatistics()
.removePartitionStatistics(partitionStatisticsFile.snapshotId())
.commit();
transaction.commitTransaction();
}

private static void commitPartitionStatsTxn(
Table table, PartitionStatisticsFile partitionStatisticsFile) {
Transaction transaction = table.newTransaction();
transaction
.updatePartitionStatistics()
.setPartitionStatistics(partitionStatisticsFile)
.commit();
transaction.commitTransaction();
}

@Test
public void testRemoveOrphanFilesProcedureWithPrefixMode()
throws NoSuchTableException, ParseException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.iceberg.AllManifestsTable;
import org.apache.iceberg.BaseTable;
Expand All @@ -44,7 +43,6 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReachableFileUtil;
import org.apache.iceberg.StaticTableOperations;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.NotFoundException;
Expand Down Expand Up @@ -204,14 +202,8 @@ protected Dataset<FileInfo> manifestListDS(Table table, Set<Long> snapshotIds) {
}

protected Dataset<FileInfo> statisticsFileDS(Table table, Set<Long> snapshotIds) {
Predicate<StatisticsFile> predicate;
if (snapshotIds == null) {
predicate = statisticsFile -> true;
} else {
predicate = statisticsFile -> snapshotIds.contains(statisticsFile.snapshotId());
}

List<String> statisticsFiles = ReachableFileUtil.statisticsFilesLocations(table, predicate);
List<String> statisticsFiles =
ReachableFileUtil.statisticsFilesLocationsForSnapshots(table, snapshotIds);
return toFileInfoDS(statisticsFiles, STATISTICS_FILES);
}

Expand Down
Loading

0 comments on commit 02836ea

Please sign in to comment.