From 9f99d14d95ce3ec5c2634a9c8d0ed122e2eea4c2 Mon Sep 17 00:00:00 2001 From: joecqupt Date: Tue, 5 Nov 2024 00:10:14 +0800 Subject: [PATCH 1/2] update ConnectionEventListener --- .../connection/ConnectionEventListener.java | 4 +-- .../DefaultConnectionEventProcessor.java | 27 +++++++------------ .../ClientConnectionManagerTest.java | 26 +++++++----------- 3 files changed, 20 insertions(+), 37 deletions(-) diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java index c04a23b..3cef8b8 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/ConnectionEventListener.java @@ -2,8 +2,6 @@ public interface ConnectionEventListener { - ConnectionEvent interest(); - - void onEvent(Connection connection); + void onEvent(ConnectionEvent connectionEvent, Connection connection); } diff --git a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java index a7f2cfa..17b9ca2 100644 --- a/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java +++ b/src/main/java/io/github/xinfra/lab/remoting/connection/DefaultConnectionEventProcessor.java @@ -21,7 +21,7 @@ public class DefaultConnectionEventProcessor extends AbstractLifeCycle implement protected LinkedBlockingQueue eventQueue = new LinkedBlockingQueue<>(); - private Map> listeners = new ConcurrentHashMap<>(); + private CopyOnWriteArrayList listeners = new CopyOnWriteArrayList<>(); @Override public void startup() { @@ -48,14 +48,7 @@ public void handleEvent(ConnectionEvent event, Connection connection) { public void addConnectionEventListener(ConnectionEventListener listener) { ensureStarted(); Validate.notNull(listener, "listener must not be null"); - Validate.notNull(listener.interest(), "listener interest event must not be null"); - ConnectionEvent event = listener.interest(); - List connectionEventListeners = listeners.get(event); - if (connectionEventListeners == null) { - listeners.computeIfAbsent(event, e -> new CopyOnWriteArrayList<>()); - connectionEventListeners = listeners.get(event); - } - connectionEventListeners.add(listener); + listeners.addIfAbsent(listener); } @AllArgsConstructor @@ -81,17 +74,15 @@ public void run() { continue; } - List connectionEventListeners = listeners.get(event.connectionEvent); - if (connectionEventListeners != null) { - for (ConnectionEventListener listener : connectionEventListeners) { - try { - listener.onEvent(event.connection); - } - catch (Throwable t) { - log.warn("{} onEvent execute fail", listener, t); - } + for (ConnectionEventListener listener : listeners) { + try { + listener.onEvent(event.connectionEvent, event.connection); + } + catch (Throwable t) { + log.warn("{} onEvent execute fail", listener, t); } } + } } diff --git a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java index 63611ce..209f001 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java @@ -285,14 +285,11 @@ void testConnectionEventListener() throws RemotingException, InterruptedExceptio AtomicReference connectionRef1 = new AtomicReference<>(); connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() { @Override - public ConnectionEvent interest() { - return ConnectionEvent.CONNECT; - } - - @Override - public void onEvent(Connection connection) { - connectionRef1.set(connection); - connectFlag.set(true); + public void onEvent(ConnectionEvent connectionEvent, Connection connection) { + if (ConnectionEvent.CONNECT == connectionEvent) { + connectionRef1.set(connection); + connectFlag.set(true); + } } }); @@ -300,14 +297,11 @@ public void onEvent(Connection connection) { AtomicReference connectionRef2 = new AtomicReference<>(); connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() { @Override - public ConnectionEvent interest() { - return ConnectionEvent.CLOSE; - } - - @Override - public void onEvent(Connection connection) { - connectionRef2.set(connection); - closeFlag.set(true); + public void onEvent(ConnectionEvent connectionEvent, Connection connection) { + if (ConnectionEvent.CLOSE == connectionEvent) { + connectionRef2.set(connection); + closeFlag.set(true); + } } }); From d696eee4ea24c2b60f823793d784b7e01b4f6ffa Mon Sep 17 00:00:00 2001 From: jiangyuan Date: Tue, 5 Nov 2024 13:52:42 +0800 Subject: [PATCH 2/2] update ut --- .../remoting/connection/ClientConnectionManagerTest.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java index 209f001..14e84b1 100644 --- a/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java +++ b/src/test/java/io/github/xinfra/lab/remoting/connection/ClientConnectionManagerTest.java @@ -281,6 +281,15 @@ void testDisableReconnect() throws InterruptedException, TimeoutException { @Test void testConnectionEventListener() throws RemotingException, InterruptedException, TimeoutException { + + connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() { + @Override + public void onEvent(ConnectionEvent connectionEvent, Connection connection) { + // threw exception will not affect others listener + throw new RuntimeException("test throw exception"); + } + }); + AtomicBoolean connectFlag = new AtomicBoolean(false); AtomicReference connectionRef1 = new AtomicReference<>(); connectionManager.connectionEventProcessor().addConnectionEventListener(new ConnectionEventListener() {