Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(job-scheduler): consider removing current job from wait, paused or prioritized #3066

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ export class Scripts {
const keys: (string | number | Buffer)[] = [
queueKeys.repeat,
queueKeys.delayed,
queueKeys.wait,
queueKeys.paused,
queueKeys.meta,
queueKeys.prioritized,
];

const args = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
Input:
KEYS[1] 'repeat' key
KEYS[2] 'delayed' key
KEYS[3] 'wait' key
KEYS[4] 'paused' key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have pause key anymore, so why is this key needed?

KEYS[5] 'meta' key
KEYS[6] 'prioritized' key

ARGV[1] next milliseconds
ARGV[2] msgpacked options
Expand All @@ -22,13 +26,15 @@
]] local rcall = redis.call
local repeatKey = KEYS[1]
local delayedKey = KEYS[2]
local prioritizedKey = KEYS[6]

local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]

-- Includes
--- @include "includes/isQueuePaused"
--- @include "includes/removeJob"
--- @include "includes/storeJobScheduler"

Expand All @@ -37,14 +43,28 @@ local prefixKey = ARGV[6]
local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local currentJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false and
(rcall("EXISTS", nextDelayedJobKey) ~= 1 or delayedJobId == nextDelayedJobId) then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]] )
rcall("ZREM", delayedKey, delayedJobId)
if rcall("EXISTS", nextDelayedJobKey) ~= 1 or currentJobId == nextDelayedJobId then
if rcall("ZSCORE", delayedKey, currentJobId) ~= false then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
rcall("ZREM", delayedKey, currentJobId)
elseif rcall("ZSCORE", prioritizedKey, currentJobId) ~= false then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
rcall("ZREM", prioritizedKey, currentJobId)
else
if isQueuePaused(KEYS[5]) then
if rcall("LREM", KEYS[4], 1, currentJobId) > 0 then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be needed as there is no pause key.

removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
end
else
if rcall("LREM", KEYS[3], 1, currentJobId) > 0 then
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
end
end
end
end
end

Expand Down
96 changes: 94 additions & 2 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,99 @@ describe('Job Scheduler', function () {

await worker.close();
});

describe('when generated job is in waiting state', function () {
it('should upsert scheduler by removing waiting job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const jobSchedulerId = 'test';

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '10 * * * * *',
});
const delayedJobs = await queue.getDelayed();
await delayedJobs[0].promote();

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.eql(1);

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '2 10 * * * *',
});

const waitingCountAfter = await queue.getWaitingCount();
expect(waitingCountAfter).to.be.eql(0);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.eql(1);
});
});

describe('when generated job is in paused state', function () {
it('should upsert scheduler by removing paused job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const jobSchedulerId = 'test';

await queue.pause();
await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '10 * * * * *',
});
const delayedJobs = await queue.getDelayed();
await delayedJobs[0].promote();

const waitingCount = await queue.getWaitingCount();
expect(waitingCount).to.be.eql(1);

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '2 10 * * * *',
});

const waitingCountAfter = await queue.getWaitingCount();
expect(waitingCountAfter).to.be.eql(0);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.eql(1);
});
});

describe('when generated job is in prioritized state', function () {
it('should upsert scheduler by removing prioritized job', async function () {
const date = new Date('2017-02-07 9:24:00');
this.clock.setSystemTime(date);

const jobSchedulerId = 'test';

await queue.upsertJobScheduler(
jobSchedulerId,
{
pattern: '10 * * * * *',
},
{
opts: {
priority: 1,
},
},
);
const delayedJobs = await queue.getDelayed();
await delayedJobs[0].promote();

const prioritizedCount = await queue.getPrioritizedCount();
expect(prioritizedCount).to.be.eql(1);

await queue.upsertJobScheduler(jobSchedulerId, {
pattern: '2 10 * * * *',
});

const prioritizedCountAfter = await queue.getPrioritizedCount();
expect(prioritizedCountAfter).to.be.eql(0);

const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.eql(1);
});
});
});

describe('when clocks are slightly out of sync', function () {
Expand Down Expand Up @@ -2011,8 +2104,7 @@ describe('Job Scheduler', function () {
expect(jobs).to.have.length(1);

waitingJobs = await queue.getWaiting();
// TODO: need to fix the case when jobs are added and previous one is in waiting state
expect(waitingJobs).to.have.length(2);
expect(waitingJobs).to.have.length(1);
});
});

Expand Down