From f5b3de6f0a05b49aafb94b16f1298b8516b318fb Mon Sep 17 00:00:00 2001 From: Ashley Scopes <73482956+ascopes@users.noreply.github.com> Date: Mon, 21 Oct 2024 08:16:08 +0100 Subject: [PATCH] Improve executor shutdown failsafe and tests --- .../utils/ConcurrentExecutor.java | 29 ++-- .../utils/ConcurrentExecutorTest.java | 131 ++++++++++++++++++ 2 files changed, 148 insertions(+), 12 deletions(-) diff --git a/protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java b/protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java index c695d5db..6032a0d3 100644 --- a/protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java +++ b/protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java @@ -42,7 +42,9 @@ public final class ConcurrentExecutor { private static final Logger log = LoggerFactory.getLogger(ConcurrentExecutor.class); - private final ExecutorService executorService; + + // Visible for testing only. + final ExecutorService executorService; public ConcurrentExecutor() { ExecutorService executorService; @@ -77,15 +79,16 @@ public ConcurrentExecutor() { @SuppressWarnings({"ResultOfMethodCallIgnored", "unused"}) public void destroy() throws InterruptedException { log.debug("Shutting down executor..."); - executorService.shutdown(); - log.debug("Awaiting executor termination..."); - - // If this fails, then we can't do much about it. Force shutdown and hope threads don't - // deadlock. Not going to bother adding complicated handling here as if we get stuck, we - // likely have far bigger problems to deal with. - executorService.awaitTermination(10, TimeUnit.SECONDS); - var remaining = executorService.shutdownNow(); - log.debug("Shutdown ended, stubborn tasks that will be orphaned: {}", remaining); + var remainingTasks = executorService.shutdownNow(); + if (remainingTasks.size() > 0) { + remainingTasks.forEach(future -> ((FutureTask) future).cancel(true)); + log.debug("Waiting for in-progress tasks to finish: {}", remainingTasks); + executorService.awaitTermination(5, TimeUnit.SECONDS); + if (!executorService.isTerminated()) { + // Nothing else we can do. + log.warn("Unable to shut down executor service, one or more tasks refused to cancel: {}", remainingTasks); + } + } } public FutureTask submit(Callable task) { @@ -106,6 +109,8 @@ public FutureTask submit(Callable task) { return Collectors.collectingAndThen(Collectors.toUnmodifiableList(), this::await); } + // Awaits each task, in the order it was scheduled. Any interrupt is caught and terminates + // the entire batch. private List await(List> scheduledTasks) { try { var results = new ArrayList(); @@ -117,8 +122,8 @@ private List await(List> scheduledTasks) { } catch (ExecutionException ex) { exceptions.add(ex.getCause()); } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - return results; + exceptions.add(ex); + break; } } diff --git a/protobuf-maven-plugin/src/test/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutorTest.java b/protobuf-maven-plugin/src/test/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutorTest.java index cf1ed71f..1bff30e9 100644 --- a/protobuf-maven-plugin/src/test/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutorTest.java +++ b/protobuf-maven-plugin/src/test/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutorTest.java @@ -31,9 +31,11 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -58,7 +60,111 @@ void setUp() { @AfterEach void tearDown() throws InterruptedException { + executor.executorService.shutdownNow(); + } + + @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS) + @DisplayName(".destroy() succeeds if the executor service is idle") + @Test + void destroySucceedsIfExecutorServiceIsIdle() throws Exception { + // Given + assertThat(executor.executorService.isTerminated()) + .as("executorService.isTerminated()") + .isFalse(); + assertThat(executor.executorService.isShutdown()) + .as("executorService.isShutdown()") + .isFalse(); + + // When + executor.destroy(); + + // Then + assertThat(executor.executorService.isTerminated()) + .as("executorService.isTerminated()") + .isTrue(); + assertThat(executor.executorService.isShutdown()) + .as("executorService.isShutdown()") + .isTrue(); + } + + @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS) + @DisplayName(".destroy() succeeds if the executor service is already terminated") + @Test + void destroySucceedsIfExecutorServiceIsAlreadyTerminated() throws Exception { + // Given + executor.executorService.shutdownNow(); + + // When + executor.destroy(); + + // Then + assertThat(executor.executorService.isTerminated()) + .as("executorService.isTerminated()") + .isTrue(); + assertThat(executor.executorService.isShutdown()) + .as("executorService.isShutdown()") + .isTrue(); + } + + @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS) + @DisplayName(".destroy() interrupts any interruptable running tasks") + @Test + void destroyInterruptsAnyInterruptableRunningTasks() throws Exception { + // Given + var task1 = new FutureTask<>(() -> sleepWait(10_000)); + var task2 = new FutureTask<>(() -> sleepWait(10_000)); + + executor.executorService.submit(task1); + executor.executorService.submit(task2); + + // Give tasks the chance to start. + Thread.sleep(1_000); + + // When executor.destroy(); + + // Then + assertThatExceptionOfType(ExecutionException.class) + .as("exception raised by task1 (%s)", task1) + .isThrownBy(task1::get) + .withCauseInstanceOf(InterruptedException.class); + assertThatExceptionOfType(ExecutionException.class) + .as("exception raised by task2 (%s)", task2) + .isThrownBy(task2::get) + .withCauseInstanceOf(InterruptedException.class); + } + + @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS) + @DisplayName(".destroy() abandons any uninterruptable running tasks") + @Test + void destroyAbandonsAnyUninterruptableRunningTasks() throws Exception { + // Given + var task1 = new FutureTask<>(() -> spinWait(10_000)); + var task2 = new FutureTask<>(() -> spinWait(10_000)); + + executor.executorService.submit(task1); + executor.executorService.submit(task2); + + // Give tasks the chance to start. + Thread.sleep(1_000); + + // When + executor.destroy(); + + assertThat(task1) + .as("task1 %s", task1) + .isNotDone(); + assertThat(task2) + .as("task1 %s", task2) + .isNotDone(); + + // Then + assertThatExceptionOfType(TimeoutException.class) + .as("exception raised waiting for task1 (%s)", task1) + .isThrownBy(() -> task1.get(100, TimeUnit.MILLISECONDS)); + assertThatExceptionOfType(TimeoutException.class) + .as("exception raised waiting for task2 (%s)", task2) + .isThrownBy(() -> task2.get(100, TimeUnit.MILLISECONDS)); } @Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS) @@ -184,4 +290,29 @@ void awaitingAwaitsAllTasksAndRaisesTheirExceptions() { ex -> assertThat(expectedExceptions).hasSize(ex.getSuppressed().length + 1) ); } + + // Sleep-based waits can consume thread interrupts and can be cancelled, + // representing some IO-bound work that cancels gracefully. + private static @Nullable Void sleepWait(int timeoutMs) throws InterruptedException { + var deadline = System.nanoTime() + timeoutMs * 1_000_000L; + do { + try { + var remaining = deadline - System.nanoTime(); + Thread.sleep(remaining / 1_000_000L); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw ex; + } + } while (deadline - System.nanoTime() > 0); + return null; + } + + // Spin-based waits should not perform IO, so should be uncancellable and + // uninterruptable, representing CPU bound work or a buggy/stubborn task. + private static @Nullable Void spinWait(int timeoutMs) { + var deadline = System.nanoTime() + timeoutMs * 1_000_000L; + // Do not perform anything that can be interrupted. + while (deadline - System.nanoTime() > 0); + return null; + } }