From 188d8e34b15804358ea90e308ad1695bb7147f26 Mon Sep 17 00:00:00 2001 From: Arunan Date: Tue, 5 Sep 2023 16:47:23 +0530 Subject: [PATCH] Stop transport listeners asynchronously during server shutdown --- .../org/wso2/micro/core/ServerManagement.java | 61 ++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/core/ServerManagement.java b/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/core/ServerManagement.java index 4517c378b9..ff6ada5ce4 100644 --- a/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/core/ServerManagement.java +++ b/components/org.wso2.micro.integrator.core/src/main/java/org/wso2/micro/core/ServerManagement.java @@ -27,9 +27,15 @@ import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils; import java.lang.management.ManagementPermission; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.Query; @@ -73,13 +79,44 @@ public void startMaintenance() throws Exception { secMan.checkPermission(new ManagementPermission("control")); } log.info("Starting to switch to maintenance mode..."); + stopTransportListeners(); + destroyTransportListeners(); + waitForRequestCompletion(); + } + + /** + * Stop Transport Listeners asynchronously and wait for the completion of the tasks + */ + private void stopTransportListeners() { + ExecutorService transportListenerShutdownPool = Executors.newFixedThreadPool(inTransports.size()); + List> listenerShutdownFutures = new ArrayList<>(); for (TransportInDescription tinDesc : inTransports.values()) { TransportListener transport = tinDesc.getReceiver(); - transport.stop(); + Future future = transportListenerShutdownPool.submit(new TransportListenerShutdownTask(transport)); + listenerShutdownFutures.add(future); + } + + // Wait until shutting down the transport listeners before proceeding + for (Future future : listenerShutdownFutures) { + try { + future.get(); + } catch (Exception e) { + log.error("Error while completing transport listener shutdown", e); + } } + transportListenerShutdownPool.shutdown(); log.info("Stopped all transport listeners"); + } - waitForRequestCompletion(); + /** + * Destroy Transport Listeners + */ + private void destroyTransportListeners() { + // Destroy the TransportListener at the end to clear up resources + for (TransportInDescription tinDesc : inTransports.values()) { + TransportListener transport = tinDesc.getReceiver(); + transport.destroy(); + } } /** @@ -209,4 +246,24 @@ public void endMaintenance() throws Exception { } log.info("Switched to normal mode"); } + + /** + * Callable task to pause and shutdown a transport listener + */ + private class TransportListenerShutdownTask implements Callable { + private TransportListener transport; + + public TransportListenerShutdownTask(TransportListener transport) { + this.transport = transport; + } + + public Void call() throws Exception { + try { + transport.stop(); + } catch (Exception e) { + log.error("Error while stopping Transport Listener", e); + } + return null; + } + } }