Skip to content

Commit 261dce1

Browse files
authored
v4: fix batchTriggerAndWait completion issues by processing batch chunks sequentially (#2152)
* v4: fix batchTriggerAndWait completion issues by processing batch chunks sequentially * Remove unnecessary test
1 parent 575413c commit 261dce1

File tree

3 files changed

+51
-42
lines changed

3 files changed

+51
-42
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
5858
) {
5959
super({ prisma });
6060

61-
this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
61+
// Eric note: We need to force sequential processing because when doing parallel, we end up with high-contention on the parent run lock
62+
// becuase we are triggering a lot of runs at once, and each one is trying to lock the parent run.
63+
// by forcing sequential, we are only ever locking the parent run for a single run at a time.
64+
this._batchProcessingStrategy = "sequential";
6265
}
6366

6467
public async call(
@@ -316,6 +319,14 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
316319
}
317320
}
318321

322+
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
323+
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
324+
tx,
325+
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
326+
});
327+
}
328+
329+
// This is the function that the worker will call
319330
async processBatchTaskRun(options: BatchProcessingOptions) {
320331
logger.debug("[RunEngineBatchTrigger][processBatchTaskRun] Processing batch", {
321332
options,
@@ -648,13 +659,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
648659
: undefined;
649660
}
650661

651-
async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
652-
await workerQueue.enqueue("runengine.processBatchTaskRun", options, {
653-
tx,
654-
jobKey: `RunEngineBatchTriggerService.process:${options.batchId}:${options.processingId}`,
655-
});
656-
}
657-
658662
async #handlePayloadPacket(
659663
payload: any,
660664
pathPrefix: string,

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -680,40 +680,6 @@ describe("ReleaseConcurrencyQueue", () => {
680680
}
681681
});
682682

683-
redisTest(
684-
"Should retrieve metrics for all queues via getQueueMetrics",
685-
async ({ redisContainer }) => {
686-
const { queue } = createReleaseConcurrencyQueue(redisContainer, 1);
687-
688-
// Set up multiple queues with different states
689-
await queue.attemptToRelease({ name: "metrics-queue1" }, "run1"); // Consume 1 token from queue1
690-
691-
// Add more items to queue1 that will be queued due to no tokens
692-
await queue.attemptToRelease({ name: "metrics-queue1" }, "run2"); // This will be queued
693-
await queue.attemptToRelease({ name: "metrics-queue1" }, "run3"); // This will be queued
694-
await queue.attemptToRelease({ name: "metrics-queue1" }, "run4"); // This will be queued
695-
696-
const metrics = await queue.getQueueMetrics();
697-
698-
expect(metrics).toHaveLength(1);
699-
expect(metrics[0].releaseQueue).toBe("metrics-queue1");
700-
expect(metrics[0].currentTokens).toBe(0);
701-
expect(metrics[0].queueLength).toBe(3);
702-
703-
// Now add 10 items to 100 different queues
704-
for (let i = 0; i < 100; i++) {
705-
for (let j = 0; j < 10; j++) {
706-
await queue.attemptToRelease({ name: `metrics-queue2-${i}` }, `run${i}-${j}`);
707-
}
708-
}
709-
710-
const metrics2 = await queue.getQueueMetrics();
711-
expect(metrics2.length).toBeGreaterThan(90);
712-
713-
await queue.quit();
714-
}
715-
);
716-
717683
redisTest(
718684
"refillTokenIfInReleasings should refill token when releaserId is in the releasings set",
719685
async ({ redisContainer }) => {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import { task } from "@trigger.dev/sdk/v3";
2+
import { setTimeout } from "timers/promises";
3+
4+
export const batchTriggerAndWait = task({
5+
id: "batch-trigger-and-wait",
6+
maxDuration: 60,
7+
run: async (payload: { count: number }, { ctx }) => {
8+
const payloads = Array.from({ length: payload.count }, (_, i) => ({
9+
payload: { waitSeconds: 1, output: `test${i}` },
10+
}));
11+
12+
// First batch triggerAndWait with idempotency keys
13+
const firstResults = await fixedLengthTask.batchTriggerAndWait(payloads);
14+
},
15+
});
16+
17+
type Payload = {
18+
waitSeconds: number;
19+
error?: string;
20+
output?: any;
21+
};
22+
23+
export const fixedLengthTask = task({
24+
id: "fixed-length-lask",
25+
retry: {
26+
maxAttempts: 2,
27+
maxTimeoutInMs: 100,
28+
},
29+
machine: "micro",
30+
run: async ({ waitSeconds = 1, error, output }: Payload) => {
31+
await setTimeout(waitSeconds * 1000);
32+
33+
if (error) {
34+
throw new Error(error);
35+
}
36+
37+
return output;
38+
},
39+
});

0 commit comments

Comments
 (0)