Skip to content

Commit

Permalink
Merge pull request #2966 from arunans23/master
Browse files Browse the repository at this point in the history
Stop transport listeners asynchronously during server shutdown
  • Loading branch information
arunans23 authored Sep 15, 2023
2 parents 0079a7f + 188d8e3 commit c003cd7
Showing 1 changed file with 59 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@
import org.wso2.micro.integrator.core.util.MicroIntegratorBaseUtils;

import java.lang.management.ManagementPermission;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.Query;
Expand Down Expand Up @@ -73,13 +79,44 @@ public void startMaintenance() throws Exception {
secMan.checkPermission(new ManagementPermission("control"));
}
log.info("Starting to switch to maintenance mode...");
stopTransportListeners();
destroyTransportListeners();
waitForRequestCompletion();
}

/**
* Stop Transport Listeners asynchronously and wait for the completion of the tasks
*/
private void stopTransportListeners() {
ExecutorService transportListenerShutdownPool = Executors.newFixedThreadPool(inTransports.size());
List<Future<Void>> listenerShutdownFutures = new ArrayList<>();
for (TransportInDescription tinDesc : inTransports.values()) {
TransportListener transport = tinDesc.getReceiver();
transport.stop();
Future<Void> future = transportListenerShutdownPool.submit(new TransportListenerShutdownTask(transport));
listenerShutdownFutures.add(future);
}

// Wait until shutting down the transport listeners before proceeding
for (Future<Void> future : listenerShutdownFutures) {
try {
future.get();
} catch (Exception e) {
log.error("Error while completing transport listener shutdown", e);
}
}
transportListenerShutdownPool.shutdown();
log.info("Stopped all transport listeners");
}

waitForRequestCompletion();
/**
* Destroy Transport Listeners
*/
private void destroyTransportListeners() {
// Destroy the TransportListener at the end to clear up resources
for (TransportInDescription tinDesc : inTransports.values()) {
TransportListener transport = tinDesc.getReceiver();
transport.destroy();
}
}

/**
Expand Down Expand Up @@ -209,4 +246,24 @@ public void endMaintenance() throws Exception {
}
log.info("Switched to normal mode");
}

/**
* Callable task to pause and shutdown a transport listener
*/
private class TransportListenerShutdownTask implements Callable<Void> {
private TransportListener transport;

public TransportListenerShutdownTask(TransportListener transport) {
this.transport = transport;
}

public Void call() throws Exception {
try {
transport.stop();
} catch (Exception e) {
log.error("Error while stopping Transport Listener", e);
}
return null;
}
}
}

0 comments on commit c003cd7

Please sign in to comment.