Skip to content

Commit

Permalink
fix(device): separate internal logic without lock
Browse files Browse the repository at this point in the history
  • Loading branch information
sebtiz13 committed Jan 30, 2025
1 parent afa342a commit 23562f8
Showing 1 changed file with 142 additions and 110 deletions.
252 changes: 142 additions & 110 deletions lib/modules/device/DeviceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -430,58 +430,74 @@ export class DeviceService extends DigitalTwinService {
}

/**
* Attach the device to an engine
* Internal logic to attach the device to an engine
*
* @param engineId Engine id to attach to
* @param deviceId Device id to attach
* @param options.refresh Wait for ES indexation
* @param options.strict If true, throw if an operation isn't possible
*/
async attachEngine(
private async _attachEngine(
engineId: string,
deviceId: string,
request: KuzzleRequest,
): Promise<KDocument<DeviceContent>> {
return lock(`device:${deviceId}`, async () => {
const device = await this.getInternalDevices(deviceId);
const device = await this.getInternalDevices(deviceId);

if (device._source.engineId) {
throw new BadRequestError(
`Device "${device._id}" is already attached to an engine.`,
);
}
if (device._source.engineId) {
throw new BadRequestError(
`Device "${device._id}" is already attached to an engine.`,
);
}

await this.checkEngineExists(engineId);
await this.checkEngineExists(engineId);

device._source.engineId = engineId;
device._source.engineId = engineId;

const [updatedDevice] = await Promise.all([
this.updateDocument<DeviceContent>(request, device, {
collection: InternalCollection.DEVICES,
engineId: this.config.adminIndex,
}),
const [updatedDevice] = await Promise.all([
this.updateDocument<DeviceContent>(request, device, {
collection: InternalCollection.DEVICES,
engineId: this.config.adminIndex,
}),

this.createDocument<DeviceContent>(request, device, {
collection: InternalCollection.DEVICES,
engineId,
}),
this.createDocument<DeviceContent>(request, device, {
collection: InternalCollection.DEVICES,
engineId,
}),
]);

if (request.getRefresh() === "wait_for") {
await Promise.all([
this.sdk.collection.refresh(
this.config.adminIndex,
InternalCollection.DEVICES,
),
this.sdk.collection.refresh(
device._source.engineId,
InternalCollection.DEVICES,
),
]);
}

if (request.getRefresh() === "wait_for") {
await Promise.all([
this.sdk.collection.refresh(
this.config.adminIndex,
InternalCollection.DEVICES,
),
this.sdk.collection.refresh(
device._source.engineId,
InternalCollection.DEVICES,
),
]);
}
return updatedDevice;
}

return updatedDevice;
});
/**
* Attach the device to an engine
*
* @param engineId Engine id to attach to
* @param deviceId Device id to attach
* @param options.refresh Wait for ES indexation
* @param options.strict If true, throw if an operation isn't possible
*/
async attachEngine(
engineId: string,
deviceId: string,
request: KuzzleRequest,
): Promise<KDocument<DeviceContent>> {
return lock(`device:${deviceId}`, async () =>
this._attachEngine(engineId, deviceId, request),
);
}

/**
Expand Down Expand Up @@ -797,105 +813,121 @@ export class DeviceService extends DigitalTwinService {
}

/**
* Unlink a device of an asset
* Internal logic of unlink a device of an asset
*
* @param {string} deviceId Id of the device
* @param {KuzzleRequest} request kuzzle request
*/
async unlinkAsset(
private async _unlinkAsset(
deviceId: string,
request: KuzzleRequest,
): Promise<{
asset: KDocument<AssetContent>;
device: KDocument<DeviceContent>;
}> {
return lock(`device:${deviceId}`, async () => {
const device = await this.getInternalDevices(deviceId);
const engineId = device._source.engineId;
const device = await this.getInternalDevices(deviceId);
const engineId = device._source.engineId;

this.checkAttachedToEngine(device);

if (!device._source.assetId) {
throw new BadRequestError(
`Device "${device._id}" is not linked to an asset.`,
);
}
this.checkAttachedToEngine(device);

const asset = await this.sdk.document.get<AssetContent>(
engineId,
InternalCollection.ASSETS,
device._source.assetId,
);

const linkedDevices = asset._source.linkedDevices.filter(
(link) => link._id !== device._id,
if (!device._source.assetId) {
throw new BadRequestError(
`Device "${device._id}" is not linked to an asset.`,
);
}

const [updatedDevice, , updatedAsset] = await Promise.all([
this.updateDocument<DeviceContent>(
request,
{ _id: device._id, _source: { assetId: null } },
{
collection: InternalCollection.DEVICES,
engineId: this.config.adminIndex,
},
{ source: true },
),
const asset = await this.sdk.document.get<AssetContent>(
engineId,
InternalCollection.ASSETS,
device._source.assetId,
);

this.updateDocument<DeviceContent>(
request,
{ _id: device._id, _source: { assetId: null } },
{
collection: InternalCollection.DEVICES,
engineId,
},
),
const linkedDevices = asset._source.linkedDevices.filter(
(link) => link._id !== device._id,
);

this.updateDocument<AssetContent>(
request,
{ _id: asset._id, _source: { linkedDevices } },
{
collection: InternalCollection.ASSETS,
engineId,
},
{ source: true },
),
]);
const [updatedDevice, , updatedAsset] = await Promise.all([
this.updateDocument<DeviceContent>(
request,
{ _id: device._id, _source: { assetId: null } },
{
collection: InternalCollection.DEVICES,
engineId: this.config.adminIndex,
},
{ source: true },
),

const event: AssetHistoryEventUnlink = {
name: "unlink",
unlink: {
deviceId,
this.updateDocument<DeviceContent>(
request,
{ _id: device._id, _source: { assetId: null } },
{
collection: InternalCollection.DEVICES,
engineId,
},
};
await ask<AskAssetHistoryAdd<AssetHistoryEventUnlink>>(
"ask:device-manager:asset:history:add",
),

this.updateDocument<AssetContent>(
request,
{ _id: asset._id, _source: { linkedDevices } },
{
collection: InternalCollection.ASSETS,
engineId,
histories: [
{
asset: updatedAsset._source,
event,
id: updatedAsset._id,
timestamp: Date.now(),
},
],
},
);
{ source: true },
),
]);

if (request.getRefresh() === "wait_for") {
await Promise.all([
this.sdk.collection.refresh(
this.config.adminIndex,
InternalCollection.DEVICES,
),
this.sdk.collection.refresh(engineId, InternalCollection.DEVICES),
this.sdk.collection.refresh(engineId, InternalCollection.ASSETS),
]);
}
const event: AssetHistoryEventUnlink = {
name: "unlink",
unlink: {
deviceId,
},
};
await ask<AskAssetHistoryAdd<AssetHistoryEventUnlink>>(
"ask:device-manager:asset:history:add",
{
engineId,
histories: [
{
asset: updatedAsset._source,
event,
id: updatedAsset._id,
timestamp: Date.now(),
},
],
},
);

return { asset: updatedAsset, device: updatedDevice };
});
if (request.getRefresh() === "wait_for") {
await Promise.all([
this.sdk.collection.refresh(
this.config.adminIndex,
InternalCollection.DEVICES,
),
this.sdk.collection.refresh(engineId, InternalCollection.DEVICES),
this.sdk.collection.refresh(engineId, InternalCollection.ASSETS),
]);
}

return { asset: updatedAsset, device: updatedDevice };
}

/**
* Unlink a device of an asset with lock
*
* @param {string} deviceId Id of the device
* @param {KuzzleRequest} request kuzzle request
*/
async unlinkAsset(
deviceId: string,
request: KuzzleRequest,
): Promise<{
asset: KDocument<AssetContent>;
device: KDocument<DeviceContent>;
}> {
return lock(`device:${deviceId}`, async () =>
this._unlinkAsset(deviceId, request),
);
}

private async checkEngineExists(engineId: string) {
Expand Down

0 comments on commit 23562f8

Please sign in to comment.