From 2eda8bec8fd1ba0fac187ed2da88be517256ebff Mon Sep 17 00:00:00 2001 From: Jackie Tien Date: Thu, 29 Aug 2024 16:35:36 +0800 Subject: [PATCH] Fix query stuck while DN restarting & keep quiet while cleaning sort tmp file --- .../queryengine/execution/driver/Driver.java | 2 +- .../fragment/FragmentInstanceExecution.java | 2 +- .../fragment/FragmentInstanceState.java | 2 +- .../FixedRateFragInsStateTracker.java | 60 ++++++++++++------- .../apache/iotdb/commons/utils/FileUtils.java | 10 +++- 5 files changed, 50 insertions(+), 26 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java index 1211df6b74b0..812c84298fc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java @@ -445,7 +445,7 @@ private void cleanTmpFile() { if (!tmpPipeLineDir.exists()) { return; } - FileUtils.deleteFileOrDirectory(tmpPipeLineDir); + FileUtils.deleteFileOrDirectory(tmpPipeLineDir, true); } private static Throwable addSuppressedException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index c42834f51904..d1c8cc8ff6a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -358,7 +358,7 @@ private void deleteTmpFile() { + File.separator; File tmpFile = new File(tmpFilePath); if (tmpFile.exists()) { - FileUtils.deleteFileOrDirectory(tmpFile); + FileUtils.deleteFileOrDirectory(tmpFile, true); 
} } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java index 77531e944945..092b1be3816f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceState.java @@ -47,7 +47,7 @@ public enum FragmentInstanceState { /** Instance execution failed. */ FAILED(true, true), /** Instance is not found. */ - NO_SUCH_INSTANCE(false, false); + NO_SUCH_INSTANCE(false, true); public static final Set TERMINAL_INSTANCE_STATES = Stream.of(FragmentInstanceState.values()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java index 31b5742a9d07..a1932753d46b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FixedRateFragInsStateTracker.java @@ -88,10 +88,7 @@ public synchronized List filterUnFinishedFIs( return res; } for (FragmentInstanceId fragmentInstanceId : instanceIds) { - InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId); - if (stateMetrics == null - || stateMetrics.lastState == null - || !stateMetrics.lastState.isDone()) { + if (unfinished(fragmentInstanceId)) { // FI whose state has not been updated is considered to be unfinished.(In Query with limit // clause, it's possible that the query is finished before the state of FI being recorded.) 
res.add(fragmentInstanceId); @@ -100,6 +97,15 @@ public synchronized List filterUnFinishedFIs( return res; } + private boolean unfinished(FragmentInstanceId fragmentInstanceId) { + InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId); + // FI whose state has not been updated is considered to be unfinished.(In Query with limit + // clause, it's possible that the query is finished before the state of FI being recorded.) + return stateMetrics == null + || stateMetrics.lastState == null + || !stateMetrics.lastState.isDone(); + } + @Override public synchronized void abort() { aborted = true; @@ -117,32 +123,42 @@ public synchronized void abort() { private void fetchStateAndUpdate() { for (FragmentInstance instance : instances) { - try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { - FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance); - synchronized (this) { - InstanceStateMetrics metrics = - instanceStateMap.computeIfAbsent( - instance.getId(), k -> new InstanceStateMetrics(instance.isRoot())); - if (needPrintState( - metrics.lastState, instanceInfo.getState(), metrics.durationToLastPrintInMS)) { - if (logger.isDebugEnabled()) { - logger.debug("[PrintFIState] state is {}", instanceInfo.getState()); + if (unfinished(instance.getId())) { + try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) { + FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance); + synchronized (this) { + InstanceStateMetrics metrics = + instanceStateMap.computeIfAbsent( + instance.getId(), k -> new InstanceStateMetrics(instance.isRoot())); + if (needPrintState( + metrics.lastState, instanceInfo.getState(), metrics.durationToLastPrintInMS)) { + if (logger.isDebugEnabled()) { + logger.debug("[PrintFIState] state is {}", instanceInfo.getState()); + } + metrics.reset(instanceInfo.getState()); + } else { + metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS); } - metrics.reset(instanceInfo.getState()); - } 
else { - metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS); - } - updateQueryState(instance.getId(), instanceInfo); + updateQueryState(instance.getId(), instanceInfo); + } + } catch (ClientManagerException | TException e) { + // TODO: do nothing ? + logger.warn("error happened while fetching query state", e); } - } catch (ClientManagerException | TException e) { - // TODO: do nothing ? - logger.warn("error happened while fetching query state", e); } } } private void updateQueryState(FragmentInstanceId instanceId, FragmentInstanceInfo instanceInfo) { + // no such instance may be caused by DN restarting + if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) { + stateMachine.transitionToFailed( + new RuntimeException( + String.format( + "FragmentInstance[%s] is failed. %s, may be caused by DN restarting.", + instanceId, instanceInfo.getMessage()))); + } if (instanceInfo.getState().isFailed()) { if (instanceInfo.getFailureInfoList() == null || instanceInfo.getFailureInfoList().isEmpty()) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java index 71498bbe2200..1fca92394af0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java @@ -62,6 +62,10 @@ public static boolean deleteFileIfExist(File file) { } public static void deleteFileOrDirectory(File file) { + deleteFileOrDirectory(file, false); + } + + public static void deleteFileOrDirectory(File file, boolean quiteForNoSuchFile) { if (file.isDirectory()) { for (File subfile : file.listFiles()) { deleteFileOrDirectory(subfile); @@ -69,7 +73,11 @@ public static void deleteFileOrDirectory(File file) { } try { Files.delete(file.toPath()); - } catch (NoSuchFileException | DirectoryNotEmptyException e) { + } catch (NoSuchFileException e) { + if 
(!quiteForNoSuchFile) { + LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e); + } + } catch (DirectoryNotEmptyException e) { LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e); } catch (Exception e) { LOGGER.warn("{}: {}", e.getMessage(), file.getName(), e);