Skip to content

Commit b1e081d

Browse files
committed
refactor: add handleDuplicatedJob
1 parent f5fb5aa commit b1e081d

6 files changed

+93
-42
lines changed

src/commands/addDelayedJob-6.lua

+4-16
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ local parentData
5757
-- Includes
5858
--- @include "includes/addDelayMarkerIfNeeded"
5959
--- @include "includes/getOrSetMaxEvents"
60+
--- @include "includes/handleDuplicatedJob"
6061
--- @include "includes/isQueuePaused"
6162
--- @include "includes/storeJob"
6263
--- @include "includes/updateExistingJobsParent"
@@ -77,25 +78,12 @@ if args[2] == "" then
7778
jobId = jobCounter
7879
jobIdKey = args[1] .. jobId
7980
else
80-
-- Refactor to: handleDuplicateJob.lua
8181
jobId = args[2]
8282
jobIdKey = args[1] .. jobId
8383
if rcall("EXISTS", jobIdKey) == 1 then
84-
local existedParentKey = rcall("HGET", jobIdKey, "parentKey")
85-
if( (type(existedParentKey) == "string") and existedParentKey ~= "" then
86-
if parentKey ~= nil and parentKey ~= existedParentKey
87-
and (rcall("EXISTS", existedParentKey) == 1)) then
88-
return -7
89-
else
90-
updateExistingJobsParent(parentKey, parent, parentData,
91-
parentDependenciesKey, completedKey, jobIdKey,
92-
jobId, timestamp)
93-
end
94-
end
95-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
96-
"duplicated", "jobId", jobId)
97-
98-
return jobId .. "" -- convert to string
84+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
85+
parentData, parentDependenciesKey, completedKey, eventsKey,
86+
maxEvents, timestamp)
9987
end
10088
end
10189

src/commands/addParentJob-4.lua

+5-8
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ local parent = args[8]
4949
local parentData
5050

5151
-- Includes
52+
--- @include "includes/getOrSetMaxEvents"
53+
--- @include "includes/handleDuplicatedJob"
5254
--- @include "includes/storeJob"
5355
--- @include "includes/updateExistingJobsParent"
54-
--- @include "includes/getOrSetMaxEvents"
5556

5657
if parentKey ~= nil then
5758
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
@@ -72,13 +73,9 @@ else
7273
jobId = args[2]
7374
jobIdKey = args[1] .. jobId
7475
if rcall("EXISTS", jobIdKey) == 1 then
75-
updateExistingJobsParent(parentKey, parent, parentData,
76-
parentDependenciesKey, completedKey, jobIdKey,
77-
jobId, timestamp)
78-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
79-
"duplicated", "jobId", jobId)
80-
81-
return jobId .. "" -- convert to string
76+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
77+
parentData, parentDependenciesKey, completedKey, eventsKey,
78+
maxEvents, timestamp)
8279
end
8380
end
8481

src/commands/addPrioritizedJob-7.lua

+6-10
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ local parent = args[8]
5454
local parentData
5555

5656
-- Includes
57+
--- @include "includes/addJobWithPriority"
5758
--- @include "includes/storeJob"
59+
--- @include "includes/getOrSetMaxEvents"
60+
--- @include "includes/handleDuplicatedJob"
5861
--- @include "includes/isQueuePaused"
59-
--- @include "includes/addJobWithPriority"
6062
--- @include "includes/updateExistingJobsParent"
61-
--- @include "includes/getOrSetMaxEvents"
6263

6364
if parentKey ~= nil then
6465
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
@@ -79,14 +80,9 @@ else
7980
jobId = args[2]
8081
jobIdKey = args[1] .. jobId
8182
if rcall("EXISTS", jobIdKey) == 1 then
82-
updateExistingJobsParent(parentKey, parent, parentData,
83-
parentDependenciesKey, completedKey, jobIdKey,
84-
jobId, timestamp)
85-
86-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
87-
"duplicated", "jobId", jobId)
88-
89-
return jobId .. "" -- convert to string
83+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
84+
parentData, parentDependenciesKey, completedKey, eventsKey,
85+
maxEvents, timestamp)
9086
end
9187
end
9288

src/commands/addStandardJob-7.lua

+4-8
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ local parentData
6161
--- @include "includes/addJobInTargetList"
6262
--- @include "includes/getOrSetMaxEvents"
6363
--- @include "includes/getTargetQueueList"
64+
--- @include "includes/handleDuplicatedJob"
6465
--- @include "includes/storeJob"
6566
--- @include "includes/updateExistingJobsParent"
6667

@@ -84,14 +85,9 @@ else
8485
jobId = args[2]
8586
jobIdKey = args[1] .. jobId
8687
if rcall("EXISTS", jobIdKey) == 1 then
87-
updateExistingJobsParent(parentKey, parent, parentData,
88-
parentDependenciesKey, KEYS[5], jobIdKey,
89-
jobId, timestamp)
90-
91-
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
92-
"duplicated", "jobId", jobId)
93-
94-
return jobId .. "" -- convert to string
88+
return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent,
89+
parentData, parentDependenciesKey, KEYS[5], eventsKey,
90+
maxEvents, timestamp)
9591
end
9692
end
9793

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
--[[
2+
Function to handle the case when job is duplicated.
3+
]]
4+
5+
-- Includes
6+
--- @include "updateExistingJobsParent"
7+
8+
local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParent,
9+
parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
10+
local existedParentKey = rcall("HGET", jobKey, "parentKey")
11+
12+
if not existedParentKey then
13+
updateExistingJobsParent(currentParentKey, currentParent, parentData,
14+
parentDependenciesKey, completedKey, jobKey,
15+
jobId, timestamp)
16+
else
17+
if currentParentKey ~= nil and currentParentKey ~= existedParentKey
18+
and (rcall("EXISTS", existedParentKey) == 1) then
19+
return -7
20+
end
21+
end
22+
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event",
23+
"duplicated", "jobId", jobId)
24+
25+
return jobId .. "" -- convert to string
26+
end

tests/test_flow.ts

+48
Original file line numberDiff line numberDiff line change
@@ -1318,6 +1318,54 @@ describe('flows', () => {
13181318
await worker.close();
13191319
await flow.close();
13201320
});
1321+
1322+
describe('when job already have a parent', async () => {
1323+
it('throws an error', async () => {
1324+
const flow = new FlowProducer({ connection, prefix });
1325+
await flow.add({
1326+
queueName,
1327+
name: 'tue',
1328+
opts: {
1329+
jobId: 'tue',
1330+
},
1331+
children: [
1332+
{
1333+
name: 'mon',
1334+
queueName,
1335+
opts: {
1336+
jobId: 'mon',
1337+
},
1338+
},
1339+
],
1340+
});
1341+
1342+
await queue.add(
1343+
'wed',
1344+
{},
1345+
{
1346+
jobId: 'wed',
1347+
},
1348+
);
1349+
1350+
await expect(
1351+
queue.add(
1352+
'mon',
1353+
{},
1354+
{
1355+
jobId: 'mon',
1356+
parent: {
1357+
id: 'wed',
1358+
queue: `${prefix}:${queueName}`,
1359+
},
1360+
},
1361+
),
1362+
).to.be.rejectedWith(
1363+
`Parent job from job ${prefix}:${queueName}:wed cannot be replaced. addJob`,
1364+
);
1365+
1366+
await flow.close();
1367+
});
1368+
});
13211369
});
13221370

13231371
describe('when custom prefix is set in flow producer', async () => {

0 commit comments

Comments
 (0)