Skip to content

Commit

Permalink
Prevent query blocking if final task info is lost
Browse files Browse the repository at this point in the history
We observed that the final task info notification can occasionally be lost.
In such a case query progression will be blocked.
This commit adds stop-gap code to mitigate the issue and unblock or fail the query
until we find and fix the bug.
  • Loading branch information
losipiuk committed Feb 8, 2025
1 parent 103c08d commit 7cd7792
Showing 1 changed file with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -1659,6 +1660,33 @@ public Void onSinkInstanceHandleAcquired(SinkInstanceHandleAcquiredEvent sinkIns
nodeLease.release();
}
});

// Stop-gap: we observed that the final task info notification can occasionally be lost,
// in which case query progression is blocked. Until the root cause is found and fixed,
// wait a grace period after the task reaches a terminal state and, if the final task info
// still has not arrived, either fail the query or enqueue the completion event ourselves.
AtomicBoolean finalTaskInfoReceived = new AtomicBoolean();
// Record the arrival of the final task info. Without this the flag never flips and every
// delayed check below fires, even for tasks whose final info was delivered normally.
task.addFinalTaskInfoListener(taskInfo -> finalTaskInfoReceived.set(true));
task.addStateChangeListener(taskStatus -> {
    // Only terminal states start the grace-period timer.
    if (!taskStatus.getState().isDone()) {
        return;
    }
    switch (taskStatus.getState()) {
        // A finished task without final task info cannot be accounted for; fail the query.
        case FINISHED -> scheduledExecutorService.schedule(() -> {
            if (!finalTaskInfoReceived.get()) {
                log.error("Did not receive final task info for task %s after it FINISHED; internal inconsistency; failing query", task.getTaskId());
                queryStateMachine.transitionToFailed(new TrinoException(GENERIC_INTERNAL_ERROR, "Did not receive final task info for task after it finished; failing query"));
            }
        }, 1, MINUTES);
        // For unsuccessful terminal states, report completion from the last known task status.
        // NOTE(review): if the final task info arrives after the grace period, a final-task-info
        // listener may enqueue a second RemoteTaskCompletedEvent -- confirm the consumer tolerates it.
        case CANCELED, ABORTED, FAILED -> scheduledExecutorService.schedule(() -> {
            if (!finalTaskInfoReceived.get()) {
                // Argument order matches the placeholders: task id first, then the terminal state.
                log.error("Did not receive final task info for task %s after it %s; internal inconsistency; marking task failed in scheduler to unblock query progression", task.getTaskId(), taskStatus.getState());
                eventQueue.add(new RemoteTaskCompletedEvent(taskStatus));
            }
        }, 1, MINUTES);
        // isDone() returned true for a state not handled above.
        default -> throw new IllegalStateException("Unexpected task state: " + taskStatus.getState());
    }
});

// When the final task info is delivered, fold it into the execution statistics...
task.addFinalTaskInfoListener(taskExecutionStats::update);
// ...and enqueue a RemoteTaskCompletedEvent carrying the final task status.
task.addFinalTaskInfoListener(taskInfo -> eventQueue.add(new RemoteTaskCompletedEvent(taskInfo.taskStatus())));
// Associate the task id with the node lease used for this task.
nodeLease.attachTaskId(task.getTaskId());
Expand Down

0 comments on commit 7cd7792

Please sign in to comment.