Skip to content

Commit c1c2ccc

Browse files
authored
fix(rate-limit): move job to wait even if ttl is 0 (#2403)
1 parent 168d312 commit c1c2ccc

File tree

3 files changed

+97
-18
lines changed

3 files changed

+97
-18
lines changed

src/classes/scripts.ts

+13-4
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,7 @@ export class Scripts {
222222
return <string>result;
223223
}
224224

225-
async pause(pause: boolean): Promise<void> {
226-
const client = await this.queue.client;
227-
225+
protected pauseArgs(pause: boolean): (string | number)[] {
228226
let src = 'wait',
229227
dst = 'paused';
230228
if (!pause) {
@@ -242,7 +240,17 @@ export class Scripts {
242240
this.queue.keys.marker,
243241
);
244242

245-
return (<any>client).pause(keys.concat([pause ? 'paused' : 'resumed']));
243+
const args = [pause ? 'paused' : 'resumed'];
244+
245+
return keys.concat(args);
246+
}
247+
248+
async pause(pause: boolean): Promise<void> {
249+
const client = await this.queue.client;
250+
251+
const args = this.pauseArgs(pause);
252+
253+
return (<any>client).pause(args);
246254
}
247255

248256
private removeRepeatableArgs(
@@ -1034,6 +1042,7 @@ export class Scripts {
10341042
this.queue.keys.meta,
10351043
this.queue.keys.limiter,
10361044
this.queue.keys.prioritized,
1045+
this.queue.keys.marker,
10371046
this.queue.keys.events,
10381047
];
10391048

src/commands/moveJobFromActiveToWait-9.lua src/commands/moveJobFromActiveToWait-10.lua

+21-14
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
--[[
22
Function to move job from active state to wait.
33
Input:
4-
KEYS[1] active key
5-
KEYS[2] wait key
4+
KEYS[1] active key
5+
KEYS[2] wait key
66
7-
KEYS[3] stalled key
8-
KEYS[4] job lock key
9-
KEYS[5] paused key
10-
KEYS[6] meta key
11-
KEYS[7] limiter key
12-
KEYS[8] prioritized key
13-
KEYS[9] event key
7+
KEYS[3] stalled key
8+
KEYS[4] job lock key
9+
KEYS[5] paused key
10+
KEYS[6] meta key
11+
KEYS[7] limiter key
12+
KEYS[8] prioritized key
13+
KEYS[9] marker key
14+
KEYS[10] event key
1415
1516
ARGV[1] job id
1617
ARGV[2] lock token
@@ -19,7 +20,9 @@
1920
local rcall = redis.call
2021

2122
-- Includes
23+
--- @include "includes/addJobInTargetList"
2224
--- @include "includes/pushBackJobWithPriority"
25+
--- @include "includes/getOrSetMaxEvents"
2326
--- @include "includes/getTargetQueueList"
2427

2528
local jobId = ARGV[1]
@@ -28,10 +31,11 @@ local lockKey = KEYS[4]
2831

2932
local lockToken = rcall("GET", lockKey)
3033
local pttl = rcall("PTTL", KEYS[7])
31-
if lockToken == token and pttl > 0 then
34+
if lockToken == token then
35+
local metaKey = KEYS[6]
3236
local removed = rcall("LREM", KEYS[1], 1, jobId)
33-
if (removed > 0) then
34-
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[5])
37+
if removed > 0 then
38+
local target, isPaused = getTargetQueueList(metaKey, KEYS[2], KEYS[5])
3539

3640
rcall("SREM", KEYS[3], jobId)
3741

@@ -40,13 +44,16 @@ if lockToken == token and pttl > 0 then
4044
if priority > 0 then
4145
pushBackJobWithPriority(KEYS[8], priority, jobId)
4246
else
43-
rcall("RPUSH", target, jobId)
47+
addJobInTargetList(target, KEYS[9], "RPUSH", isPaused, jobId)
4448
end
4549

4650
rcall("DEL", lockKey)
4751

52+
local maxEvents = getOrSetMaxEvents(metaKey)
53+
4854
-- Emit waiting event
49-
rcall("XADD", KEYS[9], "*", "event", "waiting", "jobId", jobId)
55+
rcall("XADD", KEYS[10], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
56+
"jobId", jobId)
5057
end
5158
end
5259

tests/test_rate_limiter.ts

+63
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,69 @@ describe('Rate Limiter', function () {
423423
await worker.close();
424424
});
425425

426+
describe('when rate limit is too low', () => {
427+
it('should move job to wait anyway', async function () {
428+
this.timeout(4000);
429+
430+
const numJobs = 10;
431+
const dynamicLimit = 1;
432+
const duration = 100;
433+
434+
const ttl = await queue.getRateLimitTtl();
435+
expect(ttl).to.be.equal(-2);
436+
437+
const worker = new Worker(
438+
queueName,
439+
async job => {
440+
if (job.attemptsStarted === 1) {
441+
delay(50);
442+
await worker.rateLimit(dynamicLimit);
443+
const currentTtl = await queue.getRateLimitTtl();
444+
expect(currentTtl).to.be.lessThanOrEqual(dynamicLimit);
445+
throw Worker.RateLimitError();
446+
}
447+
},
448+
{
449+
connection,
450+
prefix,
451+
maxStalledCount: 0,
452+
limiter: {
453+
max: 1,
454+
duration,
455+
},
456+
},
457+
);
458+
459+
const result = new Promise<void>((resolve, reject) => {
460+
queueEvents.on(
461+
'completed',
462+
// after every job has been completed
463+
after(numJobs, async () => {
464+
try {
465+
resolve();
466+
} catch (err) {
467+
reject(err);
468+
}
469+
}),
470+
);
471+
472+
queueEvents.on('failed', async err => {
473+
await worker.close();
474+
reject(err);
475+
});
476+
});
477+
478+
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
479+
name: 'rate test',
480+
data: {},
481+
}));
482+
await queue.addBulk(jobs);
483+
484+
await result;
485+
await worker.close();
486+
});
487+
});
488+
426489
describe('when reaching max attempts and we want to move the job to failed', () => {
427490
it('should throw Unrecoverable error', async function () {
428491
const dynamicLimit = 550;

0 commit comments

Comments
 (0)