Skip to content

Async resource support #717

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/mcp/server/fastmcp/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import Resource
from .async_resource import AsyncResource, AsyncStatus
from .resource_manager import ResourceManager
from .templates import ResourceTemplate
from .types import (
Expand All @@ -12,6 +13,8 @@

__all__ = [
"Resource",
"AsyncResource",
"AsyncStatus",
"TextResource",
"BinaryResource",
"FunctionResource",
Expand Down
176 changes: 176 additions & 0 deletions src/mcp/server/fastmcp/resources/async_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Asynchronous resource implementation for long-running operations."""

import asyncio
import enum
# import time
from typing import Any, Optional

import pydantic
from pydantic import Field

from mcp.server.fastmcp.resources.base import Resource
from mcp.server.fastmcp.utilities.logging import get_logger

logger = get_logger(__name__)


class AsyncStatus(str, enum.Enum):
"""Status of an asynchronous operation."""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"


class AsyncResource(Resource):
"""A resource representing an asynchronous operation.

This resource type is used to track long-running operations that are executed
asynchronously. It provides methods for updating progress, completing with a result,
failing with an error, and canceling the operation.
"""

status: AsyncStatus = Field(
default=AsyncStatus.PENDING,
description="Current status of the asynchronous operation"
)
# progress: float = Field(
# default=0,
# description="Current progress value (0-100 or raw count)"
# )
error: Optional[str] = Field(
default=None,
description="Error message if the operation failed"
)
# created_at: float = Field(
# default_factory=time.time,
# description="Timestamp when the resource was created"
# )
# started_at: Optional[float] = Field(
# default=None,
# description="Timestamp when the operation started running"
# )
# completed_at: Optional[float] = Field(
# default=None,
# description="Timestamp when the operation completed, failed, or was canceled"
# )

# Fields not included in serialization
_task: Optional[asyncio.Task[Any]] = pydantic.PrivateAttr(default=None)
# _mcp_server = pydantic.PrivateAttr(default=None)

# def set_mcp_server(self, server: Any) -> None:
# """Set the MCP server reference.

# Args:
# server: The MCP server instance
# """
# self._mcp_server = server

async def read(self) -> str:
"""Read the current state of the resource as JSON.

Returns the current status and progress information.
"""
# Convert the resource to a dictionary, excluding private fields
data = self.model_dump(exclude={"_task"})

# Return status info as JSON
import json
return json.dumps(data, indent=2)

async def start(self, task: asyncio.Task[Any]) -> None:
"""Mark the resource as running and store the task.

Args:
task: The asyncio task that is executing the operation
"""
self._task = task
self.status = AsyncStatus.RUNNING
# self.started_at = time.time()
# await self._notify_changed()

logger.debug(
"Started async operation",
extra={
"uri": self.uri,
}
)

# async def update_progress(self, progress: float) -> None:
# """Update the progress information.

# Args:
# progress: Current progress value
# total: Total expected progress value, if known
# """
# self.progress = progress
# # await self._notify_changed()

# logger.debug(
# "Updated async operation progress",
# extra={
# "uri": self.uri,
# "progress": self.progress,
# }
# )

async def complete(self) -> None:
"""Mark the resource as completed.
"""
self.status = AsyncStatus.COMPLETED
# self.completed_at = time.time()

# await self._notify_changed()

logger.info(
"Completed async operation",
extra={
"uri": self.uri,
# "duration": self.completed_at - (self.started_at or self.created_at),
}
)

async def fail(self, error: str) -> None:
"""Mark the resource as failed and store the error.

Args:
error: Error message describing why the operation failed
"""
self.status = AsyncStatus.FAILED
self.error = error
# self.completed_at = time.time()
# await self._notify_changed()

logger.error(
"Failed async operation",
extra={
"uri": self.uri,
"error": error,
# "duration": self.completed_at - (self.started_at or self.created_at),
}
)

async def cancel(self) -> None:
"""Cancel the operation if it's still running."""
if self.status in (AsyncStatus.PENDING, AsyncStatus.RUNNING) and self._task:
self._task.cancel()
self.status = AsyncStatus.CANCELED
# self.completed_at = time.time()
# await self._notify_changed()

logger.info(
"Canceled async operation",
extra={
"uri": self.uri,
# "duration": self.completed_at - (self.started_at or self.created_at),
}
)

# async def _notify_changed(self) -> None:
# """Notify subscribers that the resource has changed."""
# if self._mcp_server:
# # This will be implemented in the MCP server to notify clients
# # of resource changes via the notification protocol
# self._mcp_server.notify_resource_changed(self.uri)
41 changes: 41 additions & 0 deletions src/mcp/server/fastmcp/resources/resource_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Resource manager functionality."""

import uuid
from collections.abc import Callable
from typing import Any

from pydantic import AnyUrl

from mcp.server.fastmcp.resources.base import Resource
from mcp.server.fastmcp.resources.async_resource import AsyncResource
from mcp.server.fastmcp.resources.templates import ResourceTemplate
from mcp.server.fastmcp.utilities.logging import get_logger

Expand All @@ -19,6 +21,7 @@ def __init__(self, warn_on_duplicate_resources: bool = True):
self._resources: dict[str, Resource] = {}
self._templates: dict[str, ResourceTemplate] = {}
self.warn_on_duplicate_resources = warn_on_duplicate_resources
# self._mcp_server = None

def add_resource(self, resource: Resource) -> Resource:
"""Add a resource to the manager.
Expand Down Expand Up @@ -93,3 +96,41 @@ def list_templates(self) -> list[ResourceTemplate]:
"""List all registered templates."""
logger.debug("Listing templates", extra={"count": len(self._templates)})
return list(self._templates.values())

# def set_mcp_server(self, server: Any) -> None:
# """Set the MCP server reference.

# This allows resources to notify the server when they change.

# Args:
# server: The MCP server instance
# """
# self._mcp_server = server

def create_async_resource(
self,
name: str | None = None,
description: str | None = None,
) -> AsyncResource:
"""Create a new async resource.

Args:
name: Optional name for the resource
description: Optional description of the resource

Returns:
A new AsyncResource instance
"""
resource_uri = f"resource://tasks/{uuid.uuid4()}"
resource = AsyncResource(
uri=AnyUrl(resource_uri),
name=name,
description=description,
)

# # Set the MCP server reference if available
# if self._mcp_server:
# resource.set_mcp_server(self._mcp_server)

self.add_resource(resource)
return resource
49 changes: 47 additions & 2 deletions src/mcp/server/fastmcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ def __init__(
self._prompt_manager = PromptManager(
warn_on_duplicate_prompts=self.settings.warn_on_duplicate_prompts
)

# Connect the resource manager and tool manager
self._tool_manager.set_resource_manager(self._resource_manager)
self._resource_manager.set_mcp_server(self._mcp_server)

if (self.settings.auth is not None) != (auth_server_provider is not None):
# TODO: after we support separate authorization servers (see
# https://github.com/modelcontextprotocol/modelcontextprotocol/pull/284)
Expand Down Expand Up @@ -321,6 +326,7 @@ def add_tool(
name: str | None = None,
description: str | None = None,
annotations: ToolAnnotations | None = None,
async_supported: bool = False,
) -> None:
"""Add a tool to the server.

Expand All @@ -332,16 +338,19 @@ def add_tool(
name: Optional name for the tool (defaults to function name)
description: Optional description of what the tool does
annotations: Optional ToolAnnotations providing additional tool information
async_supported: Whether this tool supports asynchronous execution
"""
self._tool_manager.add_tool(
fn, name=name, description=description, annotations=annotations
fn, name=name, description=description, annotations=annotations,
async_supported=async_supported
)

def tool(
self,
name: str | None = None,
description: str | None = None,
annotations: ToolAnnotations | None = None,
async_supported: bool = False,
) -> Callable[[AnyFunction], AnyFunction]:
"""Decorator to register a tool.

Expand All @@ -353,6 +362,7 @@ def tool(
name: Optional name for the tool (defaults to function name)
description: Optional description of what the tool does
annotations: Optional ToolAnnotations providing additional tool information
async_supported: Whether this tool supports asynchronous execution

Example:
@server.tool()
Expand All @@ -368,6 +378,16 @@ def tool_with_context(x: int, ctx: Context) -> str:
async def async_tool(x: int, context: Context) -> str:
await context.report_progress(50, 100)
return str(x)

@server.tool(async_supported=True)
async def long_running_tool(x: int, context: Context) -> str:
# This tool will be executed asynchronously
# The client will receive a resource URI immediately
# and can track progress through that resource
for i in range(100):
await asyncio.sleep(0.1)
await context.report_progress(i, 100)
return f"Processed {x}"
"""
# Check if user passed function directly instead of calling decorator
if callable(name):
Expand All @@ -378,7 +398,8 @@ async def async_tool(x: int, context: Context) -> str:

def decorator(fn: AnyFunction) -> AnyFunction:
self.add_tool(
fn, name=name, description=description, annotations=annotations
fn, name=name, description=description, annotations=annotations,
async_supported=async_supported
)
return fn

Expand Down Expand Up @@ -917,14 +938,38 @@ def my_tool(x: int, ctx: Context) -> str:
client_id = ctx.client_id

return str(x)

@server.tool(async_supported=True)
async def long_running_tool(x: int, ctx: Context) -> str:
# For async tools, the context.resource will be set to an AsyncResource
# that can be used to update progress and status

total_steps = 100
for i in range(total_steps):
# Do some work
await asyncio.sleep(0.1)

# Update progress through the AsyncResource
if ctx.resource:
await ctx.resource.update_progress(i, total_steps)

# You can also use the standard progress reporting
await ctx.report_progress(i, total_steps)

return f"Processed {x}"
```

The context parameter name can be anything as long as it's annotated with Context.
The context is optional - tools that don't need it can omit the parameter.

For asynchronous tools (marked with async_supported=True), the context will have
a resource attribute set to an AsyncResource instance that can be used to update
progress and status information.
"""

_request_context: RequestContext[ServerSessionT, LifespanContextT] | None
_fastmcp: FastMCP | None
resource: Any = None # Can hold a reference to an AsyncResource for async operations

def __init__(
self,
Expand Down
5 changes: 5 additions & 0 deletions src/mcp/server/fastmcp/tools/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class Tool(BaseModel):
annotations: ToolAnnotations | None = Field(
None, description="Optional annotations for the tool"
)
async_supported: bool = Field(
False, description="Whether this tool supports asynchronous execution"
)

@classmethod
def from_function(
Expand All @@ -43,6 +46,7 @@ def from_function(
description: str | None = None,
context_kwarg: str | None = None,
annotations: ToolAnnotations | None = None,
async_supported: bool = False,
) -> Tool:
"""Create a Tool from a function."""
from mcp.server.fastmcp.server import Context
Expand Down Expand Up @@ -79,6 +83,7 @@ def from_function(
is_async=is_async,
context_kwarg=context_kwarg,
annotations=annotations,
async_supported=async_supported,
)

async def run(
Expand Down
Loading
Loading