Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions langchain_mcp_adapters/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
""" # noqa: E501

import base64
from collections.abc import Awaitable, Callable
from functools import wraps
from typing import Any, ParamSpec, TypeVar

from langchain_core.documents.base import Blob
from langchain_core.tools import BaseTool, StructuredTool
from mcp import ClientSession
from mcp.types import BlobResourceContents, ResourceContents, TextResourceContents
from pydantic import BaseModel, Field

from langchain_mcp_adapters.callbacks import CallbackContext, Callbacks, _MCPCallbacks
from langchain_mcp_adapters.sessions import Connection, create_session
from langchain_mcp_adapters.callbacks import Callbacks, _MCPCallbacks

def convert_mcp_resource_to_langchain_blob(
resource_uri: str, contents: ResourceContents
Expand Down Expand Up @@ -101,3 +109,194 @@ async def load_mcp_resources(
raise RuntimeError(msg) from e

return blobs


# Pydantic schemas for tool arguments
class ListResourcesInput(BaseModel):
"""Input schema for list_resources tool."""

cursor: str | None = Field(
default=None,
description="Pagination cursor returned from a previous list_resources call. "
"If provided, returns the next page of results.",
)


class ReadResourceInput(BaseModel):
"""Input schema for read_resource tool."""

uri: str = Field(
description="The URI of the resource to read. "
"Use list_resources to discover available resource URIs."
)


async def load_mcp_resources_as_tools(
session: ClientSession | None,
*,
connection: Connection | None = None,
callbacks: Callbacks | None = None,
server_name: str | None = None,
) -> list[BaseTool]:
"""Load MCP resources as LangChain tools for listing and reading resources.

This function creates two tools that an agent can use to interact with MCP resources:
- `list_resources`: Lists available resources with pagination support
- `read_resource`: Reads a specific resource by URI and returns its contents

Args:
session: The MCP client session. If `None`, connection must be provided.
connection: Connection config to create a new session if session is `None`.
callbacks: Optional `Callbacks` for handling notifications and events.
server_name: Name of the server these resources belong to.

Returns:
A list of two LangChain tools: list_resources and read_resource.

Raises:
ValueError: If neither session nor connection is provided.

Example:
```python
from langchain_mcp_adapters.resources import load_mcp_resources_as_tools
from langchain_mcp_adapters.sessions import create_session

connection = {
"command": "uvx",
"args": ["mcp-server-fetch"],
"transport": "stdio",
}

async with create_session(connection) as session:
await session.initialize()
tools = await load_mcp_resources_as_tools(session)
# tools can now be used by an agent
```
""" # noqa: E501
if session is None and connection is None:
msg = "Either a session or a connection config must be provided"
raise ValueError(msg)

mcp_callbacks = (
callbacks.to_mcp_format(context=CallbackContext(server_name=server_name))
if callbacks is not None
else _MCPCallbacks()
)

def with_session_context(
func: Callable[..., Awaitable[dict[str, Any]]]
) -> Callable[..., Awaitable[dict[str, Any]]]:
"""Decorator with access to closure variables."""
@wraps(func)
async def wrapper(*args: Any, **kwargs: Any) -> dict[str, Any]:
if session is None:
if connection is None:
msg = "Either session or connection must be provided"
raise ValueError(msg)

async with create_session(
connection, mcp_callbacks=mcp_callbacks
) as temp_session:
await temp_session.initialize()
return await func(temp_session, *args, **kwargs)
else:
return await func(session, *args, **kwargs)

return wrapper

@with_session_context
async def list_resources_fn(
session: ClientSession,
cursor: str | None = None,
) -> dict[str, Any]:
"""List available MCP resources with pagination support.

Args:
session: MCP client session.
cursor: Optional pagination cursor from a previous call.

Returns:
A dictionary containing:
- resources: List of resource dictionaries with uri, name, description, mimeType
- nextCursor: Pagination cursor for the next page (if available)
"""
result = await session.list_resources(cursor=cursor)

resources = [
{
"uri": str(r.uri),
"name": r.name,
"description": r.description,
"mimeType": r.mimeType,
}
for r in (result.resources or [])
]

return {
"resources": resources,
"nextCursor": result.nextCursor,
}

@with_session_context
async def read_resource_fn(
session: ClientSession,
uri: str,
) -> dict[str, Any]:
"""Read a specific MCP resource by URI.

Args:
session: MCP client session.
uri: The URI of the resource to read.

Returns:
A dictionary containing:
- uri: The resource URI
- contents: List of content dictionaries with type, data, and mimeType
"""
blobs = await get_mcp_resource(session, uri)

contents = []
for blob in blobs:
content_dict = {
"mimeType": blob.mimetype,
}
# Return text as string, binary as base64
if isinstance(blob.data, str):
content_dict["type"] = "text"
content_dict["data"] = blob.data
else:
content_dict["type"] = "blob"
content_dict["data"] = base64.b64encode(blob.data).decode()

contents.append(content_dict)

return {
"uri": uri,
"contents": contents,
}

list_tool = StructuredTool(
name="list_resources",
description=(
"List available resources. Resources are data sources that can be read. "
"Returns a list of resources with their URIs, names, descriptions, and MIME types. "
"Supports pagination via the cursor parameter. "
"Use this to discover what resources are available before reading them."
),
args_schema=ListResourcesInput,
coroutine=list_resources_fn,
)

read_tool = StructuredTool(
name="read_resource",
description=(
"Read the contents of a specific resource by its URI. "
"Returns the resource contents which may include text, binary data, or both. "
"Use list_resources first to discover available resource URIs."
),
args_schema=ReadResourceInput,
coroutine=read_resource_fn,
)

return [list_tool, read_tool]

170 changes: 170 additions & 0 deletions tests/test_resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
convert_mcp_resource_to_langchain_blob,
get_mcp_resource,
load_mcp_resources,
load_mcp_resources_as_tools,
)


Expand Down Expand Up @@ -273,3 +274,172 @@ async def test_load_mcp_resources_with_blob_content():
assert isinstance(blobs[0], Blob)
assert blobs[0].data == original_data
assert blobs[0].mimetype == "application/octet-stream"


async def test_load_mcp_resources_as_tools():
"""Test that load_mcp_resources_as_tools returns two tools."""
session = AsyncMock()

tools = await load_mcp_resources_as_tools(session)

assert len(tools) == 2
assert tools[0].name == "list_resources"
assert tools[1].name == "read_resource"


async def test_list_resources_tool():
"""Test the list_resources tool functionality."""
session = AsyncMock()

session.list_resources = AsyncMock(
return_value=ListResourcesResult(
resources=[
Resource(
uri="file:///test1.txt",
name="test1.txt",
description="First test file",
mimeType="text/plain",
),
Resource(
uri="file:///test2.txt",
name="test2.txt",
description="Second test file",
mimeType="text/plain",
),
],
nextCursor="cursor123",
),
)

tools = await load_mcp_resources_as_tools(session)
list_tool = tools[0]

result = await list_tool.ainvoke({})

assert "resources" in result
assert "nextCursor" in result
assert len(result["resources"]) == 2
assert result["resources"][0]["uri"] == "file:///test1.txt"
assert result["resources"][0]["name"] == "test1.txt"
assert result["resources"][0]["description"] == "First test file"
assert result["resources"][0]["mimeType"] == "text/plain"
assert result["nextCursor"] == "cursor123"


async def test_list_resources_tool_with_cursor():
"""Test the list_resources tool with pagination cursor."""
session = AsyncMock()

session.list_resources = AsyncMock(
return_value=ListResourcesResult(
resources=[
Resource(
uri="file:///test3.txt",
name="test3.txt",
mimeType="text/plain",
),
],
nextCursor=None,
),
)

tools = await load_mcp_resources_as_tools(session)
list_tool = tools[0]

result = await list_tool.ainvoke({"cursor": "cursor123"})

assert len(result["resources"]) == 1
assert result["resources"][0]["uri"] == "file:///test3.txt"
assert result["nextCursor"] is None
session.list_resources.assert_called_once_with(cursor="cursor123")


async def test_read_resource_tool():
"""Test the read_resource tool functionality."""
session = AsyncMock()
uri = "file:///test.txt"

session.read_resource = AsyncMock(
return_value=ReadResourceResult(
contents=[
TextResourceContents(uri=uri, mimeType="text/plain", text="Test content"),
],
),
)

tools = await load_mcp_resources_as_tools(session)
read_tool = tools[1]

result = await read_tool.ainvoke({"uri": uri})

assert "uri" in result
assert "contents" in result
assert result["uri"] == uri
assert len(result["contents"]) == 1
assert result["contents"][0]["type"] == "text"
assert result["contents"][0]["data"] == "Test content"
assert result["contents"][0]["mimeType"] == "text/plain"


async def test_read_resource_tool_with_blob():
"""Test the read_resource tool with binary content."""
session = AsyncMock()
uri = "file:///test.bin"
original_data = b"binary data"
base64_blob = base64.b64encode(original_data).decode()

session.read_resource = AsyncMock(
return_value=ReadResourceResult(
contents=[
BlobResourceContents(
uri=uri,
mimeType="application/octet-stream",
blob=base64_blob,
),
],
),
)

tools = await load_mcp_resources_as_tools(session)
read_tool = tools[1]

result = await read_tool.ainvoke({"uri": uri})

assert result["uri"] == uri
assert len(result["contents"]) == 1
assert result["contents"][0]["type"] == "blob"
assert result["contents"][0]["data"] == base64_blob
assert result["contents"][0]["mimeType"] == "application/octet-stream"


async def test_read_resource_tool_with_mixed_content():
"""Test the read_resource tool with both text and binary content."""
session = AsyncMock()
uri = "file:///mixed"
original_data = b"binary data"
base64_blob = base64.b64encode(original_data).decode()

session.read_resource = AsyncMock(
return_value=ReadResourceResult(
contents=[
TextResourceContents(uri=uri, mimeType="text/plain", text="Text content"),
BlobResourceContents(
uri=uri,
mimeType="application/octet-stream",
blob=base64_blob,
),
],
),
)

tools = await load_mcp_resources_as_tools(session)
read_tool = tools[1]

result = await read_tool.ainvoke({"uri": uri})

assert len(result["contents"]) == 2
assert result["contents"][0]["type"] == "text"
assert result["contents"][0]["data"] == "Text content"
assert result["contents"][1]["type"] == "blob"
assert result["contents"][1]["data"] == base64_blob