diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java index a38ef923bbb38..ac0b411109089 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/exchange/MPPDataExchangeManager.java @@ -585,11 +585,13 @@ public MPPDataExchangeManager( ExecutorService executorService, IClientManager mppDataExchangeServiceClientManager) { - this.localMemoryManager = Validate.notNull(localMemoryManager); - this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory); - this.executorService = Validate.notNull(executorService); + this.localMemoryManager = Validate.notNull(localMemoryManager, "localMemoryManager is null."); + this.tsBlockSerdeFactory = + Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null."); + this.executorService = Validate.notNull(executorService, "executorService is null."); this.mppDataExchangeServiceClientManager = - Validate.notNull(mppDataExchangeServiceClientManager); + Validate.notNull( + mppDataExchangeServiceClientManager, "mppDataExchangeServiceClientManager is null."); sourceHandles = new ConcurrentHashMap<>(); shuffleSinkHandles = new ConcurrentHashMap<>(); } @@ -601,10 +603,11 @@ public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() { return mppDataExchangeService; } - public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) { + public void deRegisterFragmentInstanceFromMemoryPool( + String queryId, String fragmentInstanceId, boolean forceDeregister) { localMemoryManager .getQueryPool() - .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId); + .deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId, forceDeregister); } public LocalMemoryManager getLocalMemoryManager() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index d4ba3abd9697c..b31d4e13eeb94 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -309,7 +309,7 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) { // release memory exchangeManager.deRegisterFragmentInstanceFromMemoryPool( - instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId()); + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true); if (newState.isFailed()) { scheduler.abortFragmentInstance(instanceId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java index bea961c7525a9..900865dc88701 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java @@ -177,6 +177,7 @@ public FragmentInstanceInfo execDataQueryFragmentInstance( instance.isExplainAnalyze(), exchangeManager); } catch (Throwable t) { + clearFIRelatedResources(instanceId); logger.warn("error when create FragmentInstanceExecution.", t); stateMachine.failed(t); return null; @@ -205,6 +206,14 @@ public FragmentInstanceInfo execDataQueryFragmentInstance( } } + private void clearFIRelatedResources(FragmentInstanceId instanceId) { + // close and remove all the handles of the fragment instance + exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift()); + // clear MemoryPool + exchangeManager.deRegisterFragmentInstanceFromMemoryPool( + instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), false); + } + private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int dataNodeFINum) { return dataNodeQueryContextMap.computeIfAbsent( queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum)); @@ -249,6 +258,7 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance( false, exchangeManager); } catch (Throwable t) { + clearFIRelatedResources(instanceId); logger.warn("Execute error caused by ", t); stateMachine.failed(t); return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 188833e329ebc..a9ab6ed5d8120 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -181,7 +181,7 @@ public void registerPlanNodeIdToQueryMemoryMap( * @throws MemoryLeakException throw {@link MemoryLeakException} */ public void deRegisterFragmentInstanceFromQueryMemoryMap( - String queryId, String fragmentInstanceId) { + String queryId, String fragmentInstanceId, boolean forceDeregister) { Map> queryRelatedMemory = queryMemoryReservations.get(queryId); if (queryRelatedMemory != null) { Map fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId); @@ -192,6 +192,13 @@ public void deRegisterFragmentInstanceFromQueryMemoryMap( hasPotentialMemoryLeak = fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0L); } + if (!forceDeregister && hasPotentialMemoryLeak) { + // If hasPotentialMemoryLeak is true, it means that LocalSourceChannel/LocalSourceHandles + // have not been closed. + // We should wait for them to be closed. Make sure this method is called again with + // forceDeregister == true, after all LocalSourceChannel/LocalSourceHandles are closed. + return; + } synchronized (queryMemoryReservations) { queryRelatedMemory.remove(fragmentInstanceId); if (queryRelatedMemory.isEmpty()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 40a188e3d6a7b..8cb8ea48ea827 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -365,7 +365,8 @@ private void cleanUpResultHandle() { .deRegisterFragmentInstanceFromMemoryPool( fragmentInstanceId.queryId, FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId( - fragmentInstanceId)); + fragmentInstanceId), + true); } }