Skip to content

Commit

Permalink
packages/core: add transaction flag to ExecutionContext methods
Browse files Browse the repository at this point in the history
  • Loading branch information
joeltg committed Feb 6, 2025
1 parent 11bcf49 commit 934b96b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 49 deletions.
26 changes: 19 additions & 7 deletions packages/core/src/ExecutionContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ export class ExecutionContext {
return await this.messageLog.isAncestor(this.root, ancestor)
}

public async getModelValue<T extends ModelValue = ModelValue>(model: string, key: string): Promise<null | T> {
public async getModelValue<T extends ModelValue = ModelValue>(
model: string,
key: string,
transaction: boolean,
): Promise<null | T> {
if (this.modelEntries[model] === undefined) {
const { name } = this.message.payload
throw new Error(`could not access model db.${model} inside runtime action ${name}`)
Expand Down Expand Up @@ -94,7 +98,7 @@ export class ExecutionContext {
}
}

public setModelValue(model: string, value: ModelValue): void {
public setModelValue(model: string, value: ModelValue, transaction: boolean): void {
assert(this.db.models[model] !== undefined, "model not found")
validateModelValue(this.db.models[model], value)
const {
Expand All @@ -105,30 +109,38 @@ export class ExecutionContext {
this.modelEntries[model][key] = value
}

public deleteModelValue(model: string, key: string): void {
public deleteModelValue(model: string, key: string, transaction: boolean): void {
assert(this.db.models[model] !== undefined, "model not found")
this.modelEntries[model][key] = null
}

public async updateModelValue(model: string, value: Record<string, PropertyValue | undefined>): Promise<void> {
public async updateModelValue(
model: string,
value: Record<string, PropertyValue | undefined>,
transaction: boolean,
): Promise<void> {
assert(this.db.models[model] !== undefined, "model not found")
const {
primaryKey: [primaryKey],
} = this.db.models[model]
const key = value[primaryKey] as string
const previousValue = await this.getModelValue(model, key)
const previousValue = await this.getModelValue(model, key, transaction)
const result = updateModelValues(value, previousValue)
validateModelValue(this.db.models[model], result)
this.modelEntries[model][key] = result
}

public async mergeModelValue(model: string, value: Record<string, PropertyValue | undefined>): Promise<void> {
public async mergeModelValue(
model: string,
value: Record<string, PropertyValue | undefined>,
transaction: boolean,
): Promise<void> {
assert(this.db.models[model] !== undefined, "model not found")
const {
primaryKey: [primaryKey],
} = this.db.models[model]
const key = value[primaryKey] as string
const previousValue = await this.getModelValue(model, key)
const previousValue = await this.getModelValue(model, key, transaction)
const result = mergeModelValues(value, previousValue)
validateModelValue(this.db.models[model], result)
this.modelEntries[model][key] = result
Expand Down
62 changes: 26 additions & 36 deletions packages/core/src/runtime/ContractRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ export class ContractRuntime extends AbstractRuntime {

#context: ExecutionContext | null = null
#thisHandle: QuickJSHandle | null = null
#transaction: boolean = false

constructor(
public readonly topic: string,
Expand All @@ -85,55 +86,42 @@ export class ContractRuntime extends AbstractRuntime {
get: vm.wrapFunction((model, key) => {
assert(typeof model === "string", 'expected typeof model === "string"')
assert(typeof key === "string", 'expected typeof key === "string"')
return this.context.getModelValue(model, key)
return this.context.getModelValue(model, key, this.#transaction)
}),
set: vm.context.newFunction("set", (modelHandle, valueHandle) => {
const model = vm.context.getString(modelHandle)
const value = this.vm.unwrapValue(valueHandle) as ModelValue
this.context.setModelValue(model, value)

set: vm.wrapFunction((model, value) => {
assert(typeof model === "string", 'expected typeof model === "string"')
this.context.setModelValue(model, value as ModelValue, this.#transaction)
}),
update: vm.context.newFunction("update", (modelHandle, valueHandle) => {
const model = vm.context.getString(modelHandle)
const value = this.vm.unwrapValue(valueHandle) as ModelValue

const promise = vm.context.newPromise()
update: vm.wrapFunction(async (model, value) => {
assert(typeof model === "string", 'expected typeof model === "string"')
await this.context.updateModelValue(model, value as ModelValue, this.#transaction)
}),

// TODO: Ensure concurrent merges into the same value don't create a race condition
// if the user doesn't call db.update() with await.
this.context
.updateModelValue(model, value)
.then(() => promise.resolve())
.catch((err) => promise.reject())
merge: vm.wrapFunction(async (model, value) => {
assert(typeof model === "string", 'expected typeof model === "string"')
await this.context.mergeModelValue(model, value as ModelValue, this.#transaction)
}),

promise.settled.then(vm.runtime.executePendingJobs)
return promise.handle
delete: vm.wrapFunction((model, key) => {
assert(typeof model === "string", 'expected typeof model === "string"')
assert(typeof key === "string", 'expected typeof key === "string"')
this.context.deleteModelValue(model, key, this.#transaction)
}),
merge: vm.context.newFunction("merge", (modelHandle, valueHandle) => {
const model = vm.context.getString(modelHandle)
const value = this.vm.unwrapValue(valueHandle) as ModelValue

transaction: vm.context.newFunction("transaction", (callbackHandle) => {
const promise = vm.context.newPromise()

// TODO: Ensure concurrent merges into the same value don't create a race condition
// if the user doesn't call db.update() with await.
this.context
.mergeModelValue(model, value)
.then(() => promise.resolve())
.catch((err) => promise.reject())
this.#transaction = true
this.vm
.callAsync(callbackHandle, this.thisHandle, [])
.then(promise.resolve, promise.reject)
.finally(() => void (this.#transaction = false))

promise.settled.then(vm.runtime.executePendingJobs)
return promise.handle
}),

delete: vm.context.newFunction("delete", (modelHandle, keyHandle) => {
const model = vm.context.getString(modelHandle)
const key = vm.context.getString(keyHandle)
this.context.deleteModelValue(model, key)
}),

transaction: vm.context.newFunction("transaction", (callbackHandle) => {
this.vm.call(callbackHandle, this.thisHandle, [])
}),
})
.consume(vm.cache)
}
Expand Down Expand Up @@ -180,6 +168,8 @@ export class ContractRuntime extends AbstractRuntime {
timestamp,
})

this.vm.context.setProp(thisHandle, "db", this.#databaseAPI)

const argHandles = Array.isArray(args) ? args.map(this.vm.wrapValue) : [this.vm.wrapValue(args)]

try {
Expand Down
20 changes: 14 additions & 6 deletions packages/core/src/runtime/FunctionRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class FunctionRuntime<ModelsT extends ModelSchema> extends AbstractRuntim
}

#context: ExecutionContext | null = null
#transaction: boolean = false
#thisValue: ActionContext<DeriveModelTypes<ModelsT>> | null = null
#queue = new PQueue({ concurrency: 1 })
#db: ModelAPI<DeriveModelTypes<ModelsT>>
Expand All @@ -40,17 +41,24 @@ export class FunctionRuntime<ModelsT extends ModelSchema> extends AbstractRuntim

this.#db = {
get: async <T extends keyof DeriveModelTypes<ModelsT> & string>(model: T, key: string) => {
const result = await this.#queue.add(() => this.context.getModelValue<DeriveModelTypes<ModelsT>[T]>(model, key))
const result = await this.#queue.add(() =>
this.context.getModelValue<DeriveModelTypes<ModelsT>[T]>(model, key, this.#transaction),
)
return result ?? null
},

set: (model, value) => this.#queue.add(() => this.context.setModelValue(model, value)),
update: (model, value) => this.#queue.add(() => this.context.updateModelValue(model, value)),
merge: (model, value) => this.#queue.add(() => this.context.mergeModelValue(model, value)),
delete: (model, key) => this.#queue.add(() => this.context.deleteModelValue(model, key)),
set: (model, value) => this.#queue.add(() => this.context.setModelValue(model, value, this.#transaction)),
update: (model, value) => this.#queue.add(() => this.context.updateModelValue(model, value, this.#transaction)),
merge: (model, value) => this.#queue.add(() => this.context.mergeModelValue(model, value, this.#transaction)),
delete: (model, key) => this.#queue.add(() => this.context.deleteModelValue(model, key, this.#transaction)),

transaction: async (callback) => {
await callback.apply(this.thisValue, [])
try {
this.#transaction = true
await callback.apply(this.thisValue, [])
} finally {
this.#transaction = false
}
},
}
}
Expand Down

0 comments on commit 934b96b

Please sign in to comment.