Skip to content

Commit

Permalink
Add rwLock
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Oct 13, 2024
1 parent b45f594 commit 0c482c2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
18 changes: 15 additions & 3 deletions src/actors/storage/cached.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ import type {
ActorStorageListOptions,
ActorStoragePutOptions,
} from "../storage.ts";
import { RwLock } from "../util/rwlock.ts";

export class CachedStorage implements ActorStorage {
protected cache: Map<string, any> = new Map<string, any>();
protected rwLock = new RwLock();

constructor(protected innerStorage: ActorStorage) {}
constructor(
protected innerStorage: ActorStorage,
protected cache = new Map<string, any>(),
) {}

private async getMany<T = unknown>(
keys: string[][],
options?: ActorStorageGetOptions,
): Promise<[string[], T][]> {
using _lock = await this.rwLock.rLock();
const { noCache } = options || {};
const result: [string[], T][] = [];
const keysToFetch: string[][] = [];
Expand Down Expand Up @@ -63,6 +68,7 @@ export class CachedStorage implements ActorStorage {
async list<T = unknown>(
options?: ActorStorageListOptions,
): Promise<[string[], T][]> {
using _lock = await this.rwLock.rLock();
const result = await this.innerStorage.list<T>(options);

for (const [key, value] of result) {
Expand All @@ -82,6 +88,7 @@ export class CachedStorage implements ActorStorage {
valueOrOptions?: T | ActorStoragePutOptions,
options?: ActorStoragePutOptions,
): Promise<void> {
using _lock = await this.rwLock.lock();
const entries: [string[], T][] = Array.isArray(keyOrEntries[0])
? keyOrEntries as [string[], T][]
: [[
Expand Down Expand Up @@ -119,6 +126,7 @@ export class CachedStorage implements ActorStorage {
keyOrKeys: string | string[] | string[][],
options?: ActorStoragePutOptions,
): Promise<number | boolean> {
using _lock = await this.rwLock.lock();
const keys = Array.isArray(keyOrKeys[0])
? keyOrKeys as string[][]
: [typeof keyOrKeys === "string" ? [keyOrKeys] : keyOrKeys as string[]];
Expand All @@ -133,11 +141,15 @@ export class CachedStorage implements ActorStorage {
}

async deleteAll(options?: ActorStoragePutOptions): Promise<void> {
using _lock = await this.rwLock.lock();
this.cache.clear();
await this.innerStorage.deleteAll(options);
}

async atomic(storage: (st: ActorStorage) => Promise<void>): Promise<void> {
await storage(this);
using _lock = await this.rwLock.lock();
const clone = new Map(this.cache);
await storage(new CachedStorage(this, clone)); // snapshot isolation
this.cache = clone;
}
}
68 changes: 68 additions & 0 deletions src/actors/util/rwlock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
export class RwLock {
private readers: number = 0; // Number of readers holding the lock
private writer: boolean = false; // Whether a writer holds the lock
private writeQueue: Array<() => void> = []; // Queue of waiting writers
private readQueue: Array<() => void> = []; // Queue of waiting readers

// Acquire the read lock
async rLock(): Promise<Disposable> {
if (this.writer || this.writeQueue.length > 0) {
// If there is a writer or writers waiting, wait for the writer to release
await new Promise<void>((resolve) => this.readQueue.push(resolve));
}
this.readers++;
return {
[Symbol.dispose]: () => {
this.rUnlock();
},
};
}

// Release the read lock
rUnlock(): void {
this.readers--;
this.checkRelease();
}

// Acquire the write lock
async lock(): Promise<Disposable> {
if (this.writer || this.readers > 0) {
// If there's a writer or any readers, wait for them to release
await new Promise<void>((resolve) => this.writeQueue.push(resolve));
}
this.writer = true;
return {
[Symbol.dispose]: () => {
this.unlock();
},
};
}

// Release the write lock
unlock(): void {
this.writer = false;
this.checkRelease();
}

// Check if any waiting readers or writers should be released
private checkRelease(): void {
if (this.writer || this.readers > 0) {
// If there's a writer or readers, don't release anyone else
return;
}

// If writers are queued, release the first writer
if (this.writeQueue.length > 0) {
const resolve = this.writeQueue.shift()!;
resolve();
this.writer = true;
} else {
// If readers are queued and no writers are waiting, release all readers
while (this.readQueue.length > 0) {
const resolve = this.readQueue.shift()!;
resolve();
this.readers++;
}
}
}
}

0 comments on commit 0c482c2

Please sign in to comment.