From 8b6c383261840888169773fcfa3200bd9034f0ca Mon Sep 17 00:00:00 2001 From: Alex Wang Date: Thu, 8 Jan 2026 17:27:26 -0800 Subject: [PATCH] feat: Implement json serdes for Operation - Implement json compatiable serdes for Operation and InvocationInput class --- .../execution.py | 35 + .../lambda_service.py | 100 ++- tests/execution_test.py | 514 +++++++++++++++ tests/lambda_service_test.py | 618 ++++++++++++++++++ 4 files changed, 1264 insertions(+), 3 deletions(-) diff --git a/src/aws_durable_execution_sdk_python/execution.py b/src/aws_durable_execution_sdk_python/execution.py index 6f4e438..0ce1ac3 100644 --- a/src/aws_durable_execution_sdk_python/execution.py +++ b/src/aws_durable_execution_sdk_python/execution.py @@ -59,6 +59,16 @@ def from_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState: next_marker=input_dict.get("NextMarker", ""), ) + @staticmethod + def from_json_dict(input_dict: MutableMapping[str, Any]) -> InitialExecutionState: + operations = [] + if input_operations := input_dict.get("Operations"): + operations = [Operation.from_json_dict(op) for op in input_operations] + return InitialExecutionState( + operations=operations, + next_marker=input_dict.get("NextMarker", ""), + ) + def get_execution_operation(self) -> Operation | None: if not self.operations: # Due to payload size limitations we may have an empty operations list. 
@@ -91,6 +101,12 @@ def to_dict(self) -> MutableMapping[str, Any]: "NextMarker": self.next_marker, } + def to_json_dict(self) -> MutableMapping[str, Any]: + return { + "Operations": [op.to_json_dict() for op in self.operations], + "NextMarker": self.next_marker, + } + @dataclass(frozen=True) class DurableExecutionInvocationInput: @@ -110,6 +126,18 @@ def from_dict( ), ) + @staticmethod + def from_json_dict( + input_dict: MutableMapping[str, Any], + ) -> DurableExecutionInvocationInput: + return DurableExecutionInvocationInput( + durable_execution_arn=input_dict["DurableExecutionArn"], + checkpoint_token=input_dict["CheckpointToken"], + initial_execution_state=InitialExecutionState.from_json_dict( + input_dict.get("InitialExecutionState", {}) + ), + ) + def to_dict(self) -> MutableMapping[str, Any]: return { "DurableExecutionArn": self.durable_execution_arn, @@ -117,6 +145,13 @@ def to_dict(self) -> MutableMapping[str, Any]: "InitialExecutionState": self.initial_execution_state.to_dict(), } + def to_json_dict(self) -> MutableMapping[str, Any]: + return { + "DurableExecutionArn": self.durable_execution_arn, + "CheckpointToken": self.checkpoint_token, + "InitialExecutionState": self.initial_execution_state.to_json_dict(), + } + @dataclass(frozen=True) class DurableExecutionInvocationInputWithClient(DurableExecutionInvocationInput): diff --git a/src/aws_durable_execution_sdk_python/lambda_service.py b/src/aws_durable_execution_sdk_python/lambda_service.py index b907391..200aede 100644 --- a/src/aws_durable_execution_sdk_python/lambda_service.py +++ b/src/aws_durable_execution_sdk_python/lambda_service.py @@ -1,5 +1,6 @@ from __future__ import annotations +import copy import datetime import logging from dataclasses import dataclass, field @@ -692,6 +693,24 @@ def create_wait_start( # endregion wait +class TimestampConverter: + """Converter for datetime/Unix timestamp conversions.""" + + @staticmethod + def to_unix_millis(dt: datetime.datetime | None) -> int | None: + 
"""Convert datetime to Unix timestamp in milliseconds.""" + return int(dt.timestamp() * 1000) if dt else None + + @staticmethod + def from_unix_millis(ms: int | None) -> datetime.datetime | None: + """Convert Unix timestamp in milliseconds to datetime.""" + return ( + datetime.datetime.fromtimestamp(ms / 1000, tz=datetime.UTC) + if ms is not None + else None + ) + + @dataclass(frozen=True) class Operation: """Represent the Operation type for GetDurableExecutionState and CheckpointDurableExecution.""" @@ -805,9 +824,11 @@ def to_dict(self) -> MutableMapping[str, Any]: step_dict["Error"] = self.step_details.error.to_dict() result["StepDetails"] = step_dict if self.wait_details: - result["WaitDetails"] = { - "ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp - } + result["WaitDetails"] = ( + {"ScheduledEndTimestamp": self.wait_details.scheduled_end_timestamp} + if self.wait_details.scheduled_end_timestamp + else {} + ) if self.callback_details: callback_dict: MutableMapping[str, Any] = { "CallbackId": self.callback_details.callback_id @@ -826,6 +847,79 @@ def to_dict(self) -> MutableMapping[str, Any]: result["ChainedInvokeDetails"] = invoke_dict return result + def to_json_dict(self) -> MutableMapping[str, Any]: + """Convert the Operation to a JSON-serializable dictionary. + + Converts datetime objects to millisecond timestamps for JSON compatibility. 
+ + Returns: + A dictionary with JSON-serializable values + """ + # Start with the regular to_dict output + result = self.to_dict() + + # Convert datetime objects to millisecond timestamps + if ts := result.get("StartTimestamp"): + result["StartTimestamp"] = TimestampConverter.to_unix_millis(ts) + + if ts := result.get("EndTimestamp"): + result["EndTimestamp"] = TimestampConverter.to_unix_millis(ts) + + if (step_details := result.get("StepDetails")) and ( + ts := step_details.get("NextAttemptTimestamp") + ): + result["StepDetails"]["NextAttemptTimestamp"] = ( + TimestampConverter.to_unix_millis(ts) + ) + + if (wait_details := result.get("WaitDetails")) and ( + ts := wait_details.get("ScheduledEndTimestamp") + ): + result["WaitDetails"]["ScheduledEndTimestamp"] = ( + TimestampConverter.to_unix_millis(ts) + ) + + return result + + @classmethod + def from_json_dict(cls, data: MutableMapping[str, Any]) -> Operation: + """Create an Operation from a JSON-serializable dictionary. + + Converts millisecond timestamps back to datetime objects. 
+ + Args: + data: Dictionary with JSON-serializable values (millisecond timestamps) + + Returns: + An Operation instance with datetime objects + """ + # Make a copy to avoid modifying the original data + data_copy = copy.deepcopy(data) + + # Convert millisecond timestamps back to datetime objects + if ms := data_copy.get("StartTimestamp"): + data_copy["StartTimestamp"] = TimestampConverter.from_unix_millis(ms) + + if ms := data_copy.get("EndTimestamp"): + data_copy["EndTimestamp"] = TimestampConverter.from_unix_millis(ms) + + if (step_details := data_copy.get("StepDetails")) and ( + ms := step_details.get("NextAttemptTimestamp") + ): + step_details["NextAttemptTimestamp"] = TimestampConverter.from_unix_millis( + ms + ) + + if (wait_details := data_copy.get("WaitDetails")) and ( + ms := wait_details.get("ScheduledEndTimestamp") + ): + wait_details["ScheduledEndTimestamp"] = TimestampConverter.from_unix_millis( + ms + ) + + # Use the existing from_dict method with the converted data + return cls.from_dict(data_copy) + @dataclass(frozen=True) class CheckpointUpdatedExecutionState: diff --git a/tests/execution_test.py b/tests/execution_test.py index 4383ceb..b62db57 100644 --- a/tests/execution_test.py +++ b/tests/execution_test.py @@ -29,15 +29,20 @@ # LambdaContext no longer needed - using duck typing from aws_durable_execution_sdk_python.lambda_service import ( + CallbackDetails, CheckpointOutput, CheckpointUpdatedExecutionState, + ContextDetails, DurableServiceClient, + ErrorObject, ExecutionDetails, Operation, OperationAction, OperationStatus, OperationType, OperationUpdate, + StepDetails, + WaitDetails, ) LARGE_RESULT = "large_success" * 1024 * 1024 @@ -2037,3 +2042,512 @@ def test_handler(event: Any, context: DurableContext) -> dict: ), ): test_handler(non_dict_event, lambda_context) + + +# ============================================================================= +# Tests for JSON Serialization Methods +# 
============================================================================= + + +def test_initial_execution_state_to_json_dict_minimal(): + """Test InitialExecutionState.to_json_dict with minimal data.""" + operation = Operation( + operation_id="op1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + ) + + state = InitialExecutionState(operations=[operation], next_marker="marker123") + + result = state.to_json_dict() + expected = {"Operations": [operation.to_json_dict()], "NextMarker": "marker123"} + + assert result == expected + + +def test_initial_execution_state_to_json_dict_with_timestamps(): + """Test InitialExecutionState.to_json_dict converts datetime objects to millisecond timestamps.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 11, 0, 0, tzinfo=datetime.UTC) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + start_timestamp=start_time, + end_timestamp=end_time, + execution_details=ExecutionDetails(input_payload="test_payload"), + ) + + state = InitialExecutionState(operations=[operation], next_marker="marker123") + + result = state.to_json_dict() + + # Verify that timestamps are converted to milliseconds in the operation + operation_result = result["Operations"][0] + expected_start_ms = int(start_time.timestamp() * 1000) + expected_end_ms = int(end_time.timestamp() * 1000) + + assert operation_result["StartTimestamp"] == expected_start_ms + assert operation_result["EndTimestamp"] == expected_end_ms + assert result["NextMarker"] == "marker123" + + +def test_initial_execution_state_to_json_dict_empty(): + """Test InitialExecutionState.to_json_dict with empty operations.""" + state = InitialExecutionState(operations=[], next_marker="") + + result = state.to_json_dict() + expected = {"Operations": [], "NextMarker": ""} + + assert result == expected + + +def 
test_initial_execution_state_from_json_dict_minimal(): + """Test InitialExecutionState.from_json_dict with minimal data.""" + data = { + "Operations": [ + { + "Id": "op1", + "Type": "EXECUTION", + "Status": "STARTED", + } + ], + "NextMarker": "test-marker", + } + + result = InitialExecutionState.from_json_dict(data) + + assert len(result.operations) == 1 + assert result.next_marker == "test-marker" + assert result.operations[0].operation_id == "op1" + assert result.operations[0].operation_type is OperationType.EXECUTION + assert result.operations[0].status is OperationStatus.STARTED + + +def test_initial_execution_state_from_json_dict_with_timestamps(): + """Test InitialExecutionState.from_json_dict converts millisecond timestamps to datetime objects.""" + start_ms = 1672574400000 # 2023-01-01 12:00:00 UTC + end_ms = 1672578000000 # 2023-01-01 13:00:00 UTC + + data = { + "Operations": [ + { + "Id": "op1", + "Type": "EXECUTION", + "Status": "STARTED", + "StartTimestamp": start_ms, + "EndTimestamp": end_ms, + "ExecutionDetails": {"InputPayload": "test_payload"}, + } + ], + "NextMarker": "test-marker", + } + + result = InitialExecutionState.from_json_dict(data) + + expected_start = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + expected_end = datetime.datetime(2023, 1, 1, 13, 0, 0, tzinfo=datetime.UTC) + + assert len(result.operations) == 1 + operation = result.operations[0] + assert operation.start_timestamp == expected_start + assert operation.end_timestamp == expected_end + assert operation.execution_details.input_payload == "test_payload" + + +def test_initial_execution_state_from_json_dict_no_operations(): + """Test InitialExecutionState.from_json_dict handles missing Operations key.""" + data = {"NextMarker": "test-marker"} + + result = InitialExecutionState.from_json_dict(data) + + assert len(result.operations) == 0 + assert result.next_marker == "test-marker" + + +def test_initial_execution_state_from_json_dict_empty_operations(): + """Test 
InitialExecutionState.from_json_dict handles empty Operations list.""" + data = {"Operations": [], "NextMarker": "test-marker"} + + result = InitialExecutionState.from_json_dict(data) + + assert len(result.operations) == 0 + assert result.next_marker == "test-marker" + + +def test_initial_execution_state_json_roundtrip(): + """Test InitialExecutionState to_json_dict -> from_json_dict roundtrip preserves all data.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + next_attempt_time = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + + error = ErrorObject( + message="Test error", + type="TestError", + data="error_data", + stack_trace=["line1", "line2"], + ) + + step_details = StepDetails( + attempt=2, + next_attempt_timestamp=next_attempt_time, + result="step_result", + error=error, + ) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + parent_id="parent1", + name="test_step", + start_timestamp=start_time, + step_details=step_details, + ) + + original = InitialExecutionState(operations=[operation], next_marker="marker123") + + # Convert to JSON dict and back + json_data = original.to_json_dict() + restored = InitialExecutionState.from_json_dict(json_data) + + # Verify all fields are preserved + assert len(restored.operations) == len(original.operations) + assert restored.next_marker == original.next_marker + + restored_op = restored.operations[0] + original_op = original.operations[0] + + assert restored_op.operation_id == original_op.operation_id + assert restored_op.operation_type == original_op.operation_type + assert restored_op.status == original_op.status + assert restored_op.parent_id == original_op.parent_id + assert restored_op.name == original_op.name + assert restored_op.start_timestamp == original_op.start_timestamp + assert restored_op.step_details.attempt == original_op.step_details.attempt + assert ( + 
restored_op.step_details.next_attempt_timestamp + == original_op.step_details.next_attempt_timestamp + ) + assert restored_op.step_details.result == original_op.step_details.result + assert ( + restored_op.step_details.error.message == original_op.step_details.error.message + ) + + +def test_durable_execution_invocation_input_to_json_dict_minimal(): + """Test DurableExecutionInvocationInput.to_json_dict with minimal data.""" + operation = Operation( + operation_id="exec1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + ) + + initial_state = InitialExecutionState( + operations=[operation], next_marker="test_marker" + ) + + invocation_input = DurableExecutionInvocationInput( + durable_execution_arn="arn:test:execution", + checkpoint_token="token123", # noqa: S106 + initial_execution_state=initial_state, + ) + + result = invocation_input.to_json_dict() + expected = { + "DurableExecutionArn": "arn:test:execution", + "CheckpointToken": "token123", + "InitialExecutionState": initial_state.to_json_dict(), + } + + assert result == expected + + +def test_durable_execution_invocation_input_to_json_dict_with_timestamps(): + """Test DurableExecutionInvocationInput.to_json_dict converts datetime objects to millisecond timestamps.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 11, 0, 0, tzinfo=datetime.UTC) + + operation = Operation( + operation_id="exec1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + start_timestamp=start_time, + end_timestamp=end_time, + execution_details=ExecutionDetails(input_payload="test_payload"), + ) + + initial_state = InitialExecutionState( + operations=[operation], next_marker="test_marker" + ) + + invocation_input = DurableExecutionInvocationInput( + durable_execution_arn="arn:test:execution", + checkpoint_token="token123", # noqa: S106 + initial_execution_state=initial_state, + ) + + result = 
invocation_input.to_json_dict() + + # Verify that timestamps are converted to milliseconds in nested operations + operation_result = result["InitialExecutionState"]["Operations"][0] + expected_start_ms = int(start_time.timestamp() * 1000) + expected_end_ms = int(end_time.timestamp() * 1000) + + assert operation_result["StartTimestamp"] == expected_start_ms + assert operation_result["EndTimestamp"] == expected_end_ms + assert result["DurableExecutionArn"] == "arn:test:execution" + assert result["CheckpointToken"] == "token123" + + +def test_durable_execution_invocation_input_to_json_dict_empty_operations(): + """Test DurableExecutionInvocationInput.to_json_dict with empty operations.""" + initial_state = InitialExecutionState(operations=[], next_marker="") + + invocation_input = DurableExecutionInvocationInput( + durable_execution_arn="arn:test:execution", + checkpoint_token="token123", # noqa: S106 + initial_execution_state=initial_state, + ) + + result = invocation_input.to_json_dict() + expected = { + "DurableExecutionArn": "arn:test:execution", + "CheckpointToken": "token123", + "InitialExecutionState": {"Operations": [], "NextMarker": ""}, + } + + assert result == expected + + +def test_durable_execution_invocation_input_from_json_dict_minimal(): + """Test DurableExecutionInvocationInput.from_json_dict with minimal data.""" + data = { + "DurableExecutionArn": "arn:test:execution", + "CheckpointToken": "token123", + "InitialExecutionState": { + "Operations": [ + { + "Id": "exec1", + "Type": "EXECUTION", + "Status": "STARTED", + } + ], + "NextMarker": "test_marker", + }, + } + + result = DurableExecutionInvocationInput.from_json_dict(data) + + assert result.durable_execution_arn == "arn:test:execution" + assert result.checkpoint_token == "token123" # noqa: S105 + assert isinstance(result.initial_execution_state, InitialExecutionState) + assert len(result.initial_execution_state.operations) == 1 + assert result.initial_execution_state.next_marker == "test_marker" 
+ assert result.initial_execution_state.operations[0].operation_id == "exec1" + + +def test_durable_execution_invocation_input_from_json_dict_with_timestamps(): + """Test DurableExecutionInvocationInput.from_json_dict converts millisecond timestamps to datetime objects.""" + start_ms = 1672574400000 # 2023-01-01 12:00:00 UTC + end_ms = 1672578000000 # 2023-01-01 13:00:00 UTC + + data = { + "DurableExecutionArn": "arn:test:execution", + "CheckpointToken": "token123", + "InitialExecutionState": { + "Operations": [ + { + "Id": "exec1", + "Type": "EXECUTION", + "Status": "STARTED", + "StartTimestamp": start_ms, + "EndTimestamp": end_ms, + "ExecutionDetails": {"InputPayload": "test_payload"}, + } + ], + "NextMarker": "test_marker", + }, + } + + result = DurableExecutionInvocationInput.from_json_dict(data) + + expected_start = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + expected_end = datetime.datetime(2023, 1, 1, 13, 0, 0, tzinfo=datetime.UTC) + + operation = result.initial_execution_state.operations[0] + assert operation.start_timestamp == expected_start + assert operation.end_timestamp == expected_end + assert operation.execution_details.input_payload == "test_payload" + + +def test_durable_execution_invocation_input_from_json_dict_empty_initial_state(): + """Test DurableExecutionInvocationInput.from_json_dict handles missing InitialExecutionState.""" + data = { + "DurableExecutionArn": "arn:test:execution", + "CheckpointToken": "token123", + } + + result = DurableExecutionInvocationInput.from_json_dict(data) + + assert result.durable_execution_arn == "arn:test:execution" + assert result.checkpoint_token == "token123" # noqa: S105 + assert isinstance(result.initial_execution_state, InitialExecutionState) + assert len(result.initial_execution_state.operations) == 0 + assert not result.initial_execution_state.next_marker + + +def test_durable_execution_invocation_input_json_roundtrip(): + """Test DurableExecutionInvocationInput to_json_dict -> 
from_json_dict roundtrip preserves all data.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 11, 0, 0, tzinfo=datetime.UTC) + next_attempt_time = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + + error = ErrorObject( + message="Test error", + type="TestError", + data="error_data", + stack_trace=["line1", "line2"], + ) + + step_details = StepDetails( + attempt=2, + next_attempt_timestamp=next_attempt_time, + result="step_result", + error=error, + ) + + wait_details = WaitDetails(scheduled_end_timestamp=next_attempt_time) + + execution_operation = Operation( + operation_id="exec1", + operation_type=OperationType.EXECUTION, + status=OperationStatus.STARTED, + start_timestamp=start_time, + end_timestamp=end_time, + execution_details=ExecutionDetails(input_payload="test_payload"), + ) + + step_operation = Operation( + operation_id="step1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + parent_id="exec1", + name="test_step", + start_timestamp=start_time, + step_details=step_details, + wait_details=wait_details, + ) + + initial_state = InitialExecutionState( + operations=[execution_operation, step_operation], next_marker="marker123" + ) + + original = DurableExecutionInvocationInput( + durable_execution_arn="arn:test:execution:12345", + checkpoint_token="token123456", # noqa: S106 + initial_execution_state=initial_state, + ) + + # Convert to JSON dict and back + json_data = original.to_json_dict() + restored = DurableExecutionInvocationInput.from_json_dict(json_data) + + # Verify all top-level fields are preserved + assert restored.durable_execution_arn == original.durable_execution_arn + assert restored.checkpoint_token == original.checkpoint_token + + # Verify initial execution state is preserved + assert len(restored.initial_execution_state.operations) == len( + original.initial_execution_state.operations + ) + assert ( + 
restored.initial_execution_state.next_marker + == original.initial_execution_state.next_marker + ) + + # Verify execution operation is preserved + restored_exec_op = restored.initial_execution_state.operations[0] + original_exec_op = original.initial_execution_state.operations[0] + + assert restored_exec_op.operation_id == original_exec_op.operation_id + assert restored_exec_op.operation_type == original_exec_op.operation_type + assert restored_exec_op.status == original_exec_op.status + assert restored_exec_op.start_timestamp == original_exec_op.start_timestamp + assert restored_exec_op.end_timestamp == original_exec_op.end_timestamp + assert ( + restored_exec_op.execution_details.input_payload + == original_exec_op.execution_details.input_payload + ) + + # Verify step operation is preserved + restored_step_op = restored.initial_execution_state.operations[1] + original_step_op = original.initial_execution_state.operations[1] + + assert restored_step_op.operation_id == original_step_op.operation_id + assert restored_step_op.operation_type == original_step_op.operation_type + assert restored_step_op.status == original_step_op.status + assert restored_step_op.parent_id == original_step_op.parent_id + assert restored_step_op.name == original_step_op.name + assert restored_step_op.start_timestamp == original_step_op.start_timestamp + assert ( + restored_step_op.step_details.attempt == original_step_op.step_details.attempt + ) + assert ( + restored_step_op.step_details.next_attempt_timestamp + == original_step_op.step_details.next_attempt_timestamp + ) + assert restored_step_op.step_details.result == original_step_op.step_details.result + assert ( + restored_step_op.step_details.error.message + == original_step_op.step_details.error.message + ) + assert ( + restored_step_op.wait_details.scheduled_end_timestamp + == original_step_op.wait_details.scheduled_end_timestamp + ) + + +def test_durable_execution_invocation_input_json_dict_preserves_non_timestamp_fields(): + 
"""Test that to_json_dict preserves all non-timestamp fields unchanged.""" + + context_details = ContextDetails(replay_children=True, result="context_result") + + callback_details = CallbackDetails(callback_id="cb123", result="callback_result") + + operation = Operation( + operation_id="op1", + operation_type=OperationType.CONTEXT, + status=OperationStatus.SUCCEEDED, + parent_id="parent1", + name="test_context", + context_details=context_details, + callback_details=callback_details, + ) + + initial_state = InitialExecutionState( + operations=[operation], next_marker="marker123" + ) + + invocation_input = DurableExecutionInvocationInput( + durable_execution_arn="arn:test:execution", + checkpoint_token="token123", # noqa: S106 + initial_execution_state=initial_state, + ) + + result = invocation_input.to_json_dict() + + # Verify non-timestamp fields are unchanged + operation_result = result["InitialExecutionState"]["Operations"][0] + assert operation_result["Id"] == "op1" + assert operation_result["Type"] == "CONTEXT" + assert operation_result["Status"] == "SUCCEEDED" + assert operation_result["ParentId"] == "parent1" + assert operation_result["Name"] == "test_context" + assert operation_result["ContextDetails"]["Result"] == "context_result" + assert operation_result["CallbackDetails"]["CallbackId"] == "cb123" + assert operation_result["CallbackDetails"]["Result"] == "callback_result" + + assert result["DurableExecutionArn"] == "arn:test:execution" + assert result["CheckpointToken"] == "token123" + assert result["InitialExecutionState"]["NextMarker"] == "marker123" diff --git a/tests/lambda_service_test.py b/tests/lambda_service_test.py index cc4dce4..c812069 100644 --- a/tests/lambda_service_test.py +++ b/tests/lambda_service_test.py @@ -1,6 +1,7 @@ """Tests for the service module.""" import datetime +from datetime import UTC from unittest.mock import Mock, patch import pytest @@ -33,6 +34,7 @@ StateOutput, StepDetails, StepOptions, + TimestampConverter, WaitDetails, 
WaitOptions, ) @@ -2042,3 +2044,619 @@ def test_lambda_client_checkpoint_with_non_none_client_token(): call_args = mock_client.checkpoint_durable_execution.call_args[1] assert call_args["ClientToken"] == "client_token_123" assert result.checkpoint_token == "new_token" # noqa: S105 + + +# ============================================================================= +# Tests for Operation JSON Serialization Methods +# ============================================================================= + + +def test_operation_to_json_dict_minimal(): + """Test Operation.to_json_dict with minimal required fields.""" + operation = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + ) + + result = operation.to_json_dict() + expected = { + "Id": "op1", + "Type": "STEP", + "Status": "SUCCEEDED", + } + assert result == expected + + +def test_operation_to_json_dict_with_timestamps(): + """Test Operation.to_json_dict converts datetime objects to millisecond timestamps.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 11, 30, 0, tzinfo=datetime.UTC) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=start_time, + end_timestamp=end_time, + ) + + result = operation.to_json_dict() + + # Convert expected timestamps to milliseconds + expected_start_ms = int(start_time.timestamp() * 1000) # 1672574400000 + expected_end_ms = int(end_time.timestamp() * 1000) # 1672579800000 + + assert result["StartTimestamp"] == expected_start_ms + assert result["EndTimestamp"] == expected_end_ms + assert result["Id"] == "op1" + assert result["Type"] == "STEP" + assert result["Status"] == "SUCCEEDED" + + +def test_operation_to_json_dict_with_step_details_timestamp(): + """Test Operation.to_json_dict converts StepDetails.NextAttemptTimestamp to milliseconds.""" + next_attempt_time = 
datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + step_details = StepDetails( + attempt=2, next_attempt_timestamp=next_attempt_time, result="step_result" + ) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + step_details=step_details, + ) + + result = operation.to_json_dict() + expected_ms = int(next_attempt_time.timestamp() * 1000) # 1672581600000 + + assert result["StepDetails"]["NextAttemptTimestamp"] == expected_ms + assert result["StepDetails"]["Attempt"] == 2 + assert result["StepDetails"]["Result"] == "step_result" + + +def test_operation_to_json_dict_with_wait_details_timestamp(): + """Test Operation.to_json_dict converts WaitDetails.ScheduledEndTimestamp to milliseconds.""" + scheduled_end_time = datetime.datetime(2023, 1, 1, 15, 0, 0, tzinfo=datetime.UTC) + wait_details = WaitDetails(scheduled_end_timestamp=scheduled_end_time) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.WAIT, + status=OperationStatus.PENDING, + wait_details=wait_details, + ) + + result = operation.to_json_dict() + expected_ms = int(scheduled_end_time.timestamp() * 1000) # 1672592400000 + + assert result["WaitDetails"]["ScheduledEndTimestamp"] == expected_ms + + +def test_operation_to_json_dict_with_all_timestamps(): + """Test Operation.to_json_dict with all timestamp fields present.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 11, 0, 0, tzinfo=datetime.UTC) + next_attempt_time = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + scheduled_end_time = datetime.datetime(2023, 1, 1, 13, 0, 0, tzinfo=datetime.UTC) + + step_details = StepDetails( + attempt=1, next_attempt_timestamp=next_attempt_time, result="step_result" + ) + wait_details = WaitDetails(scheduled_end_timestamp=scheduled_end_time) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + 
status=OperationStatus.PENDING, + start_timestamp=start_time, + end_timestamp=end_time, + step_details=step_details, + wait_details=wait_details, + ) + + result = operation.to_json_dict() + + # Verify all timestamps are converted to milliseconds + assert result["StartTimestamp"] == int(start_time.timestamp() * 1000) + assert result["EndTimestamp"] == int(end_time.timestamp() * 1000) + assert result["StepDetails"]["NextAttemptTimestamp"] == int( + next_attempt_time.timestamp() * 1000 + ) + assert result["WaitDetails"]["ScheduledEndTimestamp"] == int( + scheduled_end_time.timestamp() * 1000 + ) + + +def test_operation_to_json_dict_with_none_timestamps(): + """Test Operation.to_json_dict handles None timestamp values correctly.""" + step_details = StepDetails( + attempt=1, next_attempt_timestamp=None, result="step_result" + ) + wait_details = WaitDetails(scheduled_end_timestamp=None) + + operation = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.SUCCEEDED, + start_timestamp=None, + end_timestamp=None, + step_details=step_details, + wait_details=wait_details, + ) + + result = operation.to_json_dict() + + # None timestamps should not be present in the result + assert "StartTimestamp" not in result + assert "EndTimestamp" not in result + assert "NextAttemptTimestamp" not in result["StepDetails"] + assert result["WaitDetails"] == {} # Empty dict when no scheduled end timestamp + + +def test_operation_from_json_dict_minimal(): + """Test Operation.from_json_dict with minimal required fields.""" + data = { + "Id": "op1", + "Type": "STEP", + "Status": "SUCCEEDED", + } + + operation = Operation.from_json_dict(data) + assert operation.operation_id == "op1" + assert operation.operation_type is OperationType.STEP + assert operation.status is OperationStatus.SUCCEEDED + assert operation.start_timestamp is None + assert operation.end_timestamp is None + + +def test_operation_from_json_dict_with_timestamps(): + """Test 
Operation.from_json_dict converts millisecond timestamps to datetime objects.""" + start_ms = 1672574400000 # 2023-01-01 12:00:00 UTC + end_ms = 1672579800000 # 2023-01-01 13:30:00 UTC + + data = { + "Id": "op1", + "Type": "STEP", + "Status": "SUCCEEDED", + "StartTimestamp": start_ms, + "EndTimestamp": end_ms, + } + + operation = Operation.from_json_dict(data) + + expected_start = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + expected_end = datetime.datetime(2023, 1, 1, 13, 30, 0, tzinfo=datetime.UTC) + + assert operation.start_timestamp == expected_start + assert operation.end_timestamp == expected_end + assert operation.operation_id == "op1" + + +def test_operation_from_json_dict_with_step_details_timestamp(): + """Test Operation.from_json_dict converts StepDetails.NextAttemptTimestamp from milliseconds.""" + next_attempt_ms = 1672581600000 # 2023-01-01 14:00:00 UTC + + data = { + "Id": "op1", + "Type": "STEP", + "Status": "PENDING", + "StepDetails": { + "Attempt": 2, + "NextAttemptTimestamp": next_attempt_ms, + "Result": "step_result", + }, + } + + operation = Operation.from_json_dict(data) + expected_time = datetime.datetime(2023, 1, 1, 14, 0, 0, tzinfo=datetime.UTC) + + assert operation.step_details.next_attempt_timestamp == expected_time + assert operation.step_details.attempt == 2 + assert operation.step_details.result == "step_result" + + +def test_operation_from_json_dict_with_wait_details_timestamp(): + """Test Operation.from_json_dict converts WaitDetails.ScheduledEndTimestamp from milliseconds.""" + scheduled_end_ms = 1672592400000 # 2023-01-01 17:00:00 UTC + + data = { + "Id": "op1", + "Type": "WAIT", + "Status": "PENDING", + "WaitDetails": {"ScheduledEndTimestamp": scheduled_end_ms}, + } + + operation = Operation.from_json_dict(data) + expected_time = datetime.datetime(2023, 1, 1, 17, 0, 0, tzinfo=datetime.UTC) + + assert operation.wait_details.scheduled_end_timestamp == expected_time + + +def 
test_operation_from_json_dict_with_all_timestamps(): + """Test Operation.from_json_dict with all timestamp fields present.""" + start_ms = 1672574400000 # 2023-01-01 12:00:00 UTC + end_ms = 1672578000000 # 2023-01-01 13:00:00 UTC + next_attempt_ms = 1672581600000 # 2023-01-01 14:00:00 UTC + scheduled_end_ms = 1672585200000 # 2023-01-01 15:00:00 UTC + + data = { + "Id": "op1", + "Type": "STEP", + "Status": "PENDING", + "StartTimestamp": start_ms, + "EndTimestamp": end_ms, + "StepDetails": { + "Attempt": 1, + "NextAttemptTimestamp": next_attempt_ms, + "Result": "step_result", + }, + "WaitDetails": {"ScheduledEndTimestamp": scheduled_end_ms}, + } + + operation = Operation.from_json_dict(data) + + # Verify all timestamps are converted correctly + assert operation.start_timestamp == datetime.datetime( + 2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC + ) + assert operation.end_timestamp == datetime.datetime( + 2023, 1, 1, 13, 0, 0, tzinfo=datetime.UTC + ) + assert operation.step_details.next_attempt_timestamp == datetime.datetime( + 2023, 1, 1, 14, 0, 0, tzinfo=datetime.UTC + ) + assert operation.wait_details.scheduled_end_timestamp == datetime.datetime( + 2023, 1, 1, 15, 0, 0, tzinfo=datetime.UTC + ) + + +def test_operation_from_json_dict_with_none_timestamps(): + """Test Operation.from_json_dict handles None timestamp values correctly.""" + data = { + "Id": "op1", + "Type": "STEP", + "Status": "SUCCEEDED", + "StartTimestamp": None, + "EndTimestamp": None, + "StepDetails": { + "Attempt": 1, + "NextAttemptTimestamp": None, + "Result": "step_result", + }, + "WaitDetails": {"ScheduledEndTimestamp": None}, + } + + operation = Operation.from_json_dict(data) + + assert operation.start_timestamp is None + assert operation.end_timestamp is None + assert operation.step_details.next_attempt_timestamp is None + assert operation.wait_details.scheduled_end_timestamp is None + + +def test_operation_json_roundtrip(): + """Test Operation to_json_dict -> from_json_dict roundtrip preserves 
all data.""" + start_time = datetime.datetime(2023, 1, 1, 10, 0, 0, tzinfo=datetime.UTC) + end_time = datetime.datetime(2023, 1, 1, 11, 0, 0, tzinfo=datetime.UTC) + next_attempt_time = datetime.datetime(2023, 1, 1, 12, 0, 0, tzinfo=datetime.UTC) + scheduled_end_time = datetime.datetime(2023, 1, 1, 13, 0, 0, tzinfo=datetime.UTC) + + error = ErrorObject( + message="Test error", + type="TestError", + data="error_data", + stack_trace=["line1", "line2"], + ) + + step_details = StepDetails( + attempt=2, + next_attempt_timestamp=next_attempt_time, + result="step_result", + error=error, + ) + + wait_details = WaitDetails(scheduled_end_timestamp=scheduled_end_time) + + callback_details = CallbackDetails(callback_id="cb123", result="callback_result") + + execution_details = ExecutionDetails(input_payload="exec_payload") + + original = Operation( + operation_id="op1", + operation_type=OperationType.STEP, + status=OperationStatus.PENDING, + parent_id="parent1", + name="test_step", + start_timestamp=start_time, + end_timestamp=end_time, + sub_type=OperationSubType.STEP, + execution_details=execution_details, + step_details=step_details, + wait_details=wait_details, + callback_details=callback_details, + ) + + # Convert to JSON dict and back + json_data = original.to_json_dict() + restored = Operation.from_json_dict(json_data) + + # Verify all fields are preserved + assert restored.operation_id == original.operation_id + assert restored.operation_type == original.operation_type + assert restored.status == original.status + assert restored.parent_id == original.parent_id + assert restored.name == original.name + assert restored.start_timestamp == original.start_timestamp + assert restored.end_timestamp == original.end_timestamp + assert restored.sub_type == original.sub_type + assert ( + restored.execution_details.input_payload + == original.execution_details.input_payload + ) + assert restored.step_details.attempt == original.step_details.attempt + assert ( + 
restored.step_details.next_attempt_timestamp + == original.step_details.next_attempt_timestamp + ) + assert restored.step_details.result == original.step_details.result + assert restored.step_details.error.message == original.step_details.error.message + assert ( + restored.wait_details.scheduled_end_timestamp + == original.wait_details.scheduled_end_timestamp + ) + assert ( + restored.callback_details.callback_id == original.callback_details.callback_id + ) + + +def test_operation_json_dict_preserves_non_timestamp_fields(): + """Test that to_json_dict preserves all non-timestamp fields unchanged.""" + context_details = ContextDetails(replay_children=True, result="context_result") + + chained_invoke_details = ChainedInvokeDetails(result="invoke_result") + + operation = Operation( + operation_id="op1", + operation_type=OperationType.CONTEXT, + status=OperationStatus.SUCCEEDED, + parent_id="parent1", + name="test_context", + sub_type=OperationSubType.RUN_IN_CHILD_CONTEXT, + context_details=context_details, + chained_invoke_details=chained_invoke_details, + ) + + result = operation.to_json_dict() + + # Verify non-timestamp fields are unchanged + assert result["Id"] == "op1" + assert result["Type"] == "CONTEXT" + assert result["Status"] == "SUCCEEDED" + assert result["ParentId"] == "parent1" + assert result["Name"] == "test_context" + assert result["SubType"] == "RunInChildContext" + assert result["ContextDetails"]["Result"] == "context_result" + assert result["ChainedInvokeDetails"]["Result"] == "invoke_result" + + +# region TimestampConverter Tests +def test_timestamp_converter_to_unix_millis_valid_datetime(): + """Test converting valid datetime to Unix timestamp in milliseconds.""" + # Test epoch + epoch = datetime.datetime(1970, 1, 1, tzinfo=UTC) + assert TimestampConverter.to_unix_millis(epoch) == 0 + + # Test specific datetime + dt = datetime.datetime(2024, 1, 1, 12, 30, 45, 123456, tzinfo=UTC) + expected_ms = int(dt.timestamp() * 1000) + assert 
TimestampConverter.to_unix_millis(dt) == expected_ms + + # Test current time + now = datetime.datetime.now(UTC) + result = TimestampConverter.to_unix_millis(now) + assert isinstance(result, int) + assert result > 0 + + +def test_timestamp_converter_to_unix_millis_none(): + """Test converting None to Unix timestamp returns None.""" + assert TimestampConverter.to_unix_millis(None) is None + + +def test_timestamp_converter_to_unix_millis_edge_cases(): + """Test edge cases for datetime to Unix timestamp conversion.""" + # Test year 2038 (Unix timestamp overflow boundary for 32-bit systems) + dt_2038 = datetime.datetime(2038, 1, 19, 3, 14, 7, tzinfo=UTC) + result = TimestampConverter.to_unix_millis(dt_2038) + assert isinstance(result, int) + assert result > 0 + + # Test far future date + far_future = datetime.datetime(2100, 12, 31, 23, 59, 59, tzinfo=UTC) + result = TimestampConverter.to_unix_millis(far_future) + assert isinstance(result, int) + assert result > 0 + + # Test microseconds precision (should be truncated in milliseconds) + dt_with_microseconds = datetime.datetime(2024, 1, 1, 0, 0, 0, 123456, tzinfo=UTC) + result = TimestampConverter.to_unix_millis(dt_with_microseconds) + # Verify milliseconds precision (microseconds should be truncated) + expected = int(dt_with_microseconds.timestamp() * 1000) + assert result == expected + + +def test_timestamp_converter_from_unix_millis_valid_timestamp(): + """Test converting valid Unix timestamp in milliseconds to datetime.""" + # Test epoch + assert TimestampConverter.from_unix_millis(0) == datetime.datetime( + 1970, 1, 1, tzinfo=UTC + ) + + # Test specific timestamp + ms = 1704110445123 # 2024-01-01 12:00:45.123 UTC + result = TimestampConverter.from_unix_millis(ms) + expected = datetime.datetime.fromtimestamp(ms / 1000, tz=UTC) + assert result == expected + assert result.tzinfo == UTC + + # Test positive timestamp + ms = 1609459200000 # 2021-01-01 00:00:00 UTC + result = TimestampConverter.from_unix_millis(ms) + assert 
result == datetime.datetime(2021, 1, 1, tzinfo=UTC) + + +def test_timestamp_converter_from_unix_millis_none(): + """Test converting None timestamp returns None.""" + assert TimestampConverter.from_unix_millis(None) is None + + +def test_timestamp_converter_from_unix_millis_zero(): + """Test converting zero timestamp returns epoch.""" + result = TimestampConverter.from_unix_millis(0) + assert result == datetime.datetime(1970, 1, 1, tzinfo=UTC) + + +def test_timestamp_converter_from_unix_millis_negative(): + """Test converting negative timestamp (before epoch).""" + # Test negative timestamp (before 1970) + ms = -86400000 # 1969-12-31 00:00:00 UTC + result = TimestampConverter.from_unix_millis(ms) + expected = datetime.datetime.fromtimestamp(ms / 1000, tz=UTC) + assert result == expected + assert result.year == 1969 + + +def test_timestamp_converter_from_unix_millis_large_timestamp(): + """Test converting large timestamp values.""" + # Test year 2038 boundary + ms = 2147483647000 # 2038-01-19 03:14:07 UTC + result = TimestampConverter.from_unix_millis(ms) + expected = datetime.datetime.fromtimestamp(ms / 1000, tz=UTC) + assert result == expected + + # Test far future + ms = 4102444800000 # 2100-01-01 00:00:00 UTC + result = TimestampConverter.from_unix_millis(ms) + expected = datetime.datetime.fromtimestamp(ms / 1000, tz=UTC) + assert result == expected + + +def test_timestamp_converter_roundtrip_conversion(): + """Test roundtrip conversion: datetime -> millis -> datetime.""" + original_datetimes = [ + datetime.datetime(1970, 1, 1, tzinfo=UTC), # Epoch + datetime.datetime(2024, 1, 1, 12, 30, 45, tzinfo=UTC), # Specific date + datetime.datetime( + 2024, 12, 31, 23, 59, 59, 999000, tzinfo=UTC + ), # End of year with millis + datetime.datetime.now(UTC), # Current time + datetime.datetime(2038, 1, 19, 3, 14, 7, tzinfo=UTC), # 2038 boundary + datetime.datetime(1969, 12, 31, 23, 59, 59, tzinfo=UTC), # Before epoch + ] + + for original in original_datetimes: + # Convert to 
milliseconds and back + millis = TimestampConverter.to_unix_millis(original) + converted_back = TimestampConverter.from_unix_millis(millis) + + # Should be equal within millisecond precision + # (microseconds may be lost due to integer conversion) + assert abs((converted_back - original).total_seconds()) < 0.001 + + +def test_timestamp_converter_roundtrip_with_none(): + """Test roundtrip conversion with None values.""" + # None -> None -> None + millis = TimestampConverter.to_unix_millis(None) + assert millis is None + + converted_back = TimestampConverter.from_unix_millis(millis) + assert converted_back is None + + +def test_timestamp_converter_precision_handling(): + """Test precision handling in timestamp conversions.""" + # Test that microseconds are properly handled in millisecond conversion + dt_with_microseconds = datetime.datetime(2024, 1, 1, 0, 0, 0, 123456, tzinfo=UTC) + + # Convert to milliseconds (should truncate microseconds to nearest millisecond) + millis = TimestampConverter.to_unix_millis(dt_with_microseconds) + + # Convert back + converted_back = TimestampConverter.from_unix_millis(millis) + + # The difference should be less than 1 millisecond + time_diff = abs((converted_back - dt_with_microseconds).total_seconds()) + assert time_diff < 0.001 + + +def test_timestamp_converter_timezone_handling(): + """Test that converted datetimes always have UTC timezone.""" + test_timestamps = [0, 1704110445123, -86400000, 2147483647000] + + for ms in test_timestamps: + result = TimestampConverter.from_unix_millis(ms) + assert result.tzinfo == UTC + + +def test_timestamp_converter_type_validation(): + """Test that methods return correct types.""" + # Test to_unix_millis return type + dt = datetime.datetime(2024, 1, 1, tzinfo=UTC) + result = TimestampConverter.to_unix_millis(dt) + assert isinstance(result, int) + + result_none = TimestampConverter.to_unix_millis(None) + assert result_none is None + + # Test from_unix_millis return type + ms = 1704110445123 + 
result = TimestampConverter.from_unix_millis(ms) + assert isinstance(result, datetime.datetime) + + result_none = TimestampConverter.from_unix_millis(None) + assert result_none is None + + +def test_timestamp_converter_static_methods(): + """Test that TimestampConverter methods are static and can be called without instance.""" + # Should be able to call without creating instance + dt = datetime.datetime(2024, 1, 1, tzinfo=UTC) + + # Call as static methods + millis = TimestampConverter.to_unix_millis(dt) + converted_back = TimestampConverter.from_unix_millis(millis) + + assert isinstance(millis, int) + assert isinstance(converted_back, datetime.datetime) + assert converted_back.tzinfo == UTC + + +def test_timestamp_converter_millisecond_boundaries(): + """Test conversion at millisecond boundaries.""" + # Test exact millisecond values + test_cases = [ + (datetime.datetime(2024, 1, 1, 0, 0, 0, 0, tzinfo=UTC), 1704067200000), + ( + datetime.datetime(2024, 1, 1, 0, 0, 0, 500000, tzinfo=UTC), + 1704067200500, + ), # 500ms + ( + datetime.datetime(2024, 1, 1, 0, 0, 0, 999000, tzinfo=UTC), + 1704067200999, + ), # 999ms + ] + + for dt, expected_ms in test_cases: + result_ms = TimestampConverter.to_unix_millis(dt) + assert result_ms == expected_ms + + # Convert back and verify + result_dt = TimestampConverter.from_unix_millis(result_ms) + # Should be equal within millisecond precision + assert abs((result_dt - dt).total_seconds()) < 0.001 + + +# endregion