Skip to content

Commit

Permalink
Add catalog/db/table filter info in SchemaScanNode
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Jan 13, 2025
1 parent 56a61a9 commit 2b949ca
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 22 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/schema_scanner/schema_tables_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ Status SchemaTablesScanner::_get_new_table() {
_db_index++;
if (nullptr != _param->common_param->wild) {
table_params.__set_pattern(*(_param->common_param->wild));
} else if (nullptr != _param->common_param->table) {
table_params.__set_pattern(*(_param->common_param->table));
}
if (nullptr != _param->common_param->current_user_ident) {
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,11 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
SchemaScanNode scanNode = null;
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
table.getName())) {
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
schemaScan.getSchemaCatalog(), schemaScan.getSchemaDatabase(), schemaScan.getSchemaTable());
} else {
scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
schemaScan.getSchemaCatalog(), schemaScan.getSchemaDatabase(), schemaScan.getSchemaTable());
}
scanNode.setNereidsId(schemaScan.getId());
SchemaScanNode finalScanNode = scanNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import org.apache.doris.nereids.rules.rewrite.PushDownAggWithDistinctThroughJoinOneSide;
import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownEncodeSlot;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterIntoSchemaScan;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.PushDownLimit;
import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
Expand Down Expand Up @@ -410,7 +411,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
topDown(
new PruneOlapScanPartition(),
new PruneEmptyPartition(),
new PruneFileScanPartition()
new PruneFileScanPartition(),
new PushDownFilterIntoSchemaScan()
)
),
topic("MV optimization",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ public enum RuleType {
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),

FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
PUSH_FILTER_INTO_SCHEMA_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_JDBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ODBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ES_SCAN(RuleTypeClass.REWRITE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public Rule build() {
scan.getTable(),
scan.getQualifier(),
Optional.empty(),
scan.getLogicalProperties())
scan.getLogicalProperties(),
scan.getSchemaCatalog(),
scan.getSchemaDatabase(),
scan.getSchemaTable())
).toRule(RuleType.LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.catalog.Column;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;

import com.google.common.collect.ImmutableList;

import java.util.Optional;

/**
* Used to push down catalog/db/table name to schema scan node.
*/
public class PushDownFilterIntoSchemaScan extends OneRewriteRuleFactory {

@Override
public Rule build() {
return logicalFilter(logicalSchemaScan()).when(p -> !p.child().isFilterPushed()).thenApply(ctx -> {
LogicalFilter<LogicalSchemaScan> filter = ctx.root;
LogicalSchemaScan scan = filter.child();
String schemaCatalog = null;
String schemaDatabase = null;
String schemaTable = null;
for (Expression expression : filter.getConjuncts()) {
if (!(expression instanceof EqualTo)) {
continue;
}
Expression slot = expression.child(0);
if (!(slot instanceof SlotReference)) {
continue;
}
Optional<Column> column = ((SlotReference) slot).getColumn();
if (!column.isPresent()) {
continue;
}
String columnName = column.get().getName();
Expression slotValue = expression.child(1);
if (!(slotValue instanceof VarcharLiteral)) {
continue;
}
String columnValue = ((VarcharLiteral) slotValue).getValue();
if ("TABLE_CATALOG".equals(columnName)) {
schemaCatalog = columnValue;
} else if ("TABLE_SCHEMA".equals(columnName)) {
schemaDatabase = columnValue;
} else if ("TABLE_NAME".equals(columnName)) {
schemaTable = columnValue;
}
}
LogicalSchemaScan rewrittenScan = scan.withSchemaIdentifier(schemaCatalog, schemaDatabase, schemaTable);
return filter.withChildren(ImmutableList.of(rewrittenScan));
}).toRule(RuleType.PUSH_FILTER_INTO_SCHEMA_SCAN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,51 @@
import org.apache.doris.nereids.util.Utils;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* LogicalSchemaScan.
*/
public class LogicalSchemaScan extends LogicalCatalogRelation {

private final boolean filterPushed;
private final String schemaCatalog;
private final String schemaDatabase;
private final String schemaTable;

public LogicalSchemaScan(RelationId id, TableIf table, List<String> qualifier) {
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier);
this.filterPushed = false;
this.schemaCatalog = null;
this.schemaDatabase = null;
this.schemaTable = null;
}

public LogicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) {
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
boolean filterPushed, String schemaCatalog, String schemaDatabase, String schemaTable) {
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);
this.filterPushed = filterPushed;
this.schemaCatalog = schemaCatalog;
this.schemaDatabase = schemaDatabase;
this.schemaTable = schemaTable;
}

public boolean isFilterPushed() {
return filterPushed;
}

public String getSchemaCatalog() {
return schemaCatalog;
}

public String getSchemaDatabase() {
return schemaDatabase;
}

public String getSchemaTable() {
return schemaTable;
}

@Override
Expand All @@ -56,22 +87,53 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalSchemaScan(relationId, table, qualifier,
groupExpression, Optional.of(getLogicalProperties()));
groupExpression, Optional.of(getLogicalProperties()), filterPushed,
schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalSchemaScan(relationId, table, qualifier, groupExpression, logicalProperties);
return new LogicalSchemaScan(relationId, table, qualifier, groupExpression, logicalProperties, filterPushed,
schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public LogicalSchemaScan withRelationId(RelationId relationId) {
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(), Optional.empty());
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(), Optional.empty(), filterPushed,
schemaCatalog, schemaDatabase, schemaTable);
}

public LogicalSchemaScan withSchemaIdentifier(String schemaCatalog, String schemaDatabase, String schemaTable) {
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(),
Optional.of(getLogicalProperties()), true, schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public String toString() {
return Utils.toSqlString("LogicalSchemaScan");
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
LogicalSchemaScan that = (LogicalSchemaScan) o;
return Objects.equals(schemaCatalog, that.schemaCatalog)
&& Objects.equals(schemaDatabase, that.schemaDatabase)
&& Objects.equals(schemaTable, that.schemaTable)
&& filterPushed == that.filterPushed;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), schemaCatalog, schemaDatabase, schemaTable, filterPushed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,48 @@
import org.apache.doris.statistics.Statistics;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* PhysicalSchemaScan.
*/
public class PhysicalSchemaScan extends PhysicalCatalogRelation {

private final String schemaCatalog;
private final String schemaDatabase;
private final String schemaTable;

public PhysicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
String schemaCatalog, String schemaDatabase, String schemaTable) {
super(id, PlanType.PHYSICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);
this.schemaCatalog = schemaCatalog;
this.schemaDatabase = schemaDatabase;
this.schemaTable = schemaTable;
}

public PhysicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics) {
PhysicalProperties physicalProperties, Statistics statistics,
String schemaCatalog, String schemaDatabase, String schemaTable) {
super(id, PlanType.PHYSICAL_SCHEMA_SCAN, table, qualifier, groupExpression,
logicalProperties, physicalProperties, statistics);
this.schemaCatalog = schemaCatalog;
this.schemaDatabase = schemaDatabase;
this.schemaTable = schemaTable;
}

public String getSchemaCatalog() {
return schemaCatalog;
}

public String getSchemaDatabase() {
return schemaDatabase;
}

public String getSchemaTable() {
return schemaTable;
}

@Override
Expand All @@ -61,28 +86,53 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
groupExpression, getLogicalProperties(), physicalProperties, statistics);
groupExpression, getLogicalProperties(), physicalProperties, statistics,
schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
groupExpression, logicalProperties.get(), physicalProperties, statistics);
groupExpression, logicalProperties.get(), physicalProperties, statistics,
schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
groupExpression, getLogicalProperties(), physicalProperties, statistics);
groupExpression, getLogicalProperties(), physicalProperties, statistics,
schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public String toString() {
return Utils.toSqlString("PhysicalSchemaScan");
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
PhysicalSchemaScan that = (PhysicalSchemaScan) o;
return Objects.equals(schemaCatalog, that.schemaCatalog)
&& Objects.equals(schemaDatabase, that.schemaDatabase)
&& Objects.equals(schemaTable, that.schemaTable);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), schemaCatalog, schemaDatabase, schemaTable);
}

@Override
public boolean canPushDownRuntimeFilter() {
// currently be doesn't support schema scan rf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ public static boolean isBackendPartitionedSchemaTable(String tableName) {
private Map<Long, Long> partitionIDToBackendID;
private Collection<Long> selectedPartitionIds = Lists.newArrayList();

public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
super(id, desc);
public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc,
String schemaCatalog, String schemaDatabase, String schemaTable) {
super(id, desc, schemaCatalog, schemaDatabase, schemaTable);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,13 @@ public class SchemaScanNode extends ScanNode {
/**
* Constructs node to scan given data files of table 'tbl'.
*/
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc,
String schemaCatalog, String schemaDb, String schemaTable) {
super(id, desc, "SCAN SCHEMA", StatisticalType.SCHEMA_SCAN_NODE);
this.tableName = desc.getTable().getName();
this.schemaCatalog = schemaCatalog;
this.schemaDb = schemaDb;
this.schemaTable = schemaTable;
}

public String getTableName() {
Expand Down Expand Up @@ -130,7 +134,9 @@ protected void toThrift(TPlanNode msg) {
msg.schema_scan_node.setDb("SESSION");
}
}
msg.schema_scan_node.setCatalog(desc.getTable().getDatabase().getCatalog().getName());
if (schemaCatalog != null) {
msg.schema_scan_node.setCatalog(schemaCatalog);
}
msg.schema_scan_node.show_hidden_cloumns = Util.showHiddenColumns();

if (schemaTable != null) {
Expand Down
Loading

0 comments on commit 2b949ca

Please sign in to comment.