diff --git a/.gitignore b/.gitignore index c6bba59..8bc5f04 100644 --- a/.gitignore +++ b/.gitignore @@ -128,3 +128,7 @@ dist .yarn/build-state.yml .yarn/install-state.gz .pnp.* + + +.vscode +kv \ No newline at end of file diff --git a/README.md b/README.md index 776f564..204b9d5 100644 --- a/README.md +++ b/README.md @@ -1,55 +1,72 @@ +jsr + # Actors -High-scale interactive services often demand a combination of high throughput, low latency, and high availability. These are challenging goals to meet with traditional stateless architectures. Inspired by the Orleans virtual-actor pattern, the **Actors** library offers a stateful solution, enabling developers to manage distributed state in a seamless and scalable way. +High-scale interactive services often demand a combination of high throughput, +low latency, and high availability. These are challenging goals to meet with +traditional stateless architectures. Inspired by the Orleans virtual-actor +pattern, the **Actors** library offers a stateful solution, enabling developers +to manage distributed state in a seamless and scalable way. + +The **Actors** model simplifies the development of stateful applications by +abstracting away the complexity of distributed system concerns, such as +reliability and resource management. This allows developers to focus on building +logic while the framework handles the intricacies of state distribution and +fault tolerance. -The **Actors** model simplifies the development of stateful applications by abstracting away the complexity of distributed system concerns, such as reliability and resource management. This allows developers to focus on building logic while the framework handles the intricacies of state distribution and fault tolerance. +With **Actors**, developers create "actors" – isolated, stateful objects that +can be invoked directly. Each actor is uniquely addressable, enabling efficient +and straightforward interaction across distributed environments. -With **Actors**, developers create "actors" – isolated, stateful objects that can be invoked directly. Each actor is uniquely addressable, enabling efficient and straightforward interaction across distributed environments. +## Key Features -## Key Features: -- **Simplified State Management:** Build stateful services using a straightforward programming model, without worrying about distributed systems complexities like locks or consistency. -- **No Distributed Locks:** Actors handle state independently, eliminating the need for distributed locks. Each actor is responsible for its own state, making it simple to work with highly concurrent scenarios without race conditions. -- **Virtual Actors:** Actors are automatically instantiated, managed, and scaled by the framework, freeing you from managing lifecycles manually. -- **Powered by Deno Cluster Isolates:** Achieve high-performance applications that scale effortlessly by leveraging Deno cluster's unique isolate addressing. +- **Simplified State Management:** Build stateful services using a + straightforward programming model, without worrying about distributed systems + complexities like locks or consistency. +- **No Distributed Locks:** Actors handle state independently, eliminating the + need for distributed locks. Each actor is responsible for its own state, + making it simple to work with highly concurrent scenarios without race + conditions. +- **Virtual Actors:** Actors are automatically instantiated, managed, and scaled + by the framework, freeing you from managing lifecycles manually. +- **Powered by Deno Cluster Isolates:** Achieve high-performance applications + that scale effortlessly by leveraging Deno cluster's unique isolate + addressing. ## Example: Simple Atomic Counter without Distributed Locks ```typescript -import { Actor, ActorState, actors } from "@deco/actors"; +import { actors, ActorState } from "@deco/actors"; -interface ICounter { - increment(): Promise; - getCount(): Promise; -} +class Counter { + private count: number; -export default class Counter extends Actor implements ICounter { - private count: number; - - constructor(state: ActorState) { - super(state); - this.count = 0; - state.blockConcurrencyWhile(async () => { - this.count = await this.getCount(); - }); - } - - async increment(): Promise { - let val = await this.state.storage.get("counter"); - await this.state.storage.put("counter", ++val); - return val; - } - - async getCount(): Promise { - return await this.state.storage.get("counter"); - } + constructor(protected state: ActorState) { + this.count = 0; + state.blockConcurrencyWhile(async () => { + this.count = await this.state.storage.get("counter") ?? 0; + }); + } + + async increment(): Promise { + await this.state.storage.put("counter", ++this.count); + return this.count; + } + + getCount(): number { + return this.count; + } } // Invoking the counter actor -const counter = actors.proxy({ id: "counter-1" }); +const counterProxy = actors.proxy({ + actor: Counter, + server: "http://localhost:8000", +}); +const counter = counterProxy.id("counter-id"); // Increment counter await counter.increment(); // Get current count const currentCount = await counter.getCount(); console.log(`Current count: ${currentCount}`); - ``` diff --git a/deno.json b/deno.json new file mode 100644 index 0000000..4346cef --- /dev/null +++ b/deno.json @@ -0,0 +1,20 @@ +{ + "name": "@deco/actors", + "exports": { + ".": "./src/actors/mod.ts", + "./hono": "./src/actors/hono/middleware.ts" + }, + "imports": { + "@core/asyncutil": "jsr:@core/asyncutil@^1.1.1", + "@hono/hono": "jsr:@hono/hono@^4.6.2", + "@std/assert": "jsr:@std/assert@^1.0.5", + "@std/async": "jsr:@std/async@^1.0.5", + "@std/path": "jsr:@std/path@^1.0.6" + }, + "tasks": { + "check": "deno fmt && deno lint --fix && deno check ./src/actors/mod.ts ./src/actors/hono/middleware.ts", + "test": "rm kv;deno test -A --unstable-kv ." + }, + "lock": false, + "version": "0.0.0" +} diff --git a/src/actors/factory.ts b/src/actors/factory.ts new file mode 100644 index 0000000..dbbad56 --- /dev/null +++ b/src/actors/factory.ts @@ -0,0 +1,47 @@ +import { + type Actor, + ACTOR_ID_HEADER_NAME, + type ActorConstructor, +} from "./runtime.ts"; + +export interface ProxyOptions { + actor: ActorConstructor | string; + server: string; +} + +type Promisify = { + [key in keyof Actor]: Actor[key] extends (...args: infer Args) => infer Return + ? Return extends Promise ? Actor[key] + : (...args: Args) => Promise + : Actor[key]; +}; +export const actors = { + proxy: (c: ProxyOptions) => { + return { + id: (id: string): Promisify => { + return new Proxy>({} as Promisify, { + get: (_, prop) => { + return async (...args: unknown[]) => { + const resp = await fetch( + `${c.server}/actors/${ + typeof c.actor === "string" ? c.actor : c.actor.name + }/invoke/${String(prop)}`, + { + method: "POST", + headers: { + "Content-Type": "application/json", + [ACTOR_ID_HEADER_NAME]: id, + }, + body: JSON.stringify({ + args, + }), + }, + ); + return resp.json(); + }; + }, + }); + }, + }; + }, +}; diff --git a/src/actors/hono/middleware.ts b/src/actors/hono/middleware.ts new file mode 100644 index 0000000..c3fee81 --- /dev/null +++ b/src/actors/hono/middleware.ts @@ -0,0 +1,19 @@ +import type { MiddlewareHandler } from "@hono/hono"; +import type { ActorRuntime } from "../mod.ts"; + +/** + * Adds middleware to the Hono server that routes requests to actors. + * the default base path is `/actors`. + */ +export const useActors = ( + rt: ActorRuntime, + basePath = "/actors", +): MiddlewareHandler => { + return async (ctx, next) => { + if (!ctx.req.path.startsWith(basePath)) { + return next(); + } + const response = await rt.fetch(ctx.req.raw); + ctx.res = response; + }; +}; diff --git a/src/actors/mod.ts b/src/actors/mod.ts new file mode 100644 index 0000000..558a390 --- /dev/null +++ b/src/actors/mod.ts @@ -0,0 +1,7 @@ +// deno-lint-ignore no-empty-interface +export interface Actor { +} + +export { ActorRuntime } from "./runtime.ts"; +export { ActorState } from "./state.ts"; +export { type ActorStorage } from "./storage.ts"; diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts new file mode 100644 index 0000000..10aa189 --- /dev/null +++ b/src/actors/runtime.test.ts @@ -0,0 +1,57 @@ +import { assertEquals } from "@std/assert"; +import { actors } from "./factory.ts"; +import { ActorRuntime } from "./runtime.ts"; +import type { ActorState } from "./state.ts"; + +class Counter { + private count: number; + + constructor(protected state: ActorState) { + this.count = 0; + state.blockConcurrencyWhile(async () => { + this.count = await this.state.storage.get("counter") ?? 0; + }); + } + + async increment(): Promise { + await this.state.storage.put("counter", ++this.count); + return this.count; + } + + getCount(): number { + return this.count; + } +} + +const runServer = (rt: ActorRuntime): AsyncDisposable => { + const server = Deno.serve(rt.fetch.bind(rt)); + return { + async [Symbol.asyncDispose]() { + await server.shutdown(); + }, + }; +}; + +Deno.test("counter increment and getCount", async () => { + const rt = new ActorRuntime([Counter]); + await using _server = runServer(rt); + const actorId = "1234"; + const counterProxy = actors.proxy({ + actor: Counter, + server: "http://localhost:8000", + }); + + const actor = counterProxy.id(actorId); + // Test increment + const number = await actor.increment(); + assertEquals(number, 1); + + // Test getCount + assertEquals(await actor.getCount(), 1); + + // Test increment again + assertEquals(await actor.increment(), 2); + + // Test getCount again + assertEquals(await actor.getCount(), 2); +}); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts new file mode 100644 index 0000000..7207599 --- /dev/null +++ b/src/actors/runtime.ts @@ -0,0 +1,155 @@ +import { ActorState } from "./state.ts"; +import { DenoKvActorStorage } from "./storage/denoKv.ts"; +/** + * Represents an actor. + */ +// deno-lint-ignore no-empty-interface +export interface Actor { +} + +/** + * The name of the header used to specify the actor ID. + */ +export const ACTOR_ID_HEADER_NAME = "x-deno-isolate-instance-id"; +const ACTOR_NAME_PATH_PARAM = "actorName"; +const METHOD_NAME_PATH_PARAM = "methodName"; + +/** + * The URL pattern for invoking an actor method. + */ +export const actorInvokeUrl = new URLPattern({ + pathname: + `/actors/:${ACTOR_NAME_PATH_PARAM}/invoke/:${METHOD_NAME_PATH_PARAM}`, +}); + +// deno-lint-ignore no-explicit-any +type Function = (...args: any) => any; +const isInvocable = (f: never | Function): f is Function => { + return typeof f === "function"; +}; + +/** + * Represents a constructor function for creating an actor instance. + * @template TInstance - The type of the actor instance. + */ +export type ActorConstructor = new ( + state: ActorState, +) => TInstance; + +/** + * Represents an actor invoker. + */ +export interface ActorInvoker { + /** + * The actor instance. + */ + actor: Actor; + /** + * The actor state. + */ + state: ActorState; + /** + * A promise that resolves when the actor is initialized. + */ + initialization: PromiseWithResolvers; +} + +/** + * Represents the runtime for managing and invoking actors. + */ +export class ActorRuntime { + private actors: Map = new Map(); + private initilized = false; + /** + * Creates an instance of ActorRuntime. + * @param actorsConstructors - An array of actor constructors. + */ + constructor( + protected actorsConstructors: Array, + ) { + } + + /** + * Ensures that the actors are initialized for the given actor ID. + * @param actorId - The ID of the actor. + */ + ensureInitialized(actorId: string) { + if (this.initilized) { + return; + } + this.actorsConstructors.forEach((Actor) => { + const initialization = Promise.withResolvers(); + const storage = new DenoKvActorStorage({ + actorId, + actorName: Actor.name, + }); + const state = new ActorState({ + initialization, + storage, + }); + const actor = new Actor( + state, + ); + this.actors.set(Actor.name, { + actor, + state, + initialization, + }); + }); + this.initilized = true; + } + + /** + * Handles an incoming request and invokes the corresponding actor method. + * @param req - The incoming request. + * @returns A promise that resolves to the response. + */ + async fetch(req: Request): Promise { + const url = new URL(req.url); + const actorId = req.headers.get(ACTOR_ID_HEADER_NAME); + if (!actorId) { + return new Response(`missing ${ACTOR_ID_HEADER_NAME} header`, { + status: 400, + }); + } + + this.ensureInitialized(actorId); + + const result = actorInvokeUrl.exec(url); + if (!result) { + return new Response(null, { status: 404 }); + } + const groups = result?.pathname.groups ?? {}; + const actorName = groups[ACTOR_NAME_PATH_PARAM]; + const actorInvoker = actorName ? this.actors.get(actorName) : undefined; + if (!actorInvoker) { + return new Response(`actor ${ACTOR_NAME_PATH_PARAM} not found`, { + status: 404, + }); + } + const { actor, initialization } = actorInvoker; + const method = groups[METHOD_NAME_PATH_PARAM]; + if (!method || !(method in actor)) { + return new Response(`method not found for the actor`, { status: 404 }); + } + let args = []; + if (req.headers.get("content-length") !== null) { + const { args: margs } = await req.json(); + args = margs; + } + const methodImpl = actor[method as keyof typeof actor]; + if (!isInvocable(methodImpl)) { + return new Response( + `cannot invoke actor method for type ${typeof methodImpl}`, + { + status: 400, + }, + ); + } + await initialization.promise; + const res = await (methodImpl as Function).bind(actor)( + ...Array.isArray(args) ? args : [args], + ); + return Response.json(res); + } +} diff --git a/src/actors/state.ts b/src/actors/state.ts new file mode 100644 index 0000000..839693b --- /dev/null +++ b/src/actors/state.ts @@ -0,0 +1,21 @@ +import type { ActorStorage } from "./storage.ts"; + +export interface ActorStateOptions { + initialization: PromiseWithResolvers; + storage: ActorStorage; +} +/** + * Represents the state of an actor. + */ +export class ActorState { + public storage: ActorStorage; + constructor(private options: ActorStateOptions) { + this.storage = options.storage; + } + + async blockConcurrencyWhile(callback: () => Promise): Promise { + return await callback().finally(() => { + this.options.initialization.resolve(); + }); + } +} diff --git a/src/actors/storage.ts b/src/actors/storage.ts new file mode 100644 index 0000000..91cce75 --- /dev/null +++ b/src/actors/storage.ts @@ -0,0 +1,46 @@ +export interface ActorStorageListOptions { + start?: string; + startAfter?: string; + end?: string; + prefix?: string; + reverse?: boolean; + limit?: number; + noCache?: boolean; +} +export interface ActorStorageGetOptions { + noCache?: boolean; +} + +export interface ActorStoragePutOptions { + noCache?: boolean; +} + +/** + * Represents the storage of an actor. + */ +export interface ActorStorage { + get( + key: string, + options?: ActorStorageGetOptions, + ): Promise; + get( + keys: string[], + options?: ActorStorageGetOptions, + ): Promise>; + list( + options?: ActorStorageListOptions, + ): Promise>; + put( + key: string, + value: T, + options?: ActorStoragePutOptions, + ): Promise; + put( + entries: Record, + options?: ActorStoragePutOptions, + ): Promise; + delete(key: string, options?: ActorStoragePutOptions): Promise; + delete(keys: string[], options?: ActorStoragePutOptions): Promise; + deleteAll(options?: ActorStoragePutOptions): Promise; + atomic(storage: (st: ActorStorage) => Promise): Promise; +} diff --git a/src/actors/storage/cached.ts b/src/actors/storage/cached.ts new file mode 100644 index 0000000..8657936 --- /dev/null +++ b/src/actors/storage/cached.ts @@ -0,0 +1,126 @@ +// deno-lint-ignore-file no-explicit-any +import type { + ActorStorage, + ActorStorageGetOptions, + ActorStorageListOptions, + ActorStoragePutOptions, +} from "../storage.ts"; + +export class CachedStorage implements ActorStorage { + protected cache: Map = new Map(); + + constructor(protected innerStorage: ActorStorage) {} + + private async getMany( + keys: string[], + options?: ActorStorageGetOptions, + ): Promise> { + const { noCache } = options || {}; + const result = new Map(); + const keysToFetch: string[] = []; + + for (const key of keys) { + if (!noCache && this.cache.has(key)) { + result.set(key, this.cache.get(key) as T); + } else { + keysToFetch.push(key); + } + } + + if (keysToFetch.length > 0) { + const fetched = await this.innerStorage.get(keysToFetch, options); + for (const [key, value] of fetched.entries()) { + this.cache.set(key, value); + result.set(key, value); + } + } + + return result; + } + async get( + key: string, + options?: ActorStorageGetOptions, + ): Promise; + async get( + keys: string[], + options?: ActorStorageGetOptions, + ): Promise>; + async get( + keys: string | string[], + options?: ActorStorageGetOptions, + ): Promise | string> { + if (typeof keys === "string") { + const results = await this.getMany([keys], options); + return results.get(keys) as string; + } + return this.getMany(keys, options); + } + + async list( + options?: ActorStorageListOptions, + ): Promise> { + const result = await this.innerStorage.list(options); + + for (const [key, value] of result.entries()) { + if (this.cache.has(key)) { + result.set(key, this.cache.get(key)); + } else { + this.cache.set(key, value); + } + } + + return result; + } + + async put( + keyOrEntries: string | Record, + valueOrOptions?: T | ActorStoragePutOptions, + options?: ActorStoragePutOptions, + ): Promise { + const entries = typeof keyOrEntries === "string" + ? { [keyOrEntries]: valueOrOptions as T } + : keyOrEntries; + // Multiple entries put + await this.innerStorage.put( + entries, + (typeof keyOrEntries === "string" + ? options + : valueOrOptions) as ActorStoragePutOptions, + ); + for (const key in entries) { + this.cache.set(key, entries[key]); + } + } + + async delete( + key: string, + options?: ActorStoragePutOptions, + ): Promise; + async delete( + keys: string[], + options?: ActorStoragePutOptions, + ): Promise; + + async delete( + keyOrKeys: string | string[], + options?: ActorStoragePutOptions, + ): Promise { + const keys = typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys; + // Multiple keys delete + const result = await this.innerStorage.delete( + keys, + options, + ); + keys.forEach((key) => this.cache.delete(key)); + return result; + } + + async deleteAll(options?: ActorStoragePutOptions): Promise { + this.cache.clear(); + await this.innerStorage.deleteAll(options); + } + + async atomic(storage: (st: ActorStorage) => Promise): Promise { + await storage(this); + } +} diff --git a/src/actors/storage/denoKv.ts b/src/actors/storage/denoKv.ts new file mode 100644 index 0000000..3895d28 --- /dev/null +++ b/src/actors/storage/denoKv.ts @@ -0,0 +1,147 @@ +import { join } from "@std/path"; +import type { + ActorStorage, + ActorStorageListOptions, + ActorStoragePutOptions, +} from "../storage.ts"; + +export interface StorageOptions { + actorName: string; + actorId: string; + atomicOp?: Deno.AtomicOperation; +} + +const kv = await Deno.openKv(join(Deno.cwd(), "kv")); + +export class DenoKvActorStorage implements ActorStorage { + private kv: Deno.Kv; + private atomicOp?: Deno.AtomicOperation; + private kvOrTransaction: Deno.Kv | Deno.AtomicOperation; + constructor(protected options: StorageOptions) { + this.kv = kv; // Initialize the Deno.Kv instance + this.kvOrTransaction = options.atomicOp ?? kv; + this.atomicOp = options.atomicOp; + } + + async atomic(_storage: (st: ActorStorage) => Promise): Promise { + if (this.kv instanceof Deno.AtomicOperation) { + throw new Error(`not implemented`); + } + const atomicOp = this.kv.atomic(); + const st = new DenoKvActorStorage({ + ...this.options, + atomicOp, + }); + return await _storage(st).then(async () => { + const result = await atomicOp.commit(); + if (!result.ok) { + throw new Error(`atomic operation failed`); + } + }); + } + + // Build the full key based on actor name, id, and provided key + buildKey(key: string): string[] { + return [this.options.actorName, this.options.actorId, key]; + } + + // Single get method that handles both string and array of strings + async get( + keyOrKeys: string | string[], + ): Promise> { + // If the input is a single string, perform a single get + if (typeof keyOrKeys === "string") { + const result = await this.kv.get(this.buildKey(keyOrKeys)); + return result?.value ?? undefined; + } + + // If the input is an array of strings, perform multiple gets and return a Map + const result = new Map(); + for (const key of keyOrKeys) { + const value = await this.get(key) as T; + if (value !== undefined) { + result.set(key, value); + } + } + + return result; + } + + // Put function that directly stores the value in Deno.Kv + async put( + key: string, + value: T, + options?: ActorStoragePutOptions, + ): Promise; + async put( + entries: Record, + options?: ActorStoragePutOptions, + ): Promise; + async put( + entry: string | Record, + value: T | ActorStoragePutOptions, + ): Promise { + const entries = typeof entry === "string" ? { [entry]: value } : entry; + + for (const [key, value] of Object.entries(entries)) { + await this.kvOrTransaction.set(this.buildKey(key), value); + } + } + + // Delete function that removes keys from Deno.Kv + async delete(key: string, options?: ActorStoragePutOptions): Promise; + async delete( + keys: string[], + options?: ActorStoragePutOptions, + ): Promise; + async delete( + keys: string | string[], + ): Promise { + const fullKeys = Array.isArray(keys) ? keys : [keys]; + let deletedCount = 0; + + const batch = this.atomicOp ?? this.kv.atomic(); + for (const key of fullKeys) { + batch.delete(this.buildKey(key)); + deletedCount++; + } + !this.atomicOp && await batch.commit(); + + return Array.isArray(keys) ? deletedCount : deletedCount > 0; + } + + // Delete all records within a certain range based on the options provided + async deleteAll(): Promise { + const iter = await this.list(); + + const batch = this.atomicOp ?? this.kv.atomic(); + for (const [key] of iter) { + batch.delete(this.buildKey(key)); + } + + !this.atomicOp && await batch.commit(); + } + + // List records in the storage with optional range and filtering + async list( + options?: ActorStorageListOptions, + ): Promise> { + const map = new Map(); + const iter = this.kv.list( + { + start: options?.start ? this.buildKey(options.start) : [], + end: options?.end ? this.buildKey(options.end) : [], + prefix: options?.prefix ? this.buildKey(options.prefix) : [], + }, + { + limit: options?.limit, + reverse: options?.reverse, + }, + ); + + for await (const entry of iter) { + map.set(entry.key[entry.key.length - 1].toString(), entry.value); + } + return map; + } +}