Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: the resumeOnRestart function to handle correctl the recurring jobs + do not run finished non-recurrent jobs on restart + add unit tests #62

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/job/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class Job<T extends JobAttributesData = JobAttributesData> {
attrs: JobAttributes<T>;

constructor(options: Modify<JobAttributes<T>, { _id?: mongodb.ObjectId }>) {
const { pulse, type, nextRunAt, ...args } = options ?? {};
const { pulse, type, nextRunAt, repeatAt, repeatInterval, lastFinishedAt, ...args } = options ?? {};

// Save Pulse instance
this.pulse = pulse;
Expand All @@ -213,7 +213,10 @@ class Job<T extends JobAttributesData = JobAttributesData> {
name: attrs.name || '',
priority: attrs.priority,
type: type || 'once',
nextRunAt: nextRunAt || new Date(),
// if a job that's non-recurring has a lastFinishedAt (finished the job), do not default nextRunAt to now
// only if it will be defaulted either by explicitly setting it or by computing it computeNextRunAt
nextRunAt:
repeatAt || repeatInterval ? nextRunAt || new Date() : !lastFinishedAt ? nextRunAt || new Date() : nextRunAt,
};
}

Expand Down
55 changes: 52 additions & 3 deletions src/pulse/resume-on-restart.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import createDebugger from 'debug';
import { Pulse } from '.';
import { Job } from '../job';

const debug = createDebugger('pulse:resumeOnRestart');

Expand All @@ -18,18 +19,23 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res

if (this._collection && this._resumeOnRestart) {
const now = new Date();

// Non-recurring jobs
this._collection
.updateMany(
{
$or: [
{
lockedAt: { $exists: true },
nextRunAt: { $ne: null },
$or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }],
$or: [
{ $expr: { $eq: ['$runCount', '$finishedCount'] } },
{ $or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }] },
],
},
{
lockedAt: { $exists: false },
lastFinishedAt: { $exists: false },
$or: [{ lastFinishedAt: { $exists: false } }, { lastFinishedAt: null }],
nextRunAt: { $lte: now, $ne: null },
},
],
Expand All @@ -41,7 +47,50 @@ export const resumeOnRestart: ResumeOnRestartMethod = function (this: Pulse, res
)
.then((result) => {
if (result.modifiedCount > 0) {
debug('resuming unfinished %d jobs(%s)', result.modifiedCount, now.toISOString());
debug('Resumed %d unfinished standard jobs (%s)', result.modifiedCount, now.toISOString());
}
});

// Recurring jobs
this._collection
.find({
$and: [
{ $or: [{ repeatInterval: { $exists: true } }, { repeatAt: { $exists: true } }] },
{ $or: [{ nextRunAt: { $lte: now } }, { nextRunAt: { $exists: false } }, { nextRunAt: null }] },
],
})
.toArray()
.then((jobs) => {
const updates = jobs.map((jobData) => {
const job = new Job({
pulse: this,
name: jobData.name || '',
data: jobData.data || {},
type: jobData.type || 'normal',
priority: jobData.priority || 'normal',
shouldSaveResult: jobData.shouldSaveResult || false,
attempts: jobData.attempts || 0,
backoff: jobData.backoff,
...jobData,
});

job.computeNextRunAt();

return this._collection.updateOne(
{ _id: job.attrs._id },
{
$set: { nextRunAt: job.attrs.nextRunAt },
$unset: { lockedAt: undefined, lastModifiedBy: undefined, lastRunAt: undefined },
}
);
});

return Promise.all(updates);
})
.then((results) => {
const modifiedCount = results.filter((res) => res.modifiedCount > 0).length;
if (modifiedCount > 0) {
debug('Resumed %d recurring jobs (%s)', modifiedCount, now.toISOString());
}
});
}
Expand Down
126 changes: 126 additions & 0 deletions test/unit/pulse.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,132 @@ describe('Test Pulse', () => {
test('returns itself', () => {
expect(globalPulseInstance.resumeOnRestart(false)).toEqual(globalPulseInstance);
});

test('should not reschedule successfully finished non-recurring jobs', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.lastFinishedAt = new Date();
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).toBeNull();
});

test('should resume non-recurring jobs on restart', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.nextRunAt = new Date(Date.now() - 1000);
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(Date.now() - 100);
});

test('should resume recurring jobs on restart - interval', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.repeatInterval = '5 minutes';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should resume recurring jobs on restart - cron', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.repeatInterval = '*/5 * * * *';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should resume recurring jobs on restart - repeatAt', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.repeatAt = '1:00 am';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should not modify jobs with existing nextRunAt', async () => {
const futureDate = new Date(Date.now() + 60 * 60 * 1000);
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.nextRunAt = futureDate;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt?.getTime()).toEqual(futureDate.getTime());
});

test('should handle jobs that started but have not finished (non-recurring)', async () => {
const job = globalPulseInstance.create('processData', { data: 'sample' });
job.attrs.nextRunAt = null;
job.attrs.lockedAt = new Date();
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];

const now = Date.now();
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
expect(updatedJob.attrs.nextRunAt?.getTime()).toBeGreaterThan(now - 100);
});

test('should handle recurring jobs that started but have not finished', async () => {
const job = globalPulseInstance.create('processData', { data: 'sample' });
job.attrs.repeatInterval = '10 minutes';
job.attrs.lockedAt = new Date();
job.attrs.nextRunAt = new Date(Date.now() + 10000);
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
expect(updatedJob.attrs.lockedAt).not.toBeNull();
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
});

test('should handle interrupted recurring jobs after server recovery', async () => {
const job = globalPulseInstance.create('processData', { data: 'sample' });
job.attrs.repeatInterval = '5 minutes';
job.attrs.lastModifiedBy = 'server_crash';
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'processData' }))[0];
expect(updatedJob.attrs.nextRunAt).not.toBeNull();
expect(updatedJob.attrs.lastModifiedBy).not.toEqual('server_crash');
});

test('should not modify non-recurring jobs with lastFinishedAt in the past', async () => {
const job = globalPulseInstance.create('sendEmail', { to: '[email protected]' });
job.attrs.lastFinishedAt = new Date(Date.now() - 10000);
job.attrs.nextRunAt = null;
await job.save();

await globalPulseInstance.resumeOnRestart();

const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0];
expect(updatedJob.attrs.nextRunAt).toBeNull();
});
});
});

Expand Down
Loading