From 5a7cb977e18a43bd99dd41b0c375219f30959b3e Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 16 Apr 2025 18:59:57 +0200 Subject: [PATCH 1/6] updates redis lifespan --- .../src/servicelib/fastapi/redis_lifespan.py | 71 ++++++++++ .../tests/fastapi/test_redis_lifespan.py | 130 ++++++++++++++++++ .../src/settings_library/redis.py | 16 +++ 3 files changed, 217 insertions(+) create mode 100644 packages/service-library/src/servicelib/fastapi/redis_lifespan.py create mode 100644 packages/service-library/tests/fastapi/test_redis_lifespan.py diff --git a/packages/service-library/src/servicelib/fastapi/redis_lifespan.py b/packages/service-library/src/servicelib/fastapi/redis_lifespan.py new file mode 100644 index 00000000000..8594138750b --- /dev/null +++ b/packages/service-library/src/servicelib/fastapi/redis_lifespan.py @@ -0,0 +1,71 @@ +import asyncio +import logging +from collections.abc import AsyncIterator +from typing import Annotated + +from fastapi import FastAPI +from fastapi_lifespan_manager import State +from pydantic import BaseModel, ConfigDict, StringConstraints, ValidationError +from servicelib.logging_utils import log_catch, log_context +from settings_library.redis import RedisDatabase, RedisSettings + +from ..redis import RedisClientSDK +from .lifespan_utils import LifespanOnStartupError + +_logger = logging.getLogger(__name__) + + +class RedisConfigurationError(LifespanOnStartupError): + msg_template = "Invalid redis config on startup : {validation_error}" + + +class RedisLifespanState(BaseModel): + REDIS_SETTINGS: RedisSettings + REDIS_CLIENT_NAME: Annotated[str, StringConstraints(min_length=3, max_length=32)] + REDIS_CLIENT_DB: RedisDatabase + + model_config = ConfigDict( + extra="allow", + arbitrary_types_allowed=True, # RedisClientSDK has some arbitrary types and this class will never be serialized + ) + + +async def redis_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[State]: + + with log_context(_logger, logging.INFO, f"{__name__}"): + + # Validate input state + try: + redis_state = RedisLifespanState.model_validate(state) + redis_dsn_with_secrets = redis_state.REDIS_SETTINGS.build_redis_dsn( + redis_state.REDIS_CLIENT_DB + ) + + except ValidationError as exc: + raise RedisConfigurationError(validation_error=exc, state=state) from exc + + # Setup client + with log_context( + _logger, + logging.INFO, + f"Creating redis client with name={redis_state.REDIS_CLIENT_NAME}", + ): + + redis_client = RedisClientSDK( + redis_dsn_with_secrets, + client_name=redis_state.REDIS_CLIENT_NAME, + ) + + try: + + yield {"REDIS_CLIENT_SDK": redis_client} + + finally: + # Teardown client + if redis_client: + with log_catch(_logger, reraise=False): + await asyncio.wait_for( + redis_client.shutdown(), + # NOTE: shutdown already has a _HEALTHCHECK_TASK_TIMEOUT_S of 10s + timeout=20, + ) diff --git a/packages/service-library/tests/fastapi/test_redis_lifespan.py b/packages/service-library/tests/fastapi/test_redis_lifespan.py new file mode 100644 index 00000000000..9095a271d21 --- /dev/null +++ b/packages/service-library/tests/fastapi/test_redis_lifespan.py @@ -0,0 +1,130 @@ +# pylint: disable=protected-access +# pylint: disable=redefined-outer-name +# pylint: disable=too-many-arguments +# pylint: disable=unused-argument +# pylint: disable=unused-variable + +from collections.abc import AsyncIterator +from typing import Annotated, Any + +import pytest +import servicelib.fastapi.redis_lifespan +from asgi_lifespan import 
LifespanManager as ASGILifespanManager +from fastapi import FastAPI +from fastapi_lifespan_manager import LifespanManager, State +from pydantic import Field +from pytest_mock import MockerFixture, MockType +from pytest_simcore.helpers.monkeypatch_envs import setenvs_from_dict +from pytest_simcore.helpers.typing_env import EnvVarsDict +from servicelib.fastapi.redis_lifespan import ( + RedisConfigurationError, + RedisLifespanState, + redis_database_lifespan, +) +from settings_library.application import BaseApplicationSettings +from settings_library.redis import RedisDatabase, RedisSettings + + +@pytest.fixture +def mock_redis_client_sdk(mocker: MockerFixture) -> MockType: + return mocker.patch.object( + servicelib.fastapi.redis_lifespan, + "RedisClientSDK", + return_value=mocker.AsyncMock(), + ) + + +@pytest.fixture +def app_environment(monkeypatch: pytest.MonkeyPatch) -> EnvVarsDict: + return setenvs_from_dict( + monkeypatch, RedisSettings.model_json_schema()["examples"][0] + ) + + +@pytest.fixture +def app_lifespan( + app_environment: EnvVarsDict, + mock_redis_client_sdk: MockType, +) -> LifespanManager: + assert app_environment + + class AppSettings(BaseApplicationSettings): + CATALOG_REDIS: Annotated[ + RedisSettings, + Field(json_schema_extra={"auto_default_from_env": True}), + ] + + async def my_app_settings(app: FastAPI) -> AsyncIterator[State]: + app.state.settings = AppSettings.create_from_envs() + + yield RedisLifespanState( + REDIS_SETTINGS=app.state.settings.CATALOG_REDIS, + REDIS_CLIENT_NAME="test_client", + REDIS_CLIENT_DB=RedisDatabase.LOCKS, + ).model_dump() + + app_lifespan = LifespanManager() + app_lifespan.add(my_app_settings) + app_lifespan.add(redis_database_lifespan) + + assert not mock_redis_client_sdk.called + + return app_lifespan + + +async def test_lifespan_redis_database_in_an_app( + is_pdb_enabled: bool, + app_environment: EnvVarsDict, + mock_redis_client_sdk: MockType, + app_lifespan: LifespanManager, +): + app = FastAPI(lifespan=app_lifespan) + + async with ASGILifespanManager( + app, + startup_timeout=None if is_pdb_enabled else 10, + shutdown_timeout=None if is_pdb_enabled else 10, + ) as asgi_manager: + # Verify that the Redis client SDK was created + mock_redis_client_sdk.assert_called_once_with( + app.state.settings.CATALOG_REDIS.build_redis_dsn(RedisDatabase.LOCKS), + client_name="test_client", + ) + + # Verify that the Redis client SDK is in the lifespan manager state + assert "REDIS_CLIENT_SDK" in asgi_manager._state # noqa: SLF001 + assert app.state.settings.CATALOG_REDIS + assert ( + asgi_manager._state["REDIS_CLIENT_SDK"] # noqa: SLF001 + == mock_redis_client_sdk.return_value + ) + + # Verify that the Redis client SDK was shut down + redis_client: Any = mock_redis_client_sdk.return_value + redis_client.shutdown.assert_called_once() + + +async def test_lifespan_redis_database_with_invalid_settings( + is_pdb_enabled: bool, +): + async def my_app_settings(app: FastAPI) -> AsyncIterator[State]: + yield {"REDIS_SETTINGS": None} + + app_lifespan = LifespanManager() + app_lifespan.add(my_app_settings) + app_lifespan.add(redis_database_lifespan) + + app = FastAPI(lifespan=app_lifespan) + + with pytest.raises(RedisConfigurationError, match="Invalid redis") as excinfo: + async with ASGILifespanManager( + app, + startup_timeout=None if is_pdb_enabled else 10, + shutdown_timeout=None if is_pdb_enabled else 10, + ): + ... 
+ + exception = excinfo.value + assert isinstance(exception, RedisConfigurationError) + assert exception.validation_error + assert exception.state["REDIS_SETTINGS"] is None diff --git a/packages/settings-library/src/settings_library/redis.py b/packages/settings-library/src/settings_library/redis.py index 6e21968d043..96ae2ebf8aa 100644 --- a/packages/settings-library/src/settings_library/redis.py +++ b/packages/settings-library/src/settings_library/redis.py @@ -2,6 +2,7 @@ from pydantic.networks import RedisDsn from pydantic.types import SecretStr +from pydantic_settings import SettingsConfigDict from .base import BaseCustomSettings from .basic_types import PortInt @@ -45,3 +46,18 @@ def build_redis_dsn(self, db_index: RedisDatabase) -> str: path=f"{db_index}", ) ) + + model_config = SettingsConfigDict( + json_schema_extra={ + "examples": [ + # minimal required + { + "REDIS_SECURE": "0", + "REDIS_HOST": "localhost", + "REDIS_PORT": "6379", + "REDIS_USER": "user", + "REDIS_PASSWORD": "secret", + } + ], + } + ) From 11db497d61b2753edf9a63fa7a680d6b9ea2de25 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 16 Apr 2025 19:19:10 +0200 Subject: [PATCH 2/6] =?UTF-8?q?=E2=9C=A8=20Add=20Redis=20settings=20to=20a?= =?UTF-8?q?pplication=20configuration=20and=20Docker=20Compose?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- services/catalog/src/simcore_service_catalog/cli.py | 11 +++++++++++ .../src/simcore_service_catalog/core/settings.py | 5 +++++ services/catalog/tests/unit/test_core_settings.py | 3 +++ services/docker-compose.yml | 10 ++++++++-- 4 files changed, 27 insertions(+), 2 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/cli.py b/services/catalog/src/simcore_service_catalog/cli.py index 0d4fbf5107b..970e57746a5 100644 --- a/services/catalog/src/simcore_service_catalog/cli.py +++ b/services/catalog/src/simcore_service_catalog/cli.py @@ -5,6 +5,7 @@ from settings_library.http_client_request import ClientRequestSettings from settings_library.postgres import PostgresSettings from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from settings_library.utils_cli import ( create_settings_command, create_version_callback, @@ -80,6 +81,16 @@ def echo_dotenv(ctx: typer.Context, *, minimal: bool = True) -> None: ), ), ), + CATALOG_REDIS=os.environ.get( + "CATALOG_REDIS", + RedisSettings.create_from_envs( + REDIS_HOST=os.environ.get("REDIS_HOST", "replace-with-redis-host"), + REDIS_PORT=os.environ.get("REDIS_PORT", "6379"), + REDIS_PASSWORD=os.environ.get( + "REDIS_PASSWORD", "replace-with-redis-password" + ), + ), + ), CATALOG_DIRECTOR=DirectorSettings.create_from_envs( DIRECTOR_HOST=os.environ.get("DIRECTOR_HOST", "fake-director") ), diff --git a/services/catalog/src/simcore_service_catalog/core/settings.py b/services/catalog/src/simcore_service_catalog/core/settings.py index 5581bf4ba99..21a56f9ad01 100644 --- a/services/catalog/src/simcore_service_catalog/core/settings.py +++ b/services/catalog/src/simcore_service_catalog/core/settings.py @@ -22,6 +22,7 @@ from settings_library.http_client_request import ClientRequestSettings from settings_library.postgres import PostgresSettings from settings_library.rabbit import RabbitSettings +from settings_library.redis import RedisSettings from settings_library.tracing import TracingSettings from settings_library.utils_logging import MixinLoggingSettings @@ -96,6 +97,10 @@ class 
ApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): RabbitSettings, Field(json_schema_extra={"auto_default_from_env": True}) ] + CATALOG_REDIS: Annotated[ + RedisSettings, Field(json_schema_extra={"auto_default_from_env": True}) + ] + CATALOG_CLIENT_REQUEST: Annotated[ ClientRequestSettings | None, Field(json_schema_extra={"auto_default_from_env": True}), diff --git a/services/catalog/tests/unit/test_core_settings.py b/services/catalog/tests/unit/test_core_settings.py index 9f94c6c3588..68820a87df6 100644 --- a/services/catalog/tests/unit/test_core_settings.py +++ b/services/catalog/tests/unit/test_core_settings.py @@ -20,3 +20,6 @@ def test_valid_web_application_settings(app_environment: EnvVarsDict): assert settings assert settings == ApplicationSettings.create_from_envs() + + assert app_environment["REDIS_HOST"] == settings.CATALOG_REDIS.REDIS_HOST + assert app_environment["POSTGRES_HOST"] == settings.CATALOG_POSTGRES.POSTGRES_HOST diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 3378df3272e..dd14b4e7870 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -150,18 +150,20 @@ services: init: true hostname: "cat-{{.Node.Hostname}}-{{.Task.Slot}}" environment: + CATALOG_BACKGROUND_TASK_REST_TIME: ${CATALOG_BACKGROUND_TASK_REST_TIME} CATALOG_DEV_FEATURES_ENABLED: ${CATALOG_DEV_FEATURES_ENABLED} CATALOG_LOGLEVEL: ${CATALOG_LOGLEVEL} CATALOG_PROFILING: ${CATALOG_PROFILING} CATALOG_SERVICES_DEFAULT_RESOURCES: ${CATALOG_SERVICES_DEFAULT_RESOURCES} CATALOG_SERVICES_DEFAULT_SPECIFICATIONS: ${CATALOG_SERVICES_DEFAULT_SPECIFICATIONS} + CATALOG_TRACING: ${CATALOG_TRACING} DIRECTOR_DEFAULT_MAX_MEMORY: ${DIRECTOR_DEFAULT_MAX_MEMORY} DIRECTOR_DEFAULT_MAX_NANO_CPUS: ${DIRECTOR_DEFAULT_MAX_NANO_CPUS} DIRECTOR_HOST: ${DIRECTOR_HOST:-director} DIRECTOR_PORT: ${DIRECTOR_PORT:-8080} - LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED} LOG_FILTER_MAPPING : ${LOG_FILTER_MAPPING} + LOG_FORMAT_LOCAL_DEV_ENABLED: ${LOG_FORMAT_LOCAL_DEV_ENABLED} POSTGRES_DB: ${POSTGRES_DB} POSTGRES_HOST: ${POSTGRES_HOST} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} @@ -172,7 +174,11 @@ services: RABBIT_PORT: ${RABBIT_PORT} RABBIT_SECURE: ${RABBIT_SECURE} RABBIT_USER: ${RABBIT_USER} - CATALOG_TRACING: ${CATALOG_TRACING} + REDIS_HOST: ${REDIS_HOST} + REDIS_PASSWORD: ${REDIS_PASSWORD} + REDIS_PORT: ${REDIS_PORT} + REDIS_SECURE: ${REDIS_SECURE} + REDIS_USER: ${REDIS_USER} TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT} TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT} networks: From d0ba85221b68ae2eecbce78cac2058fc34c7bc53 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 16 Apr 2025 19:22:04 +0200 Subject: [PATCH 3/6] =?UTF-8?q?=E2=9C=A8=20Integrate=20Redis=20lifespan=20?= =?UTF-8?q?management=20into=20application=20settings?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/simcore_service_catalog/core/events.py | 14 +++++++++++++- services/catalog/tests/unit/conftest.py | 13 ++++++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/core/events.py b/services/catalog/src/simcore_service_catalog/core/events.py index 57089d9fc5e..e5d703a74dc 100644 --- a/services/catalog/src/simcore_service_catalog/core/events.py +++ b/services/catalog/src/simcore_service_catalog/core/events.py @@ -10,8 +10,13 @@ 
create_prometheus_instrumentationmain_input_state, prometheus_instrumentation_lifespan, ) +from servicelib.fastapi.redis_lifespan import ( + RedisLifespanState, + redis_database_lifespan, +) +from settings_library.redis import RedisDatabase -from .._meta import APP_FINISHED_BANNER_MSG, APP_STARTED_BANNER_MSG +from .._meta import APP_FINISHED_BANNER_MSG, APP_NAME, APP_STARTED_BANNER_MSG from ..api.rpc.events import rpc_api_lifespan from ..clients.director import director_lifespan from ..clients.rabbitmq import rabbitmq_lifespan @@ -43,6 +48,11 @@ async def _settings_lifespan(app: FastAPI) -> AsyncIterator[State]: settings: ApplicationSettings = app.state.settings yield { + **RedisLifespanState( + REDIS_SETTINGS=settings.CATALOG_REDIS, + REDIS_CLIENT_NAME=APP_NAME, + REDIS_CLIENT_DB=RedisDatabase.LOCKS, + ).model_dump(), **create_postgres_database_input_state(settings.CATALOG_POSTGRES), **create_prometheus_instrumentationmain_input_state( enabled=settings.CATALOG_PROMETHEUS_INSTRUMENTATION_ENABLED @@ -70,7 +80,9 @@ def create_app_lifespan() -> LifespanManager: # - function services app_lifespan.add(function_services_lifespan) + # - redis # - background task + app_lifespan.add(redis_database_lifespan) app_lifespan.add(background_task_lifespan) # - prometheus instrumentation diff --git a/services/catalog/tests/unit/conftest.py b/services/catalog/tests/unit/conftest.py index 296cc47bd19..d050bbf68d8 100644 --- a/services/catalog/tests/unit/conftest.py +++ b/services/catalog/tests/unit/conftest.py @@ -227,7 +227,18 @@ def repository_lifespan_disabled(mocker: MockerFixture): @pytest.fixture -def background_task_lifespan_disabled(mocker: MockerFixture) -> None: +def redis_lifespan_disabled(mocker: MockerFixture): + mocker.patch.object( + simcore_service_catalog.core.events, + "redis_database_lifespan", + autospec=True, + ) + + +@pytest.fixture +def background_task_lifespan_disabled( + mocker: MockerFixture, redis_lifespan_disabled: None +) -> None: class MockedBackgroundTaskContextManager: async def __aenter__(self): print( From 428f6a691d4e8cf6b703bde9dc1393ba4938230f Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 16 Apr 2025 20:14:18 +0200 Subject: [PATCH 4/6] =?UTF-8?q?=E2=9C=A8=20Enhance=20lifespan=20management?= =?UTF-8?q?:=20add=20checks=20for=20multiple=20calls=20and=20improve=20err?= =?UTF-8?q?or=20messaging?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/servicelib/fastapi/lifespan_utils.py | 36 +++++++++++++++++- .../servicelib/fastapi/postgres_lifespan.py | 6 ++- .../src/servicelib/fastapi/redis_lifespan.py | 12 +++--- .../tests/fastapi/test_lifespan_utils.py | 38 +++++++++++++++++-- 4 files changed, 78 insertions(+), 14 deletions(-) diff --git a/packages/service-library/src/servicelib/fastapi/lifespan_utils.py b/packages/service-library/src/servicelib/fastapi/lifespan_utils.py index 8b16f5bec19..e8605e66ccd 100644 --- a/packages/service-library/src/servicelib/fastapi/lifespan_utils.py +++ b/packages/service-library/src/servicelib/fastapi/lifespan_utils.py @@ -1,12 +1,44 @@ +from typing import Final + from common_library.errors_classes import OsparcErrorMixin +from fastapi import FastAPI +from fastapi_lifespan_manager import State class LifespanError(OsparcErrorMixin, RuntimeError): ... 
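For orientation before the lifespan-utils changes below, a minimal sketch (not part of the patches) of how the pieces from the first three patches compose: a settings lifespan yields a RedisLifespanState payload, and redis_database_lifespan consumes it and yields the ready client under the "REDIS_CLIENT_SDK" key. The client name and database choice here are illustrative.

from collections.abc import AsyncIterator

from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager, State
from servicelib.fastapi.redis_lifespan import (
    RedisLifespanState,
    redis_database_lifespan,
)
from settings_library.redis import RedisDatabase, RedisSettings


async def settings_lifespan(app: FastAPI) -> AsyncIterator[State]:
    # Hand the validated settings and client identity over to the next lifespan
    yield RedisLifespanState(
        REDIS_SETTINGS=RedisSettings.create_from_envs(),
        REDIS_CLIENT_NAME="my-service",  # illustrative, 3..32 characters
        REDIS_CLIENT_DB=RedisDatabase.LOCKS,
    ).model_dump()


app_lifespan = LifespanManager()
app_lifespan.add(settings_lifespan)
app_lifespan.add(redis_database_lifespan)  # yields {"REDIS_CLIENT_SDK": <RedisClientSDK>} on startup

app = FastAPI(lifespan=app_lifespan)
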
class LifespanOnStartupError(LifespanError): - msg_template = "Failed during startup of {module}" + msg_template = "Failed during startup of {lifespan_name}" class LifespanOnShutdownError(LifespanError): - msg_template = "Failed during shutdown of {module}" + msg_template = "Failed during shutdown of {lifespan_name}" + + +class LifespanAlreadyCalledError(LifespanError): + msg_template = "The lifespan '{lifespan_name}' has already been called." + + +_CALLED_LIFESPANS_KEY: Final[str] = "_CALLED_LIFESPANS" + + +def is_lifespan_called(state: State, lifespan_name: str) -> bool: + called_lifespans = state.get(_CALLED_LIFESPANS_KEY, set()) + return lifespan_name in called_lifespans + + +def record_lifespan_called_once(state: State, lifespan_name: str) -> State: + """Validates if a lifespan has already been called and records it in the state. + Raises LifespanAlreadyCalledError if the lifespan has already been called. + """ + assert not isinstance( # nosec + state, FastAPI + ), "TIP: lifespan func has (app, state) positional arguments" + + if is_lifespan_called(state, lifespan_name): + raise LifespanAlreadyCalledError(lifespan_name=lifespan_name) + + called_lifespans = state.get(_CALLED_LIFESPANS_KEY, set()) + called_lifespans.add(lifespan_name) + return {_CALLED_LIFESPANS_KEY: called_lifespans} diff --git a/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py b/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py index cc207e6f397..5d257725062 100644 --- a/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py +++ b/packages/service-library/src/servicelib/fastapi/postgres_lifespan.py @@ -10,7 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine from ..db_asyncpg_utils import create_async_engine_and_database_ready -from .lifespan_utils import LifespanOnStartupError +from .lifespan_utils import LifespanOnStartupError, record_lifespan_called_once _logger = logging.getLogger(__name__) @@ -32,6 +32,9 @@ async def postgres_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[ with log_context(_logger, logging.INFO, f"{__name__}"): + # Mark lifespan as called + called_state = record_lifespan_called_once(state, "postgres_database_lifespan") + settings = state[PostgresLifespanState.POSTGRES_SETTINGS] if settings is None or not isinstance(settings, PostgresSettings): @@ -48,6 +51,7 @@ async def postgres_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[ yield { PostgresLifespanState.POSTGRES_ASYNC_ENGINE: async_engine, + **called_state, } finally: diff --git a/packages/service-library/src/servicelib/fastapi/redis_lifespan.py b/packages/service-library/src/servicelib/fastapi/redis_lifespan.py index 8594138750b..3424da57517 100644 --- a/packages/service-library/src/servicelib/fastapi/redis_lifespan.py +++ b/packages/service-library/src/servicelib/fastapi/redis_lifespan.py @@ -10,7 +10,7 @@ from settings_library.redis import RedisDatabase, RedisSettings from ..redis import RedisClientSDK -from .lifespan_utils import LifespanOnStartupError +from .lifespan_utils import LifespanOnStartupError, record_lifespan_called_once _logger = logging.getLogger(__name__) @@ -31,16 +31,17 @@ class RedisLifespanState(BaseModel): async def redis_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[State]: - with log_context(_logger, logging.INFO, f"{__name__}"): + # Check if lifespan has already been called + called_state = record_lifespan_called_once(state, "redis_database_lifespan") + # Validate input state try: redis_state = 
RedisLifespanState.model_validate(state) redis_dsn_with_secrets = redis_state.REDIS_SETTINGS.build_redis_dsn( redis_state.REDIS_CLIENT_DB ) - except ValidationError as exc: raise RedisConfigurationError(validation_error=exc, state=state) from exc @@ -50,16 +51,13 @@ async def redis_database_lifespan(_: FastAPI, state: State) -> AsyncIterator[Sta logging.INFO, f"Creating redis client with name={redis_state.REDIS_CLIENT_NAME}", ): - redis_client = RedisClientSDK( redis_dsn_with_secrets, client_name=redis_state.REDIS_CLIENT_NAME, ) try: - - yield {"REDIS_CLIENT_SDK": redis_client} - + yield {"REDIS_CLIENT_SDK": redis_client, **called_state} finally: # Teardown client if redis_client: diff --git a/packages/service-library/tests/fastapi/test_lifespan_utils.py b/packages/service-library/tests/fastapi/test_lifespan_utils.py index 0c3d2767d2a..e247cf2a19b 100644 --- a/packages/service-library/tests/fastapi/test_lifespan_utils.py +++ b/packages/service-library/tests/fastapi/test_lifespan_utils.py @@ -16,8 +16,10 @@ from pytest_mock import MockerFixture from pytest_simcore.helpers.logging_tools import log_context from servicelib.fastapi.lifespan_utils import ( + LifespanAlreadyCalledError, LifespanOnShutdownError, LifespanOnStartupError, + record_lifespan_called_once, ) @@ -186,7 +188,7 @@ async def lifespan_failing_on_startup(app: FastAPI) -> AsyncIterator[State]: startup_step(_name) except RuntimeError as exc: handle_error(_name, exc) - raise LifespanOnStartupError(module=_name) from exc + raise LifespanOnStartupError(lifespan_name=_name) from exc yield {} shutdown_step(_name) @@ -201,7 +203,7 @@ async def lifespan_failing_on_shutdown(app: FastAPI) -> AsyncIterator[State]: shutdown_step(_name) except RuntimeError as exc: handle_error(_name, exc) - raise LifespanOnShutdownError(module=_name) from exc + raise LifespanOnShutdownError(lifespan_name=_name) from exc return { "startup_step": startup_step, @@ -228,7 +230,7 @@ async def test_app_lifespan_with_error_on_startup( assert not failing_lifespan_manager["startup_step"].called assert not failing_lifespan_manager["shutdown_step"].called assert exception.error_context() == { - "module": "lifespan_failing_on_startup", + "lifespan_name": "lifespan_failing_on_startup", "message": "Failed during startup of lifespan_failing_on_startup", "code": "RuntimeError.LifespanError.LifespanOnStartupError", } @@ -250,7 +252,35 @@ async def test_app_lifespan_with_error_on_shutdown( assert failing_lifespan_manager["startup_step"].called assert not failing_lifespan_manager["shutdown_step"].called assert exception.error_context() == { - "module": "lifespan_failing_on_shutdown", + "lifespan_name": "lifespan_failing_on_shutdown", "message": "Failed during shutdown of lifespan_failing_on_shutdown", "code": "RuntimeError.LifespanError.LifespanOnShutdownError", } + + +async def test_lifespan_called_more_than_once(is_pdb_enabled: bool): + state = {} + + app_lifespan = LifespanManager() + + @app_lifespan.add + async def _one(_, state: State) -> AsyncIterator[State]: + called_state = record_lifespan_called_once(state, "test_lifespan_one") + yield {"other": 0, **called_state} + + @app_lifespan.add + async def _two(_, state: State) -> AsyncIterator[State]: + called_state = record_lifespan_called_once(state, "test_lifespan_two") + yield {"something": 0, **called_state} + + app_lifespan.add(_one) # added "by mistake" + + with pytest.raises(LifespanAlreadyCalledError) as err_info: + async with ASGILifespanManager( + FastAPI(lifespan=app_lifespan), + startup_timeout=None if 
is_pdb_enabled else 10, + shutdown_timeout=None if is_pdb_enabled else 10, + ): + ... + + assert err_info.value.lifespan_name == "test_lifespan_one" From 3ba956db2094d7505aba4d994658738db965ab41 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 16 Apr 2025 20:15:17 +0200 Subject: [PATCH 5/6] =?UTF-8?q?=E2=9C=A8=20Implement=20Redis=20client=20li?= =?UTF-8?q?fespan=20management=20and=20integrate=20into=20application=20li?= =?UTF-8?q?fecycle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../simcore_service_catalog/clients/redis.py | 25 +++++++++++++++++++ .../simcore_service_catalog/core/events.py | 5 ++-- services/catalog/tests/unit/conftest.py | 6 ++--- 3 files changed, 31 insertions(+), 5 deletions(-) create mode 100644 services/catalog/src/simcore_service_catalog/clients/redis.py diff --git a/services/catalog/src/simcore_service_catalog/clients/redis.py b/services/catalog/src/simcore_service_catalog/clients/redis.py new file mode 100644 index 00000000000..0dfa5ed3949 --- /dev/null +++ b/services/catalog/src/simcore_service_catalog/clients/redis.py @@ -0,0 +1,25 @@ +from collections.abc import AsyncIterator +from typing import cast + +from fastapi import FastAPI +from fastapi_lifespan_manager import LifespanManager, State +from servicelib.fastapi.redis_lifespan import ( + redis_database_lifespan, +) +from servicelib.redis import RedisClientSDK + +redis_client_lifespan = LifespanManager() +redis_client_lifespan.add(redis_database_lifespan) + + +@redis_client_lifespan.add +async def _redis_client_sdk_lifespan( + app: FastAPI, state: State +) -> AsyncIterator[State]: + app.state.redis_client_sdk = state["REDIS_CLIENT_SDK"] + yield {} + del app.state.redis_client_sdk + + +def get_redis_client(app: FastAPI) -> RedisClientSDK: + return cast(RedisClientSDK, app.state.redis_client_sdk) diff --git a/services/catalog/src/simcore_service_catalog/core/events.py b/services/catalog/src/simcore_service_catalog/core/events.py index e5d703a74dc..f8144346f80 100644 --- a/services/catalog/src/simcore_service_catalog/core/events.py +++ b/services/catalog/src/simcore_service_catalog/core/events.py @@ -12,7 +12,6 @@ ) from servicelib.fastapi.redis_lifespan import ( RedisLifespanState, - redis_database_lifespan, ) from settings_library.redis import RedisDatabase @@ -20,6 +19,7 @@ from ..api.rpc.events import rpc_api_lifespan from ..clients.director import director_lifespan from ..clients.rabbitmq import rabbitmq_lifespan +from ..clients.redis import redis_client_lifespan from ..repository.events import repository_lifespan_manager from ..service.function_services import function_services_lifespan from .background_tasks import background_task_lifespan @@ -81,8 +81,9 @@ def create_app_lifespan() -> LifespanManager: app_lifespan.add(function_services_lifespan) # - redis + app_lifespan.include(redis_client_lifespan) + # - background task - app_lifespan.add(redis_database_lifespan) app_lifespan.add(background_task_lifespan) # - prometheus instrumentation diff --git a/services/catalog/tests/unit/conftest.py b/services/catalog/tests/unit/conftest.py index d050bbf68d8..f4efc4c539c 100644 --- a/services/catalog/tests/unit/conftest.py +++ b/services/catalog/tests/unit/conftest.py @@ -227,17 +227,17 @@ def repository_lifespan_disabled(mocker: MockerFixture): @pytest.fixture -def redis_lifespan_disabled(mocker: MockerFixture): +def redis_client_lifespan(mocker: MockerFixture): mocker.patch.object( 
simcore_service_catalog.core.events, - "redis_database_lifespan", + "redis_client_lifespan", autospec=True, ) @pytest.fixture def background_task_lifespan_disabled( - mocker: MockerFixture, redis_lifespan_disabled: None + mocker: MockerFixture, redis_client_lifespan: None ) -> None: class MockedBackgroundTaskContextManager: async def __aenter__(self): From 21b0759920c69706ce9990516ebe353f460dfcd2 Mon Sep 17 00:00:00 2001 From: Pedro Crespo-Valero <32402063+pcrespov@users.noreply.github.com> Date: Wed, 16 Apr 2025 20:31:10 +0200 Subject: [PATCH 6/6] =?UTF-8?q?=E2=9C=A8=20Refactor=20background=20task=20?= =?UTF-8?q?synchronization:=20rename=20functions=20and=20improve=20task=20?= =?UTF-8?q?management?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/background_tasks.py | 93 +++++++------------ .../test_core_background_task__sync.py | 6 +- 2 files changed, 39 insertions(+), 60 deletions(-) diff --git a/services/catalog/src/simcore_service_catalog/core/background_tasks.py b/services/catalog/src/simcore_service_catalog/core/background_tasks.py index fea4c6537d8..630c01482cc 100644 --- a/services/catalog/src/simcore_service_catalog/core/background_tasks.py +++ b/services/catalog/src/simcore_service_catalog/core/background_tasks.py @@ -12,16 +12,18 @@ import asyncio import logging from collections.abc import AsyncIterator -from contextlib import suppress -from pprint import pformat +from datetime import timedelta from typing import Final +from common_library.json_serialization import json_dumps, representation_encoder from fastapi import FastAPI, HTTPException from fastapi_lifespan_manager import State from models_library.services import ServiceMetaDataPublished from models_library.services_types import ServiceKey, ServiceVersion from packaging.version import Version from pydantic import ValidationError +from servicelib.async_utils import cancel_wait_task +from servicelib.background_task_utils import exclusive_periodic from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncEngine @@ -35,10 +37,8 @@ _logger = logging.getLogger(__name__) -async def _list_services_in_database( - db_engine: AsyncEngine, -): - services_repo = ServicesRepository(db_engine=db_engine) +async def _list_services_in_database(app: FastAPI): + services_repo = ServicesRepository(app.state.engine) return { (service.key, service.version) for service in await services_repo.list_services() @@ -117,7 +117,7 @@ async def _ensure_registry_and_database_are_synced(app: FastAPI) -> None: services_in_manifest_map = await manifest.get_services_map(director_api) services_in_db: set[tuple[ServiceKey, ServiceVersion]] = ( - await _list_services_in_database(app.state.engine) + await _list_services_in_database(app) ) # check that the db has all the services at least once @@ -125,7 +125,9 @@ async def _ensure_registry_and_database_are_synced(app: FastAPI) -> None: if missing_services_in_db: _logger.debug( "Missing services in db: %s", - pformat(missing_services_in_db), + json_dumps( + missing_services_in_db, default=representation_encoder, indent=1 + ), ) # update db @@ -175,7 +177,7 @@ async def _ensure_published_templates_accessible( await services_repo.upsert_service_access_rights(missing_services_access_rights) -async def _run_sync_services(app: FastAPI): +async def _sync_services_in_registry(app: FastAPI): default_product: Final[str] = app.state.default_product_name engine: AsyncEngine = app.state.engine @@ -187,57 +189,32 @@ async def 
_run_sync_services(app: FastAPI): await _ensure_published_templates_accessible(engine, default_product) -async def _sync_services_task(app: FastAPI) -> None: - while app.state.registry_syncer_running: - try: - _logger.debug("Syncing services between registry and database...") - - await _run_sync_services(app) - - await asyncio.sleep(app.state.settings.CATALOG_BACKGROUND_TASK_REST_TIME) +_TASK_NAME_PERIODIC_SYNC_SERVICES = f"{__name__}.{_sync_services_in_registry.__name__}" - except asyncio.CancelledError: # noqa: PERF203 - # task is stopped - _logger.info("registry syncing task cancelled") - raise - - except Exception: # pylint: disable=broad-except - if not app.state.registry_syncer_running: - _logger.warning("registry syncing task forced to stop") - break - _logger.exception( - "Unexpected error while syncing registry entries, restarting now..." - ) - # wait a bit before retrying, so it does not block everything until the director is up - await asyncio.sleep( - app.state.settings.CATALOG_BACKGROUND_TASK_WAIT_AFTER_FAILURE - ) - - -async def start_registry_sync_task(app: FastAPI) -> None: - # FIXME: added this variable to overcome the state in which the - # task cancelation is ignored and the exceptions enter in a loop - # that never stops the background task. This flag is an additional - # mechanism to enforce stopping the background task - app.state.registry_syncer_running = True - task = asyncio.create_task(_sync_services_task(app)) - app.state.registry_sync_task = task - _logger.info("registry syncing task started") +async def background_task_lifespan(app: FastAPI) -> AsyncIterator[State]: + assert app.state.settings # nosec + assert app.state.redis_client # nosec + assert app.state.engine # nosec + assert app.state.default_product_name # nosec + + settings = app.state.settings + + @exclusive_periodic( + app.state.redis_client, + task_interval=timedelta(seconds=settings.CATALOG_BACKGROUND_TASK_REST_TIME), + retry_after=timedelta( + seconds=settings.CATALOG_BACKGROUND_TASK_WAIT_AFTER_FAILURE + ), + ) + async def _sync_services_task() -> None: + await _sync_services_in_registry(app) -async def stop_registry_sync_task(app: FastAPI) -> None: - if task := app.state.registry_sync_task: - with suppress(asyncio.CancelledError): - app.state.registry_syncer_running = False - task.cancel() - await task - app.state.registry_sync_task = None - _logger.info("registry syncing task stopped") + app.state.sync_services_task = asyncio.create_task( + _sync_services_task(), name=_TASK_NAME_PERIODIC_SYNC_SERVICES + ) + yield {} -async def background_task_lifespan(app: FastAPI) -> AsyncIterator[State]: - await start_registry_sync_task(app) - try: - yield {} - finally: - await stop_registry_sync_task(app) + assert isinstance(app.state.sync_services_task, asyncio.Task) # nosec + await cancel_wait_task(app.state.sync_services_task) diff --git a/services/catalog/tests/unit/with_dbs/test_core_background_task__sync.py b/services/catalog/tests/unit/with_dbs/test_core_background_task__sync.py index 8e61016ad06..1f06d12b55b 100644 --- a/services/catalog/tests/unit/with_dbs/test_core_background_task__sync.py +++ b/services/catalog/tests/unit/with_dbs/test_core_background_task__sync.py @@ -14,7 +14,9 @@ from pytest_mock import MockerFixture from respx.router import MockRouter from simcore_postgres_database.models.services import services_meta_data -from simcore_service_catalog.core.background_tasks import _run_sync_services +from simcore_service_catalog.core.background_tasks import ( + _sync_services_in_registry, +) 
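As a rough sketch of the background-task pattern this last patch adopts (assuming a Redis client is stored on app.state.redis_client; the job body, task name, and intervals are illustrative):

import asyncio
from collections.abc import AsyncIterator
from datetime import timedelta

from fastapi import FastAPI
from fastapi_lifespan_manager import State
from servicelib.async_utils import cancel_wait_task
from servicelib.background_task_utils import exclusive_periodic


async def periodic_job_lifespan(app: FastAPI) -> AsyncIterator[State]:
    @exclusive_periodic(
        app.state.redis_client,  # Redis-backed lock: only one replica runs the job at a time
        task_interval=timedelta(seconds=60),
        retry_after=timedelta(seconds=5),
    )
    async def _job() -> None:
        ...  # periodic work, e.g. syncing the registry with the database

    task = asyncio.create_task(_job(), name="periodic_job")
    yield {}
    await cancel_wait_task(task)  # graceful cancellation on shutdown
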
from simcore_service_catalog.repository.services import ServicesRepository from sqlalchemy.ext.asyncio.engine import AsyncEngine @@ -80,7 +82,7 @@ async def test_registry_sync_task( assert not got_from_db # let's sync - await _run_sync_services(app) + await _sync_services_in_registry(app) # after sync, it should be in db as well got_from_db = await services_repo.get_service_with_history(