From b20d974e81cceb850f6c5456f607ba3c18bca81c Mon Sep 17 00:00:00 2001 From: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com> Date: Tue, 19 Aug 2025 09:00:54 +0100 Subject: [PATCH 1/3] add prometheus telemetry middleware Signed-off-by: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com> --- docling_serve/app.py | 60 ++++++++++++++++++++++++++++---------------- pyproject.toml | 1 + uv.lock | 24 ++++++++++++++++++ 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/docling_serve/app.py b/docling_serve/app.py index f04b836e..48b85c29 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -28,6 +28,7 @@ ) from fastapi.responses import JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles +from prometheus_fastapi_instrumentator import Instrumentator from scalar_fastapi import get_scalar_api_reference from docling.datamodel.base_models import DocumentStream @@ -108,33 +109,44 @@ def format(self, record): # Context manager to initialize and clean up the lifespan of the FastAPI app -@asynccontextmanager -async def lifespan(app: FastAPI): - scratch_dir = get_scratch() +def create_lifespan_handler(instrumentator: Instrumentator): + """ + Create a FastAPI lifespan handler for the application - orchestrator = get_async_orchestrator() - notifier = WebsocketNotifier(orchestrator) - orchestrator.bind_notifier(notifier) + @param instrumentator: A prometheus instrumentator used to expose metrics + """ - # Warm up processing cache - if docling_serve_settings.load_models_at_boot: - await orchestrator.warm_up_caches() + @asynccontextmanager + async def lifespan(app: FastAPI): + scratch_dir = get_scratch() - # Start the background queue processor - queue_task = asyncio.create_task(orchestrator.process_queue()) + orchestrator = get_async_orchestrator() + notifier = WebsocketNotifier(orchestrator) + orchestrator.bind_notifier(notifier) - yield + # Warm up processing cache + if docling_serve_settings.load_models_at_boot: + await orchestrator.warm_up_caches() - # Cancel the background queue processor on shutdown - queue_task.cancel() - try: - await queue_task - except asyncio.CancelledError: - _log.info("Queue processor cancelled.") + # Start the background queue processor + queue_task = asyncio.create_task(orchestrator.process_queue()) + + instrumentator.expose(app) + + yield + + # Cancel the background queue processor on shutdown + queue_task.cancel() + try: + await queue_task + except asyncio.CancelledError: + _log.info("Queue processor cancelled.") + + # Remove scratch directory in case it was a tempfile + if docling_serve_settings.scratch_path is not None: + shutil.rmtree(scratch_dir, ignore_errors=True) - # Remove scratch directory in case it was a tempfile - if docling_serve_settings.scratch_path is not None: - shutil.rmtree(scratch_dir, ignore_errors=True) + return lifespan ################################## @@ -159,11 +171,13 @@ def create_app(): # noqa: C901 _log.info("Found static assets.") require_auth = APIKeyAuth(docling_serve_settings.api_key) + + instrumentator = Instrumentator() app = FastAPI( title="Docling Serve", docs_url=None if offline_docs_assets else "/swagger", redoc_url=None if offline_docs_assets else "/docs", - lifespan=lifespan, + lifespan=create_lifespan_handler(instrumentator), version=version, ) @@ -179,6 +193,8 @@ def create_app(): # noqa: C901 allow_headers=headers, ) + instrumentator.expose(app) + # Mount the Gradio app if docling_serve_settings.enable_ui: try: diff --git a/pyproject.toml b/pyproject.toml index 63e3361a..0dc4e922 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dependencies = [ "websockets~=14.0", "scalar-fastapi>=1.0.3", "docling-mcp>=1.0.0", + "prometheus-fastapi-instrumentator>=7.1.0", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 5e8820b5..fbbd2622 100644 --- a/uv.lock +++ b/uv.lock @@ -1031,6 +1031,7 @@ dependencies = [ { name = "docling-mcp", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, { name = "fastapi", extra = ["standard"], marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, { name = "httpx", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, + { name = "prometheus-fastapi-instrumentator", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, { name = "pydantic", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, { name = "pydantic-settings", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, { name = "python-multipart", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, @@ -1106,6 +1107,7 @@ requires-dist = [ { name = "gradio", marker = "extra == 'ui'", specifier = "~=5.9" }, { name = "httpx", specifier = "~=0.28" }, { name = "onnxruntime", marker = "extra == 'rapidocr'", specifier = "~=1.7" }, + { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" }, { name = "pydantic", specifier = "~=2.10" }, { name = "pydantic", marker = "extra == 'ui'", specifier = "<2.11.0" }, { name = "pydantic-settings", specifier = "~=2.4" }, @@ -3906,6 +3908,28 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f1/70/1b65f9118ef64f6ffe5d57a67170bbff25d4f4a3d1cb78e8ed3392e16114/pre_commit_uv-4.1.4-py3-none-any.whl", hash = "sha256:7f01fb494fa1caa5097d20a38f71df7cea0209197b2564699cef9b3f3aa9d135", size = 5578, upload-time = "2024-10-29T23:07:27.128Z" }, ] +[[package]] +name = "prometheus-client" +version = "0.22.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/5e/cf/40dde0a2be27cc1eb41e333d1a674a74ce8b8b0457269cc640fd42b07cf7/prometheus_client-0.22.1.tar.gz", hash = "sha256:190f1331e783cf21eb60bca559354e0a4d4378facecf78f5428c39b675d20d28", size = 69746, upload-time = "2025-06-02T14:29:01.152Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/32/ae/ec06af4fe3ee72d16973474f122541746196aaa16cea6f66d18b963c6177/prometheus_client-0.22.1-py3-none-any.whl", hash = "sha256:cca895342e308174341b2cbf99a56bef291fbc0ef7b9e5412a0f26d653ba7094", size = 58694, upload-time = "2025-06-02T14:29:00.068Z" }, +] + +[[package]] +name = "prometheus-fastapi-instrumentator" +version = "7.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "prometheus-client", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, + { name = "starlette", marker = "platform_machine != 'x86_64' or sys_platform != 'darwin' or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu124') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cpu' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu126') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu124' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-cu128') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu126' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-pypi') or (extra == 'group-13-docling-serve-cu128' and extra == 'group-13-docling-serve-rocm') or (extra == 'group-13-docling-serve-pypi' and extra == 'group-13-docling-serve-rocm')" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/69/6d/24d53033cf93826aa7857699a4450c1c67e5b9c710e925b1ed2b320c04df/prometheus_fastapi_instrumentator-7.1.0.tar.gz", hash = "sha256:be7cd61eeea4e5912aeccb4261c6631b3f227d8924542d79eaf5af3f439cbe5e", size = 20220, upload-time = "2025-03-19T19:35:05.351Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/27/72/0824c18f3bc75810f55dacc2dd933f6ec829771180245ae3cc976195dec0/prometheus_fastapi_instrumentator-7.1.0-py3-none-any.whl", hash = "sha256:978130f3c0bb7b8ebcc90d35516a6fe13e02d2eb358c8f83887cdef7020c31e9", size = 19296, upload-time = "2025-03-19T19:35:04.323Z" }, +] + [[package]] name = "propcache" version = "0.3.2" From 780ec51e1442db85de340374d37b9d1c93ab3419 Mon Sep 17 00:00:00 2001 From: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com> Date: Tue, 26 Aug 2025 11:10:56 +0100 Subject: [PATCH 2/3] improve metrics Signed-off-by: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com> --- docling_serve/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docling_serve/app.py b/docling_serve/app.py index 48b85c29..9e0475d5 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -193,7 +193,7 @@ def create_app(): # noqa: C901 allow_headers=headers, ) - instrumentator.expose(app) + instrumentator.instrument(app).expose(app) # Mount the Gradio app if docling_serve_settings.enable_ui: From 97d4315bc480ad0438b1c053356ea940ec6b4f79 Mon Sep 17 00:00:00 2001 From: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com> Date: Wed, 3 Sep 2025 11:16:59 +0100 Subject: [PATCH 3/3] add rq metrics Signed-off-by: Tiago Santana <54704492+SantanaTiago@users.noreply.github.com> --- docling_serve/app.py | 10 +- docling_serve/rq_metrics_collector.py | 132 ++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 docling_serve/rq_metrics_collector.py diff --git a/docling_serve/app.py b/docling_serve/app.py index 9e0475d5..1490c34d 100644 --- a/docling_serve/app.py +++ b/docling_serve/app.py @@ -28,6 +28,7 @@ ) from fastapi.responses import JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles +from prometheus_client.core import REGISTRY from prometheus_fastapi_instrumentator import Instrumentator from scalar_fastapi import get_scalar_api_reference @@ -71,7 +72,8 @@ from docling_serve.helper_functions import FormDepends from docling_serve.orchestrator_factory import get_async_orchestrator from docling_serve.response_preparation import prepare_response -from docling_serve.settings import docling_serve_settings +from docling_serve.rq_metrics_collector import RQCollector, get_redis_connection +from docling_serve.settings import AsyncEngine, docling_serve_settings from docling_serve.storage import get_scratch from docling_serve.websocket_notifier import WebsocketNotifier @@ -131,6 +133,12 @@ async def lifespan(app: FastAPI): # Start the background queue processor queue_task = asyncio.create_task(orchestrator.process_queue()) + if docling_serve_settings.eng_kind == AsyncEngine.RQ: + connection = get_redis_connection( + url=docling_serve_settings.eng_rq_redis_url + ) + REGISTRY.register(RQCollector(connection)) + instrumentator.expose(app) yield diff --git a/docling_serve/rq_metrics_collector.py b/docling_serve/rq_metrics_collector.py new file mode 100644 index 00000000..7f596b39 --- /dev/null +++ b/docling_serve/rq_metrics_collector.py @@ -0,0 +1,132 @@ +# Heavily based on https://github.com/mdawar/rq-exporter, thank you <3 +import logging + +from prometheus_client import Summary +from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily +from prometheus_client.registry import Collector +from redis import Redis +from rq import Queue, Worker +from rq.job import JobStatus + +logger = logging.getLogger(__name__) + + +def get_redis_connection(url: str): + return Redis.from_url(url) + + +def get_workers_stats(connection): + """Get the RQ workers stats.""" + + workers = Worker.all(connection) + + return [ + { + "name": w.name, + "queues": w.queue_names(), + "state": w.get_state(), + "successful_job_count": w.successful_job_count, + "failed_job_count": w.failed_job_count, + "total_working_time": w.total_working_time, + } + for w in workers + ] + + +def get_queue_jobs(connection, queue_name): + """Get the jobs by status of a Queue.""" + + queue = Queue(connection=connection, name=queue_name) + + return { + JobStatus.QUEUED: queue.count, + JobStatus.STARTED: queue.started_job_registry.count, + JobStatus.FINISHED: queue.finished_job_registry.count, + JobStatus.FAILED: queue.failed_job_registry.count, + JobStatus.DEFERRED: queue.deferred_job_registry.count, + JobStatus.SCHEDULED: queue.scheduled_job_registry.count, + } + + +def get_jobs_by_queue(connection): + """Get the current jobs by queue""" + + queues = Queue.all(connection) + + return {q.name: get_queue_jobs(connection, q.name) for q in queues} + + +class RQCollector(Collector): + """RQ stats collector.""" + + def __init__(self, connection=None): + self.connection = connection + + # RQ data collection count and time in seconds + self.summary = Summary( + "rq_request_processing_seconds", "Time spent collecting RQ data" + ) + + def collect(self): + """Collect RQ Metrics.""" + logger.debug("Collecting the RQ metrics...") + + with self.summary.time(): + rq_workers = GaugeMetricFamily( + "rq_workers", + "RQ workers", + labels=["name", "state", "queues"], + ) + rq_workers_success = CounterMetricFamily( + "rq_workers_success", + "RQ workers success count", + labels=["name", "queues"], + ) + rq_workers_failed = CounterMetricFamily( + "rq_workers_failed", + "RQ workers fail count", + labels=["name", "queues"], + ) + rq_workers_working_time = CounterMetricFamily( + "rq_workers_working_time", + "RQ workers spent seconds", + labels=["name", "queues"], + ) + rq_jobs = GaugeMetricFamily( + "rq_jobs", + "RQ jobs by state", + labels=["queue", "status"], + ) + + workers = get_workers_stats(self.connection) + for worker in workers: + label_queues = ",".join(worker["queues"]) + rq_workers.add_metric( + [worker["name"], worker["state"], label_queues], + 1, + ) + rq_workers_success.add_metric( + [worker["name"], label_queues], + worker["successful_job_count"], + ) + rq_workers_failed.add_metric( + [worker["name"], label_queues], + worker["failed_job_count"], + ) + rq_workers_working_time.add_metric( + [worker["name"], label_queues], + worker["total_working_time"], + ) + + yield rq_workers + yield rq_workers_success + yield rq_workers_failed + yield rq_workers_working_time + + for queue_name, jobs in get_jobs_by_queue(self.connection).items(): + for status, count in jobs.items(): + rq_jobs.add_metric([queue_name, status], count) + + yield rq_jobs + + logger.debug("RQ metrics collection finished")