Skip to content

Commit

Permalink
Implements Optimistic concurrency control for denokv operations
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed Sep 24, 2024
1 parent c82a191 commit 7c49270
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions src/actors/storage/denoKv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,43 @@ import type {
export interface StorageOptions {
actorName: string;
actorId: string;
atomicOp?: Deno.AtomicOperation;
atomicOp?: AtomicOp;
}

const kv = await Deno.openKv(join(Deno.cwd(), "kv"));

interface AtomicOp {
kv: Deno.AtomicOperation;
dirty: Deno.KvEntryMaybe<unknown>[];
}
export class DenoKvActorStorage implements ActorStorage {
private kv: Deno.Kv;
private atomicOp?: Deno.AtomicOperation;
private atomicOp?: AtomicOp;
private kvOrTransaction: Deno.Kv | Deno.AtomicOperation;
constructor(protected options: StorageOptions) {
this.kv = kv; // Initialize the Deno.Kv instance
this.kvOrTransaction = options.atomicOp ?? kv;
this.kvOrTransaction = options.atomicOp?.kv ?? kv;
this.atomicOp = options.atomicOp;
}

async atomic(_storage: (st: ActorStorage) => Promise<void>): Promise<void> {
if (this.kv instanceof Deno.AtomicOperation) {
if (this.atomicOp) {
throw new Error(`not implemented`);
}
const atomicOp = this.kv.atomic();
const dirty: Deno.KvEntryMaybe<unknown>[] = [];
const st = new DenoKvActorStorage({
...this.options,
atomicOp,
atomicOp: {
kv: atomicOp,
dirty,
},
});
return await _storage(st).then(async () => {
// performs OCC check (optimistic concurrency control)
for (const entry of dirty) {
atomicOp.check(entry);
}
const result = await atomicOp.commit();
if (!result.ok) {
throw new Error(`atomic operation failed`);
Expand All @@ -52,6 +64,7 @@ export class DenoKvActorStorage implements ActorStorage {
// If the input is a single string, perform a single get
if (typeof keyOrKeys === "string") {
const result = await this.kv.get<T>(this.buildKey(keyOrKeys));
this.atomicOp?.dirty?.push(result);
return result?.value ?? undefined;
}

Expand Down Expand Up @@ -100,7 +113,7 @@ export class DenoKvActorStorage implements ActorStorage {
const fullKeys = Array.isArray(keys) ? keys : [keys];
let deletedCount = 0;

const batch = this.atomicOp ?? this.kv.atomic();
const batch = this.atomicOp?.kv ?? this.kv.atomic();
for (const key of fullKeys) {
batch.delete(this.buildKey(key));
deletedCount++;
Expand All @@ -114,7 +127,7 @@ export class DenoKvActorStorage implements ActorStorage {
async deleteAll(): Promise<void> {
const iter = await this.list();

const batch = this.atomicOp ?? this.kv.atomic();
const batch = this.atomicOp?.kv ?? this.kv.atomic();
for (const [key] of iter) {
batch.delete(this.buildKey(key));
}
Expand Down

0 comments on commit 7c49270

Please sign in to comment.