Skip to content

Commit c2ed3e7

Browse files
pranaygp and claude committed
feat: enforce max queue deliveries in handlers with graceful failure
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0a9c6f7 commit c2ed3e7

File tree

8 files changed

+245
-98
lines changed

8 files changed

+245
-98
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@workflow/errors": patch
3+
"@workflow/core": patch
4+
"@workflow/world-local": patch
5+
"@workflow/builders": patch
6+
---
7+
8+
Remove VQS maxDeliveries cap and enforce max delivery limit in workflow/step handlers with graceful failure

packages/builders/src/constants.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ export const STEP_QUEUE_TRIGGER = {
66
type: 'queue/v2beta' as const,
77
topic: '__wkf_step_*',
88
consumer: 'default',
9-
maxDeliveries: 64, // Maximum number of delivery attempts (default: 3)
109
retryAfterSeconds: 5, // Delay between retries (default: 60)
1110
initialDelaySeconds: 0, // Initial delay before first delivery (default: 0)
1211
};
@@ -19,7 +18,6 @@ export const WORKFLOW_QUEUE_TRIGGER = {
1918
type: 'queue/v2beta' as const,
2019
topic: '__wkf_workflow_*',
2120
consumer: 'default',
22-
maxDeliveries: 64, // Maximum number of delivery attempts (default: 3)
2321
retryAfterSeconds: 5, // Delay between retries (default: 60)
2422
initialDelaySeconds: 0, // Initial delay before first delivery (default: 0)
2523
};

packages/core/src/runtime.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import {
22
EntityConflictError,
3+
RUN_ERROR_CODES,
34
RunExpiredError,
45
WorkflowRuntimeError,
56
} from '@workflow/errors';
67
import { classifyRunError } from './classify-error.js';
8+
import { MAX_QUEUE_DELIVERIES } from './runtime/constants.js';
79
import { parseWorkflowName } from '@workflow/utils/parse-name';
810
import {
911
type Event,
@@ -106,6 +108,47 @@ export function workflowEntrypoint(
106108
const { requestId } = metadata;
107109
// Extract the workflow name from the topic name
108110
const workflowName = metadata.queueName.slice('__wkf_workflow_'.length);
111+
112+
// --- Max delivery check ---
113+
// Enforce max delivery limit before any infrastructure calls.
114+
// This prevents runaway workflows from consuming infinite queue deliveries.
115+
if (metadata.attempt > MAX_QUEUE_DELIVERIES) {
116+
runtimeLogger.error(
117+
`Workflow handler exceeded max deliveries (${metadata.attempt}/${MAX_QUEUE_DELIVERIES})`,
118+
{ workflowRunId: runId, workflowName, attempt: metadata.attempt }
119+
);
120+
try {
121+
const world = getWorld();
122+
await world.events.create(runId, {
123+
eventType: 'run_failed',
124+
specVersion: SPEC_VERSION_CURRENT,
125+
eventData: {
126+
error: {
127+
message: `Workflow exceeded maximum queue deliveries (${metadata.attempt}/${MAX_QUEUE_DELIVERIES})`,
128+
},
129+
errorCode: RUN_ERROR_CODES.MAX_DELIVERIES_EXCEEDED,
130+
},
131+
});
132+
} catch (err) {
133+
if (
134+
EntityConflictError.is(err) ||
135+
RunExpiredError.is(err)
136+
) {
137+
// Run already finished, consume the message
138+
return;
139+
}
140+
runtimeLogger.error(
141+
'Failed to post run_failed for max deliveries exceeded, consuming message anyway',
142+
{
143+
workflowRunId: runId,
144+
error: err instanceof Error ? err.message : String(err),
145+
attempt: metadata.attempt,
146+
}
147+
);
148+
}
149+
return;
150+
}
151+
109152
const spanLinks = await linkToCurrentContext();
110153

111154
// Invoke user workflow within the propagated trace context and baggage
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
/**
 * Upper bound on queue delivery attempts that a handler will process.
 *
 * The workflow handler compares `metadata.attempt` against this value and,
 * when the attempt is strictly greater, posts a `run_failed` event and
 * consumes the message instead of running the workflow.
 */
export const MAX_QUEUE_DELIVERIES = 64;

packages/core/src/runtime/step-handler.test.ts

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -497,3 +497,77 @@ describe('step-handler 409 handling', () => {
497497
});
498498
});
499499
});
500+
501+
// Exercises the step handler with delivery attempts above (65) and exactly at (64)
// the max-delivery limit.
describe('step-handler max deliveries', () => {
502+
beforeEach(() => {
503+
vi.clearAllMocks();
504+
vi.mocked(getStepFunction).mockReturnValue(mockStepFn);
505+
mockStepFn.mockReset().mockResolvedValue('step-result');
506+
mockStepFn.maxRetries = 3;
507+
mockQueueMessage.mockResolvedValue(undefined);
508+
// Minimal world stub; cast to any because only the members listed here are provided.
vi.mocked(getWorld).mockReturnValue({
509+
events: { create: mockEventsCreate },
510+
queue: mockQueue,
511+
getEncryptionKeyForRun: vi.fn().mockResolvedValue(undefined),
512+
} as any);
513+
// By default events.create resolves with a step record in 'running' status;
// individual tests override this (e.g. with a rejection).
mockEventsCreate.mockReset().mockResolvedValue({
514+
step: {
515+
stepId: 'step_abc',
516+
status: 'running',
517+
attempt: 1,
518+
startedAt: new Date(),
519+
input: [],
520+
},
521+
event: {},
522+
});
523+
});
524+
525+
afterEach(() => {
526+
vi.restoreAllMocks();
527+
});
528+
529+
// attempt 65 is one past the limit: the handler is expected to record step_failed,
// enqueue a message, log an error, and resolve with undefined.
it('should post step_failed and re-queue workflow when delivery count exceeds max', async () => {
530+
const result = await capturedHandler(
531+
createMessage(),
532+
{ ...createMetadata('myStep'), attempt: 65 }
533+
);
534+
535+
expect(result).toBeUndefined();
536+
expect(mockEventsCreate).toHaveBeenCalledWith(
537+
'wrun_test123',
538+
expect.objectContaining({
539+
eventType: 'step_failed',
540+
correlationId: 'step_abc',
541+
})
542+
);
543+
expect(mockQueueMessage).toHaveBeenCalled();
544+
expect(mockRuntimeLogger.error).toHaveBeenCalledWith(
545+
expect.stringContaining('exceeded max deliveries'),
546+
expect.objectContaining({ workflowRunId: 'wrun_test123' })
547+
);
548+
});
549+
550+
// events.create rejects with EntityConflictError: the handler must still resolve
// (message consumed) and must not invoke the step function.
it('should consume message silently when step_failed fails with EntityConflictError', async () => {
551+
mockEventsCreate.mockRejectedValue(
552+
new EntityConflictError('Step already completed')
553+
);
554+
555+
const result = await capturedHandler(
556+
createMessage(),
557+
{ ...createMetadata('myStep'), attempt: 65 }
558+
);
559+
560+
expect(result).toBeUndefined();
561+
expect(mockStepFn).not.toHaveBeenCalled();
562+
});
563+
564+
// NOTE(review): attempt 64 is the last attempt for which the step function is expected
// to run (65 is rejected above), so this is the boundary ("at limit") case rather than
// a value strictly under the limit as the test name suggests — consider renaming.
it('should not trigger max deliveries check when under limit', async () => {
565+
const result = await capturedHandler(
566+
createMessage(),
567+
{ ...createMetadata('myStep'), attempt: 64 }
568+
);
569+
570+
// Should proceed normally (step function executes)
571+
expect(mockStepFn).toHaveBeenCalled();
572+
});
573+
});

0 commit comments

Comments
 (0)