Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 106 additions & 37 deletions langchain_mcp_adapters/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

from collections.abc import Awaitable, Callable
from typing import Any, get_args
from typing import Any, TypeAlias, cast, get_args

from langchain_core.tools import (
BaseTool,
Expand All @@ -19,6 +19,7 @@
from mcp.server.fastmcp.utilities.func_metadata import ArgModelBase, FuncMetadata
from mcp.types import (
AudioContent,
CallToolResult,
EmbeddedResource,
ImageContent,
ResourceLink,
Expand All @@ -35,51 +36,96 @@
)
from langchain_mcp_adapters.sessions import Connection, create_session

NonTextContent: TypeAlias = (
ImageContent | AudioContent | ResourceLink | EmbeddedResource
)

try:
from langgraph.runtime import get_runtime
from langgraph.runtime import get_runtime # type: ignore[import-not-found]
except ImportError:

def get_runtime() -> None:
"""no-op runtime getter."""
return


NonTextContent = ImageContent | AudioContent | ResourceLink | EmbeddedResource
MAX_ITERATIONS = 1000


def _convert_call_tool_result(
call_tool_result: MCPToolCallResult,
) -> tuple[str | list[str], list[NonTextContent] | None]:
"""Convert MCP MCPToolCallResult to LangChain tool result format.
call_tool_result: CallToolResult,
) -> tuple[str | list[str], dict[str, Any] | None]:
"""Convert MCP CallToolResult to LangChain tool result format.

The returned tuple follows LangChain's "content_and_artifact" format, where
the artifact is a dictionary that can include both non-text contents and
machine-readable structured content from MCP.

Args:
call_tool_result: The result from calling an MCP tool.

Returns:
A tuple containing the text content and any non-text content.
A tuple of (content, artifact). "content" is a string or list of strings
from text content blocks. "artifact" is a dict that may include:
- "nonText": list of non-text content blocks (e.g., images, resources)
- "structuredContent": structuredContent (machine-readable) when provided
by MCP
If there is no non-text nor structured content, artifact will be None.

Raises:
ToolException: If the tool call resulted in an error.
"""
text_contents, non_text_contents = _separate_content_types(call_tool_result.content)
tool_content = _format_text_content(text_contents)

if call_tool_result.isError:
raise ToolException(tool_content)

artifact = _build_artifact(non_text_contents, call_tool_result)

return tool_content, (artifact if artifact else None)


def _separate_content_types(
content: list,
) -> tuple[list[TextContent], list[NonTextContent]]:
"""Separate content into text and non-text types."""
text_contents: list[TextContent] = []
non_text_contents = []
for content in call_tool_result.content:
if isinstance(content, TextContent):
text_contents.append(content)
non_text_contents: list[NonTextContent] = []

for item in content:
if isinstance(item, TextContent):
text_contents.append(item)
else:
non_text_contents.append(content)
non_text_contents.append(item)

return text_contents, non_text_contents


def _format_text_content(text_contents: list[TextContent]) -> str | list[str]:
"""Format text content into string or list of strings."""
tool_content: str | list[str] = [content.text for content in text_contents]
if not text_contents:
tool_content = ""
elif len(text_contents) == 1:
tool_content = tool_content[0]
return ""
if len(text_contents) == 1:
return tool_content[0]
return tool_content

if call_tool_result.isError:
raise ToolException(tool_content)

return tool_content, non_text_contents or None
def _build_artifact(
non_text_contents: list[NonTextContent],
call_tool_result: CallToolResult,
) -> dict[str, Any]:
"""Build artifact dictionary with non-text and structured content."""
artifact: dict[str, Any] = {}
if non_text_contents:
artifact["nonText"] = non_text_contents

structured = getattr(call_tool_result, "structuredContent", None)
if structured is not None:
artifact["structuredContent"] = structured

return artifact


def _build_interceptor_chain(
Expand Down Expand Up @@ -185,18 +231,20 @@ def convert_mcp_tool_to_langchain_tool(

async def call_tool(
**arguments: dict[str, Any],
) -> tuple[str | list[str], list[NonTextContent] | None]:
) -> tuple[str | list[str], dict[str, Any] | None]:
"""Execute tool call with interceptor chain and return formatted result.

Args:
**arguments: Tool arguments as keyword args.

Returns:
Tuple of (text_content, non_text_content).
Tuple of (text_content, artifact).
"""
mcp_callbacks = (
callbacks.to_mcp_format(
context=CallbackContext(server_name=server_name, tool_name=tool.name)
context=CallbackContext(
server_name=server_name or "unknown", tool_name=tool.name
)
)
if callbacks is not None
else _MCPCallbacks()
Expand Down Expand Up @@ -230,14 +278,20 @@ async def execute_tool(request: MCPToolCallRequest) -> MCPToolCallResult:
modified_headers = request.headers
if modified_headers is not None and connection is not None:
# Create a new connection config with updated headers
updated_connection = dict(connection)
updated_connection: dict[str, Any] = dict(connection) # type: ignore[arg-type]
if connection["transport"] in ("sse", "streamable_http"):
existing_headers = connection.get("headers", {})
existing_headers_raw = connection.get("headers", {})
existing_headers: dict[str, Any] = (
existing_headers_raw
if isinstance(existing_headers_raw, dict)
else {}
)
headers_dict: dict[str, Any] = modified_headers
updated_connection["headers"] = {
**existing_headers,
**modified_headers,
**headers_dict,
}
effective_connection = updated_connection
effective_connection = cast("Connection", updated_connection)

captured_exception = None

Expand Down Expand Up @@ -335,7 +389,9 @@ async def load_mcp_tools(
raise ValueError(msg)

mcp_callbacks = (
callbacks.to_mcp_format(context=CallbackContext(server_name=server_name))
callbacks.to_mcp_format(
context=CallbackContext(server_name=server_name or "unknown")
)
if callbacks is not None
else _MCPCallbacks()
)
Expand Down Expand Up @@ -376,13 +432,19 @@ def _get_injected_args(tool: BaseTool) -> list[str]:
List of field names marked as injected arguments.
"""

def _is_injected_arg_type(type_: type) -> bool:
def _is_injected_arg_type(annotation: Any) -> bool:
"""Check if type annotation contains InjectedToolArg."""
return any(
isinstance(arg, InjectedToolArg)
or (isinstance(arg, type) and issubclass(arg, InjectedToolArg))
for arg in get_args(type_)[1:]
)
try:
args = get_args(annotation)
if len(args) > 1:
return any(
isinstance(arg, InjectedToolArg)
or (isinstance(arg, type) and issubclass(arg, InjectedToolArg))
for arg in args[1:]
)
except (TypeError, AttributeError):
pass
return False

return [
field
Expand All @@ -404,20 +466,24 @@ def to_fastmcp(tool: BaseTool) -> FastMCPTool:
TypeError: If args_schema is not BaseModel subclass.
NotImplementedError: If tool has injected arguments.
"""
if not issubclass(tool.args_schema, BaseModel):
if not (
isinstance(tool.args_schema, type) and issubclass(tool.args_schema, BaseModel)
):
msg = (
"Tool args_schema must be a subclass of pydantic.BaseModel. "
"Tools with dict args schema are not supported."
)
raise TypeError(msg)

parameters = tool.tool_call_schema.model_json_schema()
# We already checked that args_schema is a BaseModel subclass
args_schema = cast("type[BaseModel]", tool.args_schema)
parameters = args_schema.model_json_schema()
field_definitions = {
field: (field_info.annotation, field_info)
for field, field_info in tool.tool_call_schema.model_fields.items()
for field, field_info in args_schema.model_fields.items()
}
arg_model = create_model(
f"{tool.name}Arguments", **field_definitions, __base__=ArgModelBase
arg_model = create_model( # type: ignore[call-overload]
f"{tool.name}Arguments", __base__=ArgModelBase, **field_definitions
)
fn_metadata = FuncMetadata(arg_model=arg_model)

Expand All @@ -434,8 +500,11 @@ async def fn(**arguments: dict[str, Any]) -> Any: # noqa: ANN401
return FastMCPTool(
fn=fn,
name=tool.name,
title=tool.name,
description=tool.description,
parameters=parameters,
fn_metadata=fn_metadata,
is_async=True,
context_kwarg=None,
annotations=None,
)
67 changes: 56 additions & 11 deletions tests/test_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ def test_convert_with_non_text_content():
isError=False,
)

text_content, non_text_content = _convert_call_tool_result(result)
text_content, artifact = _convert_call_tool_result(result)

assert text_content == "text result"
assert non_text_content == [image_content, resource_content]
assert artifact == {"nonText": [image_content, resource_content]}


def test_convert_with_error():
Expand Down Expand Up @@ -322,16 +322,16 @@ async def test_convert_langchain_tool_to_fastmcp_tool(tool_instance):
fastmcp_tool = to_fastmcp(tool_instance)
assert fastmcp_tool.name == "add"
assert fastmcp_tool.description == "Add two numbers"
assert fastmcp_tool.parameters == {
"description": "Add two numbers",
"properties": {
"a": {"title": "A", "type": "integer"},
"b": {"title": "B", "type": "integer"},
},
"required": ["a", "b"],
"title": "add",
"type": "object",
# Check parameters schema
parameters = fastmcp_tool.parameters
assert parameters["description"] == "Add two numbers"
assert parameters["properties"] == {
"a": {"title": "A", "type": "integer"},
"b": {"title": "B", "type": "integer"},
}
assert parameters["required"] == ["a", "b"]
assert parameters["type"] == "object"
# Note: title varies by tool type (schema class name vs tool name)
assert fastmcp_tool.fn_metadata.arg_model.model_json_schema() == {
"properties": {
"a": {"title": "A", "type": "integer"},
Expand Down Expand Up @@ -526,3 +526,48 @@ async def test_convert_mcp_tool_metadata_variants():
"openWorldHint": None,
"_meta": {"flag": True},
}


def test_convert_with_structured_content():
"""Test CallToolResult with structuredContent field."""
structured_data = {"results": [{"id": 1}, {"id": 2}], "count": 2}

result = CallToolResult(
content=[TextContent(type="text", text="Search completed")], isError=False
)
result.structuredContent = structured_data

text_content, artifact = _convert_call_tool_result(result)

assert text_content == "Search completed"
assert artifact["structuredContent"] == structured_data


def test_convert_structured_content_includes_json_block():
"""Test that structuredContent is included in artifact only."""
structured_data = {"result": "success"}

result = CallToolResult(
content=[TextContent(type="text", text="Done")], isError=False
)
result.structuredContent = structured_data

content, artifact = _convert_call_tool_result(result)

# Content stays simple - just the text
assert content == "Done"
# Structured data goes in artifact
assert artifact["structuredContent"] == structured_data


def test_convert_with_structured_content_only():
"""Test CallToolResult with only structuredContent, no text content."""
structured_data = {"status": "success"}

result = CallToolResult(content=[], isError=False)
result.structuredContent = structured_data

content, artifact = _convert_call_tool_result(result)

assert content == ""
assert artifact["structuredContent"] == structured_data