Skip to content

Commit

Permalink
[core] Support filter pushdown in FilesTable (apache#2808)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 30, 2024
1 parent 9abfbd1 commit 15f8c9a
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.paimon.utils.SnapshotManager;

import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
Expand All @@ -70,11 +71,24 @@ public AbstractInnerTableScan withBucket(int bucket) {
return this;
}

@Override
public AbstractInnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
    // Delegate bucket pruning to the snapshot reader; returns this for chaining.
    snapshotReader.withBucketFilter(bucketFilter);
    return this;
}

@Override
public AbstractInnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
    // Delegate partition pruning to the snapshot reader; returns this for chaining.
    snapshotReader.withPartitionFilter(partitionSpec);
    return this;
}

@Override
public AbstractInnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
    // Delegate level-based file pruning to the snapshot reader; returns this for chaining.
    snapshotReader.withLevelFilter(levelFilter);
    return this;
}

public AbstractInnerTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
snapshotReader.withMetricRegistry(metricsRegistry);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;

import java.util.Map;

Expand All @@ -36,6 +37,14 @@ default InnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
return this;
}

/**
 * Pushes a bucket filter down to this scan. The default implementation ignores the
 * filter; implementations that can prune buckets should override this.
 */
default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
    return this;
}

/**
 * Pushes a level filter down to this scan. The default implementation ignores the
 * filter; implementations that can prune by file level should override this.
 */
default InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
    return this;
}

default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
// do nothing, should implement this if need
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** {@link TableScan} implementation for batch planning. */
public class InnerTableScanImpl extends AbstractInnerTableScan {
Expand Down Expand Up @@ -60,12 +59,6 @@ public InnerTableScan withFilter(Predicate predicate) {
return this;
}

@Override
public InnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
    // Delegate partition pruning to the underlying snapshot reader.
    snapshotReader.withPartitionFilter(partitionSpec);
    return this;
}

@Override
public InnerTableScan withLimit(int limit) {
this.pushDownLimit = limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,30 @@ public InnerTableScan withMetricsRegistry(MetricRegistry metricsRegistry) {
return this;
}

@Override
public InnerTableScan withLimit(int limit) {
    // Forward the limit push-down to the wrapped batch scan.
    batchScan.withLimit(limit);
    return this;
}

@Override
public InnerTableScan withPartitionFilter(Map<String, String> partitionSpec) {
    // Forward the partition filter push-down to the wrapped batch scan.
    batchScan.withPartitionFilter(partitionSpec);
    return this;
}

@Override
public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
    // Forward the bucket filter push-down to the wrapped batch scan.
    batchScan.withBucketFilter(bucketFilter);
    return this;
}

@Override
public InnerTableScan withLevelFilter(Filter<Integer> levelFilter) {
    // Forward the level filter push-down to the wrapped batch scan.
    batchScan.withLevelFilter(levelFilter);
    return this;
}

@Override
public Plan plan() {
return batchScan.plan();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@
package org.apache.paimon.table.system;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.LazyGenericRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.format.FieldStats;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.predicate.Equal;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -54,12 +58,15 @@

import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -142,19 +149,47 @@ private static class FilesScan extends ReadOnceTableScan {

private final FileStoreTable storeTable;

@Nullable private LeafPredicate partitionPredicate;
@Nullable private LeafPredicate bucketPredicate;
@Nullable private LeafPredicate levelPredicate;

// Scan over a single table's files metadata; predicates are collected later
// via withFilter(...) and stay null until then.
private FilesScan(FileStoreTable storeTable) {
    this.storeTable = storeTable;
}

@Override
public InnerTableScan withFilter(Predicate predicate) {
// TODO
public InnerTableScan withFilter(Predicate pushdown) {
List<Predicate> predicates = PredicateBuilder.splitAnd(pushdown);
for (Predicate predicate : predicates) {
if (predicate instanceof LeafPredicate) {
LeafPredicate leaf = (LeafPredicate) predicate;
switch (leaf.fieldName()) {
case "partition":
this.partitionPredicate = leaf;
break;
case "bucket":
this.bucketPredicate = leaf;
break;
case "level":
this.levelPredicate = leaf;
break;
default:
break;
}
}
}
return this;
}

@Override
public Plan innerPlan() {
    // Produce a single split; the FilesSplit is created inside the lambda so it
    // captures the predicate fields at the time the plan's splits are requested.
    return () ->
            Collections.singletonList(
                    new FilesSplit(
                            storeTable,
                            partitionPredicate,
                            bucketPredicate,
                            levelPredicate));
}
}

Expand All @@ -164,8 +199,19 @@ private static class FilesSplit implements Split {

private final FileStoreTable storeTable;

private FilesSplit(FileStoreTable storeTable) {
@Nullable private final LeafPredicate partitionPredicate;
@Nullable private final LeafPredicate bucketPredicate;
@Nullable private final LeafPredicate levelPredicate;

// A null predicate means "no pruning" on the corresponding column.
private FilesSplit(
        FileStoreTable storeTable,
        @Nullable LeafPredicate partitionPredicate,
        @Nullable LeafPredicate bucketPredicate,
        @Nullable LeafPredicate levelPredicate) {
    this.storeTable = storeTable;
    this.partitionPredicate = partitionPredicate;
    this.bucketPredicate = bucketPredicate;
    this.levelPredicate = levelPredicate;
}

@Override
Expand All @@ -178,7 +224,42 @@ public long rowCount() {
}

/**
 * Plans the underlying data scan, applying whatever predicates were pushed down on
 * the "partition", "bucket" and "level" columns of the files table.
 */
private TableScan.Plan plan() {
    InnerTableScan scan = storeTable.newScan();
    if (partitionPredicate != null) {
        if (partitionPredicate.function() instanceof Equal) {
            // The partition literal is rendered like "[v1, v2, ...]": strip the
            // surrounding brackets, then split on ", " to recover the field values.
            String partitionStr = partitionPredicate.literals().get(0).toString();
            if (partitionStr.startsWith("[")) {
                partitionStr = partitionStr.substring(1);
            }
            if (partitionStr.endsWith("]")) {
                partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
            }
            String[] partFields = partitionStr.split(", ");
            // LinkedHashMap preserves the declared order of the partition keys.
            // NOTE(review): assumes the literal has exactly one value per
            // partition key — confirm for multi-field partitions.
            LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
            List<String> partitionKeys = storeTable.partitionKeys();
            for (int i = 0; i < partitionKeys.size(); i++) {
                partSpec.put(partitionKeys.get(i), partFields[i]);
            }
            scan.withPartitionFilter(partSpec);
        }
        // TODO support range?
    }
    if (bucketPredicate != null) {
        scan.withBucketFilter(
                bucket -> {
                    // Evaluate against a sparse row: "bucket" is field index 1
                    // of the files table row type.
                    return bucketPredicate.test(GenericRow.of(null, bucket));
                });
    }
    if (levelPredicate != null) {
        scan.withLevelFilter(
                level -> {
                    // Evaluate against a sparse row: "level" is field index 5
                    // of the files table row type.
                    return levelPredicate.test(
                            GenericRow.of(null, null, null, null, null, level));
                });
    }
    return scan.plan();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,35 @@
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.SnapshotManager;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link FilesTable}. */
public class FilesTableTest extends TableTestBase {
private static final String tableName = "MyTable";

private FileStoreTable table;
private FileStoreScan scan;
Expand All @@ -64,6 +68,7 @@ public class FilesTableTest extends TableTestBase {

@BeforeEach
public void before() throws Exception {
String tableName = "MyTable";
FileIO fileIO = LocalFileIO.create();
Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
Schema schema =
Expand Down Expand Up @@ -94,6 +99,37 @@ public void before() throws Exception {
write(table, GenericRow.of(2, 1, 3), GenericRow.of(2, 2, 4));
}

@Test
public void testReadWithFilter() throws Exception {
    // Compact partition [2] (producing the level-5 file asserted below) and
    // write one more row into partition [1].
    compact(table, row(2), 0);
    write(table, GenericRow.of(3, 1, 1));
    // Without a filter, all four files show up as "<partition>-<bucket>-<level>".
    assertThat(readPartBucketLevel(null))
            .containsExactlyInAnyOrder("[1]-0-0", "[1]-0-0", "[1]-1-0", "[2]-0-5");

    // Push down equality filters on field 0 (partition), 1 (bucket) and 5 (level).
    PredicateBuilder builder = new PredicateBuilder(FilesTable.TABLE_TYPE);
    assertThat(readPartBucketLevel(builder.equal(0, "[2]")))
            .containsExactlyInAnyOrder("[2]-0-5");
    assertThat(readPartBucketLevel(builder.equal(1, 1))).containsExactlyInAnyOrder("[1]-1-0");
    assertThat(readPartBucketLevel(builder.equal(5, 5))).containsExactlyInAnyOrder("[2]-0-5");
}

/**
 * Scans the files table with the given pushed-down predicate and renders every
 * row as {@code "<partition>-<bucket>-<level>"} for easy assertions.
 */
private List<String> readPartBucketLevel(Predicate predicate) throws IOException {
    ReadBuilder readBuilder = filesTable.newReadBuilder().withFilter(predicate);
    List<String> result = new ArrayList<>();
    readBuilder
            .newRead()
            .createReader(readBuilder.newScan().plan())
            .forEachRemaining(
                    row ->
                            result.add(
                                    String.format(
                                            "%s-%d-%d",
                                            row.getString(0),
                                            row.getInt(1),
                                            row.getInt(5))));
    return result;
}

@Test
public void testReadFilesFromLatest() throws Exception {
List<InternalRow> expectedRow = getExceptedResult(2L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -630,6 +631,32 @@ private void assertFilesTable(String tableName) {
: "[5],[10]")));
}

@Test
public void testFilesTableWithFilter() {
    // Execute DML synchronously so the data files exist before the $files queries run.
    tEnv.getConfig().set(TABLE_DML_SYNC, true);
    sql(
            "CREATE TABLE T_WITH_FILTER (k INT, p INT, v INT, PRIMARY KEY (k, p) NOT ENFORCED) "
                    + "PARTITIONED BY (p) WITH ('bucket'='2')");
    sql("INSERT INTO T_WITH_FILTER VALUES (1, 2, 3), (4, 5, 6)");
    // Compaction produces the level-5 files asserted in the baseline below.
    sql("CALL sys.compact('default.T_WITH_FILTER')");
    sql("INSERT INTO T_WITH_FILTER VALUES (7, 8, 9)");

    // Baseline: all three data files, unfiltered.
    assertThat(sql("SELECT `partition`, bucket, level FROM T_WITH_FILTER$files"))
            .containsExactlyInAnyOrder(
                    Row.of("[2]", 0, 5), Row.of("[5]", 0, 5), Row.of("[8]", 1, 0));

    // Filter push-down on the partition column.
    assertThat(
                    sql(
                            "SELECT `partition`, bucket, level FROM T_WITH_FILTER$files WHERE `partition`='[2]'"))
            .containsExactlyInAnyOrder(Row.of("[2]", 0, 5));

    // Filter push-down on the bucket column.
    assertThat(sql("SELECT `partition`, bucket, level FROM T_WITH_FILTER$files WHERE bucket=0"))
            .containsExactlyInAnyOrder(Row.of("[2]", 0, 5), Row.of("[5]", 0, 5));

    // Filter push-down on the level column.
    assertThat(sql("SELECT `partition`, bucket, level FROM T_WITH_FILTER$files WHERE level=0"))
            .containsExactlyInAnyOrder(Row.of("[8]", 1, 0));
}

@Nonnull
private List<String> getRowStringList(List<Row> rows) {
return rows.stream()
Expand Down

0 comments on commit 15f8c9a

Please sign in to comment.