Skip to content

Commit

Permalink
feat(protocol): fix terminal connection error handling and improve me…
Browse files Browse the repository at this point in the history
…ssage buffering (#1089)
  • Loading branch information
zccz14 authored Feb 26, 2025
1 parent 17ddd1b commit c7986c0
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
10 changes: 10 additions & 0 deletions common/changes/@yuants/protocol/2025-02-26-16-31.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@yuants/protocol",
"comment": "try fix terminal error",
"type": "patch"
}
],
"packageName": "@yuants/protocol"
}
16 changes: 10 additions & 6 deletions libraries/protocol/src/create-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ export function createConnectionWs<T = any>(URL: string): IConnection<T> {

const input$ = new Subject<any>();
const output$ = new Subject<any>();
const connection$ = new ReplaySubject<any>(1);
const isConnected$ = new ReplaySubject<boolean>();
const connection$ = new Subject<any>();
const isConnected$ = new ReplaySubject<boolean>(1);

// ISSUE: Messages are lost when not connected and need to be buffered and resent
// - When not connected for a long time, messages accumulate, causing high memory usage.
Expand All @@ -70,11 +70,15 @@ export function createConnectionWs<T = any>(URL: string): IConnection<T> {
mergeMap((x) => x),
repeat(),
)
.subscribe(output$);
.subscribe({
next: (x) => {
output$.next(x);
},
});

const connect = () => {
isConnected$.next(false);
const ws = (serviceWsRef.current = new WebSocket(URL));
isConnected$.next(false);
ws.addEventListener('open', () => {
console.debug(formatTime(Date.now()), 'connection established', URL);
connection$.next(ws);
Expand Down Expand Up @@ -120,8 +124,8 @@ export function createConnectionWs<T = any>(URL: string): IConnection<T> {
console.debug(formatTime(Date.now()), 'connection closing because output complete', URL);
serviceWsRef.current?.close();
},
error: () => {
console.debug(formatTime(Date.now()), 'connection closing because output error', URL);
error: (err) => {
console.debug(formatTime(Date.now()), 'connection closing because output error', URL, `${err}`);
serviceWsRef.current?.close();
},
});
Expand Down

0 comments on commit c7986c0

Please sign in to comment.