Skip to content

Commit

Permalink
Add clean logic for FragmentInstance in case that callback is not add…
Browse files Browse the repository at this point in the history
…ed. (#12768)
  • Loading branch information
lancelly authored and OneSizeFitsQuorum committed Jun 27, 2024
1 parent b96dd0e commit cf22dd7
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -585,11 +585,13 @@ public MPPDataExchangeManager(
ExecutorService executorService,
IClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>
mppDataExchangeServiceClientManager) {
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.tsBlockSerdeFactory = Validate.notNull(tsBlockSerdeFactory);
this.executorService = Validate.notNull(executorService);
this.localMemoryManager = Validate.notNull(localMemoryManager, "localMemoryManager is null.");
this.tsBlockSerdeFactory =
Validate.notNull(tsBlockSerdeFactory, "tsBlockSerdeFactory is null.");
this.executorService = Validate.notNull(executorService, "executorService is null.");
this.mppDataExchangeServiceClientManager =
Validate.notNull(mppDataExchangeServiceClientManager);
Validate.notNull(
mppDataExchangeServiceClientManager, "mppDataExchangeServiceClientManager is null.");
sourceHandles = new ConcurrentHashMap<>();
shuffleSinkHandles = new ConcurrentHashMap<>();
}
Expand All @@ -601,10 +603,11 @@ public MPPDataExchangeServiceImpl getOrCreateMPPDataExchangeServiceImpl() {
return mppDataExchangeService;
}

public void deRegisterFragmentInstanceFromMemoryPool(String queryId, String fragmentInstanceId) {
public void deRegisterFragmentInstanceFromMemoryPool(
String queryId, String fragmentInstanceId, boolean forceDeregister) {
localMemoryManager
.getQueryPool()
.deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId);
.deRegisterFragmentInstanceFromQueryMemoryMap(queryId, fragmentInstanceId, forceDeregister);
}

public LocalMemoryManager getLocalMemoryManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private void initialize(IDriverScheduler scheduler, boolean isExplainAnalyze) {

// release memory
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId());
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), true);

if (newState.isFailed()) {
scheduler.abortFragmentInstance(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
instance.isExplainAnalyze(),
exchangeManager);
} catch (Throwable t) {
clearFIRelatedResources(instanceId);
logger.warn("error when create FragmentInstanceExecution.", t);
stateMachine.failed(t);
return null;
Expand Down Expand Up @@ -205,6 +206,14 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
}
}

private void clearFIRelatedResources(FragmentInstanceId instanceId) {
// close and remove all the handles of the fragment instance
exchangeManager.forceDeregisterFragmentInstance(instanceId.toThrift());
// clear MemoryPool
exchangeManager.deRegisterFragmentInstanceFromMemoryPool(
instanceId.getQueryId().getId(), instanceId.getFragmentInstanceId(), false);
}

private DataNodeQueryContext getOrCreateDataNodeQueryContext(QueryId queryId, int dataNodeFINum) {
return dataNodeQueryContextMap.computeIfAbsent(
queryId, queryId1 -> new DataNodeQueryContext(dataNodeFINum));
Expand Down Expand Up @@ -249,6 +258,7 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance(
false,
exchangeManager);
} catch (Throwable t) {
clearFIRelatedResources(instanceId);
logger.warn("Execute error caused by ", t);
stateMachine.failed(t);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void registerPlanNodeIdToQueryMemoryMap(
* @throws MemoryLeakException throw {@link MemoryLeakException}
*/
public void deRegisterFragmentInstanceFromQueryMemoryMap(
String queryId, String fragmentInstanceId) {
String queryId, String fragmentInstanceId, boolean forceDeregister) {
Map<String, Map<String, Long>> queryRelatedMemory = queryMemoryReservations.get(queryId);
if (queryRelatedMemory != null) {
Map<String, Long> fragmentRelatedMemory = queryRelatedMemory.get(fragmentInstanceId);
Expand All @@ -192,6 +192,13 @@ public void deRegisterFragmentInstanceFromQueryMemoryMap(
hasPotentialMemoryLeak =
fragmentRelatedMemory.values().stream().anyMatch(value -> value != 0L);
}
if (!forceDeregister && hasPotentialMemoryLeak) {
// If hasPotentialMemoryLeak is true, it means that LocalSourceChannel/LocalSourceHandles
// have not been closed.
// We should wait for them to be closed. Make sure this method is called again with
// forceDeregister == true, after all LocalSourceChannel/LocalSourceHandles are closed.
return;
}
synchronized (queryMemoryReservations) {
queryRelatedMemory.remove(fragmentInstanceId);
if (queryRelatedMemory.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ private void cleanUpResultHandle() {
.deRegisterFragmentInstanceFromMemoryPool(
fragmentInstanceId.queryId,
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
fragmentInstanceId));
fragmentInstanceId),
true);
}
}

Expand Down

0 comments on commit cf22dd7

Please sign in to comment.