Skip to content

Commit

Permalink
IGNITE-20010 SQL Calcite: Fix query leak on remote fragment initializ…
Browse files Browse the repository at this point in the history
…ation phase failure - Fixes #10849.

Signed-off-by: Aleksey Plekhanov <[email protected]>
  • Loading branch information
alex-plekhanov committed Jul 21, 2023
1 parent a3b0e9f commit 8fb4c1a
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,20 @@ public UUID initiatorNodeId() {
return initNodeId;
}

/** */
public void onError(Throwable failure) {
tryClose(failure);
}

/** */
protected void tryClose(@Nullable Throwable failure) {
List<RunningFragment<RowT>> fragments = new ArrayList<>(this.fragments);

AtomicInteger cntDown = new AtomicInteger(fragments.size());

if (cntDown.get() == 0)
unregister.accept(this, failure);

for (RunningFragment<RowT> frag : fragments) {
frag.context().execute(() -> {
frag.root().close();
Expand All @@ -148,8 +156,6 @@ public void cancel() {
if (state == QueryState.INITED) {
state = QueryState.CLOSING;

assert memoryTracker == null;

try {
exch.closeQuery(initNodeId, id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ else if (state == QueryState.CLOSING)
}

/** */
public void onError(Throwable error) {
@Override public void onError(Throwable error) {
root.onError(error);

tryClose(error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,19 +805,10 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) {
}
catch (IgniteCheckedException e) {
U.error(log, "Error occurred during send error message: " + X.getFullStackTrace(e));

IgniteException wrpEx = new IgniteException("Error occurred during send error message", e);

e.addSuppressed(ex);

Query<Row> qry = (Query<Row>)qryReg.query(msg.queryId());

qry.cancel();

throw wrpEx;
}

throw ex;
finally {
qryReg.query(msg.queryId()).onError(ex);
}
}
}

Expand Down Expand Up @@ -853,7 +844,7 @@ private void onMessage(UUID nodeId, ErrorMessage msg) {
);
}

((RootQuery<Row>)qry).onError(e);
qry.onError(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,4 +378,49 @@ public void testQueryInitiator() throws IgniteCheckedException {

assertTrue(GridTestUtils.waitForCondition(() -> F.isEmpty(engine.runningQueries()), PLANNER_TIMEOUT * 2));
}

/** */
@Test
public void testErrorOnRemoteFragment() throws Exception {
MetricRegistry mreg = client.context().metric().registry(SQL_USER_QUERIES_REG_NAME);
mreg.reset();

CalciteQueryProcessor clientEngine = queryProcessor(client);
CalciteQueryProcessor srvEngine = queryProcessor(srv);

sql("CREATE TABLE t(id int, val varchar)");

IgniteCacheTable oldTbl = (IgniteCacheTable)srvEngine.schemaHolder().schema("PUBLIC").getTable("T");

CountDownLatch initLatch = new CountDownLatch(1);

IgniteCacheTable newTbl = new CacheTableImpl(srv.context(), oldTbl.descriptor()) {
@Override public <Row> Iterable<Row> scan(
ExecutionContext<Row> execCtx,
ColocationGroup grp,
Predicate<Row> filter,
Function<Row, Row> rowTransformer,
@Nullable ImmutableBitSet usedColumns
) {
initLatch.countDown();

throw new IllegalStateException("Init error");
}
};

srvEngine.schemaHolder().schema("PUBLIC").add("T", newTbl);

IgniteInternalFuture<List<List<?>>> fut = GridTestUtils.runAsync(() -> sql("SELECT * FROM t"));

initLatch.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS);

assertTrue(GridTestUtils.waitForCondition(
() -> clientEngine.runningQueries().isEmpty() && srvEngine.runningQueries().isEmpty(),
TIMEOUT_IN_MS));

GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IllegalStateException.class, "Init error");

assertEquals(0, ((LongMetric)mreg.findMetric("canceled")).value());
assertEquals(1, ((LongMetric)mreg.findMetric("failed")).value());
}
}

0 comments on commit 8fb4c1a

Please sign in to comment.