Skip to content

Commit dcacc3a

Browse files
authored
Fixes for v4 waits and restores (#1868)
* remove dummy metrics from heartbeat * fix heartbeat timeouts.. * reset clock when resolving waitpoints * optionally set metadata url when scheduling run * don't use global vitest as it's outdated * don't run test files in parallel * handle special graceful shutdown code * exit codes for success and failure can now be set and overridden * ensure immediate cleanup in tests * update lockfile after conflict
1 parent 6d819c6 commit dcacc3a

File tree

14 files changed

+184
-316
lines changed

14 files changed

+184
-316
lines changed

Diff for: apps/supervisor/package.json

+4-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
"build": "tsc",
99
"dev": "tsx --require dotenv/config --watch src/index.ts || (echo '!! Remember to run: nvm use'; exit 1)",
1010
"start": "node dist/index.js",
11-
"test:watch": "vitest",
11+
"test:run": "vitest --no-file-parallelism --run",
12+
"test:watch": "vitest --no-file-parallelism",
1213
"typecheck": "tsc --noEmit"
1314
},
1415
"dependencies": {
@@ -24,6 +25,7 @@
2425
},
2526
"devDependencies": {
2627
"@types/dockerode": "^3.3.33",
27-
"docker-api-ts": "^0.2.2"
28+
"docker-api-ts": "^0.2.2",
29+
"vitest": "^1.4.0"
2830
}
2931
}

Diff for: apps/supervisor/src/env.ts

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ const Env = z.object({
3535
// Optional services
3636
TRIGGER_WARM_START_URL: z.string().optional(),
3737
TRIGGER_CHECKPOINT_URL: z.string().optional(),
38+
TRIGGER_METADATA_URL: z.string().optional(),
3839

3940
// Used by the workload manager, e.g docker/k8s
4041
DOCKER_NETWORK: z.string().default("host"),

Diff for: apps/supervisor/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class ManagedSupervisor {
6161
workloadApiDomain: env.TRIGGER_WORKLOAD_API_DOMAIN,
6262
workloadApiPort: env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL,
6363
warmStartUrl: this.warmStartUrl,
64+
metadataUrl: env.TRIGGER_METADATA_URL,
6465
imagePullSecrets: env.KUBERNETES_IMAGE_PULL_SECRETS?.split(","),
6566
heartbeatIntervalSeconds: env.RUNNER_HEARTBEAT_INTERVAL_SECONDS,
6667
snapshotPollIntervalSeconds: env.RUNNER_SNAPSHOT_POLL_INTERVAL_SECONDS,

Diff for: apps/supervisor/src/services/failedPodHandler.test.ts

+105-2
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,102 @@ describe("FailedPodHandler Integration Tests", () => {
314314
await handler.stop();
315315
}
316316
}, 60000);
317+
318+
it("should handle graceful shutdown pods differently", async () => {
319+
const handler = new FailedPodHandler({ namespace, k8s, register });
320+
321+
try {
322+
// Create first batch of pods before starting handler
323+
const firstBatchPodNames = await createTestPods({
324+
k8sApi: k8s,
325+
namespace,
326+
count: 2,
327+
exitCode: FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE,
328+
});
329+
330+
// Wait for pods to reach Failed state
331+
await waitForPodsPhase({
332+
k8sApi: k8s,
333+
namespace,
334+
podNames: firstBatchPodNames,
335+
phase: "Failed",
336+
});
337+
338+
// Start the handler
339+
await handler.start();
340+
341+
// Wait for first batch to be deleted
342+
await waitForPodsDeletion({
343+
k8sApi: k8s,
344+
namespace,
345+
podNames: firstBatchPodNames,
346+
});
347+
348+
// Create second batch of pods after handler is running
349+
const secondBatchPodNames = await createTestPods({
350+
k8sApi: k8s,
351+
namespace,
352+
count: 3,
353+
exitCode: FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE,
354+
});
355+
356+
// Wait for second batch to be deleted
357+
await waitForPodsDeletion({
358+
k8sApi: k8s,
359+
namespace,
360+
podNames: secondBatchPodNames,
361+
});
362+
363+
// Verify metrics
364+
const metrics = handler.getMetrics();
365+
366+
// Check informer events were recorded for both batches
367+
const informerEvents = await metrics.informerEventsTotal.get();
368+
expect(informerEvents.values).toContainEqual(
369+
expect.objectContaining({
370+
labels: expect.objectContaining({
371+
namespace,
372+
verb: "add",
373+
}),
374+
value: 5, // 2 from first batch + 3 from second batch
375+
})
376+
);
377+
378+
// Check pods were processed as graceful shutdowns
379+
const processedPods = await metrics.processedPodsTotal.get();
380+
381+
// Should not be marked as Failed
382+
const failedPods = processedPods.values.find(
383+
(v) => v.labels.namespace === namespace && v.labels.status === "Failed"
384+
);
385+
expect(failedPods).toBeUndefined();
386+
387+
// Should be marked as GracefulShutdown
388+
const gracefulShutdowns = processedPods.values.find(
389+
(v) => v.labels.namespace === namespace && v.labels.status === "GracefulShutdown"
390+
);
391+
expect(gracefulShutdowns).toBeDefined();
392+
expect(gracefulShutdowns?.value).toBe(5); // Total from both batches
393+
394+
// Check pods were still deleted
395+
const deletedPods = await metrics.deletedPodsTotal.get();
396+
expect(deletedPods.values).toContainEqual(
397+
expect.objectContaining({
398+
labels: expect.objectContaining({
399+
namespace,
400+
status: "Failed",
401+
}),
402+
value: 5, // Total from both batches
403+
})
404+
);
405+
406+
// Check no deletion errors were recorded
407+
const deletionErrors = await metrics.deletionErrorsTotal.get();
408+
expect(deletionErrors.values).toHaveLength(0);
409+
} finally {
410+
await handler.stop();
411+
}
412+
}, 30000);
317413
});
318414

319415
async function createTestPods({
@@ -325,6 +421,7 @@ async function createTestPods({
325421
namePrefix = "test-pod",
326422
command = ["/bin/sh", "-c", shouldFail ? "exit 1" : "exit 0"],
327423
randomizeName = true,
424+
exitCode,
328425
}: {
329426
k8sApi: K8sApi;
330427
namespace: string;
@@ -334,9 +431,15 @@ async function createTestPods({
334431
namePrefix?: string;
335432
command?: string[];
336433
randomizeName?: boolean;
434+
exitCode?: number;
337435
}) {
338436
const createdPods: string[] = [];
339437

438+
// If exitCode is specified, override the command
439+
if (exitCode !== undefined) {
440+
command = ["/bin/sh", "-c", `exit ${exitCode}`];
441+
}
442+
340443
for (let i = 0; i < count; i++) {
341444
const podName = randomizeName
342445
? `${namePrefix}-${i}-${Math.random().toString(36).substring(2, 15)}`
@@ -352,7 +455,7 @@ async function createTestPods({
352455
restartPolicy: "Never",
353456
containers: [
354457
{
355-
name: "test",
458+
name: "run-controller", // Changed to match the name we check in failedPodHandler
356459
image: "busybox:1.37.0",
357460
command,
358461
},
@@ -470,7 +573,7 @@ async function deleteAllPodsInNamespace({
470573
const podNames = pods.items.map((p) => p.metadata?.name ?? "");
471574

472575
// Delete all pods
473-
await k8sApi.core.deleteCollectionNamespacedPod({ namespace });
576+
await k8sApi.core.deleteCollectionNamespacedPod({ namespace, gracePeriodSeconds: 0 });
474577

475578
// Wait for all pods to be deleted
476579
await waitForPodsDeletion({ k8sApi, namespace, podNames });

Diff for: apps/supervisor/src/services/failedPodHandler.ts

+18-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Counter, Registry, Histogram } from "prom-client";
66
import { register } from "../metrics.js";
77
import { setTimeout } from "timers/promises";
88

9-
type PodStatus = "Pending" | "Running" | "Succeeded" | "Failed" | "Unknown";
9+
type PodStatus = "Pending" | "Running" | "Succeeded" | "Failed" | "Unknown" | "GracefulShutdown";
1010

1111
export type FailedPodHandlerOptions = {
1212
namespace: string;
@@ -34,6 +34,8 @@ export class FailedPodHandler {
3434
private readonly processingDurationSeconds: Histogram<string>;
3535
private readonly informerEventsTotal: Counter;
3636

37+
static readonly GRACEFUL_SHUTDOWN_EXIT_CODE = 200;
38+
3739
constructor(opts: FailedPodHandlerOptions) {
3840
this.id = Math.random().toString(36).substring(2, 15);
3941
this.logger = new SimpleStructuredLogger("failed-pod-handler", LogLevel.debug, {
@@ -206,6 +208,21 @@ export class FailedPodHandler {
206208

207209
private async processFailedPod(pod: V1Pod) {
208210
this.logger.info("pod-failed: processing pod", this.podSummary(pod));
211+
212+
const mainContainer = pod.status?.containerStatuses?.find((c) => c.name === "run-controller");
213+
214+
// If it's our special "graceful shutdown" exit code, don't process it further, just delete it
215+
if (
216+
mainContainer?.state?.terminated?.exitCode === FailedPodHandler.GRACEFUL_SHUTDOWN_EXIT_CODE
217+
) {
218+
this.logger.debug("pod-failed: graceful shutdown detected", this.podSummary(pod));
219+
this.processedPodsTotal.inc({
220+
namespace: this.namespace,
221+
status: "GracefulShutdown",
222+
});
223+
return;
224+
}
225+
209226
this.processedPodsTotal.inc({
210227
namespace: this.namespace,
211228
status: this.podStatus(pod),

Diff for: apps/supervisor/src/services/podCleaner.test.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ describe("PodCleaner Integration Tests", () => {
2929
register.clear();
3030

3131
// Delete all pods in the namespace
32-
await k8s.core.deleteCollectionNamespacedPod({ namespace });
32+
await k8s.core.deleteCollectionNamespacedPod({ namespace, gracePeriodSeconds: 0 });
3333
});
3434

3535
it("should clean up succeeded pods", async () => {

Diff for: apps/supervisor/src/workloadManager/docker.ts

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ export class DockerWorkloadManager implements WorkloadManager {
4545
runArgs.push(`--env=TRIGGER_WARM_START_URL=${this.opts.warmStartUrl}`);
4646
}
4747

48+
if (this.opts.metadataUrl) {
49+
runArgs.push(`--env=TRIGGER_METADATA_URL=${this.opts.metadataUrl}`);
50+
}
51+
4852
if (this.opts.heartbeatIntervalSeconds) {
4953
runArgs.push(
5054
`--env=TRIGGER_HEARTBEAT_INTERVAL_SECONDS=${this.opts.heartbeatIntervalSeconds}`

Diff for: apps/supervisor/src/workloadManager/kubernetes.ts

+3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ export class KubernetesWorkloadManager implements WorkloadManager {
134134
...(this.opts.warmStartUrl
135135
? [{ name: "TRIGGER_WARM_START_URL", value: this.opts.warmStartUrl }]
136136
: []),
137+
...(this.opts.metadataUrl
138+
? [{ name: "TRIGGER_METADATA_URL", value: this.opts.metadataUrl }]
139+
: []),
137140
...(this.opts.heartbeatIntervalSeconds
138141
? [
139142
{

Diff for: apps/supervisor/src/workloadManager/types.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export interface WorkloadManagerOptions {
55
workloadApiDomain?: string; // If unset, will use orchestrator-specific default
66
workloadApiPort: number;
77
warmStartUrl?: string;
8+
metadataUrl?: string;
89
imagePullSecrets?: string[];
910
heartbeatIntervalSeconds?: number;
1011
snapshotPollIntervalSeconds?: number;

Diff for: packages/cli-v3/src/entryPoints/managed-run-controller.ts

+21-16
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ const Env = z.object({
5656
TRIGGER_WORKER_INSTANCE_NAME: z.string(),
5757
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: z.coerce.number().default(30),
5858
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: z.coerce.number().default(5),
59+
TRIGGER_SUCCESS_EXIT_CODE: z.coerce.number().default(0),
60+
TRIGGER_FAILURE_EXIT_CODE: z.coerce.number().default(1),
5961
});
6062

6163
const env = Env.parse(stdEnv);
@@ -82,6 +84,8 @@ type Metadata = {
8284
TRIGGER_WORKER_INSTANCE_NAME: string | undefined;
8385
TRIGGER_HEARTBEAT_INTERVAL_SECONDS: number | undefined;
8486
TRIGGER_SNAPSHOT_POLL_INTERVAL_SECONDS: number | undefined;
87+
TRIGGER_SUCCESS_EXIT_CODE: number | undefined;
88+
TRIGGER_FAILURE_EXIT_CODE: number | undefined;
8589
};
8690

8791
class MetadataClient {
@@ -122,6 +126,9 @@ class ManagedRunController {
122126
private workerApiUrl: string;
123127
private workerInstanceName: string;
124128

129+
private successExitCode = env.TRIGGER_SUCCESS_EXIT_CODE;
130+
private failureExitCode = env.TRIGGER_FAILURE_EXIT_CODE;
131+
125132
private state:
126133
| {
127134
phase: "RUN";
@@ -220,11 +227,7 @@ class ManagedRunController {
220227

221228
const response = await this.httpClient.heartbeatRun(
222229
this.runFriendlyId,
223-
this.snapshotFriendlyId,
224-
{
225-
cpu: 0,
226-
memory: 0,
227-
}
230+
this.snapshotFriendlyId
228231
);
229232

230233
if (!response.success) {
@@ -669,6 +672,14 @@ class ManagedRunController {
669672

670673
logger.log("Processing env overrides", { env: overrides });
671674

675+
if (overrides.TRIGGER_SUCCESS_EXIT_CODE) {
676+
this.successExitCode = overrides.TRIGGER_SUCCESS_EXIT_CODE;
677+
}
678+
679+
if (overrides.TRIGGER_FAILURE_EXIT_CODE) {
680+
this.failureExitCode = overrides.TRIGGER_FAILURE_EXIT_CODE;
681+
}
682+
672683
if (overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS) {
673684
this.heartbeatIntervalSeconds = overrides.TRIGGER_HEARTBEAT_INTERVAL_SECONDS;
674685
this.runHeartbeat.updateInterval(this.heartbeatIntervalSeconds * 1000);
@@ -821,7 +832,7 @@ class ManagedRunController {
821832

822833
if (!this.warmStartClient) {
823834
console.error("waitForNextRun: warm starts disabled, shutting down");
824-
this.exitProcess(0);
835+
this.exitProcess(this.successExitCode);
825836
}
826837

827838
// Check the service is up and get additional warm start config
@@ -832,7 +843,7 @@ class ManagedRunController {
832843
warmStartUrl: env.TRIGGER_WARM_START_URL,
833844
error: connect.error,
834845
});
835-
this.exitProcess(0);
846+
this.exitProcess(this.successExitCode);
836847
}
837848

838849
const connectionTimeoutMs =
@@ -860,7 +871,7 @@ class ManagedRunController {
860871
connectionTimeoutMs,
861872
keepaliveMs,
862873
});
863-
this.exitProcess(0);
874+
this.exitProcess(this.successExitCode);
864875
}
865876

866877
const nextRun = await this.warmStartClient.warmStart({
@@ -871,7 +882,7 @@ class ManagedRunController {
871882

872883
if (!nextRun) {
873884
console.error("waitForNextRun: warm start failed, shutting down");
874-
this.exitProcess(0);
885+
this.exitProcess(this.successExitCode);
875886
}
876887

877888
console.log("waitForNextRun: got next run", { nextRun });
@@ -884,7 +895,7 @@ class ManagedRunController {
884895
return;
885896
} catch (error) {
886897
console.error("waitForNextRun: unexpected error", { error });
887-
this.exitProcess(1);
898+
this.exitProcess(this.failureExitCode);
888899
} finally {
889900
this.waitForNextRunLock = false;
890901
}
@@ -1112,12 +1123,6 @@ class ManagedRunController {
11121123
async start() {
11131124
logger.debug("[ManagedRunController] Starting up");
11141125

1115-
// TODO: remove this after testing
1116-
setTimeout(() => {
1117-
console.error("[ManagedRunController] Exiting after 5 minutes");
1118-
this.exitProcess(1);
1119-
}, 60 * 5000);
1120-
11211126
// Websocket notifications are only an optimisation so we don't need to wait for a successful connection
11221127
this.createSocket();
11231128

Diff for: packages/core/src/v3/runEngineWorker/supervisor/schemas.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ export const WorkerApiDequeueResponseBody = DequeuedMessage.array();
7373
export type WorkerApiDequeueResponseBody = z.infer<typeof WorkerApiDequeueResponseBody>;
7474

7575
export const WorkerApiRunHeartbeatRequestBody = z.object({
76-
cpu: z.number(),
77-
memory: z.number(),
76+
cpu: z.number().optional(),
77+
memory: z.number().optional(),
7878
});
7979
export type WorkerApiRunHeartbeatRequestBody = z.infer<typeof WorkerApiRunHeartbeatRequestBody>;
8080

0 commit comments

Comments
 (0)