Skip to content

Commit e56b937

Browse files
d-cs and claude committed
fix(webapp): mollifier read-fallback version field + trip-threshold guard
- readFallback: read snapshot.taskVersion (the key buildEngineTriggerInput writes) instead of the nonexistent snapshot.lockToVersion, so buffered version-locked runs report their locked version; test now uses the real key as a regression guard.
- env: TRIGGER_MOLLIFIER_TRIP_THRESHOLD back to positive() (matching sibling mollifier numerics) to forbid threshold=0 silently mollifying every trigger.
- idempotencyKeys: document why the resolved-but-unfindable fall-through is safe (PG-unique + accept SETNX dedup + ~30s claim TTL self-heal); add regression test pinning the fall-through and the resolved-and-findable cached-hit path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d920982 commit e56b937

5 files changed

Lines changed: 119 additions & 6 deletions

File tree

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1092,7 +1092,7 @@ const EnvironmentSchema = z
10921092
.transform((v) => v ?? process.env.REDIS_PASSWORD),
10931093
TRIGGER_MOLLIFIER_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
10941094
TRIGGER_MOLLIFIER_TRIP_WINDOW_MS: z.coerce.number().int().positive().default(200),
1095-
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().nonnegative().default(100),
1095+
TRIGGER_MOLLIFIER_TRIP_THRESHOLD: z.coerce.number().int().positive().default(100),
10961096
TRIGGER_MOLLIFIER_HOLD_MS: z.coerce.number().int().positive().default(500),
10971097
TRIGGER_MOLLIFIER_DRAIN_CONCURRENCY: z.coerce.number().int().positive().default(50),
10981098
TRIGGER_MOLLIFIER_DRAIN_MAX_ATTEMPTS: z.coerce.number().int().positive().default(3),

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 21 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -266,9 +266,27 @@ export class IdempotencyKeyConcern {
266266
if (buffered) {
267267
return { isCached: true, run: buffered };
268268
}
269-
// Claim resolved to a runId nothing can find — likely the
270-
// claimant errored after publish, or the row TTL'd out. Log
271-
// and fall through to a fresh trigger.
269+
// Claim resolved to a runId nothing can find — the run was
270+
// genuinely lost (claimant errored after publish, drain failed,
271+
// or both the PG row and buffer entry TTL'd out). This is
272+
// terminal, not transient: `lookupIdempotency` self-heals a
273+
// dangling pointer, and `ack` keeps the entry hash as a
274+
// read-fallback past the PG write, so re-polling cannot conjure
275+
// a run that is gone. Falling through to a fresh trigger is the
276+
// correct recovery.
277+
//
278+
// Why falling through claimless is safe (no duplicate runs):
279+
// concurrent triggers that also fall through here converge on a
280+
// single run via the same dedup backstops the claim layer relies
281+
// on — the PG unique constraint on the idempotency key
282+
// (RunDuplicateIdempotencyKeyError → retry resolves to the
283+
// winner) for the pass-through path, and `accept`'s idempotency
284+
// SETNX (`duplicate_idempotency`) for the mollify path. Once the
285+
// first fall-through commits a run, later callers find it via the
286+
// writer-PG / buffer lookups above despite the stale `resolved:`
287+
// slot, which the slot's TTL clears within ~30s. The residual
288+
// cost is a few redundant (deduped) trigger attempts in that
289+
// window, not duplicate runs.
272290
logger.warn("idempotency claim resolved but runId not findable", {
273291
envId: request.environment.id,
274292
taskIdentifier: request.taskId,

apps/webapp/app/v3/mollifier/readFallback.server.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -175,7 +175,7 @@ export async function findRunByIdWithMollifierFallback(
175175
ttl: asString(snapshot.ttl),
176176
tags,
177177
runTags: tags,
178-
lockedToVersion: asString(snapshot.lockToVersion),
178+
lockedToVersion: asString(snapshot.taskVersion),
179179
resumeParentOnCompletion: snapshot.resumeParentOnCompletion === true,
180180
parentTaskRunId: asString(snapshot.parentTaskRunId),
181181

Lines changed: 93 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,93 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
// Stub `~/db.server` before importing the concern — the real module
4+
// eagerly calls `prisma.$connect()` at singleton construction, which
5+
// would fail without a database. The concern under test receives its
6+
// prisma via the constructor, so the stub is never used by the code path.
7+
vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
8+
9+
// The IdempotencyKeyConcern resolves the pre-gate claim through the
10+
// global mollifier buffer (`getMollifierBuffer`), shared by both
11+
// `claimOrAwait` and `findBufferedRunWithIdempotency`. Control it via a
12+
// hoisted handle so each test can script the claim/lookup responses.
13+
const h = vi.hoisted(() => ({ buffer: null as unknown }));
14+
vi.mock("~/v3/mollifier/mollifierBuffer.server", () => ({
15+
getMollifierBuffer: () => h.buffer,
16+
}));
17+
18+
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
19+
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
20+
import type { TriggerTaskRequest } from "~/runEngine/types";
21+
22+
function makeConcern(prisma: { findFirst: () => Promise<unknown> }) {
23+
return new IdempotencyKeyConcern(
24+
{ taskRun: { findFirst: prisma.findFirst } } as never,
25+
{} as never, // engine — unused on this path
26+
{} as never, // traceEventConcern — unused on this path
27+
);
28+
}
29+
30+
function makeRequest(): TriggerTaskRequest {
31+
return {
32+
taskId: "my-task",
33+
environment: { id: "env_a", organizationId: "org_1" },
34+
options: {},
35+
body: { options: { idempotencyKey: "k-1" } },
36+
} as unknown as TriggerTaskRequest;
37+
}
38+
39+
describe("IdempotencyKeyConcern · claim resolution", () => {
40+
it("resolved-but-unfindable falls through to a fresh trigger (no cached run, no claim held)", async () => {
41+
// The claim slot holds a runId that is gone from both stores: the PG
42+
// findFirst misses and the buffer lookup misses. Regression guard for
43+
// the resolved-but-unfindable terminal case — the concern must fall
44+
// through to a fresh trigger rather than throw, hand back a bogus
45+
// cached run, or claim ownership it doesn't hold.
46+
const lookupIdempotency = vi.fn(async () => null);
47+
h.buffer = {
48+
claimIdempotency: vi.fn(async () => ({ kind: "resolved", runId: "run_gone" })),
49+
lookupIdempotency,
50+
} as unknown as MollifierBuffer;
51+
52+
const findFirst = vi.fn(async () => null); // PG misses on every call
53+
const concern = makeConcern({ findFirst });
54+
55+
const result = await concern.handleTriggerRequest(makeRequest(), undefined);
56+
57+
expect(result.isCached).toBe(false);
58+
if (result.isCached === false) {
59+
// No claim held — we resolved someone else's (stale) claim, we did
60+
// not win one. The caller must NOT publish/release on our behalf.
61+
expect(result.claim).toBeUndefined();
62+
expect(result.idempotencyKey).toBe("k-1");
63+
}
64+
// We attempted the buffer fallback before giving up.
65+
expect(lookupIdempotency).toHaveBeenCalled();
66+
});
67+
68+
it("resolved-and-findable returns the existing run as a cached hit", async () => {
69+
// Guard the happy resolved path: when the claimed runId IS findable
70+
// (writer-side PG), the fall-through change must not swallow it.
71+
h.buffer = {
72+
claimIdempotency: vi.fn(async () => ({ kind: "resolved", runId: "run_winner" })),
73+
lookupIdempotency: vi.fn(async () => null),
74+
} as unknown as MollifierBuffer;
75+
76+
const winner = { id: "run_winner", friendlyId: "run_winner" };
77+
// First findFirst (initial existingRun check) misses so we enter the
78+
// claim path; the second (writer-side re-resolve) finds the winner.
79+
let calls = 0;
80+
const findFirst = vi.fn(async () => {
81+
calls += 1;
82+
return calls >= 2 ? winner : null;
83+
});
84+
const concern = makeConcern({ findFirst });
85+
86+
const result = await concern.handleTriggerRequest(makeRequest(), undefined);
87+
88+
expect(result.isCached).toBe(true);
89+
if (result.isCached === true) {
90+
expect(result.run).toBe(winner);
91+
}
92+
});
93+
});

apps/webapp/test/mollifierReadFallback.test.ts

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -141,7 +141,9 @@ describe("findRunByIdWithMollifierFallback", () => {
141141
depth: 2,
142142
ttl: "1h",
143143
tags: ["tag-a", "tag-b"],
144-
lockToVersion: "20260511.1",
144+
// The engine.trigger snapshot stores the locked version string under
145+
// `taskVersion` (see triggerTask.server.ts#buildEngineTriggerInput).
146+
taskVersion: "20260511.1",
145147
resumeParentOnCompletion: false,
146148
parentTaskRunId: "run_parent",
147149
}),

0 commit comments

Comments
 (0)