Skip to content

Commit b2290e9

Browse files
committed
✨ Add RabbitMQ lifespan management and validation in FastAPI
1 parent 89669b0 commit b2290e9

File tree

3 files changed

+184
-1
lines changed

3 files changed

+184
-1
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
import logging
2+
from collections.abc import AsyncIterator
3+
4+
from fastapi import FastAPI
5+
from fastapi_lifespan_manager import State
6+
from pydantic import BaseModel, ValidationError
7+
from servicelib.logging_utils import log_context
8+
from servicelib.rabbitmq import wait_till_rabbitmq_responsive
9+
from settings_library.rabbit import RabbitSettings
10+
11+
from .lifespan_utils import LifespanOnStartupError, record_lifespan_called_once
12+
13+
_logger = logging.getLogger(__name__)
14+
15+
16+
class RabbitMQConfigurationError(LifespanOnStartupError):
17+
msg_template = "Invalid RabbitMQ config on startup : {validation_error}"
18+
19+
20+
class RabbitMQLifespanState(BaseModel):
21+
RABBIT_SETTINGS: RabbitSettings
22+
23+
24+
async def rabbitmq_connectivity_lifespan(
25+
_: FastAPI, state: State
26+
) -> AsyncIterator[State]:
27+
"""Ensures RabbitMQ connectivity during lifespan.
28+
29+
For creating clients, use additional lifespans like rabbitmq_rpc_client_context.
30+
"""
31+
_lifespan_name = f"{__name__}.{rabbitmq_connectivity_lifespan.__name__}"
32+
33+
with log_context(_logger, logging.INFO, _lifespan_name):
34+
35+
# Check if lifespan has already been called
36+
called_state = record_lifespan_called_once(state, _lifespan_name)
37+
38+
# Validate input state
39+
try:
40+
rabbit_state = RabbitMQLifespanState.model_validate(state)
41+
rabbit_dsn_with_secrets = rabbit_state.RABBIT_SETTINGS.dsn
42+
except ValidationError as exc:
43+
raise RabbitMQConfigurationError(validation_error=exc, state=state) from exc
44+
45+
# Wait for RabbitMQ to be responsive
46+
await wait_till_rabbitmq_responsive(rabbit_dsn_with_secrets)
47+
48+
yield {"RABBIT_CONNECTIVITY_LIFESPAN_NAME": _lifespan_name, **called_state}

packages/service-library/src/servicelib/rabbitmq/_client_rpc.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import asyncio
22
import functools
33
import logging
4-
from collections.abc import Callable
4+
from collections.abc import AsyncIterator, Callable
5+
from contextlib import asynccontextmanager
56
from dataclasses import dataclass
67
from typing import Any
78

@@ -156,3 +157,19 @@ async def unregister_handler(self, handler: Callable[..., Any]) -> None:
156157
raise RPCNotInitializedError
157158

158159
await self._rpc.unregister(handler)
160+
161+
162+
@asynccontextmanager
163+
async def rabbitmq_rpc_client_context(
164+
rpc_client_name: str, settings: RabbitSettings, **kwargs
165+
) -> AsyncIterator[RabbitMQRPCClient]:
166+
"""
167+
Adapter to create and close a RabbitMQRPCClient using an async context manager.
168+
"""
169+
rpc_client = await RabbitMQRPCClient.create(
170+
client_name=rpc_client_name, settings=settings, **kwargs
171+
)
172+
try:
173+
yield rpc_client
174+
finally:
175+
await rpc_client.close()
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# pylint: disable=protected-access
2+
# pylint: disable=redefined-outer-name
3+
# pylint: disable=too-many-arguments
4+
# pylint: disable=unused-argument
5+
# pylint: disable=unused-variable
6+
7+
from collections.abc import AsyncIterator
8+
9+
import pytest
10+
import servicelib.fastapi.rabbitmq_lifespan
11+
from asgi_lifespan import LifespanManager as ASGILifespanManager
12+
from fastapi import FastAPI
13+
from fastapi_lifespan_manager import LifespanManager, State
14+
from pydantic import Field
15+
from pytest_mock import MockerFixture, MockType
16+
from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict
17+
from pytest_simcore.helpers.typing_env import EnvVarsDict
18+
from servicelib.fastapi.rabbitmq_lifespan import (
19+
RabbitMQConfigurationError,
20+
RabbitMQLifespanState,
21+
rabbitmq_connectivity_lifespan,
22+
)
23+
from settings_library.application import BaseApplicationSettings
24+
from settings_library.rabbit import RabbitSettings
25+
26+
27+
@pytest.fixture
28+
def mock_rabbitmq_connection(mocker: MockerFixture) -> MockType:
29+
return mocker.patch.object(
30+
servicelib.fastapi.rabbitmq_lifespan,
31+
"wait_till_rabbitmq_responsive",
32+
return_value=mocker.AsyncMock(),
33+
)
34+
35+
36+
@pytest.fixture
37+
def app_environment(monkeypatch: pytest.MonkeyPatch) -> EnvVarsDict:
38+
return setenvs_from_dict(
39+
monkeypatch, RabbitSettings.model_json_schema()["examples"][0]
40+
)
41+
42+
43+
@pytest.fixture
44+
def app_lifespan(
45+
app_environment: EnvVarsDict,
46+
mock_rabbitmq_connection: MockType,
47+
) -> LifespanManager:
48+
assert app_environment
49+
50+
class AppSettings(BaseApplicationSettings):
51+
RABBITMQ: RabbitSettings = Field(
52+
..., json_schema_extra={"auto_default_from_env": True}
53+
)
54+
55+
async def my_app_settings(app: FastAPI) -> AsyncIterator[State]:
56+
app.state.settings = AppSettings.create_from_envs()
57+
58+
yield RabbitMQLifespanState(
59+
RABBIT_SETTINGS=app.state.settings.RABBITMQ,
60+
).model_dump()
61+
62+
app_lifespan = LifespanManager()
63+
app_lifespan.add(my_app_settings)
64+
app_lifespan.add(rabbitmq_connectivity_lifespan)
65+
66+
assert not mock_rabbitmq_connection.called
67+
68+
return app_lifespan
69+
70+
71+
async def test_lifespan_rabbitmq_in_an_app(
72+
is_pdb_enabled: bool,
73+
app_environment: EnvVarsDict,
74+
mock_rabbitmq_connection: MockType,
75+
app_lifespan: LifespanManager,
76+
):
77+
app = FastAPI(lifespan=app_lifespan)
78+
79+
async with ASGILifespanManager(
80+
app,
81+
startup_timeout=None if is_pdb_enabled else 10,
82+
shutdown_timeout=None if is_pdb_enabled else 10,
83+
) as asgi_manager:
84+
# Verify that RabbitMQ responsiveness was checked
85+
mock_rabbitmq_connection.assert_called_once_with(
86+
app.state.settings.RABBITMQ.dsn
87+
)
88+
89+
# Verify that RabbitMQ settings are in the lifespan manager state
90+
assert app.state.settings.RABBITMQ
91+
92+
# No explicit shutdown logic for RabbitMQ in this case
93+
94+
95+
async def test_lifespan_rabbitmq_with_invalid_settings(
96+
is_pdb_enabled: bool,
97+
):
98+
async def my_app_settings(app: FastAPI) -> AsyncIterator[State]:
99+
yield {"RABBIT_SETTINGS": None}
100+
101+
app_lifespan = LifespanManager()
102+
app_lifespan.add(my_app_settings)
103+
app_lifespan.add(rabbitmq_connectivity_lifespan)
104+
105+
app = FastAPI(lifespan=app_lifespan)
106+
107+
with pytest.raises(RabbitMQConfigurationError, match="Invalid RabbitMQ") as excinfo:
108+
async with ASGILifespanManager(
109+
app,
110+
startup_timeout=None if is_pdb_enabled else 10,
111+
shutdown_timeout=None if is_pdb_enabled else 10,
112+
):
113+
...
114+
115+
exception = excinfo.value
116+
assert isinstance(exception, RabbitMQConfigurationError)
117+
assert exception.validation_error
118+
assert exception.state["RABBIT_SETTINGS"] is None

0 commit comments

Comments
 (0)