Skip to content

Commit

Permalink
fix: reconnecting ws channel not work (#3612)
Browse files Browse the repository at this point in the history
  • Loading branch information
bytemain authored May 6, 2024
1 parent b5df89c commit 197b8ef
Show file tree
Hide file tree
Showing 16 changed files with 304 additions and 164 deletions.
101 changes: 53 additions & 48 deletions packages/components/src/notification/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import cls from 'classnames';
import React from 'react';

import { IAction } from '@opensumi/ide-core-common';
import { isUndefined } from '@opensumi/ide-utils';
import { CancelablePromise, createCancelablePromise, isUndefined } from '@opensumi/ide-utils';

import { Button } from '../button';
import { MessageType } from '../common';
Expand Down Expand Up @@ -33,55 +33,60 @@ export function open<T = string>(
description?: string | React.ReactNode,
duration?: number,
onClose?: () => void,
): Promise<T | undefined> | undefined {
return new Promise((resolve) => {
const args: ArgsProps = {
key,
className: cls('kt-notification-wrapper', {
['kt-notification-info']: type === MessageType.Info,
['kt-notification-error']: type === MessageType.Error,
['kt-notification-warn']: type === MessageType.Warning,
}),
duration: isUndefined(duration) ? DURATION[type] : duration,
onClose: () => {
onClose && onClose();
cachedArgs.delete(key);
resolve(undefined);
},
btn: buttons
? buttons.map((button, index) => {
const isStringButton = typeof button === 'string';
const buttonProps = {
className: `${cls('kt-notification-button')}${isStringButton ? '' : button.class}`,
ghost: isStringButton ? index === 0 : !button.primary,
key: isStringButton ? button : button.id,
onClick: () => {
resolve(button as any);
antdNotification.close(key);
if (!isStringButton) {
button.run();
}
},
};
const text = isStringButton ? button : button.label;
return (
<Button size='small' {...buttonProps}>
{text}
</Button>
);
})
: null,
message,
description,
};
cachedArgs.set(key, [type, args]);
): CancelablePromise<T | undefined> | undefined {
return createCancelablePromise<T | undefined>((token) => {
token.onCancellationRequested(() => {
close(key);
});
return new Promise((resolve) => {
const args: ArgsProps = {
key,
className: cls('kt-notification-wrapper', {
['kt-notification-info']: type === MessageType.Info,
['kt-notification-error']: type === MessageType.Error,
['kt-notification-warn']: type === MessageType.Warning,
}),
duration: isUndefined(duration) ? DURATION[type] : duration,
onClose: () => {
onClose && onClose();
cachedArgs.delete(key);
resolve(undefined);
},
btn: buttons
? buttons.map((button, index) => {
const isStringButton = typeof button === 'string';
const buttonProps = {
className: `${cls('kt-notification-button')}${isStringButton ? '' : button.class}`,
ghost: isStringButton ? index === 0 : !button.primary,
key: isStringButton ? button : button.id,
onClick: () => {
resolve(button as any);
antdNotification.close(key);
if (!isStringButton) {
button.run();
}
},
};
const text = isStringButton ? button : button.label;
return (
<Button size='small' {...buttonProps}>
{text}
</Button>
);
})
: null,
message,
description,
};
cachedArgs.set(key, [type, args]);

// closable 为 false 时,不展示 closeIcon
if (!closable) {
args.closeIcon = <span />;
}
// closable 为 false 时,不展示 closeIcon
if (!closable) {
args.closeIcon = <span />;
}

doOpenNotification(type, args);
doOpenNotification(type, args);
});
});
}

Expand Down
99 changes: 59 additions & 40 deletions packages/connection/src/browser/ws-channel-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { IReporterService, REPORT_NAME } from '@opensumi/ide-core-common';
import { Barrier, Deferred, IReporterService, MultiMap, REPORT_NAME } from '@opensumi/ide-core-common';

import { NetSocketConnection } from '../common/connection';
import { ReconnectingWebSocketConnection } from '../common/connection/drivers/reconnecting-websocket';
Expand All @@ -10,12 +10,17 @@ import { WSChannel, parse, pingMessage } from '../common/ws-channel';
*/
export class WSChannelHandler {
private channelMap: Map<string, WSChannel> = new Map();
private channelCloseEventMap: Map<string, WSCloseInfo> = new Map();
private channelCloseEventMap = new MultiMap<string, WSCloseInfo>();
private logger = console;
public clientId: string;
private heartbeatMessageTimer: NodeJS.Timeout | null;
private reporterService: IReporterService;

/**
* 保证在连接建立后再执行后续操作
*/
private openingBarrier = new Barrier();

LOG_TAG: string;

constructor(public connection: ReconnectingWebSocketConnection | NetSocketConnection, logger: any, clientId: string) {
Expand Down Expand Up @@ -64,68 +69,82 @@ export class WSChannelHandler {
});

const reopenExistsChannel = () => {
if (this.channelMap.size) {
if (this.channelMap.size > 0) {
this.channelMap.forEach((channel) => {
channel.onOpen(() => {
const closeInfo = this.channelCloseEventMap.get(channel.id);
this.reporterService &&
this.reporterService.point(REPORT_NAME.CHANNEL_RECONNECT, REPORT_NAME.CHANNEL_RECONNECT, closeInfo);
this.logger.log(this.LOG_TAG, `channel reconnect ${this.clientId}:${channel.channelPath}`);
});

channel.open(channel.channelPath, this.clientId);
// 针对前端需要重新设置下后台状态的情况
channel.fireReopen();
});
}
};

await new Promise<void>((resolve) => {
if (this.connection.isOpen()) {
this.heartbeatMessage();
resolve();
this.connection.onClose((code, reason) => {
this.channelMap.forEach((channel) => {
channel.close(code ?? 1000, reason ?? '');
});
});

if (this.connection.isOpen()) {
this.heartbeatMessage();
this.openingBarrier.open();
}

this.connection.onOpen(() => {
this.heartbeatMessage();
// 说明是重连
if (this.openingBarrier.isOpen()) {
reopenExistsChannel();
} else {
this.connection.onOpen(() => {
this.heartbeatMessage();
resolve();
reopenExistsChannel();
});
this.openingBarrier.open();
}
});

this.connection.onceClose((code, reason) => {
if (this.channelMap.size) {
this.channelMap.forEach((channel) => {
channel.close(code ?? 1000, reason ?? '');
});
}
});
await this.openingBarrier.wait();
}

public async openChannel(channelPath: string) {
const channelId = `${this.clientId}:${channelPath}`;
const channel = new WSChannel(this.connection, {
id: channelId,
id: `${this.clientId}:${channelPath}`,
logger: this.logger,
});

this.channelMap.set(channel.id, channel);

await new Promise<void>((resolve) => {
channel.onOpen(() => {
resolve();
});
channel.onceClose((code: number, reason: string) => {
this.channelCloseEventMap.set(channelId, {
channelPath,
closeEvent: { code, reason },
connectInfo: (navigator as any).connection as ConnectionInfo,
channel.onOpen(() => {
const closeInfo = this.channelCloseEventMap.get(channel.id);
if (closeInfo) {
closeInfo.forEach((info) => {
this.reporterService &&
this.reporterService.point(REPORT_NAME.CHANNEL_RECONNECT, REPORT_NAME.CHANNEL_RECONNECT, info);
});
this.logger.log(this.LOG_TAG, 'channel close: ', `code: ${code}, reason: ${reason}`);

this.channelCloseEventMap.delete(channel.id);

this.logger.log(this.LOG_TAG, `channel reconnect ${this.clientId}:${channel.channelPath}`);
} else {
this.logger.log(this.LOG_TAG, `channel open ${this.clientId}:${channel.channelPath}`);
}
});

channel.onClose((code: number, reason: string) => {
this.channelCloseEventMap.set(channel.id, {
channelPath,
closeEvent: { code, reason },
connectInfo: (navigator as any).connection as ConnectionInfo,
});
channel.open(channelPath, this.clientId);
this.logger.log(this.LOG_TAG, 'channel close: ', `code: ${code}, reason: ${reason}`);
});

const deferred = new Deferred<void>();

const dispose = channel.onOpen(() => {
deferred.resolve();
dispose.dispose();
});

channel.open(channelPath, this.clientId);

await deferred.promise;

return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,21 @@ export class ReconnectingWebSocketConnection extends BaseConnection<Uint8Array>
};
}
onceClose(cb: (code?: number, reason?: string) => void): IDisposable {
const disposable = this.onClose(wrapper);

return {
dispose: () => {
disposable.dispose();
},
};
function wrapper(code: number, reason: string) {
cb(code, reason);
disposable.dispose();
}
}
onClose(cb: (code?: number, reason?: string) => void): IDisposable {
const handler = (e: CloseEvent) => {
cb(e.code, e.reason);
this.socket.removeEventListener('close', handler);
};

this.socket.addEventListener('close', handler);
Expand Down Expand Up @@ -84,8 +96,6 @@ export class ReconnectingWebSocketConnection extends BaseConnection<Uint8Array>
static forURL(url: UrlProvider, protocols?: string | string[], options?: ReconnectingWebSocketOptions) {
const rawConnection = new ReconnectingWebSocket(url, protocols, options);
rawConnection.binaryType = 'arraybuffer';
const connection = new ReconnectingWebSocketConnection(rawConnection);

return connection;
return new ReconnectingWebSocketConnection(rawConnection);
}
}
33 changes: 22 additions & 11 deletions packages/connection/src/common/connection/drivers/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,38 @@ export class StreamConnection extends BaseConnection<Uint8Array> {
}

onceClose(cb: (code?: number, reason?: string) => void): IDisposable {
const disposable = this.onClose(wrapper);
return {
dispose: () => {
disposable.dispose();
},
};

function wrapper(code: number, reason: string) {
cb(code, reason);
disposable.dispose();
}
}

onClose(cb: (code?: number, reason?: string) => void): IDisposable {
const wrapper = (hadError: boolean) => {
const code: number = hadError ? 1 : 0;
const reason: string = hadError ? 'had error' : '';
cb(code, reason);
dispose();
};

const dispose = () => {
this.readable.off('close', wrapper);
if ((this.writable as any) !== (this.readable as any)) {
this.writable.off('close', wrapper);
}
};

this.readable.once('close', wrapper);
this.readable.on('close', wrapper);
if ((this.writable as any) !== (this.readable as any)) {
this.writable.once('close', wrapper);
this.writable.on('close', wrapper);
}

return {
dispose,
dispose: () => {
this.readable.off('close', wrapper);
if ((this.writable as any) !== (this.readable as any)) {
this.writable.off('close', wrapper);
}
},
};
}

Expand Down
12 changes: 5 additions & 7 deletions packages/connection/src/common/rpc-service/center.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Deferred } from '@opensumi/ide-core-common';
import { addElement } from '@opensumi/ide-utils/lib/arrays';

import { METHOD_NOT_REGISTERED } from '../constants';
import { TSumiProtocol } from '../rpc';
Expand Down Expand Up @@ -54,15 +55,14 @@ export class RPCServiceCenter {

this.protocolRegistry.applyTo(connection.io);

const index = this.proxies.length - 1;
const proxy = new ProxySumi(this.serviceRegistry, this.logger);
proxy.listen(connection);

this.proxies.push(proxy);
const remove = addElement(this.proxies, proxy);

return {
dispose: () => {
this.proxies.splice(index, 1);
remove.dispose();
proxy.dispose();
},
};
Expand All @@ -73,16 +73,14 @@ export class RPCServiceCenter {
this.deferred.resolve();
}

const index = this.proxies.length - 1;

const proxy = new ProxyJson(this.serviceRegistry, this.logger);
proxy.listen(connection);

this.proxies.push(proxy);
const remove = addElement(this.proxies, proxy);

return {
dispose: () => {
this.proxies.splice(index, 1);
remove.dispose();
proxy.dispose();
},
};
Expand Down
Loading

1 comment on commit 197b8ef

@opensumi
Copy link
Contributor

@opensumi opensumi bot commented on 197b8ef May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Release Candidate Summary:

Released 🚀 2.27.3-rc-1714982362.0

2.27.3-rc-1714982362.0

user input ref: main

197b8ef fix: reconnecting ws channel not work (#3612)

Please sign in to comment.