Skip to content

Commit

Permalink
Merge pull request #9 from wavezync/fix/job-data
Browse files Browse the repository at this point in the history
  • Loading branch information
kasvith authored Aug 30, 2024
2 parents fcadeb9 + 1526515 commit db25a3e
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 16 deletions.
20 changes: 12 additions & 8 deletions lib/handler-scanner.service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Injectable, Logger } from "@nestjs/common";
import { Reflector, ModulesContainer } from "@nestjs/core";
import { PgBossService } from "./pgboss.service";
import {
Expand All @@ -7,9 +8,8 @@ import {
CRON_OPTIONS,
} from "./decorators/job.decorator";
import { InstanceWrapper } from "@nestjs/core/injector/instance-wrapper";
import { BatchWorkOptions } from "pg-boss";
import PgBoss, { WorkWithMetadataHandler } from "pg-boss";
import { LOGGER } from "./utils/consts";
import { Injectable, Logger } from "@nestjs/common";

@Injectable()
export class HandlerScannerService {
Expand All @@ -26,7 +26,7 @@ export class HandlerScannerService {
const providers = [...module.providers.values()];

for (const provider of providers) {
this.scanProvider(provider);
await this.scanProvider(provider);
}
}
}
Expand All @@ -44,7 +44,7 @@ export class HandlerScannerService {
for (const methodName of methodNames) {
const methodRef = instance[methodName];
const jobName = this.reflector.get<string>(JOB_NAME, methodRef);
const jobOptions = this.reflector.get<BatchWorkOptions>(
const jobOptions = this.reflector.get<PgBoss.WorkOptions>(
JOB_OPTIONS,
methodRef,
);
Expand All @@ -55,8 +55,12 @@ export class HandlerScannerService {
const cronOptions = this.reflector.get<any>(CRON_OPTIONS, methodRef);

if (jobName) {
const boundHandler = methodRef.bind(instance);

const boundHandler: WorkWithMetadataHandler<any> = async (job) => {
const jobData = {
...job,
};
await methodRef.call(instance, jobData);
};
try {
if (cronExpression) {
await this.pgBossService.registerCronJob(
Expand All @@ -71,12 +75,12 @@ export class HandlerScannerService {
await this.pgBossService.registerJob(
jobName,
boundHandler,
jobOptions,
jobOptions as PgBoss.BatchWorkOptions,
);
this.logger.log(`Registered job: ${jobName}`);
}
} catch (error) {
this.logger.error(`Error registering job ${jobName}:`);
this.logger.error(`Error registering job ${jobName}:`, error);
}
}
}
Expand Down
20 changes: 14 additions & 6 deletions lib/pgboss.service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Injectable } from "@nestjs/common";
import PgBoss, { BatchWorkOptions, WorkHandler } from "pg-boss";
import PgBoss, { WorkWithMetadataHandler } from "pg-boss";
import { Inject } from "@nestjs/common";
import { PGBOSS_TOKEN } from "./utils/consts";

Expand Down Expand Up @@ -27,19 +27,27 @@ export class PgBossService {
async registerCronJob<TData extends object>(
name: string,
cron: string,
handler: WorkHandler<TData>,
handler: WorkWithMetadataHandler<TData>,
data?: TData,
options?: PgBoss.ScheduleOptions,
) {
await this.boss.schedule(name, cron, data ?? {}, options ?? {});
await this.boss.work(name, handler);
await this.boss.work<TData>(
name,
{ ...options, includeMetadata: true },
handler,
);
}

async registerJob<TData extends object>(
name: string,
handler: WorkHandler<TData>,
options?: BatchWorkOptions,
handler: WorkWithMetadataHandler<TData>,
options?: PgBoss.BatchWorkOptions,
) {
await this.boss.work(name, options, handler);
await this.boss.work<TData>(
name,
{ ...options, includeMetadata: true },
handler,
);
}
}
89 changes: 87 additions & 2 deletions test/pgboss.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,84 @@
// import { Test, TestingModule } from "@nestjs/testing";
// import PgBoss from "pg-boss";
// import { PgBossService } from "../lib/pgboss.service";
// import { PGBOSS_TOKEN } from "../lib/utils/consts";

// describe("PgBossService", () => {
// let service: PgBossService;
// let mockPgBoss: jest.Mocked<PgBoss>;

// beforeEach(async () => {
// mockPgBoss = {
// start: jest.fn(),
// work: jest.fn(),
// send: jest.fn(),
// schedule: jest.fn(),
// } as any;

// const module: TestingModule = await Test.createTestingModule({
// providers: [
// PgBossService,
// { provide: PGBOSS_TOKEN, useValue: mockPgBoss },
// ],
// }).compile();

// service = module.get<PgBossService>(PgBossService);
// });

// it("should be defined", () => {
// expect(service).toBeDefined();
// });

// describe("registerJob", () => {
// it("should call PgBoss work with correct parameters", async () => {
// const handler = jest.fn();
// const options = { batchSize: 5 };

// await service.registerJob("test-job", handler, options);

// expect(mockPgBoss.work).toHaveBeenCalledWith(
// "test-job",
// { batchSize: 5, includeMetadata: true },
// handler,
// );
// });
// });

// describe("scheduleJob", () => {
// it("should call PgBoss send with correct parameters", async () => {
// const data = { test: "data" };

// await service.scheduleJob("test-job", data, {});

// expect(mockPgBoss.send).toHaveBeenCalledWith("test-job", data, {});
// });
// });

// describe("registerCronJob", () => {
// it("should call PgBoss schedule and work with correct parameters", async () => {
// const handler = jest.fn();
// const cron = "* * * * *";
// const data = { test: "data" };
// const options = { tz: "UTC" };

// await service.registerCronJob(
// "test-cron-job",
// cron,
// handler,
// data,
// options,
// );

// expect(mockPgBoss.schedule).toHaveBeenCalledWith(
// "test-cron-job",
// cron,
// data,
// { tz: "UTC" },
// );
// expect(mockPgBoss.work).toHaveBeenCalledWith("test-cron-job", handler);
// });
// });
// });
import { Test, TestingModule } from "@nestjs/testing";
import PgBoss from "pg-boss";
import { PgBossService } from "../lib/pgboss.service";
Expand Down Expand Up @@ -38,7 +119,7 @@ describe("PgBossService", () => {

expect(mockPgBoss.work).toHaveBeenCalledWith(
"test-job",
{ batchSize: 5 },
{ batchSize: 5, includeMetadata: true },
handler,
);
});
Expand Down Expand Up @@ -75,7 +156,11 @@ describe("PgBossService", () => {
data,
{ tz: "UTC" },
);
expect(mockPgBoss.work).toHaveBeenCalledWith("test-cron-job", handler);
expect(mockPgBoss.work).toHaveBeenCalledWith(
"test-cron-job",
{ includeMetadata: true, tz: "UTC" },
handler,
);
});
});
});

0 comments on commit db25a3e

Please sign in to comment.