From 99c0161e9469085e39b516015a6a50ac1a9de928 Mon Sep 17 00:00:00 2001 From: Rogger Valverde Date: Tue, 13 Aug 2024 22:41:55 -0600 Subject: [PATCH] fix(flow): fail parent on failure by default (#2682) --- src/classes/job.ts | 8 +----- .../moveParentFromWaitingChildrenToFailed.lua | 16 +++++------ src/commands/moveToFinished-14.lua | 12 ++++----- tests/test_flow.ts | 27 ++++++++++--------- tests/test_getters.ts | 9 +++++-- tests/test_job.ts | 13 --------- 6 files changed, 36 insertions(+), 49 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index ce76f21b3f..d53c8f825c 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -39,7 +39,7 @@ const logger = debuglog('bull'); const optsDecodeMap = { de: 'debounce', - fpof: 'failParentOnFailure', + fpof: 'failParentOnFailure', // TODO: deprecate it in next breaking change idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', rdof: 'removeDependencyOnFailure', @@ -1210,12 +1210,6 @@ export class Job< throw new Error(`Delay and repeat options could not be used together`); } - if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) { - throw new Error( - `RemoveDependencyOnFailure and failParentOnFailure options can not be used together`, - ); - } - if (`${parseInt(this.id, 10)}` === this.id) { throw new Error('Custom Ids cannot be integers'); } diff --git a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua index da10722d7c..e96fd91752 100644 --- a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua +++ b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua @@ -20,7 +20,14 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, if jobAttributes[1] then local parentData = cjson.decode(jobAttributes[1]) - if parentData['fpof'] then + if parentData['rdof'] then + local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id'] + local grandParentDependenciesSet = grandParentKey .. ":dependencies" + if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then + moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet, + grandParentKey, parentData['id'], timestamp) + end + else moveParentFromWaitingChildrenToFailed( parentData['queueKey'], parentData['queueKey'] .. ':' .. parentData['id'], @@ -28,13 +35,6 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentKey, timestamp ) - elseif parentData['rdof'] then - local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id'] - local grandParentDependenciesSet = grandParentKey .. ":dependencies" - if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then - moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet, - grandParentKey, parentData['id'], timestamp) - end end end end diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index dc396da706..f244825f42 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -40,7 +40,7 @@ opts - maxMetricsSize opts - fpof - fail parent on fail opts - idof - ignore dependency on fail - opts - rdof - remove dependency on fail + opts - rdof - remove dependency on fail TODO: remove it in next breaking change Output: 0 OK @@ -140,11 +140,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists ARGV[4], timestamp) end else - if opts['fpof'] then - moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey, - parentId, jobIdKey, - timestamp) - elseif opts['idof'] or opts['rdof'] then + if opts['idof'] or opts['rdof'] then local dependenciesSet = parentKey .. ":dependencies" if rcall("SREM", dependenciesSet, jobIdKey) == 1 then moveParentToWaitIfNeeded(parentQueueKey, dependenciesSet, @@ -154,6 +150,10 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists rcall("HSET", failedSet, jobIdKey, ARGV[4]) end end + else + moveParentFromWaitingChildrenToFailed(parentQueueKey, parentKey, + parentId, jobIdKey, + timestamp) end end end diff --git a/tests/test_flow.ts b/tests/test_flow.ts index c976889128..4d20fcaeb0 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -40,7 +40,7 @@ describe('flows', () => { }); describe('when removeOnFail is true in last pending child', () => { - it('moves parent to wait without getting stuck', async () => { + it('moves parent to failed without getting stuck', async () => { const worker = new Worker( queueName, async job => { @@ -51,9 +51,14 @@ describe('flows', () => { { connection, prefix }, ); await worker.waitUntilReady(); + const queueEvents = new QueueEvents(queueName, { + connection, + prefix, + }); + await queueEvents.waitUntilReady(); const flow = new FlowProducer({ connection, prefix }); - await flow.add({ + const tree = await flow.add({ name: 'parent', data: {}, queueName, @@ -74,21 +79,17 @@ describe('flows', () => { ], }); - const completed = new Promise((resolve, reject) => { - worker.on('completed', async (job: Job) => { - try { - if (job.name === 'parent') { - const { processed } = await job.getDependenciesCount(); - expect(processed).to.equal(1); - resolve(); - } - } catch (err) { - reject(err); + const failed = new Promise(resolve => { + queueEvents.on('failed', async ({ jobId, failedReason, prev }) => { + if (jobId === tree.job.id) { + const { processed } = await tree.job!.getDependenciesCount(); + expect(processed).to.equal(1); + resolve(); } }); }); - await completed; + await failed; await flow.close(); await worker.close(); }); diff --git a/tests/test_getters.ts b/tests/test_getters.ts index 4007f3dcf7..d1f2ff5584 100644 --- a/tests/test_getters.ts +++ b/tests/test_getters.ts @@ -809,20 +809,25 @@ describe('Jobs getters', function () { }); }); + await queue.add('test', {}); const flow = new FlowProducer({ connection, prefix }); await flow.add({ name: 'parent-job', queueName, data: {}, children: [ - { name: 'child-1', data: { idx: 0, foo: 'bar' }, queueName }, + { + name: 'child-1', + data: { idx: 0, foo: 'bar' }, + queueName, + opts: { delay: 6000 }, + }, { name: 'child-2', data: { idx: 1, foo: 'baz' }, queueName }, { name: 'child-3', data: { idx: 2, foo: 'bac' }, queueName }, { name: 'child-4', data: { idx: 3, foo: 'bad' }, queueName }, ], }); - await queue.add('test', { idx: 2 }, { delay: 5000 }); await queue.add('test', { idx: 3 }, { priority: 5 }); await completing; diff --git a/tests/test_job.ts b/tests/test_job.ts index 346b7eebcb..139a1949df 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -139,19 +139,6 @@ describe('Job', function () { }); }); - describe('when removeDependencyOnFailure and failParentOnFailure options are provided', () => { - it('throws an error', async () => { - const data = { foo: 'bar' }; - const opts = { - removeDependencyOnFailure: true, - failParentOnFailure: true, - }; - await expect(Job.create(queue, 'test', data, opts)).to.be.rejectedWith( - 'RemoveDependencyOnFailure and failParentOnFailure options can not be used together', - ); - }); - }); - describe('when priority option is provided as float', () => { it('throws an error', async () => { const data = { foo: 'bar' };