diff --git a/examples/servers/simple-streamablehttp-stateless/README.md b/examples/servers/simple-streamablehttp-stateless/README.md new file mode 100644 index 00000000..2abb6061 --- /dev/null +++ b/examples/servers/simple-streamablehttp-stateless/README.md @@ -0,0 +1,41 @@ +# MCP Simple StreamableHttp Stateless Server Example + +A stateless MCP server example demonstrating the StreamableHttp transport without maintaining session state. This example is ideal for understanding how to deploy MCP servers in multi-node environments where requests can be routed to any instance. + +## Features + +- Uses the StreamableHTTP transport in stateless mode (mcp_session_id=None) +- Each request creates a new ephemeral connection +- No session state maintained between requests +- Task lifecycle scoped to individual requests +- Suitable for deployment in multi-node environments + + +## Usage + +Start the server: + +```bash +# Using default port 3000 +uv run mcp-simple-streamablehttp-stateless + +# Using custom port +uv run mcp-simple-streamablehttp-stateless --port 3000 + +# Custom logging level +uv run mcp-simple-streamablehttp-stateless --log-level DEBUG + +# Enable JSON responses instead of SSE streams +uv run mcp-simple-streamablehttp-stateless --json-response +``` + +The server exposes a tool named "start-notification-stream" that accepts three arguments: + +- `interval`: Time between notifications in seconds (e.g., 1.0) +- `count`: Number of notifications to send (e.g., 5) +- `caller`: Identifier string for the caller + + +## Client + +You can connect to this server using an HTTP client. For now, only the TypeScript SDK has streamable HTTP client examples, or you can use [Inspector](https://github.com/modelcontextprotocol/inspector) for testing. \ No newline at end of file diff --git a/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__init__.py b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__main__.py b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__main__.py new file mode 100644 index 00000000..f5f6e402 --- /dev/null +++ b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/__main__.py @@ -0,0 +1,4 @@ +from .server import main + +if __name__ == "__main__": + main() diff --git a/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py new file mode 100644 index 00000000..da8158a9 --- /dev/null +++ b/examples/servers/simple-streamablehttp-stateless/mcp_simple_streamablehttp_stateless/server.py @@ -0,0 +1,168 @@ +import contextlib +import logging + +import anyio +import click +import mcp.types as types +from mcp.server.lowlevel import Server +from mcp.server.streamableHttp import ( + StreamableHTTPServerTransport, +) +from starlette.applications import Starlette +from starlette.routing import Mount + +logger = logging.getLogger(__name__) +# Global task group that will be initialized in the lifespan +task_group = None + + +@contextlib.asynccontextmanager +async def lifespan(app): + """Application lifespan context manager for managing task group.""" + global task_group + + async with anyio.create_task_group() as tg: + task_group = tg + logger.info("Application started, task group initialized!") + try: + yield + finally: + logger.info("Application shutting down, cleaning up resources...") + if task_group: + tg.cancel_scope.cancel() + task_group = None + logger.info("Resources cleaned up successfully.") + + +@click.command() +@click.option("--port", default=3000, help="Port to listen on for HTTP") +@click.option( + "--log-level", + default="INFO", + help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", +) +@click.option( + "--json-response", + is_flag=True, + default=False, + help="Enable JSON responses instead of SSE streams", +) +def main( + port: int, + log_level: str, + json_response: bool, +) -> int: + # Configure logging + logging.basicConfig( + level=getattr(logging, log_level.upper()), + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + app = Server("mcp-streamable-http-stateless-demo") + + @app.call_tool() + async def call_tool( + name: str, arguments: dict + ) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]: + ctx = app.request_context + interval = arguments.get("interval", 1.0) + count = arguments.get("count", 5) + caller = arguments.get("caller", "unknown") + + # Send the specified number of notifications with the given interval + for i in range(count): + await ctx.session.send_log_message( + level="info", + data=f"Notification {i+1}/{count} from caller: {caller}", + logger="notification_stream", + related_request_id=ctx.request_id, + ) + if i < count - 1: # Don't wait after the last notification + await anyio.sleep(interval) + + return [ + types.TextContent( + type="text", + text=( + f"Sent {count} notifications with {interval}s interval" + f" for caller: {caller}" + ), + ) + ] + + @app.list_tools() + async def list_tools() -> list[types.Tool]: + return [ + types.Tool( + name="start-notification-stream", + description=( + "Sends a stream of notifications with configurable count" + " and interval" + ), + inputSchema={ + "type": "object", + "required": ["interval", "count", "caller"], + "properties": { + "interval": { + "type": "number", + "description": "Interval between notifications in seconds", + }, + "count": { + "type": "number", + "description": "Number of notifications to send", + }, + "caller": { + "type": "string", + "description": ( + "Identifier of the caller to include in notifications" + ), + }, + }, + }, + ) + ] + + # ASGI handler for stateless HTTP connections + async def handle_streamable_http(scope, receive, send): + logger.debug("Creating new transport") + # Use lock to prevent race conditions when creating new sessions + http_transport = StreamableHTTPServerTransport( + mcp_session_id=None, + is_json_response_enabled=json_response, + ) + async with http_transport.connect() as streams: + read_stream, write_stream = streams + + if not task_group: + raise RuntimeError("Task group is not initialized") + + async def run_server(): + await app.run( + read_stream, + write_stream, + app.create_initialization_options(), + # Runs in standalone mode for stateless deployments + # where clients perform initialization with any node + standalone_mode=True, + ) + + # Start server task + task_group.start_soon(run_server) + + # Handle the HTTP request and return the response + await http_transport.handle_request(scope, receive, send) + + # Create an ASGI application using the transport + starlette_app = Starlette( + debug=True, + routes=[ + Mount("/mcp", app=handle_streamable_http), + ], + lifespan=lifespan, + ) + + import uvicorn + + uvicorn.run(starlette_app, host="0.0.0.0", port=port) + + return 0 diff --git a/examples/servers/simple-streamablehttp-stateless/pyproject.toml b/examples/servers/simple-streamablehttp-stateless/pyproject.toml new file mode 100644 index 00000000..d2b08945 --- /dev/null +++ b/examples/servers/simple-streamablehttp-stateless/pyproject.toml @@ -0,0 +1,36 @@ +[project] +name = "mcp-simple-streamablehttp-stateless" +version = "0.1.0" +description = "A simple MCP server exposing a StreamableHttp transport in stateless mode" +readme = "README.md" +requires-python = ">=3.10" +authors = [{ name = "Anthropic, PBC." }] +keywords = ["mcp", "llm", "automation", "web", "fetch", "http", "streamable", "stateless"] +license = { text = "MIT" } +dependencies = ["anyio>=4.5", "click>=8.1.0", "httpx>=0.27", "mcp", "starlette", "uvicorn"] + +[project.scripts] +mcp-simple-streamablehttp-stateless = "mcp_simple_streamablehttp_stateless.server:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["mcp_simple_streamablehttp_stateless"] + +[tool.pyright] +include = ["mcp_simple_streamablehttp_stateless"] +venvPath = "." +venv = ".venv" + +[tool.ruff.lint] +select = ["E", "F", "I"] +ignore = [] + +[tool.ruff] +line-length = 88 +target-version = "py310" + +[tool.uv] +dev-dependencies = ["pyright>=1.1.378", "pytest>=8.3.3", "ruff>=0.6.9"] \ No newline at end of file diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index dbaff305..b47f5305 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -479,11 +479,21 @@ async def run( # but also make tracing exceptions much easier during testing and when using # in-process servers. raise_exceptions: bool = False, + # When True, the server runs in standalone mode for stateless deployments where + # clients can perform initialization with any node. The client must still follow + # the initialization lifecycle, but can do so with any available node + # rather than requiring initialization for each connection. + standalone_mode: bool = False, ): async with AsyncExitStack() as stack: lifespan_context = await stack.enter_async_context(self.lifespan(self)) session = await stack.enter_async_context( - ServerSession(read_stream, write_stream, initialization_options) + ServerSession( + read_stream, + write_stream, + initialization_options, + standalone_mode=standalone_mode, + ) ) async with anyio.create_task_group() as tg: diff --git a/src/mcp/server/session.py b/src/mcp/server/session.py index 3a1f210d..07e5a315 100644 --- a/src/mcp/server/session.py +++ b/src/mcp/server/session.py @@ -85,11 +85,15 @@ def __init__( read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception], write_stream: MemoryObjectSendStream[types.JSONRPCMessage], init_options: InitializationOptions, + standalone_mode: bool = False, ) -> None: super().__init__( read_stream, write_stream, types.ClientRequest, types.ClientNotification ) - self._initialization_state = InitializationState.NotInitialized + if standalone_mode: + self._initialization_state = InitializationState.Initialized + else: + self._initialization_state = InitializationState.NotInitialized self._init_options = init_options self._incoming_message_stream_writer, self._incoming_message_stream_reader = ( anyio.create_memory_object_stream[ServerRequestResponder](0) diff --git a/uv.lock b/uv.lock index 3ea01ff8..7eb34e9e 100644 --- a/uv.lock +++ b/uv.lock @@ -11,6 +11,7 @@ members = [ "mcp-simple-prompt", "mcp-simple-resource", "mcp-simple-streamablehttp", + "mcp-simple-streamablehttp-stateless", "mcp-simple-tool", ] @@ -488,6 +489,7 @@ wheels = [ [[package]] name = "mcp" + source = { editable = "." } dependencies = [ { name = "anyio" }, @@ -666,6 +668,43 @@ dev = [ { name = "ruff", specifier = ">=0.6.9" }, ] +[[package]] +name = "mcp-simple-streamablehttp-stateless" +version = "0.1.0" +source = { editable = "examples/servers/simple-streamablehttp-stateless" } +dependencies = [ + { name = "anyio" }, + { name = "click" }, + { name = "httpx" }, + { name = "mcp" }, + { name = "starlette" }, + { name = "uvicorn" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pyright" }, + { name = "pytest" }, + { name = "ruff" }, +] + +[package.metadata] +requires-dist = [ + { name = "anyio", specifier = ">=4.5" }, + { name = "click", specifier = ">=8.1.0" }, + { name = "httpx", specifier = ">=0.27" }, + { name = "mcp", editable = "." }, + { name = "starlette" }, + { name = "uvicorn" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pyright", specifier = ">=1.1.378" }, + { name = "pytest", specifier = ">=8.3.3" }, + { name = "ruff", specifier = ">=0.6.9" }, +] + [[package]] name = "mcp-simple-tool" version = "0.1.0"