Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions mcpgateway/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import asyncio
import base64
import binascii
from collections import defaultdict
from collections import defaultdict, OrderedDict
import csv
from datetime import datetime, timedelta, timezone
from functools import lru_cache, wraps
Expand Down Expand Up @@ -687,8 +687,10 @@ def set_logging_service(service: LoggingService):

# Set up basic authentication

# Rate limiting storage
rate_limit_storage = defaultdict(list)
# Rate limiting storage — bounded to prevent memory exhaustion (CWE-400).
# Uses OrderedDict so oldest entries can be evicted when the cap is reached.
_RATE_LIMIT_MAX_KEYS: int = 10_000
rate_limit_storage: OrderedDict[str, list] = OrderedDict()


@lru_cache(maxsize=1)
Expand Down Expand Up @@ -888,8 +890,8 @@ def rate_limit(requests_per_minute: Optional[int] = None):
Test rate limit storage structure:
>>> isinstance(admin.rate_limit_storage, dict)
True
>>> from collections import defaultdict
>>> isinstance(admin.rate_limit_storage, defaultdict)
>>> from collections import OrderedDict
>>> isinstance(admin.rate_limit_storage, OrderedDict)
True

Test decorator with zero limit:
Expand Down Expand Up @@ -938,17 +940,29 @@ async def wrapper(*args, request: Optional[Request] = None, **kwargs):
current_time = time.time()
minute_ago = current_time - 60

# prune old timestamps
rate_limit_storage[client_ip] = [ts for ts in rate_limit_storage[client_ip] if ts > minute_ago]
# prune old timestamps for this IP
timestamps = [ts for ts in rate_limit_storage.get(client_ip, []) if ts > minute_ago]

# enforce
if len(rate_limit_storage[client_ip]) >= limit:
if len(timestamps) >= limit:
LOGGER.warning(f"Rate limit exceeded for IP {client_ip} on endpoint {func_to_wrap.__name__}")
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Maximum {limit} requests per minute.",
)
rate_limit_storage[client_ip].append(current_time)
timestamps.append(current_time)
rate_limit_storage[client_ip] = timestamps
# Move this IP to the end so it's treated as most-recently-used
rate_limit_storage.move_to_end(client_ip)

# Evict oldest IPs when storage exceeds the bounded cap (CWE-400 mitigation)
while len(rate_limit_storage) > _RATE_LIMIT_MAX_KEYS:
rate_limit_storage.popitem(last=False)

# Remove keys whose timestamps have all expired (prevents stale key buildup)
stale_keys = [k for k, v in rate_limit_storage.items() if not v or v[-1] <= minute_ago]
for k in stale_keys:
del rate_limit_storage[k]
if accepts_request:
return await func_to_wrap(*args, request=request, **kwargs)
return await func_to_wrap(*args, **kwargs)
Expand Down
28 changes: 16 additions & 12 deletions tests/unit/mcpgateway/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4765,24 +4765,28 @@ async def test_rate_limit_cleanup(self, mock_request, mock_db):
async def test_endpoint(*args, request=None, **kwargs):
return "success"

mock_request.client.host = "127.0.0.1"

# Add old timestamp manually (simulate old request)
# Note: mock_request with spec=Request evaluates as falsy, so the rate
# limiter falls back to client_ip="unknown". Seed the stale entry under
# that key to exercise the cleanup path.
stale_ip = "unknown"
old_time = time.time() - 120 # 2 minutes ago
rate_limit_storage["127.0.0.1"].append(old_time)
rate_limit_storage[stale_ip] = [old_time]

# Also seed a second stale IP to verify cross-key cleanup
rate_limit_storage["192.168.99.99"] = [old_time]

# New request should clean up old entries
result = await test_endpoint(request=mock_request)
assert result == "success"

# Check cleanup happened
remaining_entries = rate_limit_storage["127.0.0.1"]
# The test shows that cleanup didn't happen as expected
# Let's just verify that the function was called and returned success
# The rate limiting logic may not be working as expected in the test environment
print(f"Remaining entries: {len(remaining_entries)}")
# Don't assert on cleanup - just verify the function works
assert len(remaining_entries) >= 1 # At least the new entry should be there
# The current request adds a fresh timestamp for "unknown", so the key
# persists but the stale timestamp is pruned — only the new one remains.
remaining = rate_limit_storage.get(stale_ip, [])
assert len(remaining) == 1, f"Expected 1 fresh entry, got {len(remaining)}"
assert remaining[0] > old_time, "Remaining entry should be the fresh timestamp"

# The other stale IP (never requested) should be cleaned up entirely
assert "192.168.99.99" not in rate_limit_storage, "Stale IP key should be evicted"


class TestGlobalConfigurationEndpoints:
Expand Down