diff --git a/.env b/.env index f5c0aba..930894e 100644 --- a/.env +++ b/.env @@ -1,6 +1,7 @@ -COMPOSE_FILE=docker-compose.yml;docker/dev.yml +COMPOSE_FILE_RUN=docker-compose.yml,docker-compose-run.yml +COMPOSE_FILE=${COMPOSE_FILE_RUN},docker/dev.yml COMPOSE_PROJECT_NAME=off-query -COMPOSE_PATH_SEPARATOR=; +COMPOSE_PATH_SEPARATOR=, RESTART_POLICY=no TAG=latest QUERY_PORT=127.0.0.1:5511 @@ -11,6 +12,7 @@ POSTGRES_USER=productopener POSTGRES_PASSWORD=productopener POSTGRES_SHM_SIZE=256m COMMON_NET_NAME=off_shared_network +# Note when running in a container the following settings are changed to use the internal docker network MONGO_URI=mongodb://localhost:27017 REDIS_URL=redis://localhost:6379 # Log levels are: debug, verbose, log (default), warn, error diff --git a/.github/workflows/container-deploy.yml b/.github/workflows/container-deploy.yml index e0b6bb5..27132cd 100644 --- a/.github/workflows/container-deploy.yml +++ b/.github/workflows/container-deploy.yml @@ -133,7 +133,7 @@ jobs: echo "DOCKER_CLIENT_TIMEOUT=120" >> .env echo "COMPOSE_HTTP_TIMEOUT=120" >> .env echo "COMPOSE_PROJECT_NAME=off-query" >> .env - echo "COMPOSE_PATH_SEPARATOR=;" >> .env + echo "COMPOSE_PATH_SEPARATOR=," >> .env echo "RESTART_POLICY=always" >> .env echo "COMPOSE_FILE=docker-compose.yml" >> .env echo "TAG=sha-${{ github.sha }}" >> .env diff --git a/Makefile b/Makefile index c08a4d3..1a50e83 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,10 @@ endif up: run_deps docker compose up --build --wait +# Called by other projects to start this project as a dependency +run: run_deps + COMPOSE_FILE=${COMPOSE_FILE_RUN} docker compose up -d + # This task starts a Postgres database in Docker and then prepares the local environment for development dev: run_deps docker compose up --wait query_postgres diff --git a/docker-compose-run.yml b/docker-compose-run.yml new file mode 100644 index 0000000..3c1ef64 --- /dev/null +++ b/docker-compose-run.yml @@ -0,0 +1,6 @@ +services: + query: + environment: + # Use shared-services MongoDB and REDIS + - MONGO_URI=mongodb://mongodb:27017 + - REDIS_URL=redis://redis:6379 diff --git a/docker-compose.yml b/docker-compose.yml index 7f40b1e..dc069b2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -8,6 +8,9 @@ services: - POSTGRES_PASSWORD - POSTGRES_DB shm_size: ${POSTGRES_SHM_SIZE} + # Always expose port for viewer access + ports: + - "${POSTGRES_PORT:-5512}:5432" volumes: - dbdata:/var/lib/postgresql/data networks: diff --git a/docker/dev.yml b/docker/dev.yml index 83cd403..ee4fcb5 100644 --- a/docker/dev.yml +++ b/docker/dev.yml @@ -1,9 +1,4 @@ services: - query_postgres: - # Expose port locally for testing purposes. - ports: - - "${POSTGRES_PORT:-5512}:5432" - query: build: . image: openfoodfacts-query:dev diff --git a/docs/decisions/event-aggregation.md b/docs/decisions/event-aggregation.md new file mode 100644 index 0000000..44cea2e --- /dev/null +++ b/docs/decisions/event-aggregation.md @@ -0,0 +1,56 @@ +# Product Event Aggregation + +## Context and Problem Statement + +We would like to be able to support a variety of queries on events that have been recorded on products over time. For example, for the producer's dashboard we want to be able to show the number of edits and distinct products updated over a month. + +## Decision Drivers + +* Queries should run quickly +* Database space consumed should not be excessive +* Data should be reasonably up to date, i.e. 
any ETL / ELT process should keep up with the rate at which events are being created + +## Considered Options + +* Query the raw event tables +* Create specific aggregate tables for each aggregate dimension +* Create a relational model of events against products + +## Decision Outcome + +Chosen option: "Create a relational model of events against products", because it offers the best compromise in terms of acceptable query performance with minimal storage space and does not require new tables to be created for every possible aggregate dimension. + +### Consequences + +In general we should try to map things to a relational model, but only at the most granular level of detail that makes sense, e.g. total count of actions in one day. + +It has been observed that PostgreSQL performs much better when dealing with small record sizes, so text fields should be normalised where possible so that an integer id can be stored instead. + +## Pros and Cons of the Options + +### Query the raw event tables + +In this option the raw events are simply loaded into a table and then views are created to query this table, joining to the product table to obtain the required dimension. + +* Good: Only the raw events are being stored +* Good: Import of data is as fast as possible +* Bad: Query performance is poor. Even with indexing, typical queries were taking around 2 minutes + +### Create specific aggregate tables for each aggregate dimension + +With this option the raw events would be ingested and then a follow-up process would run to aggregate those events by the required dimension, e.g. for the producer's dashboard this would be aggregating by day, action and owner with a total update count plus a count of distinct products updated. + +* Good: Queries run very quickly (sub 100ms) +* Bad: Additional tables, processes and storage need to be assigned for each new query dimension +* Bad: It is difficult to incrementally refresh tables where distinct counts are included (as the new distinct count cannot be worked out from the combination of new events plus the existing distinct count) + +### Create a relational model of events against products + +With this option the raw events would be ingested and then a follow-up process would run to just aggregate those events by action, contributor and day against the product. Different views can then be provided to query this data, joining to the product to obtain the required dimension. + +With this option it was important to keep the size of the relational table as small as possible, so an enumeration was used for the action and the contributors were normalised into a separate table so that only the id needed to be stored in the event table. + +* Neutral: Query performance is acceptable (sub 1s) +* Good: Queries to support different dimensions do not require additional storage or import processes +* Good: Aggregated counts are not distinct, so they can be refreshed incrementally + diff --git a/docs/decisions/template.md b/docs/decisions/template.md new file mode 100644 index 0000000..77097eb --- /dev/null +++ b/docs/decisions/template.md @@ -0,0 +1,69 @@ +# {short title of solved problem and solution} + +## Context and Problem Statement + +{Describe the context and problem statement, e.g., in free form using two to three sentences or in the form of an illustrative story. 
+ You may want to articulate the problem in form of a question and add links to collaboration boards or issue management systems.} + + +## Decision Drivers + +* {decision driver 1, e.g., a force, facing concern, …} +* {decision driver 2, e.g., a force, facing concern, …} +* … + +## Considered Options + +* {title of option 1} +* {title of option 2} +* {title of option 3} +* … + +## Decision Outcome + +Chosen option: "{title of option 1}", because +{justification. e.g., only option, which meets k.o. criterion decision driver | which resolves force {force} | … | comes out best (see below)}. + + +### Consequences + +{Provide detail on the implications of making this decision and how any forseen problems can be mitigated} + + +### Confirmation + +{Describe how the implementation of/compliance with the ADR is confirmed. E.g., by a review or an ArchUnit test. + Although we classify this element as optional, it is included in most ADRs.} + + +## Pros and Cons of the Options + +### {title of option 1} + + +{example | description | pointer to more information | …} + +* Good: {argument a} +* Good: {argument b} + +* Neutral: {argument c} +* Bad: {argument d} +* … + +### {title of other option} + +{example | description | pointer to more information | …} + +* Good: {argument a} +* Good: {argument b} +* Neutral: {argument c} +* Bad: {argument d} +* … + + +## More Information + +{You might want to provide additional evidence/confidence for the decision outcome here and/or + document the team agreement on the decision and/or + define when/how this decision the decision should be realized and if/when it should be re-visited. +Links to other decisions and resources might appear here as well.} \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index 0173bf2..28619fd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "@nestjs/platform-express": "^9.0.0", "@nestjs/schedule": "^4.0.0", "@nestjs/terminus": "^10.1.1", + "dotenv": "^16.4.5", "fast-deep-equal": "^3.1.3", "id128": "^1.6.6", "mongodb": "^5.8.0", @@ -1584,6 +1585,17 @@ } } }, + "node_modules/@mikro-orm/core/node_modules/dotenv": { + "version": "16.3.1", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.3.1.tgz", + "integrity": "sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/motdotla/dotenv?sponsor=1" + } + }, "node_modules/@mikro-orm/core/node_modules/fs-extra": { "version": "11.1.1", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.1.1.tgz", @@ -4407,14 +4419,14 @@ } }, "node_modules/dotenv": { - "version": "16.3.1", - "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.3.1.tgz", - "integrity": "sha512-IPzF4w4/Rd94bA9imS68tZBaYyBWSCE47V1RGuMrB94iyTOIEwRmVL2x/4An+6mETpLrKJ5hQkB8W4kFAadeIQ==", + "version": "16.4.5", + "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.4.5.tgz", + "integrity": "sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==", "engines": { "node": ">=12" }, "funding": { - "url": "https://github.com/motdotla/dotenv?sponsor=1" + "url": "https://dotenvx.com" } }, "node_modules/ee-first": { diff --git a/package.json b/package.json index 020b204..81a086e 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "@nestjs/platform-express": "^9.0.0", "@nestjs/schedule": "^4.0.0", "@nestjs/terminus": "^10.1.1", + "dotenv": "^16.4.5", "fast-deep-equal": "^3.1.3", "id128": 
"^1.6.6", "mongodb": "^5.8.0", diff --git a/src/app.controller.spec.ts b/src/app.controller.spec.ts new file mode 100644 index 0000000..9572d1b --- /dev/null +++ b/src/app.controller.spec.ts @@ -0,0 +1,36 @@ +import { createTestingModule, randomCode } from '../test/test.helper'; +import { AppController } from './app.controller'; +import { AppModule } from './app.module'; +import sql from './db'; +import { ImportService } from './domain/services/import.service'; + +describe('productupdate', () => { + it('should import message but not refresh products', async () => { + await createTestingModule([AppModule], async (app) => { + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const code1 = randomCode(); + const updates = [ + { + code: code1, + rev: 1, + }, + ]; + + const appController = app.get(AppController); + await appController.addProductUpdates(updates); + + // Then the import is not called + expect(importSpy).not.toHaveBeenCalled(); + + // Update events are created + const events = + await sql`SELECT * FROM product_update_event WHERE message->>'code' = ${code1}`; + + expect(events).toHaveLength(1); + }); + }); +}); diff --git a/src/app.controller.ts b/src/app.controller.ts index 6288cc2..a056052 100644 --- a/src/app.controller.ts +++ b/src/app.controller.ts @@ -1,12 +1,16 @@ import { Body, Controller, Get, Post, Query, All } from '@nestjs/common'; import { ImportService } from './domain/services/import.service'; import { QueryService } from './domain/services/query.service'; +import { RedisListener } from './domain/services/redis.listener'; +import { MessagesService } from './domain/services/messages.service'; @Controller() export class AppController { constructor( private readonly importService: ImportService, private readonly queryService: QueryService, + private readonly redisListener: RedisListener, + private readonly messagesService: MessagesService, ) {} @Get('importfrommongo') @@ -19,7 +23,10 @@ export class AppController { @Get('scheduledimportfrommongo') async scheduledImportFromMongo() { - await this.importService.scheduledImportFromMongo(); + // Pause redis while doing a scheduled import + await this.redisListener.pauseAndRun( + this.importService.scheduledImportFromMongo, + ); } parseBoolean(value) { @@ -40,4 +47,15 @@ export class AppController { async select(@Body() body: any, @Query('obsolete') obsolete) { return await this.queryService.select(body, this.parseBoolean(obsolete)); } + + // Temporary code for initial import + messageId = 0; + @Post('productupdates') + async addProductUpdates(@Body() updates: any[]) { + const messages = []; + for (const update of updates) { + messages.push({ id: `0-${this.messageId++}`, message: update }); + } + await this.messagesService.create(messages, true); + } } diff --git a/src/constants.ts b/src/constants.ts index 2a1500e..c5493c7 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -1 +1,3 @@ export const SCHEMA = 'query'; +export const VIEW_USER = 'viewer'; +export const VIEW_PASSWORD = 'off'; diff --git a/src/db.ts b/src/db.ts index c988f59..9406193 100644 --- a/src/db.ts +++ b/src/db.ts @@ -1,5 +1,4 @@ import postgres from 'postgres'; -import { SCHEMA } from './constants'; const sql = postgres({ host: process.env.POSTGRES_HOST, diff --git a/src/domain/domain.module.spec.ts b/src/domain/domain.module.spec.ts new file mode 100644 index 0000000..85af9ae --- /dev/null +++ b/src/domain/domain.module.spec.ts @@ -0,0 +1,24 @@ +import { 
createTestingModule } from '../../test/test.helper'; +import { DomainModule } from './domain.module'; +import { ImportService } from './services/import.service'; +import { RedisListener } from './services/redis.listener'; + +describe('refreshProducts', () => { + it('should pause Redis while doing a scheduled reload', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + const redisListener = app.get(RedisListener); + jest.spyOn(importService, 'importFromMongo').mockImplementation(); + const redisStopSpy = jest + .spyOn(redisListener, 'stopRedisConsumer') + .mockImplementation(); + const redisStartSpy = jest + .spyOn(redisListener, 'startRedisConsumer') + .mockImplementation(); + + await app.get(DomainModule).refreshProducts(); + expect(redisStopSpy).toHaveBeenCalledTimes(1); + expect(redisStartSpy).toHaveBeenCalledTimes(1); + }); + }); +}); diff --git a/src/domain/domain.module.ts b/src/domain/domain.module.ts index e603134..34d15c9 100644 --- a/src/domain/domain.module.ts +++ b/src/domain/domain.module.ts @@ -6,26 +6,43 @@ import { TagService } from './services/tag.service'; import { SettingsService } from './services/settings.service'; import { EntityManager, RequestContext } from '@mikro-orm/core'; import { Cron, ScheduleModule } from '@nestjs/schedule'; +import { MessagesService } from './services/messages.service'; +import { RedisListener } from './services/redis.listener'; @Module({ imports: [MikroOrmModule.forRoot(), ScheduleModule.forRoot()], - providers: [ImportService, QueryService, TagService, SettingsService], - exports: [ImportService, QueryService, TagService, SettingsService], + providers: [ + ImportService, + QueryService, + TagService, + SettingsService, + MessagesService, + RedisListener, + ], + exports: [ + ImportService, + QueryService, + TagService, + SettingsService, + RedisListener, + MessagesService, + ], }) export class DomainModule implements OnModuleInit, OnModuleDestroy { constructor( private readonly em: EntityManager, private readonly importService: ImportService, + private readonly redisListener: RedisListener, ) {} async onModuleInit() { RequestContext.create(this.em, () => { - this.importService.startRedisConsumer(); + this.redisListener.startRedisConsumer(); }); } async onModuleDestroy() { - await this.importService.stopRedisConsumer(); + await this.redisListener.stopRedisConsumer(); } // Refresh the PostgreSQL database from MongoDB at 2am every night @@ -35,7 +52,9 @@ export class DomainModule implements OnModuleInit, OnModuleDestroy { // The request context creates a separate entity manager instance // which avoids clashes with other requests await RequestContext.createAsync(this.em, async () => { - await this.importService.scheduledImportFromMongo(); + await this.redisListener.pauseAndRun(() => + this.importService.scheduledImportFromMongo(), + ); }); } } diff --git a/src/domain/entities/product.ts b/src/domain/entities/product.ts index e520477..7b8a298 100644 --- a/src/domain/entities/product.ts +++ b/src/domain/entities/product.ts @@ -23,10 +23,10 @@ export class Product { @Property({ columnType: 'timestamptz' }) lastModified?: Date; - @Property() + @Property({ index: true }) creator?: string; - @Property() + @Property({ index: true }) ownersTags?: string; @Property() @@ -47,6 +47,9 @@ export class Product { @Property() source?: ProductSource; + + @Property() + revision?: number; } export const MAPPED_FIELDS = [ @@ -58,4 +61,5 @@ export const MAPPED_FIELDS = [ 'ingredients_n', 
'ingredients_without_ciqual_codes_n', 'ingredients', + 'rev', ]; diff --git a/src/domain/entities/settings.ts b/src/domain/entities/settings.ts deleted file mode 100644 index 40de5ca..0000000 --- a/src/domain/entities/settings.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { Entity, PrimaryKey, Property } from '@mikro-orm/core'; - -@Entity() -export class Settings { - @PrimaryKey() - id = 1; - - @Property({ columnType: 'timestamptz' }) - lastModified?: Date; - - @Property() - lastMessageId?: string; -} diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index a63ac84..0e380a1 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -9,9 +9,6 @@ import { LoadedTag } from '../entities/loaded-tag'; import { ProductTagMap } from '../entities/product-tag-map'; import { ProductSource } from '../enums/product-source'; import { SettingsService } from './settings.service'; -import { createClient } from 'redis'; -import { GenericContainer } from 'testcontainers'; -import { setTimeout } from 'timers/promises'; import { ProductIngredient } from '../entities/product-ingredient'; const lastModified = 1692032161; @@ -25,6 +22,7 @@ function testProducts() { code: productIdNew, last_modified_t: lastModified, ingredients_tags: ['test'], + rev: 1, }, { // This one will already exist @@ -71,7 +69,6 @@ function mockMongoDB(productList) { } // Import tests can sometimes take a little time in GitHub -// Plus Allow a little time for the testcontainer to start jest.setTimeout(300000); describe('importFromMongo', () => { @@ -130,6 +127,7 @@ describe('importFromMongo', () => { }); expect(ingredientsNew).toHaveLength(1); expect(ingredientsNew[0].value).toBe('test'); + expect(productNew.revision).toBe(1); const ingredientsExisting = await em.find(ProductIngredientsTag, { product: productExisting, @@ -335,8 +333,6 @@ describe('scheduledImportFromMongo', () => { it('should do a full import if loaded tags arent complete', async () => { await createTestingModule([DomainModule], async (app) => { const importService = app.get(ImportService); - const redisStart = jest.spyOn(importService, 'startRedisConsumer'); - const redisStop = jest.spyOn(importService, 'stopRedisConsumer'); jest .spyOn(app.get(TagService), 'getLoadedTags') .mockImplementation(async () => []); @@ -346,10 +342,6 @@ describe('scheduledImportFromMongo', () => { await importService.scheduledImportFromMongo(); expect(importSpy).toHaveBeenCalledTimes(1); expect(importSpy.mock.calls[0][0]).toBeUndefined(); - - // Should pause redis during import - expect(redisStop).toHaveBeenCalled(); - expect(redisStart).toHaveBeenCalled(); }); }); @@ -401,60 +393,3 @@ describe('importWithFilter', () => { }); }); }); - -describe('receiveMessages', () => { - it('should call importwithfilter when a message is received', async () => { - await createTestingModule([DomainModule], async (app) => { - // GIVEN: Redis is running - const redis = await new GenericContainer('redis') - .withExposedPorts(6379) - .start(); - const redisUrl = `redis://localhost:${redis.getMappedPort(6379)}`; - const settings = app.get(SettingsService); - jest.spyOn(settings, 'getRedisUrl').mockImplementation(() => redisUrl); - - // And lastmessageid is zero - await settings.setLastMessageId('0'); - const importService = app.get(ImportService); - const importSpy = jest - .spyOn(importService, 'importWithFilter') - .mockImplementation(); - await importService.startRedisConsumer(); - - const client = createClient({ url: 
redisUrl }); - await client.connect(); - try { - // When: A message is sent - const messageId = await client.xAdd('product_updates_off', '*', { - code: 'TEST1', - }); - - // Wait for message to be delivered - await setTimeout(10); - - // Then the import is called - expect(importSpy).toHaveBeenCalledTimes(1); - expect(await settings.getLastMessageId()).toBe(messageId); - - // If a new message is added - importSpy.mockClear(); - await client.xAdd('product_updates_off', '*', { - code: 'TEST2', - }); - - // Wait for message to be delivered - await setTimeout(10); - - // Then import is called again but only with the new code - expect(importSpy).toHaveBeenCalledTimes(1); - const codes = importSpy.mock.calls[0][0].code.$in; - expect(codes).toHaveLength(1); - expect(codes[0]).toBe('TEST2'); - } finally { - await client.quit(); - await importService.stopRedisConsumer(); - await redis.stop(); - } - }); - }); -}); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 9e502c6..91351aa 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -5,7 +5,6 @@ import { MongoClient } from 'mongodb'; import { EntityManager } from '@mikro-orm/postgresql'; import { TagService } from './tag.service'; import { ProductTagMap } from '../entities/product-tag-map'; -import { createClient, commandOptions } from 'redis'; import { ProductSource } from '../enums/product-source'; import equal from 'fast-deep-equal'; import { SettingsService } from './settings.service'; @@ -16,7 +15,6 @@ import { SerializableParameter } from 'postgres'; @Injectable() export class ImportService { private logger = new Logger(ImportService.name); - private client: any; // Don't strongly type here is it is really verbose constructor( private readonly em: EntityManager, @@ -39,6 +37,8 @@ export class ImportService { return; } this.importRunning = true; + const source = + from != null ? ProductSource.INCREMENTAL_LOAD : ProductSource.FULL_LOAD; try { // If the from parameter is supplied but it is empty then obtain the most // recent modified time from the database and query MongoDB for products @@ -53,11 +53,7 @@ export class ImportService { this.logger.debug(`Starting import from ${from}`); } - const latestModified = await this.importWithFilter( - filter, - from ? 
ProductSource.INCREMENTAL_LOAD : ProductSource.FULL_LOAD, - skip, - ); + const latestModified = await this.importWithFilter(filter, source, skip); if (latestModified) { await this.settings.setLastModified(new Date(latestModified)); } @@ -120,6 +116,11 @@ export class ImportService { i++; if (skip && i < skip) continue; + + if (!(i % this.importLogInterval)) { + this.logger.debug(`Fetched ${i}`); + } + // Find the product if it exists let results = await connection`select id, last_modified from product where code = ${data.code}`; @@ -174,9 +175,6 @@ export class ImportService { await this.applyProductChange(connection, obsolete, source, updateId); await connection`begin`; } - if (!(i % this.importLogInterval)) { - this.logger.debug(`Updated ${i}`); - } } await this.applyProductChange(connection, obsolete, source, updateId); await cursor.close(); @@ -223,50 +221,20 @@ export class ImportService { last_modified = tp.last_modified, last_update_id = ${updateId}, last_updated = ${new Date()}, - source = ${source} + source = ${source}, + revision = (tp.data->>'rev')::int FROM product_temp tp WHERE product.id = tp.id`; this.logger.debug(`Updated ${productResults.count} products`); - // Fix ingredients - let logText = `Updated ingredients`; - const deleted = await connection`delete from product_ingredient - where product_id in (select id from product_temp)`; - logText += ` deleted ${deleted.count},`; - const results = await connection`insert into product_ingredient ( - product_id, - sequence, - id, - ciqual_food_code, - ingredient_text, - percent, - percent_min, - percent_max, - percent_estimate, - data, - obsolete - ) - select - product.id, - ordinality, - tag.value->>'id', - tag.value->>'ciqual_food_code', - tag.value->>'ingredient_text', - tag.value->>'percent', - (tag.value->>'percent_min')::numeric, - (tag.value->>'percent_max')::numeric, - (tag.value->>'percent_estimate')::numeric, - tag.value->'ingredients', - ${obsolete} - from product_temp product - cross join jsonb_array_elements(data->'ingredients') with ordinality tag`; - let affectedRows = results.count; - logText += ` inserted ${affectedRows}`; - while (affectedRows > 0) { + if (productResults.count) { + // Fix ingredients + let logText = `Updated ingredients`; + const deleted = await connection`delete from product_ingredient + where product_id in (select id from product_temp)`; + logText += ` deleted ${deleted.count},`; const results = await connection`insert into product_ingredient ( product_id, - parent_product_id, - parent_sequence, sequence, id, ciqual_food_code, @@ -279,10 +247,8 @@ export class ImportService { obsolete ) select - pi.product_id, - pi.product_id, - pi.sequence, - pi.sequence || '.' 
|| ordinality, + product.id, + ordinality, tag.value->>'id', tag.value->>'ciqual_food_code', tag.value->>'ingredient_text', @@ -292,37 +258,73 @@ export class ImportService { (tag.value->>'percent_estimate')::numeric, tag.value->'ingredients', ${obsolete} - from product_ingredient pi - join product_temp product on product.id = pi.product_id - cross join json_array_elements(pi.data) with ordinality tag - WHERE pi.data IS NOT NULL - AND NOT EXISTS (SELECT * FROM product_ingredient pi2 WHERE pi2.parent_product_id = pi.product_id AND pi2.parent_sequence = pi.sequence)`; - affectedRows = results.count; - logText += ` > ${affectedRows}`; - } - this.logger.debug(logText + ' rows'); + from product_temp product + cross join jsonb_array_elements(data->'ingredients') with ordinality tag`; + let affectedRows = results.count; + logText += ` inserted ${affectedRows}`; + while (affectedRows > 0) { + const results = await connection`insert into product_ingredient ( + product_id, + parent_product_id, + parent_sequence, + sequence, + id, + ciqual_food_code, + ingredient_text, + percent, + percent_min, + percent_max, + percent_estimate, + data, + obsolete + ) + select + pi.product_id, + pi.product_id, + pi.sequence, + pi.sequence || '.' || ordinality, + tag.value->>'id', + tag.value->>'ciqual_food_code', + tag.value->>'ingredient_text', + tag.value->>'percent', + (tag.value->>'percent_min')::numeric, + (tag.value->>'percent_max')::numeric, + (tag.value->>'percent_estimate')::numeric, + tag.value->'ingredients', + ${obsolete} + from product_ingredient pi + join product_temp product on product.id = pi.product_id + cross join json_array_elements(pi.data) with ordinality tag + WHERE pi.data IS NOT NULL + AND NOT EXISTS (SELECT * FROM product_ingredient pi2 WHERE pi2.parent_product_id = pi.product_id AND pi2.parent_sequence = pi.sequence)`; + affectedRows = results.count; + logText += ` > ${affectedRows}`; + } + this.logger.debug(logText + ' rows'); - for (const [tag, entity] of Object.entries(ProductTagMap.MAPPED_TAGS)) { - let logText = `Updated ${tag}`; - // Get the underlying table name for the entity - const tableName = this.em.getMetadata(entity).tableName; + for (const [tag, entity] of Object.entries(ProductTagMap.MAPPED_TAGS)) { + let logText = `Updated ${tag}`; + // Get the underlying table name for the entity + const tableName = this.em.getMetadata(entity).tableName; - // Delete existing tags for products that were imported on this run - const deleted = await connection`delete from ${sql(tableName)} - where product_id in (select id from product_temp)`; - logText += ` deleted ${deleted.count},`; + // Delete existing tags for products that were imported on this run + const deleted = await connection`delete from ${sql(tableName)} + where product_id in (select id from product_temp)`; + logText += ` deleted ${deleted.count},`; - // Add tags back in with the updated information - const results = await connection`insert into ${sql( - tableName, - )} (product_id, value, obsolete) - select DISTINCT id, tag.value, ${obsolete} from product_temp - cross join jsonb_array_elements_text(data->'${sql.unsafe(tag)}') tag`; + // Add tags back in with the updated information + const results = await connection`insert into ${sql( + tableName, + )} (product_id, value, obsolete) + select DISTINCT id, tag.value, ${obsolete} from product_temp + cross join jsonb_array_elements_text(data->'${sql.unsafe(tag)}') tag`; - logText += ` inserted ${results.count} rows`; + logText += ` inserted ${results.count} rows`; - 
this.logger.debug(logText); + this.logger.debug(logText); + } } + await connection`truncate table product_temp`; await connection`commit`; } @@ -333,88 +335,18 @@ export class ImportService { this.logger.debug(`${deleted.count} Products deleted`); } + // Make sure to pause redis before calling this async scheduledImportFromMongo() { - // Pause redis while doing a scheduled import - await this.stopRedisConsumer(); - - try { - if ( - equal( - Object.keys(ProductTagMap.MAPPED_TAGS).sort(), - (await this.tagService.getLoadedTags()).sort(), - ) - ) { - // Do an incremental load if all tags are already loaded - await this.importFromMongo(''); - } else { - await this.importFromMongo(); - } - } finally { - // Resume redis after import - await this.startRedisConsumer(); - } - } - - async startRedisConsumer() { - const redisUrl = this.settings.getRedisUrl(); - if (!redisUrl) return; - this.client = createClient({ url: redisUrl }); - this.client.on('error', (err) => this.logger.error(err)); - await this.client.connect(); - this.receiveMessages(); - } - - async stopRedisConsumer() { - if (this.client && this.client.isOpen) await this.client.quit(); - } - - async receiveMessages() { - const lastMessageId = await this.settings.getLastMessageId(); - if (!this.client.isOpen) return; - this.client - .xRead( - commandOptions({ - isolated: true, - }), - [ - // XREAD can read from multiple streams, starting at a - // different ID for each... - { - key: 'product_updates_off', - id: lastMessageId, - }, - ], - { - // Read 1000 entry at a time, block for 5 seconds if there are none. - COUNT: 1000, - BLOCK: 5000, - }, + if ( + equal( + Object.keys(ProductTagMap.MAPPED_TAGS).sort(), + (await this.tagService.getLoadedTags()).sort(), ) - .then(async (keys) => { - if (keys?.length) { - const messages = keys[0].messages; - if (messages?.length) { - /** Message looks like this: - { - code: "0850026029062", - flavor: "off", - user_id: "stephane", - action: "updated", - comment: "Modification : Remove changes", - diffs: "{\"fields\":{\"change\":[\"categories\"],\"delete\":[\"product_name\",\"product_name_es\"]}}", - } - */ - const productCodes = messages.map((m) => m.message.code); - const filter = { code: { $in: productCodes } }; - await this.importWithFilter(filter, ProductSource.EVENT); - await this.settings.setLastMessageId( - messages[messages.length - 1].id, - ); - } - } - setTimeout(() => { - this.receiveMessages(); - }, 0); - }); + ) { + // Do an incremental load if all tags are already loaded + await this.importFromMongo(''); + } else { + await this.importFromMongo(); + } } } diff --git a/src/domain/services/messages.service.spec.ts b/src/domain/services/messages.service.spec.ts new file mode 100644 index 0000000..935e578 --- /dev/null +++ b/src/domain/services/messages.service.spec.ts @@ -0,0 +1,394 @@ +import { createTestingModule, randomCode } from '../../../test/test.helper'; +import sql from '../../db'; +import { DomainModule } from '../domain.module'; +import { ImportService } from './import.service'; +import { MessagesService } from './messages.service'; + +describe('messageTime', () => { + it('should return a date from a message id', () => { + const time = Date.now() - 1000; + const date = MessagesService.messageTime({ id: `${time}-0` }); + expect(date.getTime()).toBe(time); + }); + it('should return the current date for an invalid message id', () => { + const now = Date.now(); + const date = MessagesService.messageTime({ id: 'invalid' }); + expect(date.getTime()).toBeGreaterThanOrEqual(now); + }); + 
it('should cope with a null id', async () => { + const now = Date.now(); + const date = MessagesService.messageTime({ id: null }); + expect(date.getTime()).toBeGreaterThanOrEqual(now); + }); + it('should cope with no id', async () => { + const now = Date.now(); + const date = MessagesService.messageTime({}); + expect(date.getTime()).toBeGreaterThanOrEqual(now); + }); + it('should use timestamp if provided', async () => { + const time = Math.trunc((Date.now() - 1000) / 1000); + const date = MessagesService.messageTime({ + id: '100-0', + message: { timestamp: time }, + }); + expect(date.getTime()).toBe(time * 1000); + }); +}); + +let idCount = 0; + +describe('create', () => { + it('should load duplicate events', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + const code1 = randomCode(); + const messageId = `${Date.now()}-${idCount++}`; + + await messages.create( + [ + { + id: messageId, + message: { + code: code1, + action: 'created', + }, + }, + { + id: messageId, + message: { + code: code1, + action: 'created', + }, + }, + ], + true, + ); + + const result = + await sql`SELECT * FROM product_update_event WHERE message->>'code' = ${code1}`; + expect(result).toHaveLength(2); + }); + }); + + it('should cope with null characters', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + const code1 = randomCode(); + await messages.create( + [ + { + id: `${Date.now()}-${idCount++}`, + message: { + code: code1, + comment: 'test \u0000 test2 \u0000 end', + }, + }, + ], + true, + ); + + const result = + await sql`SELECT * FROM product_update_event WHERE message->>'code' = ${code1}`; + expect(result).toHaveLength(1); + expect(result[0].message.comment).toBe('test test2 end'); + }); + }); + + it('should create contributors', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + const code1 = randomCode(); + const user1 = randomCode(); + const user2 = randomCode(); + + // Given and existing contributor record + sql`INSERT INTO contributor (code) VALUES(${user1})`; + + // When events are imported + await messages.create( + [ + { + id: `${Date.now()}-${idCount++}`, + message: { + code: code1, + user_id: user1, + action: 'created', + }, + }, + { + id: `${Date.now()}-${idCount++}`, + message: { + code: code1, + user_id: user2, + action: 'created', + }, + }, + ], + true, + ); + + const result = await sql`SELECT * FROM contributor WHERE code in ${sql([ + user1, + user2, + ])} order by id`; + expect(result).toHaveLength(2); + expect(result[1].id).toBe(result[0].id + 1); + }); + }); + + it('should aggregate events by count and distinct products', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + // Create some products + const code1 = randomCode(); + const code2 = randomCode(); + const owner1 = randomCode(); + + await sql`INSERT INTO product ${sql([ + { + code: code1, + owners_tags: owner1, + }, + { + code: code2, + owners_tags: owner1, + }, + ])}`; + + // Create some messages + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: 'created', + user_id: 'test', + rev: 1, + }, + }, + { + id: nextId(), + message: { + code: code1, + action: 'created', + user_id: 'test', + rev: 2, + }, + }, + { + id: nextId(), + message: { + code: code1, 
+ action: 'created', + user_id: 'test', + rev: 2, // Duplicate + }, + }, + { + id: nextId(), + message: { + code: code2, + action: 'created', + user_id: 'test', + rev: 1, + }, + }, + ], + true, + ); + + const results = + await sql`SELECT * from product_update join product on product.id = product_update.product_id`; + + const myResult1 = results.filter( + (r) => r.owners_tags === owner1 && r.code === code1, + ); + expect(myResult1).toHaveLength(2); + + const myResult2 = results.filter( + (r) => r.owners_tags === owner1 && r.code === code2, + ); + expect(myResult2).toHaveLength(1); + }); + }); + + it('should update existing aggregate counts', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + // Create a product + const code1 = randomCode(); + await sql`INSERT INTO product ${sql([ + { + code: code1, + }, + ])}`; + const action1 = randomCode(); + + // Create an existing message + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: action1, + user_id: 'test', + rev: 1, + }, + }, + ], + true, + ); + + // Add another message + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: action1, + user_id: 'test', + rev: 2, + }, + }, + ], + true, + ); + + const results = + await sql`SELECT * from product_update join product on product.id = product_update.product_id`; + + const myResult1 = results.filter((r) => r.code === code1); + expect(myResult1).toHaveLength(2); + }); + }); + + it('should not call importwithfilter for initialImport', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const code1 = randomCode(); + const code2 = randomCode(); + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + const messages = [ + { + id: nextId(), + message: { + code: code1, + }, + }, + { + id: nextId(), + message: { + code: code2, + }, + }, + ]; + + const messagesService = app.get(MessagesService); + await messagesService.create(messages, true); + + // Then the import is not called + expect(importSpy).not.toHaveBeenCalled(); + + // Update events are created for all codes + const events = + await sql`SELECT * FROM product_update_event WHERE message->>'code' IN ${sql( + [code1, code2], + )}`; + + expect(events).toHaveLength(2); + }); + }); + + it('should call importwithfilter for normal imports', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const code1 = randomCode(); + const code2 = randomCode(); + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + const messages = [ + { + id: nextId(), + message: { + code: code1, + }, + }, + { + id: nextId(), + message: { + code: code2, + }, + }, + ]; + + const messagesService = app.get(MessagesService); + await messagesService.create(messages); + + // Then the import is called + expect(importSpy).toHaveBeenCalled(); + + // Update events are created for all codes + const events = + await sql`SELECT * FROM product_update_event WHERE message->>'code' IN ${sql( + [code1, code2], + )}`; + + expect(events).toHaveLength(2); + }); + }); + + // This is just needed for backward compatibility with PO 
versions that don't send rev in the event + it('should get revision from product if not in message', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + // Create a product + const code1 = randomCode(); + + await sql`INSERT INTO product ${sql([ + { + code: code1, + revision: 123, + }, + ])}`; + + // Create a message with no rev + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: 'created', + user_id: 'test', + }, + }, + ], + true, + ); + + const results = + await sql`SELECT product_update.revision from product_update join product on product.id = product_update.product_id where code = ${code1}`; + + expect(results).toHaveLength(1); + expect(results[0].revision).toBe(123); + }); + }); +}); diff --git a/src/domain/services/messages.service.ts b/src/domain/services/messages.service.ts new file mode 100644 index 0000000..8f38921 --- /dev/null +++ b/src/domain/services/messages.service.ts @@ -0,0 +1,91 @@ +import { Injectable, Logger } from '@nestjs/common'; +import sql from '../../db'; +import { ImportService } from './import.service'; +import { ProductSource } from '../enums/product-source'; + +const nulRegex = /\\u0000/g; + +@Injectable() +export class MessagesService { + private logger = new Logger(MessagesService.name); + + constructor(private readonly importService: ImportService) {} + + static messageTime(message: any) { + // First preference is to use timestamp in the message + let time = new Date(parseInt(message.message?.timestamp) * 1000); + // Otherwise derive from message id + time = isNaN(time.getTime()) + ? new Date(parseInt(message.id?.split('-')[0])) + : time; + // Or use today's date/time if that doesn't work + return isNaN(time.getTime()) ? 
new Date() : time; + } + + async create(messages: any[], initialImport = false) { + // Strip out any \u0000 characters as PostgresSQL can't cope with them + const messageJson = JSON.stringify(messages); + if (messageJson.includes('\\u0000')) { + messages = JSON.parse(messageJson.replace(nulRegex, '')); + } + + const receivedAt = new Date(); + const insertResult = await sql`INSERT into product_update_event ${sql( + messages.map((m) => ({ + message_id: m.id, + received_at: receivedAt, + updated_at: MessagesService.messageTime(m), + message: m.message, + })), + )} RETURNING (id)`; + + const messageIds = insertResult.map((m) => m.id); + + await sql`insert into contributor (code) + select distinct message->>'user_id' + from product_update_event + where id in ${sql(messageIds)} + and not exists (select * from contributor where code = message->>'user_id') + on conflict (code) + do nothing`; + + await sql`insert into update_type (code) + select distinct message->>'action' + from product_update_event + where id in ${sql(messageIds)} + and not exists (select * from update_type where code = message->>'action') + on conflict (code) + do nothing`; + + if (!initialImport) { + const productCodes = [...new Set(messages.map((m) => m.message.code))]; + const filter = { code: { $in: productCodes } }; + await this.importService.importWithFilter(filter, ProductSource.EVENT); + } + + // Update counts on product_update after products have been imported + // Note coalesce on rev is only needed for transition if an older version of PO is deployed + await sql`INSERT INTO product_update ( + product_id, + revision, + updated_date, + update_type_id, + contributor_id, + event_id) + SELECT + p.id, + coalesce((pe.message->>'rev')::int, p.revision), + date(pe.updated_at at time zone 'UTC') updated_day, + update_type.id, + contributor.id, + pe.id + FROM product_update_event pe + JOIN product p on p.code = pe.message->>'code' + join contributor on contributor.code = pe.message->>'user_id' + join update_type on update_type.code = pe.message->>'action' + where pe.id in ${sql(messageIds)} + on conflict (product_id,revision) DO NOTHING`; + + this.logger.log(`Received ${messages.length} events`); + } +} diff --git a/src/domain/services/redis.listener.spec.ts b/src/domain/services/redis.listener.spec.ts new file mode 100644 index 0000000..eb256af --- /dev/null +++ b/src/domain/services/redis.listener.spec.ts @@ -0,0 +1,111 @@ +import { createClient } from 'redis'; +import { GenericContainer } from 'testcontainers'; +import { createTestingModule, randomCode } from '../../../test/test.helper'; +import sql from '../../db'; +import { DomainModule } from '../domain.module'; +import { ImportService } from './import.service'; +import { SettingsService } from './settings.service'; +import { RedisListener } from './redis.listener'; +import { setTimeout } from 'timers/promises'; +import { MessagesService } from './messages.service'; + +// Allow a little time for the testcontainer to start +jest.setTimeout(300000); + +describe('receiveMessages', () => { + it('should call importWithFilter when a message is received', async () => { + await createTestingModule([DomainModule], async (app) => { + // GIVEN: Redis is running + const redis = await new GenericContainer('redis') + .withExposedPorts(6379) + .start(); + const redisUrl = `redis://localhost:${redis.getMappedPort(6379)}`; + const settings = app.get(SettingsService); + jest.spyOn(settings, 'getRedisUrl').mockImplementation(() => redisUrl); + + // And lastmessageid is zero + await 
settings.setLastMessageId('0'); + const importService = app.get(ImportService); + const importSpy = jest + .spyOn(importService, 'importWithFilter') + .mockImplementation(); + + const redisListener = app.get(RedisListener); + await redisListener.startRedisConsumer(); + + const client = createClient({ url: redisUrl }); + await client.connect(); + try { + const code1 = randomCode(); + const code2 = randomCode(); + + // When: A message is sent + const messageId = await client.xAdd('product_updates_off', '*', { + code: code1, + rev: '1', + }); + + // Wait for message to be delivered + await setTimeout(100); + + // Then the import is called + expect(importSpy).toHaveBeenCalledTimes(1); + expect(await settings.getLastMessageId()).toBe(messageId); + + // If a new message is added + importSpy.mockClear(); + await client.xAdd('product_updates_off', '*', { + code: code2, + }); + + // Wait for message to be delivered + await setTimeout(100); + + // Then import is called again but only with the new code + expect(importSpy).toHaveBeenCalledTimes(1); + const codes = importSpy.mock.calls[0][0].code.$in; + expect(codes).toHaveLength(1); + expect(codes[0]).toBe(code2); + + // Update events are created + const events = + await sql`SELECT * FROM product_update_event WHERE message->>'code' = ${code1}`; + + expect(events).toHaveLength(1); + expect(events[0].message_id).toBe(messageId); + } finally { + await client.quit(); + await redisListener.stopRedisConsumer(); + await redis.stop(); + } + }); + }); +}); + +describe('processMessages', () => { + it('should convert json properties to objects', async () => { + await createTestingModule([DomainModule], async (app) => { + const messagesService = app.get(MessagesService); + const createSpy = jest + .spyOn(messagesService, 'create') + .mockImplementation(); + + const messages = [ + { + id: `0-0`, + message: { + code: 'test', + diffs: `{"action":"update"}`, + }, + }, + ]; + + const redisListener = app.get(RedisListener); + await redisListener.processMessages(messages); + + // Then create is called with a real object + const diffs = createSpy.mock.calls[0][0][0].message.diffs; + expect(diffs.action).toBe('update'); + }); + }); +}); diff --git a/src/domain/services/redis.listener.ts b/src/domain/services/redis.listener.ts new file mode 100644 index 0000000..8d40e97 --- /dev/null +++ b/src/domain/services/redis.listener.ts @@ -0,0 +1,102 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { createClient, commandOptions } from 'redis'; +import { MessagesService } from './messages.service'; +import { SettingsService } from './settings.service'; + +@Injectable() +export class RedisListener { + private logger = new Logger(RedisListener.name); + private client: any; // Don't strongly type here is it is really verbose + + constructor( + private readonly settings: SettingsService, + private readonly messages: MessagesService, + ) {} + + async startRedisConsumer() { + const redisUrl = this.settings.getRedisUrl(); + if (!redisUrl) return; + this.client = createClient({ url: redisUrl }); + this.client.on('error', (err: any) => this.logger.error(err)); + await this.client.connect(); + this.receiveMessages(); + } + + async stopRedisConsumer() { + if (this.client && this.client.isOpen) await this.client.quit(); + } + + async receiveMessages() { + const lastMessageId = await this.settings.getLastMessageId(); + if (!this.client.isOpen) return; + try { + const keys = await this.client.xRead( + commandOptions({ + isolated: true, + }), + [ + // XREAD can read from multiple 
streams, starting at a + // different ID for each... + { + key: 'product_updates_off', + id: lastMessageId, + }, + ], + { + // Read 1000 entry at a time, block for 5 seconds if there are none. + COUNT: 1000, + BLOCK: 5000, + }, + ); + if (keys?.length) { + const messages = keys[0].messages; + if (messages?.length) { + /** Message looks like this: + { + code: "0850026029062", + flavor: "off", + user_id: "stephane", + action: "updated", + comment: "Modification : Remove changes", + diffs: "{\"fields\":{\"change\":[\"categories\"],\"delete\":[\"product_name\",\"product_name_es\"]}}", + } + */ + await this.processMessages(messages); + await this.settings.setLastMessageId( + messages[messages.length - 1].id, + ); + } + } + setTimeout(() => { + this.receiveMessages(); + }, 0); + } catch (e) { + this.logger.error(e); + // Try again in 10 seconds + setTimeout(() => { + this.receiveMessages(); + }, 10000); + } + } + + async processMessages(messages: any[]) { + // Fix JSON properties on each message to be objects rather than strings + for (const event of messages) { + if (event.message.diffs) + event.message.diffs = JSON.parse(event.message.diffs); + } + await this.messages.create(messages); + } + + async pauseAndRun(action: () => Promise) { + // Pause redis while doing a scheduled import + await this.stopRedisConsumer(); + + try { + await action(); + } finally { + // Resume redis after import + await this.startRedisConsumer(); + } + } +} diff --git a/src/domain/services/settings.service.ts b/src/domain/services/settings.service.ts index 66f2277..6a24e83 100644 --- a/src/domain/services/settings.service.ts +++ b/src/domain/services/settings.service.ts @@ -1,36 +1,32 @@ -import { EntityManager } from '@mikro-orm/core'; import { Injectable } from '@nestjs/common'; -import { Settings } from '../entities/settings'; +import sql from '../../db'; @Injectable() export class SettingsService { - constructor(private readonly em: EntityManager) {} - - settings: Settings; - async find() { - this.settings = await this.em.findOne(Settings, 1); - if (!this.settings) { - this.settings = this.em.create(Settings, {}); + async updateSetting(settings: any) { + const result = await sql`UPDATE settings SET ${sql(settings)}`; + if (!result.count) { + await sql`INSERT INTO settings ${sql(settings)}`; } - return this.settings; } async getLastModified() { - return (await this.find()).lastModified; + return (await sql`SELECT last_modified FROM settings`)[0].last_modified; } async setLastModified(lastModified: Date) { - (await this.find()).lastModified = lastModified; - await this.em.flush(); + await this.updateSetting({ last_modified: lastModified }); } async getLastMessageId() { - return (await this.find()).lastMessageId || '$'; + return ( + (await sql`SELECT last_message_id FROM settings`)[0]?.last_message_id || + '$' + ); } async setLastMessageId(messageId: string) { - (await this.find()).lastMessageId = messageId; - await this.em.flush(); + await this.updateSetting({ last_message_id: messageId }); } getRedisUrl() { diff --git a/src/domain/services/views.spec.ts b/src/domain/services/views.spec.ts new file mode 100644 index 0000000..904bb6f --- /dev/null +++ b/src/domain/services/views.spec.ts @@ -0,0 +1,160 @@ +import postgres from 'postgres'; +import { createTestingModule, randomCode } from '../../../test/test.helper'; +import sql from '../../db'; +import { MessagesService } from './messages.service'; +import { VIEW_PASSWORD, VIEW_USER } from '../../constants'; +import { DomainModule } from '../domain.module'; + +async 
function withViewUser( + action: (viewer: postgres.Sql) => Promise, +) { + // Use viewer user + const viewer = postgres({ + host: process.env.POSTGRES_HOST, + database: process.env.POSTGRES_DB, + user: VIEW_USER, + password: VIEW_PASSWORD, + port: parseInt(process.env.POSTGRES_PORT.split(':').pop()), + }); + + try { + await action(viewer); + } finally { + await viewer.end(); + } +} + +describe('product_update', () => { + it('should aggregate events by count and distinct products', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + // Create some products + const code1 = randomCode(); + const code2 = randomCode(); + const owner1 = randomCode(); + + await sql`INSERT INTO product ${sql([ + { + code: code1, + owners_tags: owner1, + }, + { + code: code2, + owners_tags: owner1, + }, + ])}`; + + // Create some messages + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: 'updated', + user_id: 'user1', + rev: 1, + }, + }, + { + id: nextId(), + message: { + code: code1, + action: 'updated', + user_id: 'user1', + rev: 2, + }, + }, + { + id: nextId(), + message: { + code: code1, + action: 'updated', + user_id: 'user1', + rev: 3, + }, + }, + { + id: nextId(), + message: { + code: code2, + action: 'updated', + user_id: 'user1', + rev: 1, + }, + }, + ], + true, + ); + + // Use viewer user + await withViewUser(async (viewer) => { + const results = await viewer`SELECT * from product_updates_by_owner`; + + const myResult = results.find((r) => r.owner_tag === owner1); + expect(myResult.update_count).toBe('4'); + expect(myResult.product_count).toBe('2'); + }); + }); + }); + + it('should update existing aggregate counts', async () => { + await createTestingModule([DomainModule], async (app) => { + const messages = app.get(MessagesService); + // Create some products + const code1 = randomCode(); + const action1 = randomCode(); + + await sql`INSERT INTO product ${sql([ + { + code: code1, + }, + ])}`; + + // Create an existing message + let idCount = 0; + const nextId = () => `${Date.now()}-${idCount++}`; + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: action1, + user_id: 'user1', + rev: 1, + }, + }, + ], + true, + ); + + // Add another message + await messages.create( + [ + { + id: nextId(), + message: { + code: code1, + action: action1, + user_id: 'user1', + rev: 2, + }, + }, + ], + true, + ); + + // Use viewer user + await withViewUser(async (viewer) => { + const results = await viewer`SELECT * from product_updates_by_owner`; + + const myResult = results.find((r) => r.update_type === action1); + expect(myResult.update_count).toBe('2'); + expect(myResult.product_count).toBe('1'); + }); + }); + }); +}); diff --git a/src/main.ts b/src/main.ts index 697349f..2e46cb4 100644 --- a/src/main.ts +++ b/src/main.ts @@ -1,10 +1,19 @@ +import { configDotenv } from 'dotenv'; +configDotenv(); + import { NestFactory } from '@nestjs/core'; import { AppModule } from './app.module'; import { MikroORM } from '@mikro-orm/core'; import { LogLevel } from '@nestjs/common'; +import { json, urlencoded } from 'express'; async function bootstrap() { const app = await NestFactory.create(AppModule); + + // Accept large payloads for event migration + app.use(json({ limit: '50mb' })); + app.use(urlencoded({ extended: true, limit: '50mb' })); + app.useLogger([(process.env['LOG_LEVEL'] as LogLevel) || 'log']); // Run migrations if 
needed. Note may need to change this if multiple containers are used for scaling diff --git a/src/migrations/.snapshot-query.json b/src/migrations/.snapshot-query.json index 7f15460..ca0ca0f 100644 --- a/src/migrations/.snapshot-query.json +++ b/src/migrations/.snapshot-query.json @@ -144,6 +144,15 @@ "primary": false, "nullable": true, "mappedType": "string" + }, + "revision": { + "name": "revision", + "type": "int", + "unsigned": false, + "autoincrement": false, + "primary": false, + "nullable": true, + "mappedType": "integer" } }, "name": "product", @@ -158,6 +167,24 @@ "primary": false, "unique": false }, + { + "columnNames": [ + "creator" + ], + "composite": false, + "keyName": "product_creator_index", + "primary": false, + "unique": false + }, + { + "columnNames": [ + "owners_tags" + ], + "composite": false, + "keyName": "product_owners_tags_index", + "primary": false, + "unique": false + }, { "columnNames": [ "last_update_id" @@ -4900,54 +4927,6 @@ "updateRule": "cascade" } } - }, - { - "columns": { - "id": { - "name": "id", - "type": "serial", - "unsigned": true, - "autoincrement": true, - "primary": true, - "nullable": false, - "default": "1", - "mappedType": "integer" - }, - "last_modified": { - "name": "last_modified", - "type": "timestamptz", - "unsigned": false, - "autoincrement": false, - "primary": false, - "nullable": true, - "length": 6, - "mappedType": "datetime" - }, - "last_message_id": { - "name": "last_message_id", - "type": "text", - "unsigned": false, - "autoincrement": false, - "primary": false, - "nullable": true, - "mappedType": "text" - } - }, - "name": "settings", - "schema": "query", - "indexes": [ - { - "keyName": "settings_pkey", - "columnNames": [ - "id" - ], - "composite": false, - "primary": true, - "unique": true - } - ], - "checks": [], - "foreignKeys": {} } ] } diff --git a/src/migrations/Migration20240719101700-redis.ts b/src/migrations/Migration20240719101700-redis.ts new file mode 100644 index 0000000..a29d7df --- /dev/null +++ b/src/migrations/Migration20240719101700-redis.ts @@ -0,0 +1,66 @@ +/* eslint-disable prettier/prettier */ +import { Migration } from '@mikro-orm/migrations'; +import { VIEW_PASSWORD, VIEW_USER } from '../constants'; + +export class Migration20240719101700Redis extends Migration { + async up(): Promise { + this.addSql(`CREATE TABLE IF NOT EXISTS product_update_event ( + id bigserial NOT NULL PRIMARY KEY, + message_id text NOT NULL, + received_at timestamptz NOT NULL, + updated_at timestamptz NOT NULL, + message jsonb NOT NULL)`); + + this.addSql(`CREATE TABLE IF NOT EXISTS contributor ( + id serial, + code text, + constraint contributor_pkey primary key (id), + constraint contributor_code unique (code))`); + + this.addSql(`CREATE TABLE IF NOT EXISTS update_type ( + id serial, + code text, + constraint action_pkey primary key (id), + constraint action_code unique (code))`); + + this.addSql(`INSERT INTO update_type (code) VALUES ('created'), ('updated'), ('archived'), ('unarchived'), ('deleted'), ('reprocessed'), ('unknown')`); + + this.addSql(`CREATE TABLE IF NOT EXISTS product_update ( + product_id int, + revision int, + updated_date date, + update_type_id int, + contributor_id int, + event_id bigint, + constraint product_update_pkey primary key (product_id, revision))`); + this.addSql('create index product_update_updated_date_index on product_update (updated_date);'); + + this.addSql('create schema if not exists views;'); + this.addSql(`CREATE USER ${VIEW_USER} PASSWORD '${VIEW_PASSWORD}'`); + this.addSql(`ALTER ROLE 
${VIEW_USER} SET search_path=views,public`); + this.addSql(`CREATE OR REPLACE VIEW views.product_updates_by_owner AS + SELECT pu.updated_date, + p.owners_tags owner_tag, + ut.code update_type, + count(*) update_count, + count(DISTINCT pu.product_id) product_count + FROM product_update pu + JOIN product p ON p.id = pu.product_id + JOIN update_type ut ON ut.id = pu.update_type_id + GROUP BY pu.updated_date, + p.owners_tags, + ut.code`); + this.addSql(`GRANT USAGE ON SCHEMA views TO ${VIEW_USER}`); + this.addSql(`GRANT SELECT ON views.product_updates_by_owner TO ${VIEW_USER}`); + } + async down(): Promise { + this.addSql(`drop owned by ${VIEW_USER}`); + this.addSql(`DROP ROLE ${VIEW_USER}`); + this.addSql(`drop view views.product_updates_by_owner`); + this.addSql(`drop schema views`); + this.addSql(`drop table product_update CASCADE`); + this.addSql(`drop table contributor CASCADE`); + this.addSql(`drop table update_type CASCADE`); + this.addSql(`drop table product_update_event CASCADE`); + } +} diff --git a/src/migrations/Migration20240809162441.ts b/src/migrations/Migration20240809162441.ts new file mode 100644 index 0000000..9fbeb59 --- /dev/null +++ b/src/migrations/Migration20240809162441.ts @@ -0,0 +1,15 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20240809162441 extends Migration { + + async up(): Promise { + this.addSql('create index "product_creator_index" on "query"."product" ("creator");'); + this.addSql('create index "product_owners_tags_index" on "query"."product" ("owners_tags");'); + } + + async down(): Promise { + this.addSql('drop index "query"."product_creator_index";'); + this.addSql('drop index "query"."product_owners_tags_index";'); + } + +} diff --git a/src/migrations/Migration20240809164713.ts b/src/migrations/Migration20240809164713.ts new file mode 100644 index 0000000..9e141f7 --- /dev/null +++ b/src/migrations/Migration20240809164713.ts @@ -0,0 +1,13 @@ +import { Migration } from '@mikro-orm/migrations'; + +export class Migration20240809164713 extends Migration { + async up(): Promise { + this.addSql( + 'alter table "query"."product" add column "revision" int null;', + ); + } + + async down(): Promise { + this.addSql('alter table "query"."product" drop column "revision";'); + } +} diff --git a/test/global-setup.ts b/test/global-setup.ts index 91ee64b..309f7fa 100644 --- a/test/global-setup.ts +++ b/test/global-setup.ts @@ -16,6 +16,9 @@ export default async function () { // We don't use redis in the tests process.env.REDIS_URL = ''; + // Prevent tests from calling directly to MongoDB + process.env.MONGO_URI = ''; + // Tried running migrations with the API but doesn't work because // of the way Jest mocks things. Even importing MikroORM is enough to break things. // https://github.com/mikro-orm/mikro-orm/discussions/3795
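
For reference, a minimal sketch (not part of the diff above) of how the temporary `POST /productupdates` bulk-load endpoint added in `src/app.controller.ts` might be driven during the initial event migration. It assumes the service is listening on the default `QUERY_PORT` from `.env` (`127.0.0.1:5511`) and a Node 18+ caller with the global `fetch` API; the product code, revision, user and action values are illustrative only.

```typescript
// Sketch only: push a batch of historical update events into openfoodfacts-query.
// Assumes QUERY_PORT=127.0.0.1:5511 (the .env default) and Node 18+ global fetch.
async function sendProductUpdates() {
  const updates = [
    {
      code: '0850026029062', // illustrative product code
      rev: 42, // illustrative revision
      action: 'updated',
      user_id: 'stephane',
      timestamp: Math.trunc(Date.now() / 1000), // seconds, as expected by messageTime
    },
  ];

  // Payloads can be large during migration; main.ts raises the body limit to 50mb.
  const response = await fetch('http://127.0.0.1:5511/productupdates', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(updates),
  });

  // Each update is stored as a row in product_update_event without triggering a
  // MongoDB re-import (initialImport = true in the controller).
  console.log(response.status);
}

sendProductUpdates();
```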