From 23562f8c34502a2717357568dfabeecf11198b76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Rodriguez?= Date: Thu, 30 Jan 2025 17:50:44 +0100 Subject: [PATCH] fix(device): separate internal logic without lock --- lib/modules/device/DeviceService.ts | 252 ++++++++++++++++------------ 1 file changed, 142 insertions(+), 110 deletions(-) diff --git a/lib/modules/device/DeviceService.ts b/lib/modules/device/DeviceService.ts index c57225f1..c98c7ac4 100644 --- a/lib/modules/device/DeviceService.ts +++ b/lib/modules/device/DeviceService.ts @@ -430,58 +430,74 @@ export class DeviceService extends DigitalTwinService { } /** - * Attach the device to an engine + * Internal logic to attach the device to an engine * * @param engineId Engine id to attach to * @param deviceId Device id to attach * @param options.refresh Wait for ES indexation * @param options.strict If true, throw if an operation isn't possible */ - async attachEngine( + private async _attachEngine( engineId: string, deviceId: string, request: KuzzleRequest, ): Promise> { - return lock(`device:${deviceId}`, async () => { - const device = await this.getInternalDevices(deviceId); + const device = await this.getInternalDevices(deviceId); - if (device._source.engineId) { - throw new BadRequestError( - `Device "${device._id}" is already attached to an engine.`, - ); - } + if (device._source.engineId) { + throw new BadRequestError( + `Device "${device._id}" is already attached to an engine.`, + ); + } - await this.checkEngineExists(engineId); + await this.checkEngineExists(engineId); - device._source.engineId = engineId; + device._source.engineId = engineId; - const [updatedDevice] = await Promise.all([ - this.updateDocument(request, device, { - collection: InternalCollection.DEVICES, - engineId: this.config.adminIndex, - }), + const [updatedDevice] = await Promise.all([ + this.updateDocument(request, device, { + collection: InternalCollection.DEVICES, + engineId: this.config.adminIndex, + }), - this.createDocument(request, device, { - collection: InternalCollection.DEVICES, - engineId, - }), + this.createDocument(request, device, { + collection: InternalCollection.DEVICES, + engineId, + }), + ]); + + if (request.getRefresh() === "wait_for") { + await Promise.all([ + this.sdk.collection.refresh( + this.config.adminIndex, + InternalCollection.DEVICES, + ), + this.sdk.collection.refresh( + device._source.engineId, + InternalCollection.DEVICES, + ), ]); + } - if (request.getRefresh() === "wait_for") { - await Promise.all([ - this.sdk.collection.refresh( - this.config.adminIndex, - InternalCollection.DEVICES, - ), - this.sdk.collection.refresh( - device._source.engineId, - InternalCollection.DEVICES, - ), - ]); - } + return updatedDevice; + } - return updatedDevice; - }); + /** + * Attach the device to an engine + * + * @param engineId Engine id to attach to + * @param deviceId Device id to attach + * @param options.refresh Wait for ES indexation + * @param options.strict If true, throw if an operation isn't possible + */ + async attachEngine( + engineId: string, + deviceId: string, + request: KuzzleRequest, + ): Promise> { + return lock(`device:${deviceId}`, async () => + this._attachEngine(engineId, deviceId, request), + ); } /** @@ -797,105 +813,121 @@ export class DeviceService extends DigitalTwinService { } /** - * Unlink a device of an asset + * Internal logic of unlink a device of an asset * * @param {string} deviceId Id of the device * @param {KuzzleRequest} request kuzzle request */ - async unlinkAsset( + private async _unlinkAsset( deviceId: string, request: KuzzleRequest, ): Promise<{ asset: KDocument; device: KDocument; }> { - return lock(`device:${deviceId}`, async () => { - const device = await this.getInternalDevices(deviceId); - const engineId = device._source.engineId; + const device = await this.getInternalDevices(deviceId); + const engineId = device._source.engineId; - this.checkAttachedToEngine(device); - - if (!device._source.assetId) { - throw new BadRequestError( - `Device "${device._id}" is not linked to an asset.`, - ); - } + this.checkAttachedToEngine(device); - const asset = await this.sdk.document.get( - engineId, - InternalCollection.ASSETS, - device._source.assetId, - ); - - const linkedDevices = asset._source.linkedDevices.filter( - (link) => link._id !== device._id, + if (!device._source.assetId) { + throw new BadRequestError( + `Device "${device._id}" is not linked to an asset.`, ); + } - const [updatedDevice, , updatedAsset] = await Promise.all([ - this.updateDocument( - request, - { _id: device._id, _source: { assetId: null } }, - { - collection: InternalCollection.DEVICES, - engineId: this.config.adminIndex, - }, - { source: true }, - ), + const asset = await this.sdk.document.get( + engineId, + InternalCollection.ASSETS, + device._source.assetId, + ); - this.updateDocument( - request, - { _id: device._id, _source: { assetId: null } }, - { - collection: InternalCollection.DEVICES, - engineId, - }, - ), + const linkedDevices = asset._source.linkedDevices.filter( + (link) => link._id !== device._id, + ); - this.updateDocument( - request, - { _id: asset._id, _source: { linkedDevices } }, - { - collection: InternalCollection.ASSETS, - engineId, - }, - { source: true }, - ), - ]); + const [updatedDevice, , updatedAsset] = await Promise.all([ + this.updateDocument( + request, + { _id: device._id, _source: { assetId: null } }, + { + collection: InternalCollection.DEVICES, + engineId: this.config.adminIndex, + }, + { source: true }, + ), - const event: AssetHistoryEventUnlink = { - name: "unlink", - unlink: { - deviceId, + this.updateDocument( + request, + { _id: device._id, _source: { assetId: null } }, + { + collection: InternalCollection.DEVICES, + engineId, }, - }; - await ask>( - "ask:device-manager:asset:history:add", + ), + + this.updateDocument( + request, + { _id: asset._id, _source: { linkedDevices } }, { + collection: InternalCollection.ASSETS, engineId, - histories: [ - { - asset: updatedAsset._source, - event, - id: updatedAsset._id, - timestamp: Date.now(), - }, - ], }, - ); + { source: true }, + ), + ]); - if (request.getRefresh() === "wait_for") { - await Promise.all([ - this.sdk.collection.refresh( - this.config.adminIndex, - InternalCollection.DEVICES, - ), - this.sdk.collection.refresh(engineId, InternalCollection.DEVICES), - this.sdk.collection.refresh(engineId, InternalCollection.ASSETS), - ]); - } + const event: AssetHistoryEventUnlink = { + name: "unlink", + unlink: { + deviceId, + }, + }; + await ask>( + "ask:device-manager:asset:history:add", + { + engineId, + histories: [ + { + asset: updatedAsset._source, + event, + id: updatedAsset._id, + timestamp: Date.now(), + }, + ], + }, + ); - return { asset: updatedAsset, device: updatedDevice }; - }); + if (request.getRefresh() === "wait_for") { + await Promise.all([ + this.sdk.collection.refresh( + this.config.adminIndex, + InternalCollection.DEVICES, + ), + this.sdk.collection.refresh(engineId, InternalCollection.DEVICES), + this.sdk.collection.refresh(engineId, InternalCollection.ASSETS), + ]); + } + + return { asset: updatedAsset, device: updatedDevice }; + } + + /** + * Unlink a device of an asset with lock + * + * @param {string} deviceId Id of the device + * @param {KuzzleRequest} request kuzzle request + */ + async unlinkAsset( + deviceId: string, + request: KuzzleRequest, + ): Promise<{ + asset: KDocument; + device: KDocument; + }> { + return lock(`device:${deviceId}`, async () => + this._unlinkAsset(deviceId, request), + ); } private async checkEngineExists(engineId: string) {