Skip to content

Commit

Permalink
fix(digitalTwin): correct race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
sebtiz13 committed Jan 29, 2025
1 parent b7e1351 commit 596b8d5
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 132 deletions.
224 changes: 118 additions & 106 deletions lib/modules/asset/AssetService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,63 +143,65 @@ export class AssetService extends DigitalTwinService {
metadata: Metadata,
request: KuzzleRequest,
): Promise<KDocument<AssetContent>> {
const asset = await this.get(engineId, assetId, request);
const unknownMetadata = {};
for (const key in metadata) {
if (key in asset._source.metadata) {
asset._source.metadata[key] = metadata[key];
} else {
unknownMetadata[key] = metadata[key];
return lock(`asset:${engineId}:${assetId}`, async () => {
const asset = await this.get(engineId, assetId, request);
const unknownMetadata = {};
for (const key in metadata) {
if (key in asset._source.metadata) {
asset._source.metadata[key] = metadata[key];
} else {
unknownMetadata[key] = metadata[key];
}
}
}
// ? If metadata key is unknown on the asset we check that it exists in the assetModel mappings
if (Object.keys(unknownMetadata).length > 0) {
const assetModel = await ask<AskModelAssetGet>(
"ask:device-manager:model:asset:get",
{ engineGroup: engineId.split("-")[1], model: asset._source.model },
);
for (const key in unknownMetadata) {
if (key in assetModel.asset.metadataMappings) {
asset._source.metadata[key] = unknownMetadata[key];
// ? If metadata key is unknown on the asset we check that it exists in the assetModel mappings
if (Object.keys(unknownMetadata).length > 0) {
const assetModel = await ask<AskModelAssetGet>(
"ask:device-manager:model:asset:get",
{ engineGroup: engineId.split("-")[1], model: asset._source.model },
);
for (const key in unknownMetadata) {
if (key in assetModel.asset.metadataMappings) {
asset._source.metadata[key] = unknownMetadata[key];
}
}
}
}
const updatedPayload = await this.app.trigger<EventAssetUpdateBefore>(
"device-manager:asset:update:before",
{ asset, metadata },
);
const updatedPayload = await this.app.trigger<EventAssetUpdateBefore>(
"device-manager:asset:update:before",
{ asset, metadata },
);

const updatedAsset = await this.sdk.document.replace<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetId,
updatedPayload.asset._source,
{ triggerEvents: true },
);
const updatedAsset = await this.sdk.document.replace<AssetContent>(
engineId,
InternalCollection.ASSETS,
assetId,
updatedPayload.asset._source,
{ triggerEvents: true },
);

await this.assetHistoryService.add<AssetHistoryEventMetadata>(engineId, [
{
asset: updatedAsset._source,
event: {
metadata: {
names: Object.keys(flattenObject(updatedPayload.metadata)),
await this.assetHistoryService.add<AssetHistoryEventMetadata>(engineId, [
{
asset: updatedAsset._source,
event: {
metadata: {
names: Object.keys(flattenObject(updatedPayload.metadata)),
},
name: "metadata",
},
name: "metadata",
id: updatedAsset._id,
timestamp: Date.now(),
},
id: updatedAsset._id,
timestamp: Date.now(),
},
]);
]);

await this.app.trigger<EventAssetUpdateAfter>(
"device-manager:asset:update:after",
{
asset: updatedAsset,
metadata: updatedPayload.metadata,
},
);
await this.app.trigger<EventAssetUpdateAfter>(
"device-manager:asset:update:after",
{
asset: updatedAsset,
metadata: updatedPayload.metadata,
},
);

return updatedAsset;
return updatedAsset;
});
}

/**
Expand All @@ -219,7 +221,7 @@ export class AssetService extends DigitalTwinService {
);

if (!asset) {
return this.create(engineId, model, reference, metadata, request);
return this._create(engineId, model, reference, metadata, request);
}

const updatedPayload = await this.app.trigger<EventAssetUpdateBefore>(
Expand Down Expand Up @@ -266,10 +268,7 @@ export class AssetService extends DigitalTwinService {
});
}

/**
* Create an asset metadata
*/
public async create(
private async _create(
engineId: string,
model: string,
reference: string,
Expand All @@ -278,68 +277,81 @@ export class AssetService extends DigitalTwinService {
): Promise<KDocument<AssetContent>> {
const assetId = AssetSerializer.id(model, reference);

return lock(`asset:${engineId}:${assetId}`, async () => {
const engine = await this.getEngine(engineId);
const assetModel = await ask<AskModelAssetGet>(
"ask:device-manager:model:asset:get",
{ engineGroup: engine.group, model },
);
const engine = await this.getEngine(engineId);
const assetModel = await ask<AskModelAssetGet>(
"ask:device-manager:model:asset:get",
{ engineGroup: engine.group, model },
);

const assetMetadata = {};
for (const metadataName of Object.keys(
assetModel.asset.metadataMappings,
)) {
assetMetadata[metadataName] = null;
}
for (const [metadataName, metadataValue] of Object.entries(
assetModel.asset.defaultMetadata,
)) {
_.set(assetMetadata, metadataName, metadataValue);
}
const assetMetadata = {};
for (const metadataName of Object.keys(assetModel.asset.metadataMappings)) {
assetMetadata[metadataName] = null;
}
for (const [metadataName, metadataValue] of Object.entries(
assetModel.asset.defaultMetadata,
)) {
_.set(assetMetadata, metadataName, metadataValue);
}

const measures: Record<string, EmbeddedMeasure> = {};
const measures: Record<string, EmbeddedMeasure> = {};

for (const { name } of assetModel.asset.measures) {
measures[name] = null;
}
for (const { name } of assetModel.asset.measures) {
measures[name] = null;
}

const asset = await this.createDocument<AssetContent>(
request,
{
_id: assetId,
_source: {
groups: [],
lastMeasuredAt: null,
linkedDevices: [],
measures,
metadata: { ...assetMetadata, ...metadata },
model,
reference,
softTenant: [],
},
},
{
collection: InternalCollection.ASSETS,
engineId,
const asset = await this.createDocument<AssetContent>(
request,
{
_id: assetId,
_source: {
groups: [],
lastMeasuredAt: null,
linkedDevices: [],
measures,
metadata: { ...assetMetadata, ...metadata },
model,
reference,
softTenant: [],
},
);
},
{
collection: InternalCollection.ASSETS,
engineId,
},
);

await this.assetHistoryService.add<AssetHistoryEventMetadata>(engineId, [
{
asset: asset._source,
event: {
metadata: {
names: Object.keys(flattenObject(asset._source.metadata)),
},
name: "metadata",
await this.assetHistoryService.add<AssetHistoryEventMetadata>(engineId, [
{
asset: asset._source,
event: {
metadata: {
names: Object.keys(flattenObject(asset._source.metadata)),
},
id: asset._id,
timestamp: Date.now(),
name: "metadata",
},
]);
id: asset._id,
timestamp: Date.now(),
},
]);

return asset;
});
return asset;
}

/**
* Create an asset metadata
*/
public async create(
engineId: string,
model: string,
reference: string,
metadata: JSONObject,
request: KuzzleRequest,
): Promise<KDocument<AssetContent>> {
const assetId = AssetSerializer.id(model, reference);

return lock(`asset:${engineId}:${assetId}`, async () =>
this._create(engineId, model, reference, metadata, request),
);
}

/**
Expand Down
54 changes: 28 additions & 26 deletions lib/modules/device/DeviceService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,35 +192,37 @@ export class DeviceService extends DigitalTwinService {
metadata: Metadata,
request: KuzzleRequest,
): Promise<KDocument<DeviceContent>> {
const device = await this.get(engineId, deviceId, request);
return lock(`device:${engineId}:${deviceId}`, async () => {
const device = await this.get(engineId, deviceId, request);

for (const key in metadata) {
if (key in device._source.metadata) {
device._source.metadata[key] = metadata[key];
for (const key in metadata) {
if (key in device._source.metadata) {
device._source.metadata[key] = metadata[key];
}
}
}

const updatedPayload = await this.app.trigger<EventDeviceUpdateBefore>(
"device-manager:device:update:before",
{ device: device, metadata },
);
const updatedPayload = await this.app.trigger<EventDeviceUpdateBefore>(
"device-manager:device:update:before",
{ device: device, metadata },
);

const updatedDevice = await this.sdk.document.replace<DeviceContent>(
engineId,
InternalCollection.DEVICES,
deviceId,
updatedPayload.device._source,
);
const updatedDevice = await this.sdk.document.replace<DeviceContent>(
engineId,
InternalCollection.DEVICES,
deviceId,
updatedPayload.device._source,
);

await this.app.trigger<EventDeviceUpdateAfter>(
"device-manager:device:update:after",
{
device: updatedDevice,
metadata: updatedPayload.metadata,
},
);
await this.app.trigger<EventDeviceUpdateAfter>(
"device-manager:device:update:after",
{
device: updatedDevice,
metadata: updatedPayload.metadata,
},
);

return updatedDevice;
return updatedDevice;
});
}

/**
Expand Down Expand Up @@ -299,7 +301,7 @@ export class DeviceService extends DigitalTwinService {
request: KuzzleRequest,
): Promise<KDocument<DeviceContent>> {
return lock<KDocument<DeviceContent>>(
`device:update:${deviceId}`,
`device:${engineId}:${deviceId}`,
async () => {
const device = await this.get(engineId, deviceId, request);

Expand Down Expand Up @@ -336,7 +338,7 @@ export class DeviceService extends DigitalTwinService {
deviceId: string,
request: KuzzleRequest,
) {
return lock<void>(`device:delete:${deviceId}`, async () => {
return lock<void>(`device:${engineId}:${deviceId}`, async () => {
const device = await this.get(engineId, deviceId, request);

const promises = [];
Expand Down Expand Up @@ -542,7 +544,7 @@ export class DeviceService extends DigitalTwinService {
asset: KDocument<AssetContent>;
device: KDocument<DeviceContent>;
}> {
return lock(`device:linkAsset:${deviceId}`, async () => {
return lock(`device:${engineId}:${deviceId}`, async () => {
const device = await this.getInternalDevices(deviceId);
const engine = await this.getEngine(engineId);

Expand Down

0 comments on commit 596b8d5

Please sign in to comment.