Skip to content

Commit

Permalink
Allow in context channel (#8)
Browse files Browse the repository at this point in the history
* Add incontext channel invocation

Signed-off-by: Marcos Candeia <[email protected]>

* Make channel upgrader work

Signed-off-by: Marcos Candeia <[email protected]>

* Add channel upgrader type

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Oct 16, 2024
1 parent de7665b commit deaf097
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 3 deletions.
3 changes: 3 additions & 0 deletions src/actors/proxyutil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ export class ActorAwaiter<
protected invoker: ActorInvoker<TResponse, TChannel>,
) {
}
[Symbol.dispose](): void {
this.close();
}
async close() {
const ch = await this.channel;
await ch.close();
Expand Down
25 changes: 25 additions & 0 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ class Hello {
sayHello(): string {
return "Hello, World!";
}

chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send }) => {
for (let i = 0; i < 10; i++) {
await send(`Hello ${name} ${i}`);
}
});
}
}
class Counter {
private count: number;
Expand Down Expand Up @@ -48,6 +56,14 @@ class Counter {
return this.watchTarget.subscribe();
}

async *readHelloChan(name: string): AsyncIterableIterator<string> {
const hello = this.state.proxy(Hello).id(this.state.id);
using helloChan = hello.chan(name);
for await (const event of helloChan.recv()) {
yield event;
}
}

chan(name: string): ChannelUpgrader<string, string> {
return (async ({ send, recv }) => {
await send(`Hello ${name}`);
Expand Down Expand Up @@ -133,4 +149,13 @@ Deno.test("counter tests", async () => {
const prevReqCount = reqCount;
assertEquals(await actor.callSayHello(), "Hello, World!");
assertEquals(reqCount, prevReqCount + 1); // does not need a new request for invoking another actor method

const prevReqCountHello = reqCount;
const helloChan = await actor.readHelloChan(name);
for (let i = 0; i < 10; i++) {
const { value } = await helloChan.next();
assertEquals(`Hello ${name} ${i}`, value);
}
helloChan.return?.();
assertEquals(reqCount, prevReqCountHello + 1); // does not need a new request for invoking another actor method
});
21 changes: 19 additions & 2 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import type { ActorStorage } from "./storage.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { S3ActorStorage } from "./storage/s3.ts";
import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";
import {
isUpgrade,
makeDuplexChannel,
makeWebSocket,
} from "./util/channels/channel.ts";

/**
* Represents an actor.
Expand Down Expand Up @@ -150,7 +154,20 @@ export class ActorRuntime {
proxy: (actor) => {
const invoker = (id: string) => {
if (id === actorId) {
return this.invoker;
return {
invoke: async (
name: string,
method: string,
args: unknown[],
connect?: true,
) => {
const resp = await this.invoker.invoke(name, method, args);
if (connect && isUpgrade(resp)) {
return makeDuplexChannel(resp);
}
return resp;
},
};
}
return createHttpInvoker(id);
};
Expand Down
41 changes: 40 additions & 1 deletion src/actors/util/channels/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ export const makeChan = <T>(capacity = 0): Channel<T> => {
};
};

export interface DuplexChannel<TSend, TReceive> {
export interface DuplexChannel<TSend, TReceive> extends Disposable {
send: Channel<TSend>["send"];
recv: Channel<TReceive>["recv"];
close: () => void | Promise<void>;
Expand Down Expand Up @@ -195,6 +195,7 @@ export const makeWebSocket = <
recv: recvChan.recv.bind(recvChan),
send: sendChan.send.bind(recvChan),
close: () => socket.close(),
[Symbol.dispose]: () => socket.close(),
});
for await (const message of sendChan.recv()) {
try {
Expand All @@ -209,3 +210,41 @@ export const makeWebSocket = <
};
return ch.promise;
};

/**
* Creates a new duplex channel.
* @param upgrader the channel upgrader
* @returns a created duplex channel
*/
export const makeDuplexChannel = <TSend, TReceive>(
upgrader?: ChannelUpgrader<TSend, TReceive>,
): DuplexChannel<TSend, TReceive> => {
// Create internal send and receive channels
const sendChan = makeChan<TSend>();
const recvChan = makeChan<TReceive>();

const duplexChannel: DuplexChannel<TSend, TReceive> = {
send: sendChan.send.bind(sendChan),
recv: recvChan.recv.bind(recvChan),
close: () => {
sendChan.close();
recvChan.close();
},
[Symbol.dispose]: () => {
sendChan.close();
recvChan.close();
},
};

// If there's an upgrader, we upgrade the duplex channel
if (upgrader && isUpgrade(upgrader)) {
upgrader({
send: recvChan.send.bind(recvChan),
recv: sendChan.recv.bind(sendChan),
close: duplexChannel.close,
[Symbol.dispose]: duplexChannel[Symbol.dispose],
});
}

return duplexChannel;
};

0 comments on commit deaf097

Please sign in to comment.