Skip to content

Commit 4b619ca

Browse files
authored
fix(scheduler): remove multi when updating a job scheduler (#3108)
1 parent 94008ac commit 4b619ca

File tree

5 files changed

+143
-108
lines changed

5 files changed

+143
-108
lines changed

src/classes/job-scheduler.ts

Lines changed: 31 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -137,20 +137,19 @@ export class JobScheduler extends QueueBase {
137137
}
138138
}
139139

140-
const multi = (await this.client).multi();
140+
const mergedOpts = this.getNextJobOpts(
141+
nextMillis,
142+
jobSchedulerId,
143+
{
144+
...opts,
145+
repeat: filteredRepeatOpts,
146+
telemetry,
147+
},
148+
iterationCount,
149+
newOffset,
150+
);
141151

142152
if (override) {
143-
const mergedOpts = this.getNextJobOpts(
144-
nextMillis,
145-
jobSchedulerId,
146-
{
147-
...opts,
148-
repeat: filteredRepeatOpts,
149-
telemetry,
150-
},
151-
iterationCount,
152-
newOffset,
153-
);
154153
const jobId = await this.scripts.addJobScheduler(
155154
jobSchedulerId,
156155
nextMillis,
@@ -185,49 +184,33 @@ export class JobScheduler extends QueueBase {
185184

186185
return job;
187186
} else {
188-
this.scripts.updateJobSchedulerNextMillis(
189-
(<unknown>multi) as RedisClient,
187+
const jobId = await this.scripts.updateJobSchedulerNextMillis(
190188
jobSchedulerId,
191189
nextMillis,
190+
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
191+
Job.optsAsJSON(mergedOpts),
192+
producerId,
192193
);
193-
}
194-
195-
const job = this.createNextJob<T, R, N>(
196-
(<unknown>multi) as RedisClient,
197-
jobName,
198-
nextMillis,
199-
newOffset,
200-
jobSchedulerId,
201-
{
202-
...opts,
203-
repeat: { ...filteredRepeatOpts, offset: newOffset },
204-
telemetry,
205-
},
206-
jobData,
207-
iterationCount,
208-
producerId,
209-
);
210-
211-
const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]
212194

213-
// Check if there are any errors
214-
const erroredResult = results.find(result => result[0]);
215-
if (erroredResult) {
216-
throw new Error(
217-
`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`,
218-
);
219-
}
195+
if (jobId) {
196+
const job = new this.Job<T, R, N>(
197+
this,
198+
jobName,
199+
jobData,
200+
mergedOpts,
201+
jobId,
202+
);
220203

221-
// Get last result with the job id
222-
const lastResult = results.pop();
223-
job.id = lastResult[1] as string;
204+
job.id = jobId;
224205

225-
span?.setAttributes({
226-
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
227-
[TelemetryAttributes.JobId]: job.id,
228-
});
206+
span?.setAttributes({
207+
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
208+
[TelemetryAttributes.JobId]: job.id,
209+
});
229210

230-
return job;
211+
return job;
212+
}
213+
}
231214
},
232215
);
233216
}

src/classes/scripts.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -371,11 +371,42 @@ export class Scripts {
371371
}
372372

373373
async updateJobSchedulerNextMillis(
374-
client: RedisClient,
375374
jobSchedulerId: string,
376375
nextMillis: number,
377-
): Promise<number> {
378-
return client.zadd(this.queue.keys.repeat, nextMillis, jobSchedulerId);
376+
templateData: string,
377+
delayedJobOpts: JobsOptions,
378+
// The job id of the job that produced this next iteration
379+
producerId?: string,
380+
): Promise<string | null> {
381+
const client = await this.queue.client;
382+
383+
const queueKeys = this.queue.keys;
384+
385+
const keys: (string | number | Buffer)[] = [
386+
queueKeys.repeat,
387+
queueKeys.delayed,
388+
queueKeys.wait,
389+
queueKeys.paused,
390+
queueKeys.meta,
391+
queueKeys.prioritized,
392+
queueKeys.marker,
393+
queueKeys.id,
394+
queueKeys.events,
395+
queueKeys.pc,
396+
producerId ? this.queue.toKey(producerId) : '',
397+
];
398+
399+
const args = [
400+
nextMillis,
401+
jobSchedulerId,
402+
templateData,
403+
pack(delayedJobOpts),
404+
Date.now(),
405+
queueKeys[''],
406+
producerId,
407+
];
408+
409+
return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
379410
}
380411

381412
private removeRepeatableArgs(

src/commands/addJobScheduler-10.lua

Lines changed: 9 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
local rcall = redis.call
3535
local repeatKey = KEYS[1]
3636
local delayedKey = KEYS[2]
37+
local waitKey = KEYS[3]
38+
local pausedKey = KEYS[4]
39+
local metaKey = KEYS[5]
3740
local prioritizedKey = KEYS[6]
3841

3942
local nextMillis = ARGV[1]
@@ -42,12 +45,10 @@ local templateOpts = cmsgpack.unpack(ARGV[5])
4245
local prefixKey = ARGV[8]
4346

4447
-- Includes
45-
--- @include "includes/addDelayedJob"
46-
--- @include "includes/addJobWithPriority"
48+
--- @include "includes/addJobFromScheduler"
4749
--- @include "includes/getOrSetMaxEvents"
4850
--- @include "includes/isQueuePaused"
4951
--- @include "includes/removeJob"
50-
--- @include "includes/storeJob"
5152
--- @include "includes/storeJobScheduler"
5253

5354
-- If we are overriding a repeatable job we must delete the delayed job for
@@ -68,12 +69,12 @@ if prevMillis ~= false then
6869
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
6970
rcall("ZREM", prioritizedKey, currentJobId)
7071
else
71-
if isQueuePaused(KEYS[5]) then
72-
if rcall("LREM", KEYS[4], 1, currentJobId) > 0 then
72+
if isQueuePaused(metaKey) then
73+
if rcall("LREM", pausedKey, 1, currentJobId) > 0 then
7374
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
7475
end
7576
else
76-
if rcall("LREM", KEYS[3], 1, currentJobId) > 0 then
77+
if rcall("LREM", waitKey, 1, currentJobId) > 0 then
7778
removeJob(currentJobId, true, prefixKey, true --[[remove debounce key]] )
7879
end
7980
end
@@ -86,40 +87,12 @@ storeJobScheduler(jobSchedulerId, schedulerKey, repeatKey, nextMillis, scheduler
8687

8788
if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
8889
local eventsKey = KEYS[9]
89-
local metaKey = KEYS[5]
9090
local maxEvents = getOrSetMaxEvents(metaKey)
9191

9292
rcall("INCR", KEYS[8])
9393

94-
local delayedOpts = cmsgpack.unpack(ARGV[6])
95-
96-
local delay, priority = storeJob(eventsKey, nextDelayedJobKey, nextDelayedJobId, schedulerOpts['name'], ARGV[4],
97-
delayedOpts, ARGV[7], nil, nil, jobSchedulerId)
98-
99-
if delay ~= 0 then
100-
addDelayedJob(nextDelayedJobId, delayedKey, eventsKey,
101-
ARGV[7], maxEvents, KEYS[7], delay)
102-
else
103-
local isPaused = isQueuePaused(KEYS[5])
104-
105-
-- Standard or priority add
106-
if priority == 0 then
107-
if isPaused then
108-
-- LIFO or FIFO
109-
local pushCmd = delayedOpts['lifo'] and 'RPUSH' or 'LPUSH'
110-
rcall(pushCmd, KEYS[4], nextDelayedJobId)
111-
else
112-
-- LIFO or FIFO
113-
local pushCmd = delayedOpts['lifo'] and 'RPUSH' or 'LPUSH'
114-
rcall(pushCmd, KEYS[3], nextDelayedJobId)
115-
end
116-
else
117-
-- Priority add
118-
addJobWithPriority(KEYS[7], KEYS[6], priority, nextDelayedJobId, KEYS[10], isPaused)
119-
end
120-
-- Emit waiting event
121-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", nextDelayedJobId)
122-
end
94+
addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, ARGV[6], waitKey, pausedKey, metaKey, prioritizedKey,
95+
KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerOpts['name'], maxEvents, ARGV[7], ARGV[4], jobSchedulerId)
12396

12497
if ARGV[9] ~= "" then
12598
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
--[[
2+
Add delay marker if needed.
3+
]]
4+
5+
-- Includes
6+
--- @include "addDelayedJob"
7+
--- @include "addJobWithPriority"
8+
--- @include "isQueuePaused"
9+
--- @include "storeJob"
10+
11+
local function addJobFromScheduler(jobKey, jobId, rawOpts, waitKey, pausedKey, metaKey, prioritizedKey,
12+
priorityCounter, delayedKey, markerKey, eventsKey, name, maxEvents, timestamp, data, jobSchedulerId)
13+
local opts = cmsgpack.unpack(rawOpts)
14+
15+
local delay, priority = storeJob(eventsKey, jobKey, jobId, name, data,
16+
opts, timestamp, nil, nil, jobSchedulerId)
17+
18+
if delay ~= 0 then
19+
addDelayedJob(jobId, delayedKey, eventsKey, timestamp, maxEvents, markerKey, delay)
20+
else
21+
local isPaused = isQueuePaused(metaKey)
22+
23+
-- Standard or priority add
24+
if priority == 0 then
25+
if isPaused then
26+
-- LIFO or FIFO
27+
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
28+
rcall(pushCmd, pausedKey, jobId)
29+
else
30+
-- LIFO or FIFO
31+
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
32+
rcall(pushCmd, waitKey, jobId)
33+
end
34+
else
35+
-- Priority add
36+
addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounter, isPaused)
37+
end
38+
-- Emit waiting event
39+
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", jobId)
40+
end
41+
end

src/commands/updateJobScheduler-7.lua renamed to src/commands/updateJobScheduler-11.lua

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,17 @@
22
Updates a job scheduler and adds next delayed job
33
44
Input:
5-
KEYS[1] 'marker',
6-
KEYS[2] 'meta'
7-
KEYS[3] 'id'
8-
KEYS[4] 'delayed'
9-
KEYS[5] events stream key
10-
KEYS[6] 'repeat' key
11-
KEYS[7] producer key
5+
KEYS[1] 'repeat' key
6+
KEYS[2] 'delayed'
7+
KEYS[3] 'wait' key
8+
KEYS[4] 'paused' key
9+
KEYS[5] 'meta'
10+
KEYS[6] 'prioritized' key
11+
KEYS[7] 'marker',
12+
KEYS[8] 'id'
13+
KEYS[9] events stream key
14+
KEYS[10] 'pc' priority counter
15+
KEYS[11] producer key
1216
1317
ARGV[1] next milliseconds
1418
ARGV[2] jobs scheduler id
@@ -22,16 +26,20 @@
2226
next delayed job id - OK
2327
]]
2428
local rcall = redis.call
25-
local repeatKey = KEYS[6]
26-
local delayedKey = KEYS[4]
29+
local repeatKey = KEYS[1]
30+
local delayedKey = KEYS[2]
31+
local waitKey = KEYS[3]
32+
local pausedKey = KEYS[4]
33+
local metaKey = KEYS[5]
34+
local prioritizedKey = KEYS[6]
2735
local nextMillis = ARGV[1]
2836
local jobSchedulerId = ARGV[2]
2937
local timestamp = ARGV[5]
3038
local prefixKey = ARGV[6]
3139
local producerId = ARGV[7]
3240

3341
-- Includes
34-
--- @include "includes/addDelayedJob"
42+
--- @include "includes/addJobFromScheduler"
3543
--- @include "includes/getOrSetMaxEvents"
3644

3745
local schedulerKey = repeatKey .. ":" .. jobSchedulerId
@@ -43,19 +51,16 @@ local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
4351
if prevMillis ~= false then
4452
local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
4553

46-
if producerId == currentDelayedJobId then
54+
if producerId == currentDelayedJobId and rcall("EXISTS", nextDelayedJobKey) ~= 1 then
4755
local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data")
4856

4957
rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
5058
rcall("HINCRBY", schedulerKey, "ic", 1)
5159

52-
local eventsKey = KEYS[5]
53-
local metaKey = KEYS[2]
60+
local eventsKey = KEYS[9]
5461
local maxEvents = getOrSetMaxEvents(metaKey)
5562

56-
rcall("INCR", KEYS[3])
57-
58-
local delayedOpts = cmsgpack.unpack(ARGV[4])
63+
rcall("INCR", KEYS[8])
5964

6065
-- TODO: remove this workaround in next breaking change,
6166
-- all job-schedulers must save job data
@@ -65,11 +70,13 @@ if prevMillis ~= false then
6570
rcall("HSET", schedulerKey, "data", templateData)
6671
end
6772

68-
addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
69-
templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
70-
71-
if KEYS[7] ~= "" then
72-
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
73+
addJobFromScheduler(nextDelayedJobKey, nextDelayedJobId, ARGV[4], waitKey, pausedKey, metaKey, prioritizedKey,
74+
KEYS[10], delayedKey, KEYS[7], eventsKey, schedulerAttributes[1], maxEvents, ARGV[5],
75+
templateData or '{}', jobSchedulerId)
76+
77+
-- TODO: remove this workaround in next breaking change
78+
if KEYS[11] ~= "" then
79+
rcall("HSET", KEYS[11], "nrjid", nextDelayedJobId)
7380
end
7481

7582
return nextDelayedJobId .. "" -- convert to string

0 commit comments

Comments
 (0)