diff --git a/mcp_server/requirements.txt b/mcp_server/requirements.txt index e2bda8577844..7b01e9996286 100644 --- a/mcp_server/requirements.txt +++ b/mcp_server/requirements.txt @@ -1,10 +1,13 @@ # MCP Server Dependencies -# FastMCP - MCP protocol implementation with SSE support -fastmcp>=0.1.0 +# FastMCP - MCP protocol implementation with HTTP transport support +# Version 2.14.1+ required for proper task group initialization with streamable-http +fastmcp>=2.14.1 -# MCP client library (for testing) -mcp>=0.1.0 +# MCP client library (for HTTP transport client) +# Version 1.25.0+ includes fixes for streamable-http task group issues (PR #841) +mcp>=1.25.0 # ConfigStore caching for performance optimization -cachetools>=5.0.0 +# Pin to <6.0 for compatibility with google-auth and streamlit +cachetools>=5.0.0,<6.0 diff --git a/mcp_server/run_server.sh b/mcp_server/run_server.sh index 4d2317e92dd4..685574acfcf9 100755 --- a/mcp_server/run_server.sh +++ b/mcp_server/run_server.sh @@ -62,5 +62,5 @@ host = os.environ.get('MCP_HOST', '127.0.0.1') port = int(os.environ.get('MCP_PORT', '8000')) print(f'Server starting on http://{host}:{port}') -mcp.run(transport='http', host=host, port=port) +mcp.run(transport='streamable-http', host=host, port=port) " diff --git a/mcp_server/server.py b/mcp_server/server.py index ea49f7866f95..b19a440d2bfe 100644 --- a/mcp_server/server.py +++ b/mcp_server/server.py @@ -1931,9 +1931,9 @@ async def health_check(request): # Run MCP server with graceful shutdown handling # Default: host=127.0.0.1, port=8000, path=/mcp - # For remote access, override with: mcp.run(transport="http", host="0.0.0.0", port=8080) + # For remote access, override with: mcp.run(transport="streamable-http", host="0.0.0.0", port=8080) try: - mcp.run(transport="http") + mcp.run(transport="streamable-http") except KeyboardInterrupt: logger.info("Received shutdown signal (Ctrl+C)") finally: diff --git a/tools/mcp_data_collector.py b/tools/mcp_data_collector.py index 
9a0755c7c567..f00f1fe69ab1 100644 --- a/tools/mcp_data_collector.py +++ b/tools/mcp_data_collector.py @@ -2,16 +2,16 @@ """ MCP Data Collector Module -This module provides a Python interface to collect release data. -It can either use the MCP server (for remote access) or call OAR functions directly (for local access). +This module provides a Python interface to collect release data from the MCP server. Usage: - # Use MCP server (remote) - collector = MCPDataCollector(use_mcp=True, server_url="http://server:8000/sse") + # Use MCP server (local default: http://localhost:8000/mcp) + collector = MCPDataCollector() - # Use direct OAR calls (local, faster) - collector = MCPDataCollector(use_mcp=False) + # Use MCP server (remote) + collector = MCPDataCollector(server_url="http://server:8000/mcp") + # Fetch release data status = collector.get_release_status("4.19.1") metadata = collector.get_release_metadata("4.19.1") """ @@ -21,8 +21,10 @@ import os import sys import asyncio +import concurrent.futures from typing import Dict, Any, Optional -from mcp.client.sse import sse_client +import httpx +from mcp.client.streamable_http import streamablehttp_client from mcp import ClientSession logger = logging.getLogger(__name__) @@ -41,6 +43,9 @@ class MCPDataCollector: - Release status (task completion, overall status) - Release metadata (advisories, builds, dates) - Shipment status (shipped, flow type) + + Note: Maintains a persistent event loop to avoid "Task group is not initialized" + errors with FastMCP's streamable-http transport. 
""" def __init__(self, server_url: Optional[str] = None): @@ -48,69 +53,127 @@ def __init__(self, server_url: Optional[str] = None): Initialize the MCP data collector Args: - server_url: MCP server URL (default: from MCP_SERVER_URL env var or http://localhost:8000/sse) + server_url: MCP server URL (default: from MCP_SERVER_URL env var or http://localhost:8000/mcp) """ - default_url = os.environ.get('MCP_SERVER_URL', 'http://localhost:8000/sse') + default_url = os.environ.get('MCP_SERVER_URL', 'http://localhost:8000/mcp') self.server_url = server_url or default_url - self.timeout = 120 # HTTP request timeout in seconds (increased from 60) - self.sse_read_timeout = 600 # SSE read timeout in seconds (10 min for slow operations, increased from 300) + self.timeout = 120 # HTTP connection timeout in seconds + self.read_timeout = 600 # HTTP read timeout in seconds (10 min for slow operations) + + # Create a persistent event loop for this instance + # This avoids "Task group is not initialized" errors with streamable-http transport + # when making multiple calls (asyncio.run() creates new loop each time which breaks FastMCP) + + # Check if we're in an environment with a running loop (like Streamlit) + try: + asyncio.get_running_loop() + self._loop_is_external = True + logger.info(f"Detected external event loop (e.g., Streamlit)") + except RuntimeError: + self._loop_is_external = False + logger.info(f"No external event loop detected") + + # Always create our own dedicated event loop (don't reuse external loops) + self._loop = asyncio.new_event_loop() + logger.info(f"Created dedicated event loop for MCP data collector") + logger.info(f"Initialized MCP data collector with server: {self.server_url}") - async def _call_mcp_tool_async(self, tool_name: str, **kwargs) -> Dict[str, Any]: + async def _call_mcp_tool_async(self, tool_name: str, max_retries: int = 5, **kwargs) -> Dict[str, Any]: """ - Call an MCP tool via SSE transport (async) + Call an MCP tool via HTTP transport with 
retry logic (async) + + This implements retry logic similar to Claude Code's MCP client configuration + to handle transient "Task group is not initialized" errors from FastMCP. Args: tool_name: Name of the MCP tool to call + max_retries: Maximum number of retry attempts (default: 5, matching Claude Code) **kwargs: Tool parameters Returns: Parsed JSON response from the tool Raises: - RuntimeError: If tool call fails + RuntimeError: If tool call fails after all retries """ - try: - logger.debug(f"Calling MCP tool: {tool_name} with args: {kwargs}") - - # Connect to MCP server via SSE with explicit timeouts - async with sse_client( - self.server_url, - timeout=self.timeout, - sse_read_timeout=self.sse_read_timeout - ) as (read, write): - async with ClientSession(read, write) as session: - # Initialize the session - await session.initialize() - - # Call the tool (no prefix needed for fastmcp server) - # Set a timeout for the tool call itself - result = await asyncio.wait_for( - session.call_tool(tool_name, arguments=kwargs), - timeout=self.sse_read_timeout - ) - - # Check for error response - if result.isError: - error_msg = result.content[0].text if result.content else "Unknown error" - logger.error(f"MCP tool {tool_name} returned error: {error_msg}") - raise RuntimeError(f"Tool returned error: {error_msg}") - - # Extract text content from result - if result.content and len(result.content) > 0: - text_content = result.content[0].text - # Parse JSON response - return json.loads(text_content) - else: - logger.warning(f"Empty response from tool {tool_name}") - return {} - - except asyncio.TimeoutError: - logger.error(f"Timeout calling MCP tool {tool_name} after {self.sse_read_timeout}s") - raise RuntimeError(f"MCP tool call timed out after {self.sse_read_timeout}s") - except Exception as e: - logger.error(f"Failed to call MCP tool {tool_name}: {str(e)}") - raise RuntimeError(f"MCP tool call failed: {str(e)}") + last_error = None + initial_delay = 2.0 # seconds, matching 
Claude Code's config + + for attempt in range(max_retries): + try: + logger.debug(f"Calling MCP tool: {tool_name} with args: {kwargs} (attempt {attempt + 1}/{max_retries})") + + # Create httpx client with timeout configuration + # Configure both connect and read timeouts to match MCP server expectations + http_timeout = httpx.Timeout( + connect=self.timeout, # Connection timeout + read=self.read_timeout, # Read timeout for long operations + write=self.timeout, # Write timeout + pool=self.timeout # Pool timeout + ) + + async with httpx.AsyncClient(timeout=http_timeout) as http_client: + # Connect to MCP server via HTTP with configured timeouts + async with streamablehttp_client( + self.server_url, + timeout=self.timeout, sse_read_timeout=self.read_timeout + ) as (read, write, _): + async with ClientSession(read, write) as session: + # Initialize the session + await session.initialize() + + # Call the tool (no prefix needed for fastmcp server) + # Set a timeout for the tool call itself + result = await asyncio.wait_for( + session.call_tool(tool_name, arguments=kwargs), + timeout=self.read_timeout + ) + + # Check for error response + if result.isError: + error_msg = result.content[0].text if result.content else "Unknown error" + logger.error(f"MCP tool {tool_name} returned error: {error_msg}") + raise RuntimeError(f"Tool returned error: {error_msg}") + + # Extract text content from result + if result.content and len(result.content) > 0: + text_content = result.content[0].text + # Parse JSON response + return json.loads(text_content) + else: + logger.warning(f"Empty response from tool {tool_name}") + return {} + + except asyncio.TimeoutError as e: + last_error = e + logger.warning(f"Timeout calling MCP tool {tool_name} after {self.read_timeout}s (attempt {attempt + 1}/{max_retries})") + if attempt < max_retries - 1: + delay = initial_delay * (2 ** attempt) # Exponential backoff + logger.info(f"Retrying in {delay:.1f}s...") + await asyncio.sleep(delay) + continue + except Exception as e: + last_error = 
e + # Check if this is a "Task group is not initialized" error + error_str = str(e) + if "Task" in error_str and "group" in error_str: + logger.warning(f"Task group error calling MCP tool {tool_name}: {error_str} (attempt {attempt + 1}/{max_retries})") + if attempt < max_retries - 1: + delay = initial_delay * (2 ** attempt) # Exponential backoff + logger.info(f"Retrying in {delay:.1f}s...") + await asyncio.sleep(delay) + continue + # Non-retryable error, fail immediately + logger.error(f"Failed to call MCP tool {tool_name}: {str(e)}") + raise RuntimeError(f"MCP tool call failed: {str(e)}") + + # All retries exhausted + error_msg = f"MCP tool call failed after {max_retries} attempts" + if last_error: + error_msg += f": {str(last_error)}" + logger.error(error_msg) + raise RuntimeError(error_msg) async def get_release_status_async(self, release: str) -> Dict[str, Any]: """ @@ -224,22 +287,50 @@ async def get_all_release_data_async(self, release: str) -> Dict[str, Any]: 'shipped': shipped } + def _run_async(self, coro): + """ + Run async coroutine using the persistent event loop. + + This method ensures we reuse the same event loop across calls, + which is required for FastMCP's streamable-http transport. + + When running in environments with existing event loops (like Streamlit), + we use our own dedicated loop in a way that doesn't conflict. 
+ """ + if self._loop_is_external: + # We're in an environment with a running loop (e.g., Streamlit) + # We can't use loop.run_until_complete() directly because another loop is running + # Instead, we need to run the coroutine in our persistent loop using threading + # This avoids "This event loop is already running" error + def run_in_thread(): + # Set our loop as the event loop for this thread + asyncio.set_event_loop(self._loop) + # Run the coroutine to completion + return self._loop.run_until_complete(coro) + + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit(run_in_thread) + return future.result() + else: + # Use our persistent loop directly + return self._loop.run_until_complete(coro) + # Synchronous wrappers for backward compatibility def get_release_status(self, release: str) -> Dict[str, Any]: """Get release status (sync wrapper)""" - return asyncio.run(self.get_release_status_async(release)) + return self._run_async(self.get_release_status_async(release)) def get_release_metadata(self, release: str) -> Dict[str, Any]: """Get release metadata (sync wrapper)""" - return asyncio.run(self.get_release_metadata_async(release)) + return self._run_async(self.get_release_metadata_async(release)) def is_release_shipped(self, release: str) -> Dict[str, Any]: """Check if release is shipped (sync wrapper)""" - return asyncio.run(self.is_release_shipped_async(release)) + return self._run_async(self.is_release_shipped_async(release)) def get_all_release_data(self, release: str) -> Dict[str, Any]: """Get all release data (sync wrapper)""" - return asyncio.run(self.get_all_release_data_async(release)) + return self._run_async(self.get_all_release_data_async(release)) if __name__ == "__main__": diff --git a/tools/release_progress_dashboard/requirements.txt b/tools/release_progress_dashboard/requirements.txt index 606eba8224c3..d2e956d6fe8e 100644 --- a/tools/release_progress_dashboard/requirements.txt +++ 
b/tools/release_progress_dashboard/requirements.txt @@ -10,7 +10,11 @@ tabulate>=0.9.0 # Required for pandas df.to_markdown() # Visualization plotly>=5.17.0 -# HTTP client for MCP server communication +# MCP client library for HTTP transport communication +# Version 1.25.0+ includes fixes for streamable-http task group issues (PR #841) +mcp>=1.25.0 + +# HTTP client for MCP server communication (used by mcp_data_collector) requests>=2.31.0 # Existing dependencies (from main requirements)