Skip to content

Commit

Permalink
feat(protocol): enhance logging and candidate generation in TerminalC…
Browse files Browse the repository at this point in the history
…lient and TerminalServer (#1078)
  • Loading branch information
zccz14 authored Feb 19, 2025
1 parent 1714eaa commit 4d68c1f
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 12 deletions.
10 changes: 10 additions & 0 deletions common/changes/@yuants/protocol/2025-02-19-17-40.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/protocol",
"comment": "enhance logging and candidate generation in TerminalClient and TerminalServer",
"type": "minor"
}
],
"packageName": "@yuants/protocol"
}
2 changes: 2 additions & 0 deletions libraries/protocol/etc/protocol.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ export class Terminal {
// (undocumented)
terminalInfo: ITerminalInfo;
terminalInfos$: AsyncIterable<ITerminalInfo[]>;
// (undocumented)
terminalInfos: ITerminalInfo[];
}

// @public
Expand Down
63 changes: 52 additions & 11 deletions libraries/protocol/src/client.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import { UUID } from '@yuants/data-model';
import { formatTime, UUID } from '@yuants/data-model';
import { nativeSubjectToSubject, observableToAsyncIterable } from '@yuants/utils';
import Ajv from 'ajv';
import {
combineLatest,
defer,
filter,
firstValueFrom,
from,
map,
mergeMap,
Observable,
ReplaySubject,
share,
takeUntil,
takeWhile,
Expand All @@ -30,7 +32,7 @@ export class TerminalClient {
* Resolve candidate target_terminal_ids for a request
*/
resolveTargetTerminalIds = async (method: string, req: ITerminalMessage['req']): Promise<string[]> => {
await firstValueFrom(from(this.terminal.terminalInfos$));
await firstValueFrom(this._generateCandidates$);
const candidates = this._mapMethodToServiceIdToCandidateClientSide.get(method);
if (!candidates) return [];
const result: string[] = [];
Expand All @@ -42,6 +44,8 @@ export class TerminalClient {
return result;
};

private _generateCandidates$ = new ReplaySubject<void>(1);

/**
* Service Index (optimized for method resolution)
*/
Expand All @@ -55,27 +59,44 @@ export class TerminalClient {
from(this.terminal.terminalInfos$)
.pipe(
tap((terminalInfos) => {
console.info(formatTime(Date.now()), 'Client', 'GenerateCandidates', 'Start');
const t1 = Date.now();
let init_cnt = 0;
const nextMap: typeof this._mapMethodToServiceIdToCandidateClientSide = new Map();
for (const terminalInfo of terminalInfos) {
for (const serviceInfo of Object.values(terminalInfo.serviceInfo || {})) {
if (!nextMap.get(serviceInfo.method)) {
nextMap.set(serviceInfo.method, new Map());
}
const serviceId = serviceInfo.service_id || serviceInfo.method;
// if previous candidate exists, keep it
// or create a new one
const candidate = this._mapMethodToServiceIdToCandidateClientSide
let candidate = this._mapMethodToServiceIdToCandidateClientSide
.get(serviceInfo.method)
?.get(serviceInfo.service_id || serviceInfo.method) ?? {
service_id: serviceInfo.service_id || serviceInfo.method,
serviceInfo,
terminal_id: terminalInfo.terminal_id,
// ISSUE: Ajv is very slow and cause a lot CPU utilization, so we must cache the compiled validator
validator: new Ajv({ strict: false }).compile(serviceInfo.schema),
};
nextMap.get(serviceInfo.method)!.set(serviceInfo.service_id || serviceInfo.method, candidate);
?.get(serviceId);
if (!candidate) {
init_cnt++;
candidate = {
service_id: serviceId,
serviceInfo,
terminal_id: terminalInfo.terminal_id,
// ISSUE: Ajv is very slow and cause a lot CPU utilization, so we must cache the compiled validator
validator: new Ajv({ strict: false }).compile(serviceInfo.schema),
};
}
nextMap.get(serviceInfo.method)!.set(serviceId, candidate);
}
}
this._mapMethodToServiceIdToCandidateClientSide = nextMap;
console.info(
formatTime(Date.now()),
'Client',
'GenerateCandidates',
'Done',
`New Schemas: ${init_cnt}`,
`Duration: ${Date.now() - t1} ms`,
);
this._generateCandidates$.next();
}),
)
.pipe(takeUntil(this.terminal.dispose$))
Expand Down Expand Up @@ -130,6 +151,14 @@ export class TerminalClient {
};
return observableToAsyncIterable(
defer((): Observable<any> => {
console.info(
formatTime(Date.now()),
'Client',
'RequestInitiated',
trace_id,
method,
target_terminal_id,
);
this._terminalOutput$.next(msg);
return from(this.terminal.input$).pipe(
filter((m) => m.trace_id === msg.trace_id),
Expand All @@ -140,6 +169,18 @@ export class TerminalClient {
each: 10000,
meta: `request Timeout: method=${msg.method} target=${msg.target_terminal_id}`,
}),
tap({
finalize: () => {
console.info(
formatTime(Date.now()),
'Client',
'RequestFinalized',
trace_id,
method,
target_terminal_id,
);
},
}),
share(),
);
}),
Expand Down
3 changes: 3 additions & 0 deletions libraries/protocol/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ export class TerminalServer {
if (!msg.req) return;
if (msg.frame || msg.res) return;

console.info(formatTime(Date.now()), 'Server', `RequestReceived`, msg.trace_id, msg.method);

const output$ = new Subject<IServiceOutput>();

output$.subscribe((x) => {
Expand Down Expand Up @@ -283,6 +285,7 @@ export class TerminalServer {
this._requestFinalizing$.pipe(takeUntil(this.terminal.dispose$)).subscribe((requestContext) => {
requestContext.finalized_at = Date.now();
requestContext.output$.complete();
console.info(formatTime(Date.now()), 'Server', 'RequestFinalized', requestContext.message.trace_id);

const duration = requestContext.finalized_at - requestContext.initilized_at;
if (isNaN(duration)) return;
Expand Down
20 changes: 19 additions & 1 deletion libraries/protocol/src/terminal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,8 @@ export class Terminal {
repeat({ delay: 10000 }),
)
.subscribe((list) => {
console.info(formatTime(Date.now()), 'Terminal', 'ListTerminalInfo', list.length);
this.terminalInfos = list;
this._terminalInfos$.next(list);
}),
);
Expand All @@ -698,6 +700,14 @@ export class Terminal {
return list;
}),
tap((list) => {
console.info(
formatTime(Date.now()),
'Terminal',
'TerminalInfoUpdate',
x.terminal_id,
list.length,
);
this.terminalInfos = list;
this._terminalInfos$.next(list);
}),
),
Expand All @@ -717,7 +727,14 @@ export class Terminal {
switchMap(() =>
defer(() => this.request('UpdateTerminalInfo', '@host', this.terminalInfo)).pipe(
tap((msg) =>
console.info(formatTime(Date.now()), 'Terminal', 'terminalInfo', 'pushed', msg.res?.code),
console.info(
formatTime(Date.now()),
'Terminal',
'terminalInfo',
'pushed',
msg.res?.code,
msg.trace_id,
),
),
timeout(5000),
retry({ delay: 1000 }),
Expand Down Expand Up @@ -1144,6 +1161,7 @@ export class Terminal {
)),
);

terminalInfos: ITerminalInfo[] = [];
private _terminalInfos$ = new ReplaySubject<ITerminalInfo[]>(1);
/**
* Terminal List of the same host
Expand Down

0 comments on commit 4d68c1f

Please sign in to comment.