diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java index 38f67f59d1ff0..815b593cde21e 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/Query.java @@ -117,12 +117,20 @@ public UUID initiatorNodeId() { return initNodeId; } + /** */ + public void onError(Throwable failure) { + tryClose(failure); + } + /** */ protected void tryClose(@Nullable Throwable failure) { List> fragments = new ArrayList<>(this.fragments); AtomicInteger cntDown = new AtomicInteger(fragments.size()); + if (cntDown.get() == 0) + unregister.accept(this, failure); + for (RunningFragment frag : fragments) { frag.context().execute(() -> { frag.root().close(); @@ -148,8 +156,6 @@ public void cancel() { if (state == QueryState.INITED) { state = QueryState.CLOSING; - assert memoryTracker == null; - try { exch.closeQuery(initNodeId, id); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java index fd97c0cf1d87b..7bf9df35469b6 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/RootQuery.java @@ -378,7 +378,7 @@ else if (state == QueryState.CLOSING) } /** */ - public void onError(Throwable error) { + @Override public void onError(Throwable error) { root.onError(error); tryClose(error); diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java index bfea6190758d6..0b5e4c4893574 100644 --- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java +++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java @@ -805,19 +805,10 @@ private void onMessage(UUID nodeId, final QueryStartRequest msg) { } catch (IgniteCheckedException e) { U.error(log, "Error occurred during send error message: " + X.getFullStackTrace(e)); - - IgniteException wrpEx = new IgniteException("Error occurred during send error message", e); - - e.addSuppressed(ex); - - Query qry = (Query)qryReg.query(msg.queryId()); - - qry.cancel(); - - throw wrpEx; } - - throw ex; + finally { + qryReg.query(msg.queryId()).onError(ex); + } } } @@ -853,7 +844,7 @@ private void onMessage(UUID nodeId, ErrorMessage msg) { ); } - ((RootQuery)qry).onError(e); + qry.onError(e); } } diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java index c0635c7241df5..4aa7a43a1de50 100644 --- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java +++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/RunningQueriesIntegrationTest.java @@ -378,4 +378,49 @@ public void testQueryInitiator() throws IgniteCheckedException { assertTrue(GridTestUtils.waitForCondition(() -> F.isEmpty(engine.runningQueries()), PLANNER_TIMEOUT * 2)); } + + /** */ + @Test + public void testErrorOnRemoteFragment() throws Exception { + MetricRegistry mreg = client.context().metric().registry(SQL_USER_QUERIES_REG_NAME); + mreg.reset(); + + CalciteQueryProcessor clientEngine = queryProcessor(client); + CalciteQueryProcessor srvEngine = queryProcessor(srv); + + sql("CREATE TABLE t(id int, val varchar)"); + + IgniteCacheTable oldTbl = (IgniteCacheTable)srvEngine.schemaHolder().schema("PUBLIC").getTable("T"); + + CountDownLatch initLatch = new CountDownLatch(1); + + IgniteCacheTable newTbl = new CacheTableImpl(srv.context(), oldTbl.descriptor()) { + @Override public Iterable scan( + ExecutionContext execCtx, + ColocationGroup grp, + Predicate filter, + Function rowTransformer, + @Nullable ImmutableBitSet usedColumns + ) { + initLatch.countDown(); + + throw new IllegalStateException("Init error"); + } + }; + + srvEngine.schemaHolder().schema("PUBLIC").add("T", newTbl); + + IgniteInternalFuture>> fut = GridTestUtils.runAsync(() -> sql("SELECT * FROM t")); + + initLatch.await(TIMEOUT_IN_MS, TimeUnit.MILLISECONDS); + + assertTrue(GridTestUtils.waitForCondition( + () -> clientEngine.runningQueries().isEmpty() && srvEngine.runningQueries().isEmpty(), + TIMEOUT_IN_MS)); + + GridTestUtils.assertThrowsAnyCause(log, () -> fut.get(100), IllegalStateException.class, "Init error"); + + assertEquals(0, ((LongMetric)mreg.findMetric("canceled")).value()); + assertEquals(1, ((LongMetric)mreg.findMetric("failed")).value()); + } }