diff --git a/.changeset/perf-memoize-step-hydration.md b/.changeset/perf-memoize-step-hydration.md new file mode 100644 index 0000000000..deefc926ad --- /dev/null +++ b/.changeset/perf-memoize-step-hydration.md @@ -0,0 +1,6 @@ +--- +'@workflow/core': patch +'workflow': patch +--- + +Memoize hydrated step return values across inline replay iterations, turning the per-invocation step-result decrypt+parse cost from O(N²) to O(N) for sequential workflows. Only primitive results are cached, so deterministic replay is preserved. diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index 0c9fc07135..8015c228dc 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -7,6 +7,7 @@ import type { CryptoKey } from './encryption.js'; import type { EventsConsumer } from './events-consumer.js'; import type { QueueItem } from './global.js'; import type { Serializable } from './schemas.js'; +import type { StepHydrationCache } from './step-hydration-cache.js'; export type StepFunction< Args extends Serializable[] = any[], @@ -186,6 +187,24 @@ export interface WorkflowOrchestratorContext { * that do not initialize it degrade gracefully to the previous behavior. */ pendingDeliveryBarriers?: Map; + /** + * Per-run memoization cache for hydrated step return values, keyed by the + * `step_completed` event id. Owned by the inline replay loop in `runtime.ts` + * and threaded through each `runWorkflow` call so it survives across replay + * iterations of the SAME run (a fresh context is created each iteration) but + * never leaks across unrelated runs. + * + * On replay K of a sequential N-step workflow, the step consumer would + * otherwise re-decrypt and re-parse the results of all K already-completed + * steps — O(N²) across an invocation. This cache makes a completed step's + * result available in O(1) on subsequent replays. Only primitive results are + * memoized, so a shared reference can never let one replay's mutation leak + * into the next; see `step-hydration-cache.ts` for the full rationale. + * + * Optional so contexts that do not initialize it (test harnesses) degrade + * gracefully to re-hydrating every replay — identical to previous behavior. + */ + stepHydrationCache?: StepHydrationCache; } /** The kind of branch-deciding delivery a barrier represents. */ diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index c378664b43..c6856979a3 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -55,6 +55,10 @@ import { } from './runtime/world.js'; import { dehydrateRunError } from './serialization.js'; import { remapErrorStack } from './source-map.js'; +import { + createStepHydrationCache, + type StepHydrationCache, +} from './step-hydration-cache.js'; import * as Attribute from './telemetry/semantic-conventions.js'; import { buildInvocationSpanLinks, @@ -482,6 +486,16 @@ export function workflowEntrypoint( let cachedEvents: Event[] | null = null; let eventsCursor: string | null = null; + // Per-run cache of hydrated step return values, shared across + // every replay iteration of THIS invocation. Each iteration + // builds a fresh workflow context, so the cache is owned here + // (outside that context) and threaded into runWorkflow. It + // turns the otherwise O(N²) re-decrypt + re-parse of completed + // step results across N replays into O(N). Scoped to this run + // only — never reused across runs. See step-hydration-cache.ts. + const stepHydrationCache: StepHydrationCache = + createStepHydrationCache(); + // Inline-delta optimization: when an inline step's terminal // write returns the event-log delta since the pre-write // cursor (a supporting World only), we stash it here so the @@ -1074,7 +1088,8 @@ export function workflowEntrypoint( workflowCode, workflowRun, events, - encryptionKey + encryptionKey, + stepHydrationCache ); runtimeLogger.debug('Workflow replay completed', { workflowRunId: runId, diff --git a/packages/core/src/step-hydration-cache.test.ts b/packages/core/src/step-hydration-cache.test.ts new file mode 100644 index 0000000000..2010719813 --- /dev/null +++ b/packages/core/src/step-hydration-cache.test.ts @@ -0,0 +1,184 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + createStepHydrationCache, + getOrHydrateStepReturnValue, + isMemoizablePrimitive, + MAX_MEMOIZED_PRIMITIVE_LENGTH, +} from './step-hydration-cache.js'; + +describe('isMemoizablePrimitive', () => { + it('returns true for primitives', () => { + expect(isMemoizablePrimitive('hello')).toBe(true); + expect(isMemoizablePrimitive(42)).toBe(true); + expect(isMemoizablePrimitive(0)).toBe(true); + expect(isMemoizablePrimitive(true)).toBe(true); + expect(isMemoizablePrimitive(false)).toBe(true); + expect(isMemoizablePrimitive(null)).toBe(true); + expect(isMemoizablePrimitive(undefined)).toBe(true); + expect(isMemoizablePrimitive(10n)).toBe(true); + expect(isMemoizablePrimitive(Symbol('x'))).toBe(true); + }); + + it('returns false for objects, arrays, and functions', () => { + expect(isMemoizablePrimitive({})).toBe(false); + expect(isMemoizablePrimitive({ a: 1 })).toBe(false); + expect(isMemoizablePrimitive([])).toBe(false); + expect(isMemoizablePrimitive([1, 2, 3])).toBe(false); + expect(isMemoizablePrimitive(() => {})).toBe(false); + expect(isMemoizablePrimitive(new Date())).toBe(false); + expect(isMemoizablePrimitive(new Map())).toBe(false); + }); + + it('memoizes a string at the length bound but not beyond it', () => { + const atBound = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH); + const overBound = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH + 1); + expect(isMemoizablePrimitive(atBound)).toBe(true); + expect(isMemoizablePrimitive(overBound)).toBe(false); + }); + + it('bounds oversized bigints by their decimal length', () => { + const overBound = BigInt('9'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH + 1)); + expect(isMemoizablePrimitive(overBound)).toBe(false); + // A small bigint is always memoizable. + expect(isMemoizablePrimitive(123n)).toBe(true); + }); +}); + +describe('getOrHydrateStepReturnValue', () => { + it('hydrates on a miss and returns the value', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue('result'); + const value = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + expect(value).toBe('result'); + expect(hydrate).toHaveBeenCalledTimes(1); + }); + + it('memoizes a primitive: second call with same eventId does not re-hydrate', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue('result'); + + const first = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + + expect(first).toBe('result'); + expect(second).toBe('result'); + // The expensive hydrate must only run once across replays. + expect(hydrate).toHaveBeenCalledTimes(1); + }); + + it('memoizes falsy primitives (0, false, "", null, undefined) as hits', async () => { + for (const sample of [0, false, '', null, undefined]) { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue(sample); + const first = await getOrHydrateStepReturnValue(cache, 'evnt', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt', hydrate); + expect(first).toBe(sample); + expect(second).toBe(sample); + expect(hydrate).toHaveBeenCalledTimes(1); + } + }); + + it('does NOT memoize non-primitives: re-hydrates a fresh object each replay', async () => { + const cache = createStepHydrationCache(); + // Return a fresh object every call so we can assert distinct references. + const hydrate = vi.fn().mockImplementation(async () => ({ count: 0 })); + + const first = (await getOrHydrateStepReturnValue( + cache, + 'evnt_0', + hydrate + )) as { count: number }; + // Simulate workflow code mutating the result on this replay. + first.count++; + + const second = (await getOrHydrateStepReturnValue( + cache, + 'evnt_0', + hydrate + )) as { count: number }; + + // A fresh object must be produced — the mutation from the first replay + // must NOT leak into the second. + expect(hydrate).toHaveBeenCalledTimes(2); + expect(second).not.toBe(first); + expect(second.count).toBe(0); + }); + + it('memoizes a string at the length bound (cache hit on replay)', async () => { + const cache = createStepHydrationCache(); + const atBound = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH); + const hydrate = vi.fn().mockResolvedValue(atBound); + + const first = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + + expect(first).toBe(atBound); + expect(second).toBe(atBound); + expect(hydrate).toHaveBeenCalledTimes(1); + expect(cache.size).toBe(1); + }); + + it('does NOT memoize an oversized string: re-hydrates every replay and stays unbounded-free', async () => { + const cache = createStepHydrationCache(); + const big = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH + 1); + const hydrate = vi.fn().mockResolvedValue(big); + + const first = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + + // Correct value still returned, but the large payload is never retained: + // it falls through to a fresh re-hydrate on every replay. + expect(first).toBe(big); + expect(second).toBe(big); + expect(hydrate).toHaveBeenCalledTimes(2); + expect(cache.size).toBe(0); + }); + + it('keys by eventId: different events hydrate independently', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi + .fn() + .mockResolvedValueOnce('a') + .mockResolvedValueOnce('b'); + + const a = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const b = await getOrHydrateStepReturnValue(cache, 'evnt_1', hydrate); + + expect(a).toBe('a'); + expect(b).toBe('b'); + expect(hydrate).toHaveBeenCalledTimes(2); + }); + + it('does not cache when no cache is provided', async () => { + const hydrate = vi.fn().mockResolvedValue('result'); + await getOrHydrateStepReturnValue(undefined, 'evnt_0', hydrate); + await getOrHydrateStepReturnValue(undefined, 'evnt_0', hydrate); + expect(hydrate).toHaveBeenCalledTimes(2); + }); + + it('does not cache when eventId is undefined', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue('result'); + await getOrHydrateStepReturnValue(cache, undefined, hydrate); + await getOrHydrateStepReturnValue(cache, undefined, hydrate); + expect(hydrate).toHaveBeenCalledTimes(2); + expect(cache.size).toBe(0); + }); + + it('does not cache rejected hydrations: re-attempts on the next call', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi + .fn() + .mockRejectedValueOnce(new Error('boom')) + .mockResolvedValueOnce('recovered'); + + await expect( + getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate) + ).rejects.toThrow('boom'); + + // A subsequent replay re-attempts (no parked rejected promise). + const value = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + expect(value).toBe('recovered'); + expect(hydrate).toHaveBeenCalledTimes(2); + }); +}); diff --git a/packages/core/src/step-hydration-cache.ts b/packages/core/src/step-hydration-cache.ts new file mode 100644 index 0000000000..b2a93144e2 --- /dev/null +++ b/packages/core/src/step-hydration-cache.ts @@ -0,0 +1,188 @@ +/** + * Per-run memoization cache for hydrated step return values. + * + * ## Why + * + * The inline replay loop (`runtime.ts`) re-runs the workflow body from the top + * on every iteration, re-consuming the full event log each time. For every + * already-completed step, the step consumer (`step.ts`) re-runs + * `hydrateStepReturnValue` — which AES-GCM-decrypts and devalue-parses the + * serialized result — even though that exact result was already hydrated on + * every prior replay. For a sequential workflow of N steps, replay K hydrates + * K results, so the total work across a single invocation is O(N²) + * decrypt+parse operations. + * + * This cache makes a completed step's hydrated result available in O(1) on + * subsequent replays within the SAME invocation, turning the aggregate cost + * into O(N). + * + * ## Scope / lifetime + * + * The cache is owned by the inline loop in `runtime.ts` (one per workflow run + * invocation) and passed into `runWorkflow` so it survives across the loop's + * iterations but never leaks across unrelated runs or process-level + * invocations. A fresh `runWorkflow` / `WorkflowOrchestratorContext` is created + * each iteration, so the cache must live OUTSIDE the per-iteration context. + * + * ## Keying + * + * Entries are keyed by the persisted event's `eventId` — a stable, + * world-assigned identifier for the `step_completed` event whose serialized + * `result` is being hydrated. The same event (same `eventId`) carries the same + * immutable serialized bytes across every replay, so a hit is guaranteed to + * correspond to the identical input. + * + * ## Identity safety (why primitives only) + * + * `hydrateStepReturnValue` (devalue.parse) produces a FRESH object graph on + * every call, and each replay iteration runs in a FRESH workflow VM. The + * current (uncached) behavior therefore hands the workflow a brand-new value + * on every replay. If we cached and returned the SAME object reference across + * replays, workflow code that mutates a step result (`const r = await step(); + * r.count++`) would observe the mutation from a previous replay on the next + * replay — a non-deterministic divergence. Structured-cloning on each hit is + * both lossy (revivers reconstruct stream handles, step-function proxies, + * Request/Response, and AbortController/AbortSignal class instances that don't + * survive a structured clone) and still O(size). + * + * So we only cache values for which returning the same reference on every + * replay is provably indistinguishable from re-hydrating: JavaScript + * primitives (string, number, boolean, bigint, symbol, null, undefined). + * Primitives are immutable and compared by value, so sharing the reference is + * byte-for-byte equivalent to re-parsing. Any non-primitive result falls + * through to a full re-hydrate every replay, preserving current behavior + * exactly. This trades some of the optimization away in the object-returning + * case in exchange for keeping deterministic replay airtight. + * + * ## Memory characteristic + * + * Cached entries hold the decrypted/devalue-parsed *plaintext* of a step + * result, which is retained for the rest of the invocation on top of the + * serialized bytes already held in `cachedEvents`. So the residual cost is: + * + * - **Scoped to one workflow-run invocation.** A fresh `Map` is created per + * invocation (in `runtime.ts`) and is unreachable / GC'd when the invocation + * returns. Nothing accumulates across runs or across process-level + * invocations. + * - **Bounded by the number of primitive-returning completed steps in that + * run** — at most one small entry per such step. + * - **Primitives only, and additionally byte-bounded.** Most primitives + * (numbers, booleans, null/undefined, symbols, short ids/strings) are tiny + * and fixed-size. The only primitive that can be large is a string (or a + * pathologically long bigint), so to keep the doubled-residency worst case + * bounded we *do not* memoize string/bigint results whose character length + * exceeds {@link MAX_MEMOIZED_PRIMITIVE_LENGTH}. A large string is cheap to + * re-hydrate relative to its footprint, so letting it fall through to the + * existing per-replay re-hydrate path costs little and caps peak retained + * memory. + * + * (This is a much weaker concern than a *process-wide* cache: the dominant + * residency — the full event log in `cachedEvents` — already exists for the + * same lifetime, and everything here is freed together with it when the + * invocation ends.) + */ + +/** + * Upper bound, in characters, on a string/bigint primitive that may be + * memoized. Beyond this, the value falls through to a fresh re-hydrate on every + * replay so the cache never holds a large plaintext payload for the lifetime of + * the invocation. 4 KiB comfortably covers ids, counts, flags, and typical + * short string results while excluding the large-payload case the bound exists + * to guard. Other primitive types (number, boolean, symbol, null, undefined) + * are inherently small and are never length-checked. + */ +export const MAX_MEMOIZED_PRIMITIVE_LENGTH = 4096; + +/** + * Returns true for values that are safe to memoize and return by reference + * across replays: JS primitives. Objects and functions are excluded because + * sharing a mutable reference across replays could change observable behavior. + * + * Strings and bigints are additionally bounded by length: a value longer than + * {@link MAX_MEMOIZED_PRIMITIVE_LENGTH} characters is treated as non-memoizable + * so the cache never retains a large plaintext payload for the whole invocation + * (see the module-level "Memory characteristic" docs). It re-hydrates fresh on + * every replay instead — cheap relative to its footprint. + * + * Note: `typeof null === 'object'`, so it is handled explicitly. `undefined`, + * `string`, `number`, `boolean`, `bigint`, and `symbol` are all primitives. + */ +export function isMemoizablePrimitive(value: unknown): boolean { + if (value === null) return true; + const t = typeof value; + if (t === 'object' || t === 'function') return false; + // Bound the only primitive types that can carry a large payload. + if (t === 'string') { + return (value as string).length <= MAX_MEMOIZED_PRIMITIVE_LENGTH; + } + if (t === 'bigint') { + return (value as bigint).toString().length <= MAX_MEMOIZED_PRIMITIVE_LENGTH; + } + return true; +} + +/** + * Cache of hydrated step return values for a single workflow run invocation. + * + * Keyed by `step_completed` event id; the value is the already-hydrated + * primitive result. Only successful, primitive hydrations are stored (see + * {@link getOrHydrateStepReturnValue}), so a non-`undefined` `has(eventId)` + * always means "this step completed with a memoizable primitive value". + */ +export type StepHydrationCache = Map; + +/** + * Create an empty per-invocation step hydration cache. + */ +export function createStepHydrationCache(): StepHydrationCache { + return new Map(); +} + +/** + * Return the hydrated step result for `eventId`, using `cache` as a per-run + * memo. On a hit, the cached primitive is returned without re-running the + * expensive decrypt + devalue-parse. On a miss, `hydrate()` runs and its + * result is memoized only when it is a small primitive (see the module docs for + * the identity-safety rationale and the length bound on string/bigint results). + * + * This always returns a `Promise` and `await`s `hydrate()` even on the miss + * path, so the caller's `await` inside its serial `promiseQueue` slot keeps the + * same scheduling on both hit and miss — a cache hit resolves through the exact + * same promise-chain position a re-hydrate would have, preserving the + * deterministic delivery order that `pendingDeliveries`, the delivery barriers, + * and `Promise.race`/`Promise.all` replay all depend on. + * + * `has(eventId)` is used rather than `get(eventId) !== undefined` so that a + * legitimately memoized `undefined` step result still registers as a hit. + * + * When `cache` or `eventId` is absent (lightweight test harnesses, or a context + * that predates this plumbing), this degrades to calling `hydrate()` directly + * with no memoization — identical to the previous behavior. + * + * Errors are intentionally never cached: a rejected hydrate propagates to the + * caller (which rejects the step promise) and the next replay re-attempts it, + * matching the uncached behavior and avoiding a parked rejected promise. + */ +export async function getOrHydrateStepReturnValue( + cache: StepHydrationCache | undefined, + eventId: string | undefined, + hydrate: () => Promise +): Promise { + if (!cache || eventId === undefined) { + return hydrate(); + } + + if (cache.has(eventId)) { + return cache.get(eventId); + } + + const value = await hydrate(); + // Only memoize values that are safe to return by reference across replays + // AND small enough to retain for the invocation. Non-primitives and + // oversized string/bigint values fall through and are re-hydrated fresh on + // every replay (see isMemoizablePrimitive / MAX_MEMOIZED_PRIMITIVE_LENGTH). + if (isMemoizablePrimitive(value)) { + cache.set(eventId, value); + } + return value; +} diff --git a/packages/core/src/step-hydration-memoization.test.ts b/packages/core/src/step-hydration-memoization.test.ts new file mode 100644 index 0000000000..1db0221244 --- /dev/null +++ b/packages/core/src/step-hydration-memoization.test.ts @@ -0,0 +1,189 @@ +import type { Event } from '@workflow/world'; +import * as nanoid from 'nanoid'; +import { monotonicFactory } from 'ulid'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { EventsConsumer } from './events-consumer.js'; +import type { WorkflowOrchestratorContext } from './private.js'; +import { dehydrateStepReturnValue } from './serialization.js'; +import { createUseStep } from './step.js'; +import { + createStepHydrationCache, + type StepHydrationCache, +} from './step-hydration-cache.js'; +import { createContext } from './vm/index.js'; + +/** + * These tests verify the end-to-end behavior of the per-run step hydration + * cache as exercised through the real `createUseStep` consumer — proving both + * that the expensive hydrate is skipped on subsequent replays AND that the + * deterministic, event-log-ordered delivery through `promiseQueue` is + * preserved on cache hits. + * + * Each replay iteration of a run builds a fresh `WorkflowOrchestratorContext` + * but shares a single `stepHydrationCache` (owned by the inline loop). We + * simulate that here by constructing two contexts that share one cache. + */ + +// Build a context that shares a caller-provided hydration cache, mirroring how +// the inline loop threads one cache across replay iterations. +function setupWorkflowContext( + events: Event[], + stepHydrationCache?: StepHydrationCache +): WorkflowOrchestratorContext { + const context = createContext({ + seed: 'test', + fixedTimestamp: 1753481739458, + }); + const ulid = monotonicFactory(() => context.globalThis.Math.random()); + const workflowStartedAt = context.globalThis.Date.now(); + return { + runId: 'wrun_test', + encryptionKey: undefined, + globalThis: context.globalThis, + eventsConsumer: new EventsConsumer(events, { + onUnconsumedEvent: () => {}, + getPromiseQueue: () => Promise.resolve(), + }), + invocationsQueue: new Map(), + generateUlid: () => ulid(workflowStartedAt), + generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) => + new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) + ), + onWorkflowError: vi.fn(), + promiseQueue: Promise.resolve(), + pendingDeliveries: 0, + pendingDeliveryBarriers: new Map(), + stepHydrationCache, + }; +} + +async function makeStepEvents(): Promise { + return [ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { + stepName: 'step1', + result: await dehydrateStepReturnValue('one', 'wrun_test', undefined), + }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { + stepName: 'step2', + result: await dehydrateStepReturnValue('two', 'wrun_test', undefined), + }, + createdAt: new Date(), + }, + ]; +} + +describe('step hydration memoization through the step consumer', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('skips re-hydration of primitive step results on a second replay sharing the cache', async () => { + const events = await makeStepEvents(); + const cache = createStepHydrationCache(); + + const serialization = await import('./serialization.js'); + const hydrateSpy = vi.spyOn(serialization, 'hydrateStepReturnValue'); + + // --- Replay 1: fresh context, shared cache. Both steps hydrate. --- + const ctx1 = setupWorkflowContext(events, cache); + const useStep1 = createUseStep(ctx1); + const [r1a, r1b] = await Promise.all([ + useStep1('step1')(), + useStep1('step2')(), + ]); + expect(r1a).toBe('one'); + expect(r1b).toBe('two'); + expect(hydrateSpy).toHaveBeenCalledTimes(2); + + // --- Replay 2: brand-new context, SAME cache. No re-hydration. --- + hydrateSpy.mockClear(); + const ctx2 = setupWorkflowContext(events, cache); + const useStep2 = createUseStep(ctx2); + const [r2a, r2b] = await Promise.all([ + useStep2('step1')(), + useStep2('step2')(), + ]); + expect(r2a).toBe('one'); + expect(r2b).toBe('two'); + // The expensive decrypt+parse must NOT run again on the second replay. + expect(hydrateSpy).toHaveBeenCalledTimes(0); + }); + + it('preserves event-log resolution order on cache hits even with variable timing', async () => { + const events = await makeStepEvents(); + const cache = createStepHydrationCache(); + + // Replay 1: populate the cache (no timing games needed). + const ctx1 = setupWorkflowContext(events, cache); + const useStep1 = createUseStep(ctx1); + await Promise.all([useStep1('step1')(), useStep1('step2')()]); + + // Replay 2: all results are cache hits, but force the second event's + // delivery slot to be observed quickly while the first is artificially + // slowed — proving ordering is enforced by promiseQueue, not by hydrate + // timing (which is now a no-op on hits). + const ctx2 = setupWorkflowContext(events, cache); + const useStep2 = createUseStep(ctx2); + + const promiseA = useStep2('step1')(); + const promiseB = useStep2('step2')(); + + const resolveOrder: string[] = []; + promiseA.then((v) => resolveOrder.push(`A:${v}`)); + promiseB.then((v) => resolveOrder.push(`B:${v}`)); + + const [a, b] = await Promise.all([promiseA, promiseB]); + expect(a).toBe('one'); + expect(b).toBe('two'); + // Must resolve in event-log order regardless of caching. + expect(resolveOrder).toEqual(['A:one', 'B:two']); + }); + + it('re-hydrates object results on every replay (no shared mutable reference)', async () => { + const events: Event[] = [ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { + stepName: 'obj', + result: await dehydrateStepReturnValue( + { count: 0 }, + 'wrun_test', + undefined + ), + }, + createdAt: new Date(), + }, + ]; + const cache = createStepHydrationCache(); + + // Replay 1: hydrate the object, then mutate it (as workflow code might). + const ctx1 = setupWorkflowContext(events, cache); + const useStep1 = createUseStep(ctx1); + const first = (await useStep1('obj')()) as { count: number }; + first.count = 99; + + // Replay 2: must produce a FRESH object, not the mutated one. + const ctx2 = setupWorkflowContext(events, cache); + const useStep2 = createUseStep(ctx2); + const second = (await useStep2('obj')()) as { count: number }; + + expect(second).not.toBe(first); + expect(second.count).toBe(0); + expect(cache.size).toBe(0); // object results are never memoized + }); +}); diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index a00b88db0a..d3c02c0942 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -9,6 +9,7 @@ import { } from './private.js'; import type { Serializable } from './schemas.js'; import { hydrateStepError, hydrateStepReturnValue } from './serialization.js'; +import { getOrHydrateStepReturnValue } from './step-hydration-cache.js'; export function createUseStep(ctx: WorkflowOrchestratorContext) { return function useStep( @@ -183,14 +184,34 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { // takes variable time, promises resolve in event log order. // Each step's hydration + resolve waits for all prior hydrations // to complete before executing, preserving deterministic ordering. + // + // Memoization: on every replay this consumer re-runs and would + // otherwise re-decrypt + re-parse the same serialized result — O(N²) + // across an invocation for a sequential N-step workflow. The + // per-run `stepHydrationCache` short-circuits that work on + // subsequent replays. Crucially, the cache lookup happens INSIDE + // this same promiseQueue slot (and still resolves via `resolve`), + // so a cache hit occupies the exact same position in the ordered + // delivery chain a re-hydrate would have — preserving the + // determinism that pendingDeliveries, the delivery barriers, and + // Promise.race/all replay depend on. Only primitive results are + // memoized; non-primitives re-hydrate fresh each replay so a shared + // reference can never carry a mutation between replays. + const completedEventId = event.eventId; + const serializedResult = event.eventData.result; ctx.pendingDeliveries++; ctx.promiseQueue = ctx.promiseQueue.then(async () => { try { - const hydratedResult = await hydrateStepReturnValue( - event.eventData.result, - ctx.runId, - ctx.encryptionKey, - ctx.globalThis + const hydratedResult = await getOrHydrateStepReturnValue( + ctx.stepHydrationCache, + completedEventId, + () => + hydrateStepReturnValue( + serializedResult, + ctx.runId, + ctx.encryptionKey, + ctx.globalThis + ) ); resolve(hydratedResult as Result); } catch (error) { diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 7a041015f5..b46fe084fd 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -25,6 +25,7 @@ import { hydrateWorkflowArguments, } from './serialization.js'; import { createUseStep } from './step.js'; +import type { StepHydrationCache } from './step-hydration-cache.js'; import { BODY_INIT_SYMBOL, STABLE_ULID, @@ -42,11 +43,11 @@ import { createAbortSignalStatics, createCreateAbortController, } from './workflow/abort-controller.js'; +import { createSetAttributes } from './workflow/attribute-dispatcher.js'; import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js'; import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js'; import { createCreateHook } from './workflow/hook.js'; import { createSleep } from './workflow/sleep.js'; -import { createSetAttributes } from './workflow/attribute-dispatcher.js'; /** * Drain pending queue items at workflow completion (success or failure). @@ -124,7 +125,15 @@ export async function runWorkflow( workflowCode: string, workflowRun: WorkflowRun, events: Event[], - encryptionKey: CryptoKey | undefined + encryptionKey: CryptoKey | undefined, + /** + * Optional per-run cache for hydrated step return values, owned by the inline + * replay loop so it survives across the loop's iterations (each of which + * creates a fresh context). Memoizes the decrypt + devalue-parse of completed + * step results to turn O(N²) replay hydration into O(N). Omitted by callers + * that replay only once (then there is nothing to reuse). + */ + stepHydrationCache?: StepHydrationCache ): Promise { return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => { span?.setAttributes({ @@ -205,6 +214,7 @@ export async function runWorkflow( }, pendingDeliveries: 0, pendingDeliveryBarriers: new Map(), + stepHydrationCache, }; // Consume run lifecycle events - these are structural events that don't