Skip to content

Commit 53138e6

Browse files
committed
Add mcp_proxy for bidirectional message forwarding
Adds a convenience function for proxying messages between two MCP transports, enabling bidirectional message forwarding with proper error handling. Features: - Bidirectional forwarding between client and server transports - Optional error callback (sync or async) for exceptions on streams - Graceful handling of closed/broken streams - Clean shutdown on context exit This is a simpler reimplementation of the proxy pattern from #1711/#1763, addressing all review feedback.
1 parent a3a4b8d commit 53138e6

File tree

3 files changed

+384
-0
lines changed

3 files changed

+384
-0
lines changed

src/mcp/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from .client.session import ClientSession
22
from .client.session_group import ClientSessionGroup
33
from .client.stdio import StdioServerParameters, stdio_client
4+
from .proxy import MessageStream, mcp_proxy
45
from .server.session import ServerSession
56
from .server.stdio import stdio_server
67
from .shared.exceptions import McpError, UrlElicitationRequiredError
@@ -97,6 +98,7 @@
9798
"LoggingLevel",
9899
"LoggingMessageNotification",
99100
"McpError",
101+
"MessageStream",
100102
"Notification",
101103
"PingRequest",
102104
"ProgressNotification",
@@ -130,6 +132,7 @@
130132
"ToolUseContent",
131133
"UnsubscribeRequest",
132134
"UrlElicitationRequiredError",
135+
"mcp_proxy",
133136
"stdio_client",
134137
"stdio_server",
135138
]

src/mcp/proxy.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
"""Utilities for proxying messages between MCP transports."""
2+
3+
from collections.abc import AsyncGenerator, Awaitable, Callable
4+
from contextlib import asynccontextmanager
5+
6+
import anyio
7+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
8+
9+
from mcp.shared.message import SessionMessage
10+
11+
MessageStream = tuple[
12+
MemoryObjectReceiveStream[SessionMessage | Exception],
13+
MemoryObjectSendStream[SessionMessage],
14+
]
15+
16+
17+
@asynccontextmanager
18+
async def mcp_proxy(
19+
client_streams: MessageStream,
20+
server_streams: MessageStream,
21+
on_error: Callable[[Exception], None | Awaitable[None]] | None = None,
22+
) -> AsyncGenerator[None, None]:
23+
"""Proxy messages bidirectionally between two MCP transports.
24+
25+
Sets up bidirectional message forwarding between two transport pairs.
26+
Messages from the client are forwarded to the server, and vice versa.
27+
When the context exits, both forwarding directions are cancelled.
28+
29+
Args:
30+
client_streams: A tuple of (read_stream, write_stream) for the client side.
31+
server_streams: A tuple of (read_stream, write_stream) for the server side.
32+
on_error: Optional callback for handling exceptions received on streams.
33+
Can be sync or async. Called with the Exception object.
34+
35+
Example:
36+
```python
37+
async with mcp_proxy(
38+
client_streams=(client_read, client_write),
39+
server_streams=(server_read, server_write),
40+
on_error=lambda e: print(f"Error: {e}"),
41+
):
42+
# Proxy is active, forwarding messages bidirectionally
43+
await some_operation()
44+
# Forwarding stops when exiting the context
45+
```
46+
"""
47+
client_read, client_write = client_streams
48+
server_read, server_write = server_streams
49+
50+
async def forward(
51+
read: MemoryObjectReceiveStream[SessionMessage | Exception],
52+
write: MemoryObjectSendStream[SessionMessage],
53+
) -> None:
54+
async for msg in read:
55+
if isinstance(msg, Exception):
56+
if on_error:
57+
try:
58+
result = on_error(msg)
59+
if isinstance(result, Awaitable):
60+
await result
61+
except Exception:
62+
pass # Don't let callback errors crash the proxy
63+
else:
64+
try:
65+
await write.send(msg)
66+
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
67+
return # Destination closed, stop this direction
68+
69+
async with anyio.create_task_group() as tg:
70+
tg.start_soon(forward, client_read, server_write)
71+
tg.start_soon(forward, server_read, client_write)
72+
try:
73+
yield
74+
finally:
75+
tg.cancel_scope.cancel()

0 commit comments

Comments
 (0)