Skip to content

Commit

Permalink
fix(net): clear connect when disconnected
Browse files Browse the repository at this point in the history
  • Loading branch information
blakebyrnes committed Nov 5, 2024
1 parent e09385a commit 5a8a5fe
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 25 deletions.
45 changes: 33 additions & 12 deletions net/lib/ConnectionToCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export default class ConnectionToCore<
TCoreApiHandlers extends IApiHandlers,
TEventSpec,
> extends TypedEventEmitter<IConnectionToCoreEvents<TEventSpec>> {
public static readonly MinimumAutoReconnectMillis = 1000;
public connectAction: IConnectAction;
public disconnectAction: IConnectAction;

Expand All @@ -53,10 +54,12 @@ export default class ConnectionToCore<

protected events = new EventSubscriber();

private isConnectionTerminated = false;
private didCallConnectionTerminated = false;
private lastDisconnectDate?: Date;

constructor(public transport: ITransport) {
super();

bindFunctions(this);

this.events.on(transport, 'disconnected', this.onConnectionTerminated);
Expand Down Expand Up @@ -87,6 +90,7 @@ export default class ConnectionToCore<

try {
await this.transport.connect?.(timeoutMs);
this.didCallConnectionTerminated = false;
await this.afterConnectHook();
connectAction.resolvable.resolve();
this.emit('connected');
Expand All @@ -98,8 +102,8 @@ export default class ConnectionToCore<
}

public async disconnect(fatalError?: Error): Promise<void> {
if (this.disconnectAction) return this.disconnectAction.resolvable.promise;
this.autoReconnect = false;
if (this.disconnectAction) return this.disconnectAction.resolvable.promise;

const disconnectAction: IConnectAction = {
isAutomatic: false,
Expand All @@ -120,6 +124,7 @@ export default class ConnectionToCore<

this.transport.disconnect?.();
await this.onConnectionTerminated();
this.connectAction = null;
log.stats('ConnectionToCore.Disconnected', {
parentLogId: logid,
host: this.transport.host,
Expand All @@ -140,16 +145,20 @@ export default class ConnectionToCore<
},
timeoutMs?: number,
): Promise<ICoreResponsePayload<TCoreApiHandlers, T>['data']> {
const connect = this.connectAction;
const disconnect = this.disconnectAction;
const activeConnectHook = this.connectAction?.isCallingHook && this.connectAction;
const activeDisconnectHook = this.disconnectAction?.isCallingHook && this.disconnectAction;

if (!disconnect && !connect && this.autoReconnect) {
// if we are not connected, try to connect (except during a disconnect)
if (this.shouldAutoConnect()) {
await this.connect({ timeoutMs, isAutoConnect: true });
}

const { promise, id } = this.pendingMessages.create(timeoutMs, !!connect || !!disconnect);
if (connect) connect.hookMessageId = id;
if (disconnect) disconnect.hookMessageId = id;
const { promise, id } = this.pendingMessages.create(
timeoutMs,
!!activeConnectHook || !!activeDisconnectHook,
);
if (activeConnectHook) activeConnectHook.hookMessageId = id;
if (activeDisconnectHook) activeDisconnectHook.hookMessageId = id;

try {
const [result] = await Promise.all([
Expand All @@ -168,8 +177,8 @@ export default class ConnectionToCore<
}
throw error;
} finally {
if (connect) connect.hookMessageId = null;
if (disconnect) disconnect.hookMessageId = null;
if (activeConnectHook) activeConnectHook.hookMessageId = null;
if (activeDisconnectHook) activeDisconnectHook.hookMessageId = null;
}
}

Expand All @@ -180,6 +189,17 @@ export default class ConnectionToCore<
return false;
}

public shouldAutoConnect(): boolean {
if (!this.autoReconnect || !!this.connectAction) return false;
// if we're mid-disconnect, don't auto-reconnect
if (this.disconnectAction?.hookMessageId) return false;
if (!this.lastDisconnectDate) return true;
const reconnectMillis = (this.constructor as typeof ConnectionToCore)
.MinimumAutoReconnectMillis;
if (Number.isNaN(reconnectMillis)) return false;
return Date.now() - this.lastDisconnectDate.getTime() >= reconnectMillis;
}

protected onMessage(
payload: ICoreResponsePayload<TCoreApiHandlers, any> | ICoreEventPayload<TEventSpec, any>,
): void {
Expand Down Expand Up @@ -214,8 +234,9 @@ export default class ConnectionToCore<
}

protected async onConnectionTerminated(): Promise<void> {
if (this.isConnectionTerminated) return;
this.isConnectionTerminated = true;
if (this.didCallConnectionTerminated) return;
this.lastDisconnectDate = new Date();
this.didCallConnectionTerminated = true;
this.emit('disconnected');

// clear all pending messages
Expand Down
55 changes: 42 additions & 13 deletions net/test/ConnectionToCore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import WsTransportToCore from '../lib/WsTransportToCore';

const apiSpy = jest.fn();
const apiSpec = {
api(test: boolean) {
api(test: any) {
apiSpy();
return { test };
},
Expand Down Expand Up @@ -78,6 +78,7 @@ test('should cancel connect messages if a connection closes before connecting',
test('should be able to reconnect after a disconnect', async () => {
const server = new Server();
const wss = new WebsocketServer({ server });
apiSpy.mockClear();
try {
server.listen(0);
wss.on('connection', (ws, req) => {
Expand All @@ -88,23 +89,51 @@ test('should be able to reconnect after a disconnect', async () => {

const wsTransportToCore = new WsTransportToCore(`ws://localhost:${host.port}`);

const disconnectSpy = jest.spyOn(ConnectionToCore.prototype, 'disconnect');
const terminateSpy = jest.spyOn<any, any>(ConnectionToCore.prototype, 'onConnectionTerminated');
const connectionToCore = new ConnectionToCore(wsTransportToCore);
needsClosing.push(() => connectionToCore.disconnect());
await expect(connectionToCore.sendRequest({ command: 'api', args: [true] })).resolves.toEqual({
test: true,
await expect(connectionToCore.sendRequest({ command: 'api', args: [1] })).resolves.toEqual({
test: 1,
});
expect(apiSpy).toHaveBeenCalledTimes(1);
expect(wss.clients.size).toBe(1);
const didDisconnect = new Promise(resolve => wsTransportToCore.once('disconnected', resolve));
for (const client of wss.clients) {
client.close();
const disconnects = jest.fn();
connectionToCore.on('disconnected', disconnects);
for (let i = 0; i < 10; i++) {
disconnectSpy.mockClear();
terminateSpy.mockClear();
apiSpy.mockClear();
const didDisconnect = new Promise(resolve => wsTransportToCore.once('disconnected', resolve));
expect(wss.clients.size).toBe(1);
for (const client of wss.clients) {
client.close();
}
await didDisconnect;

expect(disconnects).toHaveBeenCalledTimes(i + 1);
expect(disconnectSpy).not.toHaveBeenCalled();
expect(terminateSpy).toHaveBeenCalledTimes(1);
expect(connectionToCore.transport.isConnected).toBe(false);
expect(connectionToCore.connectAction).toBeFalsy();
// @ts-expect-error
expect(connectionToCore.lastDisconnectDate).toBeTruthy();
expect(connectionToCore.shouldAutoConnect()).toBe(false);

// @ts-expect-error
connectionToCore.lastDisconnectDate = new Date(
// @ts-expect-error
connectionToCore.lastDisconnectDate.getTime() - 1000,
);
expect(connectionToCore.shouldAutoConnect()).toBe(true);

await expect(
connectionToCore.sendRequest({ command: 'api', args: [i + 2] }),
).resolves.toEqual({
test: i + 2,
});
expect(apiSpy).toHaveBeenCalledTimes(1);
expect(connectionToCore.connectAction).toBeTruthy();
}
await didDisconnect;
expect(connectionToCore.transport.isConnected).toBe(false);

await expect(connectionToCore.sendRequest({ command: 'api', args: [true] })).resolves.toEqual({
test: true,
});
} finally {
wss.close();
server.unref().close();
Expand Down

0 comments on commit 5a8a5fe

Please sign in to comment.