Skip to content

Commit 9a101b3

Browse files
committed
optional checkpoint delays for dependency waits
1 parent 938c5a1 commit 9a101b3

File tree

1 file changed

+27
-14
lines changed

1 file changed

+27
-14
lines changed

apps/coordinator/src/index.ts

+27-14
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ const TASK_RUN_COMPLETED_WITH_ACK_TIMEOUT_MS =
3535
const TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES =
3636
parseInt(process.env.TASK_RUN_COMPLETED_WITH_ACK_MAX_RETRIES || "") || 7;
3737

38+
const WAIT_FOR_TASK_CHECKPOINT_DELAY_MS =
39+
parseInt(process.env.WAIT_FOR_TASK_CHECKPOINT_DELAY_MS || "") || 0;
40+
const WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS =
41+
parseInt(process.env.WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS || "") || 0;
42+
3843
const logger = new SimpleStructuredLogger("coordinator", undefined, { nodeName: NODE_NAME });
3944
const chaosMonkey = new ChaosMonkey(
4045
!!process.env.CHAOS_MONKEY_ENABLED,
@@ -143,6 +148,7 @@ class TaskCoordinator {
143148
authToken: PLATFORM_SECRET,
144149
logHandlerPayloads: false,
145150
handlers: {
151+
// This is used by resumeAttempt
146152
RESUME_AFTER_DEPENDENCY: async (message) => {
147153
const log = platformLogger.child({
148154
eventName: "RESUME_AFTER_DEPENDENCY",
@@ -168,11 +174,12 @@ class TaskCoordinator {
168174

169175
await chaosMonkey.call();
170176

171-
// In case the task resumed faster than we could checkpoint
177+
// In case the task resumes before the checkpoint is created
172178
this.#cancelCheckpoint(message.runId);
173179

174180
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
175181
},
182+
// This is used by sharedQueueConsumer
176183
RESUME_AFTER_DEPENDENCY_WITH_ACK: async (message) => {
177184
const log = platformLogger.child({
178185
eventName: "RESUME_AFTER_DEPENDENCY_WITH_ACK",
@@ -218,7 +225,7 @@ class TaskCoordinator {
218225

219226
await chaosMonkey.call();
220227

221-
// In case the task resumed faster than we could checkpoint
228+
// In case the task resumes before the checkpoint is created
222229
this.#cancelCheckpoint(message.runId);
223230

224231
taskSocket.emit("RESUME_AFTER_DEPENDENCY", message);
@@ -1096,12 +1103,15 @@ class TaskCoordinator {
10961103
}
10971104
}
10981105

1099-
const checkpoint = await this.#checkpointer.checkpointAndPush({
1100-
runId: socket.data.runId,
1101-
projectRef: socket.data.projectRef,
1102-
deploymentVersion: socket.data.deploymentVersion,
1103-
attemptNumber: getAttemptNumber(),
1104-
});
1106+
const checkpoint = await this.#checkpointer.checkpointAndPush(
1107+
{
1108+
runId: socket.data.runId,
1109+
projectRef: socket.data.projectRef,
1110+
deploymentVersion: socket.data.deploymentVersion,
1111+
attemptNumber: getAttemptNumber(),
1112+
},
1113+
WAIT_FOR_TASK_CHECKPOINT_DELAY_MS
1114+
);
11051115

11061116
if (!checkpoint) {
11071117
log.error("Failed to checkpoint");
@@ -1189,12 +1199,15 @@ class TaskCoordinator {
11891199
}
11901200
}
11911201

1192-
const checkpoint = await this.#checkpointer.checkpointAndPush({
1193-
runId: socket.data.runId,
1194-
projectRef: socket.data.projectRef,
1195-
deploymentVersion: socket.data.deploymentVersion,
1196-
attemptNumber: getAttemptNumber(),
1197-
});
1202+
const checkpoint = await this.#checkpointer.checkpointAndPush(
1203+
{
1204+
runId: socket.data.runId,
1205+
projectRef: socket.data.projectRef,
1206+
deploymentVersion: socket.data.deploymentVersion,
1207+
attemptNumber: getAttemptNumber(),
1208+
},
1209+
WAIT_FOR_BATCH_CHECKPOINT_DELAY_MS
1210+
);
11981211

11991212
if (!checkpoint) {
12001213
log.error("Failed to checkpoint");

0 commit comments

Comments
 (0)