diff --git a/apps/vendor-ctp/src/bridge.ts b/apps/vendor-ctp/src/bridge.ts index 443ac93e7..32f0d072a 100644 --- a/apps/vendor-ctp/src/bridge.ts +++ b/apps/vendor-ctp/src/bridge.ts @@ -12,6 +12,7 @@ import { map, mergeMap, of, + ReplaySubject, shareReplay, Subject, tap, @@ -42,7 +43,8 @@ export const createZMQConnection = ( ): IConnection> => { const input$ = new Subject>(); const output$ = new Subject>(); - const connection$ = new Subject(); + const connection$ = new ReplaySubject(); + const isConnected$ = new ReplaySubject(1); const pullSock = new zmq.Pull({ context }); pullSock.connect(ZMQ_PULL_URL); @@ -57,6 +59,7 @@ export const createZMQConnection = ( pushSock.events.on('accept', (e) => { console.debug(formatTime(Date.now()), 'onAccept', e.address); connection$.next(e); + isConnected$.next(true); }); from(pullSock) @@ -99,5 +102,6 @@ export const createZMQConnection = ( input$: observableToAsyncIterable(input$), output$: subjectToNativeSubject(output$), connection$: observableToAsyncIterable(connection$), + isConnected$: observableToAsyncIterable(isConnected$), }; }; diff --git a/apps/vendor-huobi/src/api.ts b/apps/vendor-huobi/src/api.ts index f5ee0d952..c663e8567 100644 --- a/apps/vendor-huobi/src/api.ts +++ b/apps/vendor-huobi/src/api.ts @@ -25,9 +25,9 @@ const createConnectionGzipWS = (URL: string): IConnection => { const output$ = new Subject(); output$.pipe(map((msg) => JSON.stringify(msg))).subscribe(nativeSubjectToSubject(conn.output$)); return { + ...conn, input$: observableToAsyncIterable(input$), output$: subjectToNativeSubject(output$), - connection$: conn.connection$, }; }; diff --git a/common/changes/@yuants/protocol/2025-02-25-20-50.json b/common/changes/@yuants/protocol/2025-02-25-20-50.json new file mode 100644 index 000000000..b31f4dd2a --- /dev/null +++ b/common/changes/@yuants/protocol/2025-02-25-20-50.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/protocol", + "comment": "add API terminal.isConnected$ to track connection status", + "type": "minor" + } + ], + "packageName": "@yuants/protocol" +} \ No newline at end of file diff --git a/common/changes/@yuants/vendor-ctp/2025-02-26-08-09.json b/common/changes/@yuants/vendor-ctp/2025-02-26-08-09.json new file mode 100644 index 000000000..451c265e4 --- /dev/null +++ b/common/changes/@yuants/vendor-ctp/2025-02-26-08-09.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/vendor-ctp", + "comment": "update interface", + "type": "patch" + } + ], + "packageName": "@yuants/vendor-ctp" +} \ No newline at end of file diff --git a/common/changes/@yuants/vendor-huobi/2025-02-26-08-09.json b/common/changes/@yuants/vendor-huobi/2025-02-26-08-09.json new file mode 100644 index 000000000..c564a5f9a --- /dev/null +++ b/common/changes/@yuants/vendor-huobi/2025-02-26-08-09.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/vendor-huobi", + "comment": "update interface", + "type": "patch" + } + ], + "packageName": "@yuants/vendor-huobi" +} \ No newline at end of file diff --git a/libraries/protocol/etc/protocol.api.md b/libraries/protocol/etc/protocol.api.md index f178e5421..f3688c13c 100644 --- a/libraries/protocol/etc/protocol.api.md +++ b/libraries/protocol/etc/protocol.api.md @@ -44,6 +44,8 @@ export interface IChannelTypes { export interface IConnection { connection$: AsyncIterable; input$: AsyncIterable; + // (undocumented) + isConnected$: AsyncIterable; output$: NativeSubject; } @@ -203,6 +205,7 @@ export const subscribeChannel: (terminal: Termina // @public export class Terminal { constructor(host_url: string, terminalInfo: ITerminalInfo, options?: { + verbose?: boolean; disableTerminate?: boolean; disableMetrics?: boolean; connection?: IConnection; @@ -217,8 +220,10 @@ export class Terminal { // (undocumented) host_url: string; input$: AsyncIterable; + isConnected$: AsyncIterable; // (undocumented) options: { + verbose?: boolean; disableTerminate?: boolean; disableMetrics?: boolean; connection?: IConnection; diff --git a/libraries/protocol/src/client.ts b/libraries/protocol/src/client.ts index 19c3a293e..29a4ac5ea 100644 --- a/libraries/protocol/src/client.ts +++ b/libraries/protocol/src/client.ts @@ -151,14 +151,16 @@ export class TerminalClient { }; return observableToAsyncIterable( defer((): Observable => { - console.info( - formatTime(Date.now()), - 'Client', - 'RequestInitiated', - trace_id, - method, - target_terminal_id, - ); + if (this.terminal.options.verbose) { + console.info( + formatTime(Date.now()), + 'Client', + 'RequestInitiated', + trace_id, + method, + target_terminal_id, + ); + } this._terminalOutput$.next(msg); return from(this.terminal.input$).pipe( filter((m) => m.trace_id === msg.trace_id), @@ -171,14 +173,16 @@ export class TerminalClient { }), tap({ finalize: () => { - console.info( - formatTime(Date.now()), - 'Client', - 'RequestFinalized', - trace_id, - method, - target_terminal_id, - ); + if (this.terminal.options.verbose) { + console.info( + formatTime(Date.now()), + 'Client', + 'RequestFinalized', + trace_id, + method, + target_terminal_id, + ); + } }, }), share(), diff --git a/libraries/protocol/src/create-connection.ts b/libraries/protocol/src/create-connection.ts index 4fe9f0e00..ad532b593 100644 --- a/libraries/protocol/src/create-connection.ts +++ b/libraries/protocol/src/create-connection.ts @@ -8,6 +8,7 @@ import { import WebSocket from 'isomorphic-ws'; import { Observable, + ReplaySubject, Subject, bufferTime, defer, @@ -34,6 +35,8 @@ export interface IConnection { output$: NativeSubject; /** Connection established Action */ connection$: AsyncIterable; + + isConnected$: AsyncIterable; } /** @@ -47,7 +50,8 @@ export function createConnectionWs(URL: string): IConnection { const input$ = new Subject(); const output$ = new Subject(); - const connection$ = new Subject(); + const connection$ = new ReplaySubject(1); + const isConnected$ = new ReplaySubject(); // ISSUE: Messages are lost when not connected and need to be buffered and resent // - When not connected for a long time, messages accumulate, causing high memory usage. @@ -69,17 +73,21 @@ export function createConnectionWs(URL: string): IConnection { .subscribe(output$); const connect = () => { + isConnected$.next(false); const ws = (serviceWsRef.current = new WebSocket(URL)); ws.addEventListener('open', () => { console.debug(formatTime(Date.now()), 'connection established', URL); connection$.next(ws); + isConnected$.next(true); }); ws.addEventListener('error', (e: any) => { console.error(formatTime(Date.now()), 'WebSocketConnectionError', e.error); + isConnected$.next(false); ws.close(); }); ws.addEventListener('close', () => { console.debug(formatTime(Date.now()), 'connection closed', URL); + isConnected$.next(false); // Allow external control of reconnection through output.complete or output.error if (!output$.isStopped) { setTimeout(connect, 1000); // reconnect after 1 sec @@ -124,6 +132,7 @@ export function createConnectionWs(URL: string): IConnection { input$: observableToAsyncIterable(input$), output$: subjectToNativeSubject(output$), connection$: observableToAsyncIterable(connection$), + isConnected$: observableToAsyncIterable(isConnected$), }; } @@ -144,5 +153,6 @@ export function createConnectionJson(URL: string): IConnection { input$: observableToAsyncIterable(input$), output$: subjectToNativeSubject(output$), connection$: conn.connection$, + isConnected$: conn.isConnected$, }; } diff --git a/libraries/protocol/src/terminal.ts b/libraries/protocol/src/terminal.ts index f9cc30e24..deb628045 100644 --- a/libraries/protocol/src/terminal.ts +++ b/libraries/protocol/src/terminal.ts @@ -31,6 +31,7 @@ import { share, shareReplay, switchMap, + takeUntil, takeWhile, tap, timeout, @@ -52,6 +53,7 @@ import { TerminalServer } from './server'; import { IService, ITerminalMessage } from './services'; import { PromRegistry } from './services/metrics'; import { getSimplePeerInstance } from './webrtc'; +import { WebSocket } from 'isomorphic-ws'; const TerminalReceivedBytesTotal = PromRegistry.create('counter', 'terminal_received_bytes_total'); const TerminalTransmittedBytesTotal = PromRegistry.create('counter', 'terminal_transmitted_bytes_total'); @@ -102,10 +104,16 @@ export class Terminal { private _terminalInfoUpdated$ = new Subject(); + /** + * if the terminal is connected to host + */ + isConnected$: AsyncIterable; + constructor( public host_url: string, public terminalInfo: ITerminalInfo, public options: { + verbose?: boolean; disableTerminate?: boolean; disableMetrics?: boolean; connection?: IConnection; @@ -119,12 +127,17 @@ export class Terminal { channelIdSchemas: [], }; + if (isNode) { + this.options.verbose ??= true; + } + const url = new URL(host_url); url.searchParams.set('terminal_id', this.terminal_id); // make sure terminal_id is in the connection parameters this.host_url = url.toString(); this.has_header = url.searchParams.get('has_header') === 'true'; this._conn = this.options.connection || createConnectionWs(this.host_url); + this.isConnected$ = this._conn.isConnected$; this._setupTunnel(); this._setupDebugLog(); this._setupPredefinedServerHandlers(); diff --git a/ui/web/src/modules/Terminals/NetworkStatusWidget.tsx b/ui/web/src/modules/Terminals/NetworkStatusWidget.tsx index e6a888405..8cdb12d85 100644 --- a/ui/web/src/modules/Terminals/NetworkStatusWidget.tsx +++ b/ui/web/src/modules/Terminals/NetworkStatusWidget.tsx @@ -4,15 +4,23 @@ import { useObservableState } from 'observable-hooks'; import React from 'react'; import { executeCommand } from '../CommandCenter'; import { currentHostConfig$ } from '../Workbench/model'; +import { isTerminalConnnected$ } from './is-connected'; export const NetworkStatusWidget = React.memo(() => { const config = useObservableState(currentHostConfig$); + const isConnected = useObservableState(isTerminalConnnected$); return (