Skip to content

Commit

Permalink
fix: ensure queue exists added
Browse files Browse the repository at this point in the history
  • Loading branch information
samaratungajs committed Sep 10, 2024
1 parent a115473 commit cfb929d
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
18 changes: 13 additions & 5 deletions lib/handler-scanner.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ export class HandlerScannerService {
CRON_EXPRESSION,
methodRef,
);
const cronOptions = this.reflector.get<any>(CRON_OPTIONS, methodRef);
const cronOptions = this.reflector.get<PgBoss.ScheduleOptions>(
CRON_OPTIONS,
methodRef,
);

if (jobName) {
const boundHandler: WorkWithMetadataHandler<any> = async (job) => {
const jobData = {
...job,
};
await methodRef.call(instance, jobData);
const extractedJob = this.normalizeJob(job);
await methodRef.call(instance, extractedJob);
};
try {
if (cronExpression) {
Expand All @@ -85,4 +86,11 @@ export class HandlerScannerService {
}
}
}

private normalizeJob(job: any) {
if (typeof job === "object" && "0" in job) {
return job[0];
}
return job;
}
}
11 changes: 11 additions & 0 deletions lib/pgboss.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class PgBossService {
data: TData,
options?: PgBoss.SendOptions,
) {
await this.ensureQueueExists(name);
await this.pgBoss.send(name, data, options);
}

Expand All @@ -29,6 +30,7 @@ export class PgBossService {
data?: TData,
options?: PgBoss.ScheduleOptions,
) {
await this.ensureQueueExists(name);
await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {});
}

Expand All @@ -39,6 +41,7 @@ export class PgBossService {
data?: TData,
options?: PgBoss.ScheduleOptions,
) {
await this.ensureQueueExists(name);
await this.pgBoss.schedule(name, cron, data ?? {}, options ?? {});
await this.pgBoss.work<TData>(
name,
Expand All @@ -52,6 +55,7 @@ export class PgBossService {
handler: WorkWithMetadataHandler<TData>,
options?: PgBoss.WorkOptions,
) {
await this.ensureQueueExists(name);
await this.pgBoss.work<TData>(
name,
{ ...options, includeMetadata: true },
Expand All @@ -71,4 +75,11 @@ export class PgBossService {

return transformedOptions;
}

async ensureQueueExists(queueName: string) {
const currentQueue = await this.pgBoss.getQueue(queueName);
if (!currentQueue) {
await this.pgBoss.createQueue(queueName);
}
}
}
2 changes: 2 additions & 0 deletions test/pgboss.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ describe("PgBossService", () => {
work: jest.fn(),
send: jest.fn(),
schedule: jest.fn(),
createQueue: jest.fn(),
getQueue: jest.fn(),
} as any;

const module: TestingModule = await Test.createTestingModule({
Expand Down

0 comments on commit cfb929d

Please sign in to comment.