Feature request: return result of batch processing #6863

Open
@manzkel

Description

Use case

Hello there,
We have a use case where we validate records one at a time and, as part of the validation, enrich each record with extra information. Once that is done, we want to run a function a single time over all validated records (e.g. a bulk insert into the database).

Currently, `process_partial_response` in the batch processing utility discards the value returned by the `processor.process()` call. The per-record results therefore cannot be reused and have to be recomputed.
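
For illustration, the once-per-batch step could look roughly like the sketch below. It assumes the enriched records are plain dicts and uses an in-memory SQLite table; `bulk_insert`, the `items` table, and its columns are hypothetical names chosen for the example, not part of Powertools or of our actual setup.

```python
from __future__ import annotations

import sqlite3
from typing import Any


def bulk_insert(conn: sqlite3.Connection, rows: list[dict[str, Any]]) -> int:
    """Insert all validated rows in a single executemany call; return the row count."""
    if not rows:
        return 0
    with conn:  # commits on success, rolls back if the insert raises
        conn.executemany(
            "INSERT INTO items (id, payload) VALUES (:id, :payload)",
            rows,
        )
    return len(rows)


# Stand-in for the enriched values returned by record_handler (assumed shape).
validated_records = [{"id": 1, "payload": "a"}, {"id": 2, "payload": "b"}]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE items (id INTEGER PRIMARY KEY, payload TEXT)")
inserted = bulk_insert(conn, validated_records)
```

The point is that `bulk_insert` needs the handler's return values in one list, which is exactly what is not available today.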

We are currently slightly modifying the code (see below).

Solution/User Experience

Below is the modified version we use. Whether only the "success" records should be returned, or the failed ones as well (e.g. for monitoring), is up for debate, I suppose.

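from __future__ import annotations

from typing import Any, Callable

from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
from aws_lambda_powertools.utilities.batch.exceptions import UnexpectedBatchTypeError
from aws_lambda_powertools.utilities.batch.types import PartialItemFailureResponse
from aws_lambda_powertools.utilities.typing import LambdaContext


def process_partial_response_with_return(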
    event: dict,
    record_handler: Callable,
    processor: BatchProcessor,
    context: LambdaContext | None = None,
) -> tuple[PartialItemFailureResponse, list[Any]]:
    """
    Higher level function to handle batch event processing.

    Parameters
    ----------
    event: dict
        Lambda's original event
    record_handler: Callable
        Callable to process each record from the batch
    processor: BatchProcessor
        Batch Processor to handle partial failure cases
    context: LambdaContext | None
        Lambda's context, used to optionally inject in record handler

    Returns
    -------
    result: tuple[PartialItemFailureResponse, list[Any]]
        Lambda Partial Batch Response, followed by the values returned by
        `record_handler` for the successfully processed records

    Example
    --------
    **Processes Lambda's SQS event**

    ```python
    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord

    processor = BatchProcessor(EventType.SQS)

    def record_handler(record: SQSRecord):
        return record.body

    def handler(event, context):
        response, results = process_partial_response_with_return(
            event=event, record_handler=record_handler, processor=processor, context=context
        )
        # results holds the record_handler return value of each successful record
        return response
    ```

    Limitations
    -----------
    * Async batch processors. Use `async_process_partial_response` instead.
    """
    try:
        records: list[dict] = event.get("Records", [])
        if not records or not isinstance(records, list):
            raise UnexpectedBatchTypeError(
                "Unexpected batch event type. Possible values are: SQS, KinesisDataStreams, DynamoDBStreams",
            )

    except AttributeError:
        event_types = ", ".join(list(EventType.__members__))
        docs = "https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/#processing-messages-from-sqs"  # noqa: E501 # long-line
        raise ValueError( # noqa: B904
            f"Invalid event format. Please ensure batch event is a valid {processor.event_type.value} event. \n"
            f"See sample events in our documentation for either {event_types}: \n {docs}",
        )

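    # Process every record; each entry of the result is a (status, data, record) tuple
    with processor(records, record_handler, context):
        returned = processor.process()

    # Keep only the record_handler return values of successfully processed records
    successful_returns = [data for status, data, _ in returned if status.lower() == "success"]

    return processor.response(), successful_returns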
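
If failed records should be exposed as well (the open question above), one option is to partition the tuples instead of filtering them. This is only a sketch: `split_results` is a hypothetical helper, and it assumes each entry is a `(status, data, record)` tuple where any status other than `"success"` counts as a failure.

```python
from __future__ import annotations

from typing import Any


def split_results(returned: list[tuple[str, Any, Any]]) -> tuple[list[Any], list[Any]]:
    """Partition (status, data, record) tuples into success data and failure data."""
    successes: list[Any] = []
    failures: list[Any] = []
    for status, data, _record in returned:
        # Assumption: everything not marked "success" is treated as a failure.
        if status.lower() == "success":
            successes.append(data)
        else:
            failures.append(data)
    return successes, failures
```

The function above could then return `processor.response()` together with both lists, leaving it to the caller to decide whether the failures are used for monitoring.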

Alternative solutions

Acknowledgment

Metadata

Labels: feature-request (feature request), triage (pending triage from maintainers)
