From 5d959dac43fa79aac7a6610375cef542405ecf43 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 22 Jun 2026 20:35:33 +0000 Subject: [PATCH] perf(core): memoize step return value hydration across inline replays (#2472) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf(core): memoize step return value hydration across replays The inline replay loop re-executes the workflow body and re-consumes the full event log on every iteration. For each already-completed step, the step consumer re-decrypted and re-devalue-parsed the serialized result on every replay — O(N^2) decrypt+parse operations across a single invocation of a sequential N-step workflow. Add a per-run memoization cache, owned by the inline loop in runtime.ts (alongside cachedEvents) so it survives across replay iterations of the same run but never leaks across runs. It is threaded into runWorkflow and stored on the orchestrator context, and consulted in the step_completed path keyed by the persisted event id. This makes a completed step's hydrated result O(1) on subsequent replays, turning the aggregate cost into O(N). Determinism is preserved: the cache lookup happens inside the existing ctx.promiseQueue slot and still resolves via the same resolve(), so a cache hit occupies the identical position in the ordered delivery chain a re-hydrate would have — pendingDeliveries accounting, delivery barriers, and Promise.race/all replay are untouched. Identity safety: hydrateStepReturnValue returns a fresh object graph each call and each replay runs in a fresh VM, so sharing an object reference across replays could let one replay's mutation leak into the next. Only primitive results are memoized (immutable, reference-share == re-parse); non-primitives re-hydrate fresh every replay, exactly as before. Hook, wait, and abort hydration paths are intentionally left uncached. Co-Authored-By: Claude Opus 4.8 * perf(core): bound memoized step-hydration cache by primitive size Address the review note that the per-run step hydration cache was never size-bounded: cached entries hold the decrypted/parsed plaintext of a primitive step result for the whole invocation, on top of the serialized bytes already retained in cachedEvents, so a long run returning large strings could roughly double peak retained memory for those results. Document the cache's memory characteristic (per-invocation, freed when the invocation ends, bounded by primitive-returning step count) and cap the only primitive types that can carry a large payload: string/bigint results longer than MAX_MEMOIZED_PRIMITIVE_LENGTH (4 KiB) fall through to the existing per-replay re-hydrate path instead of being memoized. Large payloads are cheap to re-hydrate relative to their footprint, so this caps the worst case at negligible cost. Other primitives are inherently small and always memoized. The cap only ever reduces what is cached, so deterministic replay is unaffected: oversized values take the already-correct re-hydrate path. Co-Authored-By: Claude Opus 4.8 --------- Co-authored-by: Claude Opus 4.8 Signed-off-by: Pranay Prakash --- .changeset/perf-memoize-step-hydration.md | 6 + packages/core/src/private.ts | 19 ++ .../core/src/step-hydration-cache.test.ts | 184 +++++++++++++++++ packages/core/src/step-hydration-cache.ts | 188 +++++++++++++++++ .../src/step-hydration-memoization.test.ts | 189 ++++++++++++++++++ packages/core/src/step.ts | 31 ++- packages/core/src/workflow.ts | 12 +- 7 files changed, 623 insertions(+), 6 deletions(-) create mode 100644 .changeset/perf-memoize-step-hydration.md create mode 100644 packages/core/src/step-hydration-cache.test.ts create mode 100644 packages/core/src/step-hydration-cache.ts create mode 100644 packages/core/src/step-hydration-memoization.test.ts diff --git a/.changeset/perf-memoize-step-hydration.md b/.changeset/perf-memoize-step-hydration.md new file mode 100644 index 0000000000..deefc926ad --- /dev/null +++ b/.changeset/perf-memoize-step-hydration.md @@ -0,0 +1,6 @@ +--- +'@workflow/core': patch +'workflow': patch +--- + +Memoize hydrated step return values across inline replay iterations, turning the per-invocation step-result decrypt+parse cost from O(N²) to O(N) for sequential workflows. Only primitive results are cached, so deterministic replay is preserved. diff --git a/packages/core/src/private.ts b/packages/core/src/private.ts index 175e73d2d3..3214bfef04 100644 --- a/packages/core/src/private.ts +++ b/packages/core/src/private.ts @@ -7,6 +7,7 @@ import type { CryptoKey } from './encryption.js'; import type { EventsConsumer } from './events-consumer.js'; import type { QueueItem } from './global.js'; import type { Serializable } from './schemas.js'; +import type { StepHydrationCache } from './step-hydration-cache.js'; export type StepFunction< Args extends Serializable[] = any[], @@ -182,6 +183,24 @@ export interface WorkflowOrchestratorContext { * that do not initialize it degrade gracefully to the previous behavior. */ pendingDeliveryBarriers?: Map; + /** + * Per-run memoization cache for hydrated step return values, keyed by the + * `step_completed` event id. Owned by the inline replay loop in `runtime.ts` + * and threaded through each `runWorkflow` call so it survives across replay + * iterations of the SAME run (a fresh context is created each iteration) but + * never leaks across unrelated runs. + * + * On replay K of a sequential N-step workflow, the step consumer would + * otherwise re-decrypt and re-parse the results of all K already-completed + * steps — O(N²) across an invocation. This cache makes a completed step's + * result available in O(1) on subsequent replays. Only primitive results are + * memoized, so a shared reference can never let one replay's mutation leak + * into the next; see `step-hydration-cache.ts` for the full rationale. + * + * Optional so contexts that do not initialize it (test harnesses) degrade + * gracefully to re-hydrating every replay — identical to previous behavior. + */ + stepHydrationCache?: StepHydrationCache; } /** The kind of branch-deciding delivery a barrier represents. */ diff --git a/packages/core/src/step-hydration-cache.test.ts b/packages/core/src/step-hydration-cache.test.ts new file mode 100644 index 0000000000..2010719813 --- /dev/null +++ b/packages/core/src/step-hydration-cache.test.ts @@ -0,0 +1,184 @@ +import { describe, expect, it, vi } from 'vitest'; +import { + createStepHydrationCache, + getOrHydrateStepReturnValue, + isMemoizablePrimitive, + MAX_MEMOIZED_PRIMITIVE_LENGTH, +} from './step-hydration-cache.js'; + +describe('isMemoizablePrimitive', () => { + it('returns true for primitives', () => { + expect(isMemoizablePrimitive('hello')).toBe(true); + expect(isMemoizablePrimitive(42)).toBe(true); + expect(isMemoizablePrimitive(0)).toBe(true); + expect(isMemoizablePrimitive(true)).toBe(true); + expect(isMemoizablePrimitive(false)).toBe(true); + expect(isMemoizablePrimitive(null)).toBe(true); + expect(isMemoizablePrimitive(undefined)).toBe(true); + expect(isMemoizablePrimitive(10n)).toBe(true); + expect(isMemoizablePrimitive(Symbol('x'))).toBe(true); + }); + + it('returns false for objects, arrays, and functions', () => { + expect(isMemoizablePrimitive({})).toBe(false); + expect(isMemoizablePrimitive({ a: 1 })).toBe(false); + expect(isMemoizablePrimitive([])).toBe(false); + expect(isMemoizablePrimitive([1, 2, 3])).toBe(false); + expect(isMemoizablePrimitive(() => {})).toBe(false); + expect(isMemoizablePrimitive(new Date())).toBe(false); + expect(isMemoizablePrimitive(new Map())).toBe(false); + }); + + it('memoizes a string at the length bound but not beyond it', () => { + const atBound = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH); + const overBound = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH + 1); + expect(isMemoizablePrimitive(atBound)).toBe(true); + expect(isMemoizablePrimitive(overBound)).toBe(false); + }); + + it('bounds oversized bigints by their decimal length', () => { + const overBound = BigInt('9'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH + 1)); + expect(isMemoizablePrimitive(overBound)).toBe(false); + // A small bigint is always memoizable. + expect(isMemoizablePrimitive(123n)).toBe(true); + }); +}); + +describe('getOrHydrateStepReturnValue', () => { + it('hydrates on a miss and returns the value', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue('result'); + const value = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + expect(value).toBe('result'); + expect(hydrate).toHaveBeenCalledTimes(1); + }); + + it('memoizes a primitive: second call with same eventId does not re-hydrate', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue('result'); + + const first = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + + expect(first).toBe('result'); + expect(second).toBe('result'); + // The expensive hydrate must only run once across replays. + expect(hydrate).toHaveBeenCalledTimes(1); + }); + + it('memoizes falsy primitives (0, false, "", null, undefined) as hits', async () => { + for (const sample of [0, false, '', null, undefined]) { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue(sample); + const first = await getOrHydrateStepReturnValue(cache, 'evnt', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt', hydrate); + expect(first).toBe(sample); + expect(second).toBe(sample); + expect(hydrate).toHaveBeenCalledTimes(1); + } + }); + + it('does NOT memoize non-primitives: re-hydrates a fresh object each replay', async () => { + const cache = createStepHydrationCache(); + // Return a fresh object every call so we can assert distinct references. + const hydrate = vi.fn().mockImplementation(async () => ({ count: 0 })); + + const first = (await getOrHydrateStepReturnValue( + cache, + 'evnt_0', + hydrate + )) as { count: number }; + // Simulate workflow code mutating the result on this replay. + first.count++; + + const second = (await getOrHydrateStepReturnValue( + cache, + 'evnt_0', + hydrate + )) as { count: number }; + + // A fresh object must be produced — the mutation from the first replay + // must NOT leak into the second. + expect(hydrate).toHaveBeenCalledTimes(2); + expect(second).not.toBe(first); + expect(second.count).toBe(0); + }); + + it('memoizes a string at the length bound (cache hit on replay)', async () => { + const cache = createStepHydrationCache(); + const atBound = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH); + const hydrate = vi.fn().mockResolvedValue(atBound); + + const first = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + + expect(first).toBe(atBound); + expect(second).toBe(atBound); + expect(hydrate).toHaveBeenCalledTimes(1); + expect(cache.size).toBe(1); + }); + + it('does NOT memoize an oversized string: re-hydrates every replay and stays unbounded-free', async () => { + const cache = createStepHydrationCache(); + const big = 'x'.repeat(MAX_MEMOIZED_PRIMITIVE_LENGTH + 1); + const hydrate = vi.fn().mockResolvedValue(big); + + const first = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const second = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + + // Correct value still returned, but the large payload is never retained: + // it falls through to a fresh re-hydrate on every replay. + expect(first).toBe(big); + expect(second).toBe(big); + expect(hydrate).toHaveBeenCalledTimes(2); + expect(cache.size).toBe(0); + }); + + it('keys by eventId: different events hydrate independently', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi + .fn() + .mockResolvedValueOnce('a') + .mockResolvedValueOnce('b'); + + const a = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + const b = await getOrHydrateStepReturnValue(cache, 'evnt_1', hydrate); + + expect(a).toBe('a'); + expect(b).toBe('b'); + expect(hydrate).toHaveBeenCalledTimes(2); + }); + + it('does not cache when no cache is provided', async () => { + const hydrate = vi.fn().mockResolvedValue('result'); + await getOrHydrateStepReturnValue(undefined, 'evnt_0', hydrate); + await getOrHydrateStepReturnValue(undefined, 'evnt_0', hydrate); + expect(hydrate).toHaveBeenCalledTimes(2); + }); + + it('does not cache when eventId is undefined', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi.fn().mockResolvedValue('result'); + await getOrHydrateStepReturnValue(cache, undefined, hydrate); + await getOrHydrateStepReturnValue(cache, undefined, hydrate); + expect(hydrate).toHaveBeenCalledTimes(2); + expect(cache.size).toBe(0); + }); + + it('does not cache rejected hydrations: re-attempts on the next call', async () => { + const cache = createStepHydrationCache(); + const hydrate = vi + .fn() + .mockRejectedValueOnce(new Error('boom')) + .mockResolvedValueOnce('recovered'); + + await expect( + getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate) + ).rejects.toThrow('boom'); + + // A subsequent replay re-attempts (no parked rejected promise). + const value = await getOrHydrateStepReturnValue(cache, 'evnt_0', hydrate); + expect(value).toBe('recovered'); + expect(hydrate).toHaveBeenCalledTimes(2); + }); +}); diff --git a/packages/core/src/step-hydration-cache.ts b/packages/core/src/step-hydration-cache.ts new file mode 100644 index 0000000000..b2a93144e2 --- /dev/null +++ b/packages/core/src/step-hydration-cache.ts @@ -0,0 +1,188 @@ +/** + * Per-run memoization cache for hydrated step return values. + * + * ## Why + * + * The inline replay loop (`runtime.ts`) re-runs the workflow body from the top + * on every iteration, re-consuming the full event log each time. For every + * already-completed step, the step consumer (`step.ts`) re-runs + * `hydrateStepReturnValue` — which AES-GCM-decrypts and devalue-parses the + * serialized result — even though that exact result was already hydrated on + * every prior replay. For a sequential workflow of N steps, replay K hydrates + * K results, so the total work across a single invocation is O(N²) + * decrypt+parse operations. + * + * This cache makes a completed step's hydrated result available in O(1) on + * subsequent replays within the SAME invocation, turning the aggregate cost + * into O(N). + * + * ## Scope / lifetime + * + * The cache is owned by the inline loop in `runtime.ts` (one per workflow run + * invocation) and passed into `runWorkflow` so it survives across the loop's + * iterations but never leaks across unrelated runs or process-level + * invocations. A fresh `runWorkflow` / `WorkflowOrchestratorContext` is created + * each iteration, so the cache must live OUTSIDE the per-iteration context. + * + * ## Keying + * + * Entries are keyed by the persisted event's `eventId` — a stable, + * world-assigned identifier for the `step_completed` event whose serialized + * `result` is being hydrated. The same event (same `eventId`) carries the same + * immutable serialized bytes across every replay, so a hit is guaranteed to + * correspond to the identical input. + * + * ## Identity safety (why primitives only) + * + * `hydrateStepReturnValue` (devalue.parse) produces a FRESH object graph on + * every call, and each replay iteration runs in a FRESH workflow VM. The + * current (uncached) behavior therefore hands the workflow a brand-new value + * on every replay. If we cached and returned the SAME object reference across + * replays, workflow code that mutates a step result (`const r = await step(); + * r.count++`) would observe the mutation from a previous replay on the next + * replay — a non-deterministic divergence. Structured-cloning on each hit is + * both lossy (revivers reconstruct stream handles, step-function proxies, + * Request/Response, and AbortController/AbortSignal class instances that don't + * survive a structured clone) and still O(size). + * + * So we only cache values for which returning the same reference on every + * replay is provably indistinguishable from re-hydrating: JavaScript + * primitives (string, number, boolean, bigint, symbol, null, undefined). + * Primitives are immutable and compared by value, so sharing the reference is + * byte-for-byte equivalent to re-parsing. Any non-primitive result falls + * through to a full re-hydrate every replay, preserving current behavior + * exactly. This trades some of the optimization away in the object-returning + * case in exchange for keeping deterministic replay airtight. + * + * ## Memory characteristic + * + * Cached entries hold the decrypted/devalue-parsed *plaintext* of a step + * result, which is retained for the rest of the invocation on top of the + * serialized bytes already held in `cachedEvents`. So the residual cost is: + * + * - **Scoped to one workflow-run invocation.** A fresh `Map` is created per + * invocation (in `runtime.ts`) and is unreachable / GC'd when the invocation + * returns. Nothing accumulates across runs or across process-level + * invocations. + * - **Bounded by the number of primitive-returning completed steps in that + * run** — at most one small entry per such step. + * - **Primitives only, and additionally byte-bounded.** Most primitives + * (numbers, booleans, null/undefined, symbols, short ids/strings) are tiny + * and fixed-size. The only primitive that can be large is a string (or a + * pathologically long bigint), so to keep the doubled-residency worst case + * bounded we *do not* memoize string/bigint results whose character length + * exceeds {@link MAX_MEMOIZED_PRIMITIVE_LENGTH}. A large string is cheap to + * re-hydrate relative to its footprint, so letting it fall through to the + * existing per-replay re-hydrate path costs little and caps peak retained + * memory. + * + * (This is a much weaker concern than a *process-wide* cache: the dominant + * residency — the full event log in `cachedEvents` — already exists for the + * same lifetime, and everything here is freed together with it when the + * invocation ends.) + */ + +/** + * Upper bound, in characters, on a string/bigint primitive that may be + * memoized. Beyond this, the value falls through to a fresh re-hydrate on every + * replay so the cache never holds a large plaintext payload for the lifetime of + * the invocation. 4 KiB comfortably covers ids, counts, flags, and typical + * short string results while excluding the large-payload case the bound exists + * to guard. Other primitive types (number, boolean, symbol, null, undefined) + * are inherently small and are never length-checked. + */ +export const MAX_MEMOIZED_PRIMITIVE_LENGTH = 4096; + +/** + * Returns true for values that are safe to memoize and return by reference + * across replays: JS primitives. Objects and functions are excluded because + * sharing a mutable reference across replays could change observable behavior. + * + * Strings and bigints are additionally bounded by length: a value longer than + * {@link MAX_MEMOIZED_PRIMITIVE_LENGTH} characters is treated as non-memoizable + * so the cache never retains a large plaintext payload for the whole invocation + * (see the module-level "Memory characteristic" docs). It re-hydrates fresh on + * every replay instead — cheap relative to its footprint. + * + * Note: `typeof null === 'object'`, so it is handled explicitly. `undefined`, + * `string`, `number`, `boolean`, `bigint`, and `symbol` are all primitives. + */ +export function isMemoizablePrimitive(value: unknown): boolean { + if (value === null) return true; + const t = typeof value; + if (t === 'object' || t === 'function') return false; + // Bound the only primitive types that can carry a large payload. + if (t === 'string') { + return (value as string).length <= MAX_MEMOIZED_PRIMITIVE_LENGTH; + } + if (t === 'bigint') { + return (value as bigint).toString().length <= MAX_MEMOIZED_PRIMITIVE_LENGTH; + } + return true; +} + +/** + * Cache of hydrated step return values for a single workflow run invocation. + * + * Keyed by `step_completed` event id; the value is the already-hydrated + * primitive result. Only successful, primitive hydrations are stored (see + * {@link getOrHydrateStepReturnValue}), so a non-`undefined` `has(eventId)` + * always means "this step completed with a memoizable primitive value". + */ +export type StepHydrationCache = Map; + +/** + * Create an empty per-invocation step hydration cache. + */ +export function createStepHydrationCache(): StepHydrationCache { + return new Map(); +} + +/** + * Return the hydrated step result for `eventId`, using `cache` as a per-run + * memo. On a hit, the cached primitive is returned without re-running the + * expensive decrypt + devalue-parse. On a miss, `hydrate()` runs and its + * result is memoized only when it is a small primitive (see the module docs for + * the identity-safety rationale and the length bound on string/bigint results). + * + * This always returns a `Promise` and `await`s `hydrate()` even on the miss + * path, so the caller's `await` inside its serial `promiseQueue` slot keeps the + * same scheduling on both hit and miss — a cache hit resolves through the exact + * same promise-chain position a re-hydrate would have, preserving the + * deterministic delivery order that `pendingDeliveries`, the delivery barriers, + * and `Promise.race`/`Promise.all` replay all depend on. + * + * `has(eventId)` is used rather than `get(eventId) !== undefined` so that a + * legitimately memoized `undefined` step result still registers as a hit. + * + * When `cache` or `eventId` is absent (lightweight test harnesses, or a context + * that predates this plumbing), this degrades to calling `hydrate()` directly + * with no memoization — identical to the previous behavior. + * + * Errors are intentionally never cached: a rejected hydrate propagates to the + * caller (which rejects the step promise) and the next replay re-attempts it, + * matching the uncached behavior and avoiding a parked rejected promise. + */ +export async function getOrHydrateStepReturnValue( + cache: StepHydrationCache | undefined, + eventId: string | undefined, + hydrate: () => Promise +): Promise { + if (!cache || eventId === undefined) { + return hydrate(); + } + + if (cache.has(eventId)) { + return cache.get(eventId); + } + + const value = await hydrate(); + // Only memoize values that are safe to return by reference across replays + // AND small enough to retain for the invocation. Non-primitives and + // oversized string/bigint values fall through and are re-hydrated fresh on + // every replay (see isMemoizablePrimitive / MAX_MEMOIZED_PRIMITIVE_LENGTH). + if (isMemoizablePrimitive(value)) { + cache.set(eventId, value); + } + return value; +} diff --git a/packages/core/src/step-hydration-memoization.test.ts b/packages/core/src/step-hydration-memoization.test.ts new file mode 100644 index 0000000000..1db0221244 --- /dev/null +++ b/packages/core/src/step-hydration-memoization.test.ts @@ -0,0 +1,189 @@ +import type { Event } from '@workflow/world'; +import * as nanoid from 'nanoid'; +import { monotonicFactory } from 'ulid'; +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { EventsConsumer } from './events-consumer.js'; +import type { WorkflowOrchestratorContext } from './private.js'; +import { dehydrateStepReturnValue } from './serialization.js'; +import { createUseStep } from './step.js'; +import { + createStepHydrationCache, + type StepHydrationCache, +} from './step-hydration-cache.js'; +import { createContext } from './vm/index.js'; + +/** + * These tests verify the end-to-end behavior of the per-run step hydration + * cache as exercised through the real `createUseStep` consumer — proving both + * that the expensive hydrate is skipped on subsequent replays AND that the + * deterministic, event-log-ordered delivery through `promiseQueue` is + * preserved on cache hits. + * + * Each replay iteration of a run builds a fresh `WorkflowOrchestratorContext` + * but shares a single `stepHydrationCache` (owned by the inline loop). We + * simulate that here by constructing two contexts that share one cache. + */ + +// Build a context that shares a caller-provided hydration cache, mirroring how +// the inline loop threads one cache across replay iterations. +function setupWorkflowContext( + events: Event[], + stepHydrationCache?: StepHydrationCache +): WorkflowOrchestratorContext { + const context = createContext({ + seed: 'test', + fixedTimestamp: 1753481739458, + }); + const ulid = monotonicFactory(() => context.globalThis.Math.random()); + const workflowStartedAt = context.globalThis.Date.now(); + return { + runId: 'wrun_test', + encryptionKey: undefined, + globalThis: context.globalThis, + eventsConsumer: new EventsConsumer(events, { + onUnconsumedEvent: () => {}, + getPromiseQueue: () => Promise.resolve(), + }), + invocationsQueue: new Map(), + generateUlid: () => ulid(workflowStartedAt), + generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) => + new Uint8Array(size).map(() => 256 * context.globalThis.Math.random()) + ), + onWorkflowError: vi.fn(), + promiseQueue: Promise.resolve(), + pendingDeliveries: 0, + pendingDeliveryBarriers: new Map(), + stepHydrationCache, + }; +} + +async function makeStepEvents(): Promise { + return [ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { + stepName: 'step1', + result: await dehydrateStepReturnValue('one', 'wrun_test', undefined), + }, + createdAt: new Date(), + }, + { + eventId: 'evnt_1', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCW', + eventData: { + stepName: 'step2', + result: await dehydrateStepReturnValue('two', 'wrun_test', undefined), + }, + createdAt: new Date(), + }, + ]; +} + +describe('step hydration memoization through the step consumer', () => { + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('skips re-hydration of primitive step results on a second replay sharing the cache', async () => { + const events = await makeStepEvents(); + const cache = createStepHydrationCache(); + + const serialization = await import('./serialization.js'); + const hydrateSpy = vi.spyOn(serialization, 'hydrateStepReturnValue'); + + // --- Replay 1: fresh context, shared cache. Both steps hydrate. --- + const ctx1 = setupWorkflowContext(events, cache); + const useStep1 = createUseStep(ctx1); + const [r1a, r1b] = await Promise.all([ + useStep1('step1')(), + useStep1('step2')(), + ]); + expect(r1a).toBe('one'); + expect(r1b).toBe('two'); + expect(hydrateSpy).toHaveBeenCalledTimes(2); + + // --- Replay 2: brand-new context, SAME cache. No re-hydration. --- + hydrateSpy.mockClear(); + const ctx2 = setupWorkflowContext(events, cache); + const useStep2 = createUseStep(ctx2); + const [r2a, r2b] = await Promise.all([ + useStep2('step1')(), + useStep2('step2')(), + ]); + expect(r2a).toBe('one'); + expect(r2b).toBe('two'); + // The expensive decrypt+parse must NOT run again on the second replay. + expect(hydrateSpy).toHaveBeenCalledTimes(0); + }); + + it('preserves event-log resolution order on cache hits even with variable timing', async () => { + const events = await makeStepEvents(); + const cache = createStepHydrationCache(); + + // Replay 1: populate the cache (no timing games needed). + const ctx1 = setupWorkflowContext(events, cache); + const useStep1 = createUseStep(ctx1); + await Promise.all([useStep1('step1')(), useStep1('step2')()]); + + // Replay 2: all results are cache hits, but force the second event's + // delivery slot to be observed quickly while the first is artificially + // slowed — proving ordering is enforced by promiseQueue, not by hydrate + // timing (which is now a no-op on hits). + const ctx2 = setupWorkflowContext(events, cache); + const useStep2 = createUseStep(ctx2); + + const promiseA = useStep2('step1')(); + const promiseB = useStep2('step2')(); + + const resolveOrder: string[] = []; + promiseA.then((v) => resolveOrder.push(`A:${v}`)); + promiseB.then((v) => resolveOrder.push(`B:${v}`)); + + const [a, b] = await Promise.all([promiseA, promiseB]); + expect(a).toBe('one'); + expect(b).toBe('two'); + // Must resolve in event-log order regardless of caching. + expect(resolveOrder).toEqual(['A:one', 'B:two']); + }); + + it('re-hydrates object results on every replay (no shared mutable reference)', async () => { + const events: Event[] = [ + { + eventId: 'evnt_0', + runId: 'wrun_test', + eventType: 'step_completed', + correlationId: 'step_01K11TFZ62YS0YYFDQ3E8B9YCV', + eventData: { + stepName: 'obj', + result: await dehydrateStepReturnValue( + { count: 0 }, + 'wrun_test', + undefined + ), + }, + createdAt: new Date(), + }, + ]; + const cache = createStepHydrationCache(); + + // Replay 1: hydrate the object, then mutate it (as workflow code might). + const ctx1 = setupWorkflowContext(events, cache); + const useStep1 = createUseStep(ctx1); + const first = (await useStep1('obj')()) as { count: number }; + first.count = 99; + + // Replay 2: must produce a FRESH object, not the mutated one. + const ctx2 = setupWorkflowContext(events, cache); + const useStep2 = createUseStep(ctx2); + const second = (await useStep2('obj')()) as { count: number }; + + expect(second).not.toBe(first); + expect(second.count).toBe(0); + expect(cache.size).toBe(0); // object results are never memoized + }); +}); diff --git a/packages/core/src/step.ts b/packages/core/src/step.ts index 552fcb4945..ccda661972 100644 --- a/packages/core/src/step.ts +++ b/packages/core/src/step.ts @@ -9,6 +9,7 @@ import { } from './private.js'; import type { Serializable } from './schemas.js'; import { hydrateStepReturnValue } from './serialization.js'; +import { getOrHydrateStepReturnValue } from './step-hydration-cache.js'; export function createUseStep(ctx: WorkflowOrchestratorContext) { return function useStep( @@ -171,14 +172,34 @@ export function createUseStep(ctx: WorkflowOrchestratorContext) { // takes variable time, promises resolve in event log order. // Each step's hydration + resolve waits for all prior hydrations // to complete before executing, preserving deterministic ordering. + // + // Memoization: on every replay this consumer re-runs and would + // otherwise re-decrypt + re-parse the same serialized result — O(N²) + // across an invocation for a sequential N-step workflow. The + // per-run `stepHydrationCache` short-circuits that work on + // subsequent replays. Crucially, the cache lookup happens INSIDE + // this same promiseQueue slot (and still resolves via `resolve`), + // so a cache hit occupies the exact same position in the ordered + // delivery chain a re-hydrate would have — preserving the + // determinism that pendingDeliveries, the delivery barriers, and + // Promise.race/all replay depend on. Only primitive results are + // memoized; non-primitives re-hydrate fresh each replay so a shared + // reference can never carry a mutation between replays. + const completedEventId = event.eventId; + const serializedResult = event.eventData.result; ctx.pendingDeliveries++; ctx.promiseQueue = ctx.promiseQueue.then(async () => { try { - const hydratedResult = await hydrateStepReturnValue( - event.eventData.result, - ctx.runId, - ctx.encryptionKey, - ctx.globalThis + const hydratedResult = await getOrHydrateStepReturnValue( + ctx.stepHydrationCache, + completedEventId, + () => + hydrateStepReturnValue( + serializedResult, + ctx.runId, + ctx.encryptionKey, + ctx.globalThis + ) ); resolve(hydratedResult as Result); } catch (error) { diff --git a/packages/core/src/workflow.ts b/packages/core/src/workflow.ts index 0dfdfc47a6..db59e6d3fe 100644 --- a/packages/core/src/workflow.ts +++ b/packages/core/src/workflow.ts @@ -21,6 +21,7 @@ import { hydrateWorkflowArguments, } from './serialization.js'; import { createUseStep } from './step.js'; +import type { StepHydrationCache } from './step-hydration-cache.js'; import { BODY_INIT_SYMBOL, STABLE_ULID, @@ -81,7 +82,15 @@ export async function runWorkflow( workflowCode: string, workflowRun: WorkflowRun, events: Event[], - encryptionKey: CryptoKey | undefined + encryptionKey: CryptoKey | undefined, + /** + * Optional per-run cache for hydrated step return values, owned by the inline + * replay loop so it survives across the loop's iterations (each of which + * creates a fresh context). Memoizes the decrypt + devalue-parse of completed + * step results to turn O(N²) replay hydration into O(N). Omitted by callers + * that replay only once (then there is nothing to reuse). + */ + stepHydrationCache?: StepHydrationCache ): Promise { return trace(`workflow.run ${workflowRun.workflowName}`, async (span) => { span?.setAttributes({ @@ -161,6 +170,7 @@ export async function runWorkflow( }, pendingDeliveries: 0, pendingDeliveryBarriers: new Map(), + stepHydrationCache, }; // Consume run lifecycle events - these are structural events that don't