-
Notifications
You must be signed in to change notification settings - Fork 449
feat(mcp): add experimental agent managed connection via ToolProvider #895
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cce1dcd
9dec5df
7833a49
6cba7d7
8f39f8b
abe5575
c804208
3546ad4
d03b924
9829035
bf8760a
c71764b
f0da75b
f40c8f7
88e4ce2
3384915
6596b07
5419a3f
1a99e13
ccebe72
a0d5d70
c88d585
beed7c1
9c5b91e
4bfe696
f33f8ec
cb9d954
4fc00a8
bd77288
c495708
5103092
3f30664
983d6dd
1641849
fbcc356
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| coverage: | ||
| status: | ||
| project: | ||
| default: | ||
| target: 90% # overall coverage threshold | ||
| patch: | ||
| default: | ||
| target: 90% # patch coverage threshold | ||
| base: auto | ||
| # Only post patch coverage on decreases | ||
| only_pulls: true |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| """Private async execution utilities.""" | ||
|
|
||
| import asyncio | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| from typing import Awaitable, Callable, TypeVar | ||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
||
| def run_async(async_func: Callable[[], Awaitable[T]]) -> T: | ||
| """Run an async function in a separate thread to avoid event loop conflicts. | ||
|
|
||
| This utility handles the common pattern of running async code from sync contexts | ||
| by using ThreadPoolExecutor to isolate the async execution. | ||
|
|
||
| Args: | ||
| async_func: A callable that returns an awaitable | ||
|
|
||
| Returns: | ||
| The result of the async function | ||
| """ | ||
|
|
||
| async def execute_async() -> T: | ||
| return await async_func() | ||
|
|
||
| def execute() -> T: | ||
| return asyncio.run(execute_async()) | ||
|
|
||
| with ThreadPoolExecutor() as executor: | ||
| future = executor.submit(execute) | ||
| return future.result() |
dbschmigelski marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,13 +9,12 @@ | |
| 2. Method-style for direct tool access: `agent.tool.tool_name(param1="value")` | ||
| """ | ||
|
|
||
| import asyncio | ||
| import json | ||
| import logging | ||
| import random | ||
| import warnings | ||
| from concurrent.futures import ThreadPoolExecutor | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| AsyncGenerator, | ||
| AsyncIterator, | ||
|
|
@@ -32,7 +31,11 @@ | |
| from pydantic import BaseModel | ||
|
|
||
| from .. import _identifier | ||
| from .._async import run_async | ||
| from ..event_loop.event_loop import event_loop_cycle | ||
|
|
||
| if TYPE_CHECKING: | ||
| from ..experimental.tools import ToolProvider | ||
| from ..handlers.callback_handler import PrintingCallbackHandler, null_callback_handler | ||
| from ..hooks import ( | ||
| AfterInvocationEvent, | ||
|
|
@@ -167,12 +170,7 @@ async def acall() -> ToolResult: | |
|
|
||
| return tool_results[0] | ||
|
|
||
| def tcall() -> ToolResult: | ||
| return asyncio.run(acall()) | ||
|
|
||
| with ThreadPoolExecutor() as executor: | ||
| future = executor.submit(tcall) | ||
| tool_result = future.result() | ||
| tool_result = run_async(acall) | ||
|
|
||
| if record_direct_tool_call is not None: | ||
| should_record_direct_tool_call = record_direct_tool_call | ||
|
|
@@ -215,7 +213,7 @@ def __init__( | |
| self, | ||
| model: Union[Model, str, None] = None, | ||
| messages: Optional[Messages] = None, | ||
| tools: Optional[list[Union[str, dict[str, str], Any]]] = None, | ||
| tools: Optional[list[Union[str, dict[str, str], "ToolProvider", Any]]] = None, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't we use the full name for ToolProvider? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same issues @JackYPCOnline was seeing with the recursive issues introduced in the experimental directory seemingly related to agent_config |
||
| system_prompt: Optional[str] = None, | ||
| structured_output_model: Optional[Type[BaseModel]] = None, | ||
| callback_handler: Optional[ | ||
|
|
@@ -248,6 +246,7 @@ def __init__( | |
| - File paths (e.g., "/path/to/tool.py") | ||
| - Imported Python modules (e.g., from strands_tools import current_time) | ||
| - Dictionaries with name/path keys (e.g., {"name": "tool_name", "path": "/path/to/tool.py"}) | ||
| - ToolProvider instances for managed tool collections | ||
| - Functions decorated with `@strands.tool` decorator. | ||
|
|
||
| If provided, only these tools will be available. If None, all tools will be available. | ||
|
|
@@ -423,17 +422,11 @@ def __call__( | |
| - state: The final state of the event loop | ||
| - structured_output: Parsed structured output when structured_output_model was specified | ||
| """ | ||
|
|
||
| def execute() -> AgentResult: | ||
| return asyncio.run( | ||
| self.invoke_async( | ||
| prompt, invocation_state=invocation_state, structured_output_model=structured_output_model, **kwargs | ||
| ) | ||
| return run_async( | ||
| lambda: self.invoke_async( | ||
| prompt, invocation_state=invocation_state, structured_output_model=structured_output_model, **kwargs | ||
| ) | ||
|
|
||
| with ThreadPoolExecutor() as executor: | ||
| future = executor.submit(execute) | ||
| return future.result() | ||
| ) | ||
|
|
||
| async def invoke_async( | ||
| self, | ||
|
|
@@ -505,13 +498,8 @@ def structured_output(self, output_model: Type[T], prompt: AgentInput = None) -> | |
| category=DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
|
|
||
| def execute() -> T: | ||
| return asyncio.run(self.structured_output_async(output_model, prompt)) | ||
|
|
||
| with ThreadPoolExecutor() as executor: | ||
| future = executor.submit(execute) | ||
| return future.result() | ||
|
|
||
| return run_async(lambda: self.structured_output_async(output_model, prompt)) | ||
|
|
||
| async def structured_output_async(self, output_model: Type[T], prompt: AgentInput = None) -> T: | ||
| """This method allows you to get structured output from the agent. | ||
|
|
@@ -529,6 +517,7 @@ async def structured_output_async(self, output_model: Type[T], prompt: AgentInpu | |
|
|
||
| Raises: | ||
| ValueError: If no conversation history or prompt is provided. | ||
| - | ||
| """ | ||
| if self._interrupt_state.activated: | ||
| raise RuntimeError("cannot call structured output during interrupt") | ||
|
|
@@ -583,6 +572,25 @@ async def structured_output_async(self, output_model: Type[T], prompt: AgentInpu | |
| finally: | ||
| self.hooks.invoke_callbacks(AfterInvocationEvent(agent=self)) | ||
|
|
||
| def cleanup(self) -> None: | ||
| """Clean up resources used by the agent. | ||
|
|
||
| This method cleans up all tool providers that require explicit cleanup, | ||
| such as MCP clients. It should be called when the agent is no longer needed | ||
| to ensure proper resource cleanup. | ||
|
|
||
| Note: This method uses a "belt and braces" approach with automatic cleanup | ||
| through finalizers as a fallback, but explicit cleanup is recommended. | ||
| """ | ||
| self.tool_registry.cleanup() | ||
|
|
||
| def __del__(self) -> None: | ||
| """Clean up resources when agent is garbage collected.""" | ||
| # __del__ is called even when an exception is thrown in the constructor, | ||
| # so there is no guarantee tool_registry was set.. | ||
| if hasattr(self, "tool_registry"): | ||
| self.tool_registry.cleanup() | ||
|
|
||
| async def stream_async( | ||
| self, | ||
| prompt: AgentInput = None, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| """Experimental tools package.""" | ||
|
|
||
| from .tool_provider import ToolProvider | ||
|
|
||
| __all__ = ["ToolProvider"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| """Tool provider interface.""" | ||
|
|
||
| from abc import ABC, abstractmethod | ||
| from typing import TYPE_CHECKING, Any, Sequence | ||
|
|
||
| if TYPE_CHECKING: | ||
| from ...types.tools import AgentTool | ||
|
|
||
|
|
||
| class ToolProvider(ABC): | ||
dbschmigelski marked this conversation as resolved.
Show resolved
Hide resolved
dbschmigelski marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| """Interface for providing tools with lifecycle management. | ||
|
|
||
| Provides a way to load a collection of tools and clean them up | ||
| when done, with lifecycle managed by the agent. | ||
| """ | ||
|
|
||
| @abstractmethod | ||
| async def load_tools(self, **kwargs: Any) -> Sequence["AgentTool"]: | ||
| """Load and return the tools in this provider. | ||
|
|
||
| Args: | ||
| **kwargs: Additional arguments for future compatibility. | ||
|
|
||
| Returns: | ||
| List of tools that are ready to use. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def add_consumer(self, consumer_id: Any, **kwargs: Any) -> None: | ||
| """Add a consumer to this tool provider. | ||
|
|
||
| Args: | ||
| consumer_id: Unique identifier for the consumer. | ||
| **kwargs: Additional arguments for future compatibility. | ||
| """ | ||
| pass | ||
|
|
||
| @abstractmethod | ||
| def remove_consumer(self, consumer_id: Any, **kwargs: Any) -> None: | ||
| """Remove a consumer from this tool provider. | ||
|
|
||
| This method must be idempotent - calling it multiple times with the same ID | ||
| should have no additional effect after the first call. | ||
|
|
||
| Provider may clean up resources when no consumers remain. | ||
|
|
||
| Args: | ||
| consumer_id: Unique identifier for the consumer. | ||
| **kwargs: Additional arguments for future compatibility. | ||
| """ | ||
| pass | ||
Uh oh!
There was an error while loading. Please reload this page.