Skip to content

Commit

Permalink
Add support for strongly typed websockets
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Sep 30, 2024
1 parent 0d95bd7 commit f078dd1
Show file tree
Hide file tree
Showing 5 changed files with 365 additions and 12 deletions.
211 changes: 211 additions & 0 deletions src/actors/channels/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import { Queue } from "@core/asyncutil/queue";
import { jsonSerializer } from "./serializers.ts";

export interface Channel<T> {
closed: Promise<void>;
signal: AbortSignal;
close(): void;
send(value: T): Promise<void>;
recv(signal?: AbortSignal): AsyncIterableIterator<T>;
}

/**
* Checks if a value is a channel.
*
* @param v - The value to check.
*
* @returns True if the value is a channel, false otherwise.
*/
export const isChannel = <
T,
TChannel extends Channel<T> = Channel<T>,
>(v: TChannel | unknown): v is TChannel => {
return typeof (v as TChannel).recv === "function" &&
typeof (v as TChannel).send === "function";
};

/**
* Checks if a value is a channel upgrader.
*
* @param v - The value to check.
*
* @returns True if the value is a channel upgrader, false otherwise.
*/
export const isUpgrade = (
v: ChannelUpgrader<unknown, unknown> | unknown,
): v is ChannelUpgrader<unknown, unknown> => {
return typeof (v as ChannelUpgrader<unknown, unknown>) === "function";
};

/**
* Links multiple abort signals together such that when any of them
* are aborted, the returned signal is also aborted.
*
* @param signals - The abort signals to link together.
*
* @returns The linked abort signal.
*/
export const link = (...signals: AbortSignal[]): AbortSignal => {
const ctrl = new AbortController();
for (const signal of signals) {
signal.addEventListener("abort", (evt) => {
if (!ctrl.signal.aborted) {
ctrl.abort(evt);
}
});
}
return ctrl.signal;
};

export class ClosedChannelError extends Error {
constructor() {
super("Channel is closed");
}
}
export const ifClosedChannel =
(cb: () => Promise<void> | void) => (err: unknown) => {
if (err instanceof ClosedChannelError) {
return cb();
}
throw err;
};

export const ignoreIfClosed = ifClosedChannel(() => {});
export const makeChan = <T>(capacity = 0): Channel<T> => {
let currentCapacity = capacity;
const queue: Queue<{ value: T; resolve: () => void }> = new Queue();
const ctrl = new AbortController();
const abortPromise = Promise.withResolvers<void>();
ctrl.signal.onabort = () => {
abortPromise.resolve();
};

const send = (value: T): Promise<void> => {
return new Promise((resolve, reject) => {
if (ctrl.signal.aborted) reject(new ClosedChannelError());
let mResolve = resolve;
if (currentCapacity > 0) {
currentCapacity--;
mResolve = () => {
currentCapacity++;
};
resolve();
}
queue.push({ value, resolve: mResolve });
});
};

const close = () => {
ctrl.abort();
};

const recv = async function* (
signal?: AbortSignal,
): AsyncIterableIterator<T> {
const linked = signal ? link(ctrl.signal, signal) : ctrl.signal;
while (true) {
if (linked.aborted) {
return;
}
try {
const next = await queue.pop({ signal: linked });
next.resolve();
yield next.value;
} catch (_err) {
if (linked.aborted) {
return;
}
throw _err;
}
}
};

return {
send,
recv,
close,
signal: ctrl.signal,
closed: abortPromise.promise,
};
};

export interface DuplexChannel<TSend, TReceive> {
send: Channel<TSend>["send"];
recv: Channel<TReceive>["recv"];
close: () => void | Promise<void>;
}

export type ChannelUpgrader<TSend, TReceive = TSend> = (
ch: DuplexChannel<TSend, TReceive>,
) => Promise<void>;

// deno-lint-ignore no-explicit-any
export type Message<TMessageProperties = any> = TMessageProperties & {
chunk?: Uint8Array;
};

export interface MessageSerializer<
TSend,
TReceive,
TRawFormat extends string | ArrayBufferLike | ArrayBufferView | Blob,
> {
binaryType?: BinaryType;
serialize: (
msg: Message<TSend>,
) => TRawFormat;
deserialize: (str: TRawFormat) => Message<TReceive>;
}

export const makeWebSocket = <
TSend,
TReceive,
TMessageFormat extends string | ArrayBufferLike | ArrayBufferView | Blob =
| string
| ArrayBufferLike
| ArrayBufferView
| Blob,
>(
socket: WebSocket,
_serializer?: MessageSerializer<TSend, TReceive, TMessageFormat>,
): Promise<DuplexChannel<Message<TSend>, Message<TReceive>>> => {
const serializer = _serializer ??
jsonSerializer<Message<TSend>, Message<TReceive>>();
const sendChan = makeChan<Message<TSend>>();
const recvChan = makeChan<Message<TReceive>>();
const ch = Promise.withResolvers<
DuplexChannel<Message<TSend>, Message<TReceive>>
>();
socket.binaryType = serializer.binaryType ?? "blob";
socket.onclose = () => {
sendChan.close();
recvChan.close();
};
socket.onerror = (err) => {
socket.close();
ch.reject(err);
};
socket.onmessage = async (msg) => {
if (recvChan.signal.aborted) {
return;
}
await recvChan.send(serializer.deserialize(msg.data));
};
socket.onopen = async () => {
ch.resolve({
recv: recvChan.recv.bind(recvChan),
send: sendChan.send.bind(recvChan),
close: () => socket.close(),
});
for await (const message of sendChan.recv()) {
try {
socket.send(
serializer.serialize(message),
);
} catch (_err) {
console.error("error sending message through socket", message);
}
}
socket.close();
};
return ch.promise;
};
14 changes: 14 additions & 0 deletions src/actors/channels/serializers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { MessageSerializer } from "./channel.ts";

export const jsonSerializer = <TSend, TReceive>(): MessageSerializer<
TSend,
TReceive,
string
> => {
return {
deserialize: (msg) => {
return JSON.parse(msg);
},
serialize: JSON.stringify,
};
};
102 changes: 94 additions & 8 deletions src/actors/proxy.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,75 @@
import {
type ChannelUpgrader,
type DuplexChannel,
makeWebSocket,
} from "./channels/channel.ts";
import type { Actor, ActorConstructor } from "./runtime.ts";
import { EVENT_STREAM_RESPONSE_HEADER, readFromStream } from "./stream.ts";

export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id";
export const ACTOR_ID_QS_NAME = "x-deno-isolate-instance-id";
/**
* Promise.prototype.then onfufilled callback type.
*/
export type Fulfilled<R, T> = ((result: R) => T | PromiseLike<T>) | null;

/**
* Promise.then onrejected callback type.
*/
// deno-lint-ignore no-explicit-any
export type Rejected<E> = ((reason: any) => E | PromiseLike<E>) | null;

export class ActorAwaiter<
TResponse,
TChannel extends DuplexChannel<unknown, unknown>,
> implements
PromiseLike<
TResponse
>,
DuplexChannel<unknown, unknown> {
ch: Promise<TChannel> | null = null;
constructor(
protected fetcher: () => Promise<
TResponse
>,
protected ws: () => Promise<TChannel>,
) {
}
async close() {
const ch = await this.channel;
await ch.close();
}

async *recv(signal?: AbortSignal): AsyncIterableIterator<unknown> {
const ch = await this.channel;
yield* ch.recv(signal);
}

private get channel(): Promise<TChannel> {
return this.ch ??= this.ws();
}

async send(value: unknown): Promise<void> {
const ch = await this.channel;
await ch.send(value);
}

catch<TResult>(onrejected: Rejected<TResult>): Promise<TResponse | TResult> {
return this.fetcher().catch(onrejected);
}

then<TResult1, TResult2 = TResult1>(
onfufilled?: Fulfilled<
TResponse,
TResult1
>,
onrejected?: Rejected<TResult2>,
): Promise<TResult1 | TResult2> {
return this.fetcher().then(onfufilled).catch(
onrejected,
);
}
}

/**
* options to create a new actor proxy.
Expand All @@ -11,11 +79,15 @@ export interface ProxyOptions<TInstance extends Actor> {
server: string;
}

type PromisifyKey<key extends keyof Actor, Actor> = Actor[key] extends
(...args: infer Args) => Awaited<infer Return>
? Return extends ChannelUpgrader<infer TSend, infer TReceive>
? (...args: Args) => DuplexChannel<TSend, TReceive>
: (...args: Args) => Promise<Return>
: Actor[key];

type Promisify<Actor> = {
[key in keyof Actor]: Actor[key] extends (...args: infer Args) => infer Return
? Return extends Promise<unknown> ? Actor[key]
: (...args: Args) => Promise<Return>
: Actor[key];
[key in keyof Actor]: PromisifyKey<key, Actor>;
};

export interface ActorsServer {
Expand Down Expand Up @@ -66,12 +138,13 @@ export const actors = {
id: (id: string): Promisify<TInstance> => {
return new Proxy<Promisify<TInstance>>({} as Promisify<TInstance>, {
get: (_, prop) => {
return async (...args: unknown[]) => {
const endpoint = `${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`;
const fetcher = async (...args: unknown[]) => {
const abortCtrl = new AbortController();
const resp = await fetch(
`${actorsServer.url}/actors/${
typeof actor === "string" ? actor : actor.name
}/invoke/${String(prop)}`,
endpoint,
{
method: "POST",
signal: abortCtrl.signal,
Expand Down Expand Up @@ -104,6 +177,19 @@ export const actors = {
}
return resp.json();
};
return (...args: unknown[]) => {
const awaiter = new ActorAwaiter(() => fetcher(...args), () => {
const ws = new WebSocket(
`${endpoint}?args=${
encodeURIComponent(
btoa(JSON.stringify({ args: args ?? [] })),
)
}&${ACTOR_ID_QS_NAME}=${id}`,
);
return makeWebSocket(ws);
});
return awaiter;
};
},
});
},
Expand Down
Loading

0 comments on commit f078dd1

Please sign in to comment.