Skip to content

Commit

Permalink
Allow metadata invocation
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Oct 20, 2024
1 parent 61c3198 commit 073aa38
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 14 deletions.
37 changes: 30 additions & 7 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ export interface ActorInvoker<
name: string,
method: string,
methodArgs: unknown[],
metadata?: unknown,
): Promise<TResponse>;

invoke(
name: string,
method: string,
methodArgs: unknown[],
metadata: unknown,
connect: true,
): Promise<TChannel>;
}
Expand All @@ -56,6 +58,7 @@ export class ActorAwaiter<
protected actorMethod: string,
protected methodArgs: unknown[],
protected invoker: ActorInvoker<TResponse, TChannel>,
protected mMetadata?: unknown,
) {
}
[Symbol.dispose](): void {
Expand All @@ -82,6 +85,7 @@ export class ActorAwaiter<
this.actorName,
this.actorMethod,
this.methodArgs,
this.mMetadata,
true,
);
}
Expand All @@ -96,6 +100,7 @@ export class ActorAwaiter<
this.actorName,
this.actorMethod,
this.methodArgs,
this.mMetadata,
)
.catch(onrejected);
}
Expand All @@ -111,6 +116,7 @@ export class ActorAwaiter<
this.actorName,
this.actorMethod,
this.methodArgs,
this.mMetadata,
).then(onfufilled).catch(
onrejected,
);
Expand All @@ -128,13 +134,19 @@ export interface ProxyOptions<TInstance extends Actor> {
export type PromisifyKey<key extends keyof Actor, Actor> = Actor[key] extends
(...args: infer Args) => Awaited<infer Return>
? Return extends ChannelUpgrader<infer TSend, infer TReceive>
? (...args: Args) => DuplexChannel<TReceive, TSend>
: (...args: Args) => Promise<Return>
? { (...args: Args): DuplexChannel<TReceive, TSend> }
: { (...args: Args): Promise<Return> }
: Actor[key];

export type Promisify<Actor> = {
[key in keyof Actor]: PromisifyKey<key, Actor>;
};
export type Promisify<Actor> =
& {
[key in keyof Actor]: PromisifyKey<key, Actor>;
}
& (Actor extends { metadata?: infer TMetadata } ? {
withMetadata(metadata: TMetadata): Promisify<Actor>;
}
// deno-lint-ignore ban-types
: {});

const urlFor = (
serverUrl: string,
Expand Down Expand Up @@ -186,13 +198,18 @@ export const createHttpInvoker = <
}
const actorsServer = server ?? _server!;
return {
invoke: async (name, method, methodArgs, connect?: true) => {
invoke: async (name, method, methodArgs, metadata, connect?: true) => {
const endpoint = urlFor(actorsServer.url, name, method);
if (connect) {
const ws = new WebSocket(
`${endpoint}?args=${
encodeURIComponent(
btoa(JSON.stringify({ args: methodArgs ?? [] })),
btoa(
JSON.stringify({
args: methodArgs ?? [],
metadata: metadata ?? {},
}),
),
)
}&${ACTOR_ID_QS_NAME}=${actorId}`,
);
Expand All @@ -214,6 +231,7 @@ export const createHttpInvoker = <
body: JSON.stringify(
{
args: methodArgs ?? [],
metadata: metadata ?? {},
},
(_key, value) =>
typeof value === "bigint" ? value.toString() : value, // return everything else unchanged
Expand Down Expand Up @@ -269,19 +287,24 @@ export const createHttpInvoker = <
export const create = <TInstance extends Actor>(
actor: ActorConstructor<TInstance> | string,
invokerFactory: (id: string) => ActorInvoker,
metadata?: unknown,
): { id: (id: string) => Promisify<TInstance> } => {
const name = typeof actor === "string" ? actor : actor.name;
return {
id: (id: string): Promisify<TInstance> => {
return new Proxy<Promisify<TInstance>>({} as Promisify<TInstance>, {
get: (_, method) => {
if (method === "withMetadata") {
return (m: unknown) => create(actor, invokerFactory, m).id(id);
}
const invoker = invokerFactory(id);
return (...args: unknown[]) => {
const awaiter = new ActorAwaiter(
name,
String(method),
args,
invoker,
metadata,
);
return awaiter;
};
Expand Down
5 changes: 3 additions & 2 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class Hello {
class Counter {
private count: number;
private watchTarget = new WatchTarget<number>();
public metadata?: { extraSum: number };

constructor(protected state: ActorState) {
this.count = 0;
Expand Down Expand Up @@ -49,7 +50,7 @@ class Counter {
}

getCount(): number {
return this.count;
return this.count + (this.metadata?.extraSum ?? 0);
}

watch(): AsyncIterableIterator<number> {
Expand Down Expand Up @@ -120,7 +121,7 @@ Deno.test("counter tests", async () => {

const watcher = await actor.watch();

assertEquals(await actor.getCount(), 0);
assertEquals(await actor.withMetadata({ extraSum: 10 }).getCount(), 10);

// Test increment
const number = await actor.increment();
Expand Down
29 changes: 24 additions & 5 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export class ActorRuntime {
actorName,
methodName,
args,
metadata,
) => {
const actorInvoker = actorName ? this.actors.get(actorName) : undefined;
if (!actorInvoker) {
Expand All @@ -123,7 +124,16 @@ export class ActorRuntime {
);
}
await initialization;
return await (methodImpl as Function).bind(actor)(
// Create a proxy to override the 'name' property for this specific bind
const actorProxy = new Proxy(actor, {
get(target, prop, receiver) {
if (prop === "metadata") {
return metadata; // Only modify this property for the proxy
}
return Reflect.get(target, prop, receiver); // Default behavior for other properties
},
});
return await (methodImpl as Function).bind(actorProxy)(
...Array.isArray(args) ? args : [args],
);
};
Expand Down Expand Up @@ -169,9 +179,15 @@ export class ActorRuntime {
name: string,
method: string,
args: unknown[],
metadata: unknown,
connect?: true,
) => {
const resp = await this.invoker.invoke(name, method, args);
const resp = await this.invoker.invoke(
name,
method,
args,
metadata,
);
if (connect && isUpgrade(resp)) {
return makeDuplexChannel(resp);
}
Expand Down Expand Up @@ -231,19 +247,22 @@ export class ActorRuntime {
{ status: 404 },
);
}
let args = [];
let args = [], metadata = {};
if (req.headers.get("content-type")?.includes("application/json")) {
const { args: margs } = await req.json();
const { args: margs, metadata: maybeMetadata } = await req.json();
args = margs;
metadata = maybeMetadata;
} else if (url.searchParams.get("args")) {
const qargs = url.searchParams.get("args");

const parsedArgs = qargs
? JSON.parse(atob(decodeURIComponent(qargs)))
: {};
args = parsedArgs.args;
metadata = parsedArgs.metadata;
}
try {
const res = await this.invoker.invoke(actorName, method, args);
const res = await this.invoker.invoke(actorName, method, args, metadata);
if (req.headers.get("upgrade") === "websocket" && isUpgrade(res)) {
const { socket, response } = Deno.upgradeWebSocket(req);
makeWebSocket(socket).then((ch) => res(ch)).finally(() =>
Expand Down

0 comments on commit 073aa38

Please sign in to comment.