Skip to content

Commit

Permalink
[Improvement](nereids)Support ODBC table for new planner. (apache#29129)
Browse files Browse the repository at this point in the history
  • Loading branch information
zy-kkk authored and seawinde committed Jan 3, 2024
1 parent ab2f2fb commit 03f1bd9
Show file tree
Hide file tree
Showing 17 changed files with 417 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
Expand Down Expand Up @@ -137,6 +138,12 @@ public Cost visitPhysicalJdbcScan(PhysicalJdbcScan physicalJdbcScan, PlanContext
return CostV1.ofCpu(context.getSessionVariable(), statistics.getRowCount());
}

@Override
public Cost visitPhysicalOdbcScan(PhysicalOdbcScan physicalOdbcScan, PlanContext context) {
Statistics statistics = context.getStatisticsWithCheck();
return CostV1.ofCpu(context.getSessionVariable(), statistics.getRowCount());
}

@Override
public Cost visitPhysicalEsScan(PhysicalEsScan physicalEsScan, PlanContext context) {
Statistics statistics = context.getStatisticsWithCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
Expand Down Expand Up @@ -142,6 +143,11 @@ public Cost visitPhysicalJdbcScan(PhysicalJdbcScan physicalJdbcScan, PlanContext
return calculateScanWithoutRF(context.getStatisticsWithCheck());
}

@Override
public Cost visitPhysicalOdbcScan(PhysicalOdbcScan physicalOdbcScan, PlanContext context) {
return calculateScanWithoutRF(context.getStatisticsWithCheck());
}

@Override
public Cost visitPhysicalEsScan(PhysicalEsScan physicalEsScan, PlanContext context) {
return calculateScanWithoutRF(context.getStatisticsWithCheck());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Function.NullableMode;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
Expand Down Expand Up @@ -112,6 +113,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
Expand Down Expand Up @@ -176,6 +178,7 @@
import org.apache.doris.planner.external.hudi.HudiScanNode;
import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.jdbc.JdbcScanNode;
import org.apache.doris.planner.external.odbc.OdbcScanNode;
import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticConstants;
Expand Down Expand Up @@ -577,6 +580,29 @@ public PlanFragment visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanTransla
return planFragment;
}

@Override
public PlanFragment visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanTranslatorContext context) {
List<Slot> slots = odbcScan.getOutput();
TableIf table = odbcScan.getTable();
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context);
OdbcScanNode odbcScanNode = new OdbcScanNode(odbcScan.translatePlanNodeId(), tupleDescriptor,
(OdbcTable) table);
odbcScanNode.addConjuncts(translateToLegacyConjuncts(odbcScan.getConjuncts()));
Utils.execWithUncheckedException(odbcScanNode::init);
context.addScanNode(odbcScanNode);
context.getRuntimeTranslator().ifPresent(
runtimeFilterGenerator -> runtimeFilterGenerator.getContext().getTargetListByScan(odbcScan).forEach(
expr -> runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
)
);
Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), odbcScan);
return planFragment;
}

@Override
public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context) {
List<Slot> slots = olapScan.getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderTopN;
import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoEsScan;
import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoJdbcScan;
import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoOdbcScan;
import org.apache.doris.nereids.rules.rewrite.PushDownCountThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownCountThroughJoinOneSide;
import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
Expand Down Expand Up @@ -344,6 +345,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
new PruneEmptyPartition(),
new PruneFileScanPartition(),
new PushConjunctsIntoJdbcScan(),
new PushConjunctsIntoOdbcScan(),
new PushConjunctsIntoEsScan()
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
Expand Down Expand Up @@ -149,6 +150,11 @@ public PhysicalProperties visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, PlanC
return PhysicalProperties.STORAGE_ANY;
}

@Override
public PhysicalProperties visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, PlanContext context) {
return PhysicalProperties.STORAGE_ANY;
}

@Override
public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) {
return new PhysicalProperties(olapScan.getDistributionSpec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToNestedLoopJoin;
import org.apache.doris.nereids.rules.implementation.LogicalLimitToPhysicalLimit;
import org.apache.doris.nereids.rules.implementation.LogicalOdbcScanToPhysicalOdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalOlapScanToPhysicalOlapScan;
import org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysicalOlapTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalOneRowRelationToPhysicalOneRowRelation;
Expand Down Expand Up @@ -165,6 +166,7 @@ public class RuleSet {
.add(new LogicalSchemaScanToPhysicalSchemaScan())
.add(new LogicalFileScanToPhysicalFileScan())
.add(new LogicalJdbcScanToPhysicalJdbcScan())
.add(new LogicalOdbcScanToPhysicalOdbcScan())
.add(new LogicalEsScanToPhysicalEsScan())
.add(new LogicalProjectToPhysicalProject())
.add(new LogicalLimitToPhysicalLimit())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ public enum RuleType {
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_JDBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ODBC_SCAN(RuleTypeClass.REWRITE),
PUSH_CONJUNCTS_INTO_ES_SCAN(RuleTypeClass.REWRITE),
OLAP_SCAN_TABLET_PRUNE(RuleTypeClass.REWRITE),
PUSH_AGGREGATE_TO_OLAP_SCAN(RuleTypeClass.REWRITE),
Expand Down Expand Up @@ -368,6 +369,7 @@ public enum RuleType {
LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
Expand Down Expand Up @@ -242,6 +243,8 @@ private LogicalPlan getLogicalPlan(TableIf table, UnboundRelation unboundRelatio
case JDBC_EXTERNAL_TABLE:
case JDBC:
return new LogicalJdbcScan(unboundRelation.getRelationId(), table, tableQualifier);
case ODBC:
return new LogicalOdbcScan(unboundRelation.getRelationId(), table, tableQualifier);
case ES_EXTERNAL_TABLE:
return new LogicalEsScan(unboundRelation.getRelationId(), (EsExternalTable) table, tableQualifier);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.implementation;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;

import java.util.Optional;

/**
* Implementation rule that convert logical OdbcScan to physical OdbcScan.
*/
public class LogicalOdbcScanToPhysicalOdbcScan extends OneImplementationRuleFactory {
@Override
public Rule build() {
return logicalOdbcScan().then(odbcScan ->
new PhysicalOdbcScan(
odbcScan.getRelationId(),
odbcScan.getTable(),
odbcScan.getQualifier(),
Optional.empty(),
odbcScan.getLogicalProperties(),
odbcScan.getConjuncts())
).toRule(RuleType.LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.rewrite;

import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;

/**
* Rewrite odbc plan to set the conjuncts.
*/
public class PushConjunctsIntoOdbcScan extends OneRewriteRuleFactory {

@Override
public Rule build() {
return logicalFilter(logicalOdbcScan()).thenApply(ctx -> {
LogicalFilter<LogicalOdbcScan> filter = ctx.root;
LogicalOdbcScan scan = filter.child();
LogicalOdbcScan rewrittenScan = scan.withConjuncts(filter.getConjuncts());
return new LogicalFilter<>(filter.getConjuncts(), rewrittenScan);
}).toRule(RuleType.PUSH_CONJUNCTS_INTO_ODBC_SCAN);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
Expand Down Expand Up @@ -98,6 +99,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
Expand Down Expand Up @@ -317,6 +319,12 @@ public Statistics visitLogicalJdbcScan(LogicalJdbcScan jdbcScan, Void context) {
return computeCatalogRelation(jdbcScan);
}

@Override
public Statistics visitLogicalOdbcScan(LogicalOdbcScan odbcScan, Void context) {
odbcScan.getExpressions();
return computeCatalogRelation(odbcScan);
}

@Override
public Statistics visitLogicalEsScan(LogicalEsScan esScan, Void context) {
esScan.getExpressions();
Expand Down Expand Up @@ -460,6 +468,11 @@ public Statistics visitPhysicalJdbcScan(PhysicalJdbcScan jdbcScan, Void context)
return computeCatalogRelation(jdbcScan);
}

@Override
public Statistics visitPhysicalOdbcScan(PhysicalOdbcScan odbcScan, Void context) {
return computeCatalogRelation(odbcScan);
}

@Override
public Statistics visitPhysicalEsScan(PhysicalEsScan esScan, Void context) {
return computeCatalogRelation(esScan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN;
Expand Down Expand Up @@ -238,6 +239,18 @@ public Plan visitLogicalJdbcScan(LogicalJdbcScan jdbcScan, DeepCopierContext con
return newJdbcScan;
}

@Override
public Plan visitLogicalOdbcScan(LogicalOdbcScan odbcScan, DeepCopierContext context) {
if (context.getRelationReplaceMap().containsKey(odbcScan.getRelationId())) {
return context.getRelationReplaceMap().get(odbcScan.getRelationId());
}
LogicalOdbcScan newOdbcScan = new LogicalOdbcScan(StatementScopeIdGenerator.newRelationId(),
odbcScan.getTable(), odbcScan.getQualifier());
updateReplaceMapWithOutput(odbcScan, newOdbcScan, context.exprIdReplaceMap);
context.putRelation(odbcScan.getRelationId(), newOdbcScan);
return newOdbcScan;
}

@Override
public Plan visitLogicalEsScan(LogicalEsScan esScan, DeepCopierContext context) {
if (context.getRelationReplaceMap().containsKey(esScan.getRelationId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public enum PlanType {
LOGICAL_EMPTY_RELATION,
LOGICAL_ES_SCAN,
LOGICAL_JDBC_SCAN,
LOGICAL_ODBC_SCAN,
LOGICAL_OLAP_SCAN,
LOGICAL_ONE_ROW_RELATION,
LOGICAL_SCHEMA_SCAN,
Expand Down Expand Up @@ -83,6 +84,7 @@ public enum PlanType {
PHYSICAL_ES_SCAN,
PHYSICAL_FILE_SCAN,
PHYSICAL_JDBC_SCAN,
PHYSICAL_ODBC_SCAN,
PHYSICAL_ONE_ROW_RELATION,
PHYSICAL_OLAP_SCAN,
PHYSICAL_SCHEMA_SCAN,
Expand Down
Loading

0 comments on commit 03f1bd9

Please sign in to comment.