Skip to content

Commit 02cfe1a

Browse files
d-csclaude
andcommitted
fix(redis-worker): encode mollifier composite-key segments + per-claim ownership token
Addresses code-review feedback on the buffer's idempotency keying: - Encode `envId` / `taskIdentifier` / `idempotencyKey` with base64url before concatenation so customer-supplied segments containing `:` cannot alias each other onto the same Redis key. Exports `idempotencyLookupKeyFor` so tests assert against the same encoding the buffer writes. - Replace the shared `"pending"` claim marker with a caller-supplied ownership token (`"pending:<token>"`). `publishClaim` and `releaseClaim` become compare-and-set / compare-and-delete via Lua, so a late release from a previous claimant whose TTL expired cannot erase a new owner's claim. New buffer tests cover the alias-collision case, the encoded-key-shape contract, and the token-ownership safety properties (stale release is a no-op, wrong-token publish is a no-op, fresh claim survives the post-TTL-expiry stale-release race). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 8165846 commit 02cfe1a

3 files changed

Lines changed: 420 additions & 31 deletions

File tree

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 296 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { describe, expect, it } from "vitest";
22
import { BufferEntrySchema, serialiseSnapshot, deserialiseSnapshot } from "./schemas.js";
33
import { redisTest } from "@internal/testcontainers";
44
import { Logger } from "@trigger.dev/core/logger";
5-
import { MollifierBuffer } from "./buffer.js";
5+
import { MollifierBuffer, idempotencyLookupKeyFor } from "./buffer.js";
66

77
describe("schemas", () => {
88
it("serialiseSnapshot then deserialiseSnapshot is identity for plain objects", () => {
@@ -1110,7 +1110,11 @@ describe("MollifierBuffer idempotency lookup", () => {
11101110
});
11111111
expect(result).toEqual({ kind: "accepted" });
11121112

1113-
const lookupKey = "mollifier:idempotency:env_i:my-task:ikey-1";
1113+
const lookupKey = idempotencyLookupKeyFor({
1114+
envId: "env_i",
1115+
taskIdentifier: "my-task",
1116+
idempotencyKey: "ikey-1",
1117+
});
11141118
const stored = await buffer["redis"].get(lookupKey);
11151119
expect(stored).toBe("ri1");
11161120
// -1 = key exists with no TTL set.
@@ -1241,7 +1245,11 @@ describe("MollifierBuffer idempotency lookup", () => {
12411245
});
12421246
try {
12431247
// Plant a stale lookup pointing at a non-existent entry.
1244-
const lookupKey = "mollifier:idempotency:env_i:t:stale";
1248+
const lookupKey = idempotencyLookupKeyFor({
1249+
envId: "env_i",
1250+
taskIdentifier: "t",
1251+
idempotencyKey: "stale",
1252+
});
12451253
await buffer["redis"].set(lookupKey, "rl-stale", "EX", 600);
12461254
expect(await buffer["redis"].get(lookupKey)).toBe("rl-stale");
12471255

@@ -1283,7 +1291,11 @@ describe("MollifierBuffer idempotency lookup", () => {
12831291
await buffer.pop("env_i");
12841292
await buffer.ack("ra1");
12851293

1286-
const lookupKey = "mollifier:idempotency:env_i:t:ka";
1294+
const lookupKey = idempotencyLookupKeyFor({
1295+
envId: "env_i",
1296+
taskIdentifier: "t",
1297+
idempotencyKey: "ka",
1298+
});
12871299
expect(await buffer["redis"].get(lookupKey)).toBeNull();
12881300
const entry = await buffer.getEntry("ra1");
12891301
expect(entry!.materialised).toBe(true);
@@ -1327,7 +1339,11 @@ describe("MollifierBuffer idempotency lookup", () => {
13271339
expect(result.clearedRunId).toBe("rr1");
13281340

13291341
// Lookup is gone.
1330-
const lookupKey = "mollifier:idempotency:env_i:t:kr";
1342+
const lookupKey = idempotencyLookupKeyFor({
1343+
envId: "env_i",
1344+
taskIdentifier: "t",
1345+
idempotencyKey: "kr",
1346+
});
13311347
expect(await buffer["redis"].get(lookupKey)).toBeNull();
13321348

13331349
// Snapshot's idempotency fields are nulled, other fields kept.
@@ -2028,3 +2044,278 @@ describe("MollifierBuffer.listEntriesForEnv", () => {
20282044
}
20292045
});
20302046
});
2047+
2048+
// Composite-key safety. The Redis-key builders concatenate
2049+
// `(envId, taskIdentifier, idempotencyKey)` with `:` separators; without
2050+
// per-segment encoding, `taskIdentifier="a:b"` and `idempotencyKey="x"`
2051+
// would map to the same key as `taskIdentifier="a"` and
2052+
// `idempotencyKey="b:x"`. base64url encoding has no `:` in its alphabet,
2053+
// so the encoded keys are unique per tuple.
2054+
describe("MollifierBuffer composite-key encoding (collision resistance)", () => {
2055+
redisTest(
2056+
"two accepts whose unencoded keys would alias don't collide on the idempotency lookup",
2057+
{ timeout: 30_000 },
2058+
async ({ redisContainer }) => {
2059+
const buffer = new MollifierBuffer({
2060+
redisOptions: {
2061+
host: redisContainer.getHost(),
2062+
port: redisContainer.getPort(),
2063+
password: redisContainer.getPassword(),
2064+
},
2065+
logger: new Logger("test", "log"),
2066+
});
2067+
try {
2068+
// Aliased tuples under raw `:` concatenation:
2069+
// env_x : "a:b" : "x" → "mollifier:idempotency:env_x:a:b:x"
2070+
// env_x : "a" : "b:x" → "mollifier:idempotency:env_x:a:b:x"
2071+
const r1 = await buffer.accept({
2072+
runId: "ck_run_1",
2073+
envId: "env_x",
2074+
orgId: "org_1",
2075+
payload: "{}",
2076+
taskIdentifier: "a:b",
2077+
idempotencyKey: "x",
2078+
});
2079+
const r2 = await buffer.accept({
2080+
runId: "ck_run_2",
2081+
envId: "env_x",
2082+
orgId: "org_1",
2083+
payload: "{}",
2084+
taskIdentifier: "a",
2085+
idempotencyKey: "b:x",
2086+
});
2087+
// Both accepted — no false-positive collision.
2088+
expect(r1).toEqual({ kind: "accepted" });
2089+
expect(r2).toEqual({ kind: "accepted" });
2090+
2091+
// Each tuple resolves to its own runId.
2092+
const hit1 = await buffer.lookupIdempotency({
2093+
envId: "env_x",
2094+
taskIdentifier: "a:b",
2095+
idempotencyKey: "x",
2096+
});
2097+
const hit2 = await buffer.lookupIdempotency({
2098+
envId: "env_x",
2099+
taskIdentifier: "a",
2100+
idempotencyKey: "b:x",
2101+
});
2102+
expect(hit1).toBe("ck_run_1");
2103+
expect(hit2).toBe("ck_run_2");
2104+
} finally {
2105+
await buffer.close();
2106+
}
2107+
},
2108+
);
2109+
2110+
redisTest(
2111+
"encoded lookup key contains no ':' separator beyond the namespace",
2112+
{ timeout: 20_000 },
2113+
async () => {
2114+
// Pure-function test — verifies the encoding bijection without
2115+
// needing a live buffer. Re-uses the redisTest fixture for
2116+
// parallelism with other describe blocks but doesn't touch redis.
2117+
const key = idempotencyLookupKeyFor({
2118+
envId: "env_x",
2119+
taskIdentifier: "a:b",
2120+
idempotencyKey: "x:y:z",
2121+
});
2122+
// namespace prefix is exactly `mollifier:idempotency:` (two `:`),
2123+
// then three base64url segments separated by two more `:` —
2124+
// never the customer-supplied colons.
2125+
const colonCount = key.split(":").length - 1;
2126+
expect(colonCount).toBe(4);
2127+
// base64url alphabet has no `:`, `+`, `/`, or `=`.
2128+
const afterNamespace = key.slice("mollifier:idempotency:".length);
2129+
expect(afterNamespace).toMatch(/^[A-Za-z0-9_\-]+:[A-Za-z0-9_\-]+:[A-Za-z0-9_\-]+$/);
2130+
},
2131+
);
2132+
});
2133+
2134+
// Pre-gate claim ownership protection. The claim slot stores
2135+
// `"pending:<token>"` so publish and release compare-and-act on the
2136+
// caller's token — a late release from a previous claimant whose TTL
2137+
// expired cannot erase a new owner's claim.
2138+
describe("MollifierBuffer pre-gate claim — ownership token safety", () => {
2139+
const claimInput = {
2140+
envId: "env_c",
2141+
taskIdentifier: "task_c",
2142+
idempotencyKey: "key_c",
2143+
};
2144+
2145+
redisTest(
2146+
"claimIdempotency: first caller gets 'claimed', second concurrent caller gets 'pending'",
2147+
{ timeout: 20_000 },
2148+
async ({ redisContainer }) => {
2149+
const buffer = new MollifierBuffer({
2150+
redisOptions: {
2151+
host: redisContainer.getHost(),
2152+
port: redisContainer.getPort(),
2153+
password: redisContainer.getPassword(),
2154+
},
2155+
logger: new Logger("test", "log"),
2156+
});
2157+
try {
2158+
const first = await buffer.claimIdempotency({
2159+
...claimInput,
2160+
token: "token-A",
2161+
ttlSeconds: 30,
2162+
});
2163+
expect(first.kind).toBe("claimed");
2164+
2165+
// Second concurrent caller with a different token sees pending.
2166+
const second = await buffer.claimIdempotency({
2167+
...claimInput,
2168+
token: "token-B",
2169+
ttlSeconds: 30,
2170+
});
2171+
expect(second.kind).toBe("pending");
2172+
2173+
// readClaim distinguishes pending from resolved without leaking
2174+
// the token to the loser.
2175+
const read = await buffer.readClaim(claimInput);
2176+
expect(read?.kind).toBe("pending");
2177+
} finally {
2178+
await buffer.close();
2179+
}
2180+
},
2181+
);
2182+
2183+
redisTest(
2184+
"releaseClaim with the wrong token is a no-op (compare-and-delete)",
2185+
{ timeout: 20_000 },
2186+
async ({ redisContainer }) => {
2187+
const buffer = new MollifierBuffer({
2188+
redisOptions: {
2189+
host: redisContainer.getHost(),
2190+
port: redisContainer.getPort(),
2191+
password: redisContainer.getPassword(),
2192+
},
2193+
logger: new Logger("test", "log"),
2194+
});
2195+
try {
2196+
await buffer.claimIdempotency({ ...claimInput, token: "owner", ttlSeconds: 30 });
2197+
2198+
// Pretend a stale claimant fires a release with their old token.
2199+
await buffer.releaseClaim({ ...claimInput, token: "stale-impostor" });
2200+
2201+
// The owner's claim survives.
2202+
const stillThere = await buffer.readClaim(claimInput);
2203+
expect(stillThere?.kind).toBe("pending");
2204+
2205+
// The owner can still release.
2206+
await buffer.releaseClaim({ ...claimInput, token: "owner" });
2207+
expect(await buffer.readClaim(claimInput)).toBeNull();
2208+
} finally {
2209+
await buffer.close();
2210+
}
2211+
},
2212+
);
2213+
2214+
redisTest(
2215+
"publishClaim with the wrong token is a no-op and returns false",
2216+
{ timeout: 20_000 },
2217+
async ({ redisContainer }) => {
2218+
const buffer = new MollifierBuffer({
2219+
redisOptions: {
2220+
host: redisContainer.getHost(),
2221+
port: redisContainer.getPort(),
2222+
password: redisContainer.getPassword(),
2223+
},
2224+
logger: new Logger("test", "log"),
2225+
});
2226+
try {
2227+
await buffer.claimIdempotency({ ...claimInput, token: "owner", ttlSeconds: 30 });
2228+
2229+
const wrongTokenPublish = await buffer.publishClaim({
2230+
...claimInput,
2231+
token: "stale-impostor",
2232+
runId: "imposter-run",
2233+
ttlSeconds: 60,
2234+
});
2235+
expect(wrongTokenPublish).toBe(false);
2236+
2237+
// Claim slot unchanged.
2238+
const stillPending = await buffer.readClaim(claimInput);
2239+
expect(stillPending?.kind).toBe("pending");
2240+
2241+
const goodPublish = await buffer.publishClaim({
2242+
...claimInput,
2243+
token: "owner",
2244+
runId: "real-run",
2245+
ttlSeconds: 60,
2246+
});
2247+
expect(goodPublish).toBe(true);
2248+
2249+
const resolved = await buffer.readClaim(claimInput);
2250+
expect(resolved).toEqual({ kind: "resolved", runId: "real-run" });
2251+
} finally {
2252+
await buffer.close();
2253+
}
2254+
},
2255+
);
2256+
2257+
redisTest(
2258+
"regression: stale release after TTL expiry does NOT erase a fresh claim",
2259+
{ timeout: 20_000 },
2260+
async ({ redisContainer }) => {
2261+
// Hazard from CodeRabbit r3290070707:
2262+
// 1. Claimant A SETNXs the slot with their token, then stalls.
2263+
// 2. TTL expires, slot vanishes.
2264+
// 3. Claimant B SETNXs the slot with a DIFFERENT token.
2265+
// 4. Claimant A finally finishes (or errors) and calls
2266+
// releaseClaim with their original token.
2267+
// Without compare-and-delete, A's release would wipe B's slot and
2268+
// any concurrent customer of B's idempotency key would see "no
2269+
// claim" and re-issue, breaking same-key dedup.
2270+
const buffer = new MollifierBuffer({
2271+
redisOptions: {
2272+
host: redisContainer.getHost(),
2273+
port: redisContainer.getPort(),
2274+
password: redisContainer.getPassword(),
2275+
},
2276+
logger: new Logger("test", "log"),
2277+
});
2278+
try {
2279+
// Step 1: A claims with token "A".
2280+
const a = await buffer.claimIdempotency({
2281+
...claimInput,
2282+
token: "A",
2283+
ttlSeconds: 1, // short TTL to simulate expiry quickly
2284+
});
2285+
expect(a.kind).toBe("claimed");
2286+
2287+
// Step 2: simulate TTL expiry — DEL the slot directly so the
2288+
// test doesn't rely on wall-clock sleeping.
2289+
await buffer["redis"].del(`mollifier:claim:${[claimInput.envId, claimInput.taskIdentifier, claimInput.idempotencyKey]
2290+
.map((s) => Buffer.from(s, "utf8").toString("base64url"))
2291+
.join(":")}`);
2292+
2293+
// Step 3: B claims with token "B".
2294+
const b = await buffer.claimIdempotency({
2295+
...claimInput,
2296+
token: "B",
2297+
ttlSeconds: 30,
2298+
});
2299+
expect(b.kind).toBe("claimed");
2300+
2301+
// Step 4: A's late release. MUST be a no-op.
2302+
await buffer.releaseClaim({ ...claimInput, token: "A" });
2303+
2304+
// B's claim survives intact.
2305+
const after = await buffer.readClaim(claimInput);
2306+
expect(after?.kind).toBe("pending");
2307+
2308+
// B can still publish.
2309+
const published = await buffer.publishClaim({
2310+
...claimInput,
2311+
token: "B",
2312+
runId: "B-run",
2313+
ttlSeconds: 60,
2314+
});
2315+
expect(published).toBe(true);
2316+
} finally {
2317+
await buffer.close();
2318+
}
2319+
},
2320+
);
2321+
});

0 commit comments

Comments
 (0)