Skip to content

Commit df65a3b

Browse files
d-cs and claude committed
fix(webapp): honour cancel-wins conflict + requeue on transient PG outage in mollifier drainer
The mollifier drainer's cancel bifurcation called engine.createCancelledRun without handling its documented conflict contract: when the normal trigger replay path races ahead and materialises a live (non-CANCELED) row, the engine throws a conflict so the caller can "decide between engine.cancelRun() and skipping". The handler did neither — the conflict propagated, isRetryablePgError returned false, and the drainer called buffer.fail() on the entry, silently losing the cancellation while the run kept executing. Now route conflicts to engine.cancelRun() so the cancel actually wins.

Separately, when engine.trigger fails non-retryably and the SYSTEM_FAILURE fallback write then fails because PG is transiently unreachable, rethrowing the original non-retryable error made the drainer call buffer.fail() on the entry — losing the run with no PG row ever landing, and dropping the write error entirely. Rethrow the retryable write error instead so the drainer requeues; the failure row lands once PG recovers. Non-retryable write failures still rethrow the original error as before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e0708c4 commit df65a3b

2 files changed

Lines changed: 129 additions & 11 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { context, trace, TraceFlags } from "@opentelemetry/api";
22
import type { RunEngine } from "@internal/run-engine";
33
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
4+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
45
import type { MollifierDrainerHandler } from "@trigger.dev/redis-worker";
56
import { startSpan } from "~/v3/tracing.server";
67
import type { MollifierSnapshot } from "./mollifierSnapshot.server";
@@ -72,14 +73,41 @@ export function createDrainerHandler(deps: {
7273
span.setAttribute("mollifier.run_friendly_id", input.runId);
7374
span.setAttribute("mollifier.cancel_bifurcation", true);
7475
span.setAttribute("taskRunId", input.runId);
75-
await deps.engine.createCancelledRun(
76-
{
77-
snapshot: input.payload as any,
78-
cancelledAt: new Date(cancelledAtStr),
79-
cancelReason,
80-
},
81-
deps.prisma,
82-
);
76+
try {
77+
await deps.engine.createCancelledRun(
78+
{
79+
snapshot: input.payload as any,
80+
cancelledAt: new Date(cancelledAtStr),
81+
cancelReason,
82+
},
83+
deps.prisma,
84+
);
85+
} catch (err) {
86+
// createCancelledRun throws a conflict when the normal trigger
87+
// replay path won the race and already materialised a live
88+
// (non-CANCELED) row for this friendlyId. Its contract leaves
89+
// the resolution to us: honour the cancel by actually
90+
// cancelling the now-live run. Letting the conflict propagate
91+
// would instead reach the drainer's terminal-failure path
92+
// (isRetryablePgError() is false for it), buffer.fail() the
93+
// entry, and silently lose the cancellation while the run
94+
// keeps executing.
95+
const isConflict =
96+
err instanceof Error && err.message.startsWith("createCancelledRun conflict");
97+
if (!isConflict) {
98+
throw err;
99+
}
100+
span.setAttribute("mollifier.cancel_conflict", true);
101+
const friendlyId =
102+
typeof input.payload.friendlyId === "string"
103+
? input.payload.friendlyId
104+
: input.runId;
105+
await deps.engine.cancelRun({
106+
runId: RunId.fromFriendlyId(friendlyId),
107+
completedAt: new Date(cancelledAtStr),
108+
reason: cancelReason,
109+
});
110+
}
83111
});
84112
});
85113
return;
@@ -158,9 +186,23 @@ export function createDrainerHandler(deps: {
158186
typeof snapshot.lockedQueueId === "string" ? snapshot.lockedQueueId : undefined,
159187
});
160188
} catch (writeErr) {
161-
// Class A — PG itself is failing. Rethrow the original
162-
// error so the drainer falls back to buffer.fail. Include
163-
// the write error in the log line at the drainer layer.
189+
// The terminal SYSTEM_FAILURE write itself failed. If it
190+
// failed because PG is transiently unreachable, rethrow the
191+
// *write* error so the drainer requeues — buffer.fail()ing on
192+
// the original non-retryable error would lose the run with no
193+
// PG row ever landing. Once PG recovers the requeued entry
194+
// writes its failure row and the customer sees it.
195+
if (isRetryablePgError(writeErr)) {
196+
span.setAttribute("mollifier.terminal_write_retryable", true);
197+
throw writeErr;
198+
}
199+
// PG reachable but the write was rejected for another reason
200+
// (genuinely bad snapshot). Rethrow the original trigger error
201+
// so the drainer falls back to buffer.fail.
202+
span.setAttribute(
203+
"mollifier.terminal_write_error",
204+
writeErr instanceof Error ? writeErr.message : String(writeErr)
205+
);
164206
throw err;
165207
}
166208
}

apps/webapp/test/mollifierDrainerHandler.test.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, expect, it, vi } from "vitest";
22
import { trace } from "@opentelemetry/api";
3+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
34

45
vi.mock("~/db.server", () => ({
56
prisma: {},
@@ -180,6 +181,81 @@ describe("createDrainerHandler", () => {
180181
expect(createFailedTaskRun).toHaveBeenCalledOnce();
181182
});
182183

184+
it("honours the cancel when a buffered cancel races a materialised non-CANCELED row", async () => {
185+
// Cancel-wins-over-trigger (Q4 bifurcation). If the normal trigger
186+
// replay path materialised a live PENDING row before the cancel
187+
// bifurcation drained, engine.createCancelledRun throws a conflict —
188+
// its documented contract is that "the caller must decide between
189+
// engine.cancelRun() and skipping". The drainer handler must honour
190+
// the cancel intent by actually cancelling the now-live run; otherwise
191+
// the conflict propagates, isRetryablePgError() returns false, and the
192+
// drainer buffer.fail()s the entry — silently losing the cancellation
193+
// while the run keeps executing.
194+
const friendlyId = RunId.generate().friendlyId;
195+
const createCancelledRun = vi.fn(async () => {
196+
throw new Error(
197+
`createCancelledRun conflict: existing run ${friendlyId} has status PENDING`
198+
);
199+
});
200+
const cancelRun = vi.fn(async () => ({ alreadyFinished: false }));
201+
const handler = createDrainerHandler({
202+
engine: { createCancelledRun, cancelRun } as any,
203+
prisma: {} as any,
204+
});
205+
206+
await expect(
207+
handler({
208+
runId: friendlyId,
209+
envId: "env_a",
210+
orgId: "org_1",
211+
payload: {
212+
friendlyId,
213+
taskIdentifier: "t",
214+
environment: envFixture,
215+
cancelledAt: new Date().toISOString(),
216+
cancelReason: "Canceled by user",
217+
},
218+
attempts: 0,
219+
createdAt: new Date(),
220+
} as any)
221+
).resolves.toBeUndefined();
222+
223+
// The live run is actually cancelled, by its internal id.
224+
expect(cancelRun).toHaveBeenCalledOnce();
225+
expect(cancelRun.mock.calls[0][0].runId).toBe(RunId.fromFriendlyId(friendlyId));
226+
});
227+
228+
it("requeues on a transient PG outage during the SYSTEM_FAILURE fallback write", async () => {
229+
// engine.trigger failed non-retryably, so we try to write a terminal
230+
// SYSTEM_FAILURE row. If THAT write fails because PG is transiently
231+
// unreachable, rethrowing the *original* non-retryable error makes the
232+
// drainer buffer.fail() the entry — losing the run with no PG row ever
233+
// landing. Rethrow the retryable write error instead so the drainer
234+
// requeues; once PG recovers the failure row lands and the customer
235+
// sees it.
236+
const trigger = vi.fn(async () => {
237+
throw new Error("validation failed: payload too large");
238+
});
239+
const createFailedTaskRun = vi.fn(async () => {
240+
throw new Error("Can't reach database server");
241+
});
242+
const handler = createDrainerHandler({
243+
engine: { trigger, createFailedTaskRun } as any,
244+
prisma: {} as any,
245+
});
246+
247+
await expect(
248+
handler({
249+
runId: "run_x",
250+
envId: "env_a",
251+
orgId: "org_1",
252+
payload: { taskIdentifier: "t", environment: envFixture },
253+
attempts: 0,
254+
createdAt: new Date(),
255+
} as any)
256+
).rejects.toThrow("Can't reach database server");
257+
});
258+
183259
it("rethrows the original error when the snapshot lacks an environment block", async () => {
184260
const triggerErr = new Error("engine rejected the snapshot");
185261
const trigger = vi.fn(async () => {

0 commit comments

Comments
 (0)