Skip to content

Commit

Permalink
Core: Add predicate pushdown for files metadata table (apache#2926)
Browse files Browse the repository at this point in the history
  • Loading branch information
szehon-ho authored Aug 9, 2021
1 parent 7ff4671 commit 7a3bfed
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 105 deletions.
20 changes: 20 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* deserialization.
*/
abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
protected static final String PARTITION_FIELD_PREFIX = "partition.";
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
private final TableOperations ops;
Expand All @@ -47,6 +48,25 @@ protected BaseMetadataTable(TableOperations ops, Table table, String name) {
this.name = name;
}

/**
* This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
* expression against the given metadata table.
* <p>
* The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform.
* When this spec is used to project an expression for the given metadata table, the projection will remove
* predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields.
*
* @param metadataTableSchena schema of the metadata table
* @param spec spec on which the metadata table schema is based
* @param partitionPrefix prefix to remove from each field in the partition spec
* @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
*/
static PartitionSpec transformSpec(Schema metadataTableSchena, PartitionSpec spec, String partitionPrefix) {
PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchena);
spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
return identitySpecBuilder.build();
}

abstract MetadataTableType metadataTableType();

protected Table table() {
Expand Down
25 changes: 24 additions & 1 deletion core/src/main/java/org/apache/iceberg/DataFilesTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
Expand Down Expand Up @@ -109,7 +112,22 @@ protected CloseableIterable<FileScanTask> planFiles(
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);

return CloseableIterable.transform(manifests, manifest ->
// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
Expression partitionFilter = Projections
.inclusive(
transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
caseSensitive)
.project(rowFilter);

ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
partitionFilter, table().spec(), caseSensitive);
CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);

// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
// empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
// all cases.
return CloseableIterable.transform(filtered, manifest ->
new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
}
}
Expand Down Expand Up @@ -138,5 +156,10 @@ public CloseableIterable<StructLike> rows() {
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}

@VisibleForTesting
ManifestFile manifest() {
return manifest;
}
}
}
21 changes: 1 addition & 20 deletions core/src/main/java/org/apache/iceberg/PartitionsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class PartitionsTable extends BaseMetadataTable {
private final Schema schema;
static final boolean PLAN_SCANS_WITH_WORKER_POOL =
SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
private static final String PARTITION_FIELD_PREFIX = "partition.";

PartitionsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".partitions");
Expand Down Expand Up @@ -112,7 +111,7 @@ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {

// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
Expression partitionFilter = Projections
.inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
.inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
.project(scan.filter());

ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
Expand All @@ -132,24 +131,6 @@ static CloseableIterable<FileScanTask> planFiles(StaticTableScan scan) {
return manifestGroup.planFiles();
}

/**
* This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
* expression against the partitions table.
* <p>
* The resulting partition spec maps partition.X fields to partition X using an identity partition transform. When
* this spec is used to project an expression for the partitions table, the projection will remove predicates for
* non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
*
* @param partitionTableSchema schema of the partition table
* @param spec spec on which the partition table schema is based
* @return a spec used to rewrite partition table filters to partition filters using an inclusive projection
*/
private static PartitionSpec transformSpec(Schema partitionTableSchema, PartitionSpec spec) {
PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(partitionTableSchema);
spec.fields().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name(), pf.name()));
return identitySpecBuilder.build();
}

private class PartitionsScan extends StaticTableScan {
PartitionsScan(TableOperations ops, Table table) {
super(ops, table, PartitionsTable.this.schema(), PartitionsTable.this.metadataTableType().name(),
Expand Down
Loading

0 comments on commit 7a3bfed

Please sign in to comment.