Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions mcp_server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
# MCP Server Dependencies

# FastMCP - MCP protocol implementation with SSE support
fastmcp>=0.1.0
# FastMCP - MCP protocol implementation with HTTP transport support
# Version 2.14.1+ required for proper task group initialization with streamable-http
fastmcp>=2.14.1

# MCP client library (for testing)
mcp>=0.1.0
# MCP client library (for HTTP transport client)
# Version 1.25.0+ includes fixes for streamable-http task group issues (PR #841)
mcp>=1.25.0

# ConfigStore caching for performance optimization
cachetools>=5.0.0
# Pin to <6.0 for compatibility with google-auth and streamlit
cachetools>=5.0.0,<6.0
2 changes: 1 addition & 1 deletion mcp_server/run_server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ host = os.environ.get('MCP_HOST', '127.0.0.1')
port = int(os.environ.get('MCP_PORT', '8000'))

print(f'Server starting on http://{host}:{port}')
mcp.run(transport='http', host=host, port=port)
mcp.run(transport='streamable-http', host=host, port=port)
"
4 changes: 2 additions & 2 deletions mcp_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1931,9 +1931,9 @@ async def health_check(request):

# Run MCP server with graceful shutdown handling
# Default: host=127.0.0.1, port=8000, path=/mcp
# For remote access, override with: mcp.run(transport="http", host="0.0.0.0", port=8080)
# For remote access, override with: mcp.run(transport="streamable-http", host="0.0.0.0", port=8080)
try:
mcp.run(transport="http")
mcp.run(transport="streamable-http")
except KeyboardInterrupt:
logger.info("Received shutdown signal (Ctrl+C)")
finally:
Expand Down
209 changes: 150 additions & 59 deletions tools/mcp_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
"""
MCP Data Collector Module

This module provides a Python interface to collect release data.
It can either use the MCP server (for remote access) or call OAR functions directly (for local access).
This module provides a Python interface to collect release data from the MCP server.

Usage:
# Use MCP server (remote)
collector = MCPDataCollector(use_mcp=True, server_url="http://server:8000/sse")
# Use MCP server (local default: http://localhost:8000/mcp)
collector = MCPDataCollector()

# Use direct OAR calls (local, faster)
collector = MCPDataCollector(use_mcp=False)
# Use MCP server (remote)
collector = MCPDataCollector(server_url="http://server:8000/mcp")

# Fetch release data
status = collector.get_release_status("4.19.1")
metadata = collector.get_release_metadata("4.19.1")
"""
Expand All @@ -21,8 +21,10 @@
import os
import sys
import asyncio
import concurrent.futures
from typing import Dict, Any, Optional
from mcp.client.sse import sse_client
import httpx
from mcp.client.streamable_http import streamable_http_client
from mcp import ClientSession

logger = logging.getLogger(__name__)
Expand All @@ -41,76 +43,137 @@ class MCPDataCollector:
- Release status (task completion, overall status)
- Release metadata (advisories, builds, dates)
- Shipment status (shipped, flow type)

Note: Maintains a persistent event loop to avoid "Task group is not initialized"
errors with FastMCP's streamable-http transport.
"""

def __init__(self, server_url: Optional[str] = None):
    """
    Initialize the MCP data collector.

    Args:
        server_url: MCP server URL (default: from the MCP_SERVER_URL env var,
            falling back to http://localhost:8000/mcp)
    """
    default_url = os.environ.get('MCP_SERVER_URL', 'http://localhost:8000/mcp')
    self.server_url = server_url or default_url
    self.timeout = 120       # HTTP connection timeout in seconds
    self.read_timeout = 600  # HTTP read timeout in seconds (10 min for slow operations)

    # Detect whether this thread already has a running event loop (e.g. when
    # embedded in Streamlit). _run_async() uses this flag to decide whether it
    # must drive our dedicated loop from a worker thread instead of directly.
    try:
        asyncio.get_running_loop()
        self._loop_is_external = True
        logger.info("Detected external event loop (e.g., Streamlit)")
    except RuntimeError:
        self._loop_is_external = False
        logger.info("No external event loop detected")

    # Always create our own dedicated event loop (never reuse an external one):
    # FastMCP's streamable-http transport fails with "Task group is not
    # initialized" when successive calls run on different loops, which is what
    # asyncio.run() per call would cause.
    self._loop = asyncio.new_event_loop()
    logger.info("Created dedicated event loop for MCP data collector")

    logger.info("Initialized MCP data collector with server: %s", self.server_url)

async def _call_mcp_tool_async(self, tool_name: str, max_retries: int = 5, **kwargs) -> Dict[str, Any]:
    """
    Call an MCP tool via HTTP (streamable-http) transport with retry logic (async).

    Implements retry with exponential backoff, similar to Claude Code's MCP
    client configuration, to ride out transient "Task group is not initialized"
    errors from FastMCP.

    Args:
        tool_name: Name of the MCP tool to call
        max_retries: Maximum number of attempts (default: 5, matching Claude Code)
        **kwargs: Tool parameters forwarded as the tool's arguments

    Returns:
        Parsed JSON response from the tool, or {} on an empty response.

    Raises:
        RuntimeError: If the call fails with a non-retryable error, or all
            retries are exhausted.
    """
    last_error = None
    initial_delay = 2.0  # seconds, matching Claude Code's config

    for attempt in range(max_retries):
        try:
            logger.debug(
                "Calling MCP tool: %s with args: %s (attempt %d/%d)",
                tool_name, kwargs, attempt + 1, max_retries
            )

            # Configure connect/read/write/pool timeouts explicitly so long
            # server-side operations don't trip httpx's short default timeout.
            http_timeout = httpx.Timeout(
                connect=self.timeout,      # connection timeout
                read=self.read_timeout,    # read timeout for long operations
                write=self.timeout,        # write timeout
                pool=self.timeout          # pool acquisition timeout
            )

            async with httpx.AsyncClient(timeout=http_timeout) as http_client:
                # NOTE(review): confirm the client-injection kwarg against the
                # installed mcp version — some releases expose a client factory
                # parameter rather than `http_client`. TODO verify.
                async with streamable_http_client(
                    self.server_url,
                    http_client=http_client
                ) as (read, write, _):
                    async with ClientSession(read, write) as session:
                        # Perform the MCP handshake before calling tools.
                        await session.initialize()

                        # Guard the tool call itself with the read timeout
                        # (no tool-name prefix needed for a fastmcp server).
                        result = await asyncio.wait_for(
                            session.call_tool(tool_name, arguments=kwargs),
                            timeout=self.read_timeout
                        )

                        if result.isError:
                            # NOTE(review): this RuntimeError is re-caught by the
                            # generic handler below and wrapped once more; if the
                            # tool's message mentions "Task ... group" it will be
                            # retried like a transport error. Verify intended.
                            error_msg = result.content[0].text if result.content else "Unknown error"
                            logger.error(f"MCP tool {tool_name} returned error: {error_msg}")
                            raise RuntimeError(f"Tool returned error: {error_msg}")

                        if result.content and len(result.content) > 0:
                            # Tool responses are JSON encoded in the first text part.
                            return json.loads(result.content[0].text)

                        logger.warning(f"Empty response from tool {tool_name}")
                        return {}

        except asyncio.TimeoutError as e:
            last_error = e
            logger.warning(
                "Timeout calling MCP tool %s after %ss (attempt %d/%d)",
                tool_name, self.read_timeout, attempt + 1, max_retries
            )
            if attempt < max_retries - 1:
                delay = initial_delay * (2 ** attempt)  # exponential backoff
                logger.info("Retrying in %.1fs...", delay)
                await asyncio.sleep(delay)
                continue
        except Exception as e:
            last_error = e
            error_str = str(e)
            # Retry only transient FastMCP "Task group is not initialized"
            # errors; anything else is treated as fatal immediately.
            if "Task" in error_str and "group" in error_str:
                logger.warning(
                    "Task group error calling MCP tool %s: %s (attempt %d/%d)",
                    tool_name, error_str, attempt + 1, max_retries
                )
                if attempt < max_retries - 1:
                    delay = initial_delay * (2 ** attempt)  # exponential backoff
                    logger.info("Retrying in %.1fs...", delay)
                    await asyncio.sleep(delay)
                    continue
            # Non-retryable error (or task-group retries exhausted): fail now.
            logger.error(f"Failed to call MCP tool {tool_name}: {str(e)}")
            raise RuntimeError(f"MCP tool call failed: {str(e)}")

    # All retries exhausted without a non-retryable failure.
    error_msg = f"MCP tool call failed after {max_retries} attempts"
    if last_error:
        error_msg += f": {str(last_error)}"
    logger.error(error_msg)
    raise RuntimeError(error_msg)

async def get_release_status_async(self, release: str) -> Dict[str, Any]:
"""
Expand Down Expand Up @@ -224,22 +287,50 @@ async def get_all_release_data_async(self, release: str) -> Dict[str, Any]:
'shipped': shipped
}

def _run_async(self, coro):
"""
Run async coroutine using the persistent event loop.

This method ensures we reuse the same event loop across calls,
which is required for FastMCP's streamable-http transport.

When running in environments with existing event loops (like Streamlit),
we use our own dedicated loop in a way that doesn't conflict.
"""
if self._loop_is_external:
# We're in an environment with a running loop (e.g., Streamlit)
# We can't use loop.run_until_complete() directly because another loop is running
# Instead, we need to run the coroutine in our persistent loop using threading
# This avoids "This event loop is already running" error
def run_in_thread():
# Set our loop as the event loop for this thread
asyncio.set_event_loop(self._loop)
# Run the coroutine to completion
return self._loop.run_until_complete(coro)

with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(run_in_thread)
return future.result()
else:
# Use our persistent loop directly
return self._loop.run_until_complete(coro)

# Synchronous wrappers for backward compatibility
def get_release_status(self, release: str) -> Dict[str, Any]:
    """Get release status (sync wrapper around get_release_status_async)."""
    # Route through the persistent loop: asyncio.run() would spin up a new
    # loop per call, which breaks FastMCP's streamable-http transport.
    return self._run_async(self.get_release_status_async(release))

def get_release_metadata(self, release: str) -> Dict[str, Any]:
    """Get release metadata (sync wrapper around get_release_metadata_async)."""
    # Route through the persistent loop: asyncio.run() would spin up a new
    # loop per call, which breaks FastMCP's streamable-http transport.
    return self._run_async(self.get_release_metadata_async(release))

def is_release_shipped(self, release: str) -> Dict[str, Any]:
    """Check if release is shipped (sync wrapper around is_release_shipped_async)."""
    # Route through the persistent loop: asyncio.run() would spin up a new
    # loop per call, which breaks FastMCP's streamable-http transport.
    return self._run_async(self.is_release_shipped_async(release))

def get_all_release_data(self, release: str) -> Dict[str, Any]:
    """Get all release data (sync wrapper around get_all_release_data_async)."""
    # Route through the persistent loop: asyncio.run() would spin up a new
    # loop per call, which breaks FastMCP's streamable-http transport.
    return self._run_async(self.get_all_release_data_async(release))


if __name__ == "__main__":
Expand Down
6 changes: 5 additions & 1 deletion tools/release_progress_dashboard/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ tabulate>=0.9.0 # Required for pandas df.to_markdown()
# Visualization
plotly>=5.17.0

# HTTP client for MCP server communication
# MCP client library for HTTP transport communication
# Version 1.25.0+ includes fixes for streamable-http task group issues (PR #841)
mcp>=1.25.0

# HTTP client for MCP server communication (used by mcp_data_collector)
requests>=2.31.0

# Existing dependencies (from main requirements)
Expand Down