Skip to content

Commit

Permalink
IGNITE-20079 SQL Calcite: Write additional performance statistics inf…
Browse files Browse the repository at this point in the history
…o for queries - Fixes #10880.

Signed-off-by: Aleksey Plekhanov <[email protected]>
  • Loading branch information
alex-plekhanov committed Sep 14, 2023
1 parent 9a21681 commit b26a4c5
Show file tree
Hide file tree
Showing 42 changed files with 913 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.util.SqlOperatorTables;
import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.calcite.sql.validate.SqlValidator;
Expand Down Expand Up @@ -94,7 +96,10 @@
import org.apache.ignite.internal.processors.query.calcite.prepare.QueryPlanCacheImpl;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolder;
import org.apache.ignite.internal.processors.query.calcite.schema.SchemaHolderImpl;
import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlAlterUser;
import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlConformance;
import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlCreateUser;
import org.apache.ignite.internal.processors.query.calcite.sql.IgniteSqlOption;
import org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteOwnSqlOperatorTable;
import org.apache.ignite.internal.processors.query.calcite.sql.fun.IgniteStdSqlOperatorTable;
import org.apache.ignite.internal.processors.query.calcite.sql.generated.IgniteSqlParserImpl;
Expand Down Expand Up @@ -527,6 +532,22 @@ private String removeSensitive(SqlNode qry) {
@Override public SqlNode visit(SqlLiteral literal) {
return new SqlDynamicParam(-1, literal.getParserPosition());
}

@Override public SqlNode visit(SqlCall call) {
// Handle some special cases.
if (call instanceof IgniteSqlOption)
return call;
else if (call instanceof IgniteSqlCreateUser) {
return new IgniteSqlCreateUser(call.getParserPosition(), ((IgniteSqlCreateUser)call).user(),
SqlLiteral.createCharString("hidden", SqlParserPos.ZERO));
}
else if (call instanceof IgniteSqlAlterUser) {
return new IgniteSqlAlterUser(call.getParserPosition(), ((IgniteSqlAlterUser)call).user(),
SqlLiteral.createCharString("hidden", SqlParserPos.ZERO));
}

return super.visit(call);
}
}
).toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public QueryRegistryImpl(GridKernalContext ctx) {
String initiatorId = fieldsQry != null ? fieldsQry.getQueryInitiatorId() : null;

long locId = qryMgr.register(rootQry.sql(), GridCacheQueryType.SQL_FIELDS, rootQry.context().schemaName(),
false, createCancelToken(qry), initiatorId, false, false, false);
false, createCancelToken(qry), initiatorId, false, true, false);

rootQry.localQueryId(locId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
Expand Down Expand Up @@ -673,6 +674,16 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
}
}

if (perfStatProc.enabled()) {
perfStatProc.queryProperty(
GridCacheQueryType.SQL_FIELDS,
qry.initiatorNodeId(),
qry.localQueryId(),
"Query plan",
plan.textPlan()
);
}

QueryProperties qryProps = qry.context().unwrap(QueryProperties.class);

Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null :
Expand Down Expand Up @@ -720,8 +731,22 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
};
}

Runnable onClose = () -> {
if (perfStatProc.enabled()) {
perfStatProc.queryRowsProcessed(
GridCacheQueryType.SQL_FIELDS,
qry.initiatorNodeId(),
qry.localQueryId(),
"Fetched",
resultSetChecker.fetchedSize()
);
}

resultSetChecker.checkOnClose();
};

Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
fieldConverter, rowConverter, resultSetChecker::checkOnClose);
fieldConverter, rowConverter, onClose);

return new ListFieldsQueryCursor<>(plan, it, ectx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public LogicalRelImplementor(
if (idx != null && !tbl.isIndexRebuildInProgress()) {
Iterable<Row> rowsIter = idx.scan(ctx, grp, ranges, requiredColumns);

return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
return new ScanStorageNode<>(idx.name(), ctx, rowType, rowsIter, filters, prj);
}
else {
// Index was invalidated after planning, workaround through table-scan -> sort -> index spool.
Expand All @@ -339,7 +339,7 @@ public LogicalRelImplementor(
if (!spoolNodeRequired && projects != null)
rowType = rel.getRowType();

Node<Row> node = new ScanStorageNode<>(ctx, rowType, rowsIter, filterHasCorrelation ? null : filters,
Node<Row> node = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filterHasCorrelation ? null : filters,
projNodeRequired ? null : prj);

RelCollation collation = rel.collation();
Expand Down Expand Up @@ -406,14 +406,14 @@ public LogicalRelImplementor(
IgniteIndex idx = tbl.getIndex(rel.indexName());

if (idx != null && !tbl.isIndexRebuildInProgress()) {
return new ScanStorageNode<>(ctx, rel.getRowType(), () -> Collections.singletonList(ctx.rowHandler()
.factory(ctx.getTypeFactory(), rel.getRowType())
.create(idx.count(ctx, ctx.group(rel.sourceId()), rel.notNull()))).iterator());
return new ScanStorageNode<>(idx.name() + "_COUNT", ctx, rel.getRowType(),
() -> Collections.singletonList(ctx.rowHandler().factory(ctx.getTypeFactory(), rel.getRowType())
.create(idx.count(ctx, ctx.group(rel.sourceId()), rel.notNull()))).iterator());
}
else {
CollectNode<Row> replacement = CollectNode.createCountCollector(ctx);

replacement.register(new ScanStorageNode<>(ctx, rel.getTable().getRowType(), tbl.scan(ctx,
replacement.register(new ScanStorageNode<>(tbl.name(), ctx, rel.getTable().getRowType(), tbl.scan(ctx,
ctx.group(rel.sourceId()), ImmutableBitSet.of(0))));

return replacement;
Expand All @@ -429,14 +429,16 @@ public LogicalRelImplementor(
ImmutableBitSet requiredColumns = idxBndRel.requiredColumns();
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);

if (idx != null && !tbl.isIndexRebuildInProgress())
return new ScanStorageNode<>(ctx, rowType, idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
if (idx != null && !tbl.isIndexRebuildInProgress()) {
return new ScanStorageNode<>(idx.name() + "_BOUND", ctx, rowType,
idx.firstOrLast(idxBndRel.first(), ctx, grp, requiredColumns));
}
else {
assert requiredColumns.cardinality() == 1;

Iterable<Row> rowsIter = tbl.scan(ctx, grp, idxBndRel.requiredColumns());

Node<Row> scanNode = new ScanStorageNode<>(ctx, rowType, rowsIter,
Node<Row> scanNode = new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter,
r -> ctx.rowHandler().get(0, r) != null, null);

RelCollation collation = idx.collation().apply(LogicalScanConverterRule.createMapping(
Expand Down Expand Up @@ -481,7 +483,7 @@ public LogicalRelImplementor(

Iterable<Row> rowsIter = tbl.scan(ctx, group, requiredColunms);

return new ScanStorageNode<>(ctx, rowType, rowsIter, filters, prj);
return new ScanStorageNode<>(tbl.name(), ctx, rowType, rowsIter, filters, prj);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.query.calcite.exec.rel;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.calcite.rel.type.RelDataType;
Expand All @@ -27,41 +28,61 @@
* Scan storage node.
*/
public class ScanStorageNode<Row> extends ScanNode<Row> {
/** */
@Nullable private final AtomicLong processedRowsCntr;

/**
* @param storageName Storage (index or table) name.
* @param ctx Execution context.
* @param rowType Row type.
* @param src Source.
* @param filter Row filter.
* @param rowTransformer Row transformer (projection).
*/
public ScanStorageNode(
String storageName,
ExecutionContext<Row> ctx,
RelDataType rowType,
Iterable<Row> src,
@Nullable Predicate<Row> filter,
@Nullable Function<Row, Row> rowTransformer
) {
super(ctx, rowType, src, filter, rowTransformer);

processedRowsCntr = context().ioTracker().processedRowsCounter("Scanned " + storageName);
}

/**
* @param storageName Storage (index or table) name.
* @param ctx Execution context.
* @param rowType Row type.
* @param src Source.
*/
public ScanStorageNode(ExecutionContext<Row> ctx, RelDataType rowType, Iterable<Row> src) {
super(ctx, rowType, src);
public ScanStorageNode(String storageName, ExecutionContext<Row> ctx, RelDataType rowType, Iterable<Row> src) {
this(storageName, ctx, rowType, src, null, null);
}

/** {@inheritDoc} */
@Override protected int processNextBatch() throws Exception {
try {
context().ioTracker().startTracking();

return super.processNextBatch();
int processed = super.processNextBatch();

if (processedRowsCntr != null)
processedRowsCntr.addAndGet(processed);

return processed;
}
finally {
context().ioTracker().stopTracking();
}
}

/** */
@Override public void closeInternal() {
super.closeInternal();

context().ioTracker().flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.ignite.internal.processors.query.calcite.exec.tracker;

import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;

/**
* I/O operations tracker interface.
*/
Expand All @@ -26,4 +29,16 @@ public interface IoTracker {

/** Stop tracking and save result. */
public void stopTracking();

/**
* Register counter for processed rows.
*
* @param action Action with rows.
*/
@Nullable public AtomicLong processedRowsCounter(String action);

/**
* Flush tracked data.
*/
public void flush();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.ignite.internal.processors.query.calcite.exec.tracker;

import java.util.concurrent.atomic.AtomicLong;
import org.jetbrains.annotations.Nullable;

/**
* I/O operations tracker that does nothing.
*/
Expand All @@ -33,4 +36,14 @@ public class NoOpIoTracker implements IoTracker {
@Override public void stopTracking() {
// No-op.
}

/** {@inheritDoc} */
@Nullable @Override public AtomicLong processedRowsCounter(String action) {
return null;
}

/** {@inheritDoc} */
@Override public void flush() {
// No-op.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.ignite.internal.processors.query.calcite.exec.tracker;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsQueryHelper;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.util.typedef.T2;

/**
* Performance statistics gathering I/O operations tracker.
Expand All @@ -36,6 +40,15 @@ public class PerformanceStatisticsIoTracker implements IoTracker {
/** */
private final long originatingQryId;

/** */
private final AtomicLong logicalReads = new AtomicLong();

/** */
private final AtomicLong physicalReads = new AtomicLong();

/** */
private final List<T2<String, AtomicLong>> cntrs = new CopyOnWriteArrayList<>();

/** */
public PerformanceStatisticsIoTracker(
PerformanceStatisticsProcessor perfStatProc,
Expand All @@ -56,13 +69,45 @@ public PerformanceStatisticsIoTracker(
@Override public void stopTracking() {
IoStatisticsHolder stat = IoStatisticsQueryHelper.finishGatheringQueryStatistics();

if (stat.logicalReads() > 0 || stat.physicalReads() > 0) {
logicalReads.addAndGet(stat.logicalReads());
physicalReads.addAndGet(stat.physicalReads());
}

/** {@inheritDoc} */
@Override public AtomicLong processedRowsCounter(String action) {
AtomicLong cntr = new AtomicLong();

cntrs.add(new T2<>(action, cntr));

return cntr;
}

/** {@inheritDoc} */
@Override public void flush() {
long logicalReads = this.logicalReads.getAndSet(0);
long physicalReads = this.physicalReads.getAndSet(0);

if (logicalReads > 0 || physicalReads > 0) {
perfStatProc.queryReads(
GridCacheQueryType.SQL_FIELDS,
originatingNodeId,
originatingQryId,
stat.logicalReads(),
stat.physicalReads());
logicalReads,
physicalReads);
}

for (T2<String, AtomicLong> cntr : cntrs) {
long rowsCnt = cntr.get2().getAndSet(0);

if (rowsCnt > 0) {
perfStatProc.queryRowsProcessed(
GridCacheQueryType.SQL_FIELDS,
originatingNodeId,
originatingQryId,
cntr.get1(),
rowsCnt
);
}
}
}
}
Loading

0 comments on commit b26a4c5

Please sign in to comment.