diff --git a/src/actors/storage/cached.ts b/src/actors/storage/cached.ts index 705588d..a176bfd 100644 --- a/src/actors/storage/cached.ts +++ b/src/actors/storage/cached.ts @@ -5,16 +5,21 @@ import type { ActorStorageListOptions, ActorStoragePutOptions, } from "../storage.ts"; +import { RwLock } from "../util/rwlock.ts"; export class CachedStorage implements ActorStorage { - protected cache: Map = new Map(); + protected rwLock = new RwLock(); - constructor(protected innerStorage: ActorStorage) {} + constructor( + protected innerStorage: ActorStorage, + protected cache = new Map(), + ) {} private async getMany( keys: string[][], options?: ActorStorageGetOptions, ): Promise<[string[], T][]> { + using _lock = await this.rwLock.rLock(); const { noCache } = options || {}; const result: [string[], T][] = []; const keysToFetch: string[][] = []; @@ -63,6 +68,7 @@ export class CachedStorage implements ActorStorage { async list( options?: ActorStorageListOptions, ): Promise<[string[], T][]> { + using _lock = await this.rwLock.rLock(); const result = await this.innerStorage.list(options); for (const [key, value] of result) { @@ -82,6 +88,7 @@ export class CachedStorage implements ActorStorage { valueOrOptions?: T | ActorStoragePutOptions, options?: ActorStoragePutOptions, ): Promise { + using _lock = await this.rwLock.lock(); const entries: [string[], T][] = Array.isArray(keyOrEntries[0]) ? keyOrEntries as [string[], T][] : [[ @@ -119,6 +126,7 @@ export class CachedStorage implements ActorStorage { keyOrKeys: string | string[] | string[][], options?: ActorStoragePutOptions, ): Promise { + using _lock = await this.rwLock.lock(); const keys = Array.isArray(keyOrKeys[0]) ? keyOrKeys as string[][] : [typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys as string[]]; @@ -133,11 +141,15 @@ export class CachedStorage implements ActorStorage { } async deleteAll(options?: ActorStoragePutOptions): Promise { + using _lock = await this.rwLock.lock(); this.cache.clear(); await this.innerStorage.deleteAll(options); } async atomic(storage: (st: ActorStorage) => Promise): Promise { - await storage(this); + using _lock = await this.rwLock.lock(); + const clone = new Map(this.cache); + await storage(new CachedStorage(this, clone)); // snapshot isolation + this.cache = clone; } } diff --git a/src/actors/util/rwlock.ts b/src/actors/util/rwlock.ts new file mode 100644 index 0000000..cd72a56 --- /dev/null +++ b/src/actors/util/rwlock.ts @@ -0,0 +1,68 @@ +export class RwLock { + private readers: number = 0; // Number of readers holding the lock + private writer: boolean = false; // Whether a writer holds the lock + private writeQueue: Array<() => void> = []; // Queue of waiting writers + private readQueue: Array<() => void> = []; // Queue of waiting readers + + // Acquire the read lock + async rLock(): Promise { + if (this.writer || this.writeQueue.length > 0) { + // If there is a writer or writers waiting, wait for the writer to release + await new Promise((resolve) => this.readQueue.push(resolve)); + } + this.readers++; + return { + [Symbol.dispose]: () => { + this.rUnlock(); + }, + }; + } + + // Release the read lock + rUnlock(): void { + this.readers--; + this.checkRelease(); + } + + // Acquire the write lock + async lock(): Promise { + if (this.writer || this.readers > 0) { + // If there's a writer or any readers, wait for them to release + await new Promise((resolve) => this.writeQueue.push(resolve)); + } + this.writer = true; + return { + [Symbol.dispose]: () => { + this.unlock(); + }, + }; + } + + // Release the write lock + unlock(): void { + this.writer = false; + this.checkRelease(); + } + + // Check if any waiting readers or writers should be released + private checkRelease(): void { + if (this.writer || this.readers > 0) { + // If there's a writer or readers, don't release anyone else + return; + } + + // If writers are queued, release the first writer + if (this.writeQueue.length > 0) { + const resolve = this.writeQueue.shift()!; + resolve(); + this.writer = true; + } else { + // If readers are queued and no writers are waiting, release all readers + while (this.readQueue.length > 0) { + const resolve = this.readQueue.shift()!; + resolve(); + this.readers++; + } + } + } +}