From a66fdcb8d21a3cf1780f23cadd40883dde394eab Mon Sep 17 00:00:00 2001 From: Het Delwadiya Date: Wed, 22 Jan 2025 17:56:06 +0530 Subject: [PATCH 1/2] Remove logic of skipping recurring jobs and let them run based on nextRunAt date if resumeOnRestart is disabled --- src/job/run.ts | 13 ------------- test/unit/pulse.spec.ts | 22 ++++++++++++++++++++++ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/job/run.ts b/src/job/run.ts index fc7e8f8..b78d290 100644 --- a/src/job/run.ts +++ b/src/job/run.ts @@ -85,19 +85,6 @@ export const run: RunMethod = async function (this: Job) { throw new JobError('Undefined job'); } - // on restart, skip the job if it's not time to run - if ( - !this.pulse._resumeOnRestart && - previousRunAt && - this.pulse._readyAt >= previousRunAt && - this.attrs.nextRunAt - ) { - debug('[%s:%s] job resumeOnRestart skipped', this.attrs.name, this.attrs._id); - resumeOnRestartSkipped = true; - await jobCallback(undefined, 'skipped'); - return; - } - this.attrs.runCount = (this.attrs.runCount || 0) + 1; if (definition.fn.length === 2) { diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index 8d1de9b..aa09870 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -367,6 +367,28 @@ describe('Test Pulse', () => { // const updatedJob = (await globalPulseInstance.jobs({ name: 'sendEmail' }))[0]; // expect(updatedJob.attrs.nextRunAt).toBeNull(); // }); + + test('should not skip queued recurring jobs while starting a new pulse instance in case of resumeOnRestart is disabled', async () => { + globalPulseInstance.stop(); + + // Create a recurring job that is in queue and wasn't locked by any pulse instance + const job = globalPulseInstance.create('processData', { data: 'sample' }); + job.attrs.repeatInterval = '10 minutes'; + job.attrs.lockedAt = null; + job.attrs.nextRunAt = new Date(Date.now() - 10000); + await job.save(); + + // Starting a new pulse instance + const newPulseInstance = new Pulse({ mongo: mongoDb, resumeOnRestart: false }); + newPulseInstance.define('processData', jobProcessor); + await newPulseInstance.start(); + + await delay(1000); + const updatedJob = (await newPulseInstance.jobs({ name: 'processData' }))[0]; + expect(updatedJob.attrs.lastFinishedAt).toBeDefined(); + + await newPulseInstance.stop(); + }); }); }); From 07d9918c323ee0378283e7b255ac73227c25d991 Mon Sep 17 00:00:00 2001 From: Het Delwadiya Date: Wed, 22 Jan 2025 18:07:31 +0530 Subject: [PATCH 2/2] added missing await in test --- test/unit/pulse.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/unit/pulse.spec.ts b/test/unit/pulse.spec.ts index aa09870..32f3b5c 100644 --- a/test/unit/pulse.spec.ts +++ b/test/unit/pulse.spec.ts @@ -369,7 +369,7 @@ describe('Test Pulse', () => { // }); test('should not skip queued recurring jobs while starting a new pulse instance in case of resumeOnRestart is disabled', async () => { - globalPulseInstance.stop(); + await globalPulseInstance.stop(); // Create a recurring job that is in queue and wasn't locked by any pulse instance const job = globalPulseInstance.create('processData', { data: 'sample' });