Skip to content

Commit

Permalink
feat(protocol): add isConnected$ API to track terminal connection sta…
Browse files Browse the repository at this point in the history
…tus (#1088)
  • Loading branch information
zccz14 authored Feb 26, 2025
1 parent 3037d32 commit 17ddd1b
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 20 deletions.
6 changes: 5 additions & 1 deletion apps/vendor-ctp/src/bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
map,
mergeMap,
of,
ReplaySubject,
shareReplay,
Subject,
tap,
Expand Down Expand Up @@ -42,7 +43,8 @@ export const createZMQConnection = (
): IConnection<IBridgeMessage<any, any>> => {
const input$ = new Subject<IBridgeMessage<any, any>>();
const output$ = new Subject<IBridgeMessage<any, any>>();
const connection$ = new Subject<unknown>();
const connection$ = new ReplaySubject<unknown>();
const isConnected$ = new ReplaySubject<boolean>(1);

const pullSock = new zmq.Pull({ context });
pullSock.connect(ZMQ_PULL_URL);
Expand All @@ -57,6 +59,7 @@ export const createZMQConnection = (
pushSock.events.on('accept', (e) => {
console.debug(formatTime(Date.now()), 'onAccept', e.address);
connection$.next(e);
isConnected$.next(true);
});

from(pullSock)
Expand Down Expand Up @@ -99,5 +102,6 @@ export const createZMQConnection = (
input$: observableToAsyncIterable(input$),
output$: subjectToNativeSubject(output$),
connection$: observableToAsyncIterable(connection$),
isConnected$: observableToAsyncIterable(isConnected$),
};
};
2 changes: 1 addition & 1 deletion apps/vendor-huobi/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ const createConnectionGzipWS = <T = any>(URL: string): IConnection<T> => {
const output$ = new Subject<any>();
output$.pipe(map((msg) => JSON.stringify(msg))).subscribe(nativeSubjectToSubject(conn.output$));
return {
...conn,
input$: observableToAsyncIterable(input$),
output$: subjectToNativeSubject(output$),
connection$: conn.connection$,
};
};

Expand Down
10 changes: 10 additions & 0 deletions common/changes/@yuants/protocol/2025-02-25-20-50.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/protocol",
"comment": "add API terminal.isConnected$ to track connection status",
"type": "minor"
}
],
"packageName": "@yuants/protocol"
}
10 changes: 10 additions & 0 deletions common/changes/@yuants/vendor-ctp/2025-02-26-08-09.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/vendor-ctp",
"comment": "update interface",
"type": "patch"
}
],
"packageName": "@yuants/vendor-ctp"
}
10 changes: 10 additions & 0 deletions common/changes/@yuants/vendor-huobi/2025-02-26-08-09.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/vendor-huobi",
"comment": "update interface",
"type": "patch"
}
],
"packageName": "@yuants/vendor-huobi"
}
5 changes: 5 additions & 0 deletions libraries/protocol/etc/protocol.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export interface IChannelTypes {
export interface IConnection<T> {
connection$: AsyncIterable<unknown>;
input$: AsyncIterable<T>;
// (undocumented)
isConnected$: AsyncIterable<boolean>;
output$: NativeSubject<T>;
}

Expand Down Expand Up @@ -203,6 +205,7 @@ export const subscribeChannel: <T extends keyof IChannelTypes>(terminal: Termina
// @public
export class Terminal {
constructor(host_url: string, terminalInfo: ITerminalInfo, options?: {
verbose?: boolean;
disableTerminate?: boolean;
disableMetrics?: boolean;
connection?: IConnection<string>;
Expand All @@ -217,8 +220,10 @@ export class Terminal {
// (undocumented)
host_url: string;
input$: AsyncIterable<ITerminalMessage>;
isConnected$: AsyncIterable<boolean>;
// (undocumented)
options: {
verbose?: boolean;
disableTerminate?: boolean;
disableMetrics?: boolean;
connection?: IConnection<string>;
Expand Down
36 changes: 20 additions & 16 deletions libraries/protocol/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,16 @@ export class TerminalClient {
};
return observableToAsyncIterable(
defer((): Observable<any> => {
console.info(
formatTime(Date.now()),
'Client',
'RequestInitiated',
trace_id,
method,
target_terminal_id,
);
if (this.terminal.options.verbose) {
console.info(
formatTime(Date.now()),
'Client',
'RequestInitiated',
trace_id,
method,
target_terminal_id,
);
}
this._terminalOutput$.next(msg);
return from(this.terminal.input$).pipe(
filter((m) => m.trace_id === msg.trace_id),
Expand All @@ -171,14 +173,16 @@ export class TerminalClient {
}),
tap({
finalize: () => {
console.info(
formatTime(Date.now()),
'Client',
'RequestFinalized',
trace_id,
method,
target_terminal_id,
);
if (this.terminal.options.verbose) {
console.info(
formatTime(Date.now()),
'Client',
'RequestFinalized',
trace_id,
method,
target_terminal_id,
);
}
},
}),
share(),
Expand Down
12 changes: 11 additions & 1 deletion libraries/protocol/src/create-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import WebSocket from 'isomorphic-ws';
import {
Observable,
ReplaySubject,
Subject,
bufferTime,
defer,
Expand All @@ -34,6 +35,8 @@ export interface IConnection<T> {
output$: NativeSubject<T>;
/** Connection established Action */
connection$: AsyncIterable<unknown>;

isConnected$: AsyncIterable<boolean>;
}

/**
Expand All @@ -47,7 +50,8 @@ export function createConnectionWs<T = any>(URL: string): IConnection<T> {

const input$ = new Subject<any>();
const output$ = new Subject<any>();
const connection$ = new Subject<any>();
const connection$ = new ReplaySubject<any>(1);
const isConnected$ = new ReplaySubject<boolean>();

// ISSUE: Messages are lost when not connected and need to be buffered and resent
// - When not connected for a long time, messages accumulate, causing high memory usage.
Expand All @@ -69,17 +73,21 @@ export function createConnectionWs<T = any>(URL: string): IConnection<T> {
.subscribe(output$);

const connect = () => {
isConnected$.next(false);
const ws = (serviceWsRef.current = new WebSocket(URL));
ws.addEventListener('open', () => {
console.debug(formatTime(Date.now()), 'connection established', URL);
connection$.next(ws);
isConnected$.next(true);
});
ws.addEventListener('error', (e: any) => {
console.error(formatTime(Date.now()), 'WebSocketConnectionError', e.error);
isConnected$.next(false);
ws.close();
});
ws.addEventListener('close', () => {
console.debug(formatTime(Date.now()), 'connection closed', URL);
isConnected$.next(false);
// Allow external control of reconnection through output.complete or output.error
if (!output$.isStopped) {
setTimeout(connect, 1000); // reconnect after 1 sec
Expand Down Expand Up @@ -124,6 +132,7 @@ export function createConnectionWs<T = any>(URL: string): IConnection<T> {
input$: observableToAsyncIterable(input$),
output$: subjectToNativeSubject(output$),
connection$: observableToAsyncIterable(connection$),
isConnected$: observableToAsyncIterable(isConnected$),
};
}

Expand All @@ -144,5 +153,6 @@ export function createConnectionJson<T = any>(URL: string): IConnection<T> {
input$: observableToAsyncIterable(input$),
output$: subjectToNativeSubject(output$),
connection$: conn.connection$,
isConnected$: conn.isConnected$,
};
}
13 changes: 13 additions & 0 deletions libraries/protocol/src/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
share,
shareReplay,
switchMap,
takeUntil,
takeWhile,
tap,
timeout,
Expand All @@ -52,6 +53,7 @@ import { TerminalServer } from './server';
import { IService, ITerminalMessage } from './services';
import { PromRegistry } from './services/metrics';
import { getSimplePeerInstance } from './webrtc';
import { WebSocket } from 'isomorphic-ws';

const TerminalReceivedBytesTotal = PromRegistry.create('counter', 'terminal_received_bytes_total');
const TerminalTransmittedBytesTotal = PromRegistry.create('counter', 'terminal_transmitted_bytes_total');
Expand Down Expand Up @@ -102,10 +104,16 @@ export class Terminal {

private _terminalInfoUpdated$ = new Subject<void>();

/**
* if the terminal is connected to host
*/
isConnected$: AsyncIterable<boolean>;

constructor(
public host_url: string,
public terminalInfo: ITerminalInfo,
public options: {
verbose?: boolean;
disableTerminate?: boolean;
disableMetrics?: boolean;
connection?: IConnection<string>;
Expand All @@ -119,12 +127,17 @@ export class Terminal {
channelIdSchemas: [],
};

if (isNode) {
this.options.verbose ??= true;
}

const url = new URL(host_url);
url.searchParams.set('terminal_id', this.terminal_id); // make sure terminal_id is in the connection parameters
this.host_url = url.toString();
this.has_header = url.searchParams.get('has_header') === 'true';

this._conn = this.options.connection || createConnectionWs(this.host_url);
this.isConnected$ = this._conn.isConnected$;
this._setupTunnel();
this._setupDebugLog();
this._setupPredefinedServerHandlers();
Expand Down
10 changes: 9 additions & 1 deletion ui/web/src/modules/Terminals/NetworkStatusWidget.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ import { useObservableState } from 'observable-hooks';
import React from 'react';
import { executeCommand } from '../CommandCenter';
import { currentHostConfig$ } from '../Workbench/model';
import { isTerminalConnnected$ } from './is-connected';

export const NetworkStatusWidget = React.memo(() => {
const config = useObservableState(currentHostConfig$);
const isConnected = useObservableState(isTerminalConnnected$);

return (
<Button
type="tertiary"
theme="borderless"
icon={config ? <Wifi size={20} /> : <CloseWifi size={20} />}
icon={
config ? (
<Wifi size={20} fill={isConnected ? 'var(--semi-color-success)' : 'var(--semi-color-warning)'} />
) : (
<CloseWifi size={20} />
)
}
onClick={() => {
executeCommand('HostList');
}}
Expand Down
1 change: 1 addition & 0 deletions ui/web/src/modules/Terminals/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import './TerminalList';
import { terminal$ } from './create-connection';
export * from './InlineTerminalId';
export * from './create-connection';
export * from './is-connected';

export const useTick = (datasource_id: string, product_id: string) =>
terminal$.pipe(switchMap((terminal) => (terminal ? _useTick(terminal, datasource_id, product_id) : EMPTY)));
7 changes: 7 additions & 0 deletions ui/web/src/modules/Terminals/is-connected.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { EMPTY, shareReplay, switchMap } from 'rxjs';
import { terminal$ } from './create-connection';

export const isTerminalConnnected$ = terminal$.pipe(
switchMap((terminal) => terminal?.isConnected$ ?? EMPTY),
shareReplay(1),
);

0 comments on commit 17ddd1b

Please sign in to comment.