Skip to content

Commit

Permalink
Add closed and signal
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Oct 18, 2024
1 parent 929b512 commit faae117
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ export class ActorAwaiter<
const ch = await this.channel;
await ch.close();
}
get signal() {
return new AbortSignal();
}
get closed() {
return this.channel.then((ch) => ch.closed);
}

async *recv(signal?: AbortSignal): AsyncIterableIterator<unknown> {
const ch = await this.channel;
Expand Down
8 changes: 8 additions & 0 deletions src/actors/util/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ export interface DuplexChannel<TSend, TReceive> extends Disposable {
send: Channel<TSend>["send"];
recv: Channel<TReceive>["recv"];
close: () => void | Promise<void>;
closed: Promise<void>;
signal: AbortSignal;
}

/**
Expand Down Expand Up @@ -195,6 +197,8 @@ export const makeWebSocket = <
};
socket.onopen = async () => {
ch.resolve({
closed: Promise.race([recvChan.closed, sendChan.closed]),
signal: link(recvChan.signal, sendChan.signal),
recv: recvChan.recv.bind(recvChan),
send: sendChan.send.bind(recvChan),
close: () => socket.close(),
Expand Down Expand Up @@ -227,6 +231,8 @@ export const makeDuplexChannel = <TSend, TReceive>(
const recvChan = makeChan<TReceive>();

const duplexChannel: DuplexChannel<TSend, TReceive> = {
closed: Promise.race([recvChan.closed, sendChan.closed]),
signal: link(sendChan.signal, recvChan.signal),
send: sendChan.send.bind(sendChan),
recv: recvChan.recv.bind(recvChan),
close: () => {
Expand All @@ -242,6 +248,8 @@ export const makeDuplexChannel = <TSend, TReceive>(
// If there's an upgrader, we upgrade the duplex channel
if (upgrader && isUpgrade(upgrader)) {
upgrader({
closed: duplexChannel.closed,
signal: duplexChannel.signal,
send: recvChan.send.bind(recvChan),
recv: sendChan.recv.bind(sendChan),
close: duplexChannel.close,
Expand Down

0 comments on commit faae117

Please sign in to comment.