Skip to content

Commit 4e8a7ef

Browse files
committed
feat: add remove support for flows
1 parent dcb5f5b commit 4e8a7ef

10 files changed

+324
-53
lines changed

src/classes/flow.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@ import { RedisConnection } from './redis-connection';
88
import { KeysMap, QueueKeys } from './queue-keys';
99
import { FlowJob } from '../interfaces/flow-job';
1010
import { Job } from './job';
11+
import { ParentOpts } from './scripts';
1112

12-
interface JobNode {
13+
export interface JobNode {
1314
job: Job;
1415
children?: JobNode[];
1516
}
@@ -123,6 +124,8 @@ export class Flow extends EventEmitter {
123124
node.opts?.jobId || parentId,
124125
);
125126

127+
const parentKey = getParentKey(parent?.parentOpts);
128+
126129
if (node.children) {
127130
// Create parent job, will be a job in status "waiting-children".
128131
const queueKeysParent = new QueueKeys(node.prefix);
@@ -134,6 +137,7 @@ export class Flow extends EventEmitter {
134137
job.addJob(<Redis>(multi as unknown), {
135138
parentDependenciesKey: parent?.parentDependenciesKey,
136139
waitChildrenKey,
140+
parentKey,
137141
});
138142

139143
const parentDependenciesKey = `${queueKeysParent.toKey(
@@ -153,6 +157,7 @@ export class Flow extends EventEmitter {
153157
} else {
154158
job.addJob(<Redis>(multi as unknown), {
155159
parentDependenciesKey: parent?.parentDependenciesKey,
160+
parentKey,
156161
});
157162

158163
return { job };
@@ -208,3 +213,9 @@ export class Flow extends EventEmitter {
208213
return this.connection.disconnect();
209214
}
210215
}
216+
217+
function getParentKey(opts: { id: string; queue: string }) {
218+
if (opts) {
219+
return `${opts.queue}:${opts.id}`;
220+
}
221+
}

src/classes/queue.ts

+12
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,18 @@ export class Queue<
142142
return (await this.repeat).removeRepeatableByKey(key);
143143
}
144144

145+
/**
146+
* Removes the given job from the queue as well as all its
147+
* dependencies.
148+
*
149+
* @param jobId The if of the job to remove
150+
* @returns 1 if it managed to remove the job or -1 if the job or
151+
* any of its dependencies was locked.
152+
*/
153+
async remove(jobId: string) {
154+
return Scripts.remove(this, jobId);
155+
}
156+
145157
/**
146158
* Drains the queue, i.e., removes all jobs that are waiting
147159
* or delayed, but not active, completed or failed.

src/classes/scripts.ts

+5-12
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export type MinimalQueue = Pick<
3333
export type ParentOpts = {
3434
waitChildrenKey?: string;
3535
parentDependenciesKey?: string;
36+
parentKey?: string;
3637
};
3738

3839
export class Scripts {
@@ -60,6 +61,7 @@ export class Scripts {
6061
parentOpts: ParentOpts = {
6162
waitChildrenKey: null,
6263
parentDependenciesKey: null,
64+
parentKey: null,
6365
},
6466
) {
6567
const queueKeys = queue.keys;
@@ -87,6 +89,7 @@ export class Scripts {
8789
opts.delay ? job.timestamp + opts.delay : 0,
8890
opts.priority || 0,
8991
opts.lifo ? 'RPUSH' : 'LPUSH',
92+
parentOpts.parentKey,
9093
];
9194

9295
keys = keys.concat(<string[]>args);
@@ -114,18 +117,8 @@ export class Scripts {
114117
static async remove(queue: MinimalQueue, jobId: string) {
115118
const client = await queue.client;
116119

117-
const keys = [
118-
'active',
119-
'wait',
120-
'delayed',
121-
'paused',
122-
'completed',
123-
'failed',
124-
'priority',
125-
jobId,
126-
`${jobId}:logs`,
127-
].map(name => queue.toKey(name));
128-
return (<any>client).removeJob(keys.concat([queue.keys.events, jobId]));
120+
const keys = [jobId].map(name => queue.toKey(name));
121+
return (<any>client).removeJob(keys.concat([jobId]));
129122
}
130123

131124
static async extendLock(

src/commands/addJob-10.lua

+2-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
ARGV[8] delayedTimestamp
3737
ARGV[9] priority
3838
ARGV[10] LIFO
39+
ARGV[11] parentKey?
3940
]]
4041
local jobId
4142
local jobIdKey
@@ -56,7 +57,7 @@ end
5657

5758
-- Store the job.
5859
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", ARGV[5],
59-
"timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
60+
"timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9], "parentKey", ARGV[11])
6061

6162
-- Check if job is delayed
6263
local delayedTimestamp = tonumber(ARGV[8])

src/commands/extendLock-2.lua

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
]]
1515
local rcall = redis.call
1616
if rcall("GET", KEYS[1]) == ARGV[1] then
17+
-- if rcall("SET", KEYS[1], ARGV[1], "PX", ARGV[2], "XX") then
1718
if rcall("SET", KEYS[1], ARGV[1], "PX", ARGV[2]) then
1819
rcall("SREM", KEYS[2], ARGV[3])
1920
return 1

src/commands/removeJob-1.lua

+119
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
--[[
2+
Remove a job from all the queues it may be in as well as all its data.
3+
In order to be able to remove a job, it cannot be active.
4+
5+
Input:
6+
KEYS[1] jobId
7+
ARGV[1] jobId
8+
9+
Events:
10+
'removed'
11+
]]
12+
13+
local rcall = redis.call
14+
15+
local getJobIdFromKey = function (jobKey)
16+
return string.match(jobKey, ".*:(.*)")
17+
end
18+
19+
local getJobKeyPrefix = function (jobKey, jobId)
20+
return string.sub(jobKey, 0, #jobKey - #jobId)
21+
end
22+
23+
-- recursively check if there are no locks on the
24+
-- jobs to be removed.
25+
local function isLocked( prefix, jobId)
26+
local jobKey = prefix .. jobId;
27+
28+
-- Check if this job is locked
29+
local lockKey = jobKey .. ':lock'
30+
local lock = rcall("GET", lockKey)
31+
if not lock then
32+
local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies")
33+
if (#dependencies > 0) then
34+
for i, childJobKey in ipairs(dependencies) do
35+
-- We need to get the jobId for this job.
36+
local childJobId = getJobIdFromKey(childJobKey)
37+
local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)
38+
local result = isLocked( childJobPrefix, childJobId )
39+
if result then
40+
return true
41+
end
42+
end
43+
end
44+
return false
45+
end
46+
return true
47+
end
48+
49+
local function removeJob( prefix, jobId)
50+
local jobKey = prefix .. jobId;
51+
52+
-- Check if this job has a parent. If so we will just remove it from
53+
-- the parent child list, but if it is the last child we should move the parent to "wait/paused"
54+
-- which requires code from "moveToFinished"
55+
local parentKey = rcall("HGET", jobKey, "parentKey")
56+
if( (type(parentKey) == "string") and (rcall("EXISTS", parentKey) == 1)) then
57+
local parentDependenciesKey = parentKey .. ":dependencies"
58+
local result = rcall("SREM", parentDependenciesKey, jobKey)
59+
if rcall("SCARD", parentDependenciesKey) == 0 then
60+
local parentId = getJobIdFromKey(parentKey)
61+
local parentPrefix = getJobKeyPrefix(parentKey, parentId)
62+
63+
rcall("ZREM", parentPrefix .. "waiting-children", parentId)
64+
65+
if rcall("HEXISTS", parentPrefix .. "meta", "paused") ~= 1 then
66+
rcall("RPUSH", parentPrefix .. "wait", parentId)
67+
else
68+
rcall("RPUSH", parentPrefix .. "parentPrefixpaused", parentId)
69+
end
70+
71+
local parentEventStream = parentPrefix .. "events"
72+
rcall("XADD", parentEventStream, "*", "event", "active", "jobId", parentId, "prev", "waiting-children")
73+
end
74+
end
75+
76+
rcall("LREM", prefix .. "active", 0, jobId)
77+
rcall("LREM", prefix .. "wait", 0, jobId)
78+
rcall("ZREM", prefix .. "delayed", jobId)
79+
rcall("LREM", prefix .. "paused", 0, jobId)
80+
rcall("ZREM", prefix .. "completed", jobId)
81+
rcall("ZREM", prefix .. "failed", jobId)
82+
rcall("ZREM", prefix .. "priority", jobId)
83+
rcall("ZREM", prefix .. "waiting-children", jobId)
84+
rcall("DEL", jobKey)
85+
rcall("DEL", jobKey .. ":logs")
86+
87+
-- Check if this job has children
88+
-- If so, we are going to try to remove the children recursively in deep first way because
89+
-- if some job is locked we must exit with and error.
90+
local dependencies = rcall("SMEMBERS", jobKey .. ":dependencies")
91+
if (#dependencies > 0) then
92+
for i, childJobKey in ipairs(dependencies) do
93+
-- We need to get the jobId for this job.
94+
local childJobId = getJobIdFromKey(childJobKey)
95+
local childJobPrefix = getJobKeyPrefix(childJobKey, childJobId)
96+
removeJob( childJobPrefix, childJobId )
97+
end
98+
end
99+
100+
rcall("DEL", jobKey .. ":dependencies")
101+
102+
-- -- delete keys related to rate limiter
103+
-- local limiterIndexTable = KEYS[10] .. ":index"
104+
-- local limitedSetKey = rcall("HGET", limiterIndexTable, jobId)
105+
-- if limitedSetKey then
106+
-- rcall("SREM", limitedSetKey, jobId)
107+
-- rcall("HDEL", limiterIndexTable, jobId)
108+
-- end
109+
110+
rcall("XADD", prefix .. "events", "*", "event", "removed", "jobId", jobId, "prev", "unknown");
111+
end
112+
113+
local prefix = getJobKeyPrefix(KEYS[1], ARGV[1])
114+
115+
if not isLocked(prefix, ARGV[1]) then
116+
removeJob(prefix, ARGV[1])
117+
return 1
118+
end
119+
return 0

src/commands/removeJob-10.lua

-37
This file was deleted.

src/commands/updateDelaySet-7.lua

+2-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
1818
Events:
1919
'waiting'
20-
]] local rcall = redis.call
20+
]]
21+
local rcall = redis.call
2122

2223
-- Try to get as much as 1000 jobs at once
2324
local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000,

0 commit comments

Comments
 (0)