Skip to content

Commit

Permalink
Merge pull request #88 from openfoodfacts/87-does-not-deal-with-produ…
Browse files Browse the repository at this point in the history
…ct-deletions

fix: Does not recognise deletions when loading data from events
  • Loading branch information
john-gom authored Sep 13, 2024
2 parents fdbeae6 + a9fe059 commit 1a8dd3a
Show file tree
Hide file tree
Showing 7 changed files with 594 additions and 87 deletions.
2 changes: 1 addition & 1 deletion src/domain/entities/base-product-tag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ export abstract class BaseProductTag {
product: Product;

@Property()
obsolete = false;
obsolete? = false;
}
2 changes: 1 addition & 1 deletion src/domain/entities/product-ingredient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ export class ProductIngredient {
ingredients = new Collection<ProductIngredient>(this);

@Property()
obsolete = false;
obsolete? = false;
}
4 changes: 2 additions & 2 deletions src/domain/entities/product.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ export class Product {
@Property()
ingredientsCount?: number;

// The followign fields are populated by the query service
// The following fields are populated by the query service
@Property()
obsolete = false;
obsolete? = false;

@Property({ type: 'uuid', index: true })
lastUpdateId?: string;
Expand Down
81 changes: 67 additions & 14 deletions src/domain/services/import.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,25 @@ jest.mock('mongodb', () => {
MongoClient: jest.fn(() => ({
connect: jest.fn(),
db: () => {
let index = 0;
return {
collection: () => ({
find: (...args: any) => {
findCalls.push(args);
return {
next: async () => {
return index++ <= mockedProducts.length
? mockedProducts[index - 1]
: null;
},
close: jest.fn(),
};
},
}),
collection: (collectionName) => {
let index = 0;
const productList =
collectionName === 'products' ? mockedProducts : [];
return {
find: (...args: any) => {
findCalls.push(args);
return {
next: async () => {
return index++ <= productList.length
? productList[index - 1]
: null;
},
close: jest.fn(),
};
},
};
},
};
},
close: jest.fn(),
Expand Down Expand Up @@ -392,4 +396,53 @@ describe('importWithFilter', () => {
await Promise.all(imports);
});
});

it('should flag products not in mongodb as deleted', async () => {
await createTestingModule([DomainModule], async (app) => {
const importService = app.get(ImportService);

// GIVEN: An existing product that doesn't exist in MongoDB
const em = app.get(EntityManager);
const productIdToDelete = randomCode();
const productToDelete = em.create(Product, {
code: productIdToDelete,
source: ProductSource.FULL_LOAD,
lastUpdated: new Date(2023, 1, 1),
lastModified: new Date(lastModified * 1000),
});
em.create(ProductIngredientsTag, {
product: productToDelete,
value: 'old_ingredient',
});
await em.flush();

const beforeImport = Date.now();
// WHEN: Doing an incremental import from MongoDB where the id is mentioned
const { products, productIdExisting, productIdNew } = testProducts();
mockMongoDB(products);
await importService.importWithFilter(
{ code: { $in: [productIdExisting, productIdNew, productIdToDelete] } },
ProductSource.EVENT,
);

// THEN: Obsolete flag should get set to null
const deletedProduct = await em.findOne(Product, {
code: productIdToDelete,
});
const updatedProduct = await em.findOne(Product, {
code: productIdExisting,
});
expect(deletedProduct.lastUpdateId).toBe(updatedProduct.lastUpdateId);
expect(deletedProduct.lastUpdated.getTime()).toBeGreaterThanOrEqual(
beforeImport,
);
expect(deletedProduct.source).toBe(ProductSource.EVENT);
expect(updatedProduct.obsolete).toBe(false);

const deletedTag = await em.findOne(ProductIngredientsTag, {
product: deletedProduct,
});
expect(deletedTag.obsolete).toBeNull();
});
});
});
55 changes: 52 additions & 3 deletions src/domain/services/import.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ export class ImportService {
// Flush mikro-orm before switching to native SQL
await this.em.flush();

const inputCodes = filter.code?.$in;
const foundCodes = [];

// Now using postgres to help with transactions
const connection = await sql.reserve();
await connection`CREATE TEMP TABLE product_temp (id int PRIMARY KEY, last_modified timestamptz, data jsonb)`;
Expand All @@ -121,6 +124,8 @@ export class ImportService {
this.logger.debug(`Fetched ${i}`);
}

if (source === ProductSource.EVENT) foundCodes.push(data.code);

// Find the product if it exists
let results =
await connection`select id, last_modified from product where code = ${data.code}`;
Expand Down Expand Up @@ -182,6 +187,27 @@ export class ImportService {
}
await client.close();

// If doing an event import flag all products that weren't found in MongoDB as deleted (obsolete = null)
if (source === ProductSource.EVENT) {
const missingProducts = inputCodes.filter(
(code) => !foundCodes.includes(code),
);
if (missingProducts.length) {
const deletedProducts = await connection`UPDATE product SET
obsolete = NULL,
last_update_id = ${updateId},
last_updated = ${new Date()},
source = ${source}
WHERE code IN ${sql(missingProducts)}
RETURNING id`;

await this.deleteProductTags(
connection,
deletedProducts.map((p) => p.id),
);
}
}

// If doing a full import delete all products that weren't updated and flag all tags as imported
if (source === ProductSource.FULL_LOAD) {
await this.tagService.addLoadedTags(
Expand Down Expand Up @@ -330,9 +356,32 @@ export class ImportService {
}

async deleteOtherProducts(connection: ReservedSql, updateId: string) {
const deleted =
await connection`delete from product where last_update_id != ${updateId} OR last_update_id IS NULL`;
this.logger.debug(`${deleted.count} Products deleted`);
const deletedProducts = await connection`UPDATE product SET
obsolete = NULL,
last_update_id = ${updateId},
last_updated = ${new Date()},
source = ${ProductSource.FULL_LOAD}
WHERE last_update_id != ${updateId} OR last_update_id IS NULL
RETURNING id`;
this.logger.debug(`${deletedProducts.count} Products deleted`);

await this.deleteProductTags(
connection,
deletedProducts.map((p) => p.id),
);
}

private async deleteProductTags(
connection: ReservedSql,
deletedProductIds: any[],
) {
for (const entity of Object.values(ProductTagMap.MAPPED_TAGS)) {
const tableName = this.em.getMetadata(entity).tableName;
await connection`UPDATE ${sql(tableName)} SET obsolete = NULL
where product_id in ${sql(deletedProductIds)}`;
}
await connection`UPDATE product_ingredient SET obsolete = NULL
where product_id in ${sql(deletedProductIds)}`;
}

// Make sure to pause redis before calling this
Expand Down
Loading

0 comments on commit 1a8dd3a

Please sign in to comment.