diff --git a/forge/context-driver/sequelize.js b/forge/context-driver/sequelize.js index dd6868f..e4ed01f 100644 --- a/forge/context-driver/sequelize.js +++ b/forge/context-driver/sequelize.js @@ -5,6 +5,40 @@ const path = require('path') let sequelize, app +/** + */ +const activeLocks = new Map() +/** + * This is a simple instanceId-level locking mechanism that ensures we single-thread + * requests related to a single instance. + * + * This is not scalable, but solves an immediate issue around deadlocks caused by + * parallel requests to update different context scopes for the same instance. + * + * See https://github.com/FlowFuse/file-server/issues/122 + * @param {*} instanceId the id of the instance to lock + * @returns a promise that resolves once the lock is held. The promise resolves with a function that must be called to release the lock + */ +async function getInstanceLock (instanceId) { + let lockingPromise + if (!activeLocks.has(instanceId)) { + lockingPromise = Promise.resolve() + activeLocks.set(instanceId, lockingPromise) + } else { + lockingPromise = activeLocks.get(instanceId) + } + let unlockNextPromise + const nextPromise = new Promise(resolve => { + unlockNextPromise = () => { + resolve() + } + return unlockNextPromise + }) + const unlockPromise = lockingPromise.then(() => unlockNextPromise) + activeLocks.set(instanceId, lockingPromise.then(() => nextPromise)) + return unlockPromise +} + module.exports = { init: async function (_app) { app = _app @@ -44,7 +78,7 @@ module.exports = { sequelize = new Sequelize(dbOptions) - app.log.info(`FlowForge File Server Sequelize Context connected to ${dbOptions.dialect} on ${dbOptions.host || dbOptions.storage}`) + app.log.info(`FlowFuse File Server Sequelize Context connected to ${dbOptions.dialect} on ${dbOptions.host || dbOptions.storage}`) const Context = sequelize.define('Context', { project: { type: DataTypes.STRING, allowNull: false, unique: 'context-project-scope-unique' }, @@ -56,106 +90,113 @@ module.exports = { }, /** * Set the context data for a given scope - * @param {string} projectId - The project id + * @param {string} instanceId - The instance id * @param {string} scope - The context scope to write to * @param {[{key:string, value:any}]} input - The context data to write * @param {boolean} [overwrite=false] - If true, any context data will be overwritten (i.e. for a cache dump). If false, the context data will be merged with the existing data. * @param {number} quotaOverride - if set overrides the locally configured limit */ - set: async function (projectId, scope, input, overwrite = false, quotaOverride = 0) { - const { path } = parseScope(scope) - await sequelize.transaction({ - type: Sequelize.Transaction.TYPES.IMMEDIATE - }, - async (t) => { - // get the existing row of context data from the database (if any) - let existingRow = await this.Context.findOne({ - where: { - project: projectId, - scope: path - }, - lock: t.LOCK.UPDATE, - transaction: t - }) - const quotaLimit = quotaOverride || app.config?.context?.quota || 0 - // if quota is set, check if we are over quota or will be after this update - if (quotaLimit > 0) { - // Difficulties implementing this correctly - // - The final size of data can only be determined after the data is stored. - // This is due to the fact that some keys may be deleted and some may be added - // and the size of the data is not the same as the size of the keys. - // This implementation is not ideal, but it is a good approximation and will - // prevent the possibility of runaway storage usage. - let changeSize = 0 - let hasValues = false - // if we are overwriting, then we need to remove the existing size to get the final size - if (existingRow) { - if (overwrite) { - changeSize -= getItemSize(existingRow.values || '') - } else { - hasValues = existingRow?.values && Object.keys(existingRow.values).length > 0 + set: async function (instanceId, scope, input, overwrite = false, quotaOverride = 1000) { + // Obtain the lock for this instance + const unlock = await getInstanceLock(instanceId) + try { + const { path } = parseScope(scope) + await sequelize.transaction({ + type: Sequelize.Transaction.TYPES.IMMEDIATE + }, + async (t) => { + // get the existing row of context data from the database (if any) + let existingRow = await this.Context.findOne({ + where: { + project: instanceId, + scope: path + }, + lock: t.LOCK.UPDATE, + transaction: t + }) + const quotaLimit = quotaOverride || app.config?.context?.quota || 0 + // if quota is set, check if we are over quota or will be after this update + if (quotaLimit > 0) { + // Difficulties implementing this correctly + // - The final size of data can only be determined after the data is stored. + // This is due to the fact that some keys may be deleted and some may be added + // and the size of the data is not the same as the size of the keys. + // This implementation is not ideal, but it is a good approximation and will + // prevent the possibility of runaway storage usage. + let changeSize = 0 + let hasValues = false + // if we are overwriting, then we need to remove the existing size to get the final size + if (existingRow) { + if (overwrite) { + changeSize -= getItemSize(existingRow.values || '') + } else { + hasValues = existingRow?.values && Object.keys(existingRow.values).length > 0 + } } - } - // calculate the change in size - for (const element of input) { - const currentItem = hasValues ? getObjectProperty(existingRow.values, element.key) : undefined - if (currentItem === undefined && element.value !== undefined) { - // this is an addition - changeSize += getItemSize(element.value) - } else if (currentItem !== undefined && element.value === undefined) { - // this is an deletion - changeSize -= getItemSize(currentItem) - } else { - // this is an update - changeSize -= getItemSize(currentItem) - changeSize += getItemSize(element.value) + // calculate the change in size + for (const element of input) { + const currentItem = hasValues ? getObjectProperty(existingRow.values, element.key) : undefined + if (currentItem === undefined && element.value !== undefined) { + // this is an addition + changeSize += getItemSize(element.value) + } else if (currentItem !== undefined && element.value === undefined) { + // this is an deletion + changeSize -= getItemSize(currentItem) + } else { + // this is an update + changeSize -= getItemSize(currentItem) + changeSize += getItemSize(element.value) + } } - } - // only calculate the current size if we are going to need it - if (changeSize >= 0) { - const currentSize = await this.quota(projectId) - if (currentSize + changeSize > quotaLimit) { - const err = new Error('Over Quota') - err.code = 'over_quota' - err.error = err.message - err.limit = quotaLimit - throw err + // only calculate the current size if we are going to need it + if (changeSize >= 0) { + const currentSize = await this.quota(instanceId) + if (currentSize + changeSize > quotaLimit) { + const err = new Error('Over Quota') + err.code = 'over_quota' + err.error = err.message + err.limit = quotaLimit + throw err + } } } - } - - // if we are overwriting, then we need to reset the values in the existing row (if any) - if (existingRow && overwrite) { - existingRow.values = {} // reset the values since this is a mem cache -> DB dump - } - // if there is no input, then we are probably deleting the row - if (input?.length > 0) { - if (!existingRow) { - existingRow = await this.Context.create({ - project: projectId, - scope: path, - values: {} - }, - { - transaction: t - }) + // if we are overwriting, then we need to reset the values in the existing row (if any) + if (existingRow && overwrite) { + existingRow.values = {} // reset the values since this is a mem cache -> DB dump } - for (const i in input) { - const path = input[i].key - const value = input[i].value - util.setMessageProperty(existingRow.values, path, value) + + // if there is no input, then we are probably deleting the row + if (input?.length > 0) { + if (!existingRow) { + existingRow = await this.Context.create({ + project: instanceId, + scope: path, + values: {} + }, + { + transaction: t + }) + } + for (const i in input) { + const path = input[i].key + const value = input[i].value + util.setMessageProperty(existingRow.values, path, value) + } } - } - if (existingRow) { - if (existingRow.values && Object.keys(existingRow.values).length === 0) { - await existingRow.destroy({ transaction: t }) - } else { - existingRow.changed('values', true) - await existingRow.save({ transaction: t }) + if (existingRow) { + if (existingRow.values && Object.keys(existingRow.values).length === 0) { + await existingRow.destroy({ transaction: t }) + } else { + existingRow.changed('values', true) + await existingRow.save({ transaction: t }) + } } - } - }) + }) + } finally { + // Regardless of the result, release the lock + await unlock() + } }, /** * Get the context data for a given scope @@ -295,17 +336,17 @@ module.exports = { } }, quota: async function (projectId) { - const scopesResults = await this.Context.findAll({ + // Sum the lengths in the query + // - note for postgres, we have to cast the values column from JSON to text + const sizeResult = await this.Context.findOne({ where: { project: projectId - } - }) - let size = 0 - scopesResults.forEach(scope => { - const strValues = JSON.stringify(scope.values) - size += strValues.length + }, + attributes: [ + [sequelize.fn('SUM', sequelize.fn('LENGTH', sequelize.cast(sequelize.col('values'), 'text'))), 'length'] + ] }) - return size + return sizeResult.getDataValue('length') || 0 } }