generated from amazon-archives/__template_Apache-2.0
-
Notifications
You must be signed in to change notification settings - Fork 448
Labels
bugSomething isn't workingSomething isn't workingready for contributionPull requests welcomePull requests welcome
Description
Checks
- I have updated to the lastest minor and patch version of Strands
- I have checked the documentation and this is not expected behavior
- I have searched ./issues and there are no duplicates of my issue
Strands Version
1.10.0
Python Version
Python 3.12.11
Operating System
Linux, CloudDesktop
Installation Method
pip
Steps to Reproduce
- Host the agent as provided by the be example
- Add a
logger.info(f"Current messages: {self.agent.messages}")
into the line: - Run the client example from the docs with logging level info. Ask the same questions twice:
In the logs you should see the following:
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: 127.0.0.1:41184 - "GET /a2a/.well-known/agent-card.json HTTP/1.1" 200 OK
2025-10-06 21:17:52 - INFO - Current messages: []
2025-10-06 21:17:52 - INFO - Creating Strands MetricsClient
INFO: 127.0.0.1:41184 - "POST /a2a/ HTTP/1.1" 200 OK
INFO: 127.0.0.1:41192 - "GET /a2a/.well-known/agent-card.json HTTP/1.1" 200 OK
2025-10-06 21:17:56 - INFO - Current messages: [{'role': 'user', 'content': [{'text': 'What was my previous question?'}]}, {'role': 'assistant', 'content': [{'text': 'You haven\'t asked me a question before this one. This is our first interaction in this conversation. Your current question is "What was my previous question?"'}]}]
INFO: 127.0.0.1:41192 - "POST /a2a/ HTTP/1.1" 200 OK
Expected Behavior
The session management is expected to be handled by the implementation TaskStore coming from a2a-sdk not by the instance of the Strands Agent
Actual Behavior
The agent keeps the previous messages in memory, collecting all the messages which the server receives.
Additional Context
No response
Possible Solution
"""A2A-compatible wrapper for Strands Agent.
This module provides the A2AAgent class, which adapts a Strands Agent to the A2A protocol,
allowing it to be used in A2A-compatible systems.
"""
import logging
- from typing import Any, Literal
+ from typing import Any, Callable, Literal
from urllib.parse import urlparse
class A2AServer:
"""A2A-compatible wrapper for Strands Agent."""
def __init__(
self,
- agent: SAAgent,
+ agent_generator: Callable[[], SAAgent],
*,
# AgentCard
host: str = "127.0.0.1",
port: int = 9000,
http_url: str | None = None,
serve_at_root: bool = False,
version: str = "0.0.1",
skills: list[AgentSkill] | None = None,
# RequestHandler
task_store: TaskStore | None = None,
queue_manager: QueueManager | None = None,
push_config_store: PushNotificationConfigStore | None = None,
push_sender: PushNotificationSender | None = None,
):
- self.strands_agent = agent
+ self.agent_generator = agent_generator
+ self.strands_agent = self.agent_generator()
self.name = self.strands_agent.name
self.description = self.strands_agent.description
self.capabilities = AgentCapabilities(streaming=True)
self.request_handler = DefaultRequestHandler(
- agent_executor=StrandsA2AExecutor(self.strands_agent),
+ agent_executor=StrandsA2AExecutor(self.agent_generator),
task_store=task_store or InMemoryTaskStore(),
queue_manager=queue_manager,
push_config_store=push_config_store,
push_sender=push_sender,
)
self._agent_skills = skills
class StrandsA2AExecutor(AgentExecutor):
- def __init__(self, agent: SAAgent):
+ def __init__(self, agent_generator: Callable[[], SAAgent]):
async def _execute_streaming(self, context: RequestContext, updater: TaskUpdater) -> None:
"""Execute request in streaming mode.
Streams the agent's response in real-time, sending incremental updates
as they become available from the agent.
Args:
context: The A2A request context, containing the user's input and other metadata.
updater: The task updater for managing task state and sending updates.
"""
# Convert A2A message parts to Strands ContentBlocks
if context.message and hasattr(context.message, "parts"):
content_blocks = self._convert_a2a_parts_to_content_blocks(context.message.parts)
if not content_blocks:
raise ValueError("No content blocks available")
else:
raise ValueError("No content blocks available")
+ agent = self.agent_generator()
try:
- async for event in self.agent.stream_async(content_blocks):
+ async for event in agent.stream_async(content_blocks):
await self._handle_streaming_event(event, updater)
except Exception:
logger.exception("Error in streaming execution")
raise
Related Issues
No response
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't workingready for contributionPull requests welcomePull requests welcome