From 073aa381900482d22ae29008c848addd263ea92a Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Sun, 20 Oct 2024 12:30:32 -0300 Subject: [PATCH] Allow metadata invocation Signed-off-by: Marcos Candeia --- src/actors/proxyutil.ts | 37 ++++++++++++++++++++++++++++++------- src/actors/runtime.test.ts | 5 +++-- src/actors/runtime.ts | 29 ++++++++++++++++++++++++----- 3 files changed, 57 insertions(+), 14 deletions(-) diff --git a/src/actors/proxyutil.ts b/src/actors/proxyutil.ts index d4f6ecc..c4201dd 100644 --- a/src/actors/proxyutil.ts +++ b/src/actors/proxyutil.ts @@ -33,12 +33,14 @@ export interface ActorInvoker< name: string, method: string, methodArgs: unknown[], + metadata?: unknown, ): Promise; invoke( name: string, method: string, methodArgs: unknown[], + metadata: unknown, connect: true, ): Promise; } @@ -56,6 +58,7 @@ export class ActorAwaiter< protected actorMethod: string, protected methodArgs: unknown[], protected invoker: ActorInvoker, + protected mMetadata?: unknown, ) { } [Symbol.dispose](): void { @@ -82,6 +85,7 @@ export class ActorAwaiter< this.actorName, this.actorMethod, this.methodArgs, + this.mMetadata, true, ); } @@ -96,6 +100,7 @@ export class ActorAwaiter< this.actorName, this.actorMethod, this.methodArgs, + this.mMetadata, ) .catch(onrejected); } @@ -111,6 +116,7 @@ export class ActorAwaiter< this.actorName, this.actorMethod, this.methodArgs, + this.mMetadata, ).then(onfufilled).catch( onrejected, ); @@ -128,13 +134,19 @@ export interface ProxyOptions { export type PromisifyKey = Actor[key] extends (...args: infer Args) => Awaited ? Return extends ChannelUpgrader - ? (...args: Args) => DuplexChannel - : (...args: Args) => Promise + ? { (...args: Args): DuplexChannel } + : { (...args: Args): Promise } : Actor[key]; -export type Promisify = { - [key in keyof Actor]: PromisifyKey; -}; +export type Promisify = + & { + [key in keyof Actor]: PromisifyKey; + } + & (Actor extends { metadata?: infer TMetadata } ? { + withMetadata(metadata: TMetadata): Promisify; + } + // deno-lint-ignore ban-types + : {}); const urlFor = ( serverUrl: string, @@ -186,13 +198,18 @@ export const createHttpInvoker = < } const actorsServer = server ?? _server!; return { - invoke: async (name, method, methodArgs, connect?: true) => { + invoke: async (name, method, methodArgs, metadata, connect?: true) => { const endpoint = urlFor(actorsServer.url, name, method); if (connect) { const ws = new WebSocket( `${endpoint}?args=${ encodeURIComponent( - btoa(JSON.stringify({ args: methodArgs ?? [] })), + btoa( + JSON.stringify({ + args: methodArgs ?? [], + metadata: metadata ?? {}, + }), + ), ) }&${ACTOR_ID_QS_NAME}=${actorId}`, ); @@ -214,6 +231,7 @@ export const createHttpInvoker = < body: JSON.stringify( { args: methodArgs ?? [], + metadata: metadata ?? {}, }, (_key, value) => typeof value === "bigint" ? value.toString() : value, // return everything else unchanged @@ -269,12 +287,16 @@ export const createHttpInvoker = < export const create = ( actor: ActorConstructor | string, invokerFactory: (id: string) => ActorInvoker, + metadata?: unknown, ): { id: (id: string) => Promisify } => { const name = typeof actor === "string" ? actor : actor.name; return { id: (id: string): Promisify => { return new Proxy>({} as Promisify, { get: (_, method) => { + if (method === "withMetadata") { + return (m: unknown) => create(actor, invokerFactory, m).id(id); + } const invoker = invokerFactory(id); return (...args: unknown[]) => { const awaiter = new ActorAwaiter( @@ -282,6 +304,7 @@ export const create = ( String(method), args, invoker, + metadata, ); return awaiter; }; diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 571df95..72537cb 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -21,6 +21,7 @@ class Hello { class Counter { private count: number; private watchTarget = new WatchTarget(); + public metadata?: { extraSum: number }; constructor(protected state: ActorState) { this.count = 0; @@ -49,7 +50,7 @@ class Counter { } getCount(): number { - return this.count; + return this.count + (this.metadata?.extraSum ?? 0); } watch(): AsyncIterableIterator { @@ -120,7 +121,7 @@ Deno.test("counter tests", async () => { const watcher = await actor.watch(); - assertEquals(await actor.getCount(), 0); + assertEquals(await actor.withMetadata({ extraSum: 10 }).getCount(), 10); // Test increment const number = await actor.increment(); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index 766aee0..401f751 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -102,6 +102,7 @@ export class ActorRuntime { actorName, methodName, args, + metadata, ) => { const actorInvoker = actorName ? this.actors.get(actorName) : undefined; if (!actorInvoker) { @@ -123,7 +124,16 @@ export class ActorRuntime { ); } await initialization; - return await (methodImpl as Function).bind(actor)( + // Create a proxy to override the 'name' property for this specific bind + const actorProxy = new Proxy(actor, { + get(target, prop, receiver) { + if (prop === "metadata") { + return metadata; // Only modify this property for the proxy + } + return Reflect.get(target, prop, receiver); // Default behavior for other properties + }, + }); + return await (methodImpl as Function).bind(actorProxy)( ...Array.isArray(args) ? args : [args], ); }; @@ -169,9 +179,15 @@ export class ActorRuntime { name: string, method: string, args: unknown[], + metadata: unknown, connect?: true, ) => { - const resp = await this.invoker.invoke(name, method, args); + const resp = await this.invoker.invoke( + name, + method, + args, + metadata, + ); if (connect && isUpgrade(resp)) { return makeDuplexChannel(resp); } @@ -231,19 +247,22 @@ export class ActorRuntime { { status: 404 }, ); } - let args = []; + let args = [], metadata = {}; if (req.headers.get("content-type")?.includes("application/json")) { - const { args: margs } = await req.json(); + const { args: margs, metadata: maybeMetadata } = await req.json(); args = margs; + metadata = maybeMetadata; } else if (url.searchParams.get("args")) { const qargs = url.searchParams.get("args"); + const parsedArgs = qargs ? JSON.parse(atob(decodeURIComponent(qargs))) : {}; args = parsedArgs.args; + metadata = parsedArgs.metadata; } try { - const res = await this.invoker.invoke(actorName, method, args); + const res = await this.invoker.invoke(actorName, method, args, metadata); if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) { const { socket, response } = Deno.upgradeWebSocket(req); makeWebSocket(socket).then((ch) => res(ch)).finally(() =>