Skip to content

Commit

Permalink
Fixing readmodel writing for DELETE & UPDATE Descriptor document
Browse files Browse the repository at this point in the history
  • Loading branch information
ecamellini committed Feb 14, 2024
1 parent 983b414 commit ed55b35
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions packages/catalog-readmodel-writer/src/consumerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ export async function handleMessage(
{ "data.id": msg.stream_id, "metadata.version": { $lt: msg.version } },
{
$set: {
"metadata.version": msg.version,
"data.descriptors.$[descriptor].docs.$[doc]": msg.data
.updatedDocument
? fromDocumentV1(msg.data.updatedDocument)
Expand All @@ -109,6 +108,8 @@ export async function handleMessage(
arrayFilters: [
{
"descriptor.id": msg.data.descriptorId,
},
{
"doc.id": msg.data.documentId,
},
],
Expand All @@ -117,29 +118,35 @@ export async function handleMessage(
await eservices.updateOne(
{
"data.id": msg.stream_id,
"metadata.version": { $lt: msg.version },
},
{
$set: {
"data.descriptors.$[descriptor].interface": msg.data.updatedDocument
? fromDocumentV1(msg.data.updatedDocument)
: undefined,
"data.descriptors.$[descriptor].serverUrls": msg.data.serverUrls,
"metadata.version": msg.version,
},
},
{
arrayFilters: [
{
"descriptor.id": msg.data.descriptorId,
$or: [
{ "descriptor.interface": { $exists: true } },
{ "descriptor.interface.id": msg.data.documentId },
],
"descriptor.interface.id": msg.data.documentId,
},
],
}
);
await eservices.updateOne(
{
"data.id": msg.stream_id,
"metadata.version": { $lt: msg.version },
},
{
$set: {
"metadata.version": msg.version,
},
}
);
})
.with(
{ type: "EServiceDeleted" },
Expand Down Expand Up @@ -208,9 +215,6 @@ export async function handleMessage(
id: msg.data.documentId,
},
},
$set: {
"metadata.version": msg.version,
},
},
{
arrayFilters: [
Expand All @@ -228,7 +232,6 @@ export async function handleMessage(
},
$set: {
"data.descriptors.$[descriptor].serverUrls": [],
"metadata.version": msg.version,
},
},
{
Expand All @@ -240,6 +243,14 @@ export async function handleMessage(
],
}
);
await eservices.updateOne(
{ "data.id": msg.stream_id, "metadata.version": { $lt: msg.version } },
{
$set: {
"metadata.version": msg.version,
},
}
);
})
.with(
{ type: "EServiceDescriptorAdded" },
Expand Down

0 comments on commit ed55b35

Please sign in to comment.