Skip to content

Commit 7665b25

Browse files
d-csclaude
andcommitted
fix(webapp,run-engine): replay-layer Devin follow-ups
Two bugs flagged by Devin on PR #3754: 1. entry.server.tsx reverted to \`void sessionsReplicationInstance;\`, which esbuild tree-shakes under \`"sideEffects": false\`. Restored the globalThis assignment + warning comment from #3738 (incident TRI-9864). Without this the sessions→ClickHouse logical replication slot stops being consumed at boot. 2. createFailedTaskRun unconditionally emitted \`runFailed\`, which the \`completeFailedRunEvent\` listener uses to write a span completion into ClickHouse. But TriggerFailedTaskService.call() already wraps createFailedTaskRun inside \`repository.traceEvent({ incomplete: false, isError: true })\` which writes its own completion row for the same (traceId, spanId). Two completions racing on the same span row is a real observability bug. Added an \`emitRunFailedEvent: boolean = true\` opt-out. The TriggerFailedTaskService.call() path now passes \`false\` and enqueues \`PerformTaskRunAlertsService\` directly after the trace event closes so the alerts side of \`runFailed\` is preserved. \`callWithoutTraceEvents\` and the mollifier drainer's terminal- failure path keep the default emit (they have no outer trace event managing the span). Regression test pins the opt-out: \`emitRunFailedEvent: false\` writes the PG row but does NOT fire the bus event. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fb4f31d commit 7665b25

4 files changed

Lines changed: 129 additions & 1 deletion

File tree

apps/webapp/app/entry.server.tsx

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,17 @@ import {
3131
// on webapp startup. The singleton's initializer wires start (gated on
3232
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
3333
// runsReplicationInstance.
34+
//
35+
// IMPORTANT: do NOT replace this with `void sessionsReplicationInstance;`.
36+
// `apps/webapp/package.json` declares `"sideEffects": false`, so esbuild
37+
// treats `void <identifier>;` as a pure expression statement and tree-shakes
38+
// the entire import — the singleton's initializer never fires and the
39+
// sessions→ClickHouse logical replication slot stops being consumed. Assigning
40+
// to globalThis is an unambiguous side effect the bundler must preserve. See
41+
// TRI-9864 for the incident write-up.
3442
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
35-
void sessionsReplicationInstance;
43+
(globalThis as Record<string, unknown>).__sessionsReplicationInstance =
44+
sessionsReplicationInstance;
3645

3746
const ABORT_DELAY = 30000;
3847

apps/webapp/app/runEngine/services/triggerFailedTask.server.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import type { PrismaClientOrTransaction } from "@trigger.dev/database";
66
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
77
import { logger } from "~/services/logger.server";
88
import { getEventRepository } from "~/v3/eventRepository/index.server";
9+
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
910
import { DefaultQueueManager } from "../concerns/queues.server";
1011
import type { TriggerTaskRequest } from "../types";
1112

@@ -176,6 +177,14 @@ export class TriggerFailedTaskService {
176177
event.setAttribute("runId", failedRunFriendlyId);
177178
event.failWithError(taskRunError);
178179

180+
// `emitRunFailedEvent: false` because this call site owns the
181+
// trace-event lifecycle via the outer `traceEvent({
182+
// incomplete: false, isError: true })`. Letting the engine
183+
// emit `runFailed` here would race the
184+
// `completeFailedRunEvent` listener against the outer trace
185+
// event's own completion write for the same (traceId, spanId).
186+
// We re-trigger the alerts side directly after the trace
187+
// event closes, below.
179188
return await this.engine.createFailedTaskRun({
180189
friendlyId: failedRunFriendlyId,
181190
environment: {
@@ -200,12 +209,30 @@ export class TriggerFailedTaskService {
200209
spanId: event.spanId,
201210
traceContext: traceContext as Record<string, unknown>,
202211
taskEventStore: store,
212+
emitRunFailedEvent: false,
203213
...(queueName !== undefined && { queue: queueName }),
204214
...(lockedQueueId !== undefined && { lockedQueueId }),
205215
});
206216
}
207217
);
208218

219+
// Alerts side of `runFailed` — the engine emit was suppressed
220+
// above so the trace-event completion isn't double-written; we
221+
// still need the alert pipeline to fire so customers' ERROR
222+
// channels see the failure. Best-effort: a failed enqueue logs
223+
// but doesn't block returning the friendlyId, mirroring the
224+
// engine handler's behaviour at runEngineHandlers.server.ts:81.
225+
try {
226+
await PerformTaskRunAlertsService.enqueue(failedRun.id);
227+
} catch (alertsError) {
228+
logger.warn("TriggerFailedTaskService: alert enqueue failed", {
229+
taskId: request.taskId,
230+
friendlyId: failedRun.friendlyId,
231+
error:
232+
alertsError instanceof Error ? alertsError.message : String(alertsError),
233+
});
234+
}
235+
209236
return failedRun.friendlyId;
210237
} catch (createError) {
211238
const createErrorMsg =

internal-packages/run-engine/src/engine/index.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,7 @@ export class RunEngine {
10511051
taskEventStore,
10521052
queue: queueOverride,
10531053
lockedQueueId: lockedQueueIdOverride,
1054+
emitRunFailedEvent = true,
10541055
}: {
10551056
friendlyId: string;
10561057
environment: {
@@ -1078,6 +1079,19 @@ export class RunEngine {
10781079
queue?: string;
10791080
/** Resolved TaskQueue.id when the task is locked to a specific queue. */
10801081
lockedQueueId?: string;
1082+
/**
1083+
* Whether to emit the `runFailed` engine-bus event. Defaults to true.
1084+
*
1085+
* Set to `false` when the caller is ALREADY managing the trace-event
1086+
* lifecycle for this run via `repository.traceEvent({ incomplete: false,
1087+
* isError: true, ... })`. In that path the outer trace event handles
1088+
* span completion itself; emitting `runFailed` from here causes the
1089+
* `runFailed` → `completeFailedRunEvent` handler to write a second
1090+
* completion row for the same (traceId, spanId), racing with the
1091+
* outer trace event's own write. The alert side of `runFailed` is
1092+
* preserved by emitting from the caller after `traceEvent` returns.
1093+
*/
1094+
emitRunFailedEvent?: boolean;
10811095
}): Promise<TaskRun> {
10821096
return startSpan(
10831097
this.tracer,
@@ -1160,6 +1174,19 @@ export class RunEngine {
11601174
// exceed-limit failures) land in PG silently — visible in the
11611175
// dashboard list but never reaching customers' configured
11621176
// ERROR alert channels.
1177+
//
1178+
// Gated by `emitRunFailedEvent` so call sites that already wrap
1179+
// this inside `repository.traceEvent({ incomplete: false,
1180+
// isError: true })` can opt out — the outer trace event writes
1181+
// the completion row itself, and a second write via
1182+
// `completeFailedRunEvent` would race against it. Callers that
1183+
// disable the emit are responsible for triggering the alerts
1184+
// side themselves (e.g. by calling
1185+
// `PerformTaskRunAlertsService.enqueue` directly after the
1186+
// trace event closes).
1187+
if (!emitRunFailedEvent) {
1188+
return taskRun;
1189+
}
11631190
this.eventBus.emit("runFailed", {
11641191
time: taskRun.completedAt ?? new Date(),
11651192
run: {

internal-packages/run-engine/src/engine/tests/createFailedTaskRun.test.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,69 @@ describe("RunEngine.createFailedTaskRun", () => {
108108
await engine.quit();
109109
}
110110
});
111+
112+
// The TriggerFailedTaskService.call() path wraps createFailedTaskRun
113+
// inside `repository.traceEvent({ incomplete: false, isError: true })`
114+
// which already writes the completion row for the (traceId, spanId).
115+
// Emitting `runFailed` from here would cause the
116+
// `completeFailedRunEvent` handler to race a second write against
117+
// the same span — the `emitRunFailedEvent: false` opt-out is what
118+
// suppresses the emit. The PG row + alert side stay correct because
119+
// the caller enqueues `PerformTaskRunAlertsService.enqueue(run.id)`
120+
// directly after the trace event closes.
121+
containerTest(
122+
"emitRunFailedEvent: false suppresses the bus emit but still creates the PG row",
123+
async ({ prisma, redisOptions }) => {
124+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
125+
126+
const engine = new RunEngine({
127+
prisma,
128+
worker: { redis: redisOptions, workers: 1, tasksPerWorker: 10, pollIntervalMs: 100 },
129+
queue: { redis: redisOptions, masterQueueConsumersDisabled: true, processWorkerQueueDebounceMs: 50 },
130+
runLock: { redis: redisOptions },
131+
machines: {
132+
defaultMachine: "small-1x",
133+
machines: {
134+
"small-1x": { name: "small-1x" as const, cpu: 0.5, memory: 0.5, centsPerMs: 0.0001 },
135+
},
136+
baseCostInCents: 0.0005,
137+
},
138+
tracer: trace.getTracer("test", "0.0.0"),
139+
});
140+
141+
try {
142+
const failedEvents: EventBusEventArgs<"runFailed">[0][] = [];
143+
engine.eventBus.on("runFailed", (event) => {
144+
failedEvents.push(event);
145+
});
146+
147+
const friendlyId = generateFriendlyId("run");
148+
const failed = await engine.createFailedTaskRun({
149+
friendlyId,
150+
environment: {
151+
id: authenticatedEnvironment.id,
152+
type: authenticatedEnvironment.type,
153+
project: { id: authenticatedEnvironment.project.id },
154+
organization: { id: authenticatedEnvironment.organization.id },
155+
},
156+
taskIdentifier: "outer-trace-event-test",
157+
payload: "{}",
158+
payloadType: "application/json",
159+
error: { type: "STRING_ERROR", raw: "outer trace event manages span" },
160+
traceId: "0123456789abcdef0123456789abcdef",
161+
spanId: "fedcba9876543210",
162+
emitRunFailedEvent: false,
163+
});
164+
165+
// PG row landed (caller still gets a usable TaskRun).
166+
expect(failed.status).toBe("SYSTEM_FAILURE");
167+
expect(failed.friendlyId).toBe(friendlyId);
168+
169+
// Bus emit was suppressed.
170+
expect(failedEvents).toHaveLength(0);
171+
} finally {
172+
await engine.quit();
173+
}
174+
},
175+
);
111176
});

0 commit comments

Comments
 (0)