Commit deaa29b

matt-aitken and claude committed
fix(webapp): treat Phase 2 batch-stream retries as idempotent (TRI-9944)
When the SDK created a batch and then streamed its items (Phase 2 of the 2-phase batch API), a lost response would trigger the SDK's network-retry path. For small, fast-completing batches, the original request had already enqueued every item and sealed the batch, and the runs had flipped the batch to PROCESSING or COMPLETED by the time the retry arrived. The retry then failed the pre-loop check at streamBatchItems.server.ts:109 with a 422, surfacing a customer-visible BatchTriggerError for a batch whose runs had actually succeeded.

StreamBatchItemsService.call now returns the standard sealed:true success response (itemsAccepted: 0, itemsDeduplicated: 0, runCount: batch.runCount) when the batch is already sealed or in PROCESSING/COMPLETED, matching the idempotency already applied at the two post-loop race-condition branches in the same file (lines 226 and 306). ABORTED and other unexpected non-PENDING states still throw.

Tests:
- Rewrote the existing "already sealed" race test from expecting a throw to expecting sealed:true (Phase 2 retry idempotency).
- Added a COMPLETED-pre-loop test mirroring the exact customer scenario (single-item batch, status=COMPLETED, sealed=false; tryCompleteBatch sets status without setting sealed).
- Added a negative ABORTED test to lock in that terminal-failure states still surface as errors.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 5083d16 commit deaa29b

3 files changed

Lines changed: 183 additions & 26 deletions

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+---
+area: webapp
+type: fix
+---
+
+Treat Phase 2 batch-stream retries as idempotent when the batch has
+already been sealed or moved past `PENDING` (TRI-9944).
+
+When the SDK created a batch and then streamed its items (Phase 2 of
+the 2-phase batch API), a lost response would trigger the SDK's
+network-retry path. For small, fast-completing batches the original
+request had already enqueued every item, sealed the batch, and let the
+runs flip the batch to `PROCESSING` or even `COMPLETED` by the time the
+retry arrived. The retry then failed the pre-loop check at
+`apps/webapp/app/runEngine/services/streamBatchItems.server.ts:109`
+with a 422 — surfacing a customer-visible `BatchTriggerError` for a
+batch whose runs had actually succeeded.
+
+`StreamBatchItemsService.call` now returns the standard `sealed: true`
+success response (with `itemsAccepted: 0`, `itemsDeduplicated: 0`,
+`runCount: batch.runCount`) when the batch is already sealed or in
+`PROCESSING`/`COMPLETED`, matching the idempotency already applied at
+the two post-loop race-condition branches in the same file.
+`ABORTED` and other unexpected non-`PENDING` states still throw.
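
The pre-loop behavior described in this changeset can be summarized as a small decision function. This is a minimal sketch rather than the service's actual code: `BatchSnapshot`, `Phase2Response`, `Phase2Decision`, and `decidePhase2` are hypothetical names, and the status union lists only the states named in this commit.

```typescript
// Hypothetical types for illustration; only statuses named in this commit are listed.
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "ABORTED";

interface BatchSnapshot {
  friendlyId: string;
  sealed: boolean;
  status: BatchStatus;
  runCount: number;
}

interface Phase2Response {
  id: string;
  itemsAccepted: number;
  itemsDeduplicated: number;
  sealed: boolean;
  runCount: number;
}

type Phase2Decision =
  | { kind: "idempotent-success"; response: Phase2Response }
  | { kind: "accept-items" }
  | { kind: "reject"; message: string };

function decidePhase2(batch: BatchSnapshot): Phase2Decision {
  // A sealed batch, or one already PROCESSING/COMPLETED, means an earlier
  // request did the work: answer the retry with the standard success shape.
  if (batch.sealed || batch.status === "PROCESSING" || batch.status === "COMPLETED") {
    return {
      kind: "idempotent-success",
      response: {
        id: batch.friendlyId,
        itemsAccepted: 0,
        itemsDeduplicated: 0,
        sealed: true,
        runCount: batch.runCount,
      },
    };
  }

  // ABORTED (or any other non-PENDING state) still surfaces as an error.
  if (batch.status !== "PENDING") {
    return {
      kind: "reject",
      message: `Batch ${batch.friendlyId} is not in PENDING status (current: ${batch.status})`,
    };
  }

  // Unsealed and PENDING: proceed to stream items as usual.
  return { kind: "accept-items" };
}
```

Checking `sealed` alone would miss the reported case: a batch with `status: "COMPLETED"` and `sealed: false` only reaches the success branch because the status is tested as well.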

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 20 additions & 4 deletions
@@ -100,13 +100,29 @@ export class StreamBatchItemsService extends WithRunEngine {
   throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`);
 }

-if (batch.sealed) {
-  throw new ServiceValidationError(
-    `Batch ${batchFriendlyId} is already sealed and cannot accept more items`
-  );
+// Phase 2 retry idempotency (TRI-9944): if the batch is already sealed
+// or has moved past PENDING into PROCESSING/COMPLETED, this is a retry
+// of a request whose response was lost — the original successful request
+// already enqueued every item and sealed the batch. Returning sealed:true
+// makes the SDK stop retrying instead of throwing a customer-visible 422.
+if (batch.sealed || batch.status === "PROCESSING" || batch.status === "COMPLETED") {
+  logger.info("Batch already sealed/completed - treating Phase 2 retry as success", {
+    batchId: batchFriendlyId,
+    batchSealed: batch.sealed,
+    batchStatus: batch.status,
+  });
+
+  return {
+    id: batchFriendlyId,
+    itemsAccepted: 0,
+    itemsDeduplicated: 0,
+    sealed: true,
+    runCount: batch.runCount,
+  };
 }

 if (batch.status !== "PENDING") {
+  // ABORTED or any other unexpected non-PENDING state — surface as an error.
   throw new ServiceValidationError(
     `Batch ${batchFriendlyId} is not in PENDING status (current: ${batch.status})`
   );
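
To see why returning `sealed: true` matters to the caller, here is a minimal sketch of a client-side retry loop talking to a fake server. It is not the SDK's implementation: `streamWithRetry`, `makeFakeServer`, `StreamResult`, and the `batch_123` id are hypothetical, and a lost response is modeled simply as a thrown error after the server has already done the work.

```typescript
interface StreamResult {
  id: string;
  itemsAccepted: number;
  itemsDeduplicated: number;
  sealed: boolean;
  runCount: number;
}

// Hypothetical retry loop: any thrown error is treated as retryable.
async function streamWithRetry(
  send: () => Promise<StreamResult>,
  maxAttempts = 3
): Promise<StreamResult> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await send();
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}

// Fake server: the first call seals the batch but its response is "lost";
// later calls see the sealed batch and answer idempotently.
function makeFakeServer(runCount: number) {
  let sealed = false;
  let callCount = 0;
  return {
    get calls() {
      return callCount;
    },
    send: async (): Promise<StreamResult> => {
      callCount++;
      if (!sealed) {
        sealed = true; // items enqueued and batch sealed server-side
        throw new Error("response lost"); // the client never sees the success
      }
      return { id: "batch_123", itemsAccepted: 0, itemsDeduplicated: 0, sealed: true, runCount };
    },
  };
}
```

With the old behavior, the second call in this model would have thrown a validation error instead, so the caller would report a failure for a batch that had in fact been fully enqueued.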

apps/webapp/test/engine/streamBatchItems.test.ts

Lines changed: 139 additions & 22 deletions
@@ -174,7 +174,7 @@ describe("StreamBatchItemsService", () => {
   );

   containerTest(
-    "should handle race condition when batch already sealed by another request",
+    "should return sealed=true when batch is already sealed and PROCESSING (Phase 2 retry idempotency)",
     async ({ prisma, redisOptions }) => {
       const engine = new RunEngine({
         prisma,
@@ -211,43 +211,160 @@ describe("StreamBatchItemsService", () => {

       const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");

-      // Create a batch that is already sealed and PROCESSING (simulating another request won the race)
+      // Simulate the SDK retrying Phase 2 after the original request succeeded:
+      // the original request already sealed the batch and moved it to PROCESSING.
       const batch = await createBatch(prisma, authenticatedEnvironment.id, {
         runCount: 2,
         status: "PROCESSING",
         sealed: true,
       });

-      // Initialize the batch in Redis with full count
-      await engine.initializeBatch({
-        batchId: batch.id,
-        friendlyId: batch.friendlyId,
-        environmentId: authenticatedEnvironment.id,
-        environmentType: authenticatedEnvironment.type,
-        organizationId: authenticatedEnvironment.organizationId,
-        projectId: authenticatedEnvironment.projectId,
-        runCount: 2,
-        processingConcurrency: 10,
+      const service = new StreamBatchItemsService({
+        prisma,
+        engine,
       });

-      // Enqueue items directly
-      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
-        task: "test-task",
-        payload: JSON.stringify({ data: "item1" }),
-        payloadType: "application/json",
+      const result = await service.call(
+        authenticatedEnvironment,
+        batch.friendlyId,
+        itemsToAsyncIterable([]),
+        {
+          maxItemBytes: 1024 * 1024,
+        }
+      );
+
+      // The retry should be treated as success — the original request already
+      // enqueued every item, so the SDK should stop retrying.
+      expect(result.sealed).toBe(true);
+      expect(result.id).toBe(batch.friendlyId);
+      expect(result.itemsAccepted).toBe(0);
+      expect(result.itemsDeduplicated).toBe(0);
+      expect(result.runCount).toBe(2);
+
+      await engine.quit();
+    }
+  );
+
+  containerTest(
+    "should return sealed=true when batch is COMPLETED before Phase 2 retry arrives (TRI-9944)",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
       });
-      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
-        task: "test-task",
-        payload: JSON.stringify({ data: "item2" }),
-        payloadType: "application/json",
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      // The customer-reported scenario: single-item batch where the original
+      // Phase 2 request succeeded server-side, the run executed fast, the batch
+      // flipped to COMPLETED, then the lost-response SDK retry hits us.
+      // Note: tryCompleteBatch sets status=COMPLETED but does NOT set sealed=true.
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 1,
+        status: "COMPLETED",
+        sealed: false,
+      });
+
+      const service = new StreamBatchItemsService({
+        prisma,
+        engine,
+      });
+
+      const result = await service.call(
+        authenticatedEnvironment,
+        batch.friendlyId,
+        itemsToAsyncIterable([]),
+        {
+          maxItemBytes: 1024 * 1024,
+        }
+      );
+
+      expect(result.sealed).toBe(true);
+      expect(result.id).toBe(batch.friendlyId);
+      expect(result.itemsAccepted).toBe(0);
+      expect(result.itemsDeduplicated).toBe(0);
+
+      await engine.quit();
+    }
+  );
+
+  containerTest(
+    "should throw when batch is in ABORTED status",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 2,
+        status: "ABORTED",
+        sealed: false,
       });

       const service = new StreamBatchItemsService({
         prisma,
         engine,
       });

-      // This should fail because the batch is already sealed
       await expect(
         service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), {
           maxItemBytes: 1024 * 1024,
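
The tests in this diff repeat the same `RunEngine` option block verbatim. One possible way to cut that duplication is a small factory; the sketch below is only a suggestion, `buildTestEngineOptions` is a hypothetical helper that does not exist in the repository, and the field values are copied from the test code above.

```typescript
// Hypothetical helper: builds the option block the tests above repeat.
// The redis options type is left generic because its real type is not shown in this diff.
function buildTestEngineOptions<R>(redisOptions: R) {
  return {
    worker: {
      redis: redisOptions,
      workers: 1,
      tasksPerWorker: 10,
      pollIntervalMs: 100,
      disabled: true,
    },
    queue: {
      redis: redisOptions,
    },
    runLock: {
      redis: redisOptions,
    },
    machines: {
      defaultMachine: "small-1x",
      machines: {
        "small-1x": {
          name: "small-1x" as const,
          cpu: 0.5,
          memory: 0.5,
          centsPerMs: 0.0001,
        },
      },
      baseCostInCents: 0.0005,
    },
    batchQueue: {
      redis: redisOptions,
    },
  };
}
```

Assuming `RunEngine` accepts the option shape shown in the tests, each test could then construct the engine as `new RunEngine({ prisma, ...buildTestEngineOptions(redisOptions), tracer: trace.getTracer("test", "0.0.0") })`, leaving only the batch fixture and assertions to differ between cases.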
