From faae11758f51d49726c5f636ab7b3cba427b2aa9 Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Fri, 18 Oct 2024 11:22:37 -0300 Subject: [PATCH] Add closed and signal Signed-off-by: Marcos Candeia --- src/actors/proxyutil.ts | 6 ++++++ src/actors/util/channels/channel.ts | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/src/actors/proxyutil.ts b/src/actors/proxyutil.ts index 4c4dfbb..7c77c38 100644 --- a/src/actors/proxyutil.ts +++ b/src/actors/proxyutil.ts @@ -64,6 +64,12 @@ export class ActorAwaiter< const ch = await this.channel; await ch.close(); } + get signal() { + return new AbortSignal(); + } + get closed() { + return this.channel.then((ch) => ch.closed); + } async *recv(signal?: AbortSignal): AsyncIterableIterator { const ch = await this.channel; diff --git a/src/actors/util/channels/channel.ts b/src/actors/util/channels/channel.ts index ca27287..d8b34aa 100644 --- a/src/actors/util/channels/channel.ts +++ b/src/actors/util/channels/channel.ts @@ -133,6 +133,8 @@ export interface DuplexChannel extends Disposable { send: Channel["send"]; recv: Channel["recv"]; close: () => void | Promise; + closed: Promise; + signal: AbortSignal; } /** @@ -195,6 +197,8 @@ export const makeWebSocket = < }; socket.onopen = async () => { ch.resolve({ + closed: Promise.race([recvChan.closed, sendChan.closed]), + signal: link(recvChan.signal, sendChan.signal), recv: recvChan.recv.bind(recvChan), send: sendChan.send.bind(recvChan), close: () => socket.close(), @@ -227,6 +231,8 @@ export const makeDuplexChannel = ( const recvChan = makeChan(); const duplexChannel: DuplexChannel = { + closed: Promise.race([recvChan.closed, sendChan.closed]), + signal: link(sendChan.signal, recvChan.signal), send: sendChan.send.bind(sendChan), recv: recvChan.recv.bind(recvChan), close: () => { @@ -242,6 +248,8 @@ export const makeDuplexChannel = ( // If there's an upgrader, we upgrade the duplex channel if (upgrader && isUpgrade(upgrader)) { upgrader({ + closed: duplexChannel.closed, + signal: duplexChannel.signal, send: recvChan.send.bind(recvChan), recv: sendChan.recv.bind(sendChan), close: duplexChannel.close,