Skip to content

Commit 8f3f373

Browse files
authored
Improve reliability of completion submission (#1711)
* add timeout support to sendWithAck * coordinator will retry completion submission * actually retry * increase default retries * something went wrong there, add this back in * add changeset
1 parent 36fea7d commit 8f3f373

File tree

5 files changed

+147
-16
lines changed

5 files changed

+147
-16
lines changed

.changeset/witty-jars-approve.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
- Add new run completion submission message with ack
6+
- Add timeout support to sendWithAck

apps/coordinator/src/index.ts

+84-14
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
} from "@trigger.dev/core/v3";
1212
import { ZodNamespace } from "@trigger.dev/core/v3/zodNamespace";
1313
import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
14-
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
14+
import { ExponentialBackoff, HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
1515
import { ChaosMonkey } from "./chaosMonkey";
1616
import { Checkpointer } from "./checkpointer";
1717
import { boolFromEnv, numFromEnv, safeJsonParse } from "./util";
@@ -30,6 +30,11 @@ const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030;
3030
const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
3131
const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");
3232

33+
const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS =
34+
parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS || "") || 30_000;
35+
const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES =
36+
parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 7;
37+
3338
const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME });
3439
const chaosMonkey = new ChaosMonkey(
3540
!!process.env.CHAOS_MONKEY_ENABLED,
@@ -720,37 +725,102 @@ class TaskCoordinator {
720725

721726
await chaosMonkey.call({ throwErrors: false });
722727

723-
const completeWithoutCheckpoint = (shouldExit: boolean) => {
728+
const sendCompletionWithAck = async (): Promise<boolean> => {
729+
try {
730+
const response = await this.#platformSocket?.sendWithAck(
731+
"TASK_RUN_COMPLETED_WITH_ACK",
732+
{
733+
version: "v2",
734+
execution,
735+
completion,
736+
},
737+
TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS
738+
);
739+
740+
if (!response) {
741+
log.error("TASK_RUN_COMPLETED_WITH_ACK: no response");
742+
return false;
743+
}
744+
745+
if (!response.success) {
746+
log.error("TASK_RUN_COMPLETED_WITH_ACK: error response", {
747+
error: response.error,
748+
});
749+
return false;
750+
}
751+
752+
log.log("TASK_RUN_COMPLETED_WITH_ACK: successful response");
753+
return true;
754+
} catch (error) {
755+
log.error("TASK_RUN_COMPLETED_WITH_ACK: threw error", { error });
756+
return false;
757+
}
758+
};
759+
760+
const completeWithoutCheckpoint = async (shouldExit: boolean) => {
724761
const supportsRetryCheckpoints = message.version === "v1";
725762

726-
this.#platformSocket?.send("TASK_RUN_COMPLETED", {
727-
version: supportsRetryCheckpoints ? "v1" : "v2",
728-
execution,
729-
completion,
730-
});
731763
callback({ willCheckpointAndRestore: false, shouldExit });
764+
765+
if (supportsRetryCheckpoints) {
766+
// This is only here for backwards compat
767+
this.#platformSocket?.send("TASK_RUN_COMPLETED", {
768+
version: "v1",
769+
execution,
770+
completion,
771+
});
772+
} else {
773+
// 99.99% of runs should end up here
774+
775+
const completedWithAckBackoff = new ExponentialBackoff("FullJitter").maxRetries(
776+
TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES
777+
);
778+
779+
const result = await completedWithAckBackoff.execute(
780+
async ({ retry, delay, elapsedMs }) => {
781+
logger.log("TASK_RUN_COMPLETED_WITH_ACK: sending with backoff", {
782+
retry,
783+
delay,
784+
elapsedMs,
785+
});
786+
787+
const success = await sendCompletionWithAck();
788+
789+
if (!success) {
790+
throw new Error("Failed to send completion with ack");
791+
}
792+
}
793+
);
794+
795+
if (!result.success) {
796+
logger.error("TASK_RUN_COMPLETED_WITH_ACK: failed to send with backoff", result);
797+
return;
798+
}
799+
800+
logger.log("TASK_RUN_COMPLETED_WITH_ACK: sent with backoff", result);
801+
}
732802
};
733803

734804
if (completion.ok) {
735-
completeWithoutCheckpoint(true);
805+
await completeWithoutCheckpoint(true);
736806
return;
737807
}
738808

739809
if (
740810
completion.error.type === "INTERNAL_ERROR" &&
741811
completion.error.code === "TASK_RUN_CANCELLED"
742812
) {
743-
completeWithoutCheckpoint(true);
813+
await completeWithoutCheckpoint(true);
744814
return;
745815
}
746816

747817
if (completion.retry === undefined) {
748-
completeWithoutCheckpoint(true);
818+
await completeWithoutCheckpoint(true);
749819
return;
750820
}
751821

752822
if (completion.retry.delay < this.#delayThresholdInMs) {
753-
completeWithoutCheckpoint(false);
823+
await completeWithoutCheckpoint(false);
754824

755825
// Prevents runs that fail fast from never sending a heartbeat
756826
this.#sendRunHeartbeat(socket.data.runId);
@@ -759,7 +829,7 @@ class TaskCoordinator {
759829
}
760830

761831
if (message.version === "v2") {
762-
completeWithoutCheckpoint(true);
832+
await completeWithoutCheckpoint(true);
763833
return;
764834
}
765835

@@ -768,7 +838,7 @@ class TaskCoordinator {
768838
const willCheckpointAndRestore = canCheckpoint || willSimulate;
769839

770840
if (!willCheckpointAndRestore) {
771-
completeWithoutCheckpoint(false);
841+
await completeWithoutCheckpoint(false);
772842
return;
773843
}
774844

@@ -792,7 +862,7 @@ class TaskCoordinator {
792862

793863
if (!checkpoint) {
794864
log.error("Failed to checkpoint");
795-
completeWithoutCheckpoint(false);
865+
await completeWithoutCheckpoint(false);
796866
return;
797867
}
798868

apps/webapp/app/v3/handleSocketIo.server.ts

+38
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,44 @@ function createCoordinatorNamespace(io: Server) {
136136
checkpoint: message.checkpoint,
137137
});
138138
},
139+
TASK_RUN_COMPLETED_WITH_ACK: async (message) => {
140+
try {
141+
const completeAttempt = new CompleteAttemptService({
142+
supportsRetryCheckpoints: message.version === "v1",
143+
});
144+
await completeAttempt.call({
145+
completion: message.completion,
146+
execution: message.execution,
147+
checkpoint: message.checkpoint,
148+
});
149+
150+
return {
151+
success: true,
152+
};
153+
} catch (error) {
154+
const friendlyError =
155+
error instanceof Error
156+
? {
157+
name: error.name,
158+
message: error.message,
159+
stack: error.stack,
160+
}
161+
: {
162+
name: "UnknownError",
163+
message: String(error),
164+
};
165+
166+
logger.error("Error while completing attempt with ack", {
167+
error: friendlyError,
168+
message,
169+
});
170+
171+
return {
172+
success: false,
173+
error: friendlyError,
174+
};
175+
}
176+
},
139177
TASK_RUN_FAILED_TO_RUN: async (message) => {
140178
await sharedQueueTasks.taskRunFailed(message.completion);
141179
},

packages/core/src/v3/schemas/messages.ts

+14
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,20 @@ export const CoordinatorToPlatformMessages = {
435435
.optional(),
436436
}),
437437
},
438+
TASK_RUN_COMPLETED_WITH_ACK: {
439+
message: z.object({
440+
version: z.enum(["v1", "v2"]).default("v2"),
441+
execution: ProdTaskRunExecution,
442+
completion: TaskRunExecutionResult,
443+
checkpoint: z
444+
.object({
445+
docker: z.boolean(),
446+
location: z.string(),
447+
})
448+
.optional(),
449+
}),
450+
callback: AckCallbackResult,
451+
},
438452
TASK_RUN_FAILED_TO_RUN: {
439453
message: z.object({
440454
version: z.literal("v1").default("v1"),

packages/core/src/v3/zodSocket.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,8 @@ export class ZodSocketMessageSender<TMessageCatalog extends ZodSocketMessageCata
293293

294294
public async sendWithAck<K extends GetSocketMessagesWithCallback<TMessageCatalog>>(
295295
type: K,
296-
payload: z.input<GetSocketMessageSchema<TMessageCatalog, K>>
296+
payload: z.input<GetSocketMessageSchema<TMessageCatalog, K>>,
297+
timeout?: number
297298
): Promise<z.infer<GetSocketCallbackSchema<TMessageCatalog, K>>> {
298299
const schema = this.#schema[type]?.["message"];
299300

@@ -307,8 +308,10 @@ export class ZodSocketMessageSender<TMessageCatalog extends ZodSocketMessageCata
307308
throw new Error(`Failed to parse message payload: ${JSON.stringify(parsedPayload.error)}`);
308309
}
309310

311+
const socket = timeout ? this.#socket.timeout(timeout) : this.#socket;
312+
310313
// @ts-expect-error
311-
const callbackResult = await this.#socket.emitWithAck(type, { payload, version: "v1" });
314+
const callbackResult = await socket.emitWithAck(type, { payload, version: "v1" });
312315

313316
return callbackResult;
314317
}

0 commit comments

Comments
 (0)