diff --git a/lib/handler-scanner.service.ts b/lib/handler-scanner.service.ts index a2589aa..2491ccc 100644 --- a/lib/handler-scanner.service.ts +++ b/lib/handler-scanner.service.ts @@ -1,3 +1,4 @@ +import { Injectable, Logger } from "@nestjs/common"; import { Reflector, ModulesContainer } from "@nestjs/core"; import { PgBossService } from "./pgboss.service"; import { @@ -7,9 +8,8 @@ import { CRON_OPTIONS, } from "./decorators/job.decorator"; import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper"; -import { BatchWorkOptions } from "pg-boss"; +import PgBoss, { WorkWithMetadataHandler } from "pg-boss"; import { LOGGER } from "./utils/consts"; -import { Injectable, Logger } from "@nestjs/common"; @Injectable() export class HandlerScannerService { @@ -26,7 +26,7 @@ export class HandlerScannerService { const providers = [...module.providers.values()]; for (const provider of providers) { - this.scanProvider(provider); + await this.scanProvider(provider); } } } @@ -44,7 +44,7 @@ export class HandlerScannerService { for (const methodName of methodNames) { const methodRef = instance[methodName]; const jobName = this.reflector.get(JOB_NAME, methodRef); - const jobOptions = this.reflector.get( + const jobOptions = this.reflector.get( JOB_OPTIONS, methodRef, ); @@ -55,8 +55,12 @@ export class HandlerScannerService { const cronOptions = this.reflector.get(CRON_OPTIONS, methodRef); if (jobName) { - const boundHandler = methodRef.bind(instance); - + const boundHandler: WorkWithMetadataHandler = async (job) => { + const jobData = { + ...job, + }; + await methodRef.call(instance, jobData); + }; try { if (cronExpression) { await this.pgBossService.registerCronJob( @@ -71,12 +75,12 @@ export class HandlerScannerService { await this.pgBossService.registerJob( jobName, boundHandler, - jobOptions, + jobOptions as PgBoss.BatchWorkOptions, ); this.logger.log(`Registered job: ${jobName}`); } } catch (error) { - this.logger.error(`Error registering job ${jobName}:`); + this.logger.error(`Error registering job ${jobName}:`, error); } } } diff --git a/lib/pgboss.service.ts b/lib/pgboss.service.ts index 7927491..ea2ba75 100644 --- a/lib/pgboss.service.ts +++ b/lib/pgboss.service.ts @@ -1,5 +1,5 @@ import { Injectable } from "@nestjs/common"; -import PgBoss, { BatchWorkOptions, WorkHandler } from "pg-boss"; +import PgBoss, { WorkWithMetadataHandler } from "pg-boss"; import { Inject } from "@nestjs/common"; import { PGBOSS_TOKEN } from "./utils/consts"; @@ -27,19 +27,27 @@ export class PgBossService { async registerCronJob( name: string, cron: string, - handler: WorkHandler, + handler: WorkWithMetadataHandler, data?: TData, options?: PgBoss.ScheduleOptions, ) { await this.boss.schedule(name, cron, data ?? {}, options ?? {}); - await this.boss.work(name, handler); + await this.boss.work( + name, + { ...options, includeMetadata: true }, + handler, + ); } async registerJob( name: string, - handler: WorkHandler, - options?: BatchWorkOptions, + handler: WorkWithMetadataHandler, + options?: PgBoss.BatchWorkOptions, ) { - await this.boss.work(name, options, handler); + await this.boss.work( + name, + { ...options, includeMetadata: true }, + handler, + ); } } diff --git a/test/pgboss.service.spec.ts b/test/pgboss.service.spec.ts index b1cf125..b4c38ae 100644 --- a/test/pgboss.service.spec.ts +++ b/test/pgboss.service.spec.ts @@ -1,3 +1,84 @@ +// import { Test, TestingModule } from "@nestjs/testing"; +// import PgBoss from "pg-boss"; +// import { PgBossService } from "../lib/pgboss.service"; +// import { PGBOSS_TOKEN } from "../lib/utils/consts"; + +// describe("PgBossService", () => { +// let service: PgBossService; +// let mockPgBoss: jest.Mocked; + +// beforeEach(async () => { +// mockPgBoss = { +// start: jest.fn(), +// work: jest.fn(), +// send: jest.fn(), +// schedule: jest.fn(), +// } as any; + +// const module: TestingModule = await Test.createTestingModule({ +// providers: [ +// PgBossService, +// { provide: PGBOSS_TOKEN, useValue: mockPgBoss }, +// ], +// }).compile(); + +// service = module.get(PgBossService); +// }); + +// it("should be defined", () => { +// expect(service).toBeDefined(); +// }); + +// describe("registerJob", () => { +// it("should call PgBoss work with correct parameters", async () => { +// const handler = jest.fn(); +// const options = { batchSize: 5 }; + +// await service.registerJob("test-job", handler, options); + +// expect(mockPgBoss.work).toHaveBeenCalledWith( +// "test-job", +// { batchSize: 5, includeMetadata: true }, +// handler, +// ); +// }); +// }); + +// describe("scheduleJob", () => { +// it("should call PgBoss send with correct parameters", async () => { +// const data = { test: "data" }; + +// await service.scheduleJob("test-job", data, {}); + +// expect(mockPgBoss.send).toHaveBeenCalledWith("test-job", data, {}); +// }); +// }); + +// describe("registerCronJob", () => { +// it("should call PgBoss schedule and work with correct parameters", async () => { +// const handler = jest.fn(); +// const cron = "* * * * *"; +// const data = { test: "data" }; +// const options = { tz: "UTC" }; + +// await service.registerCronJob( +// "test-cron-job", +// cron, +// handler, +// data, +// options, +// ); + +// expect(mockPgBoss.schedule).toHaveBeenCalledWith( +// "test-cron-job", +// cron, +// data, +// { tz: "UTC" }, +// ); +// expect(mockPgBoss.work).toHaveBeenCalledWith("test-cron-job", handler); +// }); +// }); +// }); import { Test, TestingModule } from "@nestjs/testing"; import PgBoss from "pg-boss"; import { PgBossService } from "../lib/pgboss.service"; @@ -38,7 +119,7 @@ describe("PgBossService", () => { expect(mockPgBoss.work).toHaveBeenCalledWith( "test-job", - { batchSize: 5 }, + { batchSize: 5, includeMetadata: true }, handler, ); }); @@ -75,7 +156,11 @@ describe("PgBossService", () => { data, { tz: "UTC" }, ); - expect(mockPgBoss.work).toHaveBeenCalledWith("test-cron-job", handler); + expect(mockPgBoss.work).toHaveBeenCalledWith( + "test-cron-job", + { includeMetadata: true, tz: "UTC" }, + handler, + ); }); }); });