Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-concurrent-wait-completed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-local': patch
---

Fix concurrent `wait_completed` race condition that caused duplicate events and `Unconsumed event` errors during replay
6 changes: 6 additions & 0 deletions .changeset/fix-wait-completed-event-refetch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@workflow/core': patch
'workflow': patch
---

Re-fetch event log on `wait_completed` 409 conflict to ensure correct event ordering
15 changes: 13 additions & 2 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export function workflowEntrypoint(
}

// Load all events into memory before running
const events = await getAllWorkflowRunEvents(workflowRun.runId);
let events = await getAllWorkflowRunEvents(workflowRun.runId);

// Check for any elapsed waits and create wait_completed events
const now = Date.now();
Expand All @@ -245,7 +245,11 @@ export function workflowEntrypoint(
correlationId: e.correlationId,
}));

// Create all wait_completed events
// Create all wait_completed events.
// If any creation returns 409, a concurrent invocation already
// created the event. Re-fetch the full event log to get the
// authoritative ordering rather than appending locally.
let needsRefetch = false;
for (const waitEvent of waitsToComplete) {
try {
const result = await world.events.create(runId, waitEvent);
Expand All @@ -257,12 +261,19 @@ export function workflowEntrypoint(
workflowRunId: runId,
correlationId: waitEvent.correlationId,
});
needsRefetch = true;
continue;
}
throw err;
}
}

// Re-fetch the event log if a concurrent invocation created
// events we don't have locally, ensuring correct ordering.
if (needsRefetch) {
events = await getAllWorkflowRunEvents(workflowRun.runId);
}

// Resolve the encryption key for this run's deployment
const rawKey =
await world.getEncryptionKeyForRun?.(workflowRun);
Expand Down
98 changes: 98 additions & 0 deletions packages/world-local/src/storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { writeJSON } from './fs.js';
import { createStorage } from './storage.js';
import {
completeWait,
createHook,
createRun,
createStep,
createWait,
disposeHook,
updateRun,
updateStep,
Expand Down Expand Up @@ -2544,6 +2546,102 @@ describe('Storage', () => {
});
});

describe('waits', () => {
  // Every test in this suite uses the same fixed resume timestamp.
  const resumeAt = () => new Date('2099-01-01');

  /** Creates a run, transitions it with `run_started`, and returns its id. */
  const startRun = async (): Promise<string> => {
    const { runId } = await createRun(storage, {
      deploymentId: 'dep-1',
      workflowName: 'wf-1',
      input: new Uint8Array(),
    });
    await updateRun(storage, runId, 'run_started');
    return runId;
  };

  /** Issues a `wait_created` event for `waitId` on the given run. */
  const openWait = (runId: string, waitId: string) =>
    createWait(storage, runId, { waitId, resumeAt: resumeAt() });

  it('should create and complete a wait', async () => {
    const runId = await startRun();

    const created = await openWait(runId, 'wait_001');
    expect(created.status).toBe('waiting');

    const finished = await completeWait(storage, runId, 'wait_001');
    expect(finished.status).toBe('completed');
  });

  it('should reject duplicate wait_created for the same correlationId', async () => {
    const runId = await startRun();
    await openWait(runId, 'wait_dup');

    // Creating the same wait a second time must be refused.
    const secondCreate = openWait(runId, 'wait_dup');
    await expect(secondCreate).rejects.toThrow(/already exists/i);
  });

  it('should reject duplicate wait_completed for the same correlationId', async () => {
    const runId = await startRun();
    await openWait(runId, 'wait_once');
    await completeWait(storage, runId, 'wait_once');

    // Completing an already-completed wait must be refused.
    const secondCompletion = completeWait(storage, runId, 'wait_once');
    await expect(secondCompletion).rejects.toThrow(/already completed/i);
  });

  it('should reject concurrent wait_completed for the same correlationId', async () => {
    const runId = await startRun();
    await openWait(runId, 'wait_race');

    // Start both completions before awaiting either so they race each other.
    const outcomes = await Promise.allSettled([
      completeWait(storage, runId, 'wait_race'),
      completeWait(storage, runId, 'wait_race'),
    ]);

    const winners = outcomes.filter((o) => o.status === 'fulfilled');
    const losers = outcomes.filter(
      (o): o is PromiseRejectedResult => o.status === 'rejected'
    );

    // Exactly one attempt may win; the other must fail as a 409 conflict.
    expect(winners).toHaveLength(1);
    expect(losers).toHaveLength(1);

    const failure: unknown = losers[0]?.reason;
    expect(failure).toBeInstanceOf(WorkflowAPIError);
    expect((failure as WorkflowAPIError).status).toBe(409);
  });
});

describe('custom runId validation', () => {
const runCreatedEvent = {
eventType: 'run_created' as const,
Expand Down
23 changes: 16 additions & 7 deletions packages/world-local/src/storage/events-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -718,8 +718,23 @@ export function createEventsStorage(
wait
);
} else if (data.eventType === 'wait_completed') {
// wait_completed: Transitions wait to 'completed', rejects duplicates
// wait_completed: Transitions wait to 'completed', rejects duplicates.
// Uses writeExclusive on a lock file to atomically prevent concurrent
// invocations from both completing the same wait (TOCTOU race).
const waitCompositeKey = `${effectiveRunId}-${data.correlationId}`;
const lockPath = taggedPath(
basedir,
'waits',
`${waitCompositeKey}.completed`,
tag
Comment on lines +725 to +729
);
const claimed = await writeExclusive(lockPath, '');
if (!claimed) {
throw new WorkflowAPIError(
`Wait "${data.correlationId}" already completed`,
{ status: 409 }
);
}
const existingWait = await readJSONWithFallback(
basedir,
'waits',
Expand All @@ -732,12 +747,6 @@ export function createEventsStorage(
status: 404,
});
}
Comment on lines 731 to 749
if (existingWait.status === 'completed') {
throw new WorkflowAPIError(
`Wait "${data.correlationId}" already completed`,
{ status: 409 }
);
}
wait = {
...existingWait,
status: 'completed',
Expand Down
43 changes: 43 additions & 0 deletions packages/world-local/src/test-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type {
SerializedData,
Step,
Storage,
Wait,
WorkflowRun,
} from '@workflow/world';
import { SPEC_VERSION_CURRENT } from '@workflow/world';
Expand Down Expand Up @@ -139,3 +140,45 @@ export async function disposeHook(
correlationId: hookId,
});
}

/**
 * Test helper that registers a wait on a run by emitting a `wait_created`
 * event, using `data.waitId` as the event's correlation id.
 *
 * @param storage - Storage whose `events.create` receives the event.
 * @param runId - Id of the run the wait belongs to.
 * @param data - The wait id and the time at which the wait should resume.
 * @returns The wait entity returned alongside the created event.
 * @throws Error if the storage result carries no `wait`.
 */
export async function createWait(
  storage: Storage,
  runId: string,
  data: {
    waitId: string;
    resumeAt: Date;
  }
): Promise<Wait> {
  const { waitId, resumeAt } = data;
  const { wait } = await storage.events.create(runId, {
    eventType: 'wait_created',
    specVersion: SPEC_VERSION_CURRENT,
    correlationId: waitId,
    eventData: { resumeAt },
  });
  if (wait) {
    return wait;
  }
  throw new Error('Expected wait to be created');
}

/**
 * Test helper that completes a wait by emitting a `wait_completed` event
 * correlated to `waitId`.
 *
 * @param storage - Storage whose `events.create` receives the event.
 * @param runId - Id of the run the wait belongs to.
 * @param waitId - Correlation id of the wait to complete.
 * @returns The wait entity returned alongside the created event.
 * @throws Error if the storage result carries no `wait`.
 */
export async function completeWait(
  storage: Storage,
  runId: string,
  waitId: string
): Promise<Wait> {
  const { wait } = await storage.events.create(runId, {
    eventType: 'wait_completed',
    specVersion: SPEC_VERSION_CURRENT,
    correlationId: waitId,
  });
  if (wait) {
    return wait;
  }
  throw new Error('Expected wait to be completed');
}
Loading