Skip to content

Commit

Permalink
fix(flow): consider delayed state when moving a parent to failed (#3112)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Feb 26, 2025
1 parent 8569e29 commit 6a28b86
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 120 deletions.
12 changes: 6 additions & 6 deletions src/commands/includes/getOrSetMaxEvents.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
Function to get max events value or set by default 10000.
]]
local function getOrSetMaxEvents(metaKey)
local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
if not maxEvents then
maxEvents = 10000
rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
end
return maxEvents
local maxEvents = rcall("HGET", metaKey, "opts.maxLenEvents")
if not maxEvents then
maxEvents = 10000
rcall("HSET", metaKey, "opts.maxLenEvents", maxEvents)
end
return maxEvents
end
49 changes: 47 additions & 2 deletions src/commands/includes/moveParentFromWaitingChildrenToFailed.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
--- @include "removeDeduplicationKeyIfNeeded"
--- @include "removeJobsOnFail"

local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentId, jobIdKey, timestamp)
if rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) == 1 then
local function moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey, parentId, jobIdKey, timestamp)
local parentWaitingChildrenKey = parentQueueKey .. ":waiting-children"
local parentDelayedKey = parentQueueKey .. ":delayed"
if rcall("ZSCORE", parentWaitingChildrenKey, parentId) ~= false then
rcall("ZREM", parentWaitingChildrenKey, parentId)
local parentQueuePrefix = parentQueueKey .. ":"
local parentFailedKey = parentQueueKey .. ":failed"
rcall("ZADD", parentFailedKey, timestamp, parentId)
Expand Down Expand Up @@ -48,6 +51,48 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey,
local parentRawOpts = jobAttributes[3]
local parentOpts = cjson.decode(parentRawOpts)

removeJobsOnFail(parentQueuePrefix, parentFailedKey, parentId, parentOpts, timestamp)
elseif rcall("ZSCORE", parentDelayedKey, parentId) ~= false then
rcall("ZREM", parentDelayedKey, parentId)
local parentQueuePrefix = parentQueueKey .. ":"
local parentFailedKey = parentQueueKey .. ":failed"
rcall("ZADD", parentFailedKey, timestamp, parentId)
local failedReason = "child " .. jobIdKey .. " failed"
rcall("HMSET", parentKey, "failedReason", failedReason, "finishedOn", timestamp)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "failed", "jobId", parentId, "failedReason",
failedReason, "prev", "delayed")

local jobAttributes = rcall("HMGET", parentKey, "parent", "deid", "opts")

removeDeduplicationKeyIfNeeded(parentQueueKey .. ":", jobAttributes[2])

if jobAttributes[1] then
local parentData = cjson.decode(jobAttributes[1])
if parentData['fpof'] then
moveParentFromWaitingChildrenToFailed(
parentData['queueKey'],
parentData['queueKey'] .. ':' .. parentData['id'],
parentData['id'],
parentKey,
timestamp
)
elseif parentData['idof'] or parentData['rdof'] then
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
grandParentKey, parentData['id'], timestamp)
if parentData['idof'] then
local grandParentFailedSet = grandParentKey .. ":failed"
rcall("HSET", grandParentFailedSet, parentKey, failedReason)
end
end
end
end

local parentRawOpts = jobAttributes[3]
local parentOpts = cjson.decode(parentRawOpts)

removeJobsOnFail(parentQueuePrefix, parentFailedKey, parentId, parentOpts, timestamp)
end
end
2 changes: 1 addition & 1 deletion src/commands/moveStalledJobsToWait-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ if (#stalling > 0) then
"failed", "jobId", jobId, 'prev', 'active',
'failedReason', failedReason)

if rawParentData ~= false then
if rawParentData then
if opts['fpof'] then
local parentData = cjson.decode(rawParentData)
moveParentFromWaitingChildrenToFailed(
Expand Down
2 changes: 1 addition & 1 deletion src/commands/moveToFinished-14.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
local parentKey = jobAttributes[1] or ""
local parentId = ""
local parentQueueKey = ""
if jobAttributes[2] ~= false then
if jobAttributes[2] then
local jsonDecodedParent = cjson.decode(jobAttributes[2])
parentId = jsonDecodedParent['id']
parentQueueKey = jsonDecodedParent['queueKey']
Expand Down
Loading

0 comments on commit 6a28b86

Please sign in to comment.