Skip to content

Commit 4186731

Browse files
fix: resolve streaming endpoint deadlock by pre-consuming request body (#426)
- Fix infinite hang issue in `/v1/message:stream` endpoint - Pre-consume `request.body()` in `_handle_streaming_request` to prevent deadlock - `EventSourceResponse` context was causing `request.body()` consumption to block - Add comprehensive error handling for body consumption failures - Add regression tests for streaming endpoint request body handling Fixes deadlock where `request.body()` consumption inside `EventSourceResponse` context caused the event loop to hang indefinitely. Breaking changes: None Backward compatibility: Maintained Fixes [#1001](#431) 🦕 Release-As: 0.3.4
1 parent f34076e commit 4186731

File tree

2 files changed

+143
-1
lines changed

2 files changed

+143
-1
lines changed

src/a2a/server/apps/rest/rest_adapter.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
rest_error_handler,
4040
rest_stream_error_handler,
4141
)
42-
from a2a.utils.errors import ServerError
42+
from a2a.utils.errors import InvalidRequestError, ServerError
4343

4444

4545
logger = logging.getLogger(__name__)
@@ -120,6 +120,18 @@ async def _handle_streaming_request(
120120
method: Callable[[Request, ServerCallContext], AsyncIterable[Any]],
121121
request: Request,
122122
) -> EventSourceResponse:
123+
# Pre-consume and cache the request body to prevent deadlock in streaming context
124+
# This is required because Starlette's request.body() can only be consumed once,
125+
# and attempting to consume it after EventSourceResponse starts causes deadlock
126+
try:
127+
await request.body()
128+
except (ValueError, RuntimeError, OSError) as e:
129+
raise ServerError(
130+
error=InvalidRequestError(
131+
message=f'Failed to pre-consume request body: {e}'
132+
)
133+
) from e
134+
123135
call_context = self._context_builder.build(request)
124136

125137
async def event_generator(

tests/server/apps/rest/test_rest_fastapi_app.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,27 @@ async def agent_card() -> AgentCard:
3434
mock_agent_card = MagicMock(spec=AgentCard)
3535
mock_agent_card.url = 'http://mockurl.com'
3636
mock_agent_card.supports_authenticated_extended_card = False
37+
38+
# Mock the capabilities object with streaming disabled
39+
mock_capabilities = MagicMock()
40+
mock_capabilities.streaming = False
41+
mock_agent_card.capabilities = mock_capabilities
42+
43+
return mock_agent_card
44+
45+
46+
@pytest.fixture
47+
async def streaming_agent_card() -> AgentCard:
48+
"""Agent card that supports streaming for testing streaming endpoints."""
49+
mock_agent_card = MagicMock(spec=AgentCard)
50+
mock_agent_card.url = 'http://mockurl.com'
51+
mock_agent_card.supports_authenticated_extended_card = False
52+
53+
# Mock the capabilities object with streaming enabled
54+
mock_capabilities = MagicMock()
55+
mock_capabilities.streaming = True
56+
mock_agent_card.capabilities = mock_capabilities
57+
3758
return mock_agent_card
3859

3960

@@ -42,6 +63,25 @@ async def request_handler() -> RequestHandler:
4263
return MagicMock(spec=RequestHandler)
4364

4465

66+
@pytest.fixture
67+
async def streaming_app(
68+
streaming_agent_card: AgentCard, request_handler: RequestHandler
69+
) -> FastAPI:
70+
"""Builds the FastAPI application for testing streaming endpoints."""
71+
72+
return A2ARESTFastAPIApplication(
73+
streaming_agent_card, request_handler
74+
).build(agent_card_url='/well-known/agent-card.json', rpc_url='')
75+
76+
77+
@pytest.fixture
78+
async def streaming_client(streaming_app: FastAPI) -> AsyncClient:
79+
"""HTTP client for the streaming FastAPI application."""
80+
return AsyncClient(
81+
transport=ASGITransport(app=streaming_app), base_url='http://test'
82+
)
83+
84+
4585
@pytest.fixture
4686
async def app(
4787
agent_card: AgentCard, request_handler: RequestHandler
@@ -222,5 +262,95 @@ async def test_send_message_success_task(
222262
assert expected_response == actual_response
223263

224264

265+
@pytest.mark.anyio
266+
async def test_streaming_message_request_body_consumption(
267+
streaming_client: AsyncClient, request_handler: MagicMock
268+
) -> None:
269+
"""Test that streaming endpoint properly handles request body consumption.
270+
271+
This test verifies the fix for the deadlock issue where request.body()
272+
was being consumed inside the EventSourceResponse context, causing
273+
the application to hang indefinitely.
274+
"""
275+
276+
# Mock the async generator response from the request handler
277+
async def mock_stream_response():
278+
"""Mock streaming response generator."""
279+
yield Message(
280+
message_id='stream_msg_1',
281+
role=Role.agent,
282+
parts=[Part(TextPart(text='First streaming response'))],
283+
)
284+
yield Message(
285+
message_id='stream_msg_2',
286+
role=Role.agent,
287+
parts=[Part(TextPart(text='Second streaming response'))],
288+
)
289+
290+
request_handler.on_message_send_stream.return_value = mock_stream_response()
291+
292+
# Create a valid streaming request
293+
request = a2a_pb2.SendMessageRequest(
294+
request=a2a_pb2.Message(
295+
message_id='test_stream_msg',
296+
role=a2a_pb2.ROLE_USER,
297+
content=[a2a_pb2.Part(text='Test streaming message')],
298+
),
299+
configuration=a2a_pb2.SendMessageConfiguration(),
300+
)
301+
302+
# This should not hang indefinitely (previously it would due to the deadlock)
303+
response = await streaming_client.post(
304+
'/v1/message:stream',
305+
json=json_format.MessageToDict(request),
306+
headers={'Accept': 'text/event-stream'},
307+
timeout=10.0, # Reasonable timeout to prevent hanging in tests
308+
)
309+
310+
# The response should be successful
311+
response.raise_for_status()
312+
assert response.status_code == 200
313+
assert 'text/event-stream' in response.headers.get('content-type', '')
314+
315+
# Verify that the request handler was called
316+
request_handler.on_message_send_stream.assert_called_once()
317+
318+
319+
@pytest.mark.anyio
320+
async def test_streaming_endpoint_with_invalid_content_type(
321+
streaming_client: AsyncClient, request_handler: MagicMock
322+
) -> None:
323+
"""Test streaming endpoint behavior with invalid content type."""
324+
325+
async def mock_stream_response():
326+
yield Message(
327+
message_id='stream_msg_1',
328+
role=Role.agent,
329+
parts=[Part(TextPart(text='Response'))],
330+
)
331+
332+
request_handler.on_message_send_stream.return_value = mock_stream_response()
333+
334+
request = a2a_pb2.SendMessageRequest(
335+
request=a2a_pb2.Message(
336+
message_id='test_stream_msg',
337+
role=a2a_pb2.ROLE_USER,
338+
content=[a2a_pb2.Part(text='Test message')],
339+
),
340+
configuration=a2a_pb2.SendMessageConfiguration(),
341+
)
342+
343+
# Send request without proper event-stream headers
344+
response = await streaming_client.post(
345+
'/v1/message:stream',
346+
json=json_format.MessageToDict(request),
347+
timeout=10.0,
348+
)
349+
350+
# Should still succeed (the adapter handles content-type internally)
351+
response.raise_for_status()
352+
assert response.status_code == 200
353+
354+
225355
if __name__ == '__main__':
226356
pytest.main([__file__])

0 commit comments

Comments
 (0)