diff --git a/.changeset/quickjs-runtime.md b/.changeset/quickjs-runtime.md new file mode 100644 index 0000000000..77a557eb7b --- /dev/null +++ b/.changeset/quickjs-runtime.md @@ -0,0 +1,5 @@ +--- +"@workflow/core": minor +--- + +Add an opt-in QuickJS WASM workflow runtime (`WORKFLOW_RUNTIME=quickjs`, or per-run `executionContext.workflowRuntime`) as an alternative to the default `node:vm` runtime. It executes workflow orchestrator code inside a QuickJS WASM VM via `quickjs-wasi`, enabling workflow execution on runtimes that disallow `node:vm` / code-generation-from-strings (e.g. Cloudflare Workers). The default runtime is unchanged. diff --git a/.github/workflows/quickjs.yml b/.github/workflows/quickjs.yml new file mode 100644 index 0000000000..0e00c1d3bb --- /dev/null +++ b/.github/workflows/quickjs.yml @@ -0,0 +1,150 @@ +name: QuickJS Runtime + +# Exercises the opt-in QuickJS WASM workflow runtime (WORKFLOW_RUNTIME=quickjs) +# two ways: +# - node: the runtime running under Node.js (the regular host), via the +# nextjs-turbopack workbench + a curated e2e subset. +# - workerd: the QuickJS VM execution layer (quickjs-wasi + native extensions +# + core's VM serde bundle) running inside Cloudflare's workerd, +# where node:vm is unavailable. This is the foundation for running +# workflows on Cloudflare Workers. + +on: + push: + branches: [main, stable] + pull_request: + paths: + - 'packages/core/src/runtime/quickjs-runtime.ts' + - 'packages/core/src/runtime/quickjs-entrypoint.ts' + - 'packages/core/src/runtime/runtime-mode.ts' + - 'packages/core/src/serialization/**' + - 'packages/core/scripts/build-quickjs-assets.js' + - 'packages/core/scripts/build-vm-serde-bundle.js' + - 'packages/core/test/workerd-smoke/**' + - '.github/workflows/quickjs.yml' + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} + +jobs: + # ---- QuickJS runtime under Node.js ------------------------------------- + # Builds the nextjs-turbopack workbench and runs a curated subset of the e2e + # suite with WORKFLOW_RUNTIME=quickjs so the workflow orchestrator executes + # inside the QuickJS VM instead of node:vm. The subset is the set of features + # verified to work end-to-end under QuickJS today; widen it as remaining gaps + # (streams, AbortController, advanced hook-conflict, attributes) are closed. + node: + name: QuickJS Runtime (Node) + runs-on: ubuntu-latest + timeout-minutes: 30 + env: + TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} + TURBO_TEAM: ${{ vars.TURBO_TEAM }} + WORKFLOW_PUBLIC_MANIFEST: '1' + APP_NAME: nextjs-turbopack + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + + - name: Setup environment and build packages + uses: ./.github/actions/setup-workflow-dev + with: + build-packages: 'true' + + - name: Build workbench + # `next build --turbopack` can intermittently crash on CI runners with a + # SIGBUS / Go deadlock (exit 135). Retry a couple of times — turbo only + # caches successful tasks, so a crashed build is re-run cleanly. + run: | + for attempt in 1 2 3; do + if pnpm turbo run build --filter=nextjs-turbopack; then exit 0; fi + echo "::warning::workbench build failed (attempt $attempt/3), retrying..." + sleep 5 + done + echo "::error::workbench build failed after 3 attempts" + exit 1 + + - name: Run QuickJS unit + integration tests + run: pnpm vitest run packages/core/src/runtime/runtime-mode.test.ts packages/core/src/runtime/quickjs-runtime.test.ts + + - name: Run curated e2e subset (WORKFLOW_RUNTIME=quickjs) + env: + WORKFLOW_RUNTIME: quickjs + WORKFLOW_NO_UPDATE_CHECK: '1' + NODE_OPTIONS: '--enable-source-maps' + DEPLOYMENT_URL: 'http://localhost:3000' + run: | + (cd "workbench/$APP_NAME" && WORKFLOW_RUNTIME=quickjs pnpm start &) + echo "Waiting for workbench to start..." && sleep 12 + for i in $(seq 1 30); do + if curl -sf -o /dev/null http://localhost:3000/; then echo "ready"; break; fi + sleep 2 + done + # NOTE: \bhookWorkflow (not hookWorkflow) so the pattern does not also + # match webhookWorkflow / parallelStepsThenWebhookWorkflow, whose + # byte-stream framing is a known QuickJS gap (tracked separately). + pnpm vitest run packages/core/e2e/e2e.test.ts \ + -t "addTenWorkflow|promiseAllWorkflow|promiseRaceWorkflow|promiseAnyWorkflow|\bhookWorkflow|sleepingWorkflow|nullByteWorkflow|workflowAndStepMetadataWorkflow|customSerializationWorkflow|errorSubclassRoundTripWorkflow|resilient start" + + # ---- QuickJS VM under workerd ------------------------------------------ + # Runs the committed smoke worker (packages/core/test/workerd-smoke) under a + # local workerd via `wrangler dev`, then fetches it. The worker instantiates + # quickjs-wasi + the native extensions (imported as pre-compiled + # WebAssembly.Modules — workerd bans runtime WASM compilation from bytes) and + # evaluates core's real VM serde bundle, proving the QuickJS VM executes on + # Workers. Returns HTTP 200 only if every in-VM check passes. + workerd: + name: QuickJS Runtime (workerd) + runs-on: ubuntu-latest + timeout-minutes: 20 + env: + TURBO_TOKEN: ${{ secrets.TURBO_TOKEN }} + TURBO_TEAM: ${{ vars.TURBO_TEAM }} + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + with: + ref: ${{ github.event.pull_request.head.sha || github.sha }} + + - name: Setup environment + uses: ./.github/actions/setup-workflow-dev + with: + build-packages: 'false' + + - name: Build @workflow/core (generates VM serde bundle + assets) + run: pnpm turbo run build --filter=@workflow/core + + - name: Run workerd smoke test + working-directory: packages/core/test/workerd-smoke + run: | + npx --yes wrangler@4.100.0 dev --port 8788 > /tmp/workerd-smoke.log 2>&1 & + WRANGLER_PID=$! + trap 'kill $WRANGLER_PID 2>/dev/null || true' EXIT + + echo "Waiting for workerd to be ready..." + ready=false + for i in $(seq 1 60); do + if grep -qiE "Ready on|localhost:8788" /tmp/workerd-smoke.log 2>/dev/null; then ready=true; break; fi + if grep -qiE "\[ERROR\]|Build failed" /tmp/workerd-smoke.log 2>/dev/null; then break; fi + sleep 2 + done + if [ "$ready" != "true" ]; then + echo "::error::workerd did not become ready"; cat /tmp/workerd-smoke.log; exit 1 + fi + + # Retry the fetch a few times while the isolate warms up. + code=0 + for i in $(seq 1 10); do + code=$(curl -s -m 30 -o /tmp/workerd-smoke-resp.json -w "%{http_code}" http://localhost:8788/ || echo 000) + if [ "$code" = "200" ]; then break; fi + sleep 3 + done + + echo "=== response ==="; cat /tmp/workerd-smoke-resp.json || true; echo + if [ "$code" != "200" ]; then + echo "::error::workerd smoke test failed (HTTP $code)"; cat /tmp/workerd-smoke.log; exit 1 + fi + echo "workerd smoke test passed" diff --git a/packages/core/.gitignore b/packages/core/.gitignore index 7b6d0b4576..c7cfda0c50 100644 --- a/packages/core/.gitignore +++ b/packages/core/.gitignore @@ -1,2 +1,9 @@ # Auto-generated version file src/version.ts + +# Auto-generated quickjs-wasi binary assets (base64-encoded WASM + .so files) +src/runtime/quickjs-assets.generated.ts + +# Auto-generated VM serde bundle (devalue + format-prefix + reducers, +# packaged as a string for evaluation inside the QuickJS VM) +src/runtime/vm-serde-bundle.generated.ts diff --git a/packages/core/package.json b/packages/core/package.json index 0c230f26ea..8158e1879c 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -76,7 +76,7 @@ "./_workflow": "./dist/workflow/index.js" }, "scripts": { - "build": "genversion --es6 src/version.ts && tsc", + "build": "genversion --es6 src/version.ts && node scripts/build-vm-serde-bundle.js && node scripts/build-quickjs-assets.js && tsc", "dev": "genversion --es6 src/version.ts && tsc --watch", "clean": "tsc --build --clean && rm -rf dist src/version.ts docs ||:", "test": "cross-env WORKFLOW_TARGET_WORLD=local vitest run src", @@ -101,6 +101,7 @@ "devalue": "5.8.1", "ms": "2.1.3", "nanoid": "5.1.6", + "quickjs-wasi": "3.0.0", "seedrandom": "3.0.5", "semver": "catalog:", "ulid": "catalog:", @@ -117,6 +118,7 @@ "@types/semver": "7.7.1", "@workflow/tsconfig": "workspace:*", "cross-env": "10.1.0", + "esbuild": "catalog:", "genversion": "3.2.0" }, "peerDependencies": { diff --git a/packages/core/scripts/build-quickjs-assets.js b/packages/core/scripts/build-quickjs-assets.js new file mode 100644 index 0000000000..71223022af --- /dev/null +++ b/packages/core/scripts/build-quickjs-assets.js @@ -0,0 +1,76 @@ +/** + * Build script: generates quickjs-assets.generated.ts + * + * Reads the quickjs-wasi WASM binary and native C extension .so files, + * base64-encodes them, and writes a TypeScript module that exports the + * decoded Buffer/Uint8Array values. This embeds the binaries directly + * in JavaScript, bypassing all bundler/framework/deployment issues with + * import.meta.url, require.resolve, and file tracing. Crucially, this means + * the QuickJS runtime does NO filesystem access at runtime, so it works on + * platforms like Cloudflare Workers where `node:vm` is unavailable. + * + * Targets quickjs-wasi@3.x: `btoa`/`atob`/`DOMException` are built-in + * intrinsics (enabled by default), so there is no separate `base64` + * extension to embed — only the C extensions that add Web APIs. + */ + +import { readFileSync, writeFileSync } from 'fs'; +import { createRequire } from 'module'; +import { dirname, resolve } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const srcDir = resolve(__dirname, '../src'); + +const require_ = createRequire(import.meta.url); + +// quickjs-wasi 3.x exposes the binary + each extension as subpath exports. +const files = { + quickjsWasm: require_.resolve('quickjs-wasi/quickjs.wasm'), + encodingSo: require_.resolve('quickjs-wasi/encoding.so'), + headersSo: require_.resolve('quickjs-wasi/headers.so'), + urlSo: require_.resolve('quickjs-wasi/url.so'), + structuredCloneSo: require_.resolve('quickjs-wasi/structured-clone.so'), +}; + +let output = `/** + * Auto-generated by scripts/build-quickjs-assets.js + * Do not edit manually. + * + * Contains base64-encoded quickjs-wasi WASM binary and native C extension + * .so files. Decoded at import time so they can be passed directly to + * QuickJS.create() without any filesystem access, import.meta.url + * resolution, or require.resolve calls. + */ +import type { ExtensionDescriptor } from 'quickjs-wasi'; + +`; + +let totalSize = 0; + +for (const [name, filePath] of Object.entries(files)) { + const buf = readFileSync(filePath); + const b64 = buf.toString('base64'); + totalSize += buf.length; + output += `const ${name} = Buffer.from('${b64}', 'base64');\n\n`; +} + +output += `export { quickjsWasm };\n\n`; + +// initFn defaults to `qjs_ext_${name}_init` (with `-` replaced by `_`), so +// `structured-clone` resolves to `qjs_ext_structured_clone_init` automatically. +output += `export const quickjsExtensions: ExtensionDescriptor[] = [ + { name: 'encoding', wasm: encodingSo }, + { name: 'headers', wasm: headersSo }, + { name: 'url', wasm: urlSo }, + { name: 'structured-clone', wasm: structuredCloneSo }, +];\n`; + +const outPath = resolve(srcDir, 'runtime/quickjs-assets.generated.ts'); +writeFileSync(outPath, output); + +const sizeKB = (totalSize / 1024).toFixed(0); +const b64SizeKB = (Buffer.byteLength(output) / 1024).toFixed(0); +console.log( + `Generated quickjs-assets.generated.ts (${sizeKB} KB binary → ${b64SizeKB} KB base64)` +); diff --git a/packages/core/scripts/build-vm-serde-bundle.js b/packages/core/scripts/build-vm-serde-bundle.js new file mode 100644 index 0000000000..f738853b9a --- /dev/null +++ b/packages/core/scripts/build-vm-serde-bundle.js @@ -0,0 +1,65 @@ +/** + * Build script: generates the VM serialization bundle. + * + * Uses esbuild to bundle workflow-vm.ts into a self-contained IIFE. + * The output is written as a TypeScript file containing the bundle as + * a string constant, which can be imported by the snapshot runtime. + * + * TextEncoder, TextDecoder, and Headers are provided by native C + * extensions in quickjs-wasi, so no JS polyfills are needed. + */ + +import { buildSync } from 'esbuild'; +import { writeFileSync } from 'fs'; +import { dirname, resolve } from 'path'; +import { fileURLToPath } from 'url'; + +const __dirname = dirname(fileURLToPath(import.meta.url)); +const srcDir = resolve(__dirname, '../src'); + +const result = buildSync({ + entryPoints: [resolve(srcDir, 'serialization/vm-bundle-entry.ts')], + // NOTE: TextEncoder, TextDecoder, and Headers are provided by native + // C extensions (encoding, headers) in quickjs-wasi, so the polyfill + // injection that was previously here has been removed. + bundle: true, + format: 'iife', + platform: 'neutral', + target: 'es2020', + write: false, + minify: true, +}); + +const bundleCode = result.outputFiles[0].text; + +// Write as a TS module using a template literal. Template literals avoid +// the escaping issues that occur with regular string literals — esbuild's +// minifier produces patterns like `typeof x<"u"` whose escaped quotes +// inside a JSON-stringified string break when downstream esbuild (e.g., +// Nitro) re-processes the compiled JS output. Template literals don't +// have this problem since backticks don't conflict with inner quotes. +const escaped = bundleCode + .replace(/\\/g, '\\\\') + .replace(/`/g, '\\`') + .replace(/\$\{/g, '\\${'); + +const outPath = resolve(srcDir, 'runtime/vm-serde-bundle.generated.ts'); +writeFileSync( + outPath, + `/** + * Auto-generated by scripts/build-vm-serde-bundle.js + * Do not edit manually. + * + * This is the VM serialization bundle — a self-contained IIFE that sets up + * serialize/deserialize + TextEncoder/TextDecoder polyfills inside the + * QuickJS WASM VM. It includes devalue and all workflow-mode reducers. + * + * Size: ${(bundleCode.length / 1024).toFixed(1)} KB minified + */ +export const VM_SERDE_BUNDLE: string = \`${escaped}\`; +` +); + +console.log( + `Generated vm-serde-bundle.generated.ts (${(bundleCode.length / 1024).toFixed(1)} KB)` +); diff --git a/packages/core/src/runtime.ts b/packages/core/src/runtime.ts index 09d4a003ab..e6388f5299 100644 --- a/packages/core/src/runtime.ts +++ b/packages/core/src/runtime.ts @@ -46,6 +46,10 @@ import { handleReplayBudgetExhausted, ReplayBudget, } from './runtime/replay-budget.js'; +import { + getWorkflowRuntimeFromEnv, + WORKFLOW_RUNTIMES, +} from './runtime/runtime-mode.js'; import { executeStep } from './runtime/step-executor.js'; import { handleSuspension } from './runtime/suspension-handler.js'; import { getWaitContinuationDispatch } from './runtime/wait-continuation.js'; @@ -274,6 +278,33 @@ function hasOpenHookOrWait(events: Event[]): boolean { return false; } +/** + * Whether to execute a given run with the QuickJS WASM runtime instead of the + * default `node:vm` runtime. + * + * The node:vm runtime is the default. QuickJS is opt-in, either globally via + * the `WORKFLOW_RUNTIME=quickjs` env var, or per-run via + * `executionContext.workflowRuntime = 'quickjs'` (set by the SDK at start()). + * The per-run setting lets one deployment serve both runtimes. + * + * Throws if `WORKFLOW_RUNTIME` or the run's `executionContext.workflowRuntime` + * is set to an unknown value. + */ +function useQuickJSRuntime(workflowRun: WorkflowRun): boolean { + if (getWorkflowRuntimeFromEnv() === 'quickjs') return true; + const runtimeFromRun = workflowRun.executionContext?.workflowRuntime; + if (runtimeFromRun !== undefined) { + if (!(WORKFLOW_RUNTIMES as readonly string[]).includes(runtimeFromRun)) { + throw new WorkflowRuntimeError( + `Invalid executionContext.workflowRuntime value: "${runtimeFromRun}". ` + + `Expected one of: ${WORKFLOW_RUNTIMES.join(', ')}.` + ); + } + if (runtimeFromRun === 'quickjs') return true; + } + return false; +} + /** * Creates a single route which handles workflow execution requests, * executing steps inline when possible to reduce function invocations @@ -530,8 +561,26 @@ export function workflowEntrypoint( // ineligible. The single-handler guarantee that makes forced // optimistic start safe ends once a hook/wait/attr is created, // so turbo exits at that point (see `forceOptimisticStart`). + // Decide the QuickJS runtime up front — from the env var or + // the run's executionContext carried on the queue message — + // so it can be EXCLUDED from turbo below. Turbo synthesizes + // the run locally with `startedAt: now`, but the QuickJS + // runtime seeds its PRNG and ULID clock from the run's + // `startedAt` (quickjs-runtime.ts). A turbo first-invocation + // would therefore seed from a transient local `now` while + // every resume seeds from the persisted `run_started` + // timestamp — divergent correlationIds/hook tokens that break + // replay dedup. Excluding QuickJS from turbo makes the first + // invocation await a real `run_started` and seed from the + // persisted `startedAt` (and avoids racing the entrypoint's + // event writes ahead of a backgrounded run_started). + const quickjs = + getWorkflowRuntimeFromEnv() === 'quickjs' || + runInput?.executionContext?.workflowRuntime === 'quickjs'; + const turbo = isTurboEnabled() && + !quickjs && runInput !== undefined && metadata.attempt === 1 && incomingStepId === undefined && @@ -918,6 +967,35 @@ export function workflowEntrypoint( } // end else (non-turbo run_started) } // end if (!workflowRun) + // --- QuickJS runtime dispatch --- + // When this run opts into the QuickJS runtime, execute it in + // a QuickJS WASM VM instead of the node:vm inline-replay loop + // below. The QuickJS entrypoint manages its own run + // lifecycle (events, step queueing, run_completed/failed) and + // queues steps through the same combined route — so step + // messages (incomingStepId) still hit executeStep above on + // re-entry. Return immediately after dispatch. + if (useQuickJSRuntime(workflowRun)) { + // Dynamically import so the QuickJS entrypoint (and its + // multi-MB embedded WASM assets + quickjs-wasi) is only + // loaded when the QuickJS runtime is actually selected — + // node:vm runs never pay that cost. + const { runWorkflowWithQuickJS } = await import( + './runtime/quickjs-entrypoint.js' + ); + const quickjsResult = await runWorkflowWithQuickJS({ + workflowCode, + workflowName, + workflowRun, + runInput, + namespace, + }); + if (quickjsResult?.timeoutSeconds !== undefined) { + return { timeoutSeconds: quickjsResult.timeoutSeconds }; + } + return; + } + // Resolve the encryption key for this run's deployment. // Used eagerly here since both runWorkflow (input // hydration / hook payload decryption) and the run_failed diff --git a/packages/core/src/runtime/quickjs-entrypoint.ts b/packages/core/src/runtime/quickjs-entrypoint.ts new file mode 100644 index 0000000000..30d383b534 --- /dev/null +++ b/packages/core/src/runtime/quickjs-entrypoint.ts @@ -0,0 +1,536 @@ +/** + * QuickJS runtime integration with the Workflow runtime. + * + * Entry point for running a workflow invocation using the QuickJS WASM + * runtime instead of the default `node:vm` event-replay runtime. + * + * Unlike the (removed) snapshot runtime, this replays the FULL event log on + * every invocation — no snapshot save/restore, no `world.snapshots.*`. Each + * call: + * 1. Loads the complete event log for the run. + * 2. Runs the workflow in a fresh QuickJS VM (replaying all events). + * 3. On completion: writes `run_completed`. + * 4. On suspension: writes `step_created` / `hook_created` / `wait_created` + * events for new pending operations and queues steps; schedules wait + * timeouts. + * 5. On failure: writes `run_failed`. + * + * Steps are queued through the same combined workflow queue as the node:vm + * runtime (messages carrying a `stepId` are dispatched to `executeStep` in + * runtime.ts), so step execution is shared between the two runtimes. + */ + +import { + EntityConflictError, + RunExpiredError, + WorkflowNotRegisteredError, +} from '@workflow/errors'; +import { getPort } from '@workflow/utils/get-port'; +import { parseWorkflowName } from '@workflow/utils/parse-name'; +import { + type RunInput, + SPEC_VERSION_CURRENT, + type WorkflowRun, +} from '@workflow/world'; +import { classifyRunError } from '../classify-error.js'; +import { importKey } from '../encryption.js'; +import { runtimeLogger } from '../logger.js'; +import { encrypt as encryptSerializedData } from '../serialization/encryption.js'; +import { + dehydrateRunError, + hydrateRunError, + maybeEncrypt, +} from '../serialization.js'; +import { remapErrorStack, stripInlineSourceMap } from '../source-map.js'; +import { serializeTraceCarrier } from '../telemetry.js'; +import { + getWorkflowQueueName, + loadWorkflowRunEvents, + queueMessage, +} from './helpers.js'; +import { + type PendingHook, + type PendingStep, + type PendingWait, + runQuickJSWorkflow, +} from './quickjs-runtime.js'; +import { getWorld } from './world.js'; + +/** + * Run a single workflow invocation using the QuickJS runtime. + * + * Returns `{ timeoutSeconds }` when the caller should re-queue the run after + * a delay (pending waits), or `void` when there is nothing further to do for + * this invocation (completed, failed, or awaiting external steps/hooks). + */ +export async function runWorkflowWithQuickJS(params: { + workflowCode: string; + workflowName: string; + workflowRun: WorkflowRun; + /** + * Run input carried through the queue message on first delivery. Used as a + * last-resort fallback for `run_created.eventData.input` when the event log + * is incomplete (eventual consistency right after `start()`). + */ + runInput?: RunInput; + /** + * Builder-configured queue namespace. Must be threaded into every + * `getWorkflowQueueName()` call so queued steps and hook-conflict re-queues + * land on the same namespaced queue the handler consumes — otherwise a + * namespaced deployment never picks them up and the run hangs. + */ + namespace?: string; +}): Promise<{ timeoutSeconds?: number } | void> { + const { workflowCode, workflowName, workflowRun, runInput, namespace } = + params; + const world = await getWorld(); + const runId = workflowRun.runId; + + // Strip the inline source map comment before evaluating the bundle in the + // QuickJS VM. The map is purely host-side metadata for `remapErrorStack` + // (called below on workflow failures, against the ORIGINAL `workflowCode`). + // Keeping it out of the VM avoids bloating the QuickJS heap. + const workflowCodeForVM = stripInlineSourceMap(workflowCode); + + // The workflowName from the queue topic is already the full workflow ID + // (e.g. "workflow//./workflows/1_simple//simple"). + const workflowId = workflowName; + + // Resolve the encryption key up front — needed to encrypt event payloads + // produced by the VM (which has no access to the key). + const rawKey = await world.getEncryptionKeyForRun?.(workflowRun); + const encryptionKey = rawKey ? await importKey(rawKey) : undefined; + + // Load the FULL event log. We replay from scratch every invocation, so we + // always need every event (no snapshot/cursor delta). `runInput` covers the + // run_created read-after-write race when the log is briefly incomplete. + const { events } = await loadWorkflowRunEvents(runId); + + runtimeLogger.debug('QuickJS runtime: fetched events', { + workflowRunId: runId, + eventCount: events.length, + }); + + // Complete any waits that have already elapsed BEFORE running the VM, so the + // replay observes their wait_completed events. + const now = Date.now(); + const completedWaitIds = new Set( + events + .filter((e) => e.eventType === 'wait_completed') + .map((e) => e.correlationId) + ); + for (const event of events) { + if ( + event.eventType === 'wait_created' && + event.correlationId && + !completedWaitIds.has(event.correlationId) + ) { + const eventData = + 'eventData' in event + ? (event.eventData as Record) + : undefined; + const resumeAt = eventData?.resumeAt; + if (resumeAt && now >= new Date(resumeAt as string).getTime()) { + try { + const result = await world.events.create(runId, { + eventType: 'wait_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: event.correlationId, + }); + if (result.event) events.push(result.event); + } catch (err) { + if (EntityConflictError.is(err)) continue; + throw err; + } + } + } + } + + // Resolve the workflow server port so `getWorkflowMetadata().url` inside the + // VM matches what the step-side handler reports. Skipped on Vercel — the VM + // reads VERCEL_URL directly there. + const isVercel = process.env.VERCEL_URL !== undefined; + const port = isVercel ? undefined : await getPort(); + + runtimeLogger.debug('QuickJS runtime: invoking VM', { + workflowRunId: runId, + workflowId, + eventCount: events.length, + }); + + const result = await runQuickJSWorkflow({ + // Pass the STRIPPED bundle to the VM; the original (unstripped) + // `workflowCode` stays host-side for `remapErrorStack` on failures. + workflowCode: workflowCodeForVM, + workflowId, + workflowRun, + events, + encryptionKey, + port, + runInput, + }); + + runtimeLogger.debug('QuickJS runtime: VM returned', { + workflowRunId: runId, + completed: !!result.completed, + suspended: !!result.suspended, + failed: !!result.failed, + pendingOpsCount: result.suspended?.pendingOperations?.length, + }); + + if (result.completed) { + // Create run_completed. The VM serializes the workflow result as + // format-prefixed devalue bytes ("devl" + devalue) with no encryption + // (it has no access to the CryptoKey). Host-side encryption is applied + // here so run_completed events match the node:vm runtime's payload shape. + try { + await world.events.create(runId, { + eventType: 'run_completed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + output: await encryptSerializedData( + result.completed.result, + encryptionKey + ), + }, + }); + } catch (err) { + if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + runtimeLogger.warn( + 'Workflow already finished, skipping run_completed', + { + workflowRunId: runId, + } + ); + return; + } + throw err; + } + return; + } + + if (result.suspended) { + const { pendingOperations } = result.suspended; + + runtimeLogger.info('QuickJS runtime: workflow suspended', { + workflowRunId: runId, + pendingSteps: pendingOperations.filter((p) => p.type === 'step').length, + pendingWaits: pendingOperations.filter((p) => p.type === 'wait').length, + }); + + let minTimeoutSeconds: number | undefined; + + // Create events + queue steps for each new pending operation. These fan + // out in parallel — correlationIds are deterministic across replays, so + // the world's per-(runId, correlationId) uniqueness dedups duplicates as + // EntityConflictError (swallowed below). + const opsPromises: Promise[] = []; + for (const op of pendingOperations) { + if (op.type === 'step' && !op.hasCreatedEvent) { + const step = op as PendingStep; + opsPromises.push( + (async () => { + // `step.input` is format-prefixed devalue bytes from the VM; + // encrypt host-side (the VM lacks the key). + try { + await world.events.create(runId, { + eventType: 'step_created', + specVersion: SPEC_VERSION_CURRENT, + correlationId: step.correlationId, + eventData: { + stepName: step.stepId, + input: await encryptSerializedData(step.input, encryptionKey), + }, + }); + } catch (err) { + if (EntityConflictError.is(err)) return; + throw err; + } + + // Queue step execution via the unified workflow queue. The + // combined handler in runtime.ts dispatches messages with a + // `stepId` to executeStep — shared with the node:vm runtime. + const traceCarrier = await serializeTraceCarrier(); + await queueMessage( + world, + getWorkflowQueueName(workflowRun.workflowName, namespace), + { + runId, + stepId: step.correlationId, + stepName: step.stepId, + traceCarrier, + requestedAt: new Date(), + }, + { + idempotencyKey: step.correlationId, + } + ); + })() + ); + } else if (op.type === 'hook' && !op.hasCreatedEvent) { + const hook = op as PendingHook; + opsPromises.push( + (async () => { + try { + const encryptedMetadata = + typeof hook.metadata === 'undefined' + ? undefined + : await encryptSerializedData(hook.metadata, encryptionKey); + const result = await world.events.create(runId, { + eventType: 'hook_created', + specVersion: SPEC_VERSION_CURRENT, + correlationId: hook.correlationId, + eventData: { + token: hook.token, + metadata: encryptedMetadata, + // Always include isWebhook explicitly. Worlds default it to + // `true` when absent, which would break the public webhook + // endpoint's 404 guard for hooks created via createHook(). + isWebhook: hook.isWebhook, + } as any, + }); + + // If storage detected a real token conflict with another + // workflow's hook, re-queue so the runtime can process the + // conflict event and fail gracefully. + if (result.event?.eventType === 'hook_conflict') { + await queueMessage( + world, + getWorkflowQueueName(workflowRun.workflowName, namespace), + { runId }, + { idempotencyKey: `hook_conflict_${hook.correlationId}` } + ); + } + } catch (err) { + if (EntityConflictError.is(err)) return; + throw err; + } + })() + ); + } else if (op.type === 'hook_dispose' && !op.hasCreatedEvent) { + opsPromises.push( + (async () => { + try { + await world.events.create(runId, { + eventType: 'hook_disposed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: op.correlationId, + }); + } catch (err) { + if (EntityConflictError.is(err)) return; + throw err; + } + })() + ); + } else if (op.type === 'wait' && !op.hasCreatedEvent) { + const wait = op as PendingWait; + opsPromises.push( + (async () => { + try { + await world.events.create(runId, { + eventType: 'wait_created', + specVersion: SPEC_VERSION_CURRENT, + correlationId: wait.correlationId, + eventData: { + resumeAt: new Date(wait.resumeAt), + }, + }); + } catch (err) { + if (EntityConflictError.is(err)) return; + throw err; + } + })() + ); + } + } + await Promise.all(opsPromises); + + // Handle pending waits (new and pre-existing). For each: complete it if + // elapsed (and re-queue), otherwise track the soonest timeout. + let needsRequeue = false; + const waitCompletePromises: Promise[] = []; + for (const op of pendingOperations) { + if (op.type !== 'wait') continue; + const wait = op as PendingWait; + const resumeMs = new Date(wait.resumeAt).getTime() - Date.now(); + + if (resumeMs <= 0) { + waitCompletePromises.push( + (async () => { + try { + await world.events.create(runId, { + eventType: 'wait_completed', + specVersion: SPEC_VERSION_CURRENT, + correlationId: wait.correlationId, + }); + needsRequeue = true; + } catch (err) { + if (EntityConflictError.is(err)) return; + throw err; + } + })() + ); + } else { + const timeoutSeconds = Math.max(1, Math.ceil(resumeMs / 1000)); + if ( + minTimeoutSeconds === undefined || + timeoutSeconds < minTimeoutSeconds + ) { + minTimeoutSeconds = timeoutSeconds; + } + } + } + if (waitCompletePromises.length > 0) { + await Promise.all(waitCompletePromises); + } + + if (needsRequeue) { + // An elapsed wait was completed — re-queue immediately so the runtime + // can process the wait_completed event. + return { timeoutSeconds: 0 }; + } + + if (minTimeoutSeconds !== undefined) { + return { timeoutSeconds: minTimeoutSeconds }; + } + + // Suspended awaiting external steps/hooks — nothing to schedule. + return; + } + + if (result.failed) { + // Remap the stack trace using the inline source map (host-side only). + let errorStack = result.failed.stack; + if (errorStack) { + const parsedName = parseWorkflowName(workflowName); + const filename = parsedName?.moduleSpecifier || workflowName; + errorStack = remapErrorStack(errorStack, filename, workflowCode); + } + + // Reconstruct a host-side Error of the right class from the VM-side name + // so classifyRunError() tags WorkflowRuntimeError subclasses correctly. + const reconstructed: Error = + result.failed.name === 'WorkflowNotRegisteredError' + ? new WorkflowNotRegisteredError(workflowName) + : result.failed.name === 'Error' + ? new Error(result.failed.message) + : Object.assign(new Error(result.failed.message), { + name: result.failed.name, + }); + const errorCode = classifyRunError(reconstructed); + + runtimeLogger.error('QuickJS runtime: workflow failed', { + workflowRunId: runId, + errorName: result.failed.name, + errorMessage: result.failed.message, + errorStack, + errorCode, + }); + + // Serialize the error through the dehydration pipeline so consumers get + // the same hydrated value shape as the node:vm runtime. Two paths: + // * valueBytes present: the VM serialized the original thrown value; + // hydrate, remap stacks (incl. the cause chain), re-dehydrate. + // * legacy fallback: reconstruct from {name, message, stack}. + let dehydratedError: Uint8Array; + if (result.failed.valueBytes) { + try { + const hydrated = await hydrateRunError( + result.failed.valueBytes, + runId, + undefined // VM bytes are unencrypted + ); + const parsedName = parseWorkflowName(workflowName); + const filename = parsedName?.moduleSpecifier || workflowName; + if ( + hydrated && + typeof hydrated === 'object' && + typeof (hydrated as { stack?: unknown }).stack === 'string' + ) { + (hydrated as { stack?: string }).stack = remapErrorStack( + (hydrated as { stack: string }).stack, + filename, + workflowCode + ); + } + // Walk the cause chain and remap nested stacks too. + const seen = new WeakSet(); + let node = (hydrated as { cause?: unknown })?.cause; + while (node && typeof node === 'object' && !seen.has(node as object)) { + seen.add(node as object); + const nodeStack = (node as { stack?: unknown }).stack; + if (typeof nodeStack === 'string') { + (node as { stack?: string }).stack = remapErrorStack( + nodeStack, + filename, + workflowCode + ); + } + node = (node as { cause?: unknown }).cause; + } + dehydratedError = await dehydrateRunError( + hydrated, + runId, + encryptionKey + ); + } catch (rehydrateErr) { + // If hydration / re-dehydration fails, pass the original VM bytes + // through (applying encryption if configured). Better to lose + // source-mapped frames than the error entirely. + runtimeLogger.warn( + 'QuickJS runtime: failed to remap workflow error stack, passing VM bytes through', + { + workflowRunId: runId, + message: (rehydrateErr as Error)?.message, + } + ); + dehydratedError = (await maybeEncrypt( + result.failed.valueBytes, + encryptionKey + )) as Uint8Array; + } + } else { + if (errorStack) { + reconstructed.stack = errorStack; + } + try { + dehydratedError = await dehydrateRunError( + reconstructed, + runId, + encryptionKey + ); + } catch (serErr) { + runtimeLogger.warn( + 'QuickJS runtime: failed to dehydrate run error, falling back to bare Error', + { workflowRunId: runId, message: (serErr as Error)?.message } + ); + dehydratedError = await dehydrateRunError( + Object.assign(new Error(result.failed.message), { + name: result.failed.name, + }), + runId, + encryptionKey + ); + } + } + + try { + await world.events.create(runId, { + eventType: 'run_failed', + specVersion: SPEC_VERSION_CURRENT, + eventData: { + error: dehydratedError, + errorCode, + }, + }); + } catch (err) { + if (EntityConflictError.is(err) || RunExpiredError.is(err)) { + runtimeLogger.warn('Workflow already finished, skipping run_failed', { + workflowRunId: runId, + }); + return; + } + throw err; + } + } +} diff --git a/packages/core/src/runtime/quickjs-runtime.test.ts b/packages/core/src/runtime/quickjs-runtime.test.ts new file mode 100644 index 0000000000..c8ff97afe4 --- /dev/null +++ b/packages/core/src/runtime/quickjs-runtime.test.ts @@ -0,0 +1,239 @@ +import type { Event, RunInput, WorkflowRun } from '@workflow/world'; +import { describe, expect, it } from 'vitest'; +import { + dehydrateStepReturnValue, + dehydrateWorkflowArguments, + hydrateWorkflowReturnValue, +} from '../serialization.js'; +import { runQuickJSWorkflow } from './quickjs-runtime.js'; + +// No encryption key = encryption disabled +const noKey = undefined; + +/** Build a workflow-mode bundle that registers `name` into __private_workflows. */ +function bundle(name: string, source: string): string { + return `${source} +;globalThis.__private_workflows = new Map(); +globalThis.__private_workflows.set(${JSON.stringify(name)}, ${name});`; +} + +function makeRun(workflowName: string): WorkflowRun { + return { + runId: 'wrun_qjs_test', + workflowName, + status: 'running', + input: new Uint8Array(), + createdAt: new Date('2024-01-01T00:00:00.000Z'), + updatedAt: new Date('2024-01-01T00:00:00.000Z'), + startedAt: new Date('2024-01-01T00:00:00.000Z'), + deploymentId: 'test-deployment', + } as WorkflowRun; +} + +async function runInputFor(args: unknown[]): Promise { + const ops: Promise[] = []; + const input = await dehydrateWorkflowArguments( + args, + 'wrun_qjs_test', + noKey, + ops + ); + await Promise.all(ops); + return { input } as RunInput; +} + +async function hydrate(bytes: Uint8Array): Promise { + const ops: Promise[] = []; + const value = await hydrateWorkflowReturnValue( + bytes as any, + 'wrun_qjs_test', + noKey, + ops + ); + await Promise.all(ops); + return value; +} + +describe('runQuickJSWorkflow', () => { + it('completes a simple workflow with no arguments', async () => { + const result = await runQuickJSWorkflow({ + workflowCode: bundle('wf', 'function wf() { return "success"; }'), + workflowId: 'wf', + workflowRun: makeRun('wf'), + events: [], + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + + expect(result.failed).toBeUndefined(); + expect(result.suspended).toBeUndefined(); + expect(result.completed).toBeDefined(); + expect(await hydrate(result.completed!.result)).toBe('success'); + }); + + it('completes a workflow using its arguments', async () => { + const result = await runQuickJSWorkflow({ + workflowCode: bundle('wf', 'function wf(a, b) { return a + b; }'), + workflowId: 'wf', + workflowRun: makeRun('wf'), + events: [], + encryptionKey: noKey, + runInput: await runInputFor([2, 3]), + }); + + expect(result.completed).toBeDefined(); + expect(await hydrate(result.completed!.result)).toBe(5); + }); + + it('round-trips structured values (Date, Map) through the VM serde', async () => { + const result = await runQuickJSWorkflow({ + workflowCode: bundle( + 'wf', + 'function wf() { return { when: new Date(0), m: new Map([["a", 1]]) }; }' + ), + workflowId: 'wf', + workflowRun: makeRun('wf'), + events: [], + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + + const value = (await hydrate(result.completed!.result)) as { + when: Date; + m: Map; + }; + expect(value.when).toBeInstanceOf(Date); + expect(value.when.getTime()).toBe(0); + expect(value.m).toBeInstanceOf(Map); + expect(value.m.get('a')).toBe(1); + }); + + it('suspends with a pending step when a step has no result yet', async () => { + const result = await runQuickJSWorkflow({ + workflowCode: bundle( + 'wf', + `async function wf() { + const step = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("step//test//add"); + return await step(1, 2); + }` + ), + workflowId: 'wf', + workflowRun: makeRun('wf'), + events: [], + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + + expect(result.completed).toBeUndefined(); + expect(result.failed).toBeUndefined(); + expect(result.suspended).toBeDefined(); + const steps = result.suspended!.pendingOperations.filter( + (p) => p.type === 'step' + ); + expect(steps).toHaveLength(1); + expect((steps[0] as { stepId: string }).stepId).toBe('step//test//add'); + }); + + it('advances Date.now() along the event timeline across a step', async () => { + const startedAt = new Date('2024-01-01T00:00:00.000Z'); + const stepCompletedAt = new Date('2024-01-01T00:05:00.000Z'); // +5 min + const code = bundle( + 'wf', + `async function wf() { + const t0 = Date.now(); + const step = globalThis[Symbol.for("WORKFLOW_USE_STEP")]("step//test//s"); + await step(); + const t1 = Date.now(); + return { t0, t1 }; + }` + ); + const run = { ...makeRun('wf'), startedAt } as WorkflowRun; + + // Phase 1: run with no events → suspends; capture the deterministic cid. + const suspended = await runQuickJSWorkflow({ + workflowCode: code, + workflowId: 'wf', + workflowRun: run, + events: [], + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + const cid = suspended.suspended!.pendingOperations[0]!.correlationId; + + // Phase 2: replay with step_created + step_completed carrying timestamps. + const ops: Promise[] = []; + const resultBytes = await dehydrateStepReturnValue( + 'done', + run.runId, + noKey, + ops + ); + await Promise.all(ops); + const events: Event[] = [ + { + eventId: 'e1', + eventType: 'step_created', + correlationId: cid, + createdAt: startedAt, + eventData: { stepName: 'step//test//s' }, + } as unknown as Event, + { + eventId: 'e2', + eventType: 'step_completed', + correlationId: cid, + createdAt: stepCompletedAt, + eventData: { result: resultBytes }, + } as unknown as Event, + ]; + + const completed = await runQuickJSWorkflow({ + workflowCode: code, + workflowId: 'wf', + workflowRun: run, + events, + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + + expect(completed.completed).toBeDefined(); + const value = (await hydrate(completed.completed!.result)) as { + t0: number; + t1: number; + }; + expect(value.t0).toBe(startedAt.getTime()); + expect(value.t1).toBe(stepCompletedAt.getTime()); + }); + + it('reports a workflow that throws as failed', async () => { + const result = await runQuickJSWorkflow({ + workflowCode: bundle( + 'wf', + 'function wf() { throw new TypeError("boom"); }' + ), + workflowId: 'wf', + workflowRun: makeRun('wf'), + events: [], + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + + expect(result.completed).toBeUndefined(); + expect(result.failed).toBeDefined(); + expect(result.failed!.message).toContain('boom'); + expect(result.failed!.name).toBe('TypeError'); + }); + + it('throws WorkflowNotRegisteredError name when the workflow is missing', async () => { + const result = await runQuickJSWorkflow({ + workflowCode: bundle('other', 'function other() { return 1; }'), + workflowId: 'wf', + workflowRun: makeRun('wf'), + events: [], + encryptionKey: noKey, + runInput: await runInputFor([]), + }); + + expect(result.failed).toBeDefined(); + expect(result.failed!.name).toBe('WorkflowNotRegisteredError'); + }); +}); diff --git a/packages/core/src/runtime/quickjs-runtime.ts b/packages/core/src/runtime/quickjs-runtime.ts new file mode 100644 index 0000000000..771cb09e58 --- /dev/null +++ b/packages/core/src/runtime/quickjs-runtime.ts @@ -0,0 +1,1175 @@ +/** + * QuickJS WASM workflow runtime. + * + * An alternative to the default `node:vm` runtime that executes workflow + * orchestrator code inside a QuickJS WASM VM (via quickjs-wasi). This is + * required on platforms that disallow `node:vm` / code-generation-from-strings + * (e.g. Cloudflare Workers), where QuickJS interprets the workflow bytecode + * inside its own WASM linear memory. + * + * Like the `node:vm` runtime, it replays the full event log on every + * invocation (no snapshot/restore): each call creates a fresh VM, loads the + * workflow bundle, processes all events, and either completes, suspends, or + * fails. + * + * The workflow primitives (useStep, sleep, createHook) are implemented as + * JavaScript code running inside the QuickJS VM. The host communicates with + * the VM by evaluating small JS snippets to read pending operations and + * resolve/reject promises. + */ + +import type { Event, RunInput, WorkflowRun } from '@workflow/world'; +import * as nanoid from 'nanoid'; +import { JSException, QuickJS } from 'quickjs-wasi'; +import seedrandom from 'seedrandom'; +import type { CryptoKey } from '../encryption.js'; +import { runtimeLogger } from '../logger.js'; +import { decompress } from '../serialization/compression.js'; +import { decrypt as decryptData } from '../serialization/encryption.js'; +import { quickjsExtensions, quickjsWasm } from './quickjs-assets.generated.js'; +import { VM_SERDE_BUNDLE } from './vm-serde-bundle.generated.js'; + +/** + * Decode a host-written payload into the raw `devl` bytes the QuickJS VM + * understands. Payloads are written as `serialize(devl) → compress → encrypt`, + * so reading is the inverse: `decrypt → decompress`. The VM's deserializer + * only handles the `devl` format — it would throw on a `zstd`/`gzip`/`encr` + * prefix — so both layers must be undone host-side. `decryptData` and + * `decompress` both pass through payloads that lack their format prefix. + */ +async function decodeForVm( + bytes: Uint8Array, + encryptionKey?: CryptoKey +): Promise { + const decrypted = (await decryptData(bytes, encryptionKey)) as Uint8Array; + return (await decompress(decrypted)) as Uint8Array; +} + +// ---- Types ---- + +export interface PendingStep { + type: 'step'; + correlationId: string; + stepId: string; + /** Format-prefixed devalue-serialized step input (args + closureVars) */ + input: Uint8Array; + /** Whether a step_created event already exists for this step */ + hasCreatedEvent: boolean; +} + +export interface PendingWait { + type: 'wait'; + correlationId: string; + /** ISO string of when to resume */ + resumeAt: string; + /** Whether a wait_created event already exists for this wait */ + hasCreatedEvent: boolean; +} + +export interface PendingHook { + type: 'hook'; + correlationId: string; + token: string; + isWebhook: boolean; + metadata?: unknown; + hasCreatedEvent: boolean; +} + +export interface PendingHookDispose { + type: 'hook_dispose'; + correlationId: string; + hasCreatedEvent: boolean; +} + +export type PendingOperation = + | PendingStep + | PendingWait + | PendingHook + | PendingHookDispose; + +export interface QuickJSRuntimeResult { + /** The workflow completed — result is format-prefixed devalue bytes */ + completed?: { result: Uint8Array }; + /** The workflow suspended with pending operations */ + suspended?: { + pendingOperations: PendingOperation[]; + }; + /** The workflow failed */ + failed?: { + message: string; + stack?: string; + name?: string; + /** + * Format-prefixed devalue bytes of the original thrown value + * (Error subclass with cause chain, plain object, primitive, etc.). + * Set when the VM-side rejection handler successfully serializes + * the thrown value. The host uses these bytes to reconstruct the + * original value through the standard error hydration pipeline, + * preserving type identity (TypeError, FatalError) and non-Error + * throws verbatim. Falls back to the message/stack/name fields + * when this is undefined (e.g. extractError pseudo-failures). + */ + valueBytes?: Uint8Array; + }; +} + +export interface QuickJSRuntimeOptions { + /** The compiled workflow bundle code (workflow mode output from SWC) */ + workflowCode: string; + /** The workflow ID (e.g. "workflow//./workflows/1_simple//simple") */ + workflowId: string; + /** The workflow run entity */ + workflowRun: WorkflowRun; + /** All events for this run, replayed from scratch on every invocation. */ + events: Event[]; + /** Encryption key for decrypting event payloads (undefined if unencrypted) */ + encryptionKey?: CryptoKey; + /** + * The local port the workflow server is listening on, used to populate + * `workflowMetadata.url`. Resolved at call time on the host side so the + * VM doesn't have to probe the filesystem. Ignored on Vercel — VERCEL_URL + * takes precedence there. + */ + port?: number; + /** + * Fallback workflow input from the queue message's resilient-start + * payload. Used when the fetched event log lacks a `run_created` event + * (eventually-consistent read after the parent's start() wrote it). + */ + runInput?: RunInput; +} + +// ---- VM Bootstrap Code ---- + +/** + * JavaScript code that runs inside the QuickJS VM to set up the workflow + * primitives. This sets up: + * - globalThis.__private_workflows (Map) - workflow registry + * - globalThis.__resolvers (Object) - pending promise resolve/reject functions + * - globalThis.__pending (Array) - metadata about pending operations + * - globalThis[Symbol.for("WORKFLOW_USE_STEP")] - step proxy factory + * - globalThis[Symbol.for("WORKFLOW_SLEEP")] - sleep function + */ +const VM_BOOTSTRAP = ` +// Symbol.dispose / Symbol.asyncDispose polyfills for QuickJS +if (typeof Symbol.dispose === "undefined") { + Symbol.dispose = Symbol.for("Symbol.dispose"); +} +if (typeof Symbol.asyncDispose === "undefined") { + Symbol.asyncDispose = Symbol.for("Symbol.asyncDispose"); +} + +globalThis.__private_workflows = new Map(); +globalThis.__resolvers = {}; +globalThis.__pending = []; +globalThis.__workflowResult = undefined; +globalThis.__workflowError = undefined; +// Buffer for hook_received payloads that arrive before the hook is awaited. +// Keyed by correlationId → array of payloads (preserves delivery order). +// This mirrors the event-replay runtime's payloadsQueue in hook.ts. +globalThis.__hookPayloadBuffer = {}; + +// Stubs for Web APIs that the workflow bundle may reference but are not +// available in QuickJS. Native C extensions (encoding, headers, +// url, structuredClone) provide the real implementations; these are +// minimal stubs for APIs that don't have native extensions yet. + +if (typeof ReadableStream === "undefined") { + // Minimal ReadableStream that stores body data for Response.json()/text() + globalThis.ReadableStream = function() {}; + globalThis.ReadableStream.prototype.__bodyData = null; +} + +if (typeof WritableStream === "undefined") { + globalThis.WritableStream = function() {}; +} + +if (typeof TransformStream === "undefined") { + globalThis.TransformStream = function() {}; +} + +if (typeof console === "undefined") { + globalThis.console = { log: function(){}, error: function(){}, warn: function(){}, info: function(){} }; +} +// Stub exports/module for CJS bundle format +globalThis.exports = {}; +globalThis.module = { exports: globalThis.exports }; +// NOTE: TextEncoder/TextDecoder are provided by the native encoding extension. + +globalThis[Symbol.for("WORKFLOW_USE_STEP")] = function(stepId, closureVarsFn) { + var fn = function() { + var args = Array.prototype.slice.call(arguments); + var correlationId = "step_" + globalThis.__generateUlid(); + // Capture 'this' for method invocations (e.g., MyClass.method()) + var thisVal = (this !== undefined && this !== null && this !== globalThis) ? this : undefined; + // Serialize step input using the host-provided devalue serializer. + // This produces a format-prefixed Uint8Array ("devl" + devalue.stringify). + var input = globalThis[Symbol.for("workflow-serialize")]({ + args: args, + closureVars: closureVarsFn ? closureVarsFn() : undefined, + thisVal: thisVal, + }); + globalThis.__pending.push({ + type: "step", + correlationId: correlationId, + stepId: stepId, + input: input, + hasCreatedEvent: false, + }); + return new Promise(function(resolve, reject) { + globalThis.__resolvers[correlationId] = { resolve: resolve, reject: reject }; + }); + }; + // Set stepId on the proxy so the StepFunction reducer can detect and + // serialize step function references (e.g. when passed as arguments). + fn.stepId = stepId; + if (closureVarsFn) fn.__closureVarsFn = closureVarsFn; + return fn; +}; + +// Parses an "ms" library style duration string into milliseconds. +// Supports the same units as the replay runtime (which uses the "ms" +// package): ms / s / m / h / d / w / y, with verbose aliases +// (seconds, minutes, ...). +globalThis.__parseDurationMs = function(str) { + str = String(str); + if (str.length > 100) return undefined; + var match = str.match( + /^(-?(?:\\d+)?\\.?\\d+) *(milliseconds?|msecs?|ms|seconds?|secs?|s|minutes?|mins?|m|hours?|hrs?|h|days?|d|weeks?|w|years?|yrs?|y)?$/i + ); + if (!match) return undefined; + var n = parseFloat(match[1]); + var type = (match[2] || "ms").toLowerCase(); + var s = 1000, m = 60 * s, h = 60 * m, d = 24 * h, w = 7 * d, y = 365.25 * d; + switch (type) { + case "years": case "year": case "yrs": case "yr": case "y": return n * y; + case "weeks": case "week": case "w": return n * w; + case "days": case "day": case "d": return n * d; + case "hours": case "hour": case "hrs": case "hr": case "h": return n * h; + case "minutes": case "minute": case "mins": case "min": case "m": return n * m; + case "seconds": case "second": case "secs": case "sec": case "s": return n * s; + case "milliseconds": case "millisecond": case "msecs": case "msec": case "ms": return n; + default: return undefined; + } +}; + +globalThis[Symbol.for("WORKFLOW_SLEEP")] = function(param) { + var correlationId = "wait_" + globalThis.__generateUlid(); + var resumeAt; + if (typeof param === "number") { + resumeAt = new Date(Date.now() + param).toISOString(); + } else if (typeof param === "string") { + var ms = globalThis.__parseDurationMs(param); + if (typeof ms === "number" && isFinite(ms)) { + resumeAt = new Date(Date.now() + ms).toISOString(); + } else { + // Not a duration string — try as an absolute date string. + var date = new Date(param); + if (isNaN(date.getTime())) { + throw new Error("Invalid sleep parameter: " + param); + } + resumeAt = date.toISOString(); + } + } else if (param instanceof Date) { + if (isNaN(param.getTime())) { + throw new Error("Invalid sleep parameter: " + param); + } + resumeAt = param.toISOString(); + } else { + throw new Error("Invalid sleep parameter: " + param); + } + globalThis.__pending.push({ + type: "wait", + correlationId: correlationId, + resumeAt: resumeAt, + hasCreatedEvent: false, + }); + return new Promise(function(resolve, reject) { + globalThis.__resolvers[correlationId] = { resolve: resolve, reject: reject }; + }); +}; + +// Response/Request polyfills — .json()/.text()/.arrayBuffer() are useStep +// proxies that execute on the host side. The proxies are assigned directly +// to the prototypes so that 'this' (the Response/Request instance) is +// serialized as thisVal by WORKFLOW_USE_STEP, matching the event-replay +// runtime's approach (commit dcb0761). +if (typeof Response === "undefined") { + var __BODY_INIT = Symbol.for("BODY_INIT"); + + globalThis.Response = function(body, init) { + init = init || {}; + this.status = init.status || 200; + this.statusText = init.statusText || ""; + this.headers = new globalThis.Headers(init.headers || []); + this.type = "default"; + this.url = ""; + this.redirected = false; + if (body !== null && body !== undefined) { + this.body = Object.create(globalThis.ReadableStream.prototype); + this.body[__BODY_INIT] = body; + } else { + this.body = null; + } + }; + Object.defineProperty(globalThis.Response.prototype, "ok", { + get: function() { return this.status >= 200 && this.status < 300; } + }); + Object.defineProperty(globalThis.Response.prototype, "bodyUsed", { + get: function() { return false; } + }); + // Assign useStep proxies directly — 'this' binding provides the + // Response instance, which gets serialized as thisVal by the proxy. + Object.defineProperties(globalThis.Response.prototype, { + arrayBuffer: { value: globalThis[Symbol.for("WORKFLOW_USE_STEP")]("__builtin_response_array_buffer"), writable: true, configurable: true }, + json: { value: globalThis[Symbol.for("WORKFLOW_USE_STEP")]("__builtin_response_json"), writable: true, configurable: true }, + text: { value: globalThis[Symbol.for("WORKFLOW_USE_STEP")]("__builtin_response_text"), writable: true, configurable: true }, + }); + globalThis.Response.prototype.bytes = function() { + return this.arrayBuffer().then(function(buf) { return new Uint8Array(buf); }); + }; + globalThis.Response.prototype.clone = function() { + var r = Object.create(globalThis.Response.prototype); + r.status = this.status; r.statusText = this.statusText; + r.headers = this.headers; r.type = this.type; + r.url = this.url; r.redirected = this.redirected; r.body = this.body; + return r; + }; + globalThis.Response.json = function(data, init) { + var body = JSON.stringify(data); + var headers = new globalThis.Headers(init ? init.headers : []); + if (!headers.has("content-type")) { headers.set("content-type", "application/json"); } + return new globalThis.Response(body, { status: (init && init.status) || 200, statusText: (init && init.statusText) || "", headers: headers }); + }; +} +if (typeof Request === "undefined") { + globalThis.Request = function(input, init) { + init = init || {}; + if (typeof input === "string") { this.url = input; } + else if (input && typeof input === "object") { + this.url = input.url || ""; this.method = input.method; + this.headers = input.headers; this.body = input.body; + } + if (init.method) this.method = init.method.toUpperCase(); + if (!this.method) this.method = "GET"; + if (init.headers) this.headers = new globalThis.Headers(init.headers); + if (!this.headers) this.headers = new globalThis.Headers(); + if (init.body !== undefined) this.body = init.body; + if (!this.body) this.body = null; + this.duplex = init.duplex || "half"; + }; + Object.defineProperty(globalThis.Request.prototype, "bodyUsed", { + get: function() { return false; } + }); + Object.defineProperties(globalThis.Request.prototype, { + arrayBuffer: { value: globalThis[Symbol.for("WORKFLOW_USE_STEP")]("__builtin_response_array_buffer"), writable: true, configurable: true }, + json: { value: globalThis[Symbol.for("WORKFLOW_USE_STEP")]("__builtin_response_json"), writable: true, configurable: true }, + text: { value: globalThis[Symbol.for("WORKFLOW_USE_STEP")]("__builtin_response_text"), writable: true, configurable: true }, + }); +} + +// createHook — returns a Hook object that is both a Thenable and AsyncIterable. +// Each await/yield creates a new promise keyed by the same correlationId. +// The promise is resolved when a hook_received event arrives. +globalThis[Symbol.for("WORKFLOW_CREATE_HOOK")] = function(options) { + options = options || {}; + var token = options.token || globalThis.__generateNanoid(); + var correlationId = "hook_" + globalThis.__generateUlid(); + var isDisposed = false; + var hasCreatedEvent = false; + + // Register in pending operations. + // Serialize metadata inside the VM so Response/Request objects are + // properly handled by the devalue reducers before crossing the boundary. + globalThis.__pending.push({ + type: "hook", + correlationId: correlationId, + token: token, + isWebhook: !!options.isWebhook, + metadata: options.metadata ? globalThis[Symbol.for("workflow-serialize")](options.metadata) : undefined, + hasCreatedEvent: false, + }); + + // Each await creates a new promise for the next payload. + // The correlationId stays the same — the resolver is replaced each time. + function createHookPromise() { + // Check the payload buffer first — if a hook_received event arrived + // before this hook was awaited, the payload was buffered in the VM + // heap. Drain it immediately (matching event-replay payloadsQueue). + var buf = globalThis.__hookPayloadBuffer[correlationId]; + if (buf && buf.length > 0) { + return Promise.resolve(buf.shift()); + } + return new Promise(function(resolve, reject) { + globalThis.__resolvers[correlationId] = { resolve: resolve, reject: reject }; + }); + } + + function disposeHook() { + if (isDisposed) return; + isDisposed = true; + // Signal to the entrypoint to create a hook_disposed event + globalThis.__pending.push({ + type: "hook_dispose", + correlationId: correlationId, + hasCreatedEvent: false, + }); + // If there's a pending resolver, resolve it with undefined to break the iterator + if (globalThis.__resolvers[correlationId]) { + globalThis.__resolvers[correlationId].resolve(undefined); + delete globalThis.__resolvers[correlationId]; + } + } + + var hook = { + token: token, + then: function(onFulfilled, onRejected) { + return createHookPromise().then(onFulfilled, onRejected); + }, + dispose: disposeHook, + }; + + // Symbol.dispose for explicit resource management + hook[Symbol.dispose] = disposeHook; + + // AsyncIterable — yields payloads until disposed + hook[Symbol.asyncIterator] = function() { + return { + next: function() { + if (isDisposed) { + return Promise.resolve({ done: true, value: undefined }); + } + return createHookPromise().then(function(value) { + // If disposed while waiting, signal done + if (isDisposed) return { done: true, value: undefined }; + return { done: false, value: value }; + }); + }, + return: function() { + disposeHook(); + return Promise.resolve({ done: true, value: undefined }); + }, + }; + }; + + return hook; +}; + +// WORKFLOW_GET_STREAM_ID — generates a stream ID for a workflow run. +// Replicates getWorkflowRunStreamId() from util.ts inside the QuickJS VM. +// Uses native btoa() from the base64 extension for base64url encoding. +globalThis[Symbol.for("WORKFLOW_GET_STREAM_ID")] = function(namespace) { + var runId = globalThis[Symbol.for("WORKFLOW_CONTEXT")] + ? globalThis[Symbol.for("WORKFLOW_CONTEXT")].workflowRunId + : ""; + var streamId = runId.replace("wrun_", "strm_") + "_user"; + if (!namespace) return streamId; + // base64url: btoa then replace + with -, / with _, strip = + var b64 = btoa(namespace).replace(/\\+/g, "-").replace(/\\//g, "_").replace(/=+$/, ""); + return streamId + "_" + b64; +}; +`; + +// ---- Runtime ---- + +export async function runQuickJSWorkflow( + options: QuickJSRuntimeOptions +): Promise { + const { workflowCode, workflowId, workflowRun, events } = options; + + const startedAt = workflowRun.startedAt ? +workflowRun.startedAt : Date.now(); + + // Seed the PRNG from stable per-run inputs. Because we always replay the + // full event log from scratch (no snapshots), the same seed is used on + // every invocation, so correlationIds are generated deterministically and + // the world's EntityConflictError dedups idempotent event creation — the + // same model the node:vm runtime uses. + const seed = [ + workflowRun.runId, + workflowRun.workflowName, + String(startedAt), + ].join(':'); + const rng = seedrandom(seed); + + // Seeded nanoid generator — uses the same nanoid package and seeded PRNG + // as the node:vm runtime for consistent token generation. + const generateNanoid = nanoid.customRandom(nanoid.urlAlphabet, 21, (size) => + new Uint8Array(size).map(() => 256 * rng()) + ); + + { + const vm = await QuickJS.create({ + wasm: quickjsWasm, + // Use real time for Date.now() — determinism is handled by seeded Math.random + memoryLimit: 256 * 1024 * 1024, + interruptHandler: createInterruptHandler(), + extensions: quickjsExtensions, + }); + + // Seeded Math.random — host callback ID = baseId + { + using randomFn = vm.newFunction('random', () => vm.newNumber(rng())); + using math = vm.global.getProp('Math'); + math.setProp('random', randomFn); + } + + // Seeded nanoid generator — host callback ID = baseId + 1 + { + using nanoidFn = vm.newFunction('__generateNanoid', () => + vm.newString(generateNanoid()) + ); + vm.setProp(vm.global, '__generateNanoid', nanoidFn); + } + + // Inject a deterministic timestamp for the VM's ULID factory. ULIDs + // produced inside the VM use this as their time prefix instead of + // Date.now(). Because we replay the FULL event log from scratch on every + // invocation and seed the PRNG from the same `runId:workflowName:startedAt` + // (no snapshots, no cursor), every invocation regenerates the IDENTICAL + // correlationId sequence. That determinism is the whole dedup contract: + // each step/hook/wait gets the same correlationId across invocations, so + // the world's EntityConflictError on `events.create` collapses duplicates. + // (`startedAt` is the persisted run_started timestamp — see the note in + // runtime.ts on excluding the QuickJS runtime from turbo so the first + // invocation seeds from it, not a transient local `now`.) + vm.evalCode(`globalThis.__ulidTimestamp = ${startedAt};`).dispose(); + + // Deterministic Date — workflow time follows the event timeline (like the + // node:vm runtime), NOT wall-clock. `Date.now()` and `new Date()` (no + // args) return `globalThis.__currentTime`, which starts at the run's + // startedAt and is advanced to each event's createdAt as it is consumed + // (see processEvents). Without this, a full replay would read the same + // real-time instant for every Date.now() call, so time would not advance + // across sleeps/steps. `new Date(arg)` and the static methods are + // delegated to the real Date. + vm.evalCode( + `globalThis.__currentTime = ${startedAt};` + + `(function(){` + + `var RealDate = Date;` + + `function D(){` + + ` if (arguments.length === 0) return new RealDate(globalThis.__currentTime);` + + ` return new RealDate(...arguments);` + + `}` + + `D.prototype = RealDate.prototype;` + + `Object.setPrototypeOf(D, RealDate);` + + `D.now = function(){ return globalThis.__currentTime; };` + + `globalThis.Date = D;` + + `})();` + ).dispose(); + + // Evaluate the VM serde bundle + vm.evalCode(VM_SERDE_BUNDLE, 'vm-serde.js').dispose(); + + // Bootstrap workflow primitives + vm.evalCode(VM_BOOTSTRAP, 'bootstrap.js').dispose(); + + // Execute the workflow bundle — use the workflowId as the eval filename + // so QuickJS stack traces reference the workflow name, enabling source map + // remapping by remapErrorStack (which matches frames by filename). + try { + vm.evalCode(workflowCode, workflowId || 'workflow.js').dispose(); + } catch (err) { + return extractError(vm, err, 'Workflow evaluation failed'); + } + + // Extract workflow arguments. Prefer the run_created event; fall back + // to the queue message's runInput if the event log is incomplete + // (eventually-consistent read after start()). Failing to find input + // for a first invocation is fatal — running the workflow function + // with no args would silently turn typed arguments into `undefined` + // and, for recursive workflows, produce exponential fan-out. + const runCreatedEvent = events.find((e) => e.eventType === 'run_created'); + const runCreatedInput = + runCreatedEvent && 'eventData' in runCreatedEvent + ? (runCreatedEvent.eventData as Record)?.input + : undefined; + const runInput: unknown = + runCreatedInput ?? (options.runInput?.input as unknown); + + if (runInput instanceof Uint8Array) { + const decryptedInput = (await decodeForVm( + runInput, + options.encryptionKey + )) as Uint8Array; + runtimeLogger.debug('QuickJS runtime: run input format', { + prefix: new TextDecoder().decode(decryptedInput.subarray(0, 4)), + byteLength: decryptedInput.byteLength, + source: runCreatedInput ? 'run_created' : 'queueMessage.runInput', + }); + const inputHandle = vm.newUint8Array(decryptedInput); + vm.setProp(vm.global, '__wdk_input', inputHandle); + inputHandle.dispose(); + } else if (runInput === undefined && events.length > 0) { + // The event log is non-empty (we got run_started or similar) but + // no run_created event was found and no queue-provided runInput is + // available. This is the race condition observed during the fib + // incident — silently dropping arguments would turn `n` into + // `undefined` and, for recursive workflows, cause exponential + // fan-out. Fail loud so the run goes to `run_failed` and the queue + // can retry. Empty `events` is allowed because tests that bootstrap + // a workflow with no arguments rely on the old permissive behavior. + throw new Error( + `Cannot start workflow run "${workflowRun.runId}": no run_created event found and no runInput in the queue payload, but other events are present (likely a read-after-write race during start()).` + ); + } + + // Set workflow context metadata (for getWorkflowMetadata()). + // Must match the shape that the replay runtime produces (see + // packages/core/src/workflow.ts: runWorkflow → ctx) so user code + // that compares `getWorkflowMetadata()` values between a step + // (server-side) and the workflow (VM-side) sees identical objects. + { + const metadata = { + workflowName: workflowRun.workflowName, + workflowRunId: workflowRun.runId, + workflowStartedAt: workflowRun.startedAt + ? new Date(+workflowRun.startedAt) + : new Date(), + url: process.env.VERCEL_URL + ? `https://${process.env.VERCEL_URL}` + : `http://localhost:${options.port ?? 3000}`, + features: { encryption: !!options.encryptionKey }, + }; + vm.evalCode( + `globalThis[Symbol.for("WORKFLOW_CONTEXT")] = ${JSON.stringify(metadata)};` + + `globalThis[Symbol.for("WORKFLOW_CONTEXT")].workflowStartedAt = new Date(${JSON.stringify(metadata.workflowStartedAt.toISOString())});` + ).dispose(); + } + + // Start the workflow function. If the workflow isn't registered, + // throw an error tagged with `name = "WorkflowNotRegisteredError"` + // so the host-side entrypoint can reconstruct a real + // WorkflowNotRegisteredError (a WorkflowRuntimeError subclass that + // classifies as RUNTIME_ERROR) rather than a generic user error. + // See snapshot-entrypoint.ts's run_failed branch. + try { + vm.evalCode(` + var __wfn = globalThis.__private_workflows.get(${JSON.stringify(workflowId)}); + if (!__wfn) { + var __wfnErr = new Error("Workflow \\"" + ${JSON.stringify(workflowId)} + "\\" is not registered in the current deployment."); + __wfnErr.name = "WorkflowNotRegisteredError"; + throw __wfnErr; + } + var __args = globalThis.__wdk_input + ? globalThis[Symbol.for("workflow-deserialize")](globalThis.__wdk_input) + : []; + delete globalThis.__wdk_input; + if (!Array.isArray(__args)) __args = [__args]; + // Wrap in Promise.resolve so synchronous workflow returns (non-async + // functions, or values returned before the first await) are handled + // the same as the node:vm runtime, which awaits the result. + Promise.resolve(__wfn.apply(null, __args)).then( + function(result) { globalThis.__workflowResult = globalThis[Symbol.for("workflow-serialize")](result); }, + function(error) { + // Preserve display info on the host-side failed object + // (matches the legacy host-visible shape) AND serialize the + // entire thrown value so the host can dehydrate the original + // type-identity, cause chain, or non-Error throws verbatim + // through the standard error pipeline. + globalThis.__workflowError = { + message: error && error.message != null ? String(error.message) : String(error), + stack: error && error.stack ? error.stack : "", + name: error && error.name ? error.name : (error instanceof Error ? "Error" : typeof error), + valueBytes: globalThis[Symbol.for("workflow-serialize")](error), + }; + } + ); + `).dispose(); + } catch (err) { + return extractError(vm, err, 'Failed to start workflow'); + } + + // Process events and drain jobs in a loop. Events may resolve promises + // that unblock workflow code, which then creates NEW resolvers for + // subsequent events; re-processing matches those against delivered events. + let maxIterations = 100; + let madeProgress: boolean; + do { + madeProgress = await processEvents(vm, events, options.encryptionKey); + let batch: number; + do { + batch = vm.executePendingJobs(); + if (batch > 0) madeProgress = true; + } while (batch > 0); + } while (madeProgress && --maxIterations > 0); + + // If we exhausted the iteration budget while still making progress, the VM + // hasn't settled — checkWorkflowState would read a half-processed state + // (e.g. suspend with ops not yet surfaced), silently wedging the run. Fail + // loud instead so the queue retries / surfaces a real error. + if (maxIterations <= 0 && madeProgress) { + runtimeLogger.error( + 'QuickJS runtime: event processing did not converge within iteration budget', + { workflowId, eventCount: events.length } + ); + return { + failed: { + message: + 'QuickJS runtime: event processing did not converge (exceeded iteration budget). This likely indicates a runtime bug.', + }, + }; + } + + // ---- Check result ---- + return checkWorkflowState(vm); + } +} + +// ---- Event Processing ---- + +async function processEvents( + vm: QuickJS, + events: Event[], + encryptionKey?: CryptoKey +): Promise { + let resolved = false; + for (const event of events) { + // Advance the VM's deterministic clock to this event's timestamp before + // resolving it, so workflow code that runs after the corresponding await + // (drained immediately after each resolution below) observes the event + // timeline — mirroring the node:vm runtime's updateTimestamp(+createdAt). + if (event.createdAt != null) { + const ts = +new Date( + event.createdAt as unknown as string | number | Date + ); + if (!Number.isNaN(ts)) { + vm.evalCode(`globalThis.__currentTime = ${ts};`).dispose(); + } + } + + const cid = event.correlationId; + if (!cid) continue; + + // Escape both backslash and double-quote before interpolating cid into the + // evalCode strings below. correlationIds are `step_`/`wait_`/`hook_` + ULID + // (Crockford base32 — no quotes or backslashes) so this is belt-and-braces + // today, but it keeps the interpolation safe if the id format ever changes. + const escapedCid = cid.replace(/[\\"]/g, '\\$&'); + const eventData = + 'eventData' in event + ? (event.eventData as Record) + : undefined; + + // Log the event and whether the resolver exists + switch (event.eventType) { + case 'step_completed': { + const hasResolver = vm.dump( + vm.evalCode(`!!globalThis.__resolvers["${escapedCid}"]`) + ); + const rawOutput = eventData?.result ?? eventData?.output; + if (hasResolver) { + if (rawOutput instanceof Uint8Array) { + // Decrypt if encrypted — the VM only understands 'devl' format + runtimeLogger.debug('QuickJS runtime: step result raw', { + correlationId: escapedCid, + rawPrefix: new TextDecoder().decode(rawOutput.subarray(0, 4)), + rawByteLength: rawOutput.byteLength, + isBuffer: Buffer.isBuffer(rawOutput), + }); + const decryptedOutput = (await decodeForVm( + rawOutput, + encryptionKey + )) as Uint8Array; + runtimeLogger.debug('QuickJS runtime: step result decrypted', { + correlationId: escapedCid, + prefix: new TextDecoder().decode(decryptedOutput.subarray(0, 4)), + byteLength: decryptedOutput.byteLength, + }); + const bytesHandle = vm.newUint8Array(decryptedOutput); + vm.setProp(vm.global, '__tmp_result', bytesHandle); + bytesHandle.dispose(); + vm.evalCode( + `globalThis.__resolvers["${escapedCid}"].resolve(globalThis[Symbol.for("workflow-deserialize")](globalThis.__tmp_result));` + + `delete globalThis.__resolvers["${escapedCid}"];` + + `delete globalThis.__tmp_result;` + ).dispose(); + } else { + runtimeLogger.debug('QuickJS runtime: step result non-binary', { + correlationId: escapedCid, + type: typeof rawOutput, + isNull: rawOutput === null, + isUndefined: rawOutput === undefined, + constructor: rawOutput?.constructor?.name, + }); + const serialized = + rawOutput !== undefined ? JSON.stringify(rawOutput) : 'undefined'; + vm.evalCode( + `globalThis.__resolvers["${escapedCid}"].resolve(${serialized});` + + `delete globalThis.__resolvers["${escapedCid}"];` + ).dispose(); + } + // Drain ALL microtasks after resolve + { + resolved = true; + let b: number; + do { + b = vm.executePendingJobs(); + } while (b > 0); + } + } + markCreated(vm, escapedCid); + break; + } + case 'step_failed': { + const hasResolver = vm.dump( + vm.evalCode(`!!globalThis.__resolvers["${escapedCid}"]`) + ); + if (hasResolver) { + const errorData = eventData?.error; + if (errorData instanceof Uint8Array) { + // Modern path (post-#1851): the step handler dehydrated the + // thrown value through the first-class error pipeline. Decrypt + // (if encrypted) and pass the bytes to the VM-side deserializer + // so the workflow catch sees a properly typed Error subclass + // (TypeError, FatalError with original cause chain, etc.) with + // the original message and stack preserved. + const decrypted = (await decodeForVm( + errorData, + encryptionKey + )) as Uint8Array; + const bytesHandle = vm.newUint8Array(decrypted); + vm.setProp(vm.global, '__tmp_error', bytesHandle); + bytesHandle.dispose(); + vm.evalCode( + `(function(){` + + `var e=globalThis[Symbol.for("workflow-deserialize")](globalThis.__tmp_error);` + + `globalThis.__resolvers["${escapedCid}"].reject(e);` + + `delete globalThis.__resolvers["${escapedCid}"];` + + `delete globalThis.__tmp_error;` + + `})()` + ).dispose(); + } else { + // Legacy path: pre-pipeline events stored error as + // `{ message, stack, code }`. Reconstruct a FatalError so + // workflow catch can detect it via FatalError.is(), matching + // the original V1 step handler behavior. + const isErrorObject = + typeof errorData === 'object' && errorData !== null; + const msg = isErrorObject + ? (((errorData as Record).message as string) ?? + 'Step failed') + : typeof errorData === 'string' + ? errorData + : 'Step failed'; + const errorStack = + (isErrorObject + ? (errorData as Record).stack + : undefined) ?? (eventData?.stack as string | undefined); + const stackAssignment = errorStack + ? `e.stack=${JSON.stringify(errorStack)};` + : ''; + vm.evalCode( + `(function(){var e=new Error(${JSON.stringify(msg)});e.name="FatalError";e.fatal=true;${stackAssignment}` + + `globalThis.__resolvers["${escapedCid}"].reject(e);` + + `delete globalThis.__resolvers["${escapedCid}"];})()` + ).dispose(); + } + { + resolved = true; + let b: number; + do { + b = vm.executePendingJobs(); + } while (b > 0); + } + } + markCreated(vm, escapedCid); + break; + } + case 'wait_completed': { + const hasResolver = vm.dump( + vm.evalCode(`!!globalThis.__resolvers["${escapedCid}"]`) + ); + if (hasResolver) { + vm.evalCode( + `globalThis.__resolvers["${escapedCid}"].resolve();` + + `delete globalThis.__resolvers["${escapedCid}"];` + ).dispose(); + { + resolved = true; + let b: number; + do { + b = vm.executePendingJobs(); + } while (b > 0); + } + } + markCreated(vm, escapedCid); + break; + } + case 'hook_received': { + // Check if this event was already processed (delivered or buffered) + // in this invocation or a prior one (tracked in the VM heap so it + // survives snapshot/restore). Prevents double-delivery when the + // outer loop re-scans events. + const alreadyProcessed = event.eventId + ? vm.dump( + vm.evalCode( + `!!(globalThis.__hookPayloadBuffer.__processedEventIds && globalThis.__hookPayloadBuffer.__processedEventIds[${JSON.stringify(event.eventId)}])` + ) + ) + : false; + if (alreadyProcessed) { + runtimeLogger.debug( + 'QuickJS runtime: hook_received already processed', + { + correlationId: cid, + eventId: event.eventId, + } + ); + markCreated(vm, escapedCid); + break; + } + const hasResolver = vm.dump( + vm.evalCode(`!!globalThis.__resolvers["${escapedCid}"]`) + ); + const rawPayload = eventData?.payload ?? eventData?.result; + runtimeLogger.debug('QuickJS runtime: processing hook_received', { + correlationId: cid, + eventId: event.eventId, + hasResolver, + payloadType: typeof rawPayload, + payloadIsUint8Array: rawPayload instanceof Uint8Array, + payloadKeys: + rawPayload && typeof rawPayload === 'object' + ? Object.keys(rawPayload) + : undefined, + }); + if (hasResolver) { + if (rawPayload instanceof Uint8Array) { + // Decrypt if encrypted — the VM only understands 'devl' format + const decryptedPayload = (await decodeForVm( + rawPayload, + encryptionKey + )) as Uint8Array; + const bytesHandle = vm.newUint8Array(decryptedPayload); + vm.setProp(vm.global, '__tmp_result', bytesHandle); + bytesHandle.dispose(); + vm.evalCode( + `globalThis.__resolvers["${escapedCid}"].resolve(globalThis[Symbol.for("workflow-deserialize")](globalThis.__tmp_result));` + + `delete globalThis.__resolvers["${escapedCid}"];` + + `delete globalThis.__tmp_result;` + ).dispose(); + } else { + const serialized = + rawPayload !== undefined + ? JSON.stringify(rawPayload) + : 'undefined'; + vm.evalCode( + `globalThis.__resolvers["${escapedCid}"].resolve(${serialized});` + + `delete globalThis.__resolvers["${escapedCid}"];` + ).dispose(); + } + // Mark this event as processed in the VM heap to prevent + // double-delivery on re-scan or snapshot restore. + if (event.eventId) { + vm.evalCode( + `(globalThis.__hookPayloadBuffer.__processedEventIds = globalThis.__hookPayloadBuffer.__processedEventIds || {})[${JSON.stringify(event.eventId)}] = true;` + ).dispose(); + } + { + resolved = true; + let b: number; + do { + b = vm.executePendingJobs(); + } while (b > 0); + } + } else { + // No resolver yet — buffer the payload in the VM heap so it + // survives snapshot/restore. When createHookPromise() is called + // later, it will drain this buffer first (matching the event- + // replay runtime's payloadsQueue behavior). + const eventIdJs = event.eventId + ? JSON.stringify(event.eventId) + : 'null'; + const bufferAndTrack = + `(globalThis.__hookPayloadBuffer["${escapedCid}"] = globalThis.__hookPayloadBuffer["${escapedCid}"] || [])` + + `.push(%PAYLOAD%);` + + (event.eventId + ? `(globalThis.__hookPayloadBuffer.__processedEventIds = globalThis.__hookPayloadBuffer.__processedEventIds || {})[${eventIdJs}] = true;` + : ''); + if (rawPayload instanceof Uint8Array) { + // Decrypt if encrypted — the VM only understands 'devl' format + const decryptedPayload = (await decodeForVm( + rawPayload, + encryptionKey + )) as Uint8Array; + const bytesHandle = vm.newUint8Array(decryptedPayload); + vm.setProp(vm.global, '__tmp_result', bytesHandle); + bytesHandle.dispose(); + vm.evalCode( + bufferAndTrack.replace( + '%PAYLOAD%', + 'globalThis[Symbol.for("workflow-deserialize")](globalThis.__tmp_result)' + ) + 'delete globalThis.__tmp_result;' + ).dispose(); + } else { + const serialized = + rawPayload !== undefined + ? JSON.stringify(rawPayload) + : 'undefined'; + vm.evalCode( + bufferAndTrack.replace('%PAYLOAD%', serialized) + ).dispose(); + } + } + markCreated(vm, escapedCid); + break; + } + case 'hook_conflict': { + const hasResolver = vm.dump( + vm.evalCode(`!!globalThis.__resolvers["${escapedCid}"]`) + ); + if (hasResolver) { + const conflictToken = (eventData?.token as string) ?? 'unknown'; + vm.evalCode( + `globalThis.__resolvers["${escapedCid}"].reject(new Error(${JSON.stringify(`Hook token "${conflictToken}" is already in use by another workflow`)}));` + + `delete globalThis.__resolvers["${escapedCid}"];` + ).dispose(); + { + resolved = true; + let b: number; + do { + b = vm.executePendingJobs(); + } while (b > 0); + } + } + markCreated(vm, escapedCid); + break; + } + case 'step_created': + case 'step_started': + case 'step_retrying': + case 'wait_created': + case 'hook_created': { + markCreated(vm, escapedCid); + break; + } + case 'hook_disposed': { + // Disambiguate from the `hook` pending op with the same + // correlationId — we want to mark the `hook_dispose` entry. + markCreated(vm, escapedCid, 'hook_dispose'); + break; + } + } + } + return resolved; +} + +function markCreated(vm: QuickJS, escapedCid: string, opType?: string): void { + // `hook` and `hook_dispose` pending ops share the same correlationId, + // so when processing `hook_disposed` events we must disambiguate by + // type — otherwise `.find()` returns the original `hook` op and the + // `hook_dispose` op is never marked, causing the entrypoint to keep + // retrying a hook_disposed for an already-deleted entity. + const predicate = opType + ? `function(p){return p.correlationId==="${escapedCid}"&&p.type==="${opType}";}` + : `function(p){return p.correlationId==="${escapedCid}";}`; + vm.evalCode( + `var __p=globalThis.__pending.find(${predicate});` + + `if(__p)__p.hasCreatedEvent=true;` + ).dispose(); +} + +// ---- State Checking ---- + +function checkWorkflowState(vm: QuickJS): QuickJSRuntimeResult { + // Check completed — __workflowResult is a format-prefixed Uint8Array + { + using h = vm.evalCode('globalThis.__workflowResult'); + if (!h.isUndefined) { + const resultBytes = h.toUint8Array(); + vm.dispose(); + return { completed: { result: resultBytes } }; + } + } + + // Check failed + { + using h = vm.evalCode('globalThis.__workflowError'); + if (!h.isUndefined) { + const errorObj = vm.dump(h) as + | { + message: string; + stack?: string; + name?: string; + valueBytes?: Uint8Array; + } + | string; + const failed = + typeof errorObj === 'string' + ? { message: errorObj } + : { + message: errorObj.message, + stack: errorObj.stack || undefined, + name: errorObj.name || undefined, + valueBytes: errorObj.valueBytes, + }; + runtimeLogger.error('QuickJS runtime: workflow failed in VM', { + errorMessage: failed.message, + errorName: failed.name, + errorStack: failed.stack, + }); + vm.dispose(); + return { failed }; + } + } + + // Check suspended — the workflow is suspended if there are active resolvers + // OR pending operations that haven't been created yet (e.g. hooks created + // upfront but not yet awaited) + { + using h = vm.evalCode( + 'Object.keys(globalThis.__resolvers).length > 0 || globalThis.__pending.some(function(p){return!p.hasCreatedEvent;})' + ); + if (vm.dump(h)) { + using pendingH = vm.evalCode( + `globalThis.__pending.filter(function(p){return!!globalThis.__resolvers[p.correlationId] || !p.hasCreatedEvent;})` + ); + const pendingOps = vm.dump(pendingH) as PendingOperation[]; + vm.dispose(); + + return { + suspended: { + pendingOperations: pendingOps, + }, + }; + } + } + + vm.dispose(); + return { failed: { message: 'Workflow ended in unknown state' } }; +} + +// ---- Helpers ---- + +function extractError( + vm: QuickJS, + err: unknown, + fallbackMessage: string +): QuickJSRuntimeResult { + let message = fallbackMessage; + let stack: string | undefined; + let name: string | undefined; + + if (err instanceof JSException) { + const error = vm.dump(err.handle) as Record | null; + err.handle.dispose(); + message = (error?.message as string) ?? err.message ?? fallbackMessage; + stack = (error?.stack as string) ?? err.stack; + name = (error?.name as string) ?? err.name; + } else if (err instanceof Error) { + message = err.message ?? fallbackMessage; + stack = err.stack; + name = err.name; + } + + vm.dispose(); + return { + failed: { message, stack, name }, + }; +} + +function createInterruptHandler(): () => boolean { + const start = Date.now(); + const timeout = 30_000; + return () => Date.now() - start > timeout; +} diff --git a/packages/core/src/runtime/runtime-mode.test.ts b/packages/core/src/runtime/runtime-mode.test.ts new file mode 100644 index 0000000000..c2906f1245 --- /dev/null +++ b/packages/core/src/runtime/runtime-mode.test.ts @@ -0,0 +1,35 @@ +import { WorkflowRuntimeError } from '@workflow/errors'; +import { describe, expect, it } from 'vitest'; +import { + DEFAULT_WORKFLOW_RUNTIME, + getWorkflowRuntimeFromEnv, + WORKFLOW_RUNTIMES, +} from './runtime-mode.js'; + +describe('getWorkflowRuntimeFromEnv', () => { + it('returns undefined when WORKFLOW_RUNTIME is unset', () => { + expect(getWorkflowRuntimeFromEnv({})).toBeUndefined(); + }); + + it('returns undefined when WORKFLOW_RUNTIME is empty', () => { + expect(getWorkflowRuntimeFromEnv({ WORKFLOW_RUNTIME: '' })).toBeUndefined(); + }); + + it('returns the configured mode for known values', () => { + for (const mode of WORKFLOW_RUNTIMES) { + expect(getWorkflowRuntimeFromEnv({ WORKFLOW_RUNTIME: mode })).toBe(mode); + } + }); + + it('throws WorkflowRuntimeError on an unknown value', () => { + expect(() => + getWorkflowRuntimeFromEnv({ WORKFLOW_RUNTIME: 'bogus' }) + ).toThrow(WorkflowRuntimeError); + }); + + it('defaults to node-vm', () => { + expect(DEFAULT_WORKFLOW_RUNTIME).toBe('node-vm'); + expect(WORKFLOW_RUNTIMES).toContain('node-vm'); + expect(WORKFLOW_RUNTIMES).toContain('quickjs'); + }); +}); diff --git a/packages/core/src/runtime/runtime-mode.ts b/packages/core/src/runtime/runtime-mode.ts new file mode 100644 index 0000000000..d2b194163c --- /dev/null +++ b/packages/core/src/runtime/runtime-mode.ts @@ -0,0 +1,47 @@ +/** + * Runtime mode selection for workflow execution. + * + * The `node-vm` runtime (the default) executes workflow orchestrator code in a + * Node.js `vm` context. The `quickjs` runtime executes it inside a QuickJS WASM + * VM instead — this is required on runtimes that disallow `node:vm` / + * code-generation-from-strings (e.g. Cloudflare Workers). + * + * The QuickJS runtime is opt-in via the `WORKFLOW_RUNTIME` env var or + * per-run via `executionContext.workflowRuntime` (set by the SDK at start()), + * so the same deployment can serve both. + */ + +import { WorkflowRuntimeError } from '@workflow/errors'; + +/** + * Known workflow runtime modes. Any other `WORKFLOW_RUNTIME` value is + * treated as a misconfiguration and rejected at startup. + */ +export const WORKFLOW_RUNTIMES = ['node-vm', 'quickjs'] as const; + +export type WorkflowRuntimeMode = (typeof WORKFLOW_RUNTIMES)[number]; + +/** The runtime used when nothing overrides it. */ +export const DEFAULT_WORKFLOW_RUNTIME: WorkflowRuntimeMode = 'node-vm'; + +/** + * Read and validate the `WORKFLOW_RUNTIME` env var. + * + * Returns the configured mode, or `undefined` if unset/empty. + * Throws {@link WorkflowRuntimeError} if the value is set but not one of + * the known modes — catching misconfiguration early is better than + * silently falling back to the default. + */ +export function getWorkflowRuntimeFromEnv( + env: NodeJS.ProcessEnv = process.env +): WorkflowRuntimeMode | undefined { + const raw = env.WORKFLOW_RUNTIME; + if (raw === undefined || raw === '') return undefined; + if ((WORKFLOW_RUNTIMES as readonly string[]).includes(raw)) { + return raw as WorkflowRuntimeMode; + } + throw new WorkflowRuntimeError( + `Invalid WORKFLOW_RUNTIME value: "${raw}". ` + + `Expected one of: ${WORKFLOW_RUNTIMES.join(', ')}.` + ); +} diff --git a/packages/core/src/runtime/start.ts b/packages/core/src/runtime/start.ts index 88c8838074..6f9c753647 100644 --- a/packages/core/src/runtime/start.ts +++ b/packages/core/src/runtime/start.ts @@ -29,6 +29,7 @@ import { version as workflowCoreVersion } from '../version.js'; import { getWorldLazy } from './get-world-lazy.js'; import { getWorkflowQueueName, healthCheck } from './helpers.js'; import { Run } from './run.js'; +import { getWorkflowRuntimeFromEnv } from './runtime-mode.js'; import { safeWaitUntil, waitedUntil } from './wait-until.js'; /** @@ -360,10 +361,16 @@ export async function start( compression ); + // If WORKFLOW_RUNTIME is set on the client starting the run, propagate + // that choice to the runtime so the same deployment can serve both + // runtimes. Unknown values throw — see getWorkflowRuntimeFromEnv(). + const workflowRuntime = getWorkflowRuntimeFromEnv(); + const executionContext = { traceCarrier, workflowCoreVersion, features: { encryption: !!encryptionKey }, + ...(workflowRuntime ? { workflowRuntime } : {}), }; // Call events.create (run_created) and queue in parallel. diff --git a/packages/core/src/serialization/codec-devalue-vm.ts b/packages/core/src/serialization/codec-devalue-vm.ts new file mode 100644 index 0000000000..63b2e16a1f --- /dev/null +++ b/packages/core/src/serialization/codec-devalue-vm.ts @@ -0,0 +1,93 @@ +/** + * VM-compatible devalue codec. + * + * Same as codec-devalue.ts but uses VM-compatible reducers/revivers + * (no Node.js Buffer, no node:util). Safe to bundle into the QuickJS VM. + */ + +import { parse, stringify, unflatten } from 'devalue'; +import type { Codec, SerializationMode } from './codec.js'; +import { getClassReducers, getClassRevivers } from './reducers/class.js'; +import { getCommonReducers, getCommonRevivers } from './reducers/common-vm.js'; +import { + getStepFunctionReducer, + getStepFunctionReviver, +} from './reducers/step-function.js'; +import { type Reducers, type Revivers, SerializationFormat } from './types.js'; + +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +function getReducersForMode(mode: SerializationMode): Partial { + switch (mode) { + case 'workflow': + return { + ...getClassReducers(), + ...getStepFunctionReducer(), + ...getCommonReducers(), + }; + case 'step': + return { + ...getClassReducers(), + ...getCommonReducers(), + }; + case 'client': + return { + ...getClassReducers(), + ...getCommonReducers(), + }; + } +} + +function getReviversForMode(mode: SerializationMode): Partial { + switch (mode) { + case 'workflow': + return { + ...getClassRevivers(), + ...getStepFunctionReviver(), + ...getCommonRevivers(), + }; + case 'step': + return { + ...getClassRevivers(), + ...getCommonRevivers(), + }; + case 'client': + return { + ...getClassRevivers(), + ...getCommonRevivers(), + StepFunction: () => { + throw new Error( + 'Step functions cannot be deserialized in client context.' + ); + }, + }; + } +} + +export const devalueVmCodec: Codec = { + formatPrefix: SerializationFormat.DEVALUE_V1, + + serialize(value: unknown, mode: SerializationMode): Uint8Array { + const reducers = getReducersForMode(mode); + const str = stringify( + value, + reducers as Record any> + ); + return encoder.encode(str); + }, + + deserialize(data: Uint8Array, mode: SerializationMode): unknown { + const revivers = getReviversForMode(mode); + const str = decoder.decode(data); + return parse(str, revivers as Record any>); + }, + + deserializeLegacy(data: unknown, mode: SerializationMode): unknown { + const revivers = getReviversForMode(mode); + return unflatten( + data as any[], + revivers as Record any> + ); + }, +}; diff --git a/packages/core/src/serialization/reducers/common-vm.ts b/packages/core/src/serialization/reducers/common-vm.ts new file mode 100644 index 0000000000..c089d68749 --- /dev/null +++ b/packages/core/src/serialization/reducers/common-vm.ts @@ -0,0 +1,465 @@ +/** + * VM-compatible common reducers and revivers. + * + * Identical to common.ts but without Node.js dependencies: + * - Uses the native TC39 `proposal-arraybuffer-base64` API + * (`Uint8Array.prototype.toBase64` / `Uint8Array.fromBase64`), which + * quickjs-wasi implements natively, instead of Buffer or a btoa/atob dance. + * - Uses `instanceof Error` instead of `types.isNativeError()`. + * + * This module is safe to bundle into the QuickJS WASM VM. + */ + +import type { Reducers, Revivers, SerializableSpecial } from '../types.js'; + +// Minimal typing for the TC39 Uint8Array base64 methods (not in lib.d.ts yet). +type Base64Capable = { + toBase64(): string; +}; +type Base64Static = { + fromBase64(s: string): Uint8Array; +}; + +// ---- Base64 helpers (native proposal-arraybuffer-base64 in quickjs-wasi) ---- + +function arrayBufferToBase64( + value: ArrayBufferLike, + offset: number, + length: number +): string { + if (length === 0) return '.'; + return ( + new Uint8Array(value, offset, length) as unknown as Base64Capable + ).toBase64(); +} + +function viewToBase64(value: ArrayBufferView): string { + return arrayBufferToBase64(value.buffer, value.byteOffset, value.byteLength); +} + +function reviveArrayBuffer(value: string): ArrayBuffer { + if (value === '.') return new ArrayBuffer(0); + return (Uint8Array as unknown as Base64Static).fromBase64(value) + .buffer as ArrayBuffer; +} + +// ---- Error subclass helper ---- + +// Creates a reducer for a built-in Error subclass whose serialized shape +// is exactly { message, stack, cause? }. Matches by `value.name` +// (instance property) for cross-realm + bundler-output robustness — see +// the host-side common.ts for full rationale. +function makeNamedErrorSubclassReducer(subclassName: string) { + return ( + value: unknown + ): { message: string; stack?: string; cause?: unknown } | false => { + if (!(value instanceof Error)) return false; + if (value.name !== subclassName) return false; + const reduced: { message: string; stack?: string; cause?: unknown } = { + message: value.message, + stack: value.stack, + }; + if ('cause' in value) reduced.cause = (value as { cause: unknown }).cause; + return reduced; + }; +} + +// Creates a reviver for a built-in Error subclass. Looks up the +// constructor on globalThis so the resulting object passes +// `instanceof TypeError` etc. in the consuming realm. Falls back to +// a base Error with the right `.name` if the constructor is not +// available (defensive — built-ins always exist). +function makeNamedErrorSubclassReviver(subclassName: string) { + return (value: { message: string; stack?: string; cause?: unknown }) => { + const Cls = (globalThis as any)[subclassName]; + let error: Error; + if (typeof Cls === 'function') { + error = new Cls(value.message); + } else { + error = new Error(value.message); + error.name = subclassName; + } + if (value.stack !== undefined) error.stack = value.stack; + if ('cause' in value) (error as any).cause = (value as any).cause; + return error; + }; +} + +// ---- Reducers ---- + +export function getCommonReducers(): Partial { + return { + ArrayBuffer: (value) => + value instanceof ArrayBuffer && + arrayBufferToBase64(value, 0, value.byteLength), + BigInt: (value) => typeof value === 'bigint' && value.toString(), + BigInt64Array: (value) => + value instanceof BigInt64Array && viewToBase64(value), + BigUint64Array: (value) => + value instanceof BigUint64Array && viewToBase64(value), + Date: (value) => { + if (!(value instanceof Date)) return false; + const valid = !Number.isNaN(value.getDate()); + return valid ? value.toISOString() : '.'; + }, + // DOMException is checked before Error so that DOMException-specific + // shape (name, message, stack, cause) survives the round-trip. + DOMException: (value) => { + if (!(value instanceof DOMException)) return false; + const reduced: SerializableSpecial['DOMException'] = { + message: value.message, + name: value.name, + stack: value.stack, + }; + if ('cause' in value) reduced.cause = (value as any).cause; + return reduced; + }, + // First-class Error subclass reducers. Order matters: each subclass + // reducer is checked before the generic `Error` catch-all so that + // e.g. a TypeError instance routes through the TypeError reducer + // instead of the base Error reducer. Matching is by `value.name` + // (the instance property) for cross-realm + bundler robustness; + // see common.ts for full rationale. + AggregateError: (value) => { + if (!(value instanceof Error) || value.name !== 'AggregateError') + return false; + const reduced: SerializableSpecial['AggregateError'] = { + message: value.message, + stack: value.stack, + errors: (value as AggregateError).errors, + }; + if ('cause' in value) reduced.cause = (value as any).cause; + return reduced; + }, + EvalError: makeNamedErrorSubclassReducer('EvalError'), + FatalError: makeNamedErrorSubclassReducer('FatalError'), + RangeError: makeNamedErrorSubclassReducer('RangeError'), + ReferenceError: makeNamedErrorSubclassReducer('ReferenceError'), + // RetryableError carries an extra retryAfter; serialize as numeric + // epoch timestamp for cross-realm safety (see host-side common.ts). + RetryableError: (value) => { + if (!(value instanceof Error) || value.name !== 'RetryableError') + return false; + const retryAfterRaw = (value as any).retryAfter; + let retryAfter: number; + if ( + retryAfterRaw && + typeof retryAfterRaw === 'object' && + typeof (retryAfterRaw as { getTime?: unknown }).getTime === 'function' + ) { + const t = (retryAfterRaw as Date).getTime(); + retryAfter = Number.isNaN(t) ? Date.now() + 1000 : t; + } else if ( + typeof retryAfterRaw === 'string' || + typeof retryAfterRaw === 'number' + ) { + const t = new Date(retryAfterRaw).getTime(); + retryAfter = Number.isNaN(t) ? Date.now() + 1000 : t; + } else { + retryAfter = Date.now() + 1000; + } + const reduced: SerializableSpecial['RetryableError'] = { + message: value.message, + stack: value.stack, + retryAfter, + }; + if ('cause' in value) reduced.cause = (value as any).cause; + return reduced; + }, + SyntaxError: makeNamedErrorSubclassReducer('SyntaxError'), + TypeError: makeNamedErrorSubclassReducer('TypeError'), + URIError: makeNamedErrorSubclassReducer('URIError'), + // Base Error reducer — catch-all. Matched LAST after subclass-specific + // reducers above. Preserves `name` so user Error subclasses without + // dedicated reducers retain their identity through the round-trip. + Error: (value) => { + // In the VM, use instanceof Error (no node:util available) + if (!(value instanceof Error)) return false; + const reduced: SerializableSpecial['Error'] = { + name: value.name, + message: value.message, + stack: value.stack, + }; + if ('cause' in value) reduced.cause = (value as any).cause; + return reduced; + }, + Float32Array: (value) => + value instanceof Float32Array && viewToBase64(value), + Float64Array: (value) => + value instanceof Float64Array && viewToBase64(value), + Int8Array: (value) => value instanceof Int8Array && viewToBase64(value), + Int16Array: (value) => value instanceof Int16Array && viewToBase64(value), + Int32Array: (value) => value instanceof Int32Array && viewToBase64(value), + Map: (value) => value instanceof Map && Array.from(value), + RegExp: (value) => + value instanceof RegExp && { + source: value.source, + flags: value.flags, + }, + // Request/Response/Headers — serialize using the polyfill constructors + Headers: (value) => { + const H = (globalThis as any).Headers; + if (!H || !(value instanceof H)) return false; + return Array.from(value as Iterable<[string, string]>); + }, + Request: (value) => { + const R = (globalThis as any).Request; + if (!R) return false; + // Use instanceof OR check for the Request-specific .json method + // (duck-typing on method/url alone would match plain objects and + // cause infinite recursion since the reducer output also has those) + if (!(value instanceof R) && typeof value?.json !== 'function') + return false; + if (typeof value?.method !== 'string') return false; + const data: any = { + method: value.method, + url: value.url, + headers: value.headers, + body: value.body, + duplex: value.duplex, + }; + // Include the webhook response writable stream if present + const responseWritable = value[Symbol.for('WEBHOOK_RESPONSE_WRITABLE')]; + if (responseWritable) { + data.responseWritable = responseWritable; + } + return data; + }, + Response: (value) => { + const R = (globalThis as any).Response; + if (!R) return false; + // Use instanceof OR check for Response-specific .clone method + if (!(value instanceof R) && typeof value?.clone !== 'function') + return false; + if (typeof value?.status !== 'number') return false; + return { + type: value.type, + url: value.url, + status: value.status, + statusText: value.statusText, + headers: value.headers, + body: value.body, + redirected: value.redirected, + }; + }, + ReadableStream: ((value: any) => { + if (value == null) return false; + const RS = (globalThis as any).ReadableStream; + if ( + !RS || + !(value instanceof RS || Object.getPrototypeOf(value) === RS.prototype) + ) + return false; + const bodyInit = value[Symbol.for('BODY_INIT')]; + if (bodyInit !== undefined) { + return { bodyInit }; + } + // Preserve stream name if present (opaque pointer for passing to steps) + const name = value[Symbol.for('WORKFLOW_STREAM_NAME')]; + if (name) { + const s: any = { name }; + const type = value[Symbol.for('WORKFLOW_STREAM_TYPE')]; + if (type) s.type = type; + return s; + } + return { name: '__empty' }; + }) as any, + WritableStream: ((value: any) => { + if (value == null) return false; + const WS = (globalThis as any).WritableStream; + if ( + !WS || + !(value instanceof WS || Object.getPrototypeOf(value) === WS.prototype) + ) + return false; + const name = value[Symbol.for('WORKFLOW_STREAM_NAME')]; + return { name: name || '__empty' }; + }) as any, + Set: (value) => value instanceof Set && Array.from(value), + URL: (value) => value instanceof URL && value.href, + WorkflowFunction: (value) => { + // Only match function references with a workflowId property (set by + // the SWC compiler on workflow functions). Plain { workflowId } objects + // are NOT matched — this prevents infinite recursion since the reduced + // form { workflowId } is a plain object, not a function. + if (typeof value !== 'function') return false; + const workflowId = (value as any).workflowId; + if (typeof workflowId !== 'string') return false; + return { workflowId }; + }, + URLSearchParams: (value) => { + if (!(value instanceof URLSearchParams)) return false; + return value.size === 0 ? '.' : String(value); + }, + Uint8Array: (value) => value instanceof Uint8Array && viewToBase64(value), + Uint8ClampedArray: (value) => + value instanceof Uint8ClampedArray && viewToBase64(value), + Uint16Array: (value) => value instanceof Uint16Array && viewToBase64(value), + Uint32Array: (value) => value instanceof Uint32Array && viewToBase64(value), + }; +} + +// ---- Revivers ---- + +export function getCommonRevivers(): Partial { + return { + ArrayBuffer: (value: string) => reviveArrayBuffer(value), + BigInt: (value: string) => BigInt(value), + BigInt64Array: (value: string) => + new BigInt64Array(reviveArrayBuffer(value)), + BigUint64Array: (value: string) => + new BigUint64Array(reviveArrayBuffer(value)), + Date: (value) => new Date(value), + DOMException: (value) => { + const error = new DOMException(value.message, value.name); + if (value.stack !== undefined) error.stack = value.stack; + if ('cause' in value) (error as any).cause = value.cause; + return error; + }, + AggregateError: (value) => { + const error = new AggregateError(value.errors ?? [], value.message); + if (value.stack !== undefined) error.stack = value.stack; + if ('cause' in value) (error as any).cause = value.cause; + return error; + }, + EvalError: makeNamedErrorSubclassReviver('EvalError'), + FatalError: (value) => { + // Prefer the host-registered FatalError class (registered via + // Symbol.for keys by @workflow/errors so `instanceof FatalError` + // works across realms). Fall back to a synthesized Error with + // the right .name when no registration is present. + // FatalError's constructor takes only `message`, so cause is + // attached as a property after construction (matching the host + // reviver in common.ts). + const Cls = (globalThis as any)[ + Symbol.for('@workflow/errors//FatalError') + ]; + let error: Error; + if (typeof Cls === 'function') { + error = new Cls(value.message); + } else { + error = new Error(value.message); + error.name = 'FatalError'; + } + if (value.stack !== undefined) error.stack = value.stack; + if ('cause' in value) (error as any).cause = (value as any).cause; + return error; + }, + RangeError: makeNamedErrorSubclassReviver('RangeError'), + ReferenceError: makeNamedErrorSubclassReviver('ReferenceError'), + RetryableError: (value) => { + // RetryableError's constructor accepts (message, { retryAfter }). + // Cause is attached after construction (the constructor does not + // forward it). retryAfter is stored as a Date in the VM realm. + const Cls = (globalThis as any)[ + Symbol.for('@workflow/errors//RetryableError') + ]; + const retryAfter = new Date(value.retryAfter); + let error: Error; + if (typeof Cls === 'function') { + error = new Cls(value.message, { retryAfter }); + } else { + error = new Error(value.message); + error.name = 'RetryableError'; + (error as any).retryAfter = retryAfter; + } + if (value.stack !== undefined) error.stack = value.stack; + if ('cause' in value) (error as any).cause = (value as any).cause; + return error; + }, + SyntaxError: makeNamedErrorSubclassReviver('SyntaxError'), + TypeError: makeNamedErrorSubclassReviver('TypeError'), + URIError: makeNamedErrorSubclassReviver('URIError'), + Error: (value) => { + const error = new Error(value.message); + error.name = value.name; + if (value.stack !== undefined) error.stack = value.stack; + if ('cause' in value) (error as any).cause = (value as any).cause; + return error; + }, + Float32Array: (value: string) => new Float32Array(reviveArrayBuffer(value)), + Float64Array: (value: string) => new Float64Array(reviveArrayBuffer(value)), + Int8Array: (value: string) => new Int8Array(reviveArrayBuffer(value)), + Int16Array: (value: string) => new Int16Array(reviveArrayBuffer(value)), + Int32Array: (value: string) => new Int32Array(reviveArrayBuffer(value)), + Map: (value) => new Map(value), + RegExp: (value) => new RegExp(value.source, value.flags), + Set: (value) => new Set(value), + URL: (value) => new URL(value), + WorkflowFunction: (value) => + Object.assign( + () => { + throw new Error( + 'Workflow functions cannot be called directly. Use start() to invoke them.' + ); + }, + { workflowId: value.workflowId } + ), + URLSearchParams: (value) => new URLSearchParams(value === '.' ? '' : value), + Uint8Array: (value: string) => new Uint8Array(reviveArrayBuffer(value)), + Uint8ClampedArray: (value: string) => + new Uint8ClampedArray(reviveArrayBuffer(value)), + Uint16Array: (value: string) => new Uint16Array(reviveArrayBuffer(value)), + Uint32Array: (value: string) => new Uint32Array(reviveArrayBuffer(value)), + // Web API types — revived as plain objects in the VM since the real + // constructors (Headers, Request, Response) are not available in QuickJS. + // The workflow code can access the properties but not call Web API methods. + Headers: (value) => { + return new (globalThis as any).Headers(value); + }, + Request: (value: any) => { + const Req = (globalThis as any).Request; + if (Req) { + value.json = Req.prototype.json; + value.text = Req.prototype.text; + value.arrayBuffer = Req.prototype.arrayBuffer; + } + // Carry over the webhook response writable stream to the symbol property + if (value.responseWritable) { + value[Symbol.for('WEBHOOK_RESPONSE_WRITABLE')] = value.responseWritable; + } + return value; + }, + Response: (value: any) => { + // Don't use Object.setPrototypeOf — devalue continues to set properties + // on the object after the reviver runs, and getter-only properties + // (like 'ok') on the prototype would cause "no setter" errors. + // Instead, copy methods directly onto the object. + const Resp = (globalThis as any).Response; + if (Resp) { + value.json = Resp.prototype.json; + value.text = Resp.prototype.text; + value.arrayBuffer = Resp.prototype.arrayBuffer; + if (Resp.prototype.bytes) value.bytes = Resp.prototype.bytes; + if (Resp.prototype.clone) value.clone = Resp.prototype.clone; + } + value._body = value.body; + value.ok = value.status >= 200 && value.status < 300; + value.bodyUsed = false; + return value; + }, + ReadableStream: (value) => { + const RS = (globalThis as any).ReadableStream; + const stream = Object.create(RS ? RS.prototype : {}); + if (value && 'bodyInit' in value) { + // Body from Response/Request constructor — store the raw data + stream[Symbol.for('BODY_INIT')] = value.bodyInit; + } else if (value && 'name' in value) { + // Named stream reference — preserve the name/type for re-serialization. + // Streams are opaque pointers in the VM — they can be passed to steps + // but not consumed directly. + stream[Symbol.for('WORKFLOW_STREAM_NAME')] = value.name; + if (value.type) stream[Symbol.for('WORKFLOW_STREAM_TYPE')] = value.type; + } + return stream; + }, + WritableStream: (value) => { + const WS = (globalThis as any).WritableStream; + const stream = Object.create(WS ? WS.prototype : {}); + if (value && 'name' in value) { + stream[Symbol.for('WORKFLOW_STREAM_NAME')] = value.name; + } + return stream; + }, + }; +} diff --git a/packages/core/src/serialization/vm-bundle-entry.ts b/packages/core/src/serialization/vm-bundle-entry.ts new file mode 100644 index 0000000000..8bdc795900 --- /dev/null +++ b/packages/core/src/serialization/vm-bundle-entry.ts @@ -0,0 +1,57 @@ +/** + * Entry point for the VM serialization bundle. + * + * This file is bundled by esbuild into a self-contained IIFE that + * sets up serialize/deserialize on globalThis. The bundled output + * is evaluated inside the QuickJS VM during bootstrap. + * + * TextEncoder, TextDecoder, and Headers are provided by native C + * extensions in quickjs-wasi, so no polyfills are needed. + */ + +import { monotonicFactory } from 'ulid'; +import { deserialize, serialize } from './workflow-vm.js'; + +// Install on global scope under the public well-known symbols. The +// snapshot runtime's bootstrap (and the various inline-evaluated JS +// strings in `snapshot-runtime.ts`) reach the same functions via +// `globalThis[Symbol.for('workflow-serialize')]` etc. +(globalThis as any)[Symbol.for('workflow-serialize')] = serialize; +(globalThis as any)[Symbol.for('workflow-deserialize')] = deserialize; + +// ULID generator for correlationIds — uses the same monotonicFactory +// as the event-replay runtime. Both inputs MUST be set by the host +// before this bundle is evaluated, otherwise the seeded-ULID +// determinism guarantee is silently broken: +// +// * `Math.random` must already be replaced with the host's seeded +// PRNG via `vm.newFunction('random', …)` (see +// `snapshot-runtime.ts`, the `Seeded Math.random` block). Two +// workflow invocations of the same resumption MUST observe an +// identical random sequence so their correlationIds collide and +// the world's EntityConflictError dedup applies. We pass it +// explicitly to `monotonicFactory` because ULID's auto-detect +// (`detectPRNG`) only knows about `crypto.getRandomValues` / +// `crypto.randomBytes`, neither of which exist in QuickJS. +// * `globalThis.__ulidTimestamp` must be a number (typically +// `workflowRun.startedAt`). It's used in place of `Date.now()` so +// the time portion of the ULID is also stable across concurrent +// invocations of the same resumption. +// +// Both prerequisites are validated below — fail loudly if either is +// missing rather than fall back to `Date.now()` / unseeded +// `Math.random`, which would re-introduce non-determinism that the +// snapshot runtime relies on us NOT having. +const ulid = monotonicFactory(Math.random); +(globalThis as any).__generateUlid = () => { + const t = (globalThis as any).__ulidTimestamp; + if (typeof t !== 'number') { + throw new Error( + '__generateUlid: globalThis.__ulidTimestamp must be a number set by ' + + 'the host before the serde bundle is evaluated. Without it, ULIDs ' + + 'would fall back to Date.now() and concurrent workflow invocations ' + + 'of the same resumption would produce divergent correlationIds.' + ); + } + return ulid(t); +}; diff --git a/packages/core/src/serialization/workflow-vm.ts b/packages/core/src/serialization/workflow-vm.ts new file mode 100644 index 0000000000..3aaf5bc730 --- /dev/null +++ b/packages/core/src/serialization/workflow-vm.ts @@ -0,0 +1,73 @@ +/** + * VM-compatible workflow mode serialization. + * + * This module is designed to be bundled into the QuickJS WASM VM. + * It has NO Node.js dependencies (no Buffer, no node:util). + * + * Produces and consumes the same wire format as the Node.js workflow.ts — + * format-prefixed devalue data ("devl" + devalue.stringify output). + */ + +import { devalueVmCodec } from './codec-devalue-vm.js'; +import { isFormatPrefix, SerializationFormat } from './types.js'; + +const FORMAT_PREFIX_LENGTH = 4; +let _encoder: { encode(s: string): Uint8Array }; +let _decoder: { decode(d: Uint8Array): string }; +function getEncoder() { + if (!_encoder) _encoder = new (globalThis as any).TextEncoder(); + return _encoder; +} +function getDecoder() { + if (!_decoder) _decoder = new (globalThis as any).TextDecoder(); + return _decoder; +} + +/** + * Serialize a value to format-prefixed bytes. + * + * @param value - The value to serialize + * @returns Uint8Array with "devl" prefix + devalue payload + */ +export function serialize(value: unknown): Uint8Array { + const payload = devalueVmCodec.serialize(value, 'workflow'); + const prefix = getEncoder().encode(SerializationFormat.DEVALUE_V1); + const result = new Uint8Array(prefix.length + payload.length); + result.set(prefix, 0); + result.set(payload, prefix.length); + return result; +} + +/** + * Deserialize format-prefixed bytes back to a value. + * + * @param data - Uint8Array with format prefix, or legacy non-binary data + * @returns The deserialized value + */ +export function deserialize(data: Uint8Array | unknown): unknown { + // Legacy: non-binary data + if (!(data instanceof Uint8Array)) { + if (devalueVmCodec.deserializeLegacy) { + return devalueVmCodec.deserializeLegacy(data, 'workflow'); + } + throw new Error( + 'Cannot deserialize non-binary data without legacy support' + ); + } + + if (data.length < FORMAT_PREFIX_LENGTH) { + throw new Error('Data too short to contain format prefix'); + } + + const prefixStr = getDecoder().decode(data.subarray(0, FORMAT_PREFIX_LENGTH)); + if (!isFormatPrefix(prefixStr)) { + throw new Error(`Invalid format prefix: "${prefixStr}"`); + } + + if (prefixStr === SerializationFormat.DEVALUE_V1) { + const payload = data.subarray(FORMAT_PREFIX_LENGTH); + return devalueVmCodec.deserialize(payload, 'workflow'); + } + + throw new Error(`Unsupported serialization format: ${prefixStr}`); +} diff --git a/packages/core/src/source-map.ts b/packages/core/src/source-map.ts index 14425825b6..d4557fd75f 100644 --- a/packages/core/src/source-map.ts +++ b/packages/core/src/source-map.ts @@ -1,5 +1,27 @@ import { originalPositionFor, TraceMap } from '@jridgewell/trace-mapping'; +/** + * Pattern matching the trailing inline source map comment that bundlers + * (esbuild, etc.) emit. The comment is purely host-side metadata for + * `remapErrorStack` — the QuickJS VM never needs it. Stripping it before + * passing the bundle to `vm.evalCode` reduces the QuickJS heap, because + * QuickJS retains source text for stack-trace line lookups. + */ +const INLINE_SOURCE_MAP_COMMENT_RE = + /\/\/# sourceMappingURL=data:application\/json;base64,[A-Za-z0-9+/=]+\s*$/m; + +/** + * Strip the trailing `//# sourceMappingURL=data:…` comment from a JS bundle. + * Returns the input unchanged if no inline map is present. + * + * Use this on the host side before evaluating workflow bundles inside the + * QuickJS VM — the VM never needs the map; only host-side `remapErrorStack` + * reads it (against the original, unstripped string). + */ +export function stripInlineSourceMap(workflowCode: string): string { + return workflowCode.replace(INLINE_SOURCE_MAP_COMMENT_RE, ''); +} + function isBase64Char(code: number): boolean { return ( (code >= 0x41 && code <= 0x5a) || diff --git a/packages/core/test/workerd-smoke/.gitignore b/packages/core/test/workerd-smoke/.gitignore new file mode 100644 index 0000000000..41a2578946 --- /dev/null +++ b/packages/core/test/workerd-smoke/.gitignore @@ -0,0 +1,2 @@ +.wrangler/ +node_modules/ diff --git a/packages/core/test/workerd-smoke/src/worker.ts b/packages/core/test/workerd-smoke/src/worker.ts new file mode 100644 index 0000000000..9b7557831d --- /dev/null +++ b/packages/core/test/workerd-smoke/src/worker.ts @@ -0,0 +1,171 @@ +/** + * workerd smoke test for the QuickJS workflow runtime. + * + * Proves that the QuickJS VM execution layer — the `quickjs-wasi` binary + + * native extensions + core's VM serde bundle — instantiates and runs inside + * Cloudflare's workerd runtime, where `node:vm` is unavailable. This is the + * foundational guarantee behind the QuickJS runtime: if this passes, workflow + * orchestrator code can execute on Workers. + * + * IMPORTANT (workerd asset loading): workerd bans runtime WASM compilation + * from bytes (`WebAssembly.compile(bytes)` → "Wasm code generation disallowed + * by embedder"). Core's default asset module base64-decodes the binaries and + * compiles them at runtime — great for Node/Vercel (no filesystem), but it + * does NOT work on workerd. So here we import the `.wasm` / `.so` binaries as + * pre-compiled `WebAssembly.Module`s (wrangler/esbuild compiles them at bundle + * time; `.so` is mapped to CompiledWasm via wrangler `rules`). This is the + * asset-loading path a real Workers deployment must use. The VM serde bundle + * is a plain JS string, so core's generated copy is reused directly. + */ +import { QuickJS } from 'quickjs-wasi'; +// @ts-expect-error - wrangler `rules` maps .so to CompiledWasm +import encodingModule from 'quickjs-wasi/encoding.so'; +// @ts-expect-error - .so → CompiledWasm +import headersModule from 'quickjs-wasi/headers.so'; +// Pre-compiled WebAssembly.Module imports (compiled at bundle time, not runtime). +// @ts-expect-error - wrangler resolves .wasm to a WebAssembly.Module +import quickjsModule from 'quickjs-wasi/quickjs.wasm'; +// @ts-expect-error - .so → CompiledWasm +import structuredCloneModule from 'quickjs-wasi/structured-clone.so'; +// @ts-expect-error - .so → CompiledWasm +import urlModule from 'quickjs-wasi/url.so'; +// @ts-expect-error - generated by `pnpm --filter @workflow/core build` +// eslint-disable-next-line import/no-unresolved +import { VM_SERDE_BUNDLE } from '../../../src/runtime/vm-serde-bundle.generated.js'; + +const FIXED_TIME = 1700000000000; + +// Mirror core's quickjsExtensions, but with pre-compiled Modules. +const extensions = [ + { name: 'encoding', wasm: encodingModule as WebAssembly.Module }, + { name: 'headers', wasm: headersModule as WebAssembly.Module }, + { name: 'url', wasm: urlModule as WebAssembly.Module }, + { + name: 'structured-clone', + wasm: structuredCloneModule as WebAssembly.Module, + }, +]; + +async function runChecks() { + const checks: Record = {}; + const vm = await QuickJS.create({ + wasm: quickjsModule as WebAssembly.Module, + memoryLimit: 256 * 1024 * 1024, + extensions: extensions as any, + }); + try { + checks.create = { ok: true }; + + // Mirror the runtime's VM bootstrap: seeded Math.random, deterministic + // clock + ULID timestamp, then eval the serde bundle. + vm.evalCode( + `globalThis.__ulidTimestamp = ${FIXED_TIME};` + + `globalThis.__currentTime = ${FIXED_TIME};` + + `(function(){var R=Date;function D(){return arguments.length===0?new R(globalThis.__currentTime):new R(...arguments);}D.prototype=R.prototype;Object.setPrototypeOf(D,R);D.now=function(){return globalThis.__currentTime;};globalThis.Date=D;})();` + + `Math.random = function(){ return 0.42; };` + ).dispose(); + vm.evalCode(VM_SERDE_BUNDLE, 'vm-serde.js').dispose(); + checks.serdeBundleEval = { ok: true }; + + // Round-trip structured values entirely in-VM. Exercises the native + // extensions (encoding=TextEncoder, structured-clone) and the + // proposal-arraybuffer-base64 reducers for typed arrays. + { + using h = vm.evalCode( + `(function(){` + + `var ser = globalThis[Symbol.for("workflow-serialize")];` + + `var de = globalThis[Symbol.for("workflow-deserialize")];` + + `var v = { ok:true, n:41+1, when:new Date(0), m:new Map([["a",1]]), bytes:new Uint8Array([1,2,3]) };` + + `var bytes = ser(v);` + + `var back = de(bytes);` + + `return JSON.stringify({` + + ` prefix: new TextDecoder().decode(bytes.subarray(0,4)),` + + ` n: back.n, ok: back.ok, whenMs: back.when.getTime(),` + + ` isDate: back.when instanceof Date,` + + ` isMap: back.m instanceof Map, mA: back.m.get("a"),` + + ` isBytes: back.bytes instanceof Uint8Array, b0: back.bytes[0], b2: back.bytes[2]` + + `});` + + `})()` + ); + checks.serdeRoundTrip = JSON.parse(vm.dump(h) as string); + } + + // Register + invoke a workflow function (mirrors how a compiled bundle + // registers into __private_workflows) and read its deterministic clock. + { + vm.evalCode( + `globalThis.__private_workflows = new Map();` + + `globalThis.__private_workflows.set("wf", function(a,b){ return { sum:a+b, when: Date.now() }; });` + ).dispose(); + using h = vm.evalCode( + `JSON.stringify(globalThis.__private_workflows.get("wf")(2,3))` + ); + checks.workflowInvoke = JSON.parse(vm.dump(h) as string); + } + + // Async workflow + microtask drain (the suspend/complete model). + { + vm.evalCode( + `globalThis.__r = undefined;` + + `Promise.resolve((async function(){ return 7*6; })()).then(function(v){ globalThis.__r = v; });` + ).dispose(); + let b: number; + do { + b = vm.executePendingJobs(); + } while (b > 0); + using h = vm.evalCode('globalThis.__r'); + checks.asyncDrain = { ok: true, value: vm.dump(h) }; + } + } finally { + vm.dispose(); + } + return checks; +} + +function allOk(checks: Record): boolean { + const rt = checks.serdeRoundTrip as Record | undefined; + const wf = checks.workflowInvoke as Record | undefined; + const ad = checks.asyncDrain as Record | undefined; + return ( + !!rt && + rt.prefix === 'devl' && + rt.n === 42 && + rt.ok === true && + rt.whenMs === 0 && + rt.isDate === true && + rt.isMap === true && + rt.mA === 1 && + rt.isBytes === true && + rt.b0 === 1 && + rt.b2 === 3 && + !!wf && + wf.sum === 5 && + wf.when === FIXED_TIME && + !!ad && + ad.value === 42 + ); +} + +export default { + async fetch(): Promise { + try { + const checks = await runChecks(); + const ok = allOk(checks); + return new Response( + JSON.stringify({ ok, runtime: 'workerd', checks }, null, 2), + { + status: ok ? 200 : 500, + headers: { 'content-type': 'application/json' }, + } + ); + } catch (e) { + return new Response( + JSON.stringify({ + ok: false, + error: e instanceof Error ? `${e.message}\n${e.stack}` : String(e), + }), + { status: 500, headers: { 'content-type': 'application/json' } } + ); + } + }, +}; diff --git a/packages/core/test/workerd-smoke/wrangler.jsonc b/packages/core/test/workerd-smoke/wrangler.jsonc new file mode 100644 index 0000000000..de92084a67 --- /dev/null +++ b/packages/core/test/workerd-smoke/wrangler.jsonc @@ -0,0 +1,13 @@ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "workflow-quickjs-workerd-smoke", + "main": "src/worker.ts", + "compatibility_date": "2025-10-01", + "compatibility_flags": ["nodejs_compat"], + // quickjs-wasi ships its native extensions as `.so` WASM shared libraries. + // Map them to CompiledWasm so wrangler compiles them at bundle time and the + // worker imports pre-compiled WebAssembly.Modules (workerd bans runtime + // WASM compilation from bytes). + // fallthrough so wrangler's built-in `.wasm` CompiledWasm rule still applies. + "rules": [{ "type": "CompiledWasm", "globs": ["**/*.so"], "fallthrough": true }] +} diff --git a/packages/core/turbo.json b/packages/core/turbo.json index e503fb6757..aa04cd0e81 100644 --- a/packages/core/turbo.json +++ b/packages/core/turbo.json @@ -3,7 +3,12 @@ "tasks": { "build": { "dependsOn": ["^build"], - "outputs": ["dist", "src/version.ts"] + "outputs": [ + "dist", + "src/version.ts", + "src/runtime/vm-serde-bundle.generated.ts", + "src/runtime/quickjs-assets.generated.ts" + ] } } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 030b242008..07302ef937 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -594,6 +594,9 @@ importers: nanoid: specifier: 5.1.6 version: 5.1.6 + quickjs-wasi: + specifier: 3.0.0 + version: 3.0.0 seedrandom: specifier: 3.0.5 version: 3.0.5 @@ -637,6 +640,9 @@ importers: cross-env: specifier: 10.1.0 version: 10.1.0 + esbuild: + specifier: 'catalog:' + version: 0.28.1 genversion: specifier: 3.2.0 version: 3.2.0 @@ -15197,6 +15203,9 @@ packages: resolution: {integrity: sha512-WuyALRjWPDGtt/wzJiadO5AXY+8hZ80hVpe6MyivgraREW751X3SbhRvG3eLKOYN+8VEvqLcf3wdnt44Z4S4SA==} engines: {node: '>=10'} + quickjs-wasi@3.0.0: + resolution: {integrity: sha512-X7ouKC4ZVf9bXQ8rsE7+L6TeBbesejAJH61x16xRaGAQGfBHHRcniWgzJZZVtHc8rS9yVsY+Tvk8/usAosg4bg==} + radix-ui@1.4.3: resolution: {integrity: sha512-aWizCQiyeAenIdUbqEpXgRA1ya65P13NKn/W8rWkcN0OPkRDxdBVLWnIEDsS2RpwCK2nobI7oMUSmexzTDyAmA==} peerDependencies: @@ -33633,6 +33642,8 @@ snapshots: quick-lru@5.1.1: {} + quickjs-wasi@3.0.0: {} + radix-ui@1.4.3(@types/react-dom@19.1.9(@types/react@19.1.13))(@types/react@19.1.13)(react-dom@19.2.4(react@19.2.4))(react@19.2.4): dependencies: '@radix-ui/primitive': 1.1.3