Skip to content

Commit

Permalink
Improve executor shutdown failsafe and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ascopes committed Oct 22, 2024
1 parent b0b8d4e commit 524e6bf
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
public final class ConcurrentExecutor {

private static final Logger log = LoggerFactory.getLogger(ConcurrentExecutor.class);
private final ExecutorService executorService;

// Visible for testing only.
final ExecutorService executorService;

public ConcurrentExecutor() {
ExecutorService executorService;
Expand Down Expand Up @@ -77,15 +79,19 @@ public ConcurrentExecutor() {
@SuppressWarnings({"ResultOfMethodCallIgnored", "unused"})
public void destroy() throws InterruptedException {
log.debug("Shutting down executor...");
executorService.shutdown();
log.debug("Awaiting executor termination...");

// If this fails, then we can't do much about it. Force shutdown and hope threads don't
// deadlock. Not going to bother adding complicated handling here as if we get stuck, we
// likely have far bigger problems to deal with.
executorService.awaitTermination(10, TimeUnit.SECONDS);
var remaining = executorService.shutdownNow();
log.debug("Shutdown ended, stubborn tasks that will be orphaned: {}", remaining);
var remainingTasks = executorService.shutdownNow();
if (remainingTasks.size() > 0) {
remainingTasks.forEach(future -> ((FutureTask<?>) future).cancel(true));
log.debug("Waiting for in-progress tasks to finish: {}", remainingTasks);
executorService.awaitTermination(5, TimeUnit.SECONDS);

Check warning on line 86 in protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java

View check run for this annotation

Codecov / codecov/patch

protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java#L84-L86

Added lines #L84 - L86 were not covered by tests
if (!executorService.isTerminated()) {
// Nothing else we can do.
log.warn(

Check warning on line 89 in protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java

View check run for this annotation

Codecov / codecov/patch

protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java#L89

Added line #L89 was not covered by tests
"Unable to shut down executor service, one or more tasks refused to cancel: {}'",
remainingTasks
);
}
}
}

public <R> FutureTask<R> submit(Callable<R> task) {
Expand All @@ -106,6 +112,8 @@ public <R> FutureTask<R> submit(Callable<R> task) {
return Collectors.collectingAndThen(Collectors.toUnmodifiableList(), this::await);
}

// Awaits each task, in the order it was scheduled. Any interrupt is caught and terminates
// the entire batch.
private <R> List<R> await(List<FutureTask<R>> scheduledTasks) {
try {
var results = new ArrayList<R>();
Expand All @@ -117,8 +125,8 @@ private <R> List<R> await(List<FutureTask<R>> scheduledTasks) {
} catch (ExecutionException ex) {
exceptions.add(ex.getCause());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return results;
exceptions.add(ex);
break;

Check warning on line 129 in protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java

View check run for this annotation

Codecov / codecov/patch

protobuf-maven-plugin/src/main/java/io/github/ascopes/protobufmavenplugin/utils/ConcurrentExecutor.java#L128-L129

Added lines #L128 - L129 were not covered by tests
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -58,7 +60,111 @@ void setUp() {

@AfterEach
void tearDown() throws InterruptedException {
executor.executorService.shutdownNow();
}

@Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS)
@DisplayName(".destroy() succeeds if the executor service is idle")
@Test
void destroySucceedsIfExecutorServiceIsIdle() throws Exception {
// Given
assertThat(executor.executorService.isTerminated())
.as("executorService.isTerminated()")
.isFalse();
assertThat(executor.executorService.isShutdown())
.as("executorService.isShutdown()")
.isFalse();

// When
executor.destroy();

// Then
assertThat(executor.executorService.isTerminated())
.as("executorService.isTerminated()")
.isTrue();
assertThat(executor.executorService.isShutdown())
.as("executorService.isShutdown()")
.isTrue();
}

@Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS)
@DisplayName(".destroy() succeeds if the executor service is already terminated")
@Test
void destroySucceedsIfExecutorServiceIsAlreadyTerminated() throws Exception {
// Given
executor.executorService.shutdownNow();

// When
executor.destroy();

// Then
assertThat(executor.executorService.isTerminated())
.as("executorService.isTerminated()")
.isTrue();
assertThat(executor.executorService.isShutdown())
.as("executorService.isShutdown()")
.isTrue();
}

@Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS)
@DisplayName(".destroy() interrupts any interruptable running tasks")
@Test
void destroyInterruptsAnyInterruptableRunningTasks() throws Exception {
// Given
var task1 = new FutureTask<>(() -> sleepWait(10_000));
var task2 = new FutureTask<>(() -> sleepWait(10_000));

executor.executorService.submit(task1);
executor.executorService.submit(task2);

// Give tasks the chance to start.
Thread.sleep(1_000);

// When
executor.destroy();

// Then
assertThatExceptionOfType(ExecutionException.class)
.as("exception raised by task1 (%s)", task1)
.isThrownBy(task1::get)
.withCauseInstanceOf(InterruptedException.class);
assertThatExceptionOfType(ExecutionException.class)
.as("exception raised by task2 (%s)", task2)
.isThrownBy(task2::get)
.withCauseInstanceOf(InterruptedException.class);
}

@Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS)
@DisplayName(".destroy() abandons any uninterruptable running tasks")
@Test
void destroyAbandonsAnyUninterruptableRunningTasks() throws Exception {
// Given
var task1 = new FutureTask<>(() -> spinWait(10_000));
var task2 = new FutureTask<>(() -> spinWait(10_000));

executor.executorService.submit(task1);
executor.executorService.submit(task2);

// Give tasks the chance to start.
Thread.sleep(1_000);

// When
executor.destroy();

assertThat(task1)
.as("task1 %s", task1)
.isNotDone();
assertThat(task2)
.as("task1 %s", task2)
.isNotDone();

// Then
assertThatExceptionOfType(TimeoutException.class)
.as("exception raised waiting for task1 (%s)", task1)
.isThrownBy(() -> task1.get(100, TimeUnit.MILLISECONDS));
assertThatExceptionOfType(TimeoutException.class)
.as("exception raised waiting for task2 (%s)", task2)
.isThrownBy(() -> task2.get(100, TimeUnit.MILLISECONDS));
}

@Timeout(value = 10_000, unit = TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -184,4 +290,32 @@ void awaitingAwaitsAllTasksAndRaisesTheirExceptions() {
ex -> assertThat(expectedExceptions).hasSize(ex.getSuppressed().length + 1)
);
}

// Sleep-based waits can consume thread interrupts and can be cancelled,
// representing some IO-bound work that cancels gracefully.
private static @Nullable Void sleepWait(int timeoutMs) throws InterruptedException {
var deadline = System.nanoTime() + timeoutMs * 1_000_000L;
do {
try {
var remaining = deadline - System.nanoTime();
Thread.sleep(remaining / 1_000_000L);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw ex;
}
} while (deadline - System.nanoTime() > 0);
return null;
}

// Spin-based waits should not perform IO, so should be uncancellable and
// uninterruptable, representing CPU bound work or a buggy/stubborn task.
private static @Nullable Void spinWait(int timeoutMs) {
var deadline = System.nanoTime() + timeoutMs * 1_000_000L;
// Do not perform anything that can be interrupted.
while (deadline - System.nanoTime() > 0) {
// Keep checkstyle happy by having a body on this loop.
continue;
}
return null;
}
}

0 comments on commit 524e6bf

Please sign in to comment.