Skip to content

Commit decea6b

Browse files
d-cs and claude committed
fix(redis-worker): self-heal stale idempotency lookups + default createdAtMicros
Address AI review findings on PR #3752:

- accept(): if an idempotency lookup survives its entry hash being evicted (maxmemory), the lookup is stale — rebind to the new run instead of returning a dead existingRunId that blocks the key forever. Mirrors the self-heal lookupIdempotency already does. (CodeRabbit)
- lookupIdempotency(): clear a stale lookup with a compare-and-delete (delMollifierKeyIfEquals Lua) so a concurrent accept that rebinds the key between our GET and DEL isn't clobbered. (CodeRabbit)
- schemas: default createdAtMicros to "0" so an entry written before the field existed (or surviving across the deploy that added it) still parses on pop instead of being silently dropped. (Devin)
- rename the requeue-ordering test to "retry priority" — RPUSH-to-tail pops the requeued entry ahead of newer items; that's deliberate retry priority, not FIFO relative to the rest of the queue. (CodeRabbit nit)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 905becf commit decea6b

3 files changed

Lines changed: 138 additions & 12 deletions

File tree

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 97 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,24 @@ describe("schemas", () => {
3030
expect(parsed.createdAtMicros).toBe(1747044000000000);
3131
});
3232

33+
it("BufferEntrySchema defaults createdAtMicros for entries written before the field existed", () => {
34+
// Backward compat: an entry written by an accept Lua predating
35+
// createdAtMicros (only the original 7 fields) must still parse on
36+
// pop rather than being silently dropped.
37+
const raw = {
38+
runId: "run_old",
39+
envId: "env_1",
40+
orgId: "org_1",
41+
payload: serialiseSnapshot({}),
42+
status: "QUEUED",
43+
attempts: "0",
44+
createdAt: "2026-05-11T10:00:00.000Z",
45+
// no createdAtMicros
46+
};
47+
const parsed = BufferEntrySchema.parse(raw);
48+
expect(parsed.createdAtMicros).toBe(0);
49+
});
50+
3351
it("BufferEntrySchema parses a FAILED entry with lastError", () => {
3452
const raw = {
3553
runId: "run_abc",
@@ -560,13 +578,15 @@ describe("MollifierBuffer.requeue on missing entry", () => {
560578

561579
describe("MollifierBuffer.requeue ordering", () => {
562580
redisTest(
563-
"requeued entry pops next (RPUSH to the RPOP/tail end), preserving FIFO",
581+
"requeued entry gets retry priority (RPUSH to the RPOP/tail end), popping ahead of newer items",
564582
{ timeout: 20_000 },
565583
async ({ redisContainer }) => {
566-
// LIST FIFO: accept LPUSHes at the head, pop RPOPs from the tail, so
567-
// the first-accepted entry pops first. requeue RPUSHes back to the
568-
// tail, so a transiently failed entry pops next rather than going to
569-
// the back. `maxAttempts` in the drainer bounds the retry loop for a
584+
// LIST: accept LPUSHes at the head, pop RPOPs from the tail, so the
585+
// first-accepted entry pops first. requeue RPUSHes back to the tail,
586+
// giving a transiently failed entry *retry priority* — it pops next,
587+
// ahead of newer queued items, rather than going to the back. (This
588+
// is deliberately not FIFO relative to the rest of the queue.)
589+
// `maxAttempts` in the drainer bounds the retry loop for a
570590
// persistently failing entry (after which it goes to `fail`, not requeue).
571591
const buffer = new MollifierBuffer({
572592
redisOptions: {
@@ -1486,6 +1506,78 @@ describe("MollifierBuffer idempotency lookup", () => {
14861506
}
14871507
},
14881508
);
1509+
1510+
redisTest(
1511+
"accept self-heals a stale lookup: a new run rebinds when the bound entry was evicted",
1512+
{ timeout: 20_000 },
1513+
async ({ redisContainer }) => {
1514+
// If an entry hash is evicted (maxmemory) but its idempotency lookup
1515+
// survives, a fresh accept with the same key must NOT return the dead
1516+
// runId (which would block the key forever) — it should rebind to the
1517+
// new run and accept it.
1518+
const buffer = new MollifierBuffer({
1519+
redisOptions: {
1520+
host: redisContainer.getHost(),
1521+
port: redisContainer.getPort(),
1522+
password: redisContainer.getPassword(),
1523+
},
1524+
logger: new Logger("test", "log"),
1525+
});
1526+
const idem = { idempotencyKey: "kheal", taskIdentifier: "t" };
1527+
try {
1528+
await buffer.accept({ runId: "heal_old", envId: "env_h", orgId: "org_1", payload: "{}", ...idem });
1529+
// Simulate eviction of the entry hash while the lookup survives.
1530+
await buffer["redis"].del("mollifier:entries:heal_old");
1531+
const lookupKey = idempotencyLookupKeyFor({ envId: "env_h", ...idem });
1532+
expect(await buffer["redis"].get(lookupKey)).toBe("heal_old");
1533+
1534+
// A fresh accept with the same key rebinds rather than deduping
1535+
// onto the dead run.
1536+
const result = await buffer.accept({
1537+
runId: "heal_new",
1538+
envId: "env_h",
1539+
orgId: "org_1",
1540+
payload: "{}",
1541+
...idem,
1542+
});
1543+
expect(result).toEqual({ kind: "accepted" });
1544+
expect(await buffer["redis"].get(lookupKey)).toBe("heal_new");
1545+
} finally {
1546+
await buffer.close();
1547+
}
1548+
},
1549+
);
1550+
1551+
redisTest(
1552+
"accept still dedups when the bound entry is live",
1553+
{ timeout: 20_000 },
1554+
async ({ redisContainer }) => {
1555+
// The self-heal must not weaken normal dedup: a live bound entry
1556+
// still wins, and the loser gets its runId back.
1557+
const buffer = new MollifierBuffer({
1558+
redisOptions: {
1559+
host: redisContainer.getHost(),
1560+
port: redisContainer.getPort(),
1561+
password: redisContainer.getPassword(),
1562+
},
1563+
logger: new Logger("test", "log"),
1564+
});
1565+
const idem = { idempotencyKey: "klive", taskIdentifier: "t" };
1566+
try {
1567+
await buffer.accept({ runId: "live_win", envId: "env_h", orgId: "org_1", payload: "{}", ...idem });
1568+
const result = await buffer.accept({
1569+
runId: "live_lose",
1570+
envId: "env_h",
1571+
orgId: "org_1",
1572+
payload: "{}",
1573+
...idem,
1574+
});
1575+
expect(result).toEqual({ kind: "duplicate_idempotency", existingRunId: "live_win" });
1576+
} finally {
1577+
await buffer.close();
1578+
}
1579+
},
1580+
);
14891581
});
14901582

14911583
describe("MollifierBuffer.casSetMetadata", () => {

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 35 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -160,6 +160,7 @@ export class MollifierBuffer {
160160
String(createdAtMicros),
161161
"mollifier:org-envs:",
162162
idempotencyLookupKey,
163+
"mollifier:entries:",
163164
);
164165
// Lua returns 1 (accepted), 0 (duplicate runId), or a string runId
165166
// (duplicate idempotency — value is the existing winner's runId).
@@ -391,15 +392,17 @@ export class MollifierBuffer {
391392
// Resolve a buffered run by (env, task, idempotencyKey) tuple. Used by
392393
// `IdempotencyKeyConcern.handleTriggerRequest` after the PG check
393394
// misses — same key may belong to a buffered run waiting to drain. The
394-
// lookup self-heals: if the lookup points at an entry hash that's
395-
// expired, we DEL the lookup and report a miss.
395+
// lookup self-heals: if the lookup points at an entry hash that's gone,
396+
// we clear the lookup and report a miss. The clear is a compare-and-
397+
// delete (only if the key still holds the stale runId we observed) so a
398+
// fresh accept that rebinds the key between our GET and DEL isn't wiped.
396399
async lookupIdempotency(input: IdempotencyLookupInput): Promise<string | null> {
397400
const lookupKey = idempotencyLookupKeyFor(input);
398401
const runId = await this.redis.get(lookupKey);
399402
if (!runId) return null;
400403
const entry = await this.getEntry(runId);
401404
if (!entry) {
402-
await this.redis.del(lookupKey);
405+
await this.redis.delMollifierKeyIfEquals(lookupKey, runId);
403406
return null;
404407
}
405408
return runId;
@@ -502,6 +505,7 @@ export class MollifierBuffer {
502505
local createdAtMicros = ARGV[6]
503506
local orgEnvsPrefix = ARGV[7]
504507
local idempotencyLookupKey = ARGV[8] or ''
508+
local entryPrefix = ARGV[9]
505509
506510
-- Idempotent: refuse if an entry for this runId already exists in any
507511
-- state. Caller-side dedup is also enforced via API idempotency keys,
@@ -519,7 +523,14 @@ export class MollifierBuffer {
519523
if idempotencyLookupKey ~= '' then
520524
local existing = redis.call('GET', idempotencyLookupKey)
521525
if existing then
522-
return existing
526+
-- Self-heal: only honour the binding if its entry hash still
527+
-- exists. If the entry was evicted (maxmemory) but the lookup
528+
-- survived, the binding is stale — fall through and rebind to
529+
-- this run rather than returning a dead runId that would block
530+
-- the key indefinitely. Mirrors lookupIdempotency's self-heal.
531+
if redis.call('EXISTS', entryPrefix .. existing) == 1 then
532+
return existing
533+
end
523534
end
524535
redis.call('SET', idempotencyLookupKey, runId)
525536
end
@@ -935,6 +946,20 @@ export class MollifierBuffer {
935946
`,
936947
});
937948

949+
// Compare-and-delete: DEL the key only if it still holds the expected
950+
// value. Used by lookupIdempotency's stale-lookup self-heal so a
951+
// concurrent accept that rebinds the key between the reader's GET and
952+
// this DEL isn't clobbered.
953+
this.redis.defineCommand("delMollifierKeyIfEquals", {
954+
numberOfKeys: 1,
955+
lua: `
956+
if redis.call('GET', KEYS[1]) == ARGV[1] then
957+
return redis.call('DEL', KEYS[1])
958+
end
959+
return 0
960+
`,
961+
});
962+
938963
this.redis.defineCommand("mollifierEvaluateTrip", {
939964
numberOfKeys: 2,
940965
lua: `
@@ -974,6 +999,7 @@ declare module "@internal/redis" {
974999
createdAtMicros: string,
9751000
orgEnvsPrefix: string,
9761001
idempotencyLookupKey: string,
1002+
entryPrefix: string,
9771003
callback?: Callback<number | string>,
9781004
): Result<number | string, Context>;
9791005
popAndMarkDraining(
@@ -1039,6 +1065,11 @@ declare module "@internal/redis" {
10391065
errorPayload: string,
10401066
callback?: Callback<number>,
10411067
): Result<number, Context>;
1068+
delMollifierKeyIfEquals(
1069+
key: string,
1070+
expected: string,
1071+
callback?: Callback<number>,
1072+
): Result<number, Context>;
10421073
mollifierEvaluateTrip(
10431074
rateKey: string,
10441075
trippedKey: string,

packages/redis-worker/src/mollifier/schemas.ts

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -48,9 +48,12 @@ export const BufferEntrySchema = z.object({
4848
status: BufferEntryStatus,
4949
attempts: stringToInt,
5050
createdAt: stringToDate,
51-
// Microsecond epoch matching the ZSET queue score. Stable across
52-
// requeues — the score never moves once set at accept time.
53-
createdAtMicros: stringToInt,
51+
// Microsecond epoch of accept time, kept as a hash field for dwell
52+
// metrics. Not a queue sort key (the queue is a FIFO LIST). Defaulted
53+
// so an entry written by an accept Lua predating this field — or one
54+
// surviving across the deploy that introduced it — still parses instead
55+
// of being silently dropped on pop.
56+
createdAtMicros: stringToInt.default("0"),
5457
// Drainer-ack flag: `true` once the drainer has materialised this run
5558
// into PG. The hash persists for a short grace TTL after ack so direct
5659
// reads (retrieve, trace, etc.) still resolve while PG replica lag

0 commit comments

Comments
 (0)