From f8a2d3c903449e1a89fe8f5e03ee4ddf960e8fe1 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 16:26:55 +0800 Subject: [PATCH 1/6] feat(eventLoop): optimize eventLoop --- .../automq/stream/utils/threads/EventLoop.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index 29250a2442..ec5a786e0b 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -22,9 +22,15 @@ public class EventLoop extends Thread implements Executor { private final Logger logger; private BlockingQueue tasks; - private boolean shutdown = false; + private volatile boolean shutdown = false; private CompletableFuture shutdownCf = new CompletableFuture<>(); + static final Runnable WAKEUP_TASK = new Runnable() { + @Override + public void run() { + } + }; + @SuppressWarnings("this-escape") public EventLoop(String name) { super(name); @@ -33,10 +39,14 @@ public EventLoop(String name) { start(); } + @Override public void run() { while (true) { try { Runnable task = tasks.poll(100, TimeUnit.MILLISECONDS); + if (task == WAKEUP_TASK) { + task = null; + } if (task == null) { if (shutdown) { shutdownCf.complete(null); @@ -79,6 +89,9 @@ public void execute(Runnable task) { public CompletableFuture shutdownGracefully() { shutdown = true; + if (!shutdownCf.isDone()) { + tasks.add(WAKEUP_TASK); + } return shutdownCf; } From b1445e4f23e9bf384df4afe49e13083f8a226d60 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 17:25:08 +0800 Subject: [PATCH 2/6] feat(eventLoop): optimize eventLoop --- .../automq/stream/utils/threads/EventLoop.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index ec5a786e0b..b16fdc47d7 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -17,12 +17,14 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import org.slf4j.Logger; public class EventLoop extends Thread implements Executor { private final Logger logger; private BlockingQueue tasks; - private volatile boolean shutdown = false; + private final AtomicBoolean shutdown = new AtomicBoolean(); private CompletableFuture shutdownCf = new CompletableFuture<>(); static final Runnable WAKEUP_TASK = new Runnable() { @@ -48,7 +50,7 @@ public void run() { task = null; } if (task == null) { - if (shutdown) { + if (shutdown.get()) { shutdownCf.complete(null); break; } else { @@ -88,15 +90,16 @@ public void execute(Runnable task) { } public CompletableFuture shutdownGracefully() { - shutdown = true; - if (!shutdownCf.isDone()) { - tasks.add(WAKEUP_TASK); + if (shutdown.compareAndSet(false, true)) { + if (!shutdownCf.isDone()) { + tasks.add(WAKEUP_TASK); + } } return shutdownCf; } private void check() { - if (shutdown) { + if (shutdown.get()) { throw new IllegalStateException("EventLoop is shutdown"); } } From c8e5240d3e0e09f0cc0f0ef94e40cb2b9f4c0640 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Wed, 14 Aug 2024 17:34:38 +0800 Subject: [PATCH 3/6] feat(eventLoop): optimize eventLoop --- .../java/com/automq/stream/utils/threads/EventLoop.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index b16fdc47d7..58dcf99aa4 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -90,9 +90,11 @@ public void execute(Runnable task) { } public CompletableFuture shutdownGracefully() { - if (shutdown.compareAndSet(false, true)) { - if (!shutdownCf.isDone()) { - tasks.add(WAKEUP_TASK); + while (!shutdown.get()) { + if (shutdown.compareAndSet(false, true)) { + if (!shutdownCf.isDone()) { + tasks.add(WAKEUP_TASK); + } } } return shutdownCf; From 8cc63bdd556de7c2b3056293d692f24f4dd4f8f5 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Thu, 15 Aug 2024 08:55:02 +0800 Subject: [PATCH 4/6] feat(s3stream): optimize eventLoop --- .../main/java/com/automq/stream/utils/threads/EventLoop.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index 58dcf99aa4..566232a1bd 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -87,6 +87,11 @@ public CompletableFuture submit(Runnable task) { public void execute(Runnable task) { check(); tasks.add(task); + if (shutdown.get()) { + if (tasks.remove(task)) { + throw new IllegalStateException("EventLoop is shutdown"); + } + } } public CompletableFuture shutdownGracefully() { From a1719697859e194e2e2077bf58237c62baa4813d Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Sun, 18 Aug 2024 18:51:52 +0800 Subject: [PATCH 5/6] feat(s3stream): optimize eventLoop --- .../stream/utils/threads/EventLoop.java | 26 ++++++------------- 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index 566232a1bd..9c9cd9537e 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -17,14 +17,13 @@ import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; public class EventLoop extends Thread implements Executor { private final Logger logger; private BlockingQueue tasks; - private final AtomicBoolean shutdown = new AtomicBoolean(); + private volatile boolean shutdown; private CompletableFuture shutdownCf = new CompletableFuture<>(); static final Runnable WAKEUP_TASK = new Runnable() { @@ -50,7 +49,7 @@ public void run() { task = null; } if (task == null) { - if (shutdown.get()) { + if (shutdown) { shutdownCf.complete(null); break; } else { @@ -69,7 +68,7 @@ public void run() { } } - public CompletableFuture submit(Runnable task) { + public synchronized CompletableFuture submit(Runnable task) { check(); CompletableFuture cf = new CompletableFuture<>(); tasks.add(() -> { @@ -84,29 +83,20 @@ public CompletableFuture submit(Runnable task) { } @Override - public void execute(Runnable task) { + public synchronized void execute(Runnable task) { check(); tasks.add(task); - if (shutdown.get()) { - if (tasks.remove(task)) { - throw new IllegalStateException("EventLoop is shutdown"); - } - } } - public CompletableFuture shutdownGracefully() { - while (!shutdown.get()) { - if (shutdown.compareAndSet(false, true)) { - if (!shutdownCf.isDone()) { - tasks.add(WAKEUP_TASK); - } - } + public synchronized CompletableFuture shutdownGracefully() { + if (!shutdownCf.isDone() && tasks.isEmpty()) { + tasks.add(WAKEUP_TASK); } return shutdownCf; } private void check() { - if (shutdown.get()) { + if (shutdown) { throw new IllegalStateException("EventLoop is shutdown"); } } From 88443b7bb689c2c82498f8be49ce6d0b6d823354 Mon Sep 17 00:00:00 2001 From: CLFutureX <775523362@qq.com> Date: Mon, 19 Aug 2024 13:27:16 +0800 Subject: [PATCH 6/6] feat(s3stream): optimize eventLoop --- .../src/main/java/com/automq/stream/utils/threads/EventLoop.java | 1 + 1 file changed, 1 insertion(+) diff --git a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java index 9c9cd9537e..ca453c8700 100644 --- a/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java +++ b/s3stream/src/main/java/com/automq/stream/utils/threads/EventLoop.java @@ -89,6 +89,7 @@ public synchronized void execute(Runnable task) { } public synchronized CompletableFuture shutdownGracefully() { + shutdown = true; if (!shutdownCf.isDone() && tasks.isEmpty()) { tasks.add(WAKEUP_TASK); }