Skip to content

Commit

Permalink
[Enhancement] record audit stats for all statements (#56257)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
(cherry picked from commit 1bc0233)
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Feb 26, 2025
1 parent 52134ca commit d161eda
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 33 deletions.
53 changes: 49 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/plugin/AuditEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,24 @@ public AuditEventBuilder setReturnRows(long returnRows) {
return this;
}

public AuditEventBuilder addScanBytes(long scanBytes) {
if (auditEvent.scanBytes == -1) {
auditEvent.scanBytes = scanBytes;
} else {
auditEvent.scanBytes += scanBytes;
}
return this;
}

public AuditEventBuilder addScanRows(long scanRows) {
if (auditEvent.scanRows == -1) {
auditEvent.scanRows = scanRows;
} else {
auditEvent.scanRows += scanRows;
}
return this;
}

/**
* Cpu cost in nanoseconds
*/
Expand All @@ -232,13 +250,30 @@ public AuditEventBuilder setCpuCostNs(long cpuNs) {
return this;
}

public AuditEventBuilder setMemCostBytes(long memCostBytes) {
auditEvent.memCostBytes = memCostBytes;
public AuditEventBuilder addCpuCostNs(long cpuNs) {
if (auditEvent.cpuCostNs == -1) {
auditEvent.cpuCostNs = cpuNs;
} else {
auditEvent.cpuCostNs += cpuNs;
}
return this;
}

public AuditEventBuilder setSpilledBytes(long spilledBytes) {
auditEvent.spilledBytes = spilledBytes;
public AuditEventBuilder addMemCostBytes(long memCostBytes) {
if (auditEvent.memCostBytes == -1) {
auditEvent.memCostBytes = memCostBytes;
} else {
auditEvent.memCostBytes = Math.max(auditEvent.memCostBytes, memCostBytes);
}
return this;
}

public AuditEventBuilder addSpilledBytes(long spilledBytes) {
if (auditEvent.spilledBytes == -1) {
auditEvent.spilledBytes = spilledBytes;
} else {
auditEvent.spilledBytes += spilledBytes;
}
return this;
}

Expand Down Expand Up @@ -334,5 +369,15 @@ public AuditEventBuilder setIsForwardToLeader(boolean isForwardToLeader) {
public AuditEvent build() {
return this.auditEvent;
}

// Copy execution statistics from another audit event
public void copyExecStatsFrom(AuditEvent event) {
this.auditEvent.cpuCostNs = event.cpuCostNs;
this.auditEvent.memCostBytes = event.memCostBytes;
this.auditEvent.scanBytes = event.scanBytes;
this.auditEvent.scanRows = event.scanRows;
this.auditEvent.spilledBytes = event.spilledBytes;
this.auditEvent.returnRows = event.returnRows;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,14 +200,6 @@ public void auditAfterExec(String origStmt, StatementBase parsedStmt, PQueryStat
.setStmtId(ctx.getStmtId())
.setIsForwardToLeader(isForwardToLeader)
.setQueryId(ctx.getQueryId() == null ? "NaN" : ctx.getQueryId().toString());
if (statistics != null) {
ctx.getAuditEventBuilder().setScanBytes(statistics.scanBytes);
ctx.getAuditEventBuilder().setScanRows(statistics.scanRows);
ctx.getAuditEventBuilder().setCpuCostNs(statistics.cpuCostNs == null ? -1 : statistics.cpuCostNs);
ctx.getAuditEventBuilder().setMemCostBytes(statistics.memCostBytes == null ? -1 : statistics.memCostBytes);
ctx.getAuditEventBuilder().setSpilledBytes(statistics.spillBytes == null ? -1 : statistics.spillBytes);
ctx.getAuditEventBuilder().setReturnRows(statistics.returnedRows == null ? 0 : statistics.returnedRows);
}

if (ctx.getState().isQuery()) {
MetricRepo.COUNTER_QUERY_ALL.increase(1L);
Expand Down
84 changes: 63 additions & 21 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.PlanNodeId;
import com.starrocks.planner.ScanNode;
import com.starrocks.plugin.AuditEvent;
import com.starrocks.privilege.AccessDeniedException;
import com.starrocks.privilege.ObjectType;
import com.starrocks.privilege.PrivilegeException;
Expand Down Expand Up @@ -809,9 +810,27 @@ public void execute() throws Exception {
if (shouldMarkIdleCheck(parsedStmt)) {
WarehouseIdleChecker.decreaseRunningSQL(context.getCurrentWarehouseId());
}

recordExecStatsIntoContext();
}
}

/**
* record execution stats for all kinds of statement
* some statements may execute multiple statement which will also create multiple StmtExecutor, so here
* we accumulate them into the ConnectContext instead of using the last one
*/
private void recordExecStatsIntoContext() {
PQueryStatistics execStats = getQueryStatisticsForAuditLog();
context.getAuditEventBuilder().addCpuCostNs(execStats.getCpuCostNs() != null ? execStats.getCpuCostNs() : 0);
context.getAuditEventBuilder()
.addMemCostBytes(execStats.getMemCostBytes() != null ? execStats.getMemCostBytes() : 0);
context.getAuditEventBuilder().addScanBytes(execStats.getScanBytes() != null ? execStats.getScanBytes() : 0);
context.getAuditEventBuilder().addScanRows(execStats.getScanRows() != null ? execStats.getScanRows() : 0);
context.getAuditEventBuilder().addSpilledBytes(execStats.spillBytes != null ? execStats.spillBytes : 0);
context.getAuditEventBuilder().setReturnRows(execStats.returnedRows == null ? 0 : execStats.returnedRows);
}

private void clearQueryScopeHintContext() {
Iterator<Map.Entry<String, UserVariable>> iterator = context.userVariables.entrySet().iterator();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -1312,27 +1331,41 @@ private void handleQueryStmt(ExecPlan execPlan) throws Exception {
}
}

statisticsForAuditLog = batch.getQueryStatistics();
if (!isOutfileQuery) {
context.getState().setEof();
} else {
context.getState().setOk(statisticsForAuditLog.returnedRows, 0, "");
}
if (null == statisticsForAuditLog || null == statisticsForAuditLog.statsItems ||
statisticsForAuditLog.statsItems.isEmpty()) {
return;
}
// collect table-level metrics
Set<Long> tableIds = Sets.newHashSet();
for (QueryStatisticsItemPB item : statisticsForAuditLog.statsItems) {
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(item.tableId);
entity.counterScanRowsTotal.increase(item.scanRows);
entity.counterScanBytesTotal.increase(item.scanBytes);
tableIds.add(item.tableId);
}
for (Long tableId : tableIds) {
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
entity.counterScanFinishedTotal.increase(1L);
processQueryStatisticsFromResult(batch, execPlan, isOutfileQuery);
}

/**
* The query result batch will piggyback query statistics in it
*/
private void processQueryStatisticsFromResult(RowBatch batch, ExecPlan execPlan, boolean isOutfileQuery) {
if (batch != null) {
statisticsForAuditLog = batch.getQueryStatistics();
if (!isOutfileQuery) {
context.getState().setEof();
} else {
context.getState().setOk(statisticsForAuditLog.returnedRows, 0, "");
}

if (null == statisticsForAuditLog || null == statisticsForAuditLog.statsItems ||
statisticsForAuditLog.statsItems.isEmpty()) {
return;
}

// collect table-level metrics
Set<Long> tableIds = Sets.newHashSet();
for (QueryStatisticsItemPB item : statisticsForAuditLog.statsItems) {
if (item == null) {
continue;
}
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(item.tableId);
entity.counterScanRowsTotal.increase(item.scanRows);
entity.counterScanBytesTotal.increase(item.scanBytes);
tableIds.add(item.tableId);
}
for (Long tableId : tableIds) {
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(tableId);
entity.counterScanFinishedTotal.increase(1L);
}
}
}

Expand Down Expand Up @@ -1439,6 +1472,10 @@ private void executeAnalyze(AnalyzeStmt analyzeStmt, AnalyzeStatus analyzeStatus
statsConnectCtx.setStatisticsConnection(true);
try (var guard = statsConnectCtx.bindScope()) {
executeAnalyze(statsConnectCtx, analyzeStmt, analyzeStatus, db, table);
} finally {
// copy the stats to current context
AuditEvent event = statsConnectCtx.getAuditEventBuilder().build();
context.getAuditEventBuilder().copyExecStatsFrom(event);
}

}
Expand Down Expand Up @@ -2571,6 +2608,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
GlobalStateMgr.getCurrentState().getOperationListenerBus()
.onDMLStmtJobTransactionFinish(txnState, database, targetTable, dmlType);
}
recordExecStatsIntoContext();
}

String errMsg = "";
Expand Down Expand Up @@ -2633,11 +2671,13 @@ public Pair<List<TResultBatch>, Status> executeStmtWithExecPlan(ConnectContext c
sqlResult.add(batch.getBatch());
}
} while (!batch.isEos());
processQueryStatisticsFromResult(batch, plan, false);
} catch (Exception e) {
LOG.warn("Failed to execute executeStmtWithExecPlan", e);
coord.getExecStatus().setInternalErrorStatus(e.getMessage());
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId());
recordExecStatsIntoContext();
}
return Pair.create(sqlResult, coord.getExecStatus());
}
Expand Down Expand Up @@ -2692,6 +2732,7 @@ public void executeStmtWithResultQueue(ConnectContext context, ExecPlan plan, Qu
}
} while (!batch.isEos());
context.getState().setEof();
processQueryStatisticsFromResult(batch, plan, false);
} catch (Exception e) {
LOG.error("Failed to execute metadata collection job", e);
if (coord.getExecStatus().ok()) {
Expand All @@ -2706,6 +2747,7 @@ public void executeStmtWithResultQueue(ConnectContext context, ExecPlan plan, Qu
} else {
QeProcessorImpl.INSTANCE.unregisterQuery(context.getExecutionId());
}
recordExecStatsIntoContext();
}
}

Expand Down

0 comments on commit d161eda

Please sign in to comment.