From 197b8ef9917d764baced87ea93e800b1ce43ac82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=87=8E=E5=A3=B0?= Date: Mon, 6 May 2024 15:29:10 +0800 Subject: [PATCH] fix: reconnecting ws channel not work (#3612) --- .../components/src/notification/index.tsx | 101 +++++++++--------- .../src/browser/ws-channel-handler.ts | 99 ++++++++++------- .../drivers/reconnecting-websocket.ts | 18 +++- .../src/common/connection/drivers/stream.ts | 33 ++++-- .../src/common/rpc-service/center.ts | 12 +-- .../src/node/common-channel-handler.ts | 10 +- packages/connection/src/node/ws.ts | 6 +- .../core-browser/src/bootstrap/connection.ts | 21 ++-- .../src/browser/extension.service.ts | 92 ++++++++++++---- packages/extension/src/common/extension.ts | 4 +- .../extension/src/node/extension.service.ts | 11 +- .../overlay/src/browser/message.service.tsx | 4 +- packages/overlay/src/common/index.ts | 10 +- packages/utils/src/async.ts | 4 + packages/utils/src/cancellation.ts | 9 ++ packages/utils/src/map.ts | 34 ++++++ 16 files changed, 304 insertions(+), 164 deletions(-) diff --git a/packages/components/src/notification/index.tsx b/packages/components/src/notification/index.tsx index 1d790fcb47..e1c4abbfd7 100644 --- a/packages/components/src/notification/index.tsx +++ b/packages/components/src/notification/index.tsx @@ -2,7 +2,7 @@ import cls from 'classnames'; import React from 'react'; import { IAction } from '@opensumi/ide-core-common'; -import { isUndefined } from '@opensumi/ide-utils'; +import { CancelablePromise, createCancelablePromise, isUndefined } from '@opensumi/ide-utils'; import { Button } from '../button'; import { MessageType } from '../common'; @@ -33,55 +33,60 @@ export function open( description?: string | React.ReactNode, duration?: number, onClose?: () => void, -): Promise | undefined { - return new Promise((resolve) => { - const args: ArgsProps = { - key, - className: cls('kt-notification-wrapper', { - ['kt-notification-info']: type === MessageType.Info, - ['kt-notification-error']: type === MessageType.Error, - ['kt-notification-warn']: type === MessageType.Warning, - }), - duration: isUndefined(duration) ? DURATION[type] : duration, - onClose: () => { - onClose && onClose(); - cachedArgs.delete(key); - resolve(undefined); - }, - btn: buttons - ? buttons.map((button, index) => { - const isStringButton = typeof button === 'string'; - const buttonProps = { - className: `${cls('kt-notification-button')}${isStringButton ? '' : button.class}`, - ghost: isStringButton ? index === 0 : !button.primary, - key: isStringButton ? button : button.id, - onClick: () => { - resolve(button as any); - antdNotification.close(key); - if (!isStringButton) { - button.run(); - } - }, - }; - const text = isStringButton ? button : button.label; - return ( - - ); - }) - : null, - message, - description, - }; - cachedArgs.set(key, [type, args]); +): CancelablePromise | undefined { + return createCancelablePromise((token) => { + token.onCancellationRequested(() => { + close(key); + }); + return new Promise((resolve) => { + const args: ArgsProps = { + key, + className: cls('kt-notification-wrapper', { + ['kt-notification-info']: type === MessageType.Info, + ['kt-notification-error']: type === MessageType.Error, + ['kt-notification-warn']: type === MessageType.Warning, + }), + duration: isUndefined(duration) ? DURATION[type] : duration, + onClose: () => { + onClose && onClose(); + cachedArgs.delete(key); + resolve(undefined); + }, + btn: buttons + ? buttons.map((button, index) => { + const isStringButton = typeof button === 'string'; + const buttonProps = { + className: `${cls('kt-notification-button')}${isStringButton ? '' : button.class}`, + ghost: isStringButton ? index === 0 : !button.primary, + key: isStringButton ? button : button.id, + onClick: () => { + resolve(button as any); + antdNotification.close(key); + if (!isStringButton) { + button.run(); + } + }, + }; + const text = isStringButton ? button : button.label; + return ( + + ); + }) + : null, + message, + description, + }; + cachedArgs.set(key, [type, args]); - // closable 为 false 时,不展示 closeIcon - if (!closable) { - args.closeIcon = ; - } + // closable 为 false 时,不展示 closeIcon + if (!closable) { + args.closeIcon = ; + } - doOpenNotification(type, args); + doOpenNotification(type, args); + }); }); } diff --git a/packages/connection/src/browser/ws-channel-handler.ts b/packages/connection/src/browser/ws-channel-handler.ts index c4ccd4b56e..fe3a630984 100644 --- a/packages/connection/src/browser/ws-channel-handler.ts +++ b/packages/connection/src/browser/ws-channel-handler.ts @@ -1,4 +1,4 @@ -import { IReporterService, REPORT_NAME } from '@opensumi/ide-core-common'; +import { Barrier, Deferred, IReporterService, MultiMap, REPORT_NAME } from '@opensumi/ide-core-common'; import { NetSocketConnection } from '../common/connection'; import { ReconnectingWebSocketConnection } from '../common/connection/drivers/reconnecting-websocket'; @@ -10,12 +10,17 @@ import { WSChannel, parse, pingMessage } from '../common/ws-channel'; */ export class WSChannelHandler { private channelMap: Map = new Map(); - private channelCloseEventMap: Map = new Map(); + private channelCloseEventMap = new MultiMap(); private logger = console; public clientId: string; private heartbeatMessageTimer: NodeJS.Timeout | null; private reporterService: IReporterService; + /** + * 保证在连接建立后再执行后续操作 + */ + private openingBarrier = new Barrier(); + LOG_TAG: string; constructor(public connection: ReconnectingWebSocketConnection | NetSocketConnection, logger: any, clientId: string) { @@ -64,15 +69,8 @@ export class WSChannelHandler { }); const reopenExistsChannel = () => { - if (this.channelMap.size) { + if (this.channelMap.size > 0) { this.channelMap.forEach((channel) => { - channel.onOpen(() => { - const closeInfo = this.channelCloseEventMap.get(channel.id); - this.reporterService && - this.reporterService.point(REPORT_NAME.CHANNEL_RECONNECT, REPORT_NAME.CHANNEL_RECONNECT, closeInfo); - this.logger.log(this.LOG_TAG, `channel reconnect ${this.clientId}:${channel.channelPath}`); - }); - channel.open(channel.channelPath, this.clientId); // 针对前端需要重新设置下后台状态的情况 channel.fireReopen(); @@ -80,52 +78,73 @@ export class WSChannelHandler { } }; - await new Promise((resolve) => { - if (this.connection.isOpen()) { - this.heartbeatMessage(); - resolve(); + this.connection.onClose((code, reason) => { + this.channelMap.forEach((channel) => { + channel.close(code ?? 1000, reason ?? ''); + }); + }); + + if (this.connection.isOpen()) { + this.heartbeatMessage(); + this.openingBarrier.open(); + } + + this.connection.onOpen(() => { + this.heartbeatMessage(); + // 说明是重连 + if (this.openingBarrier.isOpen()) { reopenExistsChannel(); } else { - this.connection.onOpen(() => { - this.heartbeatMessage(); - resolve(); - reopenExistsChannel(); - }); + this.openingBarrier.open(); } }); - this.connection.onceClose((code, reason) => { - if (this.channelMap.size) { - this.channelMap.forEach((channel) => { - channel.close(code ?? 1000, reason ?? ''); - }); - } - }); + await this.openingBarrier.wait(); } + public async openChannel(channelPath: string) { - const channelId = `${this.clientId}:${channelPath}`; const channel = new WSChannel(this.connection, { - id: channelId, + id: `${this.clientId}:${channelPath}`, logger: this.logger, }); - this.channelMap.set(channel.id, channel); - await new Promise((resolve) => { - channel.onOpen(() => { - resolve(); - }); - channel.onceClose((code: number, reason: string) => { - this.channelCloseEventMap.set(channelId, { - channelPath, - closeEvent: { code, reason }, - connectInfo: (navigator as any).connection as ConnectionInfo, + channel.onOpen(() => { + const closeInfo = this.channelCloseEventMap.get(channel.id); + if (closeInfo) { + closeInfo.forEach((info) => { + this.reporterService && + this.reporterService.point(REPORT_NAME.CHANNEL_RECONNECT, REPORT_NAME.CHANNEL_RECONNECT, info); }); - this.logger.log(this.LOG_TAG, 'channel close: ', `code: ${code}, reason: ${reason}`); + + this.channelCloseEventMap.delete(channel.id); + + this.logger.log(this.LOG_TAG, `channel reconnect ${this.clientId}:${channel.channelPath}`); + } else { + this.logger.log(this.LOG_TAG, `channel open ${this.clientId}:${channel.channelPath}`); + } + }); + + channel.onClose((code: number, reason: string) => { + this.channelCloseEventMap.set(channel.id, { + channelPath, + closeEvent: { code, reason }, + connectInfo: (navigator as any).connection as ConnectionInfo, }); - channel.open(channelPath, this.clientId); + this.logger.log(this.LOG_TAG, 'channel close: ', `code: ${code}, reason: ${reason}`); }); + const deferred = new Deferred(); + + const dispose = channel.onOpen(() => { + deferred.resolve(); + dispose.dispose(); + }); + + channel.open(channelPath, this.clientId); + + await deferred.promise; + return channel; } diff --git a/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts b/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts index 98ed930002..d29c426a97 100644 --- a/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts +++ b/packages/connection/src/common/connection/drivers/reconnecting-websocket.ts @@ -52,9 +52,21 @@ export class ReconnectingWebSocketConnection extends BaseConnection }; } onceClose(cb: (code?: number, reason?: string) => void): IDisposable { + const disposable = this.onClose(wrapper); + + return { + dispose: () => { + disposable.dispose(); + }, + }; + function wrapper(code: number, reason: string) { + cb(code, reason); + disposable.dispose(); + } + } + onClose(cb: (code?: number, reason?: string) => void): IDisposable { const handler = (e: CloseEvent) => { cb(e.code, e.reason); - this.socket.removeEventListener('close', handler); }; this.socket.addEventListener('close', handler); @@ -84,8 +96,6 @@ export class ReconnectingWebSocketConnection extends BaseConnection static forURL(url: UrlProvider, protocols?: string | string[], options?: ReconnectingWebSocketOptions) { const rawConnection = new ReconnectingWebSocket(url, protocols, options); rawConnection.binaryType = 'arraybuffer'; - const connection = new ReconnectingWebSocketConnection(rawConnection); - - return connection; + return new ReconnectingWebSocketConnection(rawConnection); } } diff --git a/packages/connection/src/common/connection/drivers/stream.ts b/packages/connection/src/common/connection/drivers/stream.ts index 631fe28a04..5639ec3d02 100644 --- a/packages/connection/src/common/connection/drivers/stream.ts +++ b/packages/connection/src/common/connection/drivers/stream.ts @@ -32,27 +32,38 @@ export class StreamConnection extends BaseConnection { } onceClose(cb: (code?: number, reason?: string) => void): IDisposable { + const disposable = this.onClose(wrapper); + return { + dispose: () => { + disposable.dispose(); + }, + }; + + function wrapper(code: number, reason: string) { + cb(code, reason); + disposable.dispose(); + } + } + + onClose(cb: (code?: number, reason?: string) => void): IDisposable { const wrapper = (hadError: boolean) => { const code: number = hadError ? 1 : 0; const reason: string = hadError ? 'had error' : ''; cb(code, reason); - dispose(); - }; - - const dispose = () => { - this.readable.off('close', wrapper); - if ((this.writable as any) !== (this.readable as any)) { - this.writable.off('close', wrapper); - } }; - this.readable.once('close', wrapper); + this.readable.on('close', wrapper); if ((this.writable as any) !== (this.readable as any)) { - this.writable.once('close', wrapper); + this.writable.on('close', wrapper); } return { - dispose, + dispose: () => { + this.readable.off('close', wrapper); + if ((this.writable as any) !== (this.readable as any)) { + this.writable.off('close', wrapper); + } + }, }; } diff --git a/packages/connection/src/common/rpc-service/center.ts b/packages/connection/src/common/rpc-service/center.ts index 908642f8a9..67a66ce95d 100644 --- a/packages/connection/src/common/rpc-service/center.ts +++ b/packages/connection/src/common/rpc-service/center.ts @@ -1,4 +1,5 @@ import { Deferred } from '@opensumi/ide-core-common'; +import { addElement } from '@opensumi/ide-utils/lib/arrays'; import { METHOD_NOT_REGISTERED } from '../constants'; import { TSumiProtocol } from '../rpc'; @@ -54,15 +55,14 @@ export class RPCServiceCenter { this.protocolRegistry.applyTo(connection.io); - const index = this.proxies.length - 1; const proxy = new ProxySumi(this.serviceRegistry, this.logger); proxy.listen(connection); - this.proxies.push(proxy); + const remove = addElement(this.proxies, proxy); return { dispose: () => { - this.proxies.splice(index, 1); + remove.dispose(); proxy.dispose(); }, }; @@ -73,16 +73,14 @@ export class RPCServiceCenter { this.deferred.resolve(); } - const index = this.proxies.length - 1; - const proxy = new ProxyJson(this.serviceRegistry, this.logger); proxy.listen(connection); - this.proxies.push(proxy); + const remove = addElement(this.proxies, proxy); return { dispose: () => { - this.proxies.splice(index, 1); + remove.dispose(); proxy.dispose(); }, }; diff --git a/packages/connection/src/node/common-channel-handler.ts b/packages/connection/src/node/common-channel-handler.ts index c1e6ffb8c0..ca5d6df49c 100644 --- a/packages/connection/src/node/common-channel-handler.ts +++ b/packages/connection/src/node/common-channel-handler.ts @@ -9,6 +9,12 @@ import { CommonChannelHandlerOptions, WebSocketHandler } from './ws'; export { commonChannelPathHandler }; +export interface WebSocketConnection extends WebSocket { + routeParam: { + pathname: string; + }; +} + /** * Channel Handler for nodejs */ @@ -27,7 +33,7 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We } private initWSServer() { - this.logger.log('init Common Channel Handler'); + this.logger.log('init common channel handler'); this.wsServer = new WebSocket.Server({ noServer: true, ...this.options.wsServerOptions, @@ -43,7 +49,7 @@ export class CommonChannelHandler extends BaseCommonChannelHandler implements We if (routeResult) { this.wsServer.handleUpgrade(request, socket, head, (connection) => { - (connection as any).routeParam = { + (connection as WebSocketConnection).routeParam = { pathname, }; diff --git a/packages/connection/src/node/ws.ts b/packages/connection/src/node/ws.ts index de937ced4c..45311aac57 100644 --- a/packages/connection/src/node/ws.ts +++ b/packages/connection/src/node/ws.ts @@ -1,6 +1,5 @@ import assert from 'assert'; import http from 'http'; -import url from 'url'; import ws from 'ws'; @@ -90,7 +89,10 @@ export class WebSocketServerRoute { server.on('upgrade', (request, socket, head) => { assert(request.url, 'cannot parse url from http request'); - const wsPathname: string = url.parse(request.url).pathname as string; + + // request.url: `/path?query=a#hash` + const url = new URL(request.url, 'wss://base'); + const wsPathname: string = url.pathname; let wsHandlerIndex = 0; const wsHandlerLength = wsServerHandlerArr.length; diff --git a/packages/core-browser/src/bootstrap/connection.ts b/packages/core-browser/src/bootstrap/connection.ts index 5ddd460d5b..7bb3ae1ce5 100644 --- a/packages/core-browser/src/bootstrap/connection.ts +++ b/packages/core-browser/src/bootstrap/connection.ts @@ -45,13 +45,13 @@ export async function createConnectionService( if (channelHandler.connection.isOpen()) { onOpen(); - } else { - const dispose = channelHandler.connection.onOpen(() => { - onOpen(); - dispose.dispose(); - }); } + // reconnecting will still emit the open event + channelHandler.connection.onOpen(() => { + onOpen(); + }); + channelHandler.connection.onceClose(() => { stateService.reachedState('core_module_initialized').then(() => { eventBus.fire(new BrowserConnectionCloseEvent()); @@ -71,7 +71,6 @@ export async function createConnectionService( useValue: channelHandler, }); - // reconnecting will not execute the following logic const channel = await channelHandler.openChannel(RPCServiceChannelPath); channel.onReopen(() => onReconnect()); @@ -80,15 +79,9 @@ export async function createConnectionService( export function bindConnectionService(injector: Injector, modules: ModuleConstructor[], channel: WSChannel) { const clientCenter = new RPCServiceCenter(); - - const dispose = clientCenter.setSumiConnection(channel.createSumiConnection()); - - const toDispose = channel.onClose(() => { - dispose.dispose(); - toDispose.dispose(); - }); - + const disposable = clientCenter.setSumiConnection(channel.createSumiConnection()); initConnectionService(injector, modules, clientCenter); + return disposable; } /** diff --git a/packages/extension/src/browser/extension.service.ts b/packages/extension/src/browser/extension.service.ts index f9d1c0eba2..beb852d015 100644 --- a/packages/extension/src/browser/extension.service.ts +++ b/packages/extension/src/browser/extension.service.ts @@ -12,13 +12,17 @@ import { } from '@opensumi/ide-core-browser'; import { IProgressService } from '@opensumi/ide-core-browser/lib/progress'; import { + CancelablePromise, + CancellationToken, ExtensionActivatedEvent, ExtensionDidContributes, GeneralSettingsId, + MayCancelablePromise, OnEvent, ProgressLocation, URI, WithEventBus, + createCancelablePromise, getLanguageId, localize, sleep, @@ -145,6 +149,7 @@ export class ExtensionServiceImpl extends WithEventBus implements ExtensionServi // 插件进程是否正在等待重启,页面不可见的时候被设置 private isExtProcessWaitingForRestart: ERestartPolicy | undefined; + private pCrashMessageModel: MayCancelablePromise | undefined; // 针对 activationEvents 为 * 的插件 public eagerExtensionsActivated: Deferred = new Deferred(); @@ -318,41 +323,71 @@ export class ExtensionServiceImpl extends WithEventBus implements ExtensionServi if (document.visibilityState === 'visible') { this.extProcessRestartHandler(restartPolicy); } else { + this.logger.log('[ext-restart]: page is not visible, waiting for restart'); this.isExtProcessWaitingForRestart = restartPolicy; } } + private extProcessRestartPromise: CancelablePromise | undefined; + private async extProcessRestartHandler(restartPolicy: ERestartPolicy = ERestartPolicy.Always) { - if (this.isExtProcessRestarting) { + if (this.isExtProcessRestarting && restartPolicy !== ERestartPolicy.Always) { + this.logger.log('[ext-restart]: ext process is restarting, skip'); return; } + const doRestart = async (token: CancellationToken) => { + const policy = this.isExtProcessWaitingForRestart || restartPolicy; + + if (this.pCrashMessageModel) { + // crash message model is still open, close it + this.pCrashMessageModel.cancel?.(); + this.pCrashMessageModel = undefined; + } + + token.onCancellationRequested(() => { + this.logger.log('[ext-restart]: ext process restart canceled'); + this.isExtProcessRestarting = false; + this.isExtProcessWaitingForRestart = undefined; + }); + + switch (policy) { + // @ts-expect-error Need fall-through + case ERestartPolicy.WhenExit: + // if we can get the pid, then the process is still running, no need to restart. + // if pid is null, it means the process is exited, then we need to start it. + if (await this.getExtProcessPID(token)) { + break; + } + case ERestartPolicy.Always: + if (token.isCancellationRequested) { + break; + } + try { + await this.startExtProcess(false); + } catch (err) { + this.logger.error(`[ext-restart]: ext-host restart failure, error: ${err}`); + } + break; + } + + this.isExtProcessRestarting = false; + this.isExtProcessWaitingForRestart = undefined; + }; + const restartProgress = () => { + this.logger.log('[ext-restart]: restart ext process, restart policy', restartPolicy); this.progressService.withProgress( { location: ProgressLocation.Notification, title: localize('extension.exthostRestarting.content'), }, async () => { - const policy = this.isExtProcessWaitingForRestart || restartPolicy; - - switch (policy) { - // @ts-expect-error Need fall-through - case ERestartPolicy.WhenExit: - if (await this.ping()) { - break; - } - case ERestartPolicy.Always: - try { - await this.startExtProcess(false); - } catch (err) { - this.logger.error(`[ext-restart]: ext-host restart failure, error: ${err}`); - } - break; + if (this.extProcessRestartPromise) { + this.extProcessRestartPromise.cancel(); } - - this.isExtProcessRestarting = false; - this.isExtProcessWaitingForRestart = undefined; + this.extProcessRestartPromise = createCancelablePromise(doRestart); + await this.extProcessRestartPromise; }, ); }; @@ -371,8 +406,12 @@ export class ExtensionServiceImpl extends WithEventBus implements ExtensionServi } } - private async ping(): Promise { - return await Promise.race([this.extensionNodeClient.pid(), sleep(1000).then(() => null)]); + private async getExtProcessPID(token: CancellationToken): Promise { + return await Promise.race([ + this.extensionNodeClient.pid(), + sleep(1000).then(() => null), + CancellationToken.resolveIfCancelled(token, null), + ]); } private async startExtProcess(init: boolean) { @@ -710,6 +749,7 @@ export class ExtensionServiceImpl extends WithEventBus implements ExtensionServi // RPC call from node public async $restartExtProcess() { + this.logger.log('[ext-restart]: receive the command from the node side to restart the process'); await this.restartExtProcess(ERestartPolicy.Always); } @@ -733,6 +773,10 @@ export class ExtensionServiceImpl extends WithEventBus implements ExtensionServi } public async $processCrashRestart() { + if (this.pCrashMessageModel) { + this.pCrashMessageModel.cancel?.(); + } + const okText = localize('common.yes'); const options = [okText]; const ifRequiredReload = this.invalidReloadStrategy === 'ifRequired'; @@ -740,11 +784,15 @@ export class ExtensionServiceImpl extends WithEventBus implements ExtensionServi options.unshift(localize('common.no')); } - const msg = await this.messageService.info( + this.pCrashMessageModel = this.messageService.info( localize('extension.crashedExthostReload.confirm'), options, !!ifRequiredReload, ); + + const msg = await this.pCrashMessageModel; + this.pCrashMessageModel = undefined; + if (msg === okText) { await this.restartExtProcess(ERestartPolicy.Always); } diff --git a/packages/extension/src/common/extension.ts b/packages/extension/src/common/extension.ts index 92a089054c..0d54588666 100644 --- a/packages/extension/src/common/extension.ts +++ b/packages/extension/src/common/extension.ts @@ -33,7 +33,7 @@ import { Extension } from '../hosted/vscode.extension'; import { ActivatedExtension, ActivatedExtensionJSON, ExtensionsActivator } from './activator'; import { ISumiExtensionContributions } from './sumi/extension'; import { IExtensionContributions, IExtensionLanguagePack, IMainThreadCommands } from './vscode'; -import { ExtensionKind, ThemeIcon } from './vscode/ext-types'; +import { ThemeIcon } from './vscode/ext-types'; export interface IExtensionMetaData { id: string; @@ -217,7 +217,7 @@ export enum ERestartPolicy { /** * Restart extension process when extension process is not running or not responding */ - WhenExit = 'WhenExit', + WhenExit = 'when-exit', } export abstract class ExtensionService { diff --git a/packages/extension/src/node/extension.service.ts b/packages/extension/src/node/extension.service.ts index 68060c620b..5e820a376e 100644 --- a/packages/extension/src/node/extension.service.ts +++ b/packages/extension/src/node/extension.service.ts @@ -174,18 +174,19 @@ export class ExtensionNodeServiceImpl implements IExtensionNodeService { await this.clientExtProcessExtConnectionDeferredMap.get(clientId)?.promise; const extProcessId = this.clientExtProcessMap.get(clientId); - const notExistExtension = + const extProcessNotExist = isUndefined(extProcessId) || !( (await this.extensionHostManager.isRunning(extProcessId)) && this.clientExtProcessExtConnection.has(clientId) ); - if (notExistExtension) { - // 进程未调用启动直接连接 - this.logger.error(`${clientId} clientId process connection not exists`); + if (extProcessNotExist) { + this.logger.error(`${clientId} clientId process connection not exists, try to notify client to restart`); /** * 如果前端与后端连接后发现没有对应的插件进程实例,那么通知前端重启插件进程 - * 一般这种情况出现在用户关闭电脑超过 ProcessCloseExitThreshold 设定的最大时间,插件进程被杀死后,前端再次建立连接时 + * 已知如下场景会出现这种情况: + * 1. 用户关闭电脑超过 ProcessCloseExitThreshold 设定的最大时间,插件进程被杀死后,前端再次建立连接时 + * 2. Node 进程被杀死,插件进程也会被杀死 */ this.restartExtProcessByClient(clientId); this.reporterService.point(REPORT_NAME.EXTENSION_NOT_EXIST, clientId); diff --git a/packages/overlay/src/browser/message.service.tsx b/packages/overlay/src/browser/message.service.tsx index 0cef1e6581..ccd3878335 100644 --- a/packages/overlay/src/browser/message.service.tsx +++ b/packages/overlay/src/browser/message.service.tsx @@ -4,7 +4,7 @@ import { Autowired, Injectable } from '@opensumi/di'; import { notification, open } from '@opensumi/ide-components'; import { parseWithoutEscape } from '@opensumi/ide-components/lib/utils'; import { IOpenerService, toMarkdown } from '@opensumi/ide-core-browser'; -import { MessageType, localize, uuid } from '@opensumi/ide-core-common'; +import { MayCancelablePromise, MessageType, localize, uuid } from '@opensumi/ide-core-common'; import { AbstractMessageService, IMessageService, MAX_MESSAGE_LENGTH } from '../common'; @@ -43,7 +43,7 @@ export class MessageService extends AbstractMessageService implements IMessageSe buttons?: string[], closable = true, from?: string, - ): Promise { + ): MayCancelablePromise { if (!rawMessage) { return Promise.resolve(undefined); } diff --git a/packages/overlay/src/common/index.ts b/packages/overlay/src/common/index.ts index d0aea5dafa..20b5a62874 100644 --- a/packages/overlay/src/common/index.ts +++ b/packages/overlay/src/common/index.ts @@ -1,6 +1,6 @@ import React from 'react'; -import { Event, MessageType, URI } from '@opensumi/ide-core-common'; +import { Event, MayCancelablePromise, MessageType, URI } from '@opensumi/ide-core-common'; export const IMessageService = Symbol('IMessageService'); @@ -10,19 +10,19 @@ export interface IMessageService { buttons?: string[], closable?: boolean, props?: Record, - ): Promise; + ): MayCancelablePromise; warning( message: string | React.ReactNode, buttons?: string[], closable?: boolean, props?: Record, - ): Promise; + ): MayCancelablePromise; error( message: string | React.ReactNode, buttons?: string[], closable?: boolean, props?: Record, - ): Promise; + ): MayCancelablePromise; open( message: string | React.ReactNode, type: MessageType, @@ -30,7 +30,7 @@ export interface IMessageService { closable?: boolean, from?: string, props?: Record, - ): Promise; + ): MayCancelablePromise; hide(value?: T): void; } diff --git a/packages/utils/src/async.ts b/packages/utils/src/async.ts index c4e8ac6869..0874c1f2ba 100644 --- a/packages/utils/src/async.ts +++ b/packages/utils/src/async.ts @@ -8,6 +8,10 @@ export interface CancelablePromise extends Promise { cancel(): void; } +export interface MayCancelablePromise extends Promise { + cancel?(): void; +} + export function createCancelablePromise(callback: (token: CancellationToken) => Promise): CancelablePromise { const source = new CancellationTokenSource(); diff --git a/packages/utils/src/cancellation.ts b/packages/utils/src/cancellation.ts index 44bbd06a0a..cad07f2cc5 100644 --- a/packages/utils/src/cancellation.ts +++ b/packages/utils/src/cancellation.ts @@ -42,6 +42,15 @@ export namespace CancellationToken { ); } + export function resolveIfCancelled(token: CancellationToken, value: T): Promise { + return new Promise((resolve) => { + const listener = token.onCancellationRequested(() => { + listener.dispose(); + resolve(value); + }); + }); + } + export const None: CancellationToken = Object.freeze({ isCancellationRequested: false, onCancellationRequested: Event.None, diff --git a/packages/utils/src/map.ts b/packages/utils/src/map.ts index 99445d67e6..8f9dce7118 100644 --- a/packages/utils/src/map.ts +++ b/packages/utils/src/map.ts @@ -1,3 +1,4 @@ +import flatten from 'lodash/flatten'; import { URI as Uri } from 'vscode-uri'; import { CharCode } from './charCode'; @@ -929,3 +930,36 @@ export class CaseInsensitiveMap extends Map { return super.has(key); } } + +export class MultiMap { + private _map = new Map(); + + get size(): number { + let count = 0; + for (const [, values] of this._map) { + count += values.length; + } + return count; + } + + get keys(): K[] { + return keys(this._map); + } + + get values(): V[] { + return flatten(values(this._map)); + } + + get(key: K): V[] | undefined { + return this._map.get(key); + } + + set(key: K, value: V): void { + const array = getOrSet(this._map, key, []); + array.push(value); + } + + delete(key: K): void { + this._map.delete(key); + } +}