From a1154735da749ce2f393b1ecaca29cfc08343f68 Mon Sep 17 00:00:00 2001 From: samaratungajs Date: Mon, 9 Sep 2024 20:35:31 +0530 Subject: [PATCH 1/5] feat: migrate to pgboss 10 --- lib/handler-scanner.service.ts | 2 +- lib/pgboss.service.ts | 17 +++++- lib/utils/conversion-helper.ts | 23 ++++---- package-lock.json | 97 +++------------------------------- package.json | 2 +- test/pgboss.service.spec.ts | 4 +- 6 files changed, 41 insertions(+), 104 deletions(-) diff --git a/lib/handler-scanner.service.ts b/lib/handler-scanner.service.ts index 2491ccc..0000a1b 100644 --- a/lib/handler-scanner.service.ts +++ b/lib/handler-scanner.service.ts @@ -75,7 +75,7 @@ export class HandlerScannerService { await this.pgBossService.registerJob( jobName, boundHandler, - jobOptions as PgBoss.BatchWorkOptions, + jobOptions, ); this.logger.log(`Registered job: ${jobName}`); } diff --git a/lib/pgboss.service.ts b/lib/pgboss.service.ts index ac18721..f77e8bd 100644 --- a/lib/pgboss.service.ts +++ b/lib/pgboss.service.ts @@ -42,7 +42,7 @@ export class PgBossService { await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {}); await this.pgBoss.work( name, - { ...options, includeMetadata: true }, + { ...this.transformOptions(options), includeMetadata: true }, handler, ); } @@ -50,7 +50,7 @@ export class PgBossService { async registerJob( name: string, handler: WorkWithMetadataHandler, - options?: PgBoss.BatchWorkOptions, + options?: PgBoss.WorkOptions, ) { await this.pgBoss.work( name, @@ -58,4 +58,17 @@ export class PgBossService { handler, ); } + private transformOptions( + options?: PgBoss.WorkOptions | PgBoss.ScheduleOptions, + ) { + if (!options) return {}; + + const transformedOptions: any = { ...options }; + + if (typeof options.priority === "number") { + transformedOptions.priority = options.priority > 0; + } + + return transformedOptions; + } } diff --git a/lib/utils/conversion-helper.ts b/lib/utils/conversion-helper.ts index 906eb49..1f623bb 100644 --- a/lib/utils/conversion-helper.ts +++ b/lib/utils/conversion-helper.ts @@ -1,10 +1,15 @@ -import { JobOptions, BatchWorkOptions } from "pg-boss"; - -export function convertToBatchWorkOptions( - jobOptions: JobOptions = {}, -): BatchWorkOptions { - return { - ...jobOptions, - batchSize: 1, - }; +import PgBoss from "pg-boss"; + +export function transformOptions( + options?: PgBoss.WorkOptions | PgBoss.ScheduleOptions, +) { + if (!options) return {}; + + const transformedOptions: any = { ...options }; + + if (typeof options.priority === "number") { + transformedOptions.priority = options.priority > 0; + } + + return transformedOptions; } diff --git a/package-lock.json b/package-lock.json index 91af0f7..12d89f6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@wavezync/nestjs-pgboss", - "version": "2.0.0", + "version": "2.2.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@wavezync/nestjs-pgboss", - "version": "2.0.0", + "version": "2.2.0", "license": "MIT", "devDependencies": { "@nestjs/cli": "^10.0.0", @@ -34,7 +34,7 @@ "peerDependencies": { "@nestjs/common": "^9 || ^10", "@nestjs/core": "^9 || ^10", - "pg-boss": "^9", + "pg-boss": "^10", "reflect-metadata": "^0.1.13 || ^0.2.0", "rxjs": "^7.2.0" } @@ -2502,19 +2502,6 @@ "node": ">=0.4.0" } }, - "node_modules/aggregate-error": { - "version": "3.1.0", - "resolved": "https://registry.npmjs.org/aggregate-error/-/aggregate-error-3.1.0.tgz", - "integrity": "sha512-4I7Td01quW/RpocfNayFdFVk1qSuoh0E7JrbRJ16nH01HhKFQ88INq9Sd+nd72zqRySlr9BmDA8xlEJ6vJMrYA==", - "peer": true, - "dependencies": { - "clean-stack": "^2.0.0", - "indent-string": "^4.0.0" - }, - "engines": { - "node": ">=8" - } - }, "node_modules/ajv": { "version": "8.12.0", "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.12.0.tgz", @@ -3180,15 +3167,6 @@ "integrity": "sha512-a3KdPAANPbNE4ZUv9h6LckSl9zLsYOP4MBmhIPkRaeyybt+r4UghLvq+xw/YwUcC1gqylCkL4rdVs3Lwupjm4Q==", "dev": true }, - "node_modules/clean-stack": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/clean-stack/-/clean-stack-2.2.0.tgz", - "integrity": "sha512-4diC9HaTE+KRAMWhDhrGOECgWZxoevMc5TlkObMqNSsVU62PYzXZ/SMTjzyGAFF1YusgxGcSWTEXBhp0CPwQ1A==", - "peer": true, - "engines": { - "node": ">=6" - } - }, "node_modules/cli-cursor": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/cli-cursor/-/cli-cursor-3.1.0.tgz", @@ -3609,18 +3587,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/delay": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz", - "integrity": "sha512-ReEBKkIfe4ya47wlPYf/gu5ib6yUG0/Aez0JQZQz94kiWtRQvZIQbTiehsnwHvLSWJnQdhVeqYue7Id1dKr0qw==", - "peer": true, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/delayed-stream": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-1.0.0.tgz", @@ -4974,15 +4940,6 @@ "node": ">=0.8.19" } }, - "node_modules/indent-string": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/indent-string/-/indent-string-4.0.0.tgz", - "integrity": "sha512-EdDDZu4A2OyIK7Lr/2zG+w5jmbuk1DVBnEwREQvBzspBJkCEbRa8GxU1lghYcaGJCnRWibjDXlq779X1/y5xwg==", - "peer": true, - "engines": { - "node": ">=8" - } - }, "node_modules/inflight": { "version": "1.0.6", "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", @@ -6143,12 +6100,6 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, - "node_modules/lodash.debounce": { - "version": "4.0.8", - "resolved": "https://registry.npmjs.org/lodash.debounce/-/lodash.debounce-4.0.8.tgz", - "integrity": "sha512-FT1yDzDYEoYWhnSGnpE/4Kj1fLZkDFyqRb7fNt6FdYOSxlUWAtp42Eh6Wb0rGIv/m9Bgo7x4GhQbm5Ys4SG5ow==", - "peer": true - }, "node_modules/lodash.memoize": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/lodash.memoize/-/lodash.memoize-4.1.2.tgz", @@ -6663,21 +6614,6 @@ "url": "https://github.com/sponsors/sindresorhus" } }, - "node_modules/p-map": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/p-map/-/p-map-4.0.0.tgz", - "integrity": "sha512-/bjOqmgETBYB5BoEeGVea8dmvHb2m9GLy1E9W43yeyfP6QQCZGFNa+XRceJEuDB6zqr+gKpIAmlLebMpykw/MQ==", - "peer": true, - "dependencies": { - "aggregate-error": "^3.0.0" - }, - "engines": { - "node": ">=10" - }, - "funding": { - "url": "https://github.com/sponsors/sindresorhus" - } - }, "node_modules/p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", @@ -6831,21 +6767,17 @@ } }, "node_modules/pg-boss": { - "version": "9.0.3", - "resolved": "https://registry.npmjs.org/pg-boss/-/pg-boss-9.0.3.tgz", - "integrity": "sha512-cUWUiv3sr563yNy0nCZ25Tv5U0m59Y9MhX/flm0vTR012yeVCrqpfboaZP4xFOQPdWipMJpuu4g94HR0SncTgw==", + "version": "10.1.1", + "resolved": "https://registry.npmjs.org/pg-boss/-/pg-boss-10.1.1.tgz", + "integrity": "sha512-2t7gz5nEUYFabj8czWWFRUSyPDQ5t+K/EF5l9Q5lHn2iwyPPKgIfwK+8LKgRfyHRUePTDQhogsGcwOlNczfZ5Q==", "peer": true, "dependencies": { "cron-parser": "^4.0.0", - "delay": "^5.0.0", - "lodash.debounce": "^4.0.8", - "p-map": "^4.0.0", "pg": "^8.5.1", - "serialize-error": "^8.1.0", - "uuid": "^9.0.0" + "serialize-error": "^8.1.0" }, "engines": { - "node": ">=16" + "node": ">=20" } }, "node_modules/pg-cloudflare": { @@ -8617,19 +8549,6 @@ "node": ">= 0.4.0" } }, - "node_modules/uuid": { - "version": "9.0.1", - "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", - "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", - "funding": [ - "https://github.com/sponsors/broofa", - "https://github.com/sponsors/ctavan" - ], - "peer": true, - "bin": { - "uuid": "dist/bin/uuid" - } - }, "node_modules/v8-compile-cache-lib": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.1.tgz", diff --git a/package.json b/package.json index 3491a6f..37fbe8f 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ "peerDependencies": { "@nestjs/common": "^9 || ^10", "@nestjs/core": "^9 || ^10", - "pg-boss": "^9", + "pg-boss": "^10", "reflect-metadata": "^0.1.13 || ^0.2.0", "rxjs": "^7.2.0" }, diff --git a/test/pgboss.service.spec.ts b/test/pgboss.service.spec.ts index 1287dfd..6c3ffa8 100644 --- a/test/pgboss.service.spec.ts +++ b/test/pgboss.service.spec.ts @@ -32,13 +32,13 @@ describe("PgBossService", () => { describe("registerJob", () => { it("should call PgBoss work with correct parameters", async () => { const handler = jest.fn(); - const options = { batchSize: 5 }; + const options = {}; await service.registerJob("test-job", handler, options); expect(mockPgBoss.work).toHaveBeenCalledWith( "test-job", - { batchSize: 5, includeMetadata: true }, + { includeMetadata: true }, handler, ); }); From cfb929d2f17ec347b8e251e54c0c8a5a4bd1aa4c Mon Sep 17 00:00:00 2001 From: samaratungajs Date: Tue, 10 Sep 2024 11:05:49 +0530 Subject: [PATCH 2/5] fix: ensure queue exists added --- lib/handler-scanner.service.ts | 18 +++++++++++++----- lib/pgboss.service.ts | 11 +++++++++++ test/pgboss.service.spec.ts | 2 ++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/lib/handler-scanner.service.ts b/lib/handler-scanner.service.ts index 0000a1b..5b5af99 100644 --- a/lib/handler-scanner.service.ts +++ b/lib/handler-scanner.service.ts @@ -52,14 +52,15 @@ export class HandlerScannerService { CRON_EXPRESSION, methodRef, ); - const cronOptions = this.reflector.get(CRON_OPTIONS, methodRef); + const cronOptions = this.reflector.get( + CRON_OPTIONS, + methodRef, + ); if (jobName) { const boundHandler: WorkWithMetadataHandler = async (job) => { - const jobData = { - ...job, - }; - await methodRef.call(instance, jobData); + const extractedJob = this.normalizeJob(job); + await methodRef.call(instance, extractedJob); }; try { if (cronExpression) { @@ -85,4 +86,11 @@ export class HandlerScannerService { } } } + + private normalizeJob(job: any) { + if (typeof job === "object" && "0" in job) { + return job[0]; + } + return job; + } } diff --git a/lib/pgboss.service.ts b/lib/pgboss.service.ts index f77e8bd..405c20f 100644 --- a/lib/pgboss.service.ts +++ b/lib/pgboss.service.ts @@ -20,6 +20,7 @@ export class PgBossService { data: TData, options?: PgBoss.SendOptions, ) { + await this.ensureQueueExists(name); await this.pgBoss.send(name, data, options); } @@ -29,6 +30,7 @@ export class PgBossService { data?: TData, options?: PgBoss.ScheduleOptions, ) { + await this.ensureQueueExists(name); await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {}); } @@ -39,6 +41,7 @@ export class PgBossService { data?: TData, options?: PgBoss.ScheduleOptions, ) { + await this.ensureQueueExists(name); await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {}); await this.pgBoss.work( name, @@ -52,6 +55,7 @@ export class PgBossService { handler: WorkWithMetadataHandler, options?: PgBoss.WorkOptions, ) { + await this.ensureQueueExists(name); await this.pgBoss.work( name, { ...options, includeMetadata: true }, @@ -71,4 +75,11 @@ export class PgBossService { return transformedOptions; } + + async ensureQueueExists(queueName: string) { + const currentQueue = await this.pgBoss.getQueue(queueName); + if (!currentQueue) { + await this.pgBoss.createQueue(queueName); + } + } } diff --git a/test/pgboss.service.spec.ts b/test/pgboss.service.spec.ts index 6c3ffa8..ace83ce 100644 --- a/test/pgboss.service.spec.ts +++ b/test/pgboss.service.spec.ts @@ -13,6 +13,8 @@ describe("PgBossService", () => { work: jest.fn(), send: jest.fn(), schedule: jest.fn(), + createQueue: jest.fn(), + getQueue: jest.fn(), } as any; const module: TestingModule = await Test.createTestingModule({ From 45b9b2b096bad6a3abae8487b8ee90b05885dbb1 Mon Sep 17 00:00:00 2001 From: samaratungajs Date: Tue, 10 Sep 2024 11:31:17 +0530 Subject: [PATCH 3/5] chore: bump version to v3.0.0 --- package-lock.json | 4 ++-- package.json | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/package-lock.json b/package-lock.json index 12d89f6..fa85ce9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@wavezync/nestjs-pgboss", - "version": "2.2.0", + "version": "3.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@wavezync/nestjs-pgboss", - "version": "2.2.0", + "version": "3.0.0", "license": "MIT", "devDependencies": { "@nestjs/cli": "^10.0.0", diff --git a/package.json b/package.json index 37fbe8f..225e200 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@wavezync/nestjs-pgboss", - "version": "2.2.0", + "version": "3.0.0", "description": "A NestJS module that integrates pg-boss for job scheduling and handling.", "license": "MIT", "author": "samaratungajs@wavezync.com", From 393cd735659d904e6271c31de829a4977f4eda07 Mon Sep 17 00:00:00 2001 From: samaratungajs Date: Tue, 10 Sep 2024 11:56:29 +0530 Subject: [PATCH 4/5] fix: update helpers --- lib/handler-scanner.service.ts | 10 ++-------- lib/pgboss.service.ts | 16 ++-------------- lib/utils/{conversion-helper.ts => helpers.ts} | 7 +++++++ 3 files changed, 11 insertions(+), 22 deletions(-) rename lib/utils/{conversion-helper.ts => helpers.ts} (72%) diff --git a/lib/handler-scanner.service.ts b/lib/handler-scanner.service.ts index 5b5af99..e2c3dfa 100644 --- a/lib/handler-scanner.service.ts +++ b/lib/handler-scanner.service.ts @@ -10,6 +10,7 @@ import { import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper"; import PgBoss, { WorkWithMetadataHandler } from "pg-boss"; import { LOGGER } from "./utils/consts"; +import { normalizeJob } from "./utils/helpers"; @Injectable() export class HandlerScannerService { @@ -59,7 +60,7 @@ export class HandlerScannerService { if (jobName) { const boundHandler: WorkWithMetadataHandler = async (job) => { - const extractedJob = this.normalizeJob(job); + const extractedJob = normalizeJob(job); await methodRef.call(instance, extractedJob); }; try { @@ -86,11 +87,4 @@ export class HandlerScannerService { } } } - - private normalizeJob(job: any) { - if (typeof job === "object" && "0" in job) { - return job[0]; - } - return job; - } } diff --git a/lib/pgboss.service.ts b/lib/pgboss.service.ts index 405c20f..a382e54 100644 --- a/lib/pgboss.service.ts +++ b/lib/pgboss.service.ts @@ -2,6 +2,7 @@ import { Injectable } from "@nestjs/common"; import PgBoss, { WorkWithMetadataHandler } from "pg-boss"; import { Inject } from "@nestjs/common"; import { PGBOSS_TOKEN } from "./utils/consts"; +import { transformOptions } from "./utils/helpers"; @Injectable() export class PgBossService { @@ -45,7 +46,7 @@ export class PgBossService { await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {}); await this.pgBoss.work( name, - { ...this.transformOptions(options), includeMetadata: true }, + { ...transformOptions(options), includeMetadata: true }, handler, ); } @@ -62,19 +63,6 @@ export class PgBossService { handler, ); } - private transformOptions( - options?: PgBoss.WorkOptions | PgBoss.ScheduleOptions, - ) { - if (!options) return {}; - - const transformedOptions: any = { ...options }; - - if (typeof options.priority === "number") { - transformedOptions.priority = options.priority > 0; - } - - return transformedOptions; - } async ensureQueueExists(queueName: string) { const currentQueue = await this.pgBoss.getQueue(queueName); diff --git a/lib/utils/conversion-helper.ts b/lib/utils/helpers.ts similarity index 72% rename from lib/utils/conversion-helper.ts rename to lib/utils/helpers.ts index 1f623bb..14e1748 100644 --- a/lib/utils/conversion-helper.ts +++ b/lib/utils/helpers.ts @@ -13,3 +13,10 @@ export function transformOptions( return transformedOptions; } + +export function normalizeJob(job: any) { + if (typeof job === "object" && "0" in job) { + return job[0]; + } + return job; +} From 5939862e394572541f155e3c82f20f47b54c77d4 Mon Sep 17 00:00:00 2001 From: samaratungajs Date: Tue, 10 Sep 2024 12:34:30 +0530 Subject: [PATCH 5/5] fix: remove ensureQueueExists --- lib/pgboss.service.ts | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/lib/pgboss.service.ts b/lib/pgboss.service.ts index a382e54..0ab3e15 100644 --- a/lib/pgboss.service.ts +++ b/lib/pgboss.service.ts @@ -21,7 +21,7 @@ export class PgBossService { data: TData, options?: PgBoss.SendOptions, ) { - await this.ensureQueueExists(name); + await this.pgBoss.createQueue(name); await this.pgBoss.send(name, data, options); } @@ -31,7 +31,7 @@ export class PgBossService { data?: TData, options?: PgBoss.ScheduleOptions, ) { - await this.ensureQueueExists(name); + await this.pgBoss.createQueue(name); await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {}); } @@ -42,7 +42,7 @@ export class PgBossService { data?: TData, options?: PgBoss.ScheduleOptions, ) { - await this.ensureQueueExists(name); + await this.pgBoss.createQueue(name); await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {}); await this.pgBoss.work( name, @@ -56,18 +56,11 @@ export class PgBossService { handler: WorkWithMetadataHandler, options?: PgBoss.WorkOptions, ) { - await this.ensureQueueExists(name); + await this.pgBoss.createQueue(name); await this.pgBoss.work( name, { ...options, includeMetadata: true }, handler, ); } - - async ensureQueueExists(queueName: string) { - const currentQueue = await this.pgBoss.getQueue(queueName); - if (!currentQueue) { - await this.pgBoss.createQueue(queueName); - } - } }