Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add locking in the app to avoid deadlock #123

Merged
merged 1 commit into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 137 additions & 96 deletions forge/context-driver/sequelize.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,40 @@ const path = require('path')

let sequelize, app

/**
*/
const activeLocks = new Map()
/**
* This is a simple instanceId-level locking mechanism that ensures we single-thread
* requests related to a single instance.
*
* This is not scalable, but solves an immediate issue around deadlocks caused by
* parallel requests to update different context scopes for the same instance.
*
* See https://github.com/FlowFuse/file-server/issues/122
* @param {*} instanceId the id of the instance to lock
* @returns a promise that resolves once the lock is held. The promise resolves with a function that must be called to release the lock
*/
async function getInstanceLock (instanceId) {
let lockingPromise
if (!activeLocks.has(instanceId)) {
lockingPromise = Promise.resolve()
activeLocks.set(instanceId, lockingPromise)
} else {
lockingPromise = activeLocks.get(instanceId)
}
let unlockNextPromise
const nextPromise = new Promise(resolve => {
unlockNextPromise = () => {
resolve()
}
return unlockNextPromise
})
const unlockPromise = lockingPromise.then(() => unlockNextPromise)
activeLocks.set(instanceId, lockingPromise.then(() => nextPromise))
return unlockPromise
}

module.exports = {
init: async function (_app) {
app = _app
Expand Down Expand Up @@ -44,7 +78,7 @@ module.exports = {

sequelize = new Sequelize(dbOptions)

app.log.info(`FlowForge File Server Sequelize Context connected to ${dbOptions.dialect} on ${dbOptions.host || dbOptions.storage}`)
app.log.info(`FlowFuse File Server Sequelize Context connected to ${dbOptions.dialect} on ${dbOptions.host || dbOptions.storage}`)

const Context = sequelize.define('Context', {
project: { type: DataTypes.STRING, allowNull: false, unique: 'context-project-scope-unique' },
Expand All @@ -56,106 +90,113 @@ module.exports = {
},
/**
* Set the context data for a given scope
* @param {string} projectId - The project id
* @param {string} instanceId - The instance id
* @param {string} scope - The context scope to write to
* @param {[{key:string, value:any}]} input - The context data to write
* @param {boolean} [overwrite=false] - If true, any context data will be overwritten (i.e. for a cache dump). If false, the context data will be merged with the existing data.
* @param {number} quotaOverride - if set overrides the locally configured limit
*/
set: async function (projectId, scope, input, overwrite = false, quotaOverride = 0) {
const { path } = parseScope(scope)
await sequelize.transaction({
type: Sequelize.Transaction.TYPES.IMMEDIATE
},
async (t) => {
// get the existing row of context data from the database (if any)
let existingRow = await this.Context.findOne({
where: {
project: projectId,
scope: path
},
lock: t.LOCK.UPDATE,
transaction: t
})
const quotaLimit = quotaOverride || app.config?.context?.quota || 0
// if quota is set, check if we are over quota or will be after this update
if (quotaLimit > 0) {
// Difficulties implementing this correctly
// - The final size of data can only be determined after the data is stored.
// This is due to the fact that some keys may be deleted and some may be added
// and the size of the data is not the same as the size of the keys.
// This implementation is not ideal, but it is a good approximation and will
// prevent the possibility of runaway storage usage.
let changeSize = 0
let hasValues = false
// if we are overwriting, then we need to remove the existing size to get the final size
if (existingRow) {
if (overwrite) {
changeSize -= getItemSize(existingRow.values || '')
} else {
hasValues = existingRow?.values && Object.keys(existingRow.values).length > 0
set: async function (instanceId, scope, input, overwrite = false, quotaOverride = 1000) {
// Obtain the lock for this instance
const unlock = await getInstanceLock(instanceId)
try {
const { path } = parseScope(scope)
await sequelize.transaction({
type: Sequelize.Transaction.TYPES.IMMEDIATE
},
async (t) => {
// get the existing row of context data from the database (if any)
let existingRow = await this.Context.findOne({
where: {
project: instanceId,
scope: path
},
lock: t.LOCK.UPDATE,
transaction: t
})
const quotaLimit = quotaOverride || app.config?.context?.quota || 0
// if quota is set, check if we are over quota or will be after this update
if (quotaLimit > 0) {
// Difficulties implementing this correctly
// - The final size of data can only be determined after the data is stored.
// This is due to the fact that some keys may be deleted and some may be added
// and the size of the data is not the same as the size of the keys.
// This implementation is not ideal, but it is a good approximation and will
// prevent the possibility of runaway storage usage.
let changeSize = 0
let hasValues = false
// if we are overwriting, then we need to remove the existing size to get the final size
if (existingRow) {
if (overwrite) {
changeSize -= getItemSize(existingRow.values || '')
} else {
hasValues = existingRow?.values && Object.keys(existingRow.values).length > 0
}
}
}
// calculate the change in size
for (const element of input) {
const currentItem = hasValues ? getObjectProperty(existingRow.values, element.key) : undefined
if (currentItem === undefined && element.value !== undefined) {
// this is an addition
changeSize += getItemSize(element.value)
} else if (currentItem !== undefined && element.value === undefined) {
// this is an deletion
changeSize -= getItemSize(currentItem)
} else {
// this is an update
changeSize -= getItemSize(currentItem)
changeSize += getItemSize(element.value)
// calculate the change in size
for (const element of input) {
const currentItem = hasValues ? getObjectProperty(existingRow.values, element.key) : undefined
if (currentItem === undefined && element.value !== undefined) {
// this is an addition
changeSize += getItemSize(element.value)
} else if (currentItem !== undefined && element.value === undefined) {
// this is an deletion
changeSize -= getItemSize(currentItem)
} else {
// this is an update
changeSize -= getItemSize(currentItem)
changeSize += getItemSize(element.value)
}
}
}
// only calculate the current size if we are going to need it
if (changeSize >= 0) {
const currentSize = await this.quota(projectId)
if (currentSize + changeSize > quotaLimit) {
const err = new Error('Over Quota')
err.code = 'over_quota'
err.error = err.message
err.limit = quotaLimit
throw err
// only calculate the current size if we are going to need it
if (changeSize >= 0) {
const currentSize = await this.quota(instanceId)
if (currentSize + changeSize > quotaLimit) {
const err = new Error('Over Quota')
err.code = 'over_quota'
err.error = err.message
err.limit = quotaLimit
throw err
}
}
}
}

// if we are overwriting, then we need to reset the values in the existing row (if any)
if (existingRow && overwrite) {
existingRow.values = {} // reset the values since this is a mem cache -> DB dump
}

// if there is no input, then we are probably deleting the row
if (input?.length > 0) {
if (!existingRow) {
existingRow = await this.Context.create({
project: projectId,
scope: path,
values: {}
},
{
transaction: t
})
// if we are overwriting, then we need to reset the values in the existing row (if any)
if (existingRow && overwrite) {
existingRow.values = {} // reset the values since this is a mem cache -> DB dump
}
for (const i in input) {
const path = input[i].key
const value = input[i].value
util.setMessageProperty(existingRow.values, path, value)

// if there is no input, then we are probably deleting the row
if (input?.length > 0) {
if (!existingRow) {
existingRow = await this.Context.create({
project: instanceId,
scope: path,
values: {}
},
{
transaction: t
})
}
for (const i in input) {
const path = input[i].key
const value = input[i].value
util.setMessageProperty(existingRow.values, path, value)
}
}
}
if (existingRow) {
if (existingRow.values && Object.keys(existingRow.values).length === 0) {
await existingRow.destroy({ transaction: t })
} else {
existingRow.changed('values', true)
await existingRow.save({ transaction: t })
if (existingRow) {
if (existingRow.values && Object.keys(existingRow.values).length === 0) {
await existingRow.destroy({ transaction: t })
} else {
existingRow.changed('values', true)
await existingRow.save({ transaction: t })
}
}
}
})
})
} finally {
// Regardless of the result, release the lock
await unlock()
}
},
/**
* Get the context data for a given scope
Expand Down Expand Up @@ -295,17 +336,17 @@ module.exports = {
}
},
quota: async function (projectId) {
const scopesResults = await this.Context.findAll({
// Sum the lengths in the query
// - note for postgres, we have to cast the values column from JSON to text
const sizeResult = await this.Context.findOne({
where: {
project: projectId
}
})
let size = 0
scopesResults.forEach(scope => {
const strValues = JSON.stringify(scope.values)
size += strValues.length
},
attributes: [
[sequelize.fn('SUM', sequelize.fn('LENGTH', sequelize.cast(sequelize.col('values'), 'text'))), 'length']
]
})
return size
return sizeResult.getDataValue('length') || 0
}
}

Expand Down