diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index 29250a2442..ca453c8700 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -17,14 +17,21 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; + import org.slf4j.Logger; public class EventLoop extends Thread implements Executor { private final Logger logger; private BlockingQueue tasks; - private boolean shutdown = false; + private volatile boolean shutdown; private CompletableFuture shutdownCf = new CompletableFuture<>(); + static final Runnable WAKEUP_TASK = new Runnable() { + @Override + public void run() { + } + }; + @SuppressWarnings("this-escape") public EventLoop(String name) { super(name); @@ -33,10 +40,14 @@ public EventLoop(String name) { start(); } + @Override public void run() { while (true) { try { Runnable task = tasks.poll(100, TimeUnit.MILLISECONDS); + if (task == WAKEUP_TASK) { + task = null; + } if (task == null) { if (shutdown) { shutdownCf.complete(null); @@ -57,7 +68,7 @@ public void run() { } } - public CompletableFuture submit(Runnable task) { + public synchronized CompletableFuture submit(Runnable task) { check(); CompletableFuture cf = new CompletableFuture<>(); tasks.add(() -> { @@ -72,13 +83,16 @@ public CompletableFuture submit(Runnable task) { } @Override - public void execute(Runnable task) { + public synchronized void execute(Runnable task) { check(); tasks.add(task); } - public CompletableFuture shutdownGracefully() { + public synchronized CompletableFuture shutdownGracefully() { shutdown = true; + if (!shutdownCf.isDone() && tasks.isEmpty()) { + tasks.add(WAKEUP_TASK); + } return shutdownCf; }