Skip to content

Commit

Permalink
Add gradual disconnection for socket connections (microsoft#19938)
Browse files Browse the repository at this point in the history
# Description

We add the concept of gradually disconnecting connections in the Nexus
socket server. This enables traffic to have less of a spike from clients
reconnecting.

Three new config options are added to socketIOConfig (reused this one as
it already configures socketIOServer.ts)

gracefulShutdownEnabled
gracefulShutdownDrainTimeMs
gracefulShutdownDrainIntervalMs
  • Loading branch information
alteut authored Mar 20, 2024
1 parent 580478d commit eeb6e2c
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 3 deletions.
15 changes: 15 additions & 0 deletions server/routerlicious/packages/lambdas/src/nexus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,18 @@
Nexus verifies and maintains websocket sessions to clients. It provides an orderer for writer clients and keeps a list of readonly clients connected to the session.

Clients connect to it after retrieving the discovery result from Alfred. The deltaStreamUrl returned there points them to Nexus.

## Graceful shutdown

To avoid a spike of reconnecting clients when shutting Nexus down, you can opt to disconnect them in batches by adding the following settings under the `nexus.socketIo` configuration section:

```
"nexus": {
"socketIo": {
"gracefulShutdownEnabled": true,
"gracefulShutdownDrainTimeMs": 30000,
"gracefulShutdownDrainIntervalMs": 1000
[...]
```

With this example configuration, whenever Nexus receives a SIGTERM (signal 15) it will disconnect its socket connections over 30 seconds, disconnecting one batch every 1 second. Also check the total shutdown timeout configured for Nexus and make sure it is longer than the drain time.
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@
"shouldDisableDefaultNamespace": false
},
"socketIo": {
"perMessageDeflate": false
"perMessageDeflate": false,
"gracefulShutdownEnabled": false
},
"jwtTokenCache": {
"enable": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ import { EventEmitter } from "events";
import * as http from "http";
import * as util from "util";
import * as core from "@fluidframework/server-services-core";
import { BaseTelemetryProperties, Lumberjack } from "@fluidframework/server-services-telemetry";
import { Namespace, Server, Socket } from "socket.io";
import {
BaseTelemetryProperties,
Lumberjack,
LumberEventName,
} from "@fluidframework/server-services-telemetry";
import { Namespace, Server, Socket, RemoteSocket } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import type { Adapter } from "socket.io-adapter";
import { IRedisClientConnectionManager } from "@fluidframework/server-services-utils";
Expand Down Expand Up @@ -71,6 +75,7 @@ class SocketIoServer implements core.IWebSocketServer {
private readonly io: Server,
private readonly redisClientConnectionManagerForPub: IRedisClientConnectionManager,
private readonly redisClientConnectionManagerForSub: IRedisClientConnectionManager,
private readonly socketIoConfig?: any,
) {
this.io.on("connection", (socket: Socket) => {
const webSocket = new SocketIoSocket(socket);
Expand Down Expand Up @@ -121,6 +126,72 @@ class SocketIoServer implements core.IWebSocketServer {
}

public async close(): Promise<void> {
const sleep = async (timeMs: number) =>
new Promise((resolve) => setTimeout(resolve, timeMs));

if (this.socketIoConfig?.gracefulShutdownEnabled) {
	// Gradual disconnection of websocket connections: instead of dropping every
	// socket at once, disconnect them in batches spread over the drain time.
	const drainTime = this.socketIoConfig?.gracefulShutdownDrainTimeMs ?? 30000;
	const drainInterval = this.socketIoConfig?.gracefulShutdownDrainIntervalMs ?? 1000;
	if (drainTime > 0 && drainInterval > 0) {
		// We are assuming no new connections appear once we start. Any leftover
		// connections will be closed when close is called.
		const connections = await this.io.fetchSockets();
		const connectionCount = connections.length;
		const telemetryProperties = {
			drainTime,
			drainInterval,
			connectionCount,
		};
		Lumberjack.info("Graceful disconnection started", telemetryProperties);
		const metricForTimeTaken = Lumberjack.newLumberMetric(
			LumberEventName.GracefulShutdown,
			telemetryProperties,
		);
		// Total number of drains (batches) to run.
		const totalDrains = Math.ceil(drainTime / drainInterval);
		// Number of connections to disconnect per drain.
		const connectionsToDisconnectPerDrain = Math.ceil(connectionCount / totalDrains);
		// Number of connections processed so far; also the index of the next
		// connection to disconnect.
		let disconnectedCount = 0;
		let done = false;
		if (connectionsToDisconnectPerDrain > 0) {
			// Start draining.
			for (let i = 0; i < totalDrains; i++) {
				for (let j = 0; j < connectionsToDisconnectPerDrain; j++) {
					const connection: RemoteSocket<any, any> = connections[disconnectedCount];
					if (!connection) {
						// Ran past the end of the list: nothing left to disconnect.
						done = true;
						break;
					}
					try {
						connection.disconnect(true);
					} catch (e) {
						// Best effort: a failure on one socket must not stop the drain.
						Lumberjack.error("Graceful disconnect exception", undefined, e);
					}
					disconnectedCount++;
				}
				if (done) {
					break;
				}
				// disconnectedCount is already a count (it is incremented after each
				// disconnect), so it is reported as-is.
				Lumberjack.info("Graceful disconnect batch processed", {
					disconnectedSoFar: disconnectedCount,
					connectionCount,
				});
				await sleep(drainInterval);
			}
		}
		if (disconnectedCount < connectionCount) {
			metricForTimeTaken.error(
				`Graceful shutdown finished incompletely. Missed ${
					connectionCount - disconnectedCount
				} connections.`,
			);
		} else {
			metricForTimeTaken.success("Graceful shutdown finished");
		}
	}
}

// eslint-disable-next-line @typescript-eslint/promise-function-async
const pubClosedP = util.promisify(((callback) =>
this.redisClientConnectionManagerForPub.getRedisClient().quit(callback)) as any)();
Expand Down Expand Up @@ -194,5 +265,6 @@ export function create(
io,
redisClientConnectionManagerForPub,
redisClientConnectionManagerForSub,
socketIoConfig,
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export enum LumberEventName {

// Fluid server infrastructure
RunService = "RunService",
GracefulShutdown = "GracefulShutdown",

// Unit Testing
UnitTestEvent = "UnitTestEvent",
Expand Down

0 comments on commit eeb6e2c

Please sign in to comment.