Skip to content

Commit

Permalink
IGNITE-19748 SQL Calcite: Queries timeout support - Fixes #10827.
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksey Plekhanov <[email protected]>
  • Loading branch information
alex-plekhanov committed Jul 18, 2023
1 parent 621bf86 commit 6a1cca6
Show file tree
Hide file tree
Showing 24 changed files with 505 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
/** */
private final CalciteQueryEngineConfiguration cfg;

/** */
private final DistributedCalciteConfiguration distrCfg;

/** */
private volatile boolean started;

Expand Down Expand Up @@ -253,6 +256,8 @@ public CalciteQueryProcessor(GridKernalContext ctx) {
.findAny()
.orElse(new CalciteQueryEngineConfiguration());
}

distrCfg = new DistributedCalciteConfiguration(ctx, log);
}

/**
Expand Down Expand Up @@ -548,6 +553,11 @@ private <T> T processQuery(
) {
SqlFieldsQuery fldsQry = qryCtx != null ? qryCtx.unwrap(SqlFieldsQuery.class) : null;

long timeout = fldsQry != null ? fldsQry.getTimeout() : 0;

if (timeout <= 0)
timeout = distrCfg.defaultQueryTimeout();

RootQuery<Object[]> qry = new RootQuery<>(
sql,
schemaHolder.schema(schema),
Expand All @@ -557,7 +567,8 @@ private <T> T processQuery(
exchangeSvc,
(q, ex) -> qryReg.unregister(q.id(), ex),
log,
queryPlannerTimeout
queryPlannerTimeout,
timeout
);

if (qrys != null)
Expand Down Expand Up @@ -665,4 +676,9 @@ public QueryRegistry queryRegistry() {
public CalciteQueryEngineConfiguration config() {
return cfg;
}

/** */
public DistributedCalciteConfiguration distributedConfiguration() {
return distrCfg;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite;

import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.DistributedSqlConfiguration;

/** Distributed Calcite-engine configuration. */
public class DistributedCalciteConfiguration extends DistributedSqlConfiguration {
/** */
public DistributedCalciteConfiguration(GridKernalContext ctx, IgniteLogger log) {
super(ctx, log);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public class RootQuery<RowT> extends Query<RowT> implements TrackableQuery {
/** */
private final long plannerTimeout;

/** */
private final long totalTimeout;

/** */
private volatile long locQryId;

Expand All @@ -107,7 +110,8 @@ public RootQuery(
ExchangeService exch,
BiConsumer<Query<RowT>, Throwable> unregister,
IgniteLogger log,
long plannerTimeout
long plannerTimeout,
long totalTimeout
) {
super(
UUID.randomUUID(),
Expand All @@ -127,7 +131,8 @@ public RootQuery(
remoteFragments = new HashMap<>();
waiting = new HashSet<>();

this.plannerTimeout = plannerTimeout;
this.plannerTimeout = totalTimeout > 0 ? Math.min(plannerTimeout, totalTimeout) : plannerTimeout;
this.totalTimeout = totalTimeout;

Context parent = Commons.convert(qryCtx);

Expand All @@ -152,7 +157,8 @@ public RootQuery(
* @param schema new schema.
*/
public RootQuery<RowT> childQuery(SchemaPlus schema) {
return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), exch, unregister, log, plannerTimeout);
return new RootQuery<>(sql, schema, params, QueryContext.of(cancel), ctx.isLocal(), exch, unregister, log,
plannerTimeout, totalTimeout);
}

/** */
Expand Down Expand Up @@ -420,6 +426,7 @@ public void onError(Throwable error) {
msgSb.append(", planningTime=").append(root == null ? U.currentTimeMillis() - startTs : planningTime).append("ms")
.append(", execTime=").append(root == null ? 0 : root.execTime()).append("ms")
.append(", idleTime=").append(root == null ? 0 : root.idleTime()).append("ms")
.append(", timeout=").append(totalTimeout).append("ms")
.append(", type=CALCITE")
.append(", state=").append(state)
.append(", schema=").append(ctx.schemaName())
Expand All @@ -435,6 +442,18 @@ public void onError(Throwable error) {
return root == null ? U.currentTimeMillis() - startTs : planningTime + root.execTime();
}

/**
* @return Time left to execute the query, {@code -1} if timeout is not set, {@code 0} if timeout reached.
*/
public long remainingTime() {
if (totalTimeout <= 0)
return -1;

long curTimeout = totalTimeout - (U.currentTimeMillis() - startTs);

return curTimeout <= 0 ? 0 : curTimeout;
}

/** */
@Override public String toString() {
return S.toString(RootQuery.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ private ExecutionContext<?> baseInboxContext(UUID nodeId, UUID qryId, long fragm
null,
NoOpMemoryTracker.INSTANCE,
NoOpIoTracker.INSTANCE,
0,
ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.TypeUtils;
import org.apache.ignite.internal.util.lang.RunnableX;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;

import static org.apache.ignite.internal.processors.query.calcite.util.Commons.checkRange;
Expand Down Expand Up @@ -101,6 +102,12 @@ public class ExecutionContext<Row> extends AbstractQueryContext implements DataC
/** */
private final IoTracker ioTracker;

/** */
private final long timeout;

/** */
private final long startTs;

/** */
private Object[] correlations = new Object[16];

Expand All @@ -122,6 +129,7 @@ public ExecutionContext(
RowHandler<Row> handler,
MemoryTracker qryMemoryTracker,
IoTracker ioTracker,
long timeout,
Map<String, Object> params
) {
super(qctx);
Expand All @@ -136,6 +144,9 @@ public ExecutionContext(
this.qryMemoryTracker = qryMemoryTracker;
this.ioTracker = ioTracker;
this.params = params;
this.timeout = timeout;

startTs = U.currentTimeMillis();

baseDataContext = new BaseDataContext(qctx.typeFactory());

Expand Down Expand Up @@ -324,6 +335,11 @@ public boolean isCancelled() {
return cancelFlag.get();
}

/** */
public boolean isTimedOut() {
return timeout > 0 && U.currentTimeMillis() - startTs >= timeout;
}

/** */
public Object unspecifiedValue() {
return UNSPECIFIED_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,13 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
assert nodes != null && nodes.size() == 1 && F.first(nodes).equals(localNodeId());
}

long timeout = qry.remainingTime();

if (timeout == 0) {
throw new IgniteSQLException("The query was cancelled due to timeout", IgniteQueryErrorCode.QUERY_CANCELED,
new QueryCancelledException());
}

FragmentDescription fragmentDesc = new FragmentDescription(
fragment.fragmentId(),
plan.mapping(fragment),
Expand All @@ -596,6 +603,7 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
handler,
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
createIoTracker(locNodeId, qry.localQueryId()),
timeout,
Commons.parametersMap(qry.parameters()));

Node<Row> node = new LogicalRelImplementor<>(ectx, partitionService(), mailboxRegistry(),
Expand Down Expand Up @@ -634,7 +642,8 @@ private ListFieldsQueryCursor<?> mapAndExecutePlan(
fragmentDesc,
fragmentsPerNode.get(nodeId).intValue(),
qry.parameters(),
parametersMarshalled
parametersMarshalled,
timeout
);

messageService().send(nodeId, req);
Expand Down Expand Up @@ -777,6 +786,7 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
handler,
qry.createMemoryTracker(memoryTracker, cfg.getQueryMemoryQuota()),
createIoTracker(nodeId, msg.originatingQryId()),
msg.timeout(),
Commons.parametersMap(msg.parameters())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,7 @@ protected void rowType(RelDataType rowType) {
* @param e Exception.
*/
public void onError(Throwable e) {
if (e instanceof QueryCancelledException)
U.warn(context().logger(), "Execution is cancelled.", e);
else
onErrorInternal(e);
onErrorInternal(e);
}

/** */
Expand Down Expand Up @@ -185,6 +182,8 @@ protected boolean isClosed() {
protected void checkState() throws Exception {
if (context().isCancelled())
throw new QueryCancelledException();
if (context().isTimedOut())
throw new QueryCancelledException("The query was timed out.");
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread was interrupted.");
if (!U.assertionsEnabled())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,6 @@ public void init() {
flush();
}

/** {@inheritDoc} */
@Override public void onError(Throwable e) {
onErrorInternal(e);
}

/** {@inheritDoc} */
@Override protected void onErrorInternal(Throwable e) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ public class QueryStartRequest implements MarshalableMessage, ExecutionContextAw
/** */
private byte[] paramsBytes;

/** */
private long timeout;

/** */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
public QueryStartRequest(
Expand All @@ -70,7 +73,8 @@ public QueryStartRequest(
FragmentDescription fragmentDesc,
int totalFragmentsCnt,
Object[] params,
@Nullable byte[] paramsBytes
@Nullable byte[] paramsBytes,
long timeout
) {
this.qryId = qryId;
this.originatingQryId = originatingQryId;
Expand All @@ -81,6 +85,7 @@ public QueryStartRequest(
this.totalFragmentsCnt = totalFragmentsCnt;
this.params = params;
this.paramsBytes = paramsBytes; // If we already have marshalled params, use it.
this.timeout = timeout;
}

/** */
Expand Down Expand Up @@ -154,6 +159,13 @@ public byte[] parametersMarshalled() {
return paramsBytes;
}

/**
* @return Query timeout.
*/
public long timeout() {
return timeout;
}

/** {@inheritDoc} */
@Override public void prepareMarshal(MarshallingContext ctx) throws IgniteCheckedException {
if (paramsBytes == null && params != null)
Expand Down Expand Up @@ -219,12 +231,18 @@ public byte[] parametersMarshalled() {
writer.incrementState();

case 6:
if (!writer.writeInt("totalFragmentsCnt", totalFragmentsCnt))
if (!writer.writeLong("timeout", timeout))
return false;

writer.incrementState();

case 7:
if (!writer.writeInt("totalFragmentsCnt", totalFragmentsCnt))
return false;

writer.incrementState();

case 8:
if (!writer.writeAffinityTopologyVersion("ver", ver))
return false;

Expand Down Expand Up @@ -292,14 +310,22 @@ public byte[] parametersMarshalled() {
reader.incrementState();

case 6:
totalFragmentsCnt = reader.readInt("totalFragmentsCnt");
timeout = reader.readLong("timeout");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 7:
totalFragmentsCnt = reader.readInt("totalFragmentsCnt");

if (!reader.isLastRead())
return false;

reader.incrementState();

case 8:
ver = reader.readAffinityTopologyVersion("ver");

if (!reader.isLastRead())
Expand All @@ -319,6 +345,6 @@ public byte[] parametersMarshalled() {

/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 8;
return 9;
}
}
Loading

0 comments on commit 6a1cca6

Please sign in to comment.