diff --git a/common/changes/@yuants/protocol/2025-02-19-17-40.json b/common/changes/@yuants/protocol/2025-02-19-17-40.json new file mode 100644 index 00000000..e77542e5 --- /dev/null +++ b/common/changes/@yuants/protocol/2025-02-19-17-40.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@yuants/protocol", + "comment": "enhance logging and candidate generation in TerminalClient and TerminalServer", + "type": "minor" + } + ], + "packageName": "@yuants/protocol" +} \ No newline at end of file diff --git a/libraries/protocol/etc/protocol.api.md b/libraries/protocol/etc/protocol.api.md index aa7c3b8f..f178e542 100644 --- a/libraries/protocol/etc/protocol.api.md +++ b/libraries/protocol/etc/protocol.api.md @@ -247,6 +247,8 @@ export class Terminal { // (undocumented) terminalInfo: ITerminalInfo; terminalInfos$: AsyncIterable; + // (undocumented) + terminalInfos: ITerminalInfo[]; } // @public diff --git a/libraries/protocol/src/client.ts b/libraries/protocol/src/client.ts index 1603114a..19c3a293 100644 --- a/libraries/protocol/src/client.ts +++ b/libraries/protocol/src/client.ts @@ -1,7 +1,8 @@ -import { UUID } from '@yuants/data-model'; +import { formatTime, UUID } from '@yuants/data-model'; import { nativeSubjectToSubject, observableToAsyncIterable } from '@yuants/utils'; import Ajv from 'ajv'; import { + combineLatest, defer, filter, firstValueFrom, @@ -9,6 +10,7 @@ import { map, mergeMap, Observable, + ReplaySubject, share, takeUntil, takeWhile, @@ -30,7 +32,7 @@ export class TerminalClient { * Resolve candidate target_terminal_ids for a request */ resolveTargetTerminalIds = async (method: string, req: ITerminalMessage['req']): Promise => { - await firstValueFrom(from(this.terminal.terminalInfos$)); + await firstValueFrom(this._generateCandidates$); const candidates = this._mapMethodToServiceIdToCandidateClientSide.get(method); if (!candidates) return []; const result: string[] = []; @@ -42,6 +44,8 @@ export class TerminalClient { return result; }; + private _generateCandidates$ = new ReplaySubject(1); + /** * Service Index (optimized for method resolution) */ @@ -55,27 +59,44 @@ export class TerminalClient { from(this.terminal.terminalInfos$) .pipe( tap((terminalInfos) => { + console.info(formatTime(Date.now()), 'Client', 'GenerateCandidates', 'Start'); + const t1 = Date.now(); + let init_cnt = 0; const nextMap: typeof this._mapMethodToServiceIdToCandidateClientSide = new Map(); for (const terminalInfo of terminalInfos) { for (const serviceInfo of Object.values(terminalInfo.serviceInfo || {})) { if (!nextMap.get(serviceInfo.method)) { nextMap.set(serviceInfo.method, new Map()); } + const serviceId = serviceInfo.service_id || serviceInfo.method; // if previous candidate exists, keep it // or create a new one - const candidate = this._mapMethodToServiceIdToCandidateClientSide + let candidate = this._mapMethodToServiceIdToCandidateClientSide .get(serviceInfo.method) - ?.get(serviceInfo.service_id || serviceInfo.method) ?? { - service_id: serviceInfo.service_id || serviceInfo.method, - serviceInfo, - terminal_id: terminalInfo.terminal_id, - // ISSUE: Ajv is very slow and cause a lot CPU utilization, so we must cache the compiled validator - validator: new Ajv({ strict: false }).compile(serviceInfo.schema), - }; - nextMap.get(serviceInfo.method)!.set(serviceInfo.service_id || serviceInfo.method, candidate); + ?.get(serviceId); + if (!candidate) { + init_cnt++; + candidate = { + service_id: serviceId, + serviceInfo, + terminal_id: terminalInfo.terminal_id, + // ISSUE: Ajv is very slow and cause a lot CPU utilization, so we must cache the compiled validator + validator: new Ajv({ strict: false }).compile(serviceInfo.schema), + }; + } + nextMap.get(serviceInfo.method)!.set(serviceId, candidate); } } this._mapMethodToServiceIdToCandidateClientSide = nextMap; + console.info( + formatTime(Date.now()), + 'Client', + 'GenerateCandidates', + 'Done', + `New Schemas: ${init_cnt}`, + `Duration: ${Date.now() - t1} ms`, + ); + this._generateCandidates$.next(); }), ) .pipe(takeUntil(this.terminal.dispose$)) @@ -130,6 +151,14 @@ export class TerminalClient { }; return observableToAsyncIterable( defer((): Observable => { + console.info( + formatTime(Date.now()), + 'Client', + 'RequestInitiated', + trace_id, + method, + target_terminal_id, + ); this._terminalOutput$.next(msg); return from(this.terminal.input$).pipe( filter((m) => m.trace_id === msg.trace_id), @@ -140,6 +169,18 @@ export class TerminalClient { each: 10000, meta: `request Timeout: method=${msg.method} target=${msg.target_terminal_id}`, }), + tap({ + finalize: () => { + console.info( + formatTime(Date.now()), + 'Client', + 'RequestFinalized', + trace_id, + method, + target_terminal_id, + ); + }, + }), share(), ); }), diff --git a/libraries/protocol/src/server.ts b/libraries/protocol/src/server.ts index c8706ba4..58895d34 100644 --- a/libraries/protocol/src/server.ts +++ b/libraries/protocol/src/server.ts @@ -114,6 +114,8 @@ export class TerminalServer { if (!msg.req) return; if (msg.frame || msg.res) return; + console.info(formatTime(Date.now()), 'Server', `RequestReceived`, msg.trace_id, msg.method); + const output$ = new Subject(); output$.subscribe((x) => { @@ -283,6 +285,7 @@ export class TerminalServer { this._requestFinalizing$.pipe(takeUntil(this.terminal.dispose$)).subscribe((requestContext) => { requestContext.finalized_at = Date.now(); requestContext.output$.complete(); + console.info(formatTime(Date.now()), 'Server', 'RequestFinalized', requestContext.message.trace_id); const duration = requestContext.finalized_at - requestContext.initilized_at; if (isNaN(duration)) return; diff --git a/libraries/protocol/src/terminal.ts b/libraries/protocol/src/terminal.ts index 0534c752..f9cc30e2 100644 --- a/libraries/protocol/src/terminal.ts +++ b/libraries/protocol/src/terminal.ts @@ -678,6 +678,8 @@ export class Terminal { repeat({ delay: 10000 }), ) .subscribe((list) => { + console.info(formatTime(Date.now()), 'Terminal', 'ListTerminalInfo', list.length); + this.terminalInfos = list; this._terminalInfos$.next(list); }), ); @@ -698,6 +700,14 @@ export class Terminal { return list; }), tap((list) => { + console.info( + formatTime(Date.now()), + 'Terminal', + 'TerminalInfoUpdate', + x.terminal_id, + list.length, + ); + this.terminalInfos = list; this._terminalInfos$.next(list); }), ), @@ -717,7 +727,14 @@ export class Terminal { switchMap(() => defer(() => this.request('UpdateTerminalInfo', '@host', this.terminalInfo)).pipe( tap((msg) => - console.info(formatTime(Date.now()), 'Terminal', 'terminalInfo', 'pushed', msg.res?.code), + console.info( + formatTime(Date.now()), + 'Terminal', + 'terminalInfo', + 'pushed', + msg.res?.code, + msg.trace_id, + ), ), timeout(5000), retry({ delay: 1000 }), @@ -1144,6 +1161,7 @@ export class Terminal { )), ); + terminalInfos: ITerminalInfo[] = []; private _terminalInfos$ = new ReplaySubject(1); /** * Terminal List of the same host