From 03ca70da014e747168504c498ea2a27fe0dc1bcd Mon Sep 17 00:00:00 2001 From: svozza Date: Wed, 29 Oct 2025 16:54:14 +0000 Subject: [PATCH 1/2] feat(batch): use async local storage for batch --- package-lock.json | 10 + packages/batch/package.json | 1 + .../batch/src/BasePartialBatchProcessor.ts | 11 +- packages/batch/src/BasePartialProcessor.ts | 85 ++-- packages/batch/src/BatchProcessingStore.ts | 159 ++++++++ packages/batch/src/SqsFifoPartialProcessor.ts | 6 +- .../batch/src/SqsFifoPartialProcessorAsync.ts | 6 +- packages/batch/src/SqsFifoProcessor.ts | 33 +- packages/batch/src/SqsFifoProcessorStore.ts | 74 ++++ .../concurrency/BatchProcessingStore.test.ts | 221 ++++++++++ .../unit/concurrency/BatchProcessor.test.ts | 260 ++++++++++++ .../SqsFifoPartialProcessor.test.ts | 379 ++++++++++++++++++ .../concurrency/SqsFifoProcessorStore.test.ts | 155 +++++++ packages/testing/src/helpers.ts | 19 +- 14 files changed, 1350 insertions(+), 69 deletions(-) create mode 100644 packages/batch/src/BatchProcessingStore.ts create mode 100644 packages/batch/src/SqsFifoProcessorStore.ts create mode 100644 packages/batch/tests/unit/concurrency/BatchProcessingStore.test.ts create mode 100644 packages/batch/tests/unit/concurrency/BatchProcessor.test.ts create mode 100644 packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts create mode 100644 packages/batch/tests/unit/concurrency/SqsFifoProcessorStore.test.ts diff --git a/package-lock.json b/package-lock.json index 2f504a112a..df568181f1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10674,6 +10674,7 @@ "license": "MIT-0", "dependencies": { "@aws-lambda-powertools/commons": "2.28.1", + "@aws/lambda-invoke-store": "0.1.1", "@standard-schema/spec": "^1.0.0" }, "devDependencies": { @@ -10682,6 +10683,15 @@ "zod": "^4.1.12" } }, + "packages/batch/node_modules/@aws/lambda-invoke-store": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/@aws/lambda-invoke-store/-/lambda-invoke-store-0.1.1.tgz", + "integrity": "sha512-RcLam17LdlbSOSp9VxmUu1eI6Mwxp+OwhD2QhiSNmNCzoDb0EeUXTD2n/WbcnrAYMGlmf05th6QYq23VqvJqpA==", + "license": "Apache-2.0", + "engines": { + "node": ">=18.0.0" + } + }, "packages/commons": { "name": "@aws-lambda-powertools/commons", "version": "2.28.1", diff --git a/packages/batch/package.json b/packages/batch/package.json index 070a9d297d..743fc81122 100644 --- a/packages/batch/package.json +++ b/packages/batch/package.json @@ -83,6 +83,7 @@ ], "dependencies": { "@aws-lambda-powertools/commons": "2.28.1", + "@aws/lambda-invoke-store": "0.1.1", "@standard-schema/spec": "^1.0.0" }, "devDependencies": { diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 21806be424..296fdc362f 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -35,11 +35,6 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { */ public COLLECTOR_MAPPING; - /** - * Response to be returned after processing - */ - public batchResponse: PartialItemFailureResponse; - /** * Type of event that the processor is handling */ @@ -200,9 +195,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { * Set up the processor with the initial state ready for processing */ public prepare(): void { - this.successMessages.length = 0; - this.failureMessages.length = 0; - this.errors.length = 0; + this.successMessages = []; + this.failureMessages = []; + this.errors = []; this.batchResponse = DEFAULT_RESPONSE; } diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index da09f22557..4bef6a1038 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -1,3 +1,4 @@ +import { BatchProcessingStore } from './BatchProcessingStore.js'; import type { BaseRecord, BatchProcessingOptions, @@ -21,42 +22,64 @@ import type { */ abstract class BasePartialProcessor { /** - * List of errors that occurred during processing + * Store for managing invocation-specific state */ - public errors: Error[]; + readonly #store = new BatchProcessingStore(); - /** - * List of records that failed processing - */ - public failureMessages: EventSourceDataClassTypes[]; + public get errors(): Error[] { + return this.#store.getErrors(); + } - /** - * Record handler provided by customers to process records - */ - public handler: CallableFunction; + protected set errors(errors: Error[]) { + this.#store.setErrors(errors); + } - /** - * Options to be used during processing (optional) - */ - public options?: BatchProcessingOptions; + public get failureMessages(): EventSourceDataClassTypes[] { + return this.#store.getFailureMessages(); + } - /** - * List of records to be processed - */ - public records: BaseRecord[]; + protected set failureMessages(messages: EventSourceDataClassTypes[]) { + this.#store.setFailureMessages(messages); + } - /** - * List of records that were processed successfully - */ - public successMessages: EventSourceDataClassTypes[]; - - public constructor() { - this.successMessages = []; - this.failureMessages = []; - this.errors = []; - this.records = []; - // No-op function to avoid null checks, will be overridden by customer when using the class - this.handler = new Function(); + public get handler(): CallableFunction { + return this.#store.getHandler(); + } + + protected set handler(handler: CallableFunction) { + this.#store.setHandler(handler); + } + + public get options(): BatchProcessingOptions | undefined { + return this.#store.getOptions(); + } + + protected set options(options: BatchProcessingOptions | undefined) { + this.#store.setOptions(options); + } + + public get records(): BaseRecord[] { + return this.#store.getRecords(); + } + + protected set records(records: BaseRecord[]) { + this.#store.setRecords(records); + } + + public get successMessages(): EventSourceDataClassTypes[] { + return this.#store.getSuccessMessages(); + } + + protected set successMessages(messages: EventSourceDataClassTypes[]) { + this.#store.setSuccessMessages(messages); + } + + protected get batchResponse() { + return this.#store.getBatchResponse(); + } + + protected set batchResponse(response) { + this.#store.setBatchResponse(response); } /** @@ -196,7 +219,7 @@ abstract class BasePartialProcessor { * * We use a separate method to do this rather than the constructor * to allow for reusing the processor instance across multiple invocations - * by instantiating the processor outside of the Lambda function handler. + * by instantiating the processor outside the Lambda function handler. * * @param records - Array of records to be processed * @param handler - CallableFunction to process each record from the batch diff --git a/packages/batch/src/BatchProcessingStore.ts b/packages/batch/src/BatchProcessingStore.ts new file mode 100644 index 0000000000..d63aa6c3c7 --- /dev/null +++ b/packages/batch/src/BatchProcessingStore.ts @@ -0,0 +1,159 @@ +import { InvokeStore } from '@aws/lambda-invoke-store'; +import type { + BaseRecord, + BatchProcessingOptions, + EventSourceDataClassTypes, + PartialItemFailureResponse, +} from './types.js'; + +/** + * Manages storage of batch processing state with automatic context detection. + * + * This class abstracts the storage mechanism for batch processing state, + * automatically choosing between InvokeStore (when in Lambda context) and + * fallback instance variables (when outside Lambda context). The decision is + * made at runtime on every method call to support Lambda's concurrent execution + * isolation. + */ +class BatchProcessingStore { + readonly #recordsKey = Symbol('powertools.batch.records'); + readonly #handlerKey = Symbol('powertools.batch.handler'); + readonly #optionsKey = Symbol('powertools.batch.options'); + readonly #failureMessagesKey = Symbol('powertools.batch.failureMessages'); + readonly #successMessagesKey = Symbol('powertools.batch.successMessages'); + readonly #batchResponseKey = Symbol('powertools.batch.batchResponse'); + readonly #errorsKey = Symbol('powertools.batch.errors'); + + #fallbackRecords: BaseRecord[] = []; + #fallbackHandler: CallableFunction = () => {}; + #fallbackOptions?: BatchProcessingOptions; + #fallbackFailureMessages: EventSourceDataClassTypes[] = []; + #fallbackSuccessMessages: EventSourceDataClassTypes[] = []; + #fallbackBatchResponse: PartialItemFailureResponse = { + batchItemFailures: [], + }; + #fallbackErrors: Error[] = []; + + public getRecords(): BaseRecord[] { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackRecords; + } + return (InvokeStore.get(this.#recordsKey) as BaseRecord[]) ?? []; + } + + public setRecords(records: BaseRecord[]): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackRecords = records; + return; + } + InvokeStore.set(this.#recordsKey, records); + } + + public getHandler(): CallableFunction { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackHandler; + } + return ( + (InvokeStore.get(this.#handlerKey) as CallableFunction) ?? (() => {}) + ); + } + + public setHandler(handler: CallableFunction): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackHandler = handler; + return; + } + InvokeStore.set(this.#handlerKey, handler); + } + + public getOptions(): BatchProcessingOptions | undefined { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackOptions; + } + return InvokeStore.get(this.#optionsKey) as + | BatchProcessingOptions + | undefined; + } + + public setOptions(options: BatchProcessingOptions | undefined): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackOptions = options; + return; + } + InvokeStore.set(this.#optionsKey, options); + } + + public getFailureMessages(): EventSourceDataClassTypes[] { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackFailureMessages; + } + return ( + (InvokeStore.get( + this.#failureMessagesKey + ) as EventSourceDataClassTypes[]) ?? [] + ); + } + + public setFailureMessages(messages: EventSourceDataClassTypes[]): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackFailureMessages = messages; + return; + } + InvokeStore.set(this.#failureMessagesKey, messages); + } + + public getSuccessMessages(): EventSourceDataClassTypes[] { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackSuccessMessages; + } + return ( + (InvokeStore.get( + this.#successMessagesKey + ) as EventSourceDataClassTypes[]) ?? [] + ); + } + + public setSuccessMessages(messages: EventSourceDataClassTypes[]): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackSuccessMessages = messages; + return; + } + InvokeStore.set(this.#successMessagesKey, messages); + } + + public getBatchResponse(): PartialItemFailureResponse { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackBatchResponse; + } + return ( + (InvokeStore.get( + this.#batchResponseKey + ) as PartialItemFailureResponse) ?? { batchItemFailures: [] } + ); + } + + public setBatchResponse(response: PartialItemFailureResponse): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackBatchResponse = response; + return; + } + InvokeStore.set(this.#batchResponseKey, response); + } + + public getErrors(): Error[] { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackErrors; + } + return (InvokeStore.get(this.#errorsKey) as Error[]) ?? []; + } + + public setErrors(errors: Error[]): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackErrors = errors; + return; + } + InvokeStore.set(this.#errorsKey, errors); + } +} + +export { BatchProcessingStore }; diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index d7752b83f8..8ac6253428 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -142,7 +142,11 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - this.#processFailRecord(record, new SqsFifoShortCircuitError()); + const result = this.#processFailRecord( + record, + new SqsFifoShortCircuitError() + ); + processedRecords.push(result); } this.clean(); diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index 9e15d4a916..eb3ccc8126 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -141,7 +141,11 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - this.#processFailRecord(record, new SqsFifoShortCircuitError()); + const result = this.#processFailRecord( + record, + new SqsFifoShortCircuitError() + ); + processedRecords.push(result); } this.clean(); diff --git a/packages/batch/src/SqsFifoProcessor.ts b/packages/batch/src/SqsFifoProcessor.ts index f98eed0ec1..5fb9ee0922 100644 --- a/packages/batch/src/SqsFifoProcessor.ts +++ b/packages/batch/src/SqsFifoProcessor.ts @@ -1,3 +1,4 @@ +import { SqsFifoProcessorStore } from './SqsFifoProcessorStore.js'; import type { BatchProcessingOptions, EventSourceDataClassTypes, @@ -9,18 +10,14 @@ import type { * determining whether to short-circuit processing, and skipping groups based on processing options. */ class SqsFifoProcessor { - /** - * The ID of the current message group being processed. - */ - #currentGroupId?: string; + readonly #store = new SqsFifoProcessorStore(); - /** - * A set of group IDs that have already encountered failures. - */ - readonly #failedGroupIds: Set; + private get currentGroupId(): string | undefined { + return this.#store.getCurrentGroupId(); + } - public constructor() { - this.#failedGroupIds = new Set(); + private set currentGroupId(groupId: string | undefined) { + this.#store.setCurrentGroupId(groupId); } /** @@ -29,15 +26,15 @@ class SqsFifoProcessor { * @param group - The group ID to be added to the set of failed group IDs. */ public addToFailedGroup(group: string): void { - this.#failedGroupIds.add(group); + this.#store.addFailedGroupId(group); } /** * Prepares the processor for a new batch of messages. */ public prepare(): void { - this.#currentGroupId = undefined; - this.#failedGroupIds.clear(); + this.currentGroupId = undefined; + this.#store.clearFailedGroupIds(); } /** @@ -46,7 +43,7 @@ class SqsFifoProcessor { * @param group - The group ID of the current message being processed. */ public setCurrentGroup(group?: string): void { - this.#currentGroupId = group; + this.currentGroupId = group; } /** @@ -76,8 +73,8 @@ class SqsFifoProcessor { public shouldSkipCurrentGroup(options?: BatchProcessingOptions): boolean { return ( (options?.skipGroupOnError ?? false) && - this.#currentGroupId && - this.#failedGroupIds.has(this.#currentGroupId) + this.currentGroupId !== undefined && + this.#store.hasFailedGroupId(this.currentGroupId) ); } @@ -88,8 +85,8 @@ class SqsFifoProcessor { * @param options - The options for the batch processing. */ public processFailureForCurrentGroup(options?: BatchProcessingOptions) { - if (options?.skipGroupOnError && this.#currentGroupId) { - this.addToFailedGroup(this.#currentGroupId); + if (options?.skipGroupOnError && this.currentGroupId) { + this.addToFailedGroup(this.currentGroupId); } } } diff --git a/packages/batch/src/SqsFifoProcessorStore.ts b/packages/batch/src/SqsFifoProcessorStore.ts new file mode 100644 index 0000000000..6763329f9f --- /dev/null +++ b/packages/batch/src/SqsFifoProcessorStore.ts @@ -0,0 +1,74 @@ +import { InvokeStore } from '@aws/lambda-invoke-store'; + +/** + * Manages storage of SQS FIFO processor state with automatic context detection. + * + * This class abstracts the storage mechanism for SQS FIFO processing state, + * automatically choosing between InvokeStore (when in Lambda context) and + * fallback instance variables (when outside Lambda context). The decision is + * made at runtime on every method call to support Lambda's concurrent execution + * isolation. + */ +class SqsFifoProcessorStore { + readonly #currentGroupIdKey = Symbol( + 'powertools.batch.sqsFifo.currentGroupId' + ); + readonly #failedGroupIdsKey = Symbol( + 'powertools.batch.sqsFifo.failedGroupIds' + ); + + #fallbackCurrentGroupId?: string; + #fallbackFailedGroupIds = new Set(); + + public getCurrentGroupId(): string | undefined { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackCurrentGroupId; + } + + return InvokeStore.get(this.#currentGroupIdKey) as string | undefined; + } + + public setCurrentGroupId(groupId: string | undefined): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackCurrentGroupId = groupId; + return; + } + + InvokeStore.set(this.#currentGroupIdKey, groupId); + } + + public addFailedGroupId(groupId: string): void { + this.getFailedGroupIds().add(groupId); + } + + public hasFailedGroupId(groupId: string): boolean { + return this.getFailedGroupIds().has(groupId); + } + + public getFailedGroupIds(): Set { + if (InvokeStore.getContext() === undefined) { + return this.#fallbackFailedGroupIds; + } + + let failedGroupIds = InvokeStore.get(this.#failedGroupIdsKey) as + | Set + | undefined; + if (failedGroupIds == null) { + failedGroupIds = new Set(); + InvokeStore.set(this.#failedGroupIdsKey, failedGroupIds); + } + + return failedGroupIds; + } + + public clearFailedGroupIds(): void { + if (InvokeStore.getContext() === undefined) { + this.#fallbackFailedGroupIds = new Set(); + return; + } + + InvokeStore.set(this.#failedGroupIdsKey, new Set()); + } +} + +export { SqsFifoProcessorStore }; diff --git a/packages/batch/tests/unit/concurrency/BatchProcessingStore.test.ts b/packages/batch/tests/unit/concurrency/BatchProcessingStore.test.ts new file mode 100644 index 0000000000..eb8d3d1d1e --- /dev/null +++ b/packages/batch/tests/unit/concurrency/BatchProcessingStore.test.ts @@ -0,0 +1,221 @@ +import { sequence } from '@aws-lambda-powertools/testing-utils'; +import type { SQSRecord } from 'aws-lambda'; +import { beforeEach, describe, expect, it } from 'vitest'; +import { BatchProcessingStore } from '../../../src/BatchProcessingStore.js'; +import { sqsRecordFactory } from '../../helpers/factories.js'; + +describe('BatchProcessingStore concurrent invocation isolation', () => { + beforeEach(() => { + // No mocks needed + }); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + }, + ])( + 'returns empty defaults when not initialized $description', + async ({ useInvokeStore }) => { + // Prepare + const store = new BatchProcessingStore(); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [() => {}, () => {}], + return: () => ({ + records: store.getRecords(), + errors: store.getErrors(), + failureMessages: store.getFailureMessages(), + successMessages: store.getSuccessMessages(), + batchResponse: store.getBatchResponse(), + handler: store.getHandler(), + }), + }, + { + sideEffects: [() => {}, () => {}], + return: () => ({ + records: store.getRecords(), + errors: store.getErrors(), + failureMessages: store.getFailureMessages(), + successMessages: store.getSuccessMessages(), + batchResponse: store.getBatchResponse(), + handler: store.getHandler(), + }), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA.records).toEqual([]); + expect(resultA.errors).toEqual([]); + expect(resultA.failureMessages).toEqual([]); + expect(resultA.successMessages).toEqual([]); + expect(resultA.batchResponse).toEqual({ batchItemFailures: [] }); + expect(resultA.handler()).toBeUndefined(); + expect(resultB.records).toEqual([]); + expect(resultB.errors).toEqual([]); + expect(resultB.failureMessages).toEqual([]); + expect(resultB.successMessages).toEqual([]); + expect(resultB.batchResponse).toEqual({ batchItemFailures: [] }); + expect(resultB.handler()).toBeUndefined(); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResultA: ['record-B'], + expectedResultB: ['record-B'], + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResultA: ['record-A'], + expectedResultB: ['record-B'], + }, + ])( + 'isolates records per invocation $description', + async ({ useInvokeStore, expectedResultA, expectedResultB }) => { + // Prepare + const store = new BatchProcessingStore(); + const recordsA = [sqsRecordFactory('record-A')]; + const recordsB = [sqsRecordFactory('record-B')]; + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + store.setRecords(recordsA); + }, + () => {}, + ], + return: () => store.getRecords().map((r) => (r as SQSRecord).body), + }, + { + sideEffects: [ + () => {}, + () => { + store.setRecords(recordsB); + }, + ], + return: () => store.getRecords().map((r) => (r as SQSRecord).body), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA).toEqual(expectedResultA); + expect(resultB).toEqual(expectedResultB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResultA: ['fail-B'], + expectedResultB: ['fail-B'], + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResultA: ['fail-A'], + expectedResultB: ['fail-B'], + }, + ])( + 'isolates failure messages per invocation $description', + async ({ useInvokeStore, expectedResultA, expectedResultB }) => { + // Prepare + const store = new BatchProcessingStore(); + const recordA = sqsRecordFactory('fail-A'); + const recordB = sqsRecordFactory('fail-B'); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + store.setFailureMessages([recordA]); + }, + () => {}, + ], + return: () => + store.getFailureMessages().map((r) => (r as SQSRecord).body), + }, + { + sideEffects: [ + () => {}, + () => { + store.setFailureMessages([recordB]); + }, + ], + return: () => + store.getFailureMessages().map((r) => (r as SQSRecord).body), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA).toEqual(expectedResultA); + expect(resultB).toEqual(expectedResultB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResultA: ['error-B'], + expectedResultB: ['error-B'], + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResultA: ['error-A'], + expectedResultB: ['error-B'], + }, + ])( + 'isolates errors per invocation $description', + async ({ useInvokeStore, expectedResultA, expectedResultB }) => { + // Prepare + const store = new BatchProcessingStore(); + const errorA = new Error('error-A'); + const errorB = new Error('error-B'); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + store.setErrors([errorA]); + }, + () => {}, + ], + return: () => store.getErrors().map((e) => e.message), + }, + { + sideEffects: [ + () => {}, + () => { + store.setErrors([errorB]); + }, + ], + return: () => store.getErrors().map((e) => e.message), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA).toEqual(expectedResultA); + expect(resultB).toEqual(expectedResultB); + } + ); +}); diff --git a/packages/batch/tests/unit/concurrency/BatchProcessor.test.ts b/packages/batch/tests/unit/concurrency/BatchProcessor.test.ts new file mode 100644 index 0000000000..23edc23f28 --- /dev/null +++ b/packages/batch/tests/unit/concurrency/BatchProcessor.test.ts @@ -0,0 +1,260 @@ +import { sequence } from '@aws-lambda-powertools/testing-utils'; +import type { SQSRecord } from 'aws-lambda'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { BatchProcessor, EventType } from '../../../src/index.js'; +import { sqsRecordFactory } from '../../helpers/factories.js'; + +describe('BatchProcessor concurrent invocation isolation', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResults: [[['success', 'record-B']], [['success', 'record-B']]], + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResults: [[['success', 'record-A']], [['success', 'record-B']]], + }, + ])( + 'processes correct records per invocation $description', + async ({ useInvokeStore, expectedResults }) => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const recordsA = [sqsRecordFactory('record-A')]; + const recordsB = [sqsRecordFactory('record-B')]; + const handlerA = vi.fn((record: SQSRecord) => record.body); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + const processed = await processor.process(); + return processed.map((p) => [p[0], p[1]]); + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB); + }, + ], + return: async () => { + const processed = await processor.process(); + return processed.map((p) => [p[0], p[1]]); + }, + }, + { useInvokeStore } + ); + + // Assess + expect(resultA).toEqual(expectedResults[0]); + expect(resultB).toEqual(expectedResults[1]); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedCalls: { + handlerA: 0, + handlerB: 2, + }, + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedCalls: { + handlerA: 1, + handlerB: 1, + }, + }, + ])( + 'calls correct handler per invocation $description', + async ({ useInvokeStore, expectedCalls }) => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const recordsA = [sqsRecordFactory('record-A')]; + const recordsB = [sqsRecordFactory('record-B')]; + const handlerA = vi.fn((record: SQSRecord) => record.body); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + await sequence( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + await processor.process(); + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB); + }, + ], + return: async () => { + await processor.process(); + }, + }, + { useInvokeStore } + ); + + // Assess + expect(handlerA).toHaveBeenCalledTimes(expectedCalls.handlerA); + expect(handlerB).toHaveBeenCalledTimes(expectedCalls.handlerB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + }, + ])( + 'tracks failures independently per invocation $description', + async ({ useInvokeStore }) => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const recordsA = [sqsRecordFactory('fail')]; + const recordsB = [sqsRecordFactory('success')]; + const handlerA = vi.fn((record: SQSRecord) => { + if (record.body === 'fail') throw new Error('Failed'); + return record.body; + }); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA, { + throwOnFullBatchFailure: false, + }); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + await processor.process(); + return processor.response().batchItemFailures; + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB, { + throwOnFullBatchFailure: false, + }); + }, + ], + return: async () => { + await processor.process(); + return processor.response().batchItemFailures; + }, + }, + { useInvokeStore } + ); + + // Assess + if (useInvokeStore) { + expect(resultA).toEqual([{ itemIdentifier: recordsA[0].messageId }]); + expect(resultB).toEqual([]); + } else { + expect(resultA).toEqual([]); + expect(resultB).toEqual([]); + } + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedErrorCountA: 1, + expectedErrorCountB: 1, + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedErrorCountA: 2, + expectedErrorCountB: 1, + }, + ])( + 'isolates use of prepare method across invocations $description', + async ({ useInvokeStore, expectedErrorCountA, expectedErrorCountB }) => { + // Prepare + const processor = new BatchProcessor(EventType.SQS); + const recordsA = [sqsRecordFactory('fail-1'), sqsRecordFactory('fail-2')]; + const recordsB = [sqsRecordFactory('fail-3')]; + const handlerA = vi.fn(() => { + throw new Error('Handler failed'); + }); + const handlerB = vi.fn(() => { + throw new Error('Handler failed'); + }); + + // Act + const [errorCountA, errorCountB] = await sequence( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA, { + throwOnFullBatchFailure: false, + }); + }, + async () => { + // Start processing while inv2 calls prepare() + await processor.process(); + }, + ], + return: () => processor.errors.length, + }, + { + sideEffects: [ + () => { + // This prepare() call clears inv1's errors mid-processing + processor.prepare(); + }, + async () => { + processor.register(recordsB, handlerB, { + throwOnFullBatchFailure: false, + }); + await processor.process(); + }, + ], + return: () => processor.errors.length, + }, + { useInvokeStore } + ); + + // Assess + expect(errorCountA).toBe(expectedErrorCountA); + expect(errorCountB).toBe(expectedErrorCountB); + } + ); +}); diff --git a/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts new file mode 100644 index 0000000000..7910bce8f4 --- /dev/null +++ b/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts @@ -0,0 +1,379 @@ +import { sequence } from '@aws-lambda-powertools/testing-utils'; +import type { SQSRecord } from 'aws-lambda'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { + SqsFifoPartialProcessor, + SqsFifoPartialProcessorAsync, +} from '../../../src/index.js'; +import type { + BatchProcessingOptions, + FailureResponse, + SuccessResponse, +} from '../../../src/types.js'; +import { sqsRecordFactory } from '../../helpers/factories.js'; + +type ProcessResult = { status: string; body: unknown; record: SQSRecord }[]; + +const tupleToObject = (tuple: SuccessResponse | FailureResponse) => ({ + status: tuple[0], + body: tuple[1], + record: tuple[2] as SQSRecord, +}); + +type ProcessorConfig = { + name: string; + processorClass: + | typeof SqsFifoPartialProcessor + | typeof SqsFifoPartialProcessorAsync; + isAsync: boolean; +}; + +const processors: ProcessorConfig[] = [ + { + name: 'Synchronous', + processorClass: SqsFifoPartialProcessor, + isAsync: false, + }, + { + name: 'Asynchronous', + processorClass: SqsFifoPartialProcessorAsync, + isAsync: true, + }, +]; + +describe('SQS FIFO Processors concurrent invocation isolation', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + for (const { name, processorClass, isAsync } of processors) { + describe(`${name}`, () => { + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedBodyA: 'record-B', + expectedBodyB: 'record-B', + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedBodyA: 'record-A', + expectedBodyB: 'record-B', + }, + ])( + 'processes correct records per invocation $description', + async ({ useInvokeStore, expectedBodyA, expectedBodyB }) => { + // Prepare + const processor = new processorClass(); + const recordsA = [sqsRecordFactory('record-A', '1')]; + const recordsB = [sqsRecordFactory('record-B', '2')]; + const handlerA = vi.fn((record: SQSRecord) => record.body); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + const [resultAPromise, resultBPromise] = await sequence< + Promise, + Promise + >( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + const processed = isAsync + ? await processor.process() + : processor.processSync(); + return processed.map(tupleToObject); + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB); + }, + ], + return: async () => { + const processed = isAsync + ? await processor.process() + : processor.processSync(); + return processed.map(tupleToObject); + }, + }, + { useInvokeStore } + ); + + // Assess + const resultA = await resultAPromise; + const resultB = await resultBPromise; + expect(resultA).toHaveLength(1); + expect(resultA[0].body).toBe(expectedBodyA); + expect(resultB).toHaveLength(1); + expect(resultB[0].body).toBe(expectedBodyB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedCalls: { + handlerA: 0, + handlerB: 2, + }, + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedCalls: { + handlerA: 1, + handlerB: 1, + }, + }, + ])( + 'calls correct handler per invocation $description', + async ({ useInvokeStore, expectedCalls }) => { + // Prepare + const processor = new processorClass(); + const recordsA = [sqsRecordFactory('record-A', '1')]; + const recordsB = [sqsRecordFactory('record-B', '2')]; + const handlerA = vi.fn((record: SQSRecord) => record.body); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + await sequence, Promise>( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + if (isAsync) { + await processor.process(); + } else { + processor.processSync(); + } + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB); + }, + ], + return: async () => { + if (isAsync) { + await processor.process(); + } else { + processor.processSync(); + } + }, + }, + { useInvokeStore } + ); + + // Assess + expect(handlerA).toHaveBeenCalledTimes(expectedCalls.handlerA); + expect(handlerB).toHaveBeenCalledTimes(expectedCalls.handlerB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedLengthA: 1, + expectedLengthB: 1, + expectedBodyA: 'body-B', + expectedBodyB: 'body-B', + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedLengthA: 2, + expectedLengthB: 1, + expectedBodyA: 'fail', + expectedBodyB: 'body-B', + }, + ])( + 'tracks failures and short-circuits independently per invocation $description', + async ({ + useInvokeStore, + expectedLengthA, + expectedLengthB, + expectedBodyA, + expectedBodyB, + }) => { + // Prepare + const processor = new processorClass(); + const recordsA = [ + sqsRecordFactory('fail', '1'), + sqsRecordFactory('body-A-2', '1'), + ]; + const recordsB = [sqsRecordFactory('body-B', '1')]; + const handlerA = vi.fn((record: SQSRecord) => { + if (record.body === 'fail') throw new Error('Processing failed'); + return record.body; + }); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + const [resultAPromise, resultBPromise] = await sequence< + Promise, + Promise + >( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA, { + throwOnFullBatchFailure: false, + }); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + const processed = isAsync + ? await processor.process() + : processor.processSync(); + return processed.map(tupleToObject); + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB, { + throwOnFullBatchFailure: false, + }); + }, + ], + return: async () => { + const processed = isAsync + ? await processor.process() + : processor.processSync(); + return processed.map(tupleToObject); + }, + }, + { useInvokeStore } + ); + + // Assess + const resultA = await resultAPromise; + const resultB = await resultBPromise; + expect(resultA).toHaveLength(expectedLengthA); + expect(resultB).toHaveLength(expectedLengthB); + expect(resultA[0].record.body).toBe(expectedBodyA); + expect(resultB[0].record.body).toBe(expectedBodyB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedLengthA: 2, + expectedLengthB: 2, + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedLengthA: 4, + expectedLengthB: 2, + }, + ])( + 'skips failed group but processes other groups independently $description', + async ({ useInvokeStore, expectedLengthA, expectedLengthB }) => { + // Prepare + const processor = new processorClass(); + const recordsA = [ + sqsRecordFactory('fail', '1'), + sqsRecordFactory('success-1', '1'), + sqsRecordFactory('success-2a', '2'), + sqsRecordFactory('success-2b', '2'), + ]; + const recordsB = [ + sqsRecordFactory('success-3', '3'), + sqsRecordFactory('success-4', '4'), + ]; + const handlerA = vi.fn((record: SQSRecord) => { + if (record.body === 'fail') throw new Error('Processing failed'); + return record.body; + }); + const handlerB = vi.fn((record: SQSRecord) => record.body); + + // Act + const [resultAPromise, resultBPromise] = await sequence< + Promise, + Promise + >( + { + sideEffects: [ + () => { + processor.register(recordsA, handlerA, { + skipGroupOnError: true, + throwOnFullBatchFailure: false, + } as BatchProcessingOptions< + InstanceType + >); + }, + () => {}, // Wait for inv2 to register + ], + return: async () => { + const processed = isAsync + ? await processor.process() + : processor.processSync(); + return processed.map(tupleToObject); + }, + }, + { + sideEffects: [ + () => {}, // Wait for inv1 to register + () => { + processor.register(recordsB, handlerB); + }, + ], + return: async () => { + const processed = isAsync + ? await processor.process() + : processor.processSync(); + return processed.map(tupleToObject); + }, + }, + { useInvokeStore } + ); + + // Assess + const resultA = await resultAPromise; + const resultB = await resultBPromise; + expect(resultA).toHaveLength(expectedLengthA); + expect(resultB).toHaveLength(expectedLengthB); + if (useInvokeStore) { + expect(resultA[0].record.body).toBe('fail'); // group 1 fails + expect(resultA[1].status).toBe('fail'); + expect(resultA[1].record.body).toBe('success-1'); // group 1 skipped + expect(resultA[2].record.body).toBe('success-2a'); // group 2 processes + expect(resultA[3].record.body).toBe('success-2b'); // group 2 processes + expect(resultB[0].record.body).toBe('success-3'); // group 3 processes + expect(resultB[1].record.body).toBe('success-4'); // group 4 processes + } else { + // Without InvokeStore: both invocations process invB's records + expect(resultA).toHaveLength(expectedLengthA); + expect(resultB).toHaveLength(expectedLengthB); + // Both process invB's records due to state leaking + expect(resultA[0].record.body).toBe('success-3'); // processed invB's records + expect(resultA[1].record.body).toBe('success-4'); + expect(resultB[0].status).toBe('success'); + expect(resultB[1].status).toBe('success'); + } + } + ); + }); + } +}); diff --git a/packages/batch/tests/unit/concurrency/SqsFifoProcessorStore.test.ts b/packages/batch/tests/unit/concurrency/SqsFifoProcessorStore.test.ts new file mode 100644 index 0000000000..04e9e77435 --- /dev/null +++ b/packages/batch/tests/unit/concurrency/SqsFifoProcessorStore.test.ts @@ -0,0 +1,155 @@ +import { sequence } from '@aws-lambda-powertools/testing-utils'; +import { beforeEach, describe, expect, it } from 'vitest'; +import { SqsFifoProcessorStore } from '../../../src/SqsFifoProcessorStore.js'; + +describe('SqsFifoProcessorStore concurrent invocation isolation', () => { + beforeEach(() => { + // No mocks needed + }); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResultA: ['group-A', 'group-B'], + expectedResultB: ['group-A', 'group-B'], + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResultA: ['group-A'], + expectedResultB: ['group-B'], + }, + ])( + 'lazily initializes failedGroupIds independently $description', + async ({ useInvokeStore, expectedResultA, expectedResultB }) => { + // Prepare + const store = new SqsFifoProcessorStore(); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + store.addFailedGroupId('group-A'); + }, + () => {}, + ], + return: () => Array.from(store.getFailedGroupIds()), + }, + { + sideEffects: [ + () => {}, + () => { + store.addFailedGroupId('group-B'); + }, + ], + return: () => Array.from(store.getFailedGroupIds()), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA.sort()).toEqual(expectedResultA.sort()); + expect(resultB.sort()).toEqual(expectedResultB.sort()); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResultA: 'group-B', + expectedResultB: 'group-B', + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResultA: 'group-A', + expectedResultB: 'group-B', + }, + ])( + 'isolates currentGroupId per invocation $description', + async ({ useInvokeStore, expectedResultA, expectedResultB }) => { + // Prepare + const store = new SqsFifoProcessorStore(); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + store.setCurrentGroupId('group-A'); + }, + () => {}, + ], + return: () => store.getCurrentGroupId(), + }, + { + sideEffects: [ + () => {}, + () => { + store.setCurrentGroupId('group-B'); + }, + ], + return: () => store.getCurrentGroupId(), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA).toBe(expectedResultA); + expect(resultB).toBe(expectedResultB); + } + ); + + it.each([ + { + description: 'without InvokeStore', + useInvokeStore: false, + expectedResultA: ['group-B'], + expectedResultB: ['group-B'], + }, + { + description: 'with InvokeStore', + useInvokeStore: true, + expectedResultA: [], + expectedResultB: ['group-B'], + }, + ])( + 'clears failedGroupIds independently per invocation $description', + async ({ useInvokeStore, expectedResultA, expectedResultB }) => { + // Prepare + const store = new SqsFifoProcessorStore(); + + // Act + const [resultA, resultB] = await sequence( + { + sideEffects: [ + () => { + store.addFailedGroupId('group-A'); + }, + () => { + store.clearFailedGroupIds(); + }, + ], + return: () => Array.from(store.getFailedGroupIds()), + }, + { + sideEffects: [ + () => {}, + () => { + store.addFailedGroupId('group-B'); + }, + ], + return: () => Array.from(store.getFailedGroupIds()), + }, + { useInvokeStore } + ); + + // Assess + expect(resultA.sort()).toEqual(expectedResultA.sort()); + expect(resultB.sort()).toEqual(expectedResultB.sort()); + } + ); +}); diff --git a/packages/testing/src/helpers.ts b/packages/testing/src/helpers.ts index 0b2e6fc846..4fdee898a2 100644 --- a/packages/testing/src/helpers.ts +++ b/packages/testing/src/helpers.ts @@ -99,9 +99,9 @@ const findAndGetStackOutputValue = ( return outputs[value]; }; -type Invocation = { +type Invocation = { sideEffects: (() => void)[]; - return: () => unknown; + return: () => T; }; /** @@ -194,14 +194,13 @@ const withResolvers = () => { * // Execution order: action1() → action2() → action3() → both return * ``` */ -function sequence( - inv1: Invocation, - inv2: Invocation, +function sequence( + inv1: Invocation, + inv2: Invocation, options: { useInvokeStore?: boolean } -) { - const executionEnv = options?.useInvokeStore - ? (f: () => unknown) => InvokeStore.run({}, f) - : (f: () => unknown) => f(); +): Promise<[T1, T2]> { + const executionEnv = (f: () => T) => + options?.useInvokeStore ? InvokeStore.run({}, f) : f(); const inv1Barriers = inv1.sideEffects.map(() => withResolvers()); const inv2Barriers = inv2.sideEffects.map(() => withResolvers()); @@ -226,7 +225,7 @@ function sequence( return inv2.return(); }); - return Promise.all([invocation1, invocation2]); + return Promise.all([invocation1, invocation2]) as Promise<[T1, T2]>; } export { From 5709781f75865292ff0d5b61357774e1ac8f211f Mon Sep 17 00:00:00 2001 From: svozza Date: Mon, 3 Nov 2025 11:29:41 +0000 Subject: [PATCH 2/2] address PR comments --- packages/batch/src/SqsFifoPartialProcessor.ts | 6 +- .../batch/src/SqsFifoPartialProcessorAsync.ts | 6 +- .../SqsFifoPartialProcessor.test.ts | 58 +++++++++---------- 3 files changed, 31 insertions(+), 39 deletions(-) diff --git a/packages/batch/src/SqsFifoPartialProcessor.ts b/packages/batch/src/SqsFifoPartialProcessor.ts index 8ac6253428..d7752b83f8 100644 --- a/packages/batch/src/SqsFifoPartialProcessor.ts +++ b/packages/batch/src/SqsFifoPartialProcessor.ts @@ -142,11 +142,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - const result = this.#processFailRecord( - record, - new SqsFifoShortCircuitError() - ); - processedRecords.push(result); + this.#processFailRecord(record, new SqsFifoShortCircuitError()); } this.clean(); diff --git a/packages/batch/src/SqsFifoPartialProcessorAsync.ts b/packages/batch/src/SqsFifoPartialProcessorAsync.ts index eb3ccc8126..9e15d4a916 100644 --- a/packages/batch/src/SqsFifoPartialProcessorAsync.ts +++ b/packages/batch/src/SqsFifoPartialProcessorAsync.ts @@ -141,11 +141,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor { const remainingRecords = this.records.slice(firstFailureIndex); for (const record of remainingRecords) { - const result = this.#processFailRecord( - record, - new SqsFifoShortCircuitError() - ); - processedRecords.push(result); + this.#processFailRecord(record, new SqsFifoShortCircuitError()); } this.clean(); diff --git a/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts index 7910bce8f4..f9e22f5a6f 100644 --- a/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/concurrency/SqsFifoPartialProcessor.test.ts @@ -8,6 +8,7 @@ import { import type { BatchProcessingOptions, FailureResponse, + PartialItemFailureResponse, SuccessResponse, } from '../../../src/types.js'; import { sqsRecordFactory } from '../../helpers/factories.js'; @@ -189,45 +190,46 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => { { description: 'without InvokeStore', useInvokeStore: false, - expectedLengthA: 1, - expectedLengthB: 1, - expectedBodyA: 'body-B', - expectedBodyB: 'body-B', + expectedLengthA: 3, + expectedLengthASync: 2, + expectedLengthB: 3, + expectedLengthBSync: 2, }, { description: 'with InvokeStore', useInvokeStore: true, - expectedLengthA: 2, - expectedLengthB: 1, - expectedBodyA: 'fail', - expectedBodyB: 'body-B', + expectedLengthA: 0, + expectedLengthASync: 0, + expectedLengthB: 2, + expectedLengthBSync: 2, }, ])( 'tracks failures and short-circuits independently per invocation $description', async ({ useInvokeStore, expectedLengthA, + expectedLengthASync, expectedLengthB, - expectedBodyA, - expectedBodyB, + expectedLengthBSync, }) => { // Prepare const processor = new processorClass(); - const recordsA = [ + const recordsA = [sqsRecordFactory('body-A-2', '1')]; + const recordsB = [ sqsRecordFactory('fail', '1'), - sqsRecordFactory('body-A-2', '1'), + sqsRecordFactory('body-B', '1'), ]; - const recordsB = [sqsRecordFactory('body-B', '1')]; - const handlerA = vi.fn((record: SQSRecord) => { + + const handlerA = vi.fn((record: SQSRecord) => record.body); + const handlerB = vi.fn((record: SQSRecord) => { if (record.body === 'fail') throw new Error('Processing failed'); return record.body; }); - const handlerB = vi.fn((record: SQSRecord) => record.body); // Act const [resultAPromise, resultBPromise] = await sequence< - Promise, - Promise + Promise, + Promise >( { sideEffects: [ @@ -239,10 +241,8 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => { () => {}, // Wait for inv2 to register ], return: async () => { - const processed = isAsync - ? await processor.process() - : processor.processSync(); - return processed.map(tupleToObject); + isAsync ? await processor.process() : processor.processSync(); + return processor.response(); }, }, { @@ -255,10 +255,8 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => { }, ], return: async () => { - const processed = isAsync - ? await processor.process() - : processor.processSync(); - return processed.map(tupleToObject); + isAsync ? await processor.process() : processor.processSync(); + return processor.response(); }, }, { useInvokeStore } @@ -267,10 +265,12 @@ describe('SQS FIFO Processors concurrent invocation isolation', () => { // Assess const resultA = await resultAPromise; const resultB = await resultBPromise; - expect(resultA).toHaveLength(expectedLengthA); - expect(resultB).toHaveLength(expectedLengthB); - expect(resultA[0].record.body).toBe(expectedBodyA); - expect(resultB[0].record.body).toBe(expectedBodyB); + expect(resultA.batchItemFailures).toHaveLength( + isAsync ? expectedLengthA : expectedLengthASync + ); + expect(resultB.batchItemFailures).toHaveLength( + isAsync ? expectedLengthB : expectedLengthBSync + ); } );