diff --git a/lib/modules/asset/AssetService.ts b/lib/modules/asset/AssetService.ts index a4da4e6d..54ae3fe2 100644 --- a/lib/modules/asset/AssetService.ts +++ b/lib/modules/asset/AssetService.ts @@ -143,63 +143,65 @@ export class AssetService extends DigitalTwinService { metadata: Metadata, request: KuzzleRequest, ): Promise> { - const asset = await this.get(engineId, assetId, request); - const unknownMetadata = {}; - for (const key in metadata) { - if (key in asset._source.metadata) { - asset._source.metadata[key] = metadata[key]; - } else { - unknownMetadata[key] = metadata[key]; + return lock(`asset:${engineId}:${assetId}`, async () => { + const asset = await this.get(engineId, assetId, request); + const unknownMetadata = {}; + for (const key in metadata) { + if (key in asset._source.metadata) { + asset._source.metadata[key] = metadata[key]; + } else { + unknownMetadata[key] = metadata[key]; + } } - } - // ? If metadata key is unknown on the asset we check that it exists in the assetModel mappings - if (Object.keys(unknownMetadata).length > 0) { - const assetModel = await ask( - "ask:device-manager:model:asset:get", - { engineGroup: engineId.split("-")[1], model: asset._source.model }, - ); - for (const key in unknownMetadata) { - if (key in assetModel.asset.metadataMappings) { - asset._source.metadata[key] = unknownMetadata[key]; + // ? If metadata key is unknown on the asset we check that it exists in the assetModel mappings + if (Object.keys(unknownMetadata).length > 0) { + const assetModel = await ask( + "ask:device-manager:model:asset:get", + { engineGroup: engineId.split("-")[1], model: asset._source.model }, + ); + for (const key in unknownMetadata) { + if (key in assetModel.asset.metadataMappings) { + asset._source.metadata[key] = unknownMetadata[key]; + } } } - } - const updatedPayload = await this.app.trigger( - "device-manager:asset:update:before", - { asset, metadata }, - ); + const updatedPayload = await this.app.trigger( + "device-manager:asset:update:before", + { asset, metadata }, + ); - const updatedAsset = await this.sdk.document.replace( - engineId, - InternalCollection.ASSETS, - assetId, - updatedPayload.asset._source, - { triggerEvents: true }, - ); + const updatedAsset = await this.sdk.document.replace( + engineId, + InternalCollection.ASSETS, + assetId, + updatedPayload.asset._source, + { triggerEvents: true }, + ); - await this.assetHistoryService.add(engineId, [ - { - asset: updatedAsset._source, - event: { - metadata: { - names: Object.keys(flattenObject(updatedPayload.metadata)), + await this.assetHistoryService.add(engineId, [ + { + asset: updatedAsset._source, + event: { + metadata: { + names: Object.keys(flattenObject(updatedPayload.metadata)), + }, + name: "metadata", }, - name: "metadata", + id: updatedAsset._id, + timestamp: Date.now(), }, - id: updatedAsset._id, - timestamp: Date.now(), - }, - ]); + ]); - await this.app.trigger( - "device-manager:asset:update:after", - { - asset: updatedAsset, - metadata: updatedPayload.metadata, - }, - ); + await this.app.trigger( + "device-manager:asset:update:after", + { + asset: updatedAsset, + metadata: updatedPayload.metadata, + }, + ); - return updatedAsset; + return updatedAsset; + }); } /** @@ -212,14 +214,22 @@ export class AssetService extends DigitalTwinService { metadata: Metadata, request: KuzzleRequest, ): Promise> { - const assetId = `${model}-${reference}`; + const assetId = AssetSerializer.id(model, reference); + return lock(`asset:${engineId}:${assetId}`, async () => { const asset = await this.get(engineId, assetId, request).catch( () => null, ); if (!asset) { - return this.create(engineId, model, reference, metadata, request); + return this._create( + assetId, + engineId, + model, + reference, + metadata, + request, + ); } const updatedPayload = await this.app.trigger( @@ -266,80 +276,89 @@ export class AssetService extends DigitalTwinService { }); } - /** - * Create an asset metadata - */ - public async create( + private async _create( + assetId: string, engineId: string, model: string, reference: string, metadata: JSONObject, request: KuzzleRequest, ): Promise> { - const assetId = AssetSerializer.id(model, reference); - - return lock(`asset:${engineId}:${assetId}`, async () => { - const engine = await this.getEngine(engineId); - const assetModel = await ask( - "ask:device-manager:model:asset:get", - { engineGroup: engine.group, model }, - ); + const engine = await this.getEngine(engineId); + const assetModel = await ask( + "ask:device-manager:model:asset:get", + { engineGroup: engine.group, model }, + ); - const assetMetadata = {}; - for (const metadataName of Object.keys( - assetModel.asset.metadataMappings, - )) { - assetMetadata[metadataName] = null; - } - for (const [metadataName, metadataValue] of Object.entries( - assetModel.asset.defaultMetadata, - )) { - _.set(assetMetadata, metadataName, metadataValue); - } + const assetMetadata = {}; + for (const metadataName of Object.keys(assetModel.asset.metadataMappings)) { + assetMetadata[metadataName] = null; + } + for (const [metadataName, metadataValue] of Object.entries( + assetModel.asset.defaultMetadata, + )) { + _.set(assetMetadata, metadataName, metadataValue); + } - const measures: Record = {}; + const measures: Record = {}; - for (const { name } of assetModel.asset.measures) { - measures[name] = null; - } + for (const { name } of assetModel.asset.measures) { + measures[name] = null; + } - const asset = await this.createDocument( - request, - { - _id: assetId, - _source: { - groups: [], - lastMeasuredAt: null, - linkedDevices: [], - measures, - metadata: { ...assetMetadata, ...metadata }, - model, - reference, - softTenant: [], - }, - }, - { - collection: InternalCollection.ASSETS, - engineId, + const asset = await this.createDocument( + request, + { + _id: assetId, + _source: { + groups: [], + lastMeasuredAt: null, + linkedDevices: [], + measures, + metadata: { ...assetMetadata, ...metadata }, + model, + reference, + softTenant: [], }, - ); + }, + { + collection: InternalCollection.ASSETS, + engineId, + }, + ); - await this.assetHistoryService.add(engineId, [ - { - asset: asset._source, - event: { - metadata: { - names: Object.keys(flattenObject(asset._source.metadata)), - }, - name: "metadata", + await this.assetHistoryService.add(engineId, [ + { + asset: asset._source, + event: { + metadata: { + names: Object.keys(flattenObject(asset._source.metadata)), }, - id: asset._id, - timestamp: Date.now(), + name: "metadata", }, - ]); + id: asset._id, + timestamp: Date.now(), + }, + ]); - return asset; - }); + return asset; + } + + /** + * Create an asset metadata + */ + public async create( + engineId: string, + model: string, + reference: string, + metadata: JSONObject, + request: KuzzleRequest, + ): Promise> { + const assetId = AssetSerializer.id(model, reference); + + return lock(`asset:${engineId}:${assetId}`, async () => + this._create(assetId, engineId, model, reference, metadata, request), + ); } /** diff --git a/lib/modules/device/DeviceService.ts b/lib/modules/device/DeviceService.ts index 3a71d968..c98c7ac4 100644 --- a/lib/modules/device/DeviceService.ts +++ b/lib/modules/device/DeviceService.ts @@ -82,21 +82,15 @@ export class DeviceService extends DigitalTwinService { ); } - /** - * Create a new device. - * - * @todo creating a device in the "device-manager" index should be a separate - * method since in this case "engineId" is not really an engine ID but still - * required as an argument (in part to check the tenant rights) - */ - async create( + private async _create( + deviceId: string, model: string, reference: string, metadata: JSONObject, request: KuzzleRequest, ): Promise> { let device: KDocument = { - _id: DeviceSerializer.id(model, reference), + _id: deviceId, _source: { assetId: null, engineId: null, @@ -108,61 +102,79 @@ export class DeviceService extends DigitalTwinService { }, }; - return lock(`device:create:${device._id}`, async () => { - const deviceModel = await this.getDeviceModel(model); - const engineId = request.getString("engineId"); + const deviceModel = await this.getDeviceModel(model); + const engineId = request.getString("engineId"); - for (const metadataName of Object.keys( - deviceModel.device.metadataMappings, - )) { - device._source.metadata[metadataName] ||= null; - } - for (const [metadataName, metadataValue] of Object.entries( - deviceModel.device.defaultMetadata, - )) { - _.set(device._source.metadata, metadataName, metadataValue); - } + for (const metadataName of Object.keys( + deviceModel.device.metadataMappings, + )) { + device._source.metadata[metadataName] ||= null; + } + for (const [metadataName, metadataValue] of Object.entries( + deviceModel.device.defaultMetadata, + )) { + _.set(device._source.metadata, metadataName, metadataValue); + } - const refreshableCollections: Array<{ - index: string; - collection: string; - }> = []; + const refreshableCollections: Array<{ + index: string; + collection: string; + }> = []; - const { _source } = await this.createDocument( - request, - device, - { - collection: InternalCollection.DEVICES, - engineId: this.config.adminIndex, - }, - ); + const { _source } = await this.createDocument( + request, + device, + { + collection: InternalCollection.DEVICES, + engineId: this.config.adminIndex, + }, + ); + + device._source = _source; - device._source = _source; + refreshableCollections.push({ + collection: InternalCollection.DEVICES, + index: this.config.adminIndex, + }); + + if (engineId && engineId !== this.config.adminIndex) { + device = await this.attachEngine(engineId, device._id, request); refreshableCollections.push({ collection: InternalCollection.DEVICES, - index: this.config.adminIndex, + index: engineId, }); + } - if (engineId && engineId !== this.config.adminIndex) { - device = await this.attachEngine(engineId, device._id, request); + if (request.getRefresh() === "wait_for") { + await Promise.all( + refreshableCollections.map(({ index, collection }) => + this.sdk.collection.refresh(index, collection), + ), + ); + } - refreshableCollections.push({ - collection: InternalCollection.DEVICES, - index: engineId, - }); - } + return device; + } - if (request.getRefresh() === "wait_for") { - await Promise.all( - refreshableCollections.map(({ index, collection }) => - this.sdk.collection.refresh(index, collection), - ), - ); - } + /** + * Create a new device. + * + * @todo creating a device in the "device-manager" index should be a separate + * method since in this case "engineId" is not really an engine ID but still + * required as an argument (in part to check the tenant rights) + */ + async create( + model: string, + reference: string, + metadata: JSONObject, + request: KuzzleRequest, + ): Promise> { + const deviceId = DeviceSerializer.id(model, reference); - return device; - }); + return lock(`device:${deviceId}`, async () => + this._create(deviceId, model, reference, metadata, request), + ); } public async get( @@ -192,35 +204,37 @@ export class DeviceService extends DigitalTwinService { metadata: Metadata, request: KuzzleRequest, ): Promise> { - const device = await this.get(engineId, deviceId, request); + return lock(`device:${deviceId}`, async () => { + const device = await this.get(engineId, deviceId, request); - for (const key in metadata) { - if (key in device._source.metadata) { - device._source.metadata[key] = metadata[key]; + for (const key in metadata) { + if (key in device._source.metadata) { + device._source.metadata[key] = metadata[key]; + } } - } - const updatedPayload = await this.app.trigger( - "device-manager:device:update:before", - { device: device, metadata }, - ); + const updatedPayload = await this.app.trigger( + "device-manager:device:update:before", + { device: device, metadata }, + ); - const updatedDevice = await this.sdk.document.replace( - engineId, - InternalCollection.DEVICES, - deviceId, - updatedPayload.device._source, - ); + const updatedDevice = await this.sdk.document.replace( + engineId, + InternalCollection.DEVICES, + deviceId, + updatedPayload.device._source, + ); - await this.app.trigger( - "device-manager:device:update:after", - { - device: updatedDevice, - metadata: updatedPayload.metadata, - }, - ); + await this.app.trigger( + "device-manager:device:update:after", + { + device: updatedDevice, + metadata: updatedPayload.metadata, + }, + ); - return updatedDevice; + return updatedDevice; + }); } /** @@ -233,8 +247,9 @@ export class DeviceService extends DigitalTwinService { metadata: Metadata, request: KuzzleRequest, ): Promise> { - const deviceId = `${model}-${reference}`; - return lock(`device:${engineId}:${deviceId}`, async () => { + const deviceId = DeviceSerializer.id(model, reference); + + return lock(`device:${deviceId}`, async () => { const adminIndexDevice = await this.get( this.config.adminIndex, deviceId, @@ -242,7 +257,7 @@ export class DeviceService extends DigitalTwinService { ).catch(() => null); if (!adminIndexDevice) { - return this.create(model, reference, metadata, request); + return this._create(deviceId, model, reference, metadata, request); } if ( @@ -298,37 +313,34 @@ export class DeviceService extends DigitalTwinService { metadata: Metadata, request: KuzzleRequest, ): Promise> { - return lock>( - `device:update:${deviceId}`, - async () => { - const device = await this.get(engineId, deviceId, request); - - const updatedPayload = await this.app.trigger( - "device-manager:device:update:before", - { device, metadata }, - ); + return lock(`device:${deviceId}`, async () => { + const device = await this.get(engineId, deviceId, request); - const updatedDevice = await this.updateDocument( - request, - { - _id: deviceId, - _source: { metadata: updatedPayload.metadata }, - }, - { collection: InternalCollection.DEVICES, engineId }, - { source: true }, - ); + const updatedPayload = await this.app.trigger( + "device-manager:device:update:before", + { device, metadata }, + ); - await this.app.trigger( - "device-manager:device:update:after", - { - device: updatedDevice, - metadata: updatedPayload.metadata, - }, - ); + const updatedDevice = await this.updateDocument( + request, + { + _id: deviceId, + _source: { metadata: updatedPayload.metadata }, + }, + { collection: InternalCollection.DEVICES, engineId }, + { source: true }, + ); - return updatedDevice; - }, - ); + await this.app.trigger( + "device-manager:device:update:after", + { + device: updatedDevice, + metadata: updatedPayload.metadata, + }, + ); + + return updatedDevice; + }); } public async delete( @@ -336,7 +348,7 @@ export class DeviceService extends DigitalTwinService { deviceId: string, request: KuzzleRequest, ) { - return lock(`device:delete:${deviceId}`, async () => { + return lock(`device:${deviceId}`, async () => { const device = await this.get(engineId, deviceId, request); const promises = []; @@ -418,58 +430,74 @@ export class DeviceService extends DigitalTwinService { } /** - * Attach the device to an engine + * Internal logic to attach the device to an engine * * @param engineId Engine id to attach to * @param deviceId Device id to attach * @param options.refresh Wait for ES indexation * @param options.strict If true, throw if an operation isn't possible */ - async attachEngine( + private async _attachEngine( engineId: string, deviceId: string, request: KuzzleRequest, ): Promise> { - return lock(`device:attachEngine:${deviceId}`, async () => { - const device = await this.getInternalDevices(deviceId); + const device = await this.getInternalDevices(deviceId); - if (device._source.engineId) { - throw new BadRequestError( - `Device "${device._id}" is already attached to an engine.`, - ); - } + if (device._source.engineId) { + throw new BadRequestError( + `Device "${device._id}" is already attached to an engine.`, + ); + } - await this.checkEngineExists(engineId); + await this.checkEngineExists(engineId); - device._source.engineId = engineId; + device._source.engineId = engineId; - const [updatedDevice] = await Promise.all([ - this.updateDocument(request, device, { - collection: InternalCollection.DEVICES, - engineId: this.config.adminIndex, - }), + const [updatedDevice] = await Promise.all([ + this.updateDocument(request, device, { + collection: InternalCollection.DEVICES, + engineId: this.config.adminIndex, + }), - this.createDocument(request, device, { - collection: InternalCollection.DEVICES, - engineId, - }), + this.createDocument(request, device, { + collection: InternalCollection.DEVICES, + engineId, + }), + ]); + + if (request.getRefresh() === "wait_for") { + await Promise.all([ + this.sdk.collection.refresh( + this.config.adminIndex, + InternalCollection.DEVICES, + ), + this.sdk.collection.refresh( + device._source.engineId, + InternalCollection.DEVICES, + ), ]); + } - if (request.getRefresh() === "wait_for") { - await Promise.all([ - this.sdk.collection.refresh( - this.config.adminIndex, - InternalCollection.DEVICES, - ), - this.sdk.collection.refresh( - device._source.engineId, - InternalCollection.DEVICES, - ), - ]); - } + return updatedDevice; + } - return updatedDevice; - }); + /** + * Attach the device to an engine + * + * @param engineId Engine id to attach to + * @param deviceId Device id to attach + * @param options.refresh Wait for ES indexation + * @param options.strict If true, throw if an operation isn't possible + */ + async attachEngine( + engineId: string, + deviceId: string, + request: KuzzleRequest, + ): Promise> { + return lock(`device:${deviceId}`, async () => + this._attachEngine(engineId, deviceId, request), + ); } /** @@ -482,7 +510,7 @@ export class DeviceService extends DigitalTwinService { deviceId: string, request: KuzzleRequest, ): Promise> { - return lock(`device:detachEngine:${deviceId}`, async () => { + return lock(`device:${deviceId}`, async () => { const device = await this.getInternalDevices(deviceId); this.checkAttachedToEngine(device); @@ -542,7 +570,7 @@ export class DeviceService extends DigitalTwinService { asset: KDocument; device: KDocument; }> { - return lock(`device:linkAsset:${deviceId}`, async () => { + return lock(`device:${deviceId}`, async () => { const device = await this.getInternalDevices(deviceId); const engine = await this.getEngine(engineId); @@ -785,105 +813,121 @@ export class DeviceService extends DigitalTwinService { } /** - * Unlink a device of an asset + * Internal logic of unlink a device of an asset * * @param {string} deviceId Id of the device * @param {KuzzleRequest} request kuzzle request */ - async unlinkAsset( + private async _unlinkAsset( deviceId: string, request: KuzzleRequest, ): Promise<{ asset: KDocument; device: KDocument; }> { - return lock(`device:unlinkAsset:${deviceId}`, async () => { - const device = await this.getInternalDevices(deviceId); - const engineId = device._source.engineId; + const device = await this.getInternalDevices(deviceId); + const engineId = device._source.engineId; - this.checkAttachedToEngine(device); - - if (!device._source.assetId) { - throw new BadRequestError( - `Device "${device._id}" is not linked to an asset.`, - ); - } - - const asset = await this.sdk.document.get( - engineId, - InternalCollection.ASSETS, - device._source.assetId, - ); + this.checkAttachedToEngine(device); - const linkedDevices = asset._source.linkedDevices.filter( - (link) => link._id !== device._id, + if (!device._source.assetId) { + throw new BadRequestError( + `Device "${device._id}" is not linked to an asset.`, ); + } - const [updatedDevice, , updatedAsset] = await Promise.all([ - this.updateDocument( - request, - { _id: device._id, _source: { assetId: null } }, - { - collection: InternalCollection.DEVICES, - engineId: this.config.adminIndex, - }, - { source: true }, - ), + const asset = await this.sdk.document.get( + engineId, + InternalCollection.ASSETS, + device._source.assetId, + ); - this.updateDocument( - request, - { _id: device._id, _source: { assetId: null } }, - { - collection: InternalCollection.DEVICES, - engineId, - }, - ), + const linkedDevices = asset._source.linkedDevices.filter( + (link) => link._id !== device._id, + ); - this.updateDocument( - request, - { _id: asset._id, _source: { linkedDevices } }, - { - collection: InternalCollection.ASSETS, - engineId, - }, - { source: true }, - ), - ]); + const [updatedDevice, , updatedAsset] = await Promise.all([ + this.updateDocument( + request, + { _id: device._id, _source: { assetId: null } }, + { + collection: InternalCollection.DEVICES, + engineId: this.config.adminIndex, + }, + { source: true }, + ), - const event: AssetHistoryEventUnlink = { - name: "unlink", - unlink: { - deviceId, + this.updateDocument( + request, + { _id: device._id, _source: { assetId: null } }, + { + collection: InternalCollection.DEVICES, + engineId, }, - }; - await ask>( - "ask:device-manager:asset:history:add", + ), + + this.updateDocument( + request, + { _id: asset._id, _source: { linkedDevices } }, { + collection: InternalCollection.ASSETS, engineId, - histories: [ - { - asset: updatedAsset._source, - event, - id: updatedAsset._id, - timestamp: Date.now(), - }, - ], }, - ); + { source: true }, + ), + ]); - if (request.getRefresh() === "wait_for") { - await Promise.all([ - this.sdk.collection.refresh( - this.config.adminIndex, - InternalCollection.DEVICES, - ), - this.sdk.collection.refresh(engineId, InternalCollection.DEVICES), - this.sdk.collection.refresh(engineId, InternalCollection.ASSETS), - ]); - } + const event: AssetHistoryEventUnlink = { + name: "unlink", + unlink: { + deviceId, + }, + }; + await ask>( + "ask:device-manager:asset:history:add", + { + engineId, + histories: [ + { + asset: updatedAsset._source, + event, + id: updatedAsset._id, + timestamp: Date.now(), + }, + ], + }, + ); - return { asset: updatedAsset, device: updatedDevice }; - }); + if (request.getRefresh() === "wait_for") { + await Promise.all([ + this.sdk.collection.refresh( + this.config.adminIndex, + InternalCollection.DEVICES, + ), + this.sdk.collection.refresh(engineId, InternalCollection.DEVICES), + this.sdk.collection.refresh(engineId, InternalCollection.ASSETS), + ]); + } + + return { asset: updatedAsset, device: updatedDevice }; + } + + /** + * Unlink a device of an asset with lock + * + * @param {string} deviceId Id of the device + * @param {KuzzleRequest} request kuzzle request + */ + async unlinkAsset( + deviceId: string, + request: KuzzleRequest, + ): Promise<{ + asset: KDocument; + device: KDocument; + }> { + return lock(`device:${deviceId}`, async () => + this._unlinkAsset(deviceId, request), + ); } private async checkEngineExists(engineId: string) { diff --git a/tests/scenario/modules/assets/asset-migrate-tenant.test.ts b/tests/scenario/modules/assets/asset-migrate-tenant.test.ts index df068711..330b953b 100644 --- a/tests/scenario/modules/assets/asset-migrate-tenant.test.ts +++ b/tests/scenario/modules/assets/asset-migrate-tenant.test.ts @@ -3,7 +3,7 @@ import { beforeEachTruncateCollections } from "../../../hooks/collections"; import { beforeAllCreateEngines } from "../../../hooks/engines"; import { beforeEachLoadFixtures } from "../../../hooks/fixtures"; -jest.setTimeout(10000); +jest.setTimeout(20000); describe("AssetsController:migrateTenant", () => { const sdk = useSdk(); @@ -12,6 +12,11 @@ describe("AssetsController:migrateTenant", () => { await sdk.connect(); await beforeAllCreateEngines(sdk); + + await sdk.auth.login("local", { + username: "test-admin", + password: "password", + }); }); beforeEach(async () => { @@ -24,14 +29,6 @@ describe("AssetsController:migrateTenant", () => { }); it("should fail if both engine does not belong to same group", async () => { - await sdk.auth.login("local", { - username: "test-admin", - password: "password", - }); - // We connect only here to avoid failing the first test - // If we do it in the beforeAll hook, the first test will fail - // And if we run it each time we might encounter "Too many login attempts per second" - await expect( sdk.query({ controller: "device-manager/assets",