From 026a9b00459ae458eb4c6c620c78742dc43a00e8 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 24 Jan 2025 07:47:34 +0100 Subject: [PATCH] Core, Spark: Include content offset/size in PositionDeletesTable (#11808) --- .../org/apache/iceberg/MetadataColumns.java | 2 + .../apache/iceberg/PositionDeletesTable.java | 73 +++++++++++++------ .../iceberg/TestMetadataTableScans.java | 7 +- .../iceberg/spark/source/DVIterator.java | 5 ++ .../source/TestPositionDeletesReader.java | 70 ++++++++++++------ 5 files changed, 111 insertions(+), 46 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index 060d27a018c0..792ed6a062ef 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -56,6 +56,8 @@ private MetadataColumns() {} public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5; public static final String PARTITION_COLUMN_NAME = "_partition"; public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to"; + public static final int CONTENT_OFFSET_COLUMN_ID = Integer.MAX_VALUE - 6; + public static final int CONTENT_SIZE_IN_BYTES_COLUMN_ID = Integer.MAX_VALUE - 7; // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns public static final NestedField DELETE_FILE_PATH = diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 382ad663a8d1..e205e6a9426e 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -52,6 +52,8 @@ public class PositionDeletesTable extends BaseMetadataTable { public static final String PARTITION = "partition"; public static final String SPEC_ID = "spec_id"; public static final String DELETE_FILE_PATH = "delete_file_path"; + public static final String CONTENT_OFFSET = "content_offset"; + public static final String CONTENT_SIZE_IN_BYTES = "content_size_in_bytes"; private final Schema schema; private final int defaultSpecId; @@ -110,31 +112,54 @@ public Map properties() { } private Schema calculateSchema() { + int formatVersion = TableUtil.formatVersion(table()); Types.StructType partitionType = Partitioning.partitionType(table()); - List columns = - ImmutableList.of( - MetadataColumns.DELETE_FILE_PATH, - MetadataColumns.DELETE_FILE_POS, - Types.NestedField.optional( - MetadataColumns.DELETE_FILE_ROW_FIELD_ID, - MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, - table().schema().asStruct(), - MetadataColumns.DELETE_FILE_ROW_DOC), - Types.NestedField.required( - MetadataColumns.PARTITION_COLUMN_ID, - PARTITION, - partitionType, - "Partition that position delete row belongs to"), - Types.NestedField.required( - MetadataColumns.SPEC_ID_COLUMN_ID, - SPEC_ID, - Types.IntegerType.get(), - MetadataColumns.SPEC_ID_COLUMN_DOC), - Types.NestedField.required( - MetadataColumns.FILE_PATH_COLUMN_ID, - DELETE_FILE_PATH, - Types.StringType.get(), - MetadataColumns.FILE_PATH_COLUMN_DOC)); + ImmutableList.Builder builder = + ImmutableList.builder() + .add(MetadataColumns.DELETE_FILE_PATH) + .add(MetadataColumns.DELETE_FILE_POS) + .add( + Types.NestedField.optional( + MetadataColumns.DELETE_FILE_ROW_FIELD_ID, + MetadataColumns.DELETE_FILE_ROW_FIELD_NAME, + table().schema().asStruct(), + MetadataColumns.DELETE_FILE_ROW_DOC)) + .add( + Types.NestedField.required( + MetadataColumns.PARTITION_COLUMN_ID, + PARTITION, + partitionType, + "Partition that position delete row belongs to")) + .add( + Types.NestedField.required( + MetadataColumns.SPEC_ID_COLUMN_ID, + SPEC_ID, + Types.IntegerType.get(), + MetadataColumns.SPEC_ID_COLUMN_DOC)) + .add( + Types.NestedField.required( + MetadataColumns.FILE_PATH_COLUMN_ID, + DELETE_FILE_PATH, + Types.StringType.get(), + MetadataColumns.FILE_PATH_COLUMN_DOC)); + + if (formatVersion >= 3) { + builder + .add( + Types.NestedField.optional( + MetadataColumns.CONTENT_OFFSET_COLUMN_ID, + CONTENT_OFFSET, + Types.LongType.get(), + "The offset in the DV where the content starts")) + .add( + Types.NestedField.optional( + MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID, + CONTENT_SIZE_IN_BYTES, + Types.LongType.get(), + "The length in bytes of the DV blob")); + } + + List columns = builder.build(); // Calculate used ids (for de-conflict) Set currentlyUsedIds = diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index 56b11009fc12..0e77e38ca360 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -1704,7 +1704,12 @@ public void testPositionDeletesManyColumns() { table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit(); PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); - assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()).isEqualTo(2010); + int expectedIds = + formatVersion >= 3 + ? 2012 // partition col + 8 columns + 2003 ids inside the deleted row column + : 2010; // partition col + 6 columns + 2003 ids inside the deleted row column + assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()) + .isEqualTo(expectedIds); BatchScan scan = positionDeletesTable.newBatchScan(); assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java index 7b08b86cbfd0..0c319e2bd41a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java @@ -30,6 +30,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.unsafe.types.UTF8String; @@ -79,6 +80,10 @@ public InternalRow next() { rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID)); } else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) { rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID)); + } else if (fieldId == MetadataColumns.CONTENT_OFFSET_COLUMN_ID) { + rowValues.add(deleteFile.contentOffset()); + } else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) { + rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile)); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java index 5b876dfc57ce..c182413f3938 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesReader.java @@ -157,13 +157,17 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException Table positionDeletesTable = catalog.loadTable(TableIdentifier.of("default", "test", "position_deletes")); - Schema projectedSchema = - positionDeletesTable - .schema() - .select( - MetadataColumns.DELETE_FILE_PATH.name(), - MetadataColumns.DELETE_FILE_POS.name(), - PositionDeletesTable.DELETE_FILE_PATH); + List columns = + Lists.newArrayList( + MetadataColumns.DELETE_FILE_PATH.name(), + MetadataColumns.DELETE_FILE_POS.name(), + PositionDeletesTable.DELETE_FILE_PATH); + if (formatVersion >= 3) { + columns.add(PositionDeletesTable.CONTENT_OFFSET); + columns.add(PositionDeletesTable.CONTENT_SIZE_IN_BYTES); + } + + Schema projectedSchema = positionDeletesTable.schema().select(columns); List scanTasks = Lists.newArrayList( @@ -187,15 +191,27 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException String dataFileLocation = formatVersion >= 3 ? deleteFile1.referencedDataFile() : dataFile1.location(); - Object[] first = { - UTF8String.fromString(dataFileLocation), 0L, UTF8String.fromString(deleteFile1.location()) - }; - Object[] second = { - UTF8String.fromString(dataFileLocation), 1L, UTF8String.fromString(deleteFile1.location()) - }; + List first = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 0L, + UTF8String.fromString(deleteFile1.location())); + List second = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 1L, + UTF8String.fromString(deleteFile1.location())); + + if (formatVersion >= 3) { + first.add(deleteFile1.contentOffset()); + first.add(deleteFile1.contentSizeInBytes()); + second.add(deleteFile1.contentOffset()); + second.add(deleteFile1.contentSizeInBytes()); + } + assertThat(internalRowsToJava(actualRows, projectedSchema)) .hasSize(2) - .containsExactly(first, second); + .containsExactly(first.toArray(), second.toArray()); } assertThat(scanTasks.get(1)).isInstanceOf(PositionDeletesScanTask.class); @@ -214,15 +230,27 @@ public void readPositionDeletesTableWithMultipleDeleteFiles() throws IOException String dataFileLocation = formatVersion >= 3 ? deleteFile2.referencedDataFile() : dataFile2.location(); - Object[] first = { - UTF8String.fromString(dataFileLocation), 2L, UTF8String.fromString(deleteFile2.location()) - }; - Object[] second = { - UTF8String.fromString(dataFileLocation), 3L, UTF8String.fromString(deleteFile2.location()) - }; + List first = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 2L, + UTF8String.fromString(deleteFile2.location())); + List second = + Lists.newArrayList( + UTF8String.fromString(dataFileLocation), + 3L, + UTF8String.fromString(deleteFile2.location())); + + if (formatVersion >= 3) { + first.add(deleteFile2.contentOffset()); + first.add(deleteFile2.contentSizeInBytes()); + second.add(deleteFile2.contentOffset()); + second.add(deleteFile2.contentSizeInBytes()); + } + assertThat(internalRowsToJava(actualRows, projectedSchema)) .hasSize(2) - .containsExactly(first, second); + .containsExactly(first.toArray(), second.toArray()); } }