Skip to content

Commit 2696ef8

Browse files
authored
fix(flow): parent job cannot be replaced (python) (#2417)
1 parent f2e11b4 commit 2696ef8

10 files changed

+101
-34
lines changed

python/bullmq/error_code.py

+1
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ class ErrorCode(Enum):
88
JobPendingDependencies = -4
99
ParentJobNotExist = -5
1010
JobLockMismatch = -6
11+
ParentJobCannotBeReplaced = -7

python/bullmq/scripts.py

+2
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,8 @@ def finishedErrors(self, code: int, jobId: str, command: str, state: str) -> Typ
573573
return TypeError(f"Missing key for parent job {jobId}.{command}")
574574
elif code == ErrorCode.JobLockMismatch.value:
575575
return TypeError(f"Lock mismatch for job {jobId}. Cmd {command} from {state}")
576+
elif code == ErrorCode.ParentJobCannotBeReplaced.value:
577+
return TypeError(f"The parent job of job {jobId} cannot be replaced. {command}")
576578
else:
577579
return TypeError(f"Unknown code {str(code)} error for {jobId}.{command}")
578580

src/classes/scripts.ts

+4
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,10 @@ export class Scripts {
444444
return new Error(
445445
`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`,
446446
);
447+
case ErrorCode.ParentJobCannotBeReplaced:
448+
return new Error(
449+
`The parent job of job ${jobId} cannot be replaced. ${command}`,
450+
);
447451
default:
448452
return new Error(`Unknown code ${code} error for ${jobId}. ${command}`);
449453
}

src/commands/addDelayedJob-6.lua

+4-8
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ local parentData
5757
-- Includes
5858
--- @include "includes/addDelayMarkerIfNeeded"
5959
--- @include "includes/getOrSetMaxEvents"
60+
--- @include "includes/handleDuplicatedJob"
6061
--- @include "includes/isQueuePaused"
6162
--- @include "includes/storeJob"
6263
--- @include "includes/updateExistingJobsParent"
@@ -77,17 +78,12 @@ if args[2] == "" then
7778
jobId = jobCounter
7879
jobIdKey = args[1] .. jobId
7980
else
80-
-- Refactor to: handleDuplicateJob.lua
8181
jobId = args[2]
8282
jobIdKey = args[1] .. jobId
8383
if rcall("EXISTS", jobIdKey) == 1 then
84-
updateExistingJobsParent(parentKey, parent, parentData,
85-
parentDependenciesKey, completedKey, jobIdKey,
86-
jobId, timestamp)
87-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
88-
"duplicated", "jobId", jobId)
89-
90-
return jobId .. "" -- convert to string
84+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
85+
parentData, parentDependenciesKey, completedKey, eventsKey,
86+
maxEvents, timestamp)
9187
end
9288
end
9389

src/commands/addParentJob-4.lua

+5-8
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ local parent = args[8]
4949
local parentData
5050

5151
-- Includes
52+
--- @include "includes/getOrSetMaxEvents"
53+
--- @include "includes/handleDuplicatedJob"
5254
--- @include "includes/storeJob"
5355
--- @include "includes/updateExistingJobsParent"
54-
--- @include "includes/getOrSetMaxEvents"
5556

5657
if parentKey ~= nil then
5758
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
@@ -72,13 +73,9 @@ else
7273
jobId = args[2]
7374
jobIdKey = args[1] .. jobId
7475
if rcall("EXISTS", jobIdKey) == 1 then
75-
updateExistingJobsParent(parentKey, parent, parentData,
76-
parentDependenciesKey, completedKey, jobIdKey,
77-
jobId, timestamp)
78-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
79-
"duplicated", "jobId", jobId)
80-
81-
return jobId .. "" -- convert to string
76+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
77+
parentData, parentDependenciesKey, completedKey, eventsKey,
78+
maxEvents, timestamp)
8279
end
8380
end
8481

src/commands/addPrioritizedJob-7.lua

+6-10
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ local parent = args[8]
5454
local parentData
5555

5656
-- Includes
57+
--- @include "includes/addJobWithPriority"
5758
--- @include "includes/storeJob"
59+
--- @include "includes/getOrSetMaxEvents"
60+
--- @include "includes/handleDuplicatedJob"
5861
--- @include "includes/isQueuePaused"
59-
--- @include "includes/addJobWithPriority"
6062
--- @include "includes/updateExistingJobsParent"
61-
--- @include "includes/getOrSetMaxEvents"
6263

6364
if parentKey ~= nil then
6465
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
@@ -79,14 +80,9 @@ else
7980
jobId = args[2]
8081
jobIdKey = args[1] .. jobId
8182
if rcall("EXISTS", jobIdKey) == 1 then
82-
updateExistingJobsParent(parentKey, parent, parentData,
83-
parentDependenciesKey, completedKey, jobIdKey,
84-
jobId, timestamp)
85-
86-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
87-
"duplicated", "jobId", jobId)
88-
89-
return jobId .. "" -- convert to string
83+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
84+
parentData, parentDependenciesKey, completedKey, eventsKey,
85+
maxEvents, timestamp)
9086
end
9187
end
9288

src/commands/addStandardJob-7.lua

+4-8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ local parentData
6161
--- @include "includes/addJobInTargetList"
6262
--- @include "includes/getOrSetMaxEvents"
6363
--- @include "includes/getTargetQueueList"
64+
--- @include "includes/handleDuplicatedJob"
6465
--- @include "includes/storeJob"
6566
--- @include "includes/updateExistingJobsParent"
6667

@@ -84,14 +85,9 @@ else
8485
jobId = args[2]
8586
jobIdKey = args[1] .. jobId
8687
if rcall("EXISTS", jobIdKey) == 1 then
87-
updateExistingJobsParent(parentKey, parent, parentData,
88-
parentDependenciesKey, KEYS[5], jobIdKey,
89-
jobId, timestamp)
90-
91-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
92-
"duplicated", "jobId", jobId)
93-
94-
return jobId .. "" -- convert to string
88+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
89+
parentData, parentDependenciesKey, KEYS[5], eventsKey,
90+
maxEvents, timestamp)
9591
end
9692
end
9793

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
--[[
2+
Function to handle the case when job is duplicated.
3+
]]
4+
5+
-- Includes
6+
--- @include "updateExistingJobsParent"
7+
8+
local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
9+
parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
10+
local existedParentKey = rcall("HGET", jobKey, "parentKey")
11+
12+
if not existedParentKey then
13+
updateExistingJobsParent(currentParentKey, currentParent, parentData,
14+
parentDependenciesKey, completedKey, jobKey,
15+
jobId, timestamp)
16+
else
17+
if currentParentKey ~= nil and currentParentKey ~= existedParentKey
18+
and (rcall("EXISTS", existedParentKey) == 1) then
19+
return -7
20+
end
21+
end
22+
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
23+
"duplicated", "jobId", jobId)
24+
25+
return jobId .. "" -- convert to string
26+
end

src/enums/error-code.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export enum ErrorCode {
55
JobPendingDependencies = -4,
66
ParentJobNotExist = -5,
77
JobLockMismatch = -6,
8+
ParentJobCannotBeReplaced = -7,
89
}

tests/test_flow.ts

+48
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,54 @@ describe('flows', () => {
13181318
await worker.close();
13191319
await flow.close();
13201320
});
1321+
1322+
describe('when job already have a parent', async () => {
1323+
it('throws an error', async () => {
1324+
const flow = new FlowProducer({ connection, prefix });
1325+
await flow.add({
1326+
queueName,
1327+
name: 'tue',
1328+
opts: {
1329+
jobId: 'tue',
1330+
},
1331+
children: [
1332+
{
1333+
name: 'mon',
1334+
queueName,
1335+
opts: {
1336+
jobId: 'mon',
1337+
},
1338+
},
1339+
],
1340+
});
1341+
1342+
await queue.add(
1343+
'wed',
1344+
{},
1345+
{
1346+
jobId: 'wed',
1347+
},
1348+
);
1349+
1350+
await expect(
1351+
queue.add(
1352+
'mon',
1353+
{},
1354+
{
1355+
jobId: 'mon',
1356+
parent: {
1357+
id: 'wed',
1358+
queue: `${prefix}:${queueName}`,
1359+
},
1360+
},
1361+
),
1362+
).to.be.rejectedWith(
1363+
`The parent job of job ${prefix}:${queueName}:wed cannot be replaced. addJob`,
1364+
);
1365+
1366+
await flow.close();
1367+
});
1368+
});
13211369
});
13221370

13231371
describe('when custom prefix is set in flow producer', async () => {

0 commit comments

Comments
 (0)