diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java index 6eb3bd5afdd8d1..d0ebd3f26aea79 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java @@ -202,6 +202,13 @@ public boolean isRunningAsConnectorOperator() { public void setScanSampleStrategy(RemoteFilesSampleStrategy strategy) { } + public boolean isConnectorScanNode() { + return this instanceof HdfsScanNode || this instanceof IcebergScanNode || + this instanceof HudiScanNode || this instanceof DeltaLakeScanNode || + this instanceof FileTableScanNode || this instanceof PaimonScanNode || + this instanceof OdpsScanNode || this instanceof IcebergMetadataScanNode; + } + protected String explainColumnDict(String prefix) { StringBuilder output = new StringBuilder(); if (!appliedDictStringColumns.isEmpty()) { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java index c4e6054f07a261..0cb9d43439cabe 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java @@ -14,15 +14,7 @@ package com.starrocks.qe.scheduler.assignment; -import com.starrocks.planner.DeltaLakeScanNode; -import com.starrocks.planner.FileTableScanNode; -import com.starrocks.planner.HdfsScanNode; -import com.starrocks.planner.HudiScanNode; -import com.starrocks.planner.IcebergMetadataScanNode; -import com.starrocks.planner.IcebergScanNode; -import com.starrocks.planner.OdpsScanNode; import com.starrocks.planner.OlapScanNode; -import com.starrocks.planner.PaimonScanNode; import com.starrocks.planner.ScanNode; import com.starrocks.planner.SchemaScanNode; import com.starrocks.qe.BackendSelector; @@ -68,10 +60,7 @@ public static BackendSelector create(ScanNode scanNode, if (scanNode instanceof SchemaScanNode) { return new NormalBackendSelector(scanNode, locations, assignment, workerProvider, false); - } else if (scanNode instanceof HdfsScanNode || scanNode instanceof IcebergScanNode || - scanNode instanceof HudiScanNode || scanNode instanceof DeltaLakeScanNode || - scanNode instanceof FileTableScanNode || scanNode instanceof PaimonScanNode - || scanNode instanceof OdpsScanNode || scanNode instanceof IcebergMetadataScanNode) { + } else if (scanNode.isConnectorScanNode()) { return new HDFSBackendSelector(scanNode, locations, assignment, workerProvider, sessionVariable.getForceScheduleLocal(), sessionVariable.getHDFSBackendSelectorScanRangeShuffle(), useIncrementalScanRanges); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/slot/SlotEstimatorFactory.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/slot/SlotEstimatorFactory.java index 9d0d9c6c9ea867..a77da8757ba6ac 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/slot/SlotEstimatorFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/slot/SlotEstimatorFactory.java @@ -20,6 +20,7 @@ import com.starrocks.planner.PlanFragment; import com.starrocks.planner.PlanFragmentId; import com.starrocks.planner.PlanNode; +import com.starrocks.planner.ScanNode; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.DefaultCoordinator; import com.starrocks.sql.optimizer.cost.feature.CostPredictor; @@ -55,7 +56,11 @@ public int estimateSlots(QueryQueueOptions opts, ConnectContext context, Default if (CostPredictor.getServiceBasedCostPredictor().isAvailable() && coord.getPredictedCost() > 0) { memCost = coord.getPredictedCost(); } else { - memCost = (long) context.getAuditEventBuilder().build().planMemCosts; + // The estimate of planMemCosts is typically an underestimation, often several orders of magnitude smaller than + // the actual memory usage, whereas planCpuCosts tends to be relatively larger. + // Therefore, the maximum value between the two is used as the estimate for memory. + memCost = (long) Math.max(context.getAuditEventBuilder().build().planMemCosts, + context.getAuditEventBuilder().build().planCpuCosts); } long numSlotsPerWorker = memCost / opts.v2().getMemBytesPerSlot(); numSlotsPerWorker = Math.max(numSlotsPerWorker, 0); @@ -72,7 +77,7 @@ public int estimateSlots(QueryQueueOptions opts, ConnectContext context, Default public static class ParallelismBasedSlotsEstimator implements SlotEstimator { @Override public int estimateSlots(QueryQueueOptions opts, ConnectContext context, DefaultCoordinator coord) { - Map fragmentContexts = collectFragmentContexts(coord); + Map fragmentContexts = collectFragmentContexts(opts, coord); int numSlots = fragmentContexts.values().stream() .mapToInt(fragmentContext -> estimateFragmentSlots(opts, fragmentContext)) .max().orElse(1); @@ -117,13 +122,14 @@ private static int estimateNumSlotsBySourceNode(QueryQueueOptions opts, PlanNode return (int) (sourceNode.getCardinality() / opts.v2().getNumRowsPerSlot()); } - private static Map collectFragmentContexts(DefaultCoordinator coord) { + private static Map collectFragmentContexts(QueryQueueOptions opts, + DefaultCoordinator coord) { PlanFragment rootFragment = coord.getExecutionDAG().getRootFragment().getPlanFragment(); PlanNode rootNode = rootFragment.getPlanRoot(); Map contexts = Maps.newHashMap(); collectFragmentSourceNodes(rootNode, contexts); - calculateFragmentWorkers(rootFragment, contexts); + calculateFragmentWorkers(opts, rootFragment, contexts); return contexts; } @@ -138,8 +144,9 @@ private static void collectFragmentSourceNodes(PlanNode node, Map collectFragmentSourceNodes(child, contexts)); } - private static void calculateFragmentWorkers(PlanFragment fragment, Map contexts) { - fragment.getChildren().forEach(child -> calculateFragmentWorkers(child, contexts)); + private static void calculateFragmentWorkers(QueryQueueOptions opts, PlanFragment fragment, + Map contexts) { + fragment.getChildren().forEach(child -> calculateFragmentWorkers(opts, child, contexts)); FragmentContext context = contexts.get(fragment.getFragmentId()); if (context == null) { @@ -154,6 +161,10 @@ private static void calculateFragmentWorkers(PlanFragment fragment, Map 1;