From 97cd2b147d541e0984d1c2e107110e1a9d56d9b5 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Fri, 21 Feb 2025 07:55:09 -0600 Subject: [PATCH] fix(job-scheduler): consider removing current job from wait, paused or prioritized (#3066) --- src/classes/scripts.ts | 4 + ...bScheduler-2.lua => addJobScheduler-6.lua} | 30 +++++- tests/test_job_scheduler.ts | 96 ++++++++++++++++++- 3 files changed, 123 insertions(+), 7 deletions(-) rename src/commands/{addJobScheduler-2.lua => addJobScheduler-6.lua} (53%) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 3db1a64c8c..270f522247 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -326,6 +326,10 @@ export class Scripts { const keys: (string | number | Buffer)[] = [ queueKeys.repeat, queueKeys.delayed, + queueKeys.wait, + queueKeys.paused, + queueKeys.meta, + queueKeys.prioritized, ]; const args = [ diff --git a/src/commands/addJobScheduler-2.lua b/src/commands/addJobScheduler-6.lua similarity index 53% rename from src/commands/addJobScheduler-2.lua rename to src/commands/addJobScheduler-6.lua index 2b6c92e914..9e235a0f10 100644 --- a/src/commands/addJobScheduler-2.lua +++ b/src/commands/addJobScheduler-6.lua @@ -4,6 +4,10 @@ Input: KEYS[1] 'repeat' key KEYS[2] 'delayed' key + KEYS[3] 'wait' key + KEYS[4] 'paused' key + KEYS[5] 'meta' key + KEYS[6] 'prioritized' key ARGV[1] next milliseconds ARGV[2] msgpacked options @@ -22,6 +26,7 @@ ]] local rcall = redis.call local repeatKey = KEYS[1] local delayedKey = KEYS[2] +local prioritizedKey = KEYS[6] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[3] @@ -29,6 +34,7 @@ local templateOpts = cmsgpack.unpack(ARGV[5]) local prefixKey = ARGV[6] -- Includes +--- @include "includes/isQueuePaused" --- @include "includes/removeJob" --- @include "includes/storeJobScheduler" @@ -37,14 +43,28 @@ local prefixKey = ARGV[6] local schedulerKey = repeatKey .. ":" .. jobSchedulerId local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then - local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis - if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and - (rcall("EXISTS", nextDelayedJobKey) ~= 1 or delayedJobId == nextDelayedJobId) then - removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]] ) - rcall("ZREM", delayedKey, delayedJobId) + if rcall("EXISTS", nextDelayedJobKey) ~= 1 or currentJobId == nextDelayedJobId then + if rcall("ZSCORE", delayedKey, currentJobId) ~= false then + removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] ) + rcall("ZREM", delayedKey, currentJobId) + elseif rcall("ZSCORE", prioritizedKey, currentJobId) ~= false then + removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] ) + rcall("ZREM", prioritizedKey, currentJobId) + else + if isQueuePaused(KEYS[5]) then + if rcall("LREM", KEYS[4], 1, currentJobId) > 0 then + removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] ) + end + else + if rcall("LREM", KEYS[3], 1, currentJobId) > 0 then + removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] ) + end + end + end end end diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index e83efcc64d..9402f66d2b 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -297,6 +297,99 @@ describe('Job Scheduler', function () { await worker.close(); }); + + describe('when generated job is in waiting state', function () { + it('should upsert scheduler by removing waiting job', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const jobSchedulerId = 'test'; + + await queue.upsertJobScheduler(jobSchedulerId, { + pattern: '10 * * * * *', + }); + const delayedJobs = await queue.getDelayed(); + await delayedJobs[0].promote(); + + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.eql(1); + + await queue.upsertJobScheduler(jobSchedulerId, { + pattern: '2 10 * * * *', + }); + + const waitingCountAfter = await queue.getWaitingCount(); + expect(waitingCountAfter).to.be.eql(0); + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.eql(1); + }); + }); + + describe('when generated job is in paused state', function () { + it('should upsert scheduler by removing paused job', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const jobSchedulerId = 'test'; + + await queue.pause(); + await queue.upsertJobScheduler(jobSchedulerId, { + pattern: '10 * * * * *', + }); + const delayedJobs = await queue.getDelayed(); + await delayedJobs[0].promote(); + + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.be.eql(1); + + await queue.upsertJobScheduler(jobSchedulerId, { + pattern: '2 10 * * * *', + }); + + const waitingCountAfter = await queue.getWaitingCount(); + expect(waitingCountAfter).to.be.eql(0); + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.eql(1); + }); + }); + + describe('when generated job is in prioritized state', function () { + it('should upsert scheduler by removing prioritized job', async function () { + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + + const jobSchedulerId = 'test'; + + await queue.upsertJobScheduler( + jobSchedulerId, + { + pattern: '10 * * * * *', + }, + { + opts: { + priority: 1, + }, + }, + ); + const delayedJobs = await queue.getDelayed(); + await delayedJobs[0].promote(); + + const prioritizedCount = await queue.getPrioritizedCount(); + expect(prioritizedCount).to.be.eql(1); + + await queue.upsertJobScheduler(jobSchedulerId, { + pattern: '2 10 * * * *', + }); + + const prioritizedCountAfter = await queue.getPrioritizedCount(); + expect(prioritizedCountAfter).to.be.eql(0); + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.be.eql(1); + }); + }); }); describe('when clocks are slightly out of sync', function () { @@ -2011,8 +2104,7 @@ describe('Job Scheduler', function () { expect(jobs).to.have.length(1); waitingJobs = await queue.getWaiting(); - // TODO: need to fix the case when jobs are added and previous one is in waiting state - expect(waitingJobs).to.have.length(2); + expect(waitingJobs).to.have.length(1); }); });