Skip to content

Commit 5709781

Browse files
committed
address PR comments
1 parent 03ca70d commit 5709781

File tree

3 files changed

+31
-39
lines changed

3 files changed

+31
-39
lines changed

packages/batch/src/SqsFifoPartialProcessor.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,11 +142,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
142142
const remainingRecords = this.records.slice(firstFailureIndex);
143143

144144
for (const record of remainingRecords) {
145-
const result = this.#processFailRecord(
146-
record,
147-
new SqsFifoShortCircuitError()
148-
);
149-
processedRecords.push(result);
145+
this.#processFailRecord(record, new SqsFifoShortCircuitError());
150146
}
151147

152148
this.clean();

packages/batch/src/SqsFifoPartialProcessorAsync.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
141141
const remainingRecords = this.records.slice(firstFailureIndex);
142142

143143
for (const record of remainingRecords) {
144-
const result = this.#processFailRecord(
145-
record,
146-
new SqsFifoShortCircuitError()
147-
);
148-
processedRecords.push(result);
144+
this.#processFailRecord(record, new SqsFifoShortCircuitError());
149145
}
150146

151147
this.clean();

packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
import type {
99
BatchProcessingOptions,
1010
FailureResponse,
11+
PartialItemFailureResponse,
1112
SuccessResponse,
1213
} from '../../../src/types.js';
1314
import { sqsRecordFactory } from '../../helpers/factories.js';
@@ -189,45 +190,46 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => {
189190
{
190191
description: 'without InvokeStore',
191192
useInvokeStore: false,
192-
expectedLengthA: 1,
193-
expectedLengthB: 1,
194-
expectedBodyA: 'body-B',
195-
expectedBodyB: 'body-B',
193+
expectedLengthA: 3,
194+
expectedLengthASync: 2,
195+
expectedLengthB: 3,
196+
expectedLengthBSync: 2,
196197
},
197198
{
198199
description: 'with InvokeStore',
199200
useInvokeStore: true,
200-
expectedLengthA: 2,
201-
expectedLengthB: 1,
202-
expectedBodyA: 'fail',
203-
expectedBodyB: 'body-B',
201+
expectedLengthA: 0,
202+
expectedLengthASync: 0,
203+
expectedLengthB: 2,
204+
expectedLengthBSync: 2,
204205
},
205206
])(
206207
'tracks failures and short-circuits independently per invocation $description',
207208
async ({
208209
useInvokeStore,
209210
expectedLengthA,
211+
expectedLengthASync,
210212
expectedLengthB,
211-
expectedBodyA,
212-
expectedBodyB,
213+
expectedLengthBSync,
213214
}) => {
214215
// Prepare
215216
const processor = new processorClass();
216-
const recordsA = [
217+
const recordsA = [sqsRecordFactory('body-A-2', '1')];
218+
const recordsB = [
217219
sqsRecordFactory('fail', '1'),
218-
sqsRecordFactory('body-A-2', '1'),
220+
sqsRecordFactory('body-B', '1'),
219221
];
220-
const recordsB = [sqsRecordFactory('body-B', '1')];
221-
const handlerA = vi.fn((record: SQSRecord) => {
222+
223+
const handlerA = vi.fn((record: SQSRecord) => record.body);
224+
const handlerB = vi.fn((record: SQSRecord) => {
222225
if (record.body === 'fail') throw new Error('Processing failed');
223226
return record.body;
224227
});
225-
const handlerB = vi.fn((record: SQSRecord) => record.body);
226228

227229
// Act
228230
const [resultAPromise, resultBPromise] = await sequence<
229-
Promise<ProcessResult>,
230-
Promise<ProcessResult>
231+
Promise<PartialItemFailureResponse>,
232+
Promise<PartialItemFailureResponse>
231233
>(
232234
{
233235
sideEffects: [
@@ -239,10 +241,8 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => {
239241
() => {}, // Wait for inv2 to register
240242
],
241243
return: async () => {
242-
const processed = isAsync
243-
? await processor.process()
244-
: processor.processSync();
245-
return processed.map(tupleToObject);
244+
isAsync ? await processor.process() : processor.processSync();
245+
return processor.response();
246246
},
247247
},
248248
{
@@ -255,10 +255,8 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => {
255255
},
256256
],
257257
return: async () => {
258-
const processed = isAsync
259-
? await processor.process()
260-
: processor.processSync();
261-
return processed.map(tupleToObject);
258+
isAsync ? await processor.process() : processor.processSync();
259+
return processor.response();
262260
},
263261
},
264262
{ useInvokeStore }
@@ -267,10 +265,12 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => {
267265
// Assess
268266
const resultA = await resultAPromise;
269267
const resultB = await resultBPromise;
270-
expect(resultA).toHaveLength(expectedLengthA);
271-
expect(resultB).toHaveLength(expectedLengthB);
272-
expect(resultA[0].record.body).toBe(expectedBodyA);
273-
expect(resultB[0].record.body).toBe(expectedBodyB);
268+
expect(resultA.batchItemFailures).toHaveLength(
269+
isAsync ? expectedLengthA : expectedLengthASync
270+
);
271+
expect(resultB.batchItemFailures).toHaveLength(
272+
isAsync ? expectedLengthB : expectedLengthBSync
273+
);
274274
}
275275
);
276276

0 commit comments

Comments
 (0)