diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 5513137..300bee7 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -84,6 +84,9 @@ Deno.test("counter tests", async () => { await ch.close(); const watcher = await actor.watch(); + + assertEquals(await actor.getCount(), 0); + // Test increment const number = await actor.increment(); assertEquals(number, 1); diff --git a/src/actors/storage.ts b/src/actors/storage.ts index 91cce75..9319a80 100644 --- a/src/actors/storage.ts +++ b/src/actors/storage.ts @@ -1,8 +1,8 @@ export interface ActorStorageListOptions { - start?: string; - startAfter?: string; - end?: string; - prefix?: string; + start?: string[]; + startAfter?: string[]; + end?: string[]; + prefix?: string[]; reverse?: boolean; limit?: number; noCache?: boolean; @@ -24,23 +24,33 @@ export interface ActorStorage { options?: ActorStorageGetOptions, ): Promise; get( - keys: string[], + key: string[], options?: ActorStorageGetOptions, - ): Promise>; + ): Promise; + get( + keys: string[][], + options?: ActorStorageGetOptions, + ): Promise<[string[], T][]>; list( options?: ActorStorageListOptions, - ): Promise>; + ): Promise<[string[], T][]>; put( key: string, value: T, options?: ActorStoragePutOptions, ): Promise; put( - entries: Record, + key: string[], + value: T, + options?: ActorStoragePutOptions, + ): Promise; + put( + entries: [string[], T][], options?: ActorStoragePutOptions, ): Promise; - delete(key: string, options?: ActorStoragePutOptions): Promise; - delete(keys: string[], options?: ActorStoragePutOptions): Promise; + delete(key: string[], options?: ActorStoragePutOptions): Promise; + delete(key: string[], options?: ActorStoragePutOptions): Promise; + delete(keys: string[][], options?: ActorStoragePutOptions): Promise; deleteAll(options?: ActorStoragePutOptions): Promise; atomic(storage: (st: ActorStorage) => Promise): Promise; } diff --git a/src/actors/storage/cached.ts b/src/actors/storage/cached.ts index 8657936..7ba0e7e 100644 --- a/src/actors/storage/cached.ts +++ b/src/actors/storage/cached.ts @@ -12,16 +12,17 @@ export class CachedStorage implements ActorStorage { constructor(protected innerStorage: ActorStorage) {} private async getMany( - keys: string[], + keys: string[][], options?: ActorStorageGetOptions, - ): Promise> { + ): Promise<[string[], T][]> { const { noCache } = options || {}; - const result = new Map(); - const keysToFetch: string[] = []; + const result: [string[], T][] = []; + const keysToFetch: string[][] = []; for (const key of keys) { - if (!noCache && this.cache.has(key)) { - result.set(key, this.cache.get(key) as T); + const keyString = this.keyToString(key); + if (!noCache && this.cache.has(keyString)) { + result.push([key, this.cache.get(keyString) as T]); } else { keysToFetch.push(key); } @@ -29,43 +30,47 @@ export class CachedStorage implements ActorStorage { if (keysToFetch.length > 0) { const fetched = await this.innerStorage.get(keysToFetch, options); - for (const [key, value] of fetched.entries()) { - this.cache.set(key, value); - result.set(key, value); + for (const [key, value] of fetched) { + this.cache.set(this.keyToString(key), value); + result.push([key, value]); } } return result; } + + // Helper function to convert array of strings into a single string for cache key + private keyToString(key: string[]): string { + return key.join(":@:"); + } + async get( - key: string, - options?: ActorStorageGetOptions, - ): Promise; - async get( - keys: string[], - options?: ActorStorageGetOptions, - ): Promise>; - async get( - keys: string | string[], + keyOrKeys: string | string[] | string[][], options?: ActorStorageGetOptions, - ): Promise | string> { - if (typeof keys === "string") { - const results = await this.getMany([keys], options); - return results.get(keys) as string; + ): Promise { + if (Array.isArray(keyOrKeys[0])) { + // If the first element is an array, it's a list of keys + return this.getMany(keyOrKeys as string[][], options); + } else { + // Single key case + const results = await this.getMany([ + typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys as string[], + ], options); + return results[0][1] as T; } - return this.getMany(keys, options); } async list( options?: ActorStorageListOptions, - ): Promise> { + ): Promise<[string[], T][]> { const result = await this.innerStorage.list(options); - for (const [key, value] of result.entries()) { - if (this.cache.has(key)) { - result.set(key, this.cache.get(key)); + for (const [key, value] of result) { + const keyString = this.keyToString(key); + if (this.cache.has(keyString)) { + result.push([key, this.cache.get(keyString)]); } else { - this.cache.set(key, value); + this.cache.set(keyString, value); } } @@ -73,46 +78,54 @@ export class CachedStorage implements ActorStorage { } async put( - keyOrEntries: string | Record, + keyOrEntries: string | string[] | [string[], T][], valueOrOptions?: T | ActorStoragePutOptions, options?: ActorStoragePutOptions, ): Promise { - const entries = typeof keyOrEntries === "string" - ? { [keyOrEntries]: valueOrOptions as T } - : keyOrEntries; - // Multiple entries put + const entries: [string[], T][] = Array.isArray(keyOrEntries[0]) + ? keyOrEntries as [string[], T][] + : [[ + typeof keyOrEntries === "string" + ? [keyOrEntries] + : keyOrEntries as string[], + valueOrOptions as T, + ]]; + await this.innerStorage.put( entries, - (typeof keyOrEntries === "string" - ? options - : valueOrOptions) as ActorStoragePutOptions, + (Array.isArray(keyOrEntries[0]) + ? valueOrOptions + : options) as ActorStoragePutOptions, ); - for (const key in entries) { - this.cache.set(key, entries[key]); + + for (const [key, value] of entries) { + this.cache.set(this.keyToString(key), value); } } async delete( - key: string, + key: string[], options?: ActorStoragePutOptions, ): Promise; async delete( - keys: string[], + keys: string[][], options?: ActorStoragePutOptions, ): Promise; - async delete( - keyOrKeys: string | string[], + keyOrKeys: string | string[] | string[][], options?: ActorStoragePutOptions, - ): Promise { - const keys = typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys; - // Multiple keys delete - const result = await this.innerStorage.delete( - keys, - options, - ); - keys.forEach((key) => this.cache.delete(key)); - return result; + ): Promise { + const keys = Array.isArray(keyOrKeys[0]) + ? keyOrKeys as string[][] + : [typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys as string[]]; + + const result = await this.innerStorage.delete(keys, options); + + keys.forEach((key) => { + this.cache.delete(this.keyToString(key)); + }); + + return Array.isArray(keyOrKeys[0]) ? result : result > 0; } async deleteAll(options?: ActorStoragePutOptions): Promise { diff --git a/src/actors/storage/denoKv.ts b/src/actors/storage/denoKv.ts index 1ab006b..e2c6f40 100644 --- a/src/actors/storage/denoKv.ts +++ b/src/actors/storage/denoKv.ts @@ -15,7 +15,6 @@ const ACTORS_KV_DATABASE = Deno.env.get("ACTORS_KV_DATABASE") ?? join(Deno.cwd(), "kv"); const ACTORS_DENO_KV_TOKEN = Deno.env.get("ACTORS_DENO_KV_TOKEN"); -// this is necessary since deno cluster does not allow inject DENO_* env vars. so this is a workaround to make it work. ACTORS_DENO_KV_TOKEN && Deno.env.set("DENO_KV_ACCESS_TOKEN", ACTORS_DENO_KV_TOKEN); @@ -25,12 +24,14 @@ interface AtomicOp { kv: Deno.AtomicOperation; dirty: Deno.KvEntryMaybe[]; } + export class DenoKvActorStorage implements ActorStorage { private kv: Deno.Kv; private atomicOp?: AtomicOp; private kvOrTransaction: Deno.Kv | Deno.AtomicOperation; + constructor(protected options: StorageOptions) { - this.kv = kv; // Initialize the Deno.Kv instance + this.kv = kv; this.kvOrTransaction = options.atomicOp?.kv ?? kv; this.atomicOp = options.atomicOp; } @@ -49,7 +50,6 @@ export class DenoKvActorStorage implements ActorStorage { }, }); return await _storage(st).then(async () => { - // performs OCC check (optimistic concurrency control) for (const entry of dirty) { atomicOp.check(entry); } @@ -61,77 +61,95 @@ export class DenoKvActorStorage implements ActorStorage { } // Build the full key based on actor name, id, and provided key - buildKey(key: string): string[] { - return [this.options.actorName, this.options.actorId, key]; + buildKey(key: string[]): string[] { + return [this.options.actorName, this.options.actorId, ...key]; } - // Single get method that handles both string and array of strings + // Single get method that handles both single and multiple keys async get( - keyOrKeys: string | string[], - ): Promise> { - // If the input is a single string, perform a single get - if (typeof keyOrKeys === "string") { - const result = await this.kv.get(this.buildKey(keyOrKeys)); - this.atomicOp?.dirty?.push(result); - return result?.value ?? undefined; - } - - // If the input is an array of strings, perform multiple gets and return a Map - const result = new Map(); - for (const key of keyOrKeys) { - const value = await this.get(key) as T; - if (value !== undefined) { - result.set(key, value); + keyOrKeys: string | string[] | string[][], + ): Promise { + if (Array.isArray(keyOrKeys[0])) { + const result: [string[], T][] = []; + for (const key of keyOrKeys as string[][]) { + const value = await this.get(key) as T; + if (value !== undefined) { + result.push([key, value]); + } } + return result; + } else { + const result = await this.kv.get( + this.buildKey( + Array.isArray(keyOrKeys) ? keyOrKeys as string[] : [keyOrKeys], + ), + ); + this.atomicOp?.dirty?.push(result); + return result?.value!; } - - return result; } - // Put function that directly stores the value in Deno.Kv + // Put function that stores value in Deno.Kv async put( key: string, value: T, options?: ActorStoragePutOptions, ): Promise; async put( - entries: Record, + key: string[], + value: T, options?: ActorStoragePutOptions, ): Promise; async put( - entry: string | Record, - value: T | ActorStoragePutOptions, + entries: [string[], T][], + options?: ActorStoragePutOptions, + ): Promise; + async put( + entry: string | string[] | [string[], T][], + valueOrOptions?: T | ActorStoragePutOptions, + _options?: ActorStoragePutOptions, ): Promise { - const entries = typeof entry === "string" ? { [entry]: value } : entry; - - for (const [key, value] of Object.entries(entries)) { + const entries: [string[], T][] = Array.isArray(entry[0]) + ? entry as [string[], T][] + : [[ + typeof entry === "string" ? [entry] : entry as string[], + valueOrOptions as T, + ]]; + + for (const [key, value] of entries) { await this.kvOrTransaction.set(this.buildKey(key), value); } } // Delete function that removes keys from Deno.Kv - async delete(key: string, options?: ActorStoragePutOptions): Promise; async delete( - keys: string[], + key: string[], + options?: ActorStoragePutOptions, + ): Promise; + async delete( + keys: string[][], options?: ActorStoragePutOptions, ): Promise; async delete( - keys: string | string[], - ): Promise { - const fullKeys = Array.isArray(keys) ? keys : [keys]; - let deletedCount = 0; + keyOrKeys: string | string[] | string[][], + _options?: ActorStoragePutOptions, + ): Promise { + const keys = Array.isArray(keyOrKeys[0]) + ? keyOrKeys as string[][] + : [typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys as string[]]; + let deletedCount = 0; const batch = this.atomicOp?.kv ?? this.kv.atomic(); - for (const key of fullKeys) { + for (const key of keys) { batch.delete(this.buildKey(key)); deletedCount++; } !this.atomicOp && await batch.commit(); - return Array.isArray(keys) ? deletedCount : deletedCount > 0; + return Array.isArray(keyOrKeys[0]) ? deletedCount : deletedCount > 0; } - // Delete all records within a certain range based on the options provided + // Delete all records within a range async deleteAll(): Promise { const iter = await this.list(); @@ -146,23 +164,26 @@ export class DenoKvActorStorage implements ActorStorage { // List records in the storage with optional range and filtering async list( options?: ActorStorageListOptions, - ): Promise> { - const map = new Map(); + ): Promise<[string[], T][]> { + const result: [string[], T][] = []; + const selector = { + start: options?.start ? this.buildKey(options.start) : undefined, + end: options?.end ? this.buildKey(options.end) : undefined, + prefix: options?.prefix ? this.buildKey(options.prefix) : [], + }; const iter = this.kv.list( + selector, { - start: options?.start ? this.buildKey(options.start) : [], - end: options?.end ? this.buildKey(options.end) : [], - prefix: options?.prefix ? this.buildKey(options.prefix) : [], - }, - { - limit: options?.limit, + limit: options?.limit ?? 1000, reverse: options?.reverse, }, ); for await (const entry of iter) { - map.set(entry.key[entry.key.length - 1].toString(), entry.value); + console.log(entry); + result.push([(entry.key as string[]).slice(-2), entry.value]); } - return map; + + return result; } }