From 05b5f4b963cff4206a3c58a8aa89a1a06b885ef2 Mon Sep 17 00:00:00 2001 From: Kostiantyn Smyrnov Date: Mon, 19 Feb 2024 14:43:30 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20Added=20sheduled=20kind?= =?UTF-8?q?=20of=20jobs=20to=20the=20Queue?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- packages/queue/src/index.ts | 205 +++++++++++++++++++----------- packages/queue/test/queue.spec.ts | 72 +++++++++++ 2 files changed, 201 insertions(+), 76 deletions(-) diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index a277b8a3..82660319 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -11,6 +11,7 @@ const logger = createLogger('Queue'); */ export enum JobStatus { Pending, + Scheduled, Started, Done, Cancelled, @@ -101,6 +102,8 @@ export interface JobConfig { retries?: number; /** Retries delay */ retriesDelay?: number; + /** Scheduled time */ + scheduledTime?: number; /** The history of the job */ history?: JobHistoryInterface; } @@ -133,6 +136,8 @@ export class Job { retries: number; /** The period of time between retries */ retriesDelay: number; + /** Scheduled time */ + scheduledTime?: number; /** The history of the job */ history: JobHistory; @@ -152,6 +157,11 @@ export class Job { this.maxRetries = config.maxRetries ?? 0; this.retries = config.retries ?? 0; this.retriesDelay = config.retriesDelay ?? 0; + + if (this.isRecurrent && config.scheduledTime) { + throw new Error('Job cannot be recurrent and scheduled at the same time'); + } + this.scheduledTime = config.scheduledTime; this.history = new JobHistory(config.history ?? {}); } @@ -204,7 +214,7 @@ export class Job { get executable() { return ( !this.expired && - this.status === JobStatus.Pending && + [JobStatus.Pending, JobStatus.Scheduled].includes(this.status) && ((!this.isRecurrent && (this.maxRetries === 0 || (this.maxRetries > 0 && this.retries < this.maxRetries))) || @@ -401,7 +411,9 @@ export class Queue extends EventEmitter { // Only Pending jobs must be restored if ( jobConfig.history && - JobHistory.getStatus(jobConfig.history) === JobStatus.Pending + [JobStatus.Pending, JobStatus.Scheduled].includes( + JobHistory.getStatus(jobConfig.history), + ) ) { this.add(jobConfig); } @@ -512,6 +524,87 @@ export class Queue extends EventEmitter { void this.storageUpdate(job.id, job); } + /** + * Executes Job + * @param {Job} job Job to execute + * @returns {Promise} + */ + private async executeJob(job: Job): Promise { + try { + if (!job.executable) { + return; + } + + this.changeJobStatus(job, JobStatus.Started); + + const handler = this.handlers.getHandler(job.handlerName); + + const result = await job.execute(handler); + logger.trace(`Job #${job.id} execution result: ${String(result)}`); + + if (result && job.isRecurrent) { + // If the job is recurrent and the handler returned true, reschedule the job + if (!job.expired) { + logger.trace(`Job #${job.id} is done but new one is scheduled`); + this.changeJobStatus(job, JobStatus.Done); + setTimeout(() => { + this.add({ + handlerName: job.handlerName, + data: job.data, + expire: job.expire, + isRecurrent: job.isRecurrent, + recurrenceInterval: job.recurrenceInterval, + maxRecurrences: job.maxRecurrences, + maxRetries: job.maxRetries, + retries: job.retries + 1, + }); + }, job.recurrenceInterval); + } else { + logger.trace(`Job #${job.id} is expired`); + this.changeJobStatus(job, JobStatus.Expired); + } + } else { + logger.trace(`Job #${job.id} is done`); + this.changeJobStatus(job, JobStatus.Done); + } + } catch (error) { + logger.error(`Job #${job.id} is errored`, error); + job.history.errors.push(String(error)); + + if (job.maxRetries > 0 && job.retries < job.maxRetries) { + // If the job hasn't reached the maximum number of retries, retry it + job.retries++; + + if (job.retriesDelay > 0) { + logger.trace(`Job #${job.id} filed but scheduled for restart`); + this.changeJobStatus(job, JobStatus.Failed); + setTimeout( + () => { + this.add({ + handlerName: job.handlerName, + data: job.data, + expire: job.expire, + maxRetries: job.maxRetries, + retries: job.retries + 1, + }); + }, + backoffWithJitter( + job.retriesDelay, + job.retries, + job.retriesDelay * 10, + ), + ); + } else { + logger.trace(`Job #${job.id} failed and immediately restarted`); + this.changeJobStatus(job, JobStatus.Pending); + } + } else { + logger.trace(`Job #${job.id} filed`); + this.changeJobStatus(job, JobStatus.Failed); + } + } + } + /** * Starts processing jobs in the queue. * It finds executable jobs and runs them concurrently up to the concurrency limit. @@ -524,13 +617,37 @@ export class Queue extends EventEmitter { */ private async start() { try { + const now = Date.now(); + const activeJobs = this.jobs.filter( (job) => job.status === JobStatus.Started, ); - const pendingJobs = this.jobs.filter((job) => job.executable); logger.trace(`Active jobs: ${activeJobs.length}`); + + // Select all pending jobs except for scheduled + const pendingJobs = this.jobs.filter( + (job) => + job.executable && + (!job.scheduledTime || + (job.scheduledTime && job.scheduledTime <= now)), + ); logger.trace(`Pending jobs: ${pendingJobs.length}`); + // Select all scheduled jobs + const scheduledJobs = this.jobs.filter( + (job) => job.executable && job.scheduledTime, + ); + + if (scheduledJobs.length > 0) { + scheduledJobs.forEach((job) => { + if (job.scheduledTime && job.scheduledTime > now) { + const delay = job.scheduledTime - now; + this.changeJobStatus(job, JobStatus.Scheduled); + setTimeout(() => void this.executeJob(job), delay); + } + }); + } + const availableSlots = this.concurrencyLimit - activeJobs.length; logger.trace(`Available slots: ${availableSlots}`); @@ -546,79 +663,9 @@ export class Queue extends EventEmitter { ); // Start all the selected jobs concurrently - const promises = jobsToStart.map(async (job) => { - try { - this.changeJobStatus(job, JobStatus.Started); - - const handler = this.handlers.getHandler(job.handlerName); - - const result = await job.execute(handler); - logger.trace(`Job #${job.id} execution result: ${String(result)}`); - - if (result && job.isRecurrent) { - // If the job is recurrent and the handler returned true, reschedule the job - if (!job.expired) { - logger.trace(`Job #${job.id} is done but new one is scheduled`); - this.changeJobStatus(job, JobStatus.Done); - setTimeout(() => { - this.add({ - handlerName: job.handlerName, - data: job.data, - expire: job.expire, - isRecurrent: job.isRecurrent, - recurrenceInterval: job.recurrenceInterval, - maxRecurrences: job.maxRecurrences, - maxRetries: job.maxRetries, - retries: job.retries + 1, - }); - }, job.recurrenceInterval); - } else { - logger.trace(`Job #${job.id} is expired`); - this.changeJobStatus(job, JobStatus.Expired); - } - } else { - logger.trace(`Job #${job.id} is done`); - this.changeJobStatus(job, JobStatus.Done); - } - } catch (error) { - logger.error(`Job #${job.id} is errored`, error); - job.history.errors.push(String(error)); - - if (job.maxRetries > 0 && job.retries < job.maxRetries) { - // If the job hasn't reached the maximum number of retries, retry it - job.retries++; - - if (job.retriesDelay > 0) { - logger.trace(`Job #${job.id} filed but scheduled for restart`); - this.changeJobStatus(job, JobStatus.Failed); - setTimeout( - () => { - this.add({ - handlerName: job.handlerName, - data: job.data, - expire: job.expire, - maxRetries: job.maxRetries, - retries: job.retries + 1, - }); - }, - backoffWithJitter( - job.retriesDelay, - job.retries, - job.retriesDelay * 10, - ), - ); - } else { - logger.trace(`Job #${job.id} failed and immediately restarted`); - this.changeJobStatus(job, JobStatus.Pending); - } - } else { - logger.trace(`Job #${job.id} filed`); - this.changeJobStatus(job, JobStatus.Failed); - } - } - }); - - await Promise.allSettled(promises); + await Promise.allSettled( + jobsToStart.map(async (job) => this.executeJob(job)), + ); // After these jobs are done, check if there are any more jobs to process logger.trace('Trying to restart queue'); @@ -650,6 +697,12 @@ export class Queue extends EventEmitter { */ add(config: JobConfig): string { const job = new Job(config); + + // In case of restored Scheduled jobs we need to bring them to Pending again + if (job.status === JobStatus.Scheduled) { + job.status = JobStatus.Pending; + } + this.jobs.push(job); logger.trace('Job added:', job); void this.storageUpdate(job.id, job); diff --git a/packages/queue/test/queue.spec.ts b/packages/queue/test/queue.spec.ts index 95058298..4236e2bb 100644 --- a/packages/queue/test/queue.spec.ts +++ b/packages/queue/test/queue.spec.ts @@ -289,3 +289,75 @@ describe('Queue', function () { }); }); }); + +describe('Scheduled jobs', () => { + let queue: Queue; + let handler: JobHandler; + + beforeEach(() => { + queue = new Queue({ concurrencyLimit: 5 }); + handler = async () => Promise.resolve(true); + queue.registerHandler('scheduledHandler', handler); + }); + + it('should correctly schedule a job for future execution', async function () { + const futureTime = Date.now() + 100; + const jobId = queue.add({ + handlerName: 'scheduledHandler', + scheduledTime: futureTime, + }); + + expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Scheduled); + + await new Promise((resolve) => { + setTimeout(() => { + expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Done); + resolve(true); + }, 150); + }); + }); + + it('should immediately execute a job scheduled for the past', async function () { + const pastTime = Date.now() - 1000; + const jobId = queue.add({ + handlerName: 'scheduledHandler', + scheduledTime: pastTime, + }); + + await new Promise((resolve) => { + setTimeout(() => { + expect(queue.getLocal(jobId)?.status).to.equal(JobStatus.Done); + resolve(true); + }, 50); + }); + }); + + it('should handle the execution of multiple scheduled jobs', async function () { + const futureTimeShort = Date.now() + 50; + const futureTimeLong = Date.now() + 200; + + const jobIdShort = queue.add({ + handlerName: 'scheduledHandler', + scheduledTime: futureTimeShort, + }); + const jobIdLong = queue.add({ + handlerName: 'scheduledHandler', + scheduledTime: futureTimeLong, + }); + + await new Promise((resolve) => { + setTimeout(() => { + expect(queue.getLocal(jobIdShort)?.status).to.equal(JobStatus.Done); + expect(queue.getLocal(jobIdLong)?.status).to.equal(JobStatus.Scheduled); + resolve(true); + }, 100); + }); + + await new Promise((resolve) => { + setTimeout(() => { + expect(queue.getLocal(jobIdLong)?.status).to.equal(JobStatus.Done); + resolve(true); + }, 250); + }); + }); +});