Skip to content

Commit

Permalink
Fix query stuck while DN restarting & keep quite while cleaning sort …
Browse files Browse the repository at this point in the history
…tmp file
  • Loading branch information
JackieTien97 authored Aug 29, 2024
1 parent 61b7d58 commit 2eda8be
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ private void cleanTmpFile() {
if (!tmpPipeLineDir.exists()) {
return;
}
FileUtils.deleteFileOrDirectory(tmpPipeLineDir);
FileUtils.deleteFileOrDirectory(tmpPipeLineDir, true);
}

private static Throwable addSuppressedException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ private void deleteTmpFile() {
+ File.separator;
File tmpFile = new File(tmpFilePath);
if (tmpFile.exists()) {
FileUtils.deleteFileOrDirectory(tmpFile);
FileUtils.deleteFileOrDirectory(tmpFile, true);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public enum FragmentInstanceState {
/** Instance execution failed. */
FAILED(true, true),
/** Instance is not found. */
NO_SUCH_INSTANCE(false, false);
NO_SUCH_INSTANCE(false, true);

public static final Set<FragmentInstanceState> TERMINAL_INSTANCE_STATES =
Stream.of(FragmentInstanceState.values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,7 @@ public synchronized List<FragmentInstanceId> filterUnFinishedFIs(
return res;
}
for (FragmentInstanceId fragmentInstanceId : instanceIds) {
InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId);
if (stateMetrics == null
|| stateMetrics.lastState == null
|| !stateMetrics.lastState.isDone()) {
if (unfinished(fragmentInstanceId)) {
// FI whose state has not been updated is considered to be unfinished.(In Query with limit
// clause, it's possible that the query is finished before the state of FI being recorded.)
res.add(fragmentInstanceId);
Expand All @@ -100,6 +97,15 @@ public synchronized List<FragmentInstanceId> filterUnFinishedFIs(
return res;
}

private boolean unfinished(FragmentInstanceId fragmentInstanceId) {
InstanceStateMetrics stateMetrics = instanceStateMap.get(fragmentInstanceId);
// FI whose state has not been updated is considered to be unfinished.(In Query with limit
// clause, it's possible that the query is finished before the state of FI being recorded.)
return stateMetrics == null
|| stateMetrics.lastState == null
|| !stateMetrics.lastState.isDone();
}

@Override
public synchronized void abort() {
aborted = true;
Expand All @@ -117,32 +123,42 @@ public synchronized void abort() {

private void fetchStateAndUpdate() {
for (FragmentInstance instance : instances) {
try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
synchronized (this) {
InstanceStateMetrics metrics =
instanceStateMap.computeIfAbsent(
instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
if (needPrintState(
metrics.lastState, instanceInfo.getState(), metrics.durationToLastPrintInMS)) {
if (logger.isDebugEnabled()) {
logger.debug("[PrintFIState] state is {}", instanceInfo.getState());
if (unfinished(instance.getId())) {
try (SetThreadName threadName = new SetThreadName(instance.getId().getFullId())) {
FragmentInstanceInfo instanceInfo = fetchInstanceInfo(instance);
synchronized (this) {
InstanceStateMetrics metrics =
instanceStateMap.computeIfAbsent(
instance.getId(), k -> new InstanceStateMetrics(instance.isRoot()));
if (needPrintState(
metrics.lastState, instanceInfo.getState(), metrics.durationToLastPrintInMS)) {
if (logger.isDebugEnabled()) {
logger.debug("[PrintFIState] state is {}", instanceInfo.getState());
}
metrics.reset(instanceInfo.getState());
} else {
metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
}
metrics.reset(instanceInfo.getState());
} else {
metrics.addDuration(STATE_FETCH_INTERVAL_IN_MS);
}

updateQueryState(instance.getId(), instanceInfo);
updateQueryState(instance.getId(), instanceInfo);
}
} catch (ClientManagerException | TException e) {
// TODO: do nothing ?
logger.warn("error happened while fetching query state", e);
}
} catch (ClientManagerException | TException e) {
// TODO: do nothing ?
logger.warn("error happened while fetching query state", e);
}
}
}

private void updateQueryState(FragmentInstanceId instanceId, FragmentInstanceInfo instanceInfo) {
// no such instance may be caused by DN restarting
if (instanceInfo.getState() == FragmentInstanceState.NO_SUCH_INSTANCE) {
stateMachine.transitionToFailed(
new RuntimeException(
String.format(
"FragmentInstance[%s] is failed. %s, may be caused by DN restarting.",
instanceId, instanceInfo.getMessage())));
}
if (instanceInfo.getState().isFailed()) {
if (instanceInfo.getFailureInfoList() == null
|| instanceInfo.getFailureInfoList().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,22 @@ public static boolean deleteFileIfExist(File file) {
}

public static void deleteFileOrDirectory(File file) {
deleteFileOrDirectory(file, false);
}

public static void deleteFileOrDirectory(File file, boolean quiteForNoSuchFile) {
if (file.isDirectory()) {
for (File subfile : file.listFiles()) {
deleteFileOrDirectory(subfile);
}
}
try {
Files.delete(file.toPath());
} catch (NoSuchFileException | DirectoryNotEmptyException e) {
} catch (NoSuchFileException e) {
if (!quiteForNoSuchFile) {
LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
}
} catch (DirectoryNotEmptyException e) {
LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
} catch (Exception e) {
LOGGER.warn("{}: {}", e.getMessage(), file.getName(), e);
Expand Down

0 comments on commit 2eda8be

Please sign in to comment.