Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Does not recognise deletions when loading data from events #88

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/domain/entities/base-product-tag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ export abstract class BaseProductTag {
product: Product;

@Property()
obsolete = false;
obsolete? = false;
}
2 changes: 1 addition & 1 deletion src/domain/entities/product-ingredient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ export class ProductIngredient {
ingredients = new Collection<ProductIngredient>(this);

@Property()
obsolete = false;
obsolete? = false;
}
4 changes: 2 additions & 2 deletions src/domain/entities/product.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ export class Product {
@Property()
ingredientsCount?: number;

// The followign fields are populated by the query service
// The following fields are populated by the query service
@Property()
obsolete = false;
obsolete? = false;

@Property({ type: 'uuid', index: true })
lastUpdateId?: string;
Expand Down
81 changes: 67 additions & 14 deletions src/domain/services/import.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,25 @@ jest.mock('mongodb', () => {
MongoClient: jest.fn(() => ({
connect: jest.fn(),
db: () => {
let index = 0;
return {
collection: () => ({
find: (...args: any) => {
findCalls.push(args);
return {
next: async () => {
return index++ <= mockedProducts.length
? mockedProducts[index - 1]
: null;
},
close: jest.fn(),
};
},
}),
collection: (collectionName) => {
let index = 0;
const productList =
collectionName === 'products' ? mockedProducts : [];
return {
find: (...args: any) => {
findCalls.push(args);
return {
next: async () => {
return index++ <= productList.length
? productList[index - 1]
: null;
},
close: jest.fn(),
};
},
};
},
};
},
close: jest.fn(),
Expand Down Expand Up @@ -392,4 +396,53 @@ describe('importWithFilter', () => {
await Promise.all(imports);
});
});

it('should flag products not in mongodb as deleted', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);

// GIVEN: An existing product that doesn't exist in MongoDB
const em = app.get(EntityManager);
const productIdToDelete = randomCode();
const productToDelete = em.create(Product, {
code: productIdToDelete,
source: ProductSource.FULL_LOAD,
lastUpdated: new Date(2023, 1, 1),
lastModified: new Date(lastModified * 1000),
});
em.create(ProductIngredientsTag, {
product: productToDelete,
value: 'old_ingredient',
});
await em.flush();

const beforeImport = Date.now();
// WHEN: Doing an incremental import from MongoDB where the id is mentioned
const { products, productIdExisting, productIdNew } = testProducts();
mockMongoDB(products);
await importService.importWithFilter(
{ code: { $in: [productIdExisting, productIdNew, productIdToDelete] } },
ProductSource.EVENT,
);

// THEN: Obsolete flag should get set to null
const deletedProduct = await em.findOne(Product, {
code: productIdToDelete,
});
const updatedProduct = await em.findOne(Product, {
code: productIdExisting,
});
expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId);
expect(deletedProduct.lastUpdated.getTime()).toBeGreaterThanOrEqual(
beforeImport,
);
expect(deletedProduct.source).toBe(ProductSource.EVENT);
expect(updatedProduct.obsolete).toBe(false);

const deletedTag = await em.findOne(ProductIngredientsTag, {
product: deletedProduct,
});
expect(deletedTag.obsolete).toBeNull();
});
});
});
55 changes: 52 additions & 3 deletions src/domain/services/import.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ export class ImportService {
// Flush mikro-orm before switching to native SQL
await this.em.flush();

const inputCodes = filter.code?.$in;
const foundCodes = [];

// Now using postgres to help with transactions
const connection = await sql.reserve();
await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_modified timestamptz, data jsonb)`;
Expand All @@ -121,6 +124,8 @@ export class ImportService {
this.logger.debug(`Fetched ${i}`);
}

if (source === ProductSource.EVENT) foundCodes.push(data.code);

// Find the product if it exists
let results =
await connection`select id, last_modified from product where code = ${data.code}`;
Expand Down Expand Up @@ -182,6 +187,27 @@ export class ImportService {
}
await client.close();

// If doing an event import flag all products that weren't found in MongoDB as deleted (obsolete = null)
if (source === ProductSource.EVENT) {
const missingProducts = inputCodes.filter(
(code) => !foundCodes.includes(code),
);
if (missingProducts.length) {
const deletedProducts = await connection`UPDATE product SET
obsolete = NULL,
last_update_id = ${updateId},
last_updated = ${new Date()},
source = ${source}
WHERE code IN ${sql(missingProducts)}
RETURNING id`;

await this.deleteProductTags(
connection,
deletedProducts.map((p) => p.id),
);
}
}

// If doing a full import delete all products that weren't updated and flag all tags as imported
if (source === ProductSource.FULL_LOAD) {
await this.tagService.addLoadedTags(
Expand Down Expand Up @@ -330,9 +356,32 @@ export class ImportService {
}

async deleteOtherProducts(connection: ReservedSql, updateId: string) {
const deleted =
await connection`delete from product where last_update_id != ${updateId} OR last_update_id IS NULL`;
this.logger.debug(`${deleted.count} Products deleted`);
const deletedProducts = await connection`UPDATE product SET
obsolete = NULL,
last_update_id = ${updateId},
last_updated = ${new Date()},
source = ${ProductSource.FULL_LOAD}
WHERE last_update_id != ${updateId} OR last_update_id IS NULL
RETURNING id`;
this.logger.debug(`${deletedProducts.count} Products deleted`);

await this.deleteProductTags(
connection,
deletedProducts.map((p) => p.id),
);
}

private async deleteProductTags(
connection: ReservedSql,
deletedProductIds: any[],
) {
for (const entity of Object.values(ProductTagMap.MAPPED_TAGS)) {
const tableName = this.em.getMetadata(entity).tableName;
await connection`UPDATE ${sql(tableName)} SET obsolete = NULL
where product_id in ${sql(deletedProductIds)}`;
}
await connection`UPDATE product_ingredient SET obsolete = NULL
where product_id in ${sql(deletedProductIds)}`;
}

// Make sure to pause redis before calling this
Expand Down
Loading
Loading