Skip to content
11,308 changes: 4,716 additions & 6,592 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/batch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@
"nodejs"
],
"dependencies": {
"@aws/lambda-invoke-store": "0.2.1",
"@aws-lambda-powertools/commons": "2.28.1",
"@aws/lambda-invoke-store": "0.1.1",
"@standard-schema/spec": "^1.0.0"
},
"devDependencies": {
Expand Down
87 changes: 58 additions & 29 deletions packages/batch/src/BatchProcessingStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { InvokeStore } from '@aws/lambda-invoke-store';
import '@aws/lambda-invoke-store';
import { shouldUseInvokeStore } from '@aws-lambda-powertools/commons/utils/env';
import type {
BaseRecord,
BatchProcessingOptions,
Expand Down Expand Up @@ -35,124 +36,152 @@ class BatchProcessingStore {
#fallbackErrors: Error[] = [];

public getRecords(): BaseRecord[] {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackRecords;
}
return (InvokeStore.get(this.#recordsKey) as BaseRecord[]) ?? [];

if (globalThis.awslambda?.InvokeStore === undefined) {
throw new Error('InvokeStore is not available');
}

const store = globalThis.awslambda.InvokeStore;
return (store.get(this.#recordsKey) as BaseRecord[]) ?? [];
}

public setRecords(records: BaseRecord[]): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackRecords = records;
return;
}
InvokeStore.set(this.#recordsKey, records);

if (globalThis.awslambda?.InvokeStore === undefined) {
throw new Error('InvokeStore is not available');
}

const store = globalThis.awslambda.InvokeStore;
store.set(this.#recordsKey, records);
}

public getHandler(): CallableFunction {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackHandler;
}

return (
(InvokeStore.get(this.#handlerKey) as CallableFunction) ?? (() => {})
(globalThis.awslambda?.InvokeStore?.get(
this.#handlerKey
) as CallableFunction) ?? (() => {})
);
}

public setHandler(handler: CallableFunction): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackHandler = handler;
return;
}
InvokeStore.set(this.#handlerKey, handler);

globalThis.awslambda?.InvokeStore?.set(this.#handlerKey, handler);
}

public getOptions(): BatchProcessingOptions | undefined {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackOptions;
}
return InvokeStore.get(this.#optionsKey) as

return globalThis.awslambda?.InvokeStore?.get(this.#optionsKey) as
| BatchProcessingOptions
| undefined;
}

public setOptions(options: BatchProcessingOptions | undefined): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackOptions = options;
return;
}
InvokeStore.set(this.#optionsKey, options);

globalThis.awslambda?.InvokeStore?.set(this.#optionsKey, options);
}

public getFailureMessages(): EventSourceDataClassTypes[] {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackFailureMessages;
}

return (
(InvokeStore.get(
(globalThis.awslambda?.InvokeStore?.get(
this.#failureMessagesKey
) as EventSourceDataClassTypes[]) ?? []
);
}

public setFailureMessages(messages: EventSourceDataClassTypes[]): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackFailureMessages = messages;
return;
}
InvokeStore.set(this.#failureMessagesKey, messages);

globalThis.awslambda?.InvokeStore?.set(this.#failureMessagesKey, messages);
}

public getSuccessMessages(): EventSourceDataClassTypes[] {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackSuccessMessages;
}

return (
(InvokeStore.get(
(globalThis.awslambda?.InvokeStore?.get(
this.#successMessagesKey
) as EventSourceDataClassTypes[]) ?? []
);
}

public setSuccessMessages(messages: EventSourceDataClassTypes[]): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackSuccessMessages = messages;
return;
}
InvokeStore.set(this.#successMessagesKey, messages);

globalThis.awslambda?.InvokeStore?.set(this.#successMessagesKey, messages);
}

public getBatchResponse(): PartialItemFailureResponse {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackBatchResponse;
}

return (
(InvokeStore.get(
(globalThis.awslambda?.InvokeStore?.get(
this.#batchResponseKey
) as PartialItemFailureResponse) ?? { batchItemFailures: [] }
);
}

public setBatchResponse(response: PartialItemFailureResponse): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackBatchResponse = response;
return;
}
InvokeStore.set(this.#batchResponseKey, response);

globalThis.awslambda?.InvokeStore?.set(this.#batchResponseKey, response);
}

public getErrors(): Error[] {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackErrors;
}
return (InvokeStore.get(this.#errorsKey) as Error[]) ?? [];

return (
(globalThis.awslambda?.InvokeStore?.get(this.#errorsKey) as Error[]) ?? []
);
}

public setErrors(errors: Error[]): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackErrors = errors;
return;
}
InvokeStore.set(this.#errorsKey, errors);

globalThis.awslambda?.InvokeStore?.set(this.#errorsKey, errors);
}
}

Expand Down
41 changes: 31 additions & 10 deletions packages/batch/src/SqsFifoProcessorStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { InvokeStore } from '@aws/lambda-invoke-store';
import '@aws/lambda-invoke-store';
import { shouldUseInvokeStore } from '@aws-lambda-powertools/commons/utils/env';

/**
* Manages storage of SQS FIFO processor state with automatic context detection.
Expand All @@ -21,20 +22,30 @@ class SqsFifoProcessorStore {
#fallbackFailedGroupIds = new Set<string>();

public getCurrentGroupId(): string | undefined {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackCurrentGroupId;
}

return InvokeStore.get(this.#currentGroupIdKey) as string | undefined;
if (globalThis.awslambda?.InvokeStore === undefined) {
throw new Error('InvokeStore is not available');
}

const store = globalThis.awslambda.InvokeStore;
return store.get(this.#currentGroupIdKey) as string | undefined;
}

public setCurrentGroupId(groupId: string | undefined): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackCurrentGroupId = groupId;
return;
}

InvokeStore.set(this.#currentGroupIdKey, groupId);
if (globalThis.awslambda?.InvokeStore === undefined) {
throw new Error('InvokeStore is not available');
}

const store = globalThis.awslambda.InvokeStore;
store.set(this.#currentGroupIdKey, groupId);
}

public addFailedGroupId(groupId: string): void {
Expand All @@ -46,28 +57,38 @@ class SqsFifoProcessorStore {
}

public getFailedGroupIds(): Set<string> {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
return this.#fallbackFailedGroupIds;
}

let failedGroupIds = InvokeStore.get(this.#failedGroupIdsKey) as
if (globalThis.awslambda?.InvokeStore === undefined) {
throw new Error('InvokeStore is not available');
}

const store = globalThis.awslambda.InvokeStore;
let failedGroupIds = store.get(this.#failedGroupIdsKey) as
| Set<string>
| undefined;
if (failedGroupIds == null) {
failedGroupIds = new Set<string>();
InvokeStore.set(this.#failedGroupIdsKey, failedGroupIds);
store.set(this.#failedGroupIdsKey, failedGroupIds);
}

return failedGroupIds;
}

public clearFailedGroupIds(): void {
if (InvokeStore.getContext() === undefined) {
if (!shouldUseInvokeStore()) {
this.#fallbackFailedGroupIds = new Set<string>();
return;
}

InvokeStore.set(this.#failedGroupIdsKey, new Set<string>());
if (globalThis.awslambda?.InvokeStore === undefined) {
throw new Error('InvokeStore is not available');
}

const store = globalThis.awslambda.InvokeStore;
store.set(this.#failedGroupIdsKey, new Set<string>());
}
}

Expand Down
7 changes: 7 additions & 0 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { InvokeStoreBase } from '@aws/lambda-invoke-store';
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
import type { StandardSchemaV1 } from '@standard-schema/spec';
import type {
Expand All @@ -13,6 +14,12 @@ import type { parser } from './parser.js';
import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';

declare global {
namespace awslambda {
let InvokeStore: InvokeStoreBase | undefined;
}
}

/**
* Options for batch processing
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { sequence } from '@aws-lambda-powertools/testing-utils';
import type { SQSRecord } from 'aws-lambda';
import { beforeEach, describe, expect, it } from 'vitest';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { BatchProcessingStore } from '../../../src/BatchProcessingStore.js';
import { sqsRecordFactory } from '../../helpers/factories.js';

Expand All @@ -9,6 +9,10 @@ describe('BatchProcessingStore concurrent invocation isolation', () => {
// No mocks needed
});

afterEach(() => {
vi.unstubAllEnvs();
});

it.each([
{
description: 'without InvokeStore',
Expand All @@ -22,6 +26,9 @@ describe('BatchProcessingStore concurrent invocation isolation', () => {
'returns empty defaults when not initialized $description',
async ({ useInvokeStore }) => {
// Prepare
if (useInvokeStore) {
vi.stubEnv('AWS_LAMBDA_MAX_CONCURRENCY', '10');
}
const store = new BatchProcessingStore();

// Act
Expand Down Expand Up @@ -84,6 +91,9 @@ describe('BatchProcessingStore concurrent invocation isolation', () => {
'isolates records per invocation $description',
async ({ useInvokeStore, expectedResultA, expectedResultB }) => {
// Prepare
if (useInvokeStore) {
vi.stubEnv('AWS_LAMBDA_MAX_CONCURRENCY', '10');
}
const store = new BatchProcessingStore();
const recordsA = [sqsRecordFactory('record-A')];
const recordsB = [sqsRecordFactory('record-B')];
Expand Down Expand Up @@ -134,6 +144,9 @@ describe('BatchProcessingStore concurrent invocation isolation', () => {
'isolates failure messages per invocation $description',
async ({ useInvokeStore, expectedResultA, expectedResultB }) => {
// Prepare
if (useInvokeStore) {
vi.stubEnv('AWS_LAMBDA_MAX_CONCURRENCY', '10');
}
const store = new BatchProcessingStore();
const recordA = sqsRecordFactory('fail-A');
const recordB = sqsRecordFactory('fail-B');
Expand Down Expand Up @@ -186,6 +199,9 @@ describe('BatchProcessingStore concurrent invocation isolation', () => {
'isolates errors per invocation $description',
async ({ useInvokeStore, expectedResultA, expectedResultB }) => {
// Prepare
if (useInvokeStore) {
vi.stubEnv('AWS_LAMBDA_MAX_CONCURRENCY', '10');
}
const store = new BatchProcessingStore();
const errorA = new Error('error-A');
const errorB = new Error('error-B');
Expand Down Expand Up @@ -218,4 +234,36 @@ describe('BatchProcessingStore concurrent invocation isolation', () => {
expect(resultB).toEqual(expectedResultB);
}
);

describe('InvokeStore error handling', () => {
beforeEach(() => {
vi.stubGlobal('awslambda', undefined);
});

afterEach(() => {
vi.unstubAllGlobals();
});

it('throws error when getRecords is called with AWS_LAMBDA_MAX_CONCURRENCY set but InvokeStore is not available', () => {
// Prepare
vi.stubEnv('AWS_LAMBDA_MAX_CONCURRENCY', '10');
const store = new BatchProcessingStore();

// Act & Assess
expect(() => {
store.getRecords();
}).toThrow('InvokeStore is not available');
});

it('throws error when setRecords is called with AWS_LAMBDA_MAX_CONCURRENCY set but InvokeStore is not available', () => {
// Prepare
vi.stubEnv('AWS_LAMBDA_MAX_CONCURRENCY', '10');
const store = new BatchProcessingStore();

// Act & Assess
expect(() => {
store.setRecords([]);
}).toThrow('InvokeStore is not available');
});
});
});
Loading
Loading