Skip to content

Commit 56d2297

Browse files
committed
Added parallel wait prevention, it’s working for duration waits but not well for triggerAndWait yet
1 parent 2bb3db4 commit 56d2297

File tree

4 files changed

+54
-43
lines changed

4 files changed

+54
-43
lines changed

internal-packages/run-engine/src/engine/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -2857,7 +2857,7 @@ export class RunEngine {
28572857
const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
28582858

28592859
const permanentlyFailRun = async (run?: { status: TaskRunStatus; spanId: string }) => {
2860-
// Emit and event so we can complete any spans of stalled executions
2860+
// Emit an event so we can complete any spans of stalled executions
28612861
if (forceRequeue && run) {
28622862
this.eventBus.emit("runAttemptFailed", {
28632863
time: failedAt,

packages/core/src/v3/runtime/managedRuntimeManager.ts

+42-33
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from "../schemas/index.js";
1111
import { ExecutorToWorkerProcessConnection } from "../zodIpc.js";
1212
import { RuntimeManager } from "./manager.js";
13+
import { preventMultipleWaits } from "./preventMultipleWaits.js";
1314

1415
type Resolver = (value: CompletedWaitpoint) => void;
1516

@@ -19,6 +20,8 @@ export class ManagedRuntimeManager implements RuntimeManager {
1920
// Maps a waitpoint ID to a wait ID
2021
private readonly resolversByWaitpoint: Map<string, string> = new Map();
2122

23+
private _preventMultipleWaits = preventMultipleWaits();
24+
2225
constructor(
2326
private ipc: ExecutorToWorkerProcessConnection,
2427
private showLogs: boolean
@@ -36,40 +39,44 @@ export class ManagedRuntimeManager implements RuntimeManager {
3639
}
3740

3841
async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {
39-
const promise = new Promise<CompletedWaitpoint>((resolve) => {
40-
this.resolversByWaitId.set(params.id, resolve);
41-
});
42+
return this._preventMultipleWaits(async () => {
43+
const promise = new Promise<CompletedWaitpoint>((resolve) => {
44+
this.resolversByWaitId.set(params.id, resolve);
45+
});
4246

43-
const waitpoint = await promise;
44-
const result = this.waitpointToTaskRunExecutionResult(waitpoint);
47+
const waitpoint = await promise;
48+
const result = this.waitpointToTaskRunExecutionResult(waitpoint);
4549

46-
return result;
50+
return result;
51+
});
4752
}
4853

4954
async waitForBatch(params: {
5055
id: string;
5156
runCount: number;
5257
ctx: TaskRunContext;
5358
}): Promise<BatchTaskRunExecutionResult> {
54-
if (!params.runCount) {
55-
return Promise.resolve({ id: params.id, items: [] });
56-
}
59+
return this._preventMultipleWaits(async () => {
60+
if (!params.runCount) {
61+
return Promise.resolve({ id: params.id, items: [] });
62+
}
63+
64+
const promise = Promise.all(
65+
Array.from({ length: params.runCount }, (_, index) => {
66+
const resolverId = `${params.id}_${index}`;
67+
return new Promise<CompletedWaitpoint>((resolve, reject) => {
68+
this.resolversByWaitId.set(resolverId, resolve);
69+
});
70+
})
71+
);
72+
73+
const waitpoints = await promise;
5774

58-
const promise = Promise.all(
59-
Array.from({ length: params.runCount }, (_, index) => {
60-
const resolverId = `${params.id}_${index}`;
61-
return new Promise<CompletedWaitpoint>((resolve, reject) => {
62-
this.resolversByWaitId.set(resolverId, resolve);
63-
});
64-
})
65-
);
66-
67-
const waitpoints = await promise;
68-
69-
return {
70-
id: params.id,
71-
items: waitpoints.map(this.waitpointToTaskRunExecutionResult),
72-
};
75+
return {
76+
id: params.id,
77+
items: waitpoints.map(this.waitpointToTaskRunExecutionResult),
78+
};
79+
});
7380
}
7481

7582
async waitForWaitpoint({
@@ -79,17 +86,19 @@ export class ManagedRuntimeManager implements RuntimeManager {
7986
waitpointFriendlyId: string;
8087
finishDate?: Date;
8188
}): Promise<WaitpointTokenResult> {
82-
const promise = new Promise<CompletedWaitpoint>((resolve) => {
83-
this.resolversByWaitId.set(waitpointFriendlyId, resolve);
84-
});
89+
return this._preventMultipleWaits(async () => {
90+
const promise = new Promise<CompletedWaitpoint>((resolve) => {
91+
this.resolversByWaitId.set(waitpointFriendlyId, resolve);
92+
});
8593

86-
const waitpoint = await promise;
94+
const waitpoint = await promise;
8795

88-
return {
89-
ok: !waitpoint.outputIsError,
90-
output: waitpoint.output,
91-
outputType: waitpoint.outputType,
92-
};
96+
return {
97+
ok: !waitpoint.outputIsError,
98+
output: waitpoint.output,
99+
outputType: waitpoint.outputType,
100+
};
101+
});
93102
}
94103

95104
associateWaitWithWaitpoint(waitId: string, waitpointId: string) {

packages/core/src/v3/workers/taskExecutor.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -540,17 +540,17 @@ export class TaskExecutor {
540540
return { status: "noop" };
541541
}
542542

543+
if (isInternalError(error) && error.skipRetrying) {
544+
return { status: "skipped", error };
545+
}
546+
543547
if (
544548
error instanceof Error &&
545549
(error.name === "AbortTaskRunError" || error.name === "TaskPayloadParsedError")
546550
) {
547551
return { status: "skipped" };
548552
}
549553

550-
if (isInternalError(error) && error.skipRetrying) {
551-
return { status: "skipped", error };
552-
}
553-
554554
if (execution.run.maxAttempts) {
555555
retry.maxAttempts = Math.max(execution.run.maxAttempts, 1);
556556
}

references/hello-world/src/trigger/parallel-waits.ts

+7-5
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@ import { childTask } from "./example.js";
66
*/
77
export const parallelWaits = task({
88
id: "parallel-waits",
9-
run: async (payload: any, { ctx }) => {
9+
run: async ({ skipDuration = false }: { skipDuration?: boolean }) => {
1010
//parallel wait for 5/10 seconds
11-
await Promise.all([
12-
wait.for({ seconds: 5 }),
13-
wait.until({ date: new Date(Date.now() + 10_000) }),
14-
]);
11+
if (!skipDuration) {
12+
await Promise.all([
13+
wait.for({ seconds: 5 }),
14+
wait.until({ date: new Date(Date.now() + 10_000) }),
15+
]);
16+
}
1517

1618
//parallel task call
1719
await Promise.all([

0 commit comments

Comments
 (0)