Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/transport-error-redrive.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/world-vercel': patch
'@workflow/core': patch
---

Treat transient world-vercel transport failures as retryable, surfacing them as a `TRANSPORT` type `WorkflowWorldError`, to be retried by the queue instead of failing the run.
110 changes: 107 additions & 3 deletions packages/core/src/classify-error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import {
ReplayDivergenceError,
RUN_ERROR_CODES,
RuntimeDecryptionError,
ThrottleError,
TooEarlyError,
WorkflowNotRegisteredError,
WorkflowRuntimeError,
WorkflowWorldError,
} from '@workflow/errors';
import { describe, expect, it } from 'vitest';
import { classifyRunError } from './classify-error.js';
import { classifyRunError, isRetryableWorldError } from './classify-error.js';

describe('classifyRunError', () => {
it('classifies CorruptedEventLogError as CORRUPTED_EVENT_LOG', () => {
Expand Down Expand Up @@ -52,12 +54,16 @@ describe('classifyRunError', () => {
);
});

it('classifies WorkflowWorldError as USER_ERROR (from user code fetch)', () => {
it('classifies a 5xx WorkflowWorldError as WORLD_CONTRACT_ERROR (backend fault, retryable)', () => {
// A WorkflowWorldError only originates from the world adapter talking to
// workflow-server, so a 5xx is the backend's fault, not the user's. These
// are normally redelivered (isRetryableWorldError); if one reaches terminal
// classification it must be a world error, not USER_ERROR.
expect(
classifyRunError(
new WorkflowWorldError('Internal Server Error', { status: 500 })
)
).toBe(RUN_ERROR_CODES.USER_ERROR);
).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR);
});

it('classifies world schema validation failures as WORLD_CONTRACT_ERROR', () => {
Expand Down Expand Up @@ -116,4 +122,102 @@ describe('classifyRunError', () => {
native.name = 'OperationError';
expect(classifyRunError(native)).toBe(RUN_ERROR_CODES.USER_ERROR);
});

it('classifies a TRANSPORT error as WORLD_CONTRACT_ERROR (backend fault, not USER_ERROR)', () => {
// Transport blips are normally redelivered via the queue (see
// isRetryableWorldError); if one ever reaches terminal classification it is
// the backend/firewall's fault, not the user's — track it as a world error.
expect(
classifyRunError(
new WorkflowWorldError(
'GET /events transport failure (UND_ERR_REQ_RETRY)',
{
code: 'TRANSPORT',
}
)
)
).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR);
});

it('classifies a ThrottleError (firewall challenge / 429) as WORLD_CONTRACT_ERROR', () => {
expect(classifyRunError(new ThrottleError('rate limited'))).toBe(
RUN_ERROR_CODES.WORLD_CONTRACT_ERROR
);
});
});

describe('isRetryableWorldError', () => {
it('treats ThrottleError (429) as retryable', () => {
expect(isRetryableWorldError(new ThrottleError('rate limited'))).toBe(true);
});

it('treats 5xx WorkflowWorldError as retryable', () => {
expect(
isRetryableWorldError(
new WorkflowWorldError('Bad Gateway', { status: 502 })
)
).toBe(true);
expect(
isRetryableWorldError(
new WorkflowWorldError('Service Unavailable', { status: 503 })
)
).toBe(true);
});

it('treats TRANSPORT and TIMEOUT codes as retryable', () => {
expect(
isRetryableWorldError(
new WorkflowWorldError('transport failure (UND_ERR_REQ_RETRY)', {
code: 'TRANSPORT',
})
)
).toBe(true);
expect(
isRetryableWorldError(
new WorkflowWorldError('timed out after 60000ms', { code: 'TIMEOUT' })
)
).toBe(true);
});

it('does NOT treat 4xx (other than 429) as retryable', () => {
expect(
isRetryableWorldError(
new WorkflowWorldError('Bad Request', { status: 400 })
)
).toBe(false);
});

it('does NOT treat contract errors (parse/schema) as retryable', () => {
expect(
isRetryableWorldError(
new WorkflowWorldError(
'Failed to parse response body for GET /events',
{
code: 'PARSE_ERROR',
}
)
)
).toBe(false);
expect(
isRetryableWorldError(
new WorkflowWorldError('Schema validation failed for POST /events', {
code: 'SCHEMA_VALIDATION',
})
)
).toBe(false);
});

it('does NOT treat TooEarlyError (425 step pacing) as retryable here', () => {
expect(isRetryableWorldError(new TooEarlyError('too early'))).toBe(false);
});

it('does NOT treat plain errors or non-errors as retryable', () => {
expect(isRetryableWorldError(new Error('boom'))).toBe(false);
expect(
isRetryableWorldError(new WorkflowWorldError('no status or code'))
).toBe(false);
expect(isRetryableWorldError('string')).toBe(false);
expect(isRetryableWorldError(null)).toBe(false);
expect(isRetryableWorldError(undefined)).toBe(false);
});
});
49 changes: 48 additions & 1 deletion packages/core/src/classify-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
type RunErrorCode,
RuntimeDecryptionError,
StepNotRegisteredError,
ThrottleError,
WorkflowNotRegisteredError,
WorkflowRuntimeError,
WorkflowWorldError,
Expand All @@ -16,6 +17,18 @@ const WORLD_CONTRACT_ERROR_CODES = new Set([
RUN_ERROR_CODES.WORLD_CONTRACT_ERROR,
]);

/**
* `WorkflowWorldError.code` values that mark a transient transport failure
* (set by world-vercel's HTTP client): `TRANSPORT` covers an exhausted
* RetryAgent (`UND_ERR_REQ_RETRY` — e.g. the firewall in front of
* workflow-server shedding load with 429/503), a dropped socket, or a
* connect/DNS failure; `TIMEOUT` covers a request that exceeded the client
* timeout. Both are infrastructure failures a fresh invocation can recover
* from. Kept distinct from `WORLD_CONTRACT_ERROR_CODES` so a transport blip is
* never misclassified as the server returning a malformed response.
*/
const RETRYABLE_WORLD_ERROR_CODES = new Set(['TRANSPORT', 'TIMEOUT']);

/**
* Set of error names that should classify as generic `RUNTIME_ERROR`. Each
* `*.is()` static does a name-based duck check, so subclassing alone is
Expand Down Expand Up @@ -67,6 +80,33 @@ export function isWorldContractError(err: unknown): err is WorkflowWorldError {
);
}

/**
* True when an error from a world (workflow-server) call is a transient
* infrastructure failure that should be retried by redelivering the queue
* message, rather than failing the run. A fresh invocation will likely
* succeed once the backend (or the firewall in front of it) recovers.
*
* - `ThrottleError` (429): rate limited / load shed
* - `WorkflowWorldError` with `status >= 500`: server-side error
* - `WorkflowWorldError` with a retryable transport `code` (`TRANSPORT` /
* `TIMEOUT`): the request never produced a usable response
*
* Uses `.is()` name-based duck checks (not `instanceof`) for the same
* cross-`vm`-realm reason described on `classifyRunError`.
*/
export function isRetryableWorldError(err: unknown): boolean {
if (ThrottleError.is(err)) {
return true;
}
if (!WorkflowWorldError.is(err)) {
return false;
}
if (err.status !== undefined && err.status >= 500) {
return true;
}
return err.code !== undefined && RETRYABLE_WORLD_ERROR_CODES.has(err.code);
}

export function classifyRunError(err: unknown): RunErrorCode {
if (ReplayDivergenceError.is(err)) {
return RUN_ERROR_CODES.REPLAY_DIVERGENCE;
Expand All @@ -76,7 +116,14 @@ export function classifyRunError(err: unknown): RunErrorCode {
return RUN_ERROR_CODES.CORRUPTED_EVENT_LOG;
}

if (isWorldContractError(err)) {
// World-layer faults — both a malformed response (contract violation) and a
// transient infrastructure failure (throttle / 5xx / transport / timeout,
// e.g. a firewall challenge) — are the backend's fault, not the user's.
// Bucket them under WORLD_CONTRACT_ERROR rather than USER_ERROR so dashboards
// attribute an outage correctly. Note the retryable variants are normally
// redelivered via the queue (see `isRetryableWorldError`) and only reach this
// terminal classification if the run ultimately gives up.
if (isWorldContractError(err) || isRetryableWorldError(err)) {
return RUN_ERROR_CODES.WORLD_CONTRACT_ERROR;
}

Expand Down
98 changes: 98 additions & 0 deletions packages/core/src/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,104 @@ describe('workflowEntrypoint replay guards', () => {
);
});

it('redelivers (does NOT fail the run) when event listing hits a transient TRANSPORT error', async () => {
const createdEvents: unknown[] = [];
const workflowRun: WorkflowRun = {
runId: 'wrun_events_transport',
workflowName: 'workflow',
status: 'running',
input: await dehydrateWorkflowArguments(
[],
'wrun_events_transport',
undefined,
[]
),
createdAt: new Date('2024-01-01T00:00:00.000Z'),
updatedAt: new Date('2024-01-01T00:00:00.000Z'),
startedAt: new Date('2024-01-01T00:00:00.000Z'),
deploymentId: 'test-deployment',
};
// The firewall in front of workflow-server returned 429→503 during an
// attack; the RetryAgent exhausted its retries and surfaced
// UND_ERR_REQ_RETRY, which world-vercel maps to a TRANSPORT error.
const transportError = new WorkflowWorldError(
'GET /v3/runs/wrun_events_transport/events transport failure after 1234ms (UND_ERR_REQ_RETRY)',
{ code: 'TRANSPORT' }
);

const eventsCreate = vi.fn(async (_runId: string, data: any) => {
if (data.eventType !== 'run_started') {
createdEvents.push(data);
}
return data.eventType === 'run_started'
? { run: workflowRun }
: {
event: {
eventId: `event-${createdEvents.length}`,
runId: workflowRun.runId,
createdAt: new Date(),
...data,
},
};
});

setWorld({
specVersion: SPEC_VERSION_CURRENT,
createQueueHandler: vi.fn(
(
_prefix: string,
handler: (message: unknown, metadata: unknown) => Promise<unknown>
) => {
return async () => {
// The real createQueueHandler catches and applies a retry
// directive; here we let the throw escape so the test can assert
// the delivery rejects (which triggers redelivery in production).
await handler(
{
runId: workflowRun.runId,
requestedAt: new Date('2024-01-01T00:00:00.000Z'),
},
{
requestId: 'req_test',
attempt: 1,
queueName: '__wkf_workflow_workflow',
messageId: 'msg_test',
}
);
return new Response(null, { status: 204 });
};
}
),
events: {
create: eventsCreate,
list: vi.fn(async () => {
throw transportError;
}),
},
runs: {
get: vi.fn(async () => workflowRun),
},
queue: vi.fn(),
getEncryptionKeyForRun: vi.fn(async () => undefined),
} as any);

const handler = workflowEntrypoint(
`async function workflow() {
return 'done';
}${getWorkflowTransformCode('workflow')}`
);

// The transient transport failure must bubble out of the handler so the
// queue redelivers — not be swallowed into a run_failed event.
await expect(handler(new Request('https://example.test'))).rejects.toBe(
transportError
);

expect(createdEvents).not.toContainEqual(
expect.objectContaining({ eventType: 'run_failed' })
);
});

it('records run_failed when run_started response parsing fails', async () => {
const createdEvents: unknown[] = [];
const parseError = new WorkflowWorldError(
Expand Down
34 changes: 33 additions & 1 deletion packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ import {
type WorkflowRun,
type World,
} from '@workflow/world';
import { classifyRunError, isWorldContractError } from './classify-error.js';
import {
classifyRunError,
isRetryableWorldError,
isWorldContractError,
} from './classify-error.js';
import { describeError } from './describe-error.js';
import { WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
Expand Down Expand Up @@ -1354,6 +1358,34 @@ export function workflowEntrypoint(
return;
}
} else {
// Transient infrastructure failures talking to the
// world (workflow-server) — an exhausted RetryAgent
// (UND_ERR_REQ_RETRY from a sustained 429/503 storm),
// a dropped socket, a connect/DNS failure, or a client
// timeout — must NOT fail the run. Rethrow so the queue
// redelivers and a fresh invocation retries the replay
// once the backend recovers. The @vercel/queue handler
// applies a fast (1s→60s) backoff by delivery count,
// avoiding the ~5min default visibility-timeout redrive
// (and never killing the process via run_failed).
if (isRetryableWorldError(err)) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness here rests on server-side idempotency of writes — worth stating explicitly.

Redelivering re-runs the replay, which re-issues whatever write was in flight. For a POST that actually succeeded server-side but whose response was lost (e.g. UND_ERR_SOCKET / body timeout after the server committed the event), the redrive re-attempts that write. This is only safe because event creation is idempotent (see events.ts "is idempotent and safe for the caller to retry").

The whole fix depends on that holding for every write reachable from the replay path. A sentence in the PR description confirming idempotency coverage (and any non-idempotent endpoints, if any) would make the safety argument explicit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, and confirmed: every write reachable from the replay path is idempotent — event creation dedupes server-side by correlation/idempotency key (the events.ts "idempotent and safe for the caller to retry" note), which is exactly what makes redrive safe for a write whose response was lost (e.g. UND_ERR_SOCKET / body timeout after the server committed). I've added a sentence to the PR description making this explicit.

runLogger.warn(
'Transient world error during replay; redelivering via queue instead of failing the run',

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AI Review: Note

FYI (not blocking): when a transient failure persists all the way to the delivery cap, the run is failed with MAX_DELIVERIES_EXCEEDED, so the terminal run_failed event won't carry the underlying transport cause (TRANSPORT/TIMEOUT, the mitigation header, the original code) — that detail lives only in this warn line. So answering "why did this run exhaust its deliveries?" means correlating the failed run with these logs rather than reading it off the run record. If it's cheap, carrying the last transport error code/message through to the cap's run_failed would make these runs self-diagnosing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — deferring this one as a follow-up. The MAX_QUEUE_DELIVERIES check runs at the very top of a fresh invocation, before any world call, so it has no handle on the prior delivery's error. Carrying the last transport cause (code / mitigation header / vercel-id) into the cap's run_failed means persisting it across deliveries (queue payload or an event) — worth doing, but it's cross-delivery state rather than a local change, so I'd rather not expand this PR with it. For now the per-delivery warn line (which includes x-vercel-mitigated / x-vercel-id) is the correlation path.

{
errorName:
err instanceof Error
? err.name
: 'UnknownError',
errorMessage:
err instanceof Error
? err.message
: String(err),
deliveryAttempt: metadata.attempt,
}
);
throw err;
}

let terminalError = err;
if (ReplayDivergenceError.is(err)) {
const divergenceCount =
Expand Down
17 changes: 10 additions & 7 deletions packages/core/src/runtime/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import { runtimeLogger } from '../logger.js';
// max visibility window (24 hours) so that our handler-side failure path
// reliably executes before VQS expires the message.
//
// VQS retry schedule (with retryAfterSeconds: 5):
// Attempts 1–32: linear backoff at 5s each → 32 × 5s = 160s (~2.7 min)
// Attempts 33+: exponential backoff: 60s × 2^(attempt-32),
// capped at 7,200s (2h), floored at retryAfterSeconds
//
// At 48 attempts the total elapsed time is approximately 20 hours, which is
// safely under the 24-hour message visibility limit.
// The effective wall-clock survival depends on the per-redelivery backoff: the
// `retry-after` the handler returns (see world-vercel
// `getHandlerErrorRetryAfterSeconds`) fed through VQS `calculateBackoffDelay`.
// VQS uses our value for the first 32 attempts (clamped to [5s, 900s]) then
// applies its own exponential growth — every hop hard-capped at the SQS limit
// of 900s. With the backoff ramping toward that 900s ceiling, 48 attempts span
// the better part of the 24-hour window, leaving headroom for the failure path
// to run before the message expires. (A flatter, low-capped backoff exhausts
// the budget in only a few hours, failing otherwise-healthy runs during a
// transient backend outage.)
export const MAX_QUEUE_DELIVERIES = 48;

/**
Expand Down
Loading
Loading