Skip to content

Conversation

@ajcasagrande
Copy link
Contributor

@ajcasagrande ajcasagrande commented Oct 22, 2025

image
image

Summary by CodeRabbit

Release Notes

  • New Features
    • Improved error detection and handling for streaming responses with early error identification
    • Enhanced error reporting with error code support for better diagnostics and troubleshooting

@github-actions github-actions bot added the fix label Oct 22, 2025
@codecov
Copy link

codecov bot commented Oct 22, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.

📢 Thoughts on this report? Let us know!

@ajcasagrande
Copy link
Contributor Author

ajcasagrande commented Oct 22, 2025

@coderabbitai review

@coderabbitai
Copy link

coderabbitai bot commented Oct 22, 2025

@ajcasagrande: I'll review the changes in this PR right away.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai
Copy link

coderabbitai bot commented Oct 22, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai
Copy link

coderabbitai bot commented Oct 22, 2025

Tip

For best results, initiate chat on the files or code changes.

@ajcasagrande: Hi! I see you've tagged me. How can I help you with this PR?

I've already triggered a review based on your previous request. Would you like me to:

  • Answer questions about the changes?
  • Re-review the PR with specific focus areas?
  • Help with something specific about the SSE error event data parsing?
  • Something else?

Please let me know what you need! 🐰

@coderabbitai
Copy link

coderabbitai bot commented Oct 22, 2025

Walkthrough

The changes introduce Server-Sent Events (SSE) error handling across the codebase by adding a new SSEEventType enum with an ERROR member, creating an SSEResponseError exception, and integrating error detection logic into the transport layer to catch and propagate SSE errors during message streaming.

Changes

Cohort / File(s) Summary
SSE Enums & Exception Types
src/aiperf/common/enums/sse_enums.py, src/aiperf/common/enums/__init__.py, src/aiperf/common/exceptions.py
Added new SSEEventType enum with ERROR member; imported and exported SSEEventType from enums package; created SSEResponseError exception class with message and error_code parameters.
Error Propagation
src/aiperf/common/models/error_models.py
Modified ErrorDetails.from_exception() to extract and propagate error_code attribute from exceptions into ErrorDetails instance.
Transport Layer & SSE Utilities
src/aiperf/transports/sse_utils.py, src/aiperf/transports/aiohttp_client.py
Added inspect_message_for_error() static method to AsyncSSEStreamReader to detect error events and raise SSEResponseError; integrated error detection into read_complete_stream(); added SSEResponseError handling in aiohttp_client._request() with error logging and record population.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Specific areas requiring attention:
    • Error detection logic in AsyncSSEStreamReader.inspect_message_for_error(): verify correct identification of SSE error packets and fallback message construction
    • Error code propagation chain: confirm error_code flows correctly from SSEResponseError → ErrorDetails → record.error
    • Integration point in aiohttp_client._request(): ensure SSEResponseError handling occurs before general exception handler and timing is ended appropriately

Poem

🐰 A stream once flowed with messages untamed,
Now errors caught before they inflamed,
SSE events now speak their code,
With error_code along the road,
Hop-check complete—the path's reclaimed! ✨

Pre-merge checks

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The pull request title "fix: parse and detect sse error event data from dynamo" directly describes the core changes in the changeset. The modifications add SSE error detection through the new inspect_message_for_error method, introduce SSEEventType enum with an ERROR member, add SSEResponseError exception class, and integrate error handling in the aiohttp client transport. The title accurately captures the main objective of parsing and detecting SSE error events. The reference to "dynamo" appears to be contextual information about the source of the data being processed. The title is concise, uses conventional commit format, and is specific enough to convey the primary change without unnecessary verbosity.
Docstring Coverage ✅ Passed Docstring coverage is 83.33% which is sufficient. The required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/clients/http/test_aiohttp_sse.py (1)

257-271: Remove unused fixture parameter.

The mock_sse_response parameter is received but never used—a new Mock() is created on line 268 instead. Remove the parameter to clean up the fixture signature.

Apply this diff:

 @pytest.fixture
-async def mock_sse_iterator(mock_sse_response: Mock):
+async def mock_sse_iterator():
     """Async fixture that patches AioHttpSSEStreamReader.__aiter__ for testing."""
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5ee6f01 and 1b5e7c2.

📒 Files selected for processing (9)
  • aiperf/clients/http/aiohttp_client.py (4 hunks)
  • aiperf/common/enums/__init__.py (2 hunks)
  • aiperf/common/enums/sse_enums.py (1 hunks)
  • aiperf/common/exceptions.py (1 hunks)
  • aiperf/common/models/error_models.py (1 hunks)
  • tests/clients/http/conftest.py (2 hunks)
  • tests/clients/http/test_aiohttp_client.py (1 hunks)
  • tests/clients/http/test_aiohttp_sse.py (2 hunks)
  • tests/conftest.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
aiperf/common/enums/sse_enums.py (1)
aiperf/common/enums/base_enums.py (1)
  • CaseInsensitiveStrEnum (10-49)
aiperf/common/enums/__init__.py (1)
aiperf/common/enums/sse_enums.py (1)
  • SSEEventType (17-20)
aiperf/common/models/error_models.py (1)
tests/controller/conftest.py (1)
  • error_details (68-70)
tests/conftest.py (1)
aiperf/module_loader.py (1)
  • ensure_modules_loaded (47-58)
aiperf/clients/http/aiohttp_client.py (5)
aiperf/common/enums/sse_enums.py (2)
  • SSEEventType (17-20)
  • SSEFieldType (7-14)
aiperf/common/exceptions.py (1)
  • SSEResponseError (158-163)
tests/utils/time_traveler.py (2)
  • time (48-49)
  • perf_counter_ns (54-55)
aiperf/common/protocols.py (1)
  • error (70-70)
aiperf/common/models/error_models.py (2)
  • ErrorDetails (11-50)
  • from_exception (42-50)
tests/clients/http/test_aiohttp_client.py (3)
tests/clients/http/conftest.py (2)
  • aiohttp_client (115-119)
  • sse_error_reader_context (291-310)
aiperf/clients/http/aiohttp_client.py (1)
  • AioHttpClientMixin (28-165)
aiperf/common/models/record_models.py (2)
  • start_perf_ns (539-541)
  • end_perf_ns (551-562)
tests/clients/http/test_aiohttp_sse.py (4)
aiperf/clients/http/aiohttp_client.py (3)
  • AioHttpSSEStreamReader (168-244)
  • parse_sse_message (247-273)
  • read_complete_stream (178-213)
aiperf/common/enums/sse_enums.py (1)
  • SSEFieldType (7-14)
aiperf/common/exceptions.py (1)
  • SSEResponseError (158-163)
aiperf/common/models/error_models.py (2)
  • ErrorDetails (11-50)
  • from_exception (42-50)
🪛 Ruff (0.14.1)
aiperf/clients/http/aiohttp_client.py

137-137: Do not catch blind exception: Exception

(BLE001)


207-209: Avoid specifying long messages outside the exception class

(TRY003)

tests/clients/http/test_aiohttp_sse.py

257-257: Unused function argument: mock_sse_response

(ARG001)

🔇 Additional comments (17)
aiperf/common/models/error_models.py (1)

41-50: LGTM! Clean error code extraction from exceptions.

The refactored from_exception method correctly extracts the error_code attribute when present, enabling richer error details for exceptions like SSEResponseError.

aiperf/common/exceptions.py (1)

158-164: LGTM! Well-structured exception for SSE errors.

The SSEResponseError class correctly stores the error_code attribute, which will be automatically extracted by ErrorDetails.from_exception.

aiperf/common/enums/__init__.py (1)

95-97: LGTM! Public API surface updated correctly.

SSEEventType is now properly exported for external usage.

Also applies to: 166-166

tests/conftest.py (1)

33-33: LGTM! Module preloading ensures factory registration.

Calling ensure_modules_loaded() at module import time guarantees all factories are registered before test execution.

Also applies to: 71-72

aiperf/common/enums/sse_enums.py (1)

16-20: LGTM! Well-structured enum for SSE event types.

The SSEEventType enum follows existing patterns and provides case-insensitive matching for SSE error events.

tests/clients/http/conftest.py (1)

5-5: LGTM! Well-designed fixture for SSE error testing.

The sse_error_reader_context fixture provides a clean, reusable way to simulate SSE error scenarios with proper mocking and context management.

Also applies to: 290-310

tests/clients/http/test_aiohttp_client.py (1)

369-413: LGTM! Comprehensive SSE error handling tests.

Both tests thoroughly validate SSE error detection, conversion to ErrorDetails, and timing consistency. The tests verify:

  • Error code (502) propagation
  • Exception type preservation
  • Message content
  • Proper timing metrics
aiperf/clients/http/aiohttp_client.py (4)

12-13: LGTM! Necessary imports for SSE error handling.


133-140: LGTM! Proper exception handling with unified error conversion.

The dedicated SSEResponseError handler and fallback generic handler both correctly set timing, log errors, and convert exceptions to ErrorDetails. The static analysis warning about catching Exception is a false positive—this is the appropriate final catch-all for the request handler.


190-209: SSE error detection logic is sound.

The implementation correctly:

  • Identifies error events by checking the first packet
  • Extracts error messages from the comment field
  • Raises SSEResponseError with a 502 code (appropriate as it indicates the SSE stream is in an error state)

The fallback error message when no comment is present includes the full JSON dump, which provides complete diagnostic information.


240-244: LGTM! Cleaner UTF-8 decoding with error handling.

The simplified approach using errors='replace' is more robust and concise than the previous exception-based handling.

tests/clients/http/test_aiohttp_sse.py (6)

10-20: LGTM!

The new imports are appropriate and necessary for the comprehensive SSE error handling test suite.


241-253: LGTM!

The fixture provides a clean interface for building SSE stream test data with auto-incrementing timestamps, reducing test boilerplate while maintaining flexibility for timestamp overrides.


274-278: LGTM!

Good use of a class-level constant for consistent timestamp values across tests.


280-313: LGTM!

Comprehensive parametrized test coverage for SSE error messages with various comment formats.


315-328: LGTM!

Good coverage of error events without the comment field, properly validating the fallback "Unknown error" message.


347-467: LGTM!

Excellent comprehensive test coverage for SSE error handling edge cases:

  • Error in middle of stream with immediate exception
  • Correct 502 error code propagation
  • ErrorDetails conversion preservation
  • Normal events not misidentified as errors
  • Long messages, special characters, whitespace handling
  • Multiple errors and mixed field scenarios
  • Raw escape sequence parsing

The tests are well-structured, properly documented, and provide thorough validation of the error handling implementation.

@ajcasagrande ajcasagrande self-assigned this Oct 23, 2025
@ajcasagrande ajcasagrande marked this pull request as draft October 24, 2025 05:19
@ajcasagrande ajcasagrande marked this pull request as ready for review October 25, 2025 01:13
@ajcasagrande
Copy link
Contributor Author

@coderabbitai review

@coderabbitai
Copy link

coderabbitai bot commented Oct 25, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

1 similar comment
@coderabbitai
Copy link

coderabbitai bot commented Oct 25, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/aiperf/transports/sse_utils.py (1)

80-106: Solid error detection logic with clear flow.

The method correctly:

  1. Detects error events by checking for EVENT field with ERROR value
  2. Extracts error details from COMMENT field when available
  3. Falls back to the full message JSON for debugging
  4. Raises SSEResponseError with 502 (Bad Gateway), which is semantically appropriate for upstream SSE errors

The logic handles edge cases well (no comment field, unknown errors) and provides informative error messages.

The static analysis hint (TRY003) suggests moving the error message construction into the exception class or a helper method to improve maintainability. This is a minor stylistic improvement:

# Option 1: Helper method
def _format_sse_error_message(error_message: str | None, message: SSEMessage) -> str:
    if error_message:
        return f"Error occurred in SSE response: {error_message}"
    return f"Unknown error in SSE response: {message.model_dump_json()}"

# Then in the method:
raise SSEResponseError(_format_sse_error_message(error_message, message), error_code=502)
📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1b5e7c2 and e065976.

📒 Files selected for processing (6)
  • src/aiperf/common/enums/__init__.py (2 hunks)
  • src/aiperf/common/enums/sse_enums.py (1 hunks)
  • src/aiperf/common/exceptions.py (1 hunks)
  • src/aiperf/common/models/error_models.py (1 hunks)
  • src/aiperf/transports/aiohttp_client.py (3 hunks)
  • src/aiperf/transports/sse_utils.py (2 hunks)
🧰 Additional context used
🪛 Ruff (0.14.1)
src/aiperf/transports/sse_utils.py

104-106: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (9)
src/aiperf/transports/aiohttp_client.py (3)

9-9: LGTM! Import added for SSE error handling.

The import is correctly placed and necessary for the new exception handling logic below.


106-106: Well-placed error detection for SSE messages.

The error inspection is correctly positioned to detect SSE error events early in the processing pipeline, enabling fail-fast behavior before the message is appended to the responses list.


119-126: Good separation of SSE error handling from general exceptions.

The dedicated SSEResponseError handler provides specific logging while maintaining consistent error recording patterns. The use of ErrorDetails.from_exception(e) enables automatic propagation of the error_code attribute from the exception.

src/aiperf/common/models/error_models.py (1)

68-76: Clean refactoring to support error code propagation.

The modification correctly enables exceptions with an error_code attribute (like SSEResponseError) to propagate their error code to the ErrorDetails instance, while maintaining backward compatibility with exceptions that don't have this attribute.

src/aiperf/common/enums/sse_enums.py (1)

17-20: LGTM! Clean enum addition for SSE event types.

The new SSEEventType enum follows the existing pattern and conventions. The ERROR event type aligns with standard SSE protocol usage.

src/aiperf/common/enums/__init__.py (1)

94-94: LGTM! Auto-generated export of SSEEventType.

The export is correctly added to both the import statement and the __all__ list, making the new enum publicly available through the package's public API.

Also applies to: 159-159

src/aiperf/common/exceptions.py (1)

170-176: Well-designed exception for SSE error handling.

The SSEResponseError class correctly extends AIPerfError and includes an error_code attribute that will be propagated to ErrorDetails via the from_exception method. The default error_code=500 provides a reasonable fallback, while callers can specify more specific codes (like 502 for Bad Gateway).

src/aiperf/transports/sse_utils.py (2)

9-10: LGTM! Imports added for SSE error detection.

The necessary enums and exception class are imported to support the new error detection functionality.


76-76: Consistent error detection in complete stream reading.

The error inspection call mirrors the pattern in the streaming path, ensuring SSE errors are detected regardless of how the stream is consumed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant