Skip to content

Commit c2dc669

Browse files
committed
fix: add jobId support to repeatable jobs fixes #396
1 parent e5a29bf commit c2dc669

File tree

3 files changed

+56
-30
lines changed

3 files changed

+56
-30
lines changed

src/classes/repeat.ts

+27-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export class Repeat extends QueueBase {
1212
skipCheckExists?: boolean,
1313
) {
1414
const repeatOpts = { ...opts.repeat };
15-
const prevMillis = repeatOpts.prevMillis || 0;
15+
const prevMillis = opts.prevMillis || 0;
1616
const currentCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
1717

1818
if (
@@ -28,6 +28,11 @@ export class Repeat extends QueueBase {
2828
const nextMillis = getNextMillis(now, repeatOpts);
2929

3030
if (nextMillis) {
31+
// We store the undecorated opts.jobId into the repeat options
32+
if (!prevMillis && opts.jobId) {
33+
repeatOpts.jobId = opts.jobId;
34+
}
35+
3136
const repeatJobKey = getRepeatKey(name, repeatOpts);
3237

3338
let repeatableExists = true;
@@ -69,7 +74,12 @@ export class Repeat extends QueueBase {
6974
//
7075
// Generate unique job id for this iteration.
7176
//
72-
const jobId = getRepeatJobId(name, nextMillis, md5(repeatJobKey));
77+
const jobId = getRepeatJobId(
78+
name,
79+
nextMillis,
80+
md5(repeatJobKey),
81+
opts.repeat.jobId,
82+
);
7383
const now = Date.now();
7484
const delay = nextMillis - now;
7585

@@ -91,8 +101,14 @@ export class Repeat extends QueueBase {
91101
async removeRepeatable(name: string, repeat: RepeatOptions, jobId?: string) {
92102
const client = await this.client;
93103

94-
const repeatJobKey = getRepeatKey(name, repeat);
95-
const repeatJobId = getRepeatJobId(name, '', md5(repeatJobKey));
104+
const repeatJobKey = getRepeatKey(name, { ...repeat, jobId });
105+
const repeatJobId = getRepeatJobId(
106+
name,
107+
'',
108+
md5(repeatJobKey),
109+
jobId || repeat.jobId,
110+
);
111+
96112
const queueKey = this.keys[''];
97113

98114
return (<any>client).removeRepeatable(
@@ -166,16 +182,21 @@ function getRepeatJobId(
166182
name: string,
167183
nextMillis: number | string,
168184
namespace: string,
185+
jobId?: string,
169186
) {
170-
return `repeat:${name}:${namespace}:${nextMillis}`;
187+
const checksum = md5(`${name}${jobId || ''}${namespace}`);
188+
return `repeat:${checksum}:${nextMillis}`;
189+
// return `repeat:${jobId || ''}:${name}:${namespace}:${nextMillis}`;
190+
//return `repeat:${name}:${namespace}:${nextMillis}`;
171191
}
172192

173193
function getRepeatKey(name: string, repeat: RepeatOptions) {
174194
const endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : '';
175195
const tz = repeat.tz || '';
176196
const suffix = (repeat.cron ? repeat.cron : String(repeat.every)) || '';
197+
const jobId = repeat.jobId ? repeat.jobId : '';
177198

178-
return `${name}::${endDate}:${tz}:${suffix}`;
199+
return `${name}:${jobId}:${endDate}:${tz}:${suffix}`;
179200
}
180201

181202
function getNextMillis(millis: number, opts: RepeatOptions) {

src/interfaces/jobs-options.ts

+5
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,9 @@ export interface JobsOptions {
8484
* Limits the amount of stack trace lines that will be recorded in the stacktrace.
8585
*/
8686
stackTraceLimit?: number;
87+
88+
/**
89+
* Internal property used by repeatable jobs.
90+
*/
91+
prevMillis?: number;
8792
}

src/test/test_repeat.ts

+24-24
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { expect } from 'chai';
77
import * as IORedis from 'ioredis';
88
import { beforeEach, describe, it } from 'mocha';
99
import { v4 } from 'uuid';
10-
import { defaults } from 'lodash';
1110
import { removeAllQueueData } from '@src/utils';
1211

1312
const sinon = require('sinon');
@@ -158,7 +157,7 @@ describe('repeat', function() {
158157
let prev: any;
159158
var counter = 0;
160159

161-
const completting = new Promise(resolve => {
160+
const completting = new Promise<void>(resolve => {
162161
worker.on('completed', async job => {
163162
this.clock.tick(nextTick);
164163
if (prev) {
@@ -206,7 +205,7 @@ describe('repeat', function() {
206205
let prev: Job;
207206
let counter = 0;
208207

209-
const completting = new Promise((resolve, reject) => {
208+
const completting = new Promise<void>((resolve, reject) => {
210209
worker.on('completed', async job => {
211210
this.clock.tick(nextTick);
212211
if (prev) {
@@ -255,7 +254,7 @@ describe('repeat', function() {
255254
let prev: Job;
256255
let counter = 0;
257256

258-
const completting = new Promise((resolve, reject) => {
257+
const completting = new Promise<void>((resolve, reject) => {
259258
worker.on('completed', async job => {
260259
this.clock.tick(nextTick);
261260
if (prev) {
@@ -300,7 +299,7 @@ describe('repeat', function() {
300299

301300
let prev: Job;
302301
let counter = 0;
303-
const completting = new Promise((resolve, reject) => {
302+
const completting = new Promise<void>((resolve, reject) => {
304303
queue.on('completed', async job => {
305304
this.clock.tick(nextTick);
306305
if (prev) {
@@ -400,7 +399,7 @@ describe('repeat', function() {
400399
const repeat = { cron: '*/1 * * * * *' };
401400
let processor;
402401

403-
const processing = new Promise((resolve, reject) => {
402+
const processing = new Promise<void>((resolve, reject) => {
404403
processor = async (job: Job) => {
405404
counter++;
406405
if (counter == numJobs) {
@@ -410,7 +409,7 @@ describe('repeat', function() {
410409
expect(delayed).to.be.empty;
411410
resolve();
412411
} else if (counter > numJobs) {
413-
reject(Error('should not repeat more than 7 times'));
412+
reject(Error(`should not repeat more than ${numJobs} times`));
414413
}
415414
};
416415
});
@@ -449,6 +448,7 @@ describe('repeat', function() {
449448
const queueScheduler = new QueueScheduler(queueName);
450449
await queueScheduler.waitUntilReady();
451450

451+
const numJobs = 4;
452452
const date = new Date('2017-02-07 9:24:00');
453453
let prev: Job;
454454
let counter = 0;
@@ -464,10 +464,10 @@ describe('repeat', function() {
464464

465465
this.clock.tick(nextTick);
466466

467-
const processing = new Promise((resolve, reject) => {
467+
const processing = new Promise<void>((resolve, reject) => {
468468
processor = async (job: Job) => {
469469
counter++;
470-
if (counter == 4) {
470+
if (counter == numJobs) {
471471
try {
472472
await queue.removeRepeatable('test', repeat, jobId);
473473
this.clock.tick(nextTick);
@@ -477,8 +477,8 @@ describe('repeat', function() {
477477
} catch (err) {
478478
reject(err);
479479
}
480-
} else if (counter > 4) {
481-
reject(Error('should not repeat more than 4 times'));
480+
} else if (counter > numJobs) {
481+
reject(Error(`should not repeat more than ${numJobs} times`));
482482
}
483483
};
484484
});
@@ -509,21 +509,21 @@ describe('repeat', function() {
509509
const jobId = 'xxxx';
510510
const date = new Date('2017-02-07 9:24:00');
511511
const nextTick = 2 * ONE_SECOND + 100;
512-
const nextRepeatableJob = repeat.addNextRepeatableJob;
513-
this.clock.tick(date.getTime());
512+
const addNextRepeatableJob = repeat.addNextRepeatableJob;
513+
this.clock.setSystemTime(date);
514514

515515
const repeatOpts = { cron: '*/2 * * * * *' };
516516

517-
const afterRemoved = new Promise(async resolve => {
517+
const afterRemoved = new Promise<void>(async resolve => {
518518
worker = new Worker(queueName, async job => {
519519
const repeatWorker = await worker.repeat;
520520
repeatWorker.addNextRepeatableJob = async (...args) => {
521521
// In order to simulate race condition
522522
// Make removeRepeatables happen any time after a moveToX is called
523-
await queue.removeRepeatable('test', defaults({ jobId }, repeatOpts));
523+
await queue.removeRepeatable('test', repeatOpts, jobId);
524524

525-
// nextRepeatableJob will now re-add the removed repeatable
526-
const result = await nextRepeatableJob.apply(repeat, args);
525+
// addNextRepeatableJob will now re-add the removed repeatable
526+
const result = await addNextRepeatableJob.apply(repeat, args);
527527
resolve();
528528
return result;
529529
};
@@ -604,7 +604,7 @@ describe('repeat', function() {
604604

605605
var counter = 0;
606606

607-
const completting = new Promise((resolve, reject) => {
607+
const completting = new Promise<void>((resolve, reject) => {
608608
worker.on('completed', () => {
609609
this.clock.tick(nextTick);
610610
counter++;
@@ -630,7 +630,7 @@ describe('repeat', function() {
630630

631631
let processor;
632632

633-
const processing = new Promise((resolve, reject) => {
633+
const processing = new Promise<void>((resolve, reject) => {
634634
processor = async (job: Job) => {
635635
try {
636636
expect(job.id).to.be.ok;
@@ -685,7 +685,7 @@ describe('repeat', function() {
685685
let prevType: string;
686686
let counter = 0;
687687

688-
const completting = new Promise(resolve => {
688+
const completting = new Promise<void>(resolve => {
689689
worker.on('completed', job => {
690690
this.clock.tick(nextTick);
691691
if (prevType) {
@@ -729,12 +729,12 @@ describe('repeat', function() {
729729

730730
const worker = new Worker(queueName, async job => {});
731731

732-
const waiting = new Promise((resolve, reject) => {
732+
const waiting = new Promise<void>((resolve, reject) => {
733733
queueEvents.on('waiting', function({ jobId }) {
734734
try {
735735
expect(jobId).to.be.equal(
736-
'repeat:test:16db7a9b166154f5c636abf3c8fe3364:' +
737-
(date.getTime() + 1 * ONE_SECOND),
736+
`repeat:c602b9b36e4beddd9e7db39a3ef2ea4c:${date.getTime() +
737+
1 * ONE_SECOND}`,
738738
);
739739
resolve();
740740
} catch (err) {
@@ -763,7 +763,7 @@ describe('repeat', function() {
763763
this.clock.tick(ONE_SECOND + 100);
764764

765765
let processor;
766-
const processing = new Promise((resolve, reject) => {
766+
const processing = new Promise<void>((resolve, reject) => {
767767
processor = async (job: Job) => {
768768
if (job.opts.repeat.count === 1) {
769769
resolve();

0 commit comments

Comments
 (0)