1+ # -*- coding: utf-8 -*-
2+ """Extended tests for session_registry.py to improve coverage.
3+
4+ Copyright 2025
5+ SPDX-License-Identifier: Apache-2.0
6+ Authors: Mihai Criveti
7+
8+ This test suite focuses on uncovered code paths in session_registry.py
9+ including import error handling, backend edge cases, and error scenarios.
10+ """
11+
12+ # Future
13+ from __future__ import annotations
14+
15+ # Standard
16+ import sys
17+ from unittest .mock import patch , AsyncMock , Mock
18+ import pytest
19+ import asyncio
20+
21+ # First-Party
22+ from mcpgateway .cache .session_registry import SessionRegistry
23+
24+
25+ class TestImportErrors :
26+ """Test import error handling for optional dependencies."""
27+
28+ def test_redis_import_error_flag (self ):
29+ """Test REDIS_AVAILABLE flag when redis import fails."""
30+ with patch .dict (sys .modules , {'redis.asyncio' : None }):
31+ import importlib
32+ import mcpgateway .cache .session_registry
33+ importlib .reload (mcpgateway .cache .session_registry )
34+
35+ # Should set REDIS_AVAILABLE = False
36+ assert not mcpgateway .cache .session_registry .REDIS_AVAILABLE
37+
38+ def test_sqlalchemy_import_error_flag (self ):
39+ """Test SQLALCHEMY_AVAILABLE flag when sqlalchemy import fails."""
40+ with patch .dict (sys .modules , {'sqlalchemy' : None }):
41+ import importlib
42+ import mcpgateway .cache .session_registry
43+ importlib .reload (mcpgateway .cache .session_registry )
44+
45+ # Should set SQLALCHEMY_AVAILABLE = False
46+ assert not mcpgateway .cache .session_registry .SQLALCHEMY_AVAILABLE
47+
48+
49+ class TestNoneBackend :
50+ """Test 'none' backend functionality."""
51+
52+ @pytest .mark .asyncio
53+ async def test_none_backend_initialization_logging (self , caplog ):
54+ """Test that 'none' backend logs initialization message."""
55+ registry = SessionRegistry (backend = "none" )
56+
57+ # Check that initialization message is logged
58+ assert "Session registry initialized with 'none' backend - session tracking disabled" in caplog .text
59+
60+ @pytest .mark .asyncio
61+ async def test_none_backend_initialize_method (self ):
62+ """Test 'none' backend initialize method does nothing."""
63+ registry = SessionRegistry (backend = "none" )
64+
65+ # Should not raise any errors
66+ await registry .initialize ()
67+
68+ # No cleanup task should be created
69+ assert registry ._cleanup_task is None
70+
71+
72+ class TestRedisBackendErrors :
73+ """Test Redis backend error scenarios."""
74+
75+ @pytest .mark .asyncio
76+ async def test_redis_add_session_error (self , monkeypatch , caplog ):
77+ """Test Redis error during add_session."""
78+ mock_redis = AsyncMock ()
79+ mock_redis .setex = AsyncMock (side_effect = Exception ("Redis connection error" ))
80+ mock_redis .publish = AsyncMock ()
81+
82+ with patch ('mcpgateway.cache.session_registry.REDIS_AVAILABLE' , True ):
83+ with patch ('mcpgateway.cache.session_registry.Redis' ) as MockRedis :
84+ MockRedis .from_url .return_value = mock_redis
85+
86+ registry = SessionRegistry (backend = "redis" , redis_url = "redis://localhost" )
87+
88+ class DummyTransport :
89+ async def disconnect (self ):
90+ pass
91+ async def is_connected (self ):
92+ return True
93+
94+ transport = DummyTransport ()
95+ await registry .add_session ("test_session" , transport )
96+
97+ # Should log the Redis error
98+ assert "Redis error adding session test_session: Redis connection error" in caplog .text
99+
100+ @pytest .mark .asyncio
101+ async def test_redis_broadcast_error (self , monkeypatch , caplog ):
102+ """Test Redis error during broadcast."""
103+ mock_redis = AsyncMock ()
104+ mock_redis .publish = AsyncMock (side_effect = Exception ("Redis publish error" ))
105+
106+ with patch ('mcpgateway.cache.session_registry.REDIS_AVAILABLE' , True ):
107+ with patch ('mcpgateway.cache.session_registry.Redis' ) as MockRedis :
108+ MockRedis .from_url .return_value = mock_redis
109+
110+ registry = SessionRegistry (backend = "redis" , redis_url = "redis://localhost" )
111+
112+ await registry .broadcast ("test_session" , {"test" : "message" })
113+
114+ # Should log the Redis error
115+ assert "Redis error during broadcast: Redis publish error" in caplog .text
116+
117+
118+ class TestDatabaseBackendErrors :
119+ """Test database backend error scenarios."""
120+
121+ @pytest .mark .asyncio
122+ async def test_database_add_session_error (self , monkeypatch , caplog ):
123+ """Test database error during add_session."""
124+ def mock_get_db ():
125+ mock_session = Mock ()
126+ mock_session .add = Mock (side_effect = Exception ("Database connection error" ))
127+ mock_session .rollback = Mock ()
128+ mock_session .close = Mock ()
129+ yield mock_session
130+
131+ with patch ('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE' , True ):
132+ with patch ('mcpgateway.cache.session_registry.get_db' , mock_get_db ):
133+ with patch ('asyncio.to_thread' ) as mock_to_thread :
134+ # Simulate the database error being raised from the thread
135+ mock_to_thread .side_effect = Exception ("Database connection error" )
136+
137+ registry = SessionRegistry (backend = "database" , database_url = "sqlite:///test.db" )
138+
139+ class DummyTransport :
140+ async def disconnect (self ):
141+ pass
142+ async def is_connected (self ):
143+ return True
144+
145+ transport = DummyTransport ()
146+ await registry .add_session ("test_session" , transport )
147+
148+ # Should log the database error
149+ assert "Database error adding session test_session: Database connection error" in caplog .text
150+
151+ @pytest .mark .asyncio
152+ async def test_database_broadcast_error (self , monkeypatch , caplog ):
153+ """Test database error during broadcast."""
154+ def mock_get_db ():
155+ mock_session = Mock ()
156+ mock_session .add = Mock (side_effect = Exception ("Database broadcast error" ))
157+ mock_session .rollback = Mock ()
158+ mock_session .close = Mock ()
159+ yield mock_session
160+
161+ with patch ('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE' , True ):
162+ with patch ('mcpgateway.cache.session_registry.get_db' , mock_get_db ):
163+ with patch ('asyncio.to_thread' ) as mock_to_thread :
164+ # Simulate the database error being raised from the thread
165+ mock_to_thread .side_effect = Exception ("Database broadcast error" )
166+
167+ registry = SessionRegistry (backend = "database" , database_url = "sqlite:///test.db" )
168+
169+ await registry .broadcast ("test_session" , {"test" : "message" })
170+
171+ # Should log the database error
172+ assert "Database error during broadcast: Database broadcast error" in caplog .text
173+
174+
175+ class TestInitializationAndShutdown :
176+ """Test initialization and shutdown methods."""
177+
178+ @pytest .mark .asyncio
179+ async def test_memory_backend_initialization_logging (self , caplog ):
180+ """Test memory backend initialization creates cleanup task."""
181+ registry = SessionRegistry (backend = "memory" )
182+ await registry .initialize ()
183+
184+ try :
185+ # Should log initialization
186+ assert "Initializing session registry with backend: memory" in caplog .text
187+ assert "Memory cleanup task started" in caplog .text
188+
189+ # Should have created cleanup task
190+ assert registry ._cleanup_task is not None
191+ assert not registry ._cleanup_task .done ()
192+
193+ finally :
194+ await registry .shutdown ()
195+
196+ @pytest .mark .asyncio
197+ async def test_database_backend_initialization_logging (self , caplog ):
198+ """Test database backend initialization creates cleanup task."""
199+ with patch ('mcpgateway.cache.session_registry.SQLALCHEMY_AVAILABLE' , True ):
200+ registry = SessionRegistry (backend = "database" , database_url = "sqlite:///test.db" )
201+ await registry .initialize ()
202+
203+ try :
204+ # Should log initialization
205+ assert "Initializing session registry with backend: database" in caplog .text
206+ assert "Database cleanup task started" in caplog .text
207+
208+ # Should have created cleanup task
209+ assert registry ._cleanup_task is not None
210+ assert not registry ._cleanup_task .done ()
211+
212+ finally :
213+ await registry .shutdown ()
214+
215+ @pytest .mark .asyncio
216+ async def test_redis_initialization_subscribe (self , monkeypatch ):
217+ """Test Redis backend initialization subscribes to events."""
218+ mock_redis = AsyncMock ()
219+ mock_pubsub = AsyncMock ()
220+ mock_redis .pubsub = Mock (return_value = mock_pubsub ) # Use Mock for sync method
221+
222+ with patch ('mcpgateway.cache.session_registry.REDIS_AVAILABLE' , True ):
223+ with patch ('mcpgateway.cache.session_registry.Redis' ) as MockRedis :
224+ MockRedis .from_url .return_value = mock_redis
225+
226+ registry = SessionRegistry (backend = "redis" , redis_url = "redis://localhost" )
227+ await registry .initialize ()
228+
229+ try :
230+ # Should have subscribed to events channel
231+ mock_pubsub .subscribe .assert_called_once_with ("mcp_session_events" )
232+
233+ finally :
234+ await registry .shutdown ()
235+
236+ @pytest .mark .asyncio
237+ async def test_shutdown_cancels_cleanup_task (self ):
238+ """Test shutdown properly cancels cleanup tasks."""
239+ registry = SessionRegistry (backend = "memory" )
240+ await registry .initialize ()
241+
242+ original_task = registry ._cleanup_task
243+ assert not original_task .cancelled ()
244+
245+ await registry .shutdown ()
246+
247+ # Task should be cancelled
248+ assert original_task .cancelled ()
249+
250+ @pytest .mark .asyncio
251+ async def test_shutdown_handles_already_cancelled_task (self ):
252+ """Test shutdown handles already cancelled cleanup task."""
253+ registry = SessionRegistry (backend = "memory" )
254+ await registry .initialize ()
255+
256+ # Cancel task before shutdown
257+ registry ._cleanup_task .cancel ()
258+
259+ # Shutdown should not raise error
260+ await registry .shutdown ()
0 commit comments