diff --git a/fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java b/fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java index b7f1c0c2149b3..c22737e9f80c8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java +++ b/fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java @@ -224,6 +224,24 @@ public AuditEventBuilder setReturnRows(long returnRows) { return this; } + public AuditEventBuilder addScanBytes(long scanBytes) { + if (auditEvent.scanBytes == -1) { + auditEvent.scanBytes = scanBytes; + } else { + auditEvent.scanBytes += scanBytes; + } + return this; + } + + public AuditEventBuilder addScanRows(long scanRows) { + if (auditEvent.scanRows == -1) { + auditEvent.scanRows = scanRows; + } else { + auditEvent.scanRows += scanRows; + } + return this; + } + /** * Cpu cost in nanoseconds */ @@ -232,13 +250,30 @@ public AuditEventBuilder setCpuCostNs(long cpuNs) { return this; } - public AuditEventBuilder setMemCostBytes(long memCostBytes) { - auditEvent.memCostBytes = memCostBytes; + public AuditEventBuilder addCpuCostNs(long cpuNs) { + if (auditEvent.cpuCostNs == -1) { + auditEvent.cpuCostNs = cpuNs; + } else { + auditEvent.cpuCostNs += cpuNs; + } return this; } - public AuditEventBuilder setSpilledBytes(long spilledBytes) { - auditEvent.spilledBytes = spilledBytes; + public AuditEventBuilder addMemCostBytes(long memCostBytes) { + if (auditEvent.memCostBytes == -1) { + auditEvent.memCostBytes = memCostBytes; + } else { + auditEvent.memCostBytes = Math.max(auditEvent.memCostBytes, memCostBytes); + } + return this; + } + + public AuditEventBuilder addSpilledBytes(long spilledBytes) { + if (auditEvent.spilledBytes == -1) { + auditEvent.spilledBytes = spilledBytes; + } else { + auditEvent.spilledBytes += spilledBytes; + } return this; } @@ -334,5 +369,15 @@ public AuditEventBuilder setIsForwardToLeader(boolean isForwardToLeader) { public AuditEvent build() { return this.auditEvent; } + + // Copy execution statistics from another audit event + public void copyExecStatsFrom(AuditEvent event) { + this.auditEvent.cpuCostNs = event.cpuCostNs; + this.auditEvent.memCostBytes = event.memCostBytes; + this.auditEvent.scanBytes = event.scanBytes; + this.auditEvent.scanRows = event.scanRows; + this.auditEvent.spilledBytes = event.spilledBytes; + this.auditEvent.returnRows = event.returnRows; + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java index 80f39a0bc1169..94b2ca7c34ba2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/ConnectProcessor.java @@ -200,14 +200,6 @@ public void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStat .setStmtId(ctx.getStmtId()) .setIsForwardToLeader(isForwardToLeader) .setQueryId(ctx.getQueryId() == null ? "NaN" : ctx.getQueryId().toString()); - if (statistics != null) { - ctx.getAuditEventBuilder().setScanBytes(statistics.scanBytes); - ctx.getAuditEventBuilder().setScanRows(statistics.scanRows); - ctx.getAuditEventBuilder().setCpuCostNs(statistics.cpuCostNs == null ? -1 : statistics.cpuCostNs); - ctx.getAuditEventBuilder().setMemCostBytes(statistics.memCostBytes == null ? -1 : statistics.memCostBytes); - ctx.getAuditEventBuilder().setSpilledBytes(statistics.spillBytes == null ? -1 : statistics.spillBytes); - ctx.getAuditEventBuilder().setReturnRows(statistics.returnedRows == null ? 0 : statistics.returnedRows); - } if (ctx.getState().isQuery()) { MetricRepo.COUNTER_QUERY_ALL.increase(1L); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java index 311c7759326c3..59deec68f2e87 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java @@ -110,6 +110,7 @@ import com.starrocks.planner.PlanFragment; import com.starrocks.planner.PlanNodeId; import com.starrocks.planner.ScanNode; +import com.starrocks.plugin.AuditEvent; import com.starrocks.privilege.AccessDeniedException; import com.starrocks.privilege.ObjectType; import com.starrocks.privilege.PrivilegeException; @@ -809,9 +810,27 @@ public void execute() throws Exception { if (shouldMarkIdleCheck(parsedStmt)) { WarehouseIdleChecker.decreaseRunningSQL(context.getCurrentWarehouseId()); } + + recordExecStatsIntoContext(); } } + /** + * record execution stats for all kinds of statement + * some statements may execute multiple statement which will also create multiple StmtExecutor, so here + * we accumulate them into the ConnectContext instead of using the last one + */ + private void recordExecStatsIntoContext() { + PQueryStatistics execStats = getQueryStatisticsForAuditLog(); + context.getAuditEventBuilder().addCpuCostNs(execStats.getCpuCostNs() != null ? execStats.getCpuCostNs() : 0); + context.getAuditEventBuilder() + .addMemCostBytes(execStats.getMemCostBytes() != null ? execStats.getMemCostBytes() : 0); + context.getAuditEventBuilder().addScanBytes(execStats.getScanBytes() != null ? execStats.getScanBytes() : 0); + context.getAuditEventBuilder().addScanRows(execStats.getScanRows() != null ? execStats.getScanRows() : 0); + context.getAuditEventBuilder().addSpilledBytes(execStats.spillBytes != null ? execStats.spillBytes : 0); + context.getAuditEventBuilder().setReturnRows(execStats.returnedRows == null ? 0 : execStats.returnedRows); + } + private void clearQueryScopeHintContext() { Iterator> iterator = context.userVariables.entrySet().iterator(); while (iterator.hasNext()) { @@ -1312,27 +1331,41 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception { } } - statisticsForAuditLog = batch.getQueryStatistics(); - if (!isOutfileQuery) { - context.getState().setEof(); - } else { - context.getState().setOk(statisticsForAuditLog.returnedRows, 0, ""); - } - if (null == statisticsForAuditLog || null == statisticsForAuditLog.statsItems || - statisticsForAuditLog.statsItems.isEmpty()) { - return; - } - // collect table-level metrics - Set tableIds = Sets.newHashSet(); - for (QueryStatisticsItemPB item : statisticsForAuditLog.statsItems) { - TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(item.tableId); - entity.counterScanRowsTotal.increase(item.scanRows); - entity.counterScanBytesTotal.increase(item.scanBytes); - tableIds.add(item.tableId); - } - for (Long tableId : tableIds) { - TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId); - entity.counterScanFinishedTotal.increase(1L); + processQueryStatisticsFromResult(batch, execPlan, isOutfileQuery); + } + + /** + * The query result batch will piggyback query statistics in it + */ + private void processQueryStatisticsFromResult(RowBatch batch, ExecPlan execPlan, boolean isOutfileQuery) { + if (batch != null) { + statisticsForAuditLog = batch.getQueryStatistics(); + if (!isOutfileQuery) { + context.getState().setEof(); + } else { + context.getState().setOk(statisticsForAuditLog.returnedRows, 0, ""); + } + + if (null == statisticsForAuditLog || null == statisticsForAuditLog.statsItems || + statisticsForAuditLog.statsItems.isEmpty()) { + return; + } + + // collect table-level metrics + Set tableIds = Sets.newHashSet(); + for (QueryStatisticsItemPB item : statisticsForAuditLog.statsItems) { + if (item == null) { + continue; + } + TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(item.tableId); + entity.counterScanRowsTotal.increase(item.scanRows); + entity.counterScanBytesTotal.increase(item.scanBytes); + tableIds.add(item.tableId); + } + for (Long tableId : tableIds) { + TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId); + entity.counterScanFinishedTotal.increase(1L); + } } } @@ -1439,6 +1472,10 @@ private void executeAnalyze(AnalyzeStmt analyzeStmt, AnalyzeStatus analyzeStatus statsConnectCtx.setStatisticsConnection(true); try (var guard = statsConnectCtx.bindScope()) { executeAnalyze(statsConnectCtx, analyzeStmt, analyzeStatus, db, table); + } finally { + // copy the stats to current context + AuditEvent event = statsConnectCtx.getAuditEventBuilder().build(); + context.getAuditEventBuilder().copyExecStatsFrom(event); } } @@ -2571,6 +2608,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception { GlobalStateMgr.getCurrentState().getOperationListenerBus() .onDMLStmtJobTransactionFinish(txnState, database, targetTable, dmlType); } + recordExecStatsIntoContext(); } String errMsg = ""; @@ -2633,11 +2671,13 @@ public Pair, Status> executeStmtWithExecPlan(ConnectContext c sqlResult.add(batch.getBatch()); } } while (!batch.isEos()); + processQueryStatisticsFromResult(batch, plan, false); } catch (Exception e) { LOG.warn("Failed to execute executeStmtWithExecPlan", e); coord.getExecStatus().setInternalErrorStatus(e.getMessage()); } finally { QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId()); + recordExecStatsIntoContext(); } return Pair.create(sqlResult, coord.getExecStatus()); } @@ -2692,6 +2732,7 @@ public void executeStmtWithResultQueue(ConnectContext context, ExecPlan plan, Qu } } while (!batch.isEos()); context.getState().setEof(); + processQueryStatisticsFromResult(batch, plan, false); } catch (Exception e) { LOG.error("Failed to execute metadata collection job", e); if (coord.getExecStatus().ok()) { @@ -2706,6 +2747,7 @@ public void executeStmtWithResultQueue(ConnectContext context, ExecPlan plan, Qu } else { QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId()); } + recordExecStatsIntoContext(); } }