diff --git a/.changeset/transport-error-redrive.md b/.changeset/transport-error-redrive.md new file mode 100644 index 0000000000..852c06ab0f --- /dev/null +++ b/.changeset/transport-error-redrive.md @@ -0,0 +1,6 @@ +--- +'@workflow/world-vercel': patch +'@workflow/core': patch +--- + +Treat transient world-vercel transport failures as retryable, surfacing them as a `TRANSPORT` type `WorkflowWorldError`, to be retried by the queue instead of failing the run. diff --git a/packages/core/src/classify-error.test.ts b/packages/core/src/classify-error.test.ts index 5fc3789a45..a2f202c845 100644 --- a/packages/core/src/classify-error.test.ts +++ b/packages/core/src/classify-error.test.ts @@ -4,12 +4,14 @@ import { ReplayDivergenceError, RUN_ERROR_CODES, RuntimeDecryptionError, + ThrottleError, + TooEarlyError, WorkflowNotRegisteredError, WorkflowRuntimeError, WorkflowWorldError, } from '@workflow/errors'; import { describe, expect, it } from 'vitest'; -import { classifyRunError } from './classify-error.js'; +import { classifyRunError, isRetryableWorldError } from './classify-error.js'; describe('classifyRunError', () => { it('classifies CorruptedEventLogError as CORRUPTED_EVENT_LOG', () => { @@ -52,12 +54,16 @@ describe('classifyRunError', () => { ); }); - it('classifies WorkflowWorldError as USER_ERROR (from user code fetch)', () => { + it('classifies a 5xx WorkflowWorldError as WORLD_CONTRACT_ERROR (backend fault, retryable)', () => { + // A WorkflowWorldError only originates from the world adapter talking to + // workflow-server, so a 5xx is the backend's fault, not the user's. These + // are normally redelivered (isRetryableWorldError); if one reaches terminal + // classification it must be a world error, not USER_ERROR. 
expect( classifyRunError( new WorkflowWorldError('Internal Server Error', { status: 500 }) ) - ).toBe(RUN_ERROR_CODES.USER_ERROR); + ).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR); }); it('classifies world schema validation failures as WORLD_CONTRACT_ERROR', () => { @@ -116,4 +122,102 @@ describe('classifyRunError', () => { native.name = 'OperationError'; expect(classifyRunError(native)).toBe(RUN_ERROR_CODES.USER_ERROR); }); + + it('classifies a TRANSPORT error as WORLD_CONTRACT_ERROR (backend fault, not USER_ERROR)', () => { + // Transport blips are normally redelivered via the queue (see + // isRetryableWorldError); if one ever reaches terminal classification it is + // the backend/firewall's fault, not the user's — track it as a world error. + expect( + classifyRunError( + new WorkflowWorldError( + 'GET /events transport failure (UND_ERR_REQ_RETRY)', + { + code: 'TRANSPORT', + } + ) + ) + ).toBe(RUN_ERROR_CODES.WORLD_CONTRACT_ERROR); + }); + + it('classifies a ThrottleError (firewall challenge / 429) as WORLD_CONTRACT_ERROR', () => { + expect(classifyRunError(new ThrottleError('rate limited'))).toBe( + RUN_ERROR_CODES.WORLD_CONTRACT_ERROR + ); + }); +}); + +describe('isRetryableWorldError', () => { + it('treats ThrottleError (429) as retryable', () => { + expect(isRetryableWorldError(new ThrottleError('rate limited'))).toBe(true); + }); + + it('treats 5xx WorkflowWorldError as retryable', () => { + expect( + isRetryableWorldError( + new WorkflowWorldError('Bad Gateway', { status: 502 }) + ) + ).toBe(true); + expect( + isRetryableWorldError( + new WorkflowWorldError('Service Unavailable', { status: 503 }) + ) + ).toBe(true); + }); + + it('treats TRANSPORT and TIMEOUT codes as retryable', () => { + expect( + isRetryableWorldError( + new WorkflowWorldError('transport failure (UND_ERR_REQ_RETRY)', { + code: 'TRANSPORT', + }) + ) + ).toBe(true); + expect( + isRetryableWorldError( + new WorkflowWorldError('timed out after 60000ms', { code: 'TIMEOUT' }) + ) + 
).toBe(true); + }); + + it('does NOT treat 4xx (other than 429) as retryable', () => { + expect( + isRetryableWorldError( + new WorkflowWorldError('Bad Request', { status: 400 }) + ) + ).toBe(false); + }); + + it('does NOT treat contract errors (parse/schema) as retryable', () => { + expect( + isRetryableWorldError( + new WorkflowWorldError( + 'Failed to parse response body for GET /events', + { + code: 'PARSE_ERROR', + } + ) + ) + ).toBe(false); + expect( + isRetryableWorldError( + new WorkflowWorldError('Schema validation failed for POST /events', { + code: 'SCHEMA_VALIDATION', + }) + ) + ).toBe(false); + }); + + it('does NOT treat TooEarlyError (425 step pacing) as retryable here', () => { + expect(isRetryableWorldError(new TooEarlyError('too early'))).toBe(false); + }); + + it('does NOT treat plain errors or non-errors as retryable', () => { + expect(isRetryableWorldError(new Error('boom'))).toBe(false); + expect( + isRetryableWorldError(new WorkflowWorldError('no status or code')) + ).toBe(false); + expect(isRetryableWorldError('string')).toBe(false); + expect(isRetryableWorldError(null)).toBe(false); + expect(isRetryableWorldError(undefined)).toBe(false); + }); }); diff --git a/packages/core/src/classify-error.ts b/packages/core/src/classify-error.ts index 0b3f9667d9..cc21416d02 100644 --- a/packages/core/src/classify-error.ts +++ b/packages/core/src/classify-error.ts @@ -5,6 +5,7 @@ import { type RunErrorCode, RuntimeDecryptionError, StepNotRegisteredError, + ThrottleError, WorkflowNotRegisteredError, WorkflowRuntimeError, WorkflowWorldError, @@ -16,6 +17,18 @@ const WORLD_CONTRACT_ERROR_CODES = new Set([ RUN_ERROR_CODES.WORLD_CONTRACT_ERROR, ]); +/** + * `WorkflowWorldError.code` values that mark a transient transport failure + * (set by world-vercel's HTTP client): `TRANSPORT` covers an exhausted + * RetryAgent (`UND_ERR_REQ_RETRY` — e.g. 
the firewall in front of + * workflow-server shedding load with sustained 5xx), a dropped socket, or a + * connect/DNS failure; `TIMEOUT` covers a request that exceeded the client + * timeout. Both are infrastructure failures a fresh invocation can recover + * from. Kept distinct from `WORLD_CONTRACT_ERROR_CODES` so a transport blip is + * never misclassified as the server returning a malformed response. + */ +const RETRYABLE_WORLD_ERROR_CODES = new Set(['TRANSPORT', 'TIMEOUT']); + /** * Set of error names that should classify as generic `RUNTIME_ERROR`. Each * `*.is()` static does a name-based duck check, so subclassing alone is @@ -67,6 +80,33 @@ export function isWorldContractError(err: unknown): err is WorkflowWorldError { ); } +/** + * True when an error from a world (workflow-server) call is a transient + * infrastructure failure that should be retried by redelivering the queue + * message, rather than failing the run. A fresh invocation will likely + * succeed once the backend (or the firewall in front of it) recovers. + * + * - `ThrottleError` (429): rate limited / load shed + * - `WorkflowWorldError` with `status >= 500`: server-side error + * - `WorkflowWorldError` with a retryable transport `code` (`TRANSPORT` / + * `TIMEOUT`): the request never produced a usable response + * + * Uses `.is()` name-based duck checks (not `instanceof`) for the same + * cross-`vm`-realm reason described on `classifyRunError`. 
+ */ +export function isRetryableWorldError(err: unknown): boolean { + if (ThrottleError.is(err)) { + return true; + } + if (!WorkflowWorldError.is(err)) { + return false; + } + if (err.status !== undefined && err.status >= 500) { + return true; + } + return err.code !== undefined && RETRYABLE_WORLD_ERROR_CODES.has(err.code); +} + export function classifyRunError(err: unknown): RunErrorCode { if (ReplayDivergenceError.is(err)) { return RUN_ERROR_CODES.REPLAY_DIVERGENCE; @@ -76,7 +116,14 @@ export function classifyRunError(err: unknown): RunErrorCode { return RUN_ERROR_CODES.CORRUPTED_EVENT_LOG; } - if (isWorldContractError(err)) { + // World-layer faults — both a malformed response (contract violation) and a + // transient infrastructure failure (throttle / 5xx / transport / timeout, + // e.g. a firewall challenge) — are the backend's fault, not the user's. + // Bucket them under WORLD_CONTRACT_ERROR rather than USER_ERROR so dashboards + // attribute an outage correctly. Note the retryable variants are normally + // redelivered via the queue (see `isRetryableWorldError`) and only reach this + // terminal classification if the run ultimately gives up. 
+ if (isWorldContractError(err) || isRetryableWorldError(err)) { return RUN_ERROR_CODES.WORLD_CONTRACT_ERROR; } diff --git a/packages/core/src/runtime.test.ts b/packages/core/src/runtime.test.ts index bc2bb156f7..ad63fd8367 100644 --- a/packages/core/src/runtime.test.ts +++ b/packages/core/src/runtime.test.ts @@ -313,6 +313,104 @@ describe('workflowEntrypoint replay guards', () => { ); }); + it('redelivers (does NOT fail the run) when event listing hits a transient TRANSPORT error', async () => { + const createdEvents: unknown[] = []; + const workflowRun: WorkflowRun = { + runId: 'wrun_events_transport', + workflowName: 'workflow', + status: 'running', + input: await dehydrateWorkflowArguments( + [], + 'wrun_events_transport', + undefined, + [] + ), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + }; + // The firewall in front of workflow-server returned 429→503 during an + // attack; the RetryAgent exhausted its retries and surfaced + // UND_ERR_REQ_RETRY, which world-vercel maps to a TRANSPORT error. + const transportError = new WorkflowWorldError( + 'GET /v3/runs/wrun_events_transport/events transport failure after 1234ms (UND_ERR_REQ_RETRY)', + { code: 'TRANSPORT' } + ); + + const eventsCreate = vi.fn(async (_runId: string, data: any) => { + if (data.eventType !== 'run_started') { + createdEvents.push(data); + } + return data.eventType === 'run_started' + ? 
{ run: workflowRun } + : { + event: { + eventId: `event-${createdEvents.length}`, + runId: workflowRun.runId, + createdAt: new Date(), + ...data, + }, + }; + }); + + setWorld({ + specVersion: SPEC_VERSION_CURRENT, + createQueueHandler: vi.fn( + ( + _prefix: string, + handler: (message: unknown, metadata: unknown) => Promise + ) => { + return async () => { + // The real createQueueHandler catches and applies a retry + // directive; here we let the throw escape so the test can assert + // the delivery rejects (which triggers redelivery in production). + await handler( + { + runId: workflowRun.runId, + requestedAt: new Date('2024-01-01T00:00:00.000Z'), + }, + { + requestId: 'req_test', + attempt: 1, + queueName: '__wkf_workflow_workflow', + messageId: 'msg_test', + } + ); + return new Response(null, { status: 204 }); + }; + } + ), + events: { + create: eventsCreate, + list: vi.fn(async () => { + throw transportError; + }), + }, + runs: { + get: vi.fn(async () => workflowRun), + }, + queue: vi.fn(), + getEncryptionKeyForRun: vi.fn(async () => undefined), + } as any); + + const handler = workflowEntrypoint( + `async function workflow() { + return 'done'; + }${getWorkflowTransformCode('workflow')}` + ); + + // The transient transport failure must bubble out of the handler so the + // queue redelivers — not be swallowed into a run_failed event. 
+ await expect(handler(new Request('https://example.test'))).rejects.toBe( + transportError + ); + + expect(createdEvents).not.toContainEqual( + expect.objectContaining({ eventType: 'run_failed' }) + ); + }); + it('records run_failed when run_started response parsing fails', async () => { const createdEvents: unknown[] = []; const parseError = new WorkflowWorldError( diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 86ddf02bac..878a364ff7 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -23,7 +23,11 @@ import { type WorkflowRun, type World, } from '@workflow/world'; -import { classifyRunError, isWorldContractError } from './classify-error.js'; +import { + classifyRunError, + isRetryableWorldError, + isWorldContractError, +} from './classify-error.js'; import { describeError } from './describe-error.js'; import { WorkflowSuspension } from './global.js'; import { runtimeLogger } from './logger.js'; @@ -1354,6 +1358,34 @@ export function workflowEntrypoint( return; } } else { + // Transient infrastructure failures talking to the + // world (workflow-server) — an exhausted RetryAgent + // (UND_ERR_REQ_RETRY from a sustained 5xx storm), + // a dropped socket, a connect/DNS failure, or a client + // timeout — must NOT fail the run. Rethrow so the queue + // redelivers and a fresh invocation retries the replay + // once the backend recovers. The @vercel/queue handler + // applies a 1s→900s backoff by delivery count, with early + // hops avoiding the ~5min default visibility-timeout redrive + // (and never killing the process via run_failed). + if (isRetryableWorldError(err)) { + runLogger.warn( + 'Transient world error during replay; redelivering via queue instead of failing the run', + { + errorName: + err instanceof Error + ? err.name + : 'UnknownError', + errorMessage: + err instanceof Error + ? 
err.message + : String(err), + deliveryAttempt: metadata.attempt, + } + ); + throw err; + } + let terminalError = err; if (ReplayDivergenceError.is(err)) { const divergenceCount = diff --git a/packages/core/src/runtime/constants.ts b/packages/core/src/runtime/constants.ts index b29df7bd15..99f8d7fd8b 100644 --- a/packages/core/src/runtime/constants.ts +++ b/packages/core/src/runtime/constants.ts @@ -5,13 +5,16 @@ import { runtimeLogger } from '../logger.js'; // max visibility window (24 hours) so that our handler-side failure path // reliably executes before VQS expires the message. // -// VQS retry schedule (with retryAfterSeconds: 5): -// Attempts 1–32: linear backoff at 5s each → 32 × 5s = 160s (~2.7 min) -// Attempts 33+: exponential backoff: 60s × 2^(attempt-32), -// capped at 7,200s (2h), floored at retryAfterSeconds -// -// At 48 attempts the total elapsed time is approximately 20 hours, which is -// safely under the 24-hour message visibility limit. +// The effective wall-clock survival depends on the per-redelivery backoff: the +// `retry-after` the handler returns (see world-vercel +// `getHandlerErrorRetryAfterSeconds`) fed through VQS `calculateBackoffDelay`. +// VQS uses our value for the first 32 attempts (clamped to [5s, 900s]) then +// applies its own exponential growth — every hop hard-capped at the SQS limit +// of 900s. With the backoff ramping toward that 900s ceiling, 48 attempts span +// the better part of the 24-hour window, leaving headroom for the failure path +// to run before the message expires. (A flatter, low-capped backoff exhausts +// the budget in only a few hours, failing otherwise-healthy runs during a +// transient backend outage.) 
export const MAX_QUEUE_DELIVERIES = 48; /** diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 88c8838074..7f18a846ce 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -1,9 +1,4 @@ -import { - EntityConflictError, - ThrottleError, - WorkflowRuntimeError, - WorkflowWorldError, -} from '@workflow/errors'; +import { EntityConflictError, WorkflowRuntimeError } from '@workflow/errors'; import { workflowDisplayName } from '@workflow/utils/parse-name'; import type { WorkflowInvokePayload, World } from '@workflow/world'; import { @@ -16,6 +11,7 @@ import { import { monotonicFactory } from 'ulid'; import { normalizeAttributeChanges } from '../attribute-changes.js'; import { getRunCapabilities } from '../capabilities.js'; +import { isRetryableWorldError } from '../classify-error.js'; import { importKey } from '../encryption.js'; import { runtimeLogger } from '../logger.js'; import type { Serializable } from '../schemas.js'; @@ -424,10 +420,11 @@ export async function start( // the run creation call gets a cold start or other slowdown, and the queue // + run_started call completes faster. We expect this to be <=1% of cases. // In this case, we can safely return. - } else if (isRetryableStartError(err)) { - // 429 (ThrottleError) and 5xx (WorkflowWorldError with status >= 500) - // are retryable — the run was accepted via the queue and creation - // will be re-tried by the runtime when it calls run_started. + } else if (isRetryableWorldError(err)) { + // 429 (ThrottleError), 5xx, and transient transport failures + // (TRANSPORT/TIMEOUT) are retryable — the run was accepted via the + // queue and creation will be re-tried by the runtime when it calls + // run_started. resilientStart = true; runtimeLogger.warn( 'Run creation event failed, but the run was accepted via the queue. 
' + @@ -481,16 +478,3 @@ export async function start( }); }); } - -/** - * Checks if an error from events.create (run_created) is retryable, - * meaning the queue can re-try creation later via the run_started path. - * - ThrottleError (429): rate limited, will succeed later - * - WorkflowWorldError with status >= 500: server error, will succeed later - */ -function isRetryableStartError(err: unknown): boolean { - if (ThrottleError.is(err)) return true; - if (WorkflowWorldError.is(err) && err.status && err.status >= 500) - return true; - return false; -} diff --git a/packages/core/src/runtime/step-executor.ts b/packages/core/src/runtime/step-executor.ts index 3cc2494652..d2a9d3c5e8 100644 --- a/packages/core/src/runtime/step-executor.ts +++ b/packages/core/src/runtime/step-executor.ts @@ -210,6 +210,15 @@ export async function executeStep( }); return { type: 'retry', timeoutSeconds }; } + // Any other failure (including a transient transport / 5xx world error) + // propagates. It surfaces in the replay loop, where retryable world + // errors are rethrown to the queue handler — earning the delivery-count + // backoff AND the MAX_QUEUE_DELIVERIES cap. We deliberately do NOT + // self-enqueue a fresh message here: that resets the delivery count, so a + // persistently-failing step_started write would loop forever at a flat + // interval and never hit the cap. Throwing on step_started is safe — + // the step body has not run, and a write that did land server-side + // dedupes to `skipped` on replay. throw err; } diff --git a/packages/world-vercel/src/http-client.ts b/packages/world-vercel/src/http-client.ts index bfcd386eac..42113a4c86 100644 --- a/packages/world-vercel/src/http-client.ts +++ b/packages/world-vercel/src/http-client.ts @@ -15,8 +15,9 @@ export function getDispatcher(config?: APIConfig): unknown { * Returns a shared undici RetryAgent wrapping an Agent. 
* * - Connection pooling (up to 8 connections per origin) - * - Retry: Automatic retry on 429/5xx or network errors with exponential backoff - * - Observes Retry-After header if received and lower than 30s + * - Retry: Automatic retry on 5xx or network errors with exponential backoff + * (idempotent methods only — undici's default never retries POST), observing + * the `Retry-After` header when present. */ function getDefaultDispatcher(): RetryAgent { if (!_dispatcher) { @@ -35,10 +36,17 @@ function getDefaultDispatcher(): RetryAgent { { // Observe Retry-After header if received retryAfter: true, - // By default, we observe re-try headers, and also separately - // re-try on these status codes: 429 / 500 / 502 / 503 / 504. - // TODO: We might want to let 429s pass through, so that we can do - // runtime retry-after handling through the queue. + // Retry 5xx in-process (genuine transient blips recover fast), but + // NOT 429. The Vercel firewall issues a challenge as a 429: our + // server-to-server client cannot solve a challenge, so in-process + // retries just re-trigger it ~5× per request and amplify load against + // an already-overloaded firewall during an incident. Letting 429 pass + // through surfaces it immediately to makeRequest — which maps it to a + // ThrottleError carrying the `x-vercel-mitigated` / `x-vercel-id` + // headers — and the queue does the (backed-off) retry instead. This is + // the long-standing "let 429s pass through" intent. + // (undici default is [500, 502, 503, 504, 429].) 
+ statusCodes: [500, 502, 503, 504], } ); } diff --git a/packages/world-vercel/src/queue.test.ts b/packages/world-vercel/src/queue.test.ts index b8b2490af8..3f6eb80377 100644 --- a/packages/world-vercel/src/queue.test.ts +++ b/packages/world-vercel/src/queue.test.ts @@ -17,6 +17,9 @@ const { const mockSend = vi.fn(); const mockHandleCallback = vi.fn(); + // Must be a `function` (not an arrow): queue.ts calls `new QueueClient(...)`, + // and an arrow function cannot be used as a constructor. + // biome-ignore lint/complexity/useArrowFunction: needs to be newable const MockQueueClient = vi.fn().mockImplementation(function () { return { send: mockSend, @@ -390,7 +393,21 @@ describe('createQueue', () => { messageId: 'msg-123', deliveryCount: 8, }) - ).toEqual({ afterSeconds: 60 }); + ).toEqual({ afterSeconds: 128 }); + // Ramps toward the 900s ceiling (VQS clamps each redelivery to its + // 900s SQS limit) so a sustained outage spans most of the 24h window. + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 11, + }) + ).toEqual({ afterSeconds: 900 }); + expect( + options.retry(new Error('workflow server unavailable'), { + messageId: 'msg-123', + deliveryCount: 20, + }) + ).toEqual({ afterSeconds: 900 }); randomSpy.mockReturnValue(0.999); expect( @@ -404,7 +421,7 @@ describe('createQueue', () => { messageId: 'msg-123', deliveryCount: 8, }) - ).toEqual({ afterSeconds: 45 }); + ).toEqual({ afterSeconds: 96 }); } finally { randomSpy.mockRestore(); } diff --git a/packages/world-vercel/src/queue.ts b/packages/world-vercel/src/queue.ts index 0de8ee9d38..902d9e6a3b 100644 --- a/packages/world-vercel/src/queue.ts +++ b/packages/world-vercel/src/queue.ts @@ -135,7 +135,16 @@ const MAX_DELAY_SECONDS = Number( ); const HANDLER_ERROR_RETRY_AFTER_SECONDS = 1; -const HANDLER_ERROR_MAX_RETRY_AFTER_SECONDS = 60; +// Ceiling for the per-redelivery backoff. 
This value is the `retry-after` we +// hand to VQS, which clamps it into [5s, MAX_SQS_DELAY_SECONDS=900s] for the +// first 32 deliveries and then applies its own exponential growth (also capped +// at 900s) — see vqs-server `calculateBackoffDelay`. Capping our base at 60s +// (the old value) wasted that headroom: a run stuck behind a sustained backend +// outage exhausted its delivery budget in ~3.7h. Ramping to the 900s ceiling +// instead stretches survival across the bulk of the ~24h message-visibility +// window, so transient outages don't fail otherwise-healthy runs. Going above +// 900s is pointless — VQS clamps it. +const HANDLER_ERROR_MAX_RETRY_AFTER_SECONDS = 900; const HANDLER_ERROR_RETRY_JITTER_RATIO = 0.25; function getHandlerErrorRetryAfterSeconds(deliveryCount: number): number { diff --git a/packages/world-vercel/src/utils.test.ts b/packages/world-vercel/src/utils.test.ts index f1851c8417..8bf0011d75 100644 --- a/packages/world-vercel/src/utils.test.ts +++ b/packages/world-vercel/src/utils.test.ts @@ -280,4 +280,127 @@ describe('makeRequest body-parse retry', () => { 'upstream timed out (x-vercel-id=iad1::req-abc; x-vercel-error=FUNCTION_INVOCATION_TIMEOUT)' ); }); + + it('surfaces the firewall x-vercel-mitigated header in HTTP response errors', async () => { + // A firewall `deny` arrives as a 403 (not retried by the RetryAgent), so + // its mitigation + trace headers reach our response handling. 
+ const fetchMock = vi.fn().mockResolvedValue( + new Response(JSON.stringify({ message: 'Forbidden' }), { + status: 403, + headers: { + 'content-type': 'application/json', + 'x-vercel-id': 'sfo1::req-deny', + 'x-vercel-mitigated': 'deny', + }, + }) + ); + vi.stubGlobal('fetch', fetchMock); + + await expect( + makeRequest({ + endpoint: '/v3/runs/wrun_test/events', + options: { method: 'GET' }, + schema, + }) + ).rejects.toThrow('x-vercel-id=sfo1::req-deny; x-vercel-mitigated=deny'); + }); +}); + +describe('makeRequest transport errors', () => { + const schema = z.object({ value: z.string() }); + const originalEnv = process.env; + + beforeEach(() => { + process.env = { ...originalEnv }; + delete process.env.VERCEL_WORKFLOW_SERVER_URL; + delete process.env.VERCEL_OIDC_TOKEN; + }); + + afterEach(() => { + process.env = originalEnv; + vi.unstubAllGlobals(); + }); + + it('maps an exhausted RetryAgent (UND_ERR_REQ_RETRY in cause) to a TRANSPORT error', async () => { + // fetch() wraps the underlying undici error in a `TypeError: fetch failed` + // whose `cause` carries the real `.code` — the firewall returning 429/503 + // that the RetryAgent retried and then gave up on surfaces this way. + const cause = Object.assign(new Error('Request failed'), { + code: 'UND_ERR_REQ_RETRY', + }); + const fetchErr = Object.assign(new TypeError('fetch failed'), { cause }); + const fetchMock = vi.fn().mockRejectedValue(fetchErr); + vi.stubGlobal('fetch', fetchMock); + + await expect( + makeRequest({ + endpoint: '/v3/runs/wrun_test/events', + options: { method: 'GET' }, + schema, + }) + ).rejects.toMatchObject({ name: 'WorkflowWorldError', code: 'TRANSPORT' }); + + // Transport failures are not body-parse retried inside makeRequest — the + // queue redrive is the retry layer, so exactly one attempt is made. 
+ expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it('maps a direct socket error code (ECONNRESET) to TRANSPORT', async () => { + const fetchErr = Object.assign(new Error('socket hang up'), { + code: 'ECONNRESET', + }); + vi.stubGlobal('fetch', vi.fn().mockRejectedValue(fetchErr)); + + await expect( + makeRequest({ + endpoint: '/v3/runs/wrun_test/events', + options: { method: 'GET' }, + schema, + }) + ).rejects.toMatchObject({ name: 'WorkflowWorldError', code: 'TRANSPORT' }); + }); + + it('preserves the original error as the cause', async () => { + const cause = Object.assign(new Error('Request failed'), { + code: 'UND_ERR_REQ_RETRY', + }); + const fetchErr = Object.assign(new TypeError('fetch failed'), { cause }); + vi.stubGlobal('fetch', vi.fn().mockRejectedValue(fetchErr)); + + const rejection = await makeRequest({ + endpoint: '/v3/runs/wrun_test/events', + options: { method: 'GET' }, + schema, + }).catch((e) => e); + + expect(rejection.cause).toBe(fetchErr); + }); + + it('rethrows a non-transient fetch error unchanged', async () => { + const fetchErr = new Error('some unexpected non-network error'); + vi.stubGlobal('fetch', vi.fn().mockRejectedValue(fetchErr)); + + await expect( + makeRequest({ + endpoint: '/v3/runs/wrun_test/events', + options: { method: 'GET' }, + schema, + }) + ).rejects.toBe(fetchErr); + }); + + it('maps an AbortSignal timeout to a TIMEOUT error', async () => { + const timeoutErr = Object.assign(new Error('The operation timed out'), { + name: 'TimeoutError', + }); + vi.stubGlobal('fetch', vi.fn().mockRejectedValue(timeoutErr)); + + await expect( + makeRequest({ + endpoint: '/v3/runs/wrun_test/events', + options: { method: 'GET' }, + schema, + }) + ).rejects.toMatchObject({ name: 'WorkflowWorldError', code: 'TIMEOUT' }); + }); }); diff --git a/packages/world-vercel/src/utils.ts b/packages/world-vercel/src/utils.ts index 0f1f6d789d..abd0d43f97 100644 --- a/packages/world-vercel/src/utils.ts +++ b/packages/world-vercel/src/utils.ts @@ 
-60,10 +60,20 @@ function httpLog( } function getResponseDiagnosticHeaders(response: Response): string[] { - return ['x-vercel-id', 'x-vercel-error'].flatMap((header) => { - const value = response.headers.get(header); - return value ? [`${header}=${value}`] : []; - }); + // `x-vercel-mitigated` (`challenge` | `deny`) is set by the Vercel firewall + // when it intercepts a request in front of workflow-server — surfacing it + // (alongside the `x-vercel-id` trace identifier) makes a firewall block + // diagnosable from the error message and DEBUG logs. Both a 429 `challenge` + // and a 403 `deny` reach this response handling because the RetryAgent no + // longer retries 429 in-process (see http-client.ts), so the mitigation type + // and trace id are preserved for the SDK / queue instead of being lost inside + // undici's retry loop. + return ['x-vercel-id', 'x-vercel-error', 'x-vercel-mitigated'].flatMap( + (header) => { + const value = response.headers.get(header); + return value ? [`${header}=${value}`] : []; + } + ); } function formatResponseDiagnostics(response: Response): string { @@ -109,6 +119,57 @@ export const MAX_BODY_PARSE_RETRIES = 2; /** Base delay for the exponential backoff between body-parse retries. */ const BODY_PARSE_RETRY_BASE_MS = 100; +/** + * Transient transport failure codes. When a request to workflow-server cannot + * complete, `fetch()` throws rather than returning a response: the shared + * `RetryAgent` exhausted its retries (`UND_ERR_REQ_RETRY` — e.g. the firewall + * in front of workflow-server shedding load with sustained 5xx, which the + * RetryAgent retries internally; 429 is not retried and arrives as a normal response), the socket dropped, + * or connect/DNS failed. 
These are retryable infrastructure failures, not + * contract or user errors, so we map them to a typed `WorkflowWorldError` + * (`code: 'TRANSPORT'`) that the runtime recognizes as retryable and bubbles + * to the queue for a fast redrive — instead of crashing the invocation or + * failing the run. + */ +const TRANSIENT_TRANSPORT_ERROR_CODES = new Set([ + 'UND_ERR_REQ_RETRY', + 'UND_ERR_SOCKET', + 'UND_ERR_CONNECT', + 'UND_ERR_CONNECT_TIMEOUT', + 'UND_ERR_HEADERS_TIMEOUT', + 'UND_ERR_BODY_TIMEOUT', + 'UND_ERR_CLOSED', + 'ECONNRESET', + 'ECONNREFUSED', + 'ENOTFOUND', + 'EAI_AGAIN', + 'EPIPE', +]); + +/** + * Walks the `cause` chain of a thrown value looking for a transient transport + * error code. `fetch()` wraps the underlying undici error in a + * `TypeError: fetch failed` whose `cause` carries the real `.code`, so the + * code we care about is usually one level down (sometimes two). Bounded depth + * guards against pathological or cyclic `cause` chains. + */ +function getTransientTransportCode(error: unknown): string | undefined { + let current: unknown = error; + for (let depth = 0; current != null && depth < 5; depth++) { + if (typeof current === 'object' && 'code' in current) { + const code = (current as { code?: unknown }).code; + if ( + typeof code === 'string' && + TRANSIENT_TRANSPORT_ERROR_CODES.has(code) + ) { + return code; + } + } + current = (current as { cause?: unknown })?.cause; + } + return undefined; +} + /** * Effective workflow-server URL override. The inline constant wins when * set; otherwise falls back to the `VERCEL_WORKFLOW_SERVER_URL` env var. 
@@ -393,12 +454,26 @@ export async function makeRequest({ ) { const timeoutError = new WorkflowWorldError( `${method} ${endpoint} timed out after ${elapsed}ms`, - { url, cause: error } + { url, code: 'TIMEOUT', cause: error } ); span?.setAttributes({ ...ErrorType('TIMEOUT') }); span?.recordException?.(timeoutError); throw timeoutError; } + // Transient transport failure (RetryAgent retries exhausted, socket + // reset, connect/DNS failure). Surface as a retryable + // WorkflowWorldError so the runtime redrives via the queue instead + // of failing the run. See TRANSIENT_TRANSPORT_ERROR_CODES. + const transportCode = getTransientTransportCode(error); + if (transportCode) { + const transportError = new WorkflowWorldError( + `${method} ${endpoint} transport failure after ${elapsed}ms (${transportCode})`, + { url, code: 'TRANSPORT', cause: error } + ); + span?.setAttributes({ ...ErrorType('TRANSPORT') }); + span?.recordException?.(transportError); + throw transportError; + } throw error; } const fetchMs = Date.now() - fetchStart;