Skip to content

Commit

Permalink
fix(job-scheduler): consider removing current job from wait, paused o…
Browse files Browse the repository at this point in the history
…r prioritized (#3066)
  • Loading branch information
roggervalf authored Feb 21, 2025
1 parent 1522931 commit 97cd2b1
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 7 deletions.
4 changes: 4 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ export class Scripts {
const keys: (string | number | Buffer)[] = [
queueKeys.repeat,
queueKeys.delayed,
queueKeys.wait,
queueKeys.paused,
queueKeys.meta,
queueKeys.prioritized,
];

const args = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
Input:
KEYS[1] 'repeat' key
KEYS[2] 'delayed' key
KEYS[3] 'wait' key
KEYS[4] 'paused' key
KEYS[5] 'meta' key
KEYS[6] 'prioritized' key
ARGV[1] next milliseconds
ARGV[2] msgpacked options
Expand All @@ -22,13 +26,15 @@
]] local rcall = redis.call
local repeatKey = KEYS[1]
local delayedKey = KEYS[2]
local prioritizedKey = KEYS[6]

local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]

-- Includes
--- @include "includes/isQueuePaused"
--- @include "includes/removeJob"
--- @include "includes/storeJobScheduler"

Expand All @@ -37,14 +43,28 @@ local prefixKey = ARGV[6]
local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and
(rcall("EXISTS", nextDelayedJobKey) ~= 1 or delayedJobId == nextDelayedJobId) then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]] )
rcall("ZREM", delayedKey, delayedJobId)
if rcall("EXISTS", nextDelayedJobKey) ~= 1 or currentJobId == nextDelayedJobId then
if rcall("ZSCORE", delayedKey, currentJobId) ~= false then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
rcall("ZREM", delayedKey, currentJobId)
elseif rcall("ZSCORE", prioritizedKey, currentJobId) ~= false then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
rcall("ZREM", prioritizedKey, currentJobId)
else
if isQueuePaused(KEYS[5]) then
if rcall("LREM", KEYS[4], 1, currentJobId) > 0 then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
end
else
if rcall("LREM", KEYS[3], 1, currentJobId) > 0 then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
end
end
end
end
end

Expand Down
96 changes: 94 additions & 2 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,99 @@ describe('Job Scheduler', function () {

await worker.close();
});

describe('when generated job is in waiting state', function () {
it('should upsert scheduler by removing waiting job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const jobSchedulerId = 'test';

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '10 * * * * *',
});
const delayedJobs = await queue.getDelayed();
await delayedJobs[0].promote();

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.eql(1);

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '2 10 * * * *',
});

const waitingCountAfter = await queue.getWaitingCount();
expect(waitingCountAfter).to.be.eql(0);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.eql(1);
});
});

describe('when generated job is in paused state', function () {
it('should upsert scheduler by removing paused job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const jobSchedulerId = 'test';

await queue.pause();
await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '10 * * * * *',
});
const delayedJobs = await queue.getDelayed();
await delayedJobs[0].promote();

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.eql(1);

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '2 10 * * * *',
});

const waitingCountAfter = await queue.getWaitingCount();
expect(waitingCountAfter).to.be.eql(0);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.eql(1);
});
});

describe('when generated job is in prioritized state', function () {
it('should upsert scheduler by removing prioritized job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const jobSchedulerId = 'test';

await queue.upsertJobScheduler(
jobSchedulerId,
{
pattern: '10 * * * * *',
},
{
opts: {
priority: 1,
},
},
);
const delayedJobs = await queue.getDelayed();
await delayedJobs[0].promote();

const prioritizedCount = await queue.getPrioritizedCount();
expect(prioritizedCount).to.be.eql(1);

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '2 10 * * * *',
});

const prioritizedCountAfter = await queue.getPrioritizedCount();
expect(prioritizedCountAfter).to.be.eql(0);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.eql(1);
});
});
});

describe('when clocks are slightly out of sync', function () {
Expand Down Expand Up @@ -2011,8 +2104,7 @@ describe('Job Scheduler', function () {
expect(jobs).to.have.length(1);

waitingJobs = await queue.getWaiting();
// TODO: need to fix the case when jobs are added and previous one is in waiting state
expect(waitingJobs).to.have.length(2);
expect(waitingJobs).to.have.length(1);
});
});

Expand Down

0 comments on commit 97cd2b1

Please sign in to comment.