Skip to content

RabbitMQ: Adding the queue_delivery_metrics OpenMetrics endpoint #19963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Apr 22, 2025
1 change: 1 addition & 0 deletions rabbitmq/changelog.d/19963.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for queue_delivery_metrics group
16 changes: 16 additions & 0 deletions rabbitmq/datadog_checks/rabbitmq/openmetrics/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@
"rabbitmq_queue_index_write_ops_total": "queue.index.write_ops",
"rabbitmq_queue_messages_published_total": "queue.messages.published",
"rabbitmq_queue_process_reductions_total": "queue.process_reductions",
"rabbitmq_queue_get_ack_total": "queue.get.ack",
"rabbitmq_queue_get_total": "queue.get",
"rabbitmq_queue_messages_delivered_ack_total": "queue.messages.delivered.ack",
"rabbitmq_queue_messages_delivered_total": "queue.messages.delivered",
"rabbitmq_queue_messages_redelivered_total": "queue.messages.redelivered",
"rabbitmq_queue_messages_acked_total": "queue.messages.acked",
"rabbitmq_queue_get_empty_total": "queue.get.empty",
"rabbitmq_queues_created_total": "queues.created",
"rabbitmq_queues_declared_total": "queues.declared",
"rabbitmq_queues_deleted_total": "queues.deleted",
Expand Down Expand Up @@ -356,6 +363,15 @@
"rabbitmq_queue_disk_reads_total",
"rabbitmq_queue_disk_writes_total",
},
"queue_delivery_metrics": {
"rabbitmq_queue_get_ack_total",
"rabbitmq_queue_get_total",
"rabbitmq_queue_messages_delivered_ack_total",
"rabbitmq_queue_messages_delivered_total",
"rabbitmq_queue_messages_redelivered_total",
"rabbitmq_queue_messages_acked_total",
"rabbitmq_queue_get_empty_total",
},
# Connection/channel metrics
"connection_coarse_metrics": {
"rabbitmq_connection_incoming_bytes_total",
Expand Down
2 changes: 1 addition & 1 deletion rabbitmq/hatch.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies = [
# We still support metrics from management plugin as a legacy option.
[[envs.default.matrix]]
python = ["3.12"]
version = ["3.7", "3.11"]
version = ["3.7", "3.11","4.0"]
flavor = ["mgmt", "openmetrics"]

[envs.default.overrides]
Expand Down
613 changes: 310 additions & 303 deletions rabbitmq/metadata.csv

Large diffs are not rendered by default.

51 changes: 51 additions & 0 deletions rabbitmq/tests/fixtures/detailed-queue_delivery_metrics.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# TYPE rabbitmq_identity_info untyped
# HELP rabbitmq_identity_info RabbitMQ node & cluster identity info
rabbitmq_identity_info{rabbitmq_node="rabbit@54cfac2199f1",rabbitmq_cluster="rabbit@54cfac2199f1",rabbitmq_cluster_permanent_id="rabbitmq-cluster-id-cyw_z6c4UMIBoK51iVq9rw"} 1
# TYPE rabbitmq_build_info untyped
# HELP rabbitmq_build_info RabbitMQ & Erlang/OTP version info
rabbitmq_build_info{rabbitmq_version="4.0.7",prometheus_plugin_version="4.0.7",prometheus_client_version="4.11.0",erlang_version="27.3.1"} 1
# TYPE rabbitmq_detailed_queue_get_ack_total counter
# HELP rabbitmq_detailed_queue_get_ack_total Total number of messages fetched from a queue with basic.get in manual acknowledgement mode
rabbitmq_detailed_queue_get_ack_total{vhost="/",queue="queue1"} 0
rabbitmq_detailed_queue_get_ack_total{vhost="/",queue="queue3"} 0
rabbitmq_detailed_queue_get_ack_total{vhost="/",queue="queue2"} 2
# TYPE rabbitmq_detailed_queue_get_total counter
# HELP rabbitmq_detailed_queue_get_total Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode
rabbitmq_detailed_queue_get_total{vhost="/",queue="queue1"} 3
rabbitmq_detailed_queue_get_total{vhost="/",queue="queue3"} 0
rabbitmq_detailed_queue_get_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_detailed_queue_messages_delivered_ack_total counter
# HELP rabbitmq_detailed_queue_messages_delivered_ack_total Total number of messages delivered from a queue to consumers in manual acknowledgement mode
rabbitmq_detailed_queue_messages_delivered_ack_total{vhost="/",queue="queue1"} 0
rabbitmq_detailed_queue_messages_delivered_ack_total{vhost="/",queue="queue3"} 0
rabbitmq_detailed_queue_messages_delivered_ack_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_detailed_queue_messages_delivered_total counter
# HELP rabbitmq_detailed_queue_messages_delivered_total Total number of messages delivered from a queue to consumers in automatic acknowledgement mode
rabbitmq_detailed_queue_messages_delivered_total{vhost="/",queue="queue1"} 0
rabbitmq_detailed_queue_messages_delivered_total{vhost="/",queue="queue3"} 0
rabbitmq_detailed_queue_messages_delivered_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_detailed_queue_messages_redelivered_total counter
# HELP rabbitmq_detailed_queue_messages_redelivered_total Total number of messages redelivered from a queue to consumers
rabbitmq_detailed_queue_messages_redelivered_total{vhost="/",queue="queue1"} 0
rabbitmq_detailed_queue_messages_redelivered_total{vhost="/",queue="queue3"} 0
rabbitmq_detailed_queue_messages_redelivered_total{vhost="/",queue="queue2"} 1
# TYPE rabbitmq_detailed_queue_messages_acked_total counter
# HELP rabbitmq_detailed_queue_messages_acked_total Total number of messages acknowledged by consumers on a queue
rabbitmq_detailed_queue_messages_acked_total{vhost="/",queue="queue1"} 0
rabbitmq_detailed_queue_messages_acked_total{vhost="/",queue="queue3"} 0
rabbitmq_detailed_queue_messages_acked_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_detailed_queue_get_empty_total counter
# HELP rabbitmq_detailed_queue_get_empty_total Total number of times basic.get operations fetched no message on a queue
rabbitmq_detailed_queue_get_empty_total{vhost="/",queue="queue1"} 0
rabbitmq_detailed_queue_get_empty_total{vhost="/",queue="queue3"} 1
rabbitmq_detailed_queue_get_empty_total{vhost="/",queue="queue2"} 0
# TYPE telemetry_scrape_encoded_size_bytes summary
# HELP telemetry_scrape_encoded_size_bytes Scrape size, encoded
# TYPE telemetry_scrape_duration_seconds summary
# HELP telemetry_scrape_duration_seconds Scrape duration
telemetry_scrape_duration_seconds_count{registry="detailed",content_type="text/plain; version=0.0.4"} 1
telemetry_scrape_duration_seconds_sum{registry="detailed",content_type="text/plain; version=0.0.4"} 0.003794368
# TYPE telemetry_scrape_size_bytes summary
# HELP telemetry_scrape_size_bytes Scrape size, not encoded
telemetry_scrape_size_bytes_count{registry="detailed",content_type="text/plain; version=0.0.4"} 1
telemetry_scrape_size_bytes_sum{registry="detailed",content_type="text/plain; version=0.0.4"} 2726
21 changes: 21 additions & 0 deletions rabbitmq/tests/fixtures/metrics.txt
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,27 @@ rabbitmq_queue_disk_reads_total 0
# TYPE rabbitmq_queue_disk_writes_total counter
# HELP rabbitmq_queue_disk_writes_total Total number of times queue wrote messages to disk
rabbitmq_queue_disk_writes_total 0
# TYPE rabbitmq_queue_get_ack_total counter
# HELP rabbitmq_queue_get_ack_total Total number of messages fetched from a queue with basic.get in manual acknowledgement mode
rabbitmq_queue_get_ack_total 0
# TYPE rabbitmq_queue_get_total counter
# HELP rabbitmq_queue_get_total Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode
rabbitmq_queue_get_total 0
# TYPE rabbitmq_queue_messages_delivered_ack_total counter
# HELP rabbitmq_queue_messages_delivered_ack_total Total number of messages delivered from a queue to consumers in manual acknowledgement mode
rabbitmq_queue_messages_delivered_ack_total 17217348
# TYPE rabbitmq_queue_messages_delivered_total counter
# HELP rabbitmq_queue_messages_delivered_total Total number of messages delivered from a queue to consumers in automatic acknowledgement mode
rabbitmq_queue_messages_delivered_total 0
# TYPE rabbitmq_queue_messages_redelivered_total counter
# HELP rabbitmq_queue_messages_redelivered_total Total number of messages redelivered from a queue to consumers
rabbitmq_queue_messages_redelivered_total 67170
# TYPE rabbitmq_queue_messages_acked_total counter
# HELP rabbitmq_queue_messages_acked_total Total number of messages acknowledged by consumers on a queue
rabbitmq_queue_messages_acked_total 17148315
# TYPE rabbitmq_queue_get_empty_total counter
# HELP rabbitmq_queue_get_empty_total Total number of times basic.get operations fetched no message on a queue
rabbitmq_queue_get_empty_total 0
# TYPE rabbitmq_channel_consumers gauge
# HELP rabbitmq_channel_consumers Consumers on a channel
rabbitmq_channel_consumers 0
Expand Down
35 changes: 35 additions & 0 deletions rabbitmq/tests/fixtures/per-object.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1392,6 +1392,41 @@ rabbitmq_queue_messages_unacked{vhost="/",queue="queue2"} 0
rabbitmq_queue_messages{vhost="/",queue="queue3"} 0
rabbitmq_queue_messages{vhost="/",queue="queue1"} 0
rabbitmq_queue_messages{vhost="/",queue="queue2"} 1
# TYPE rabbitmq_queue_get_ack_total counter
# HELP rabbitmq_queue_get_ack_total Total number of messages fetched from a queue with basic.get in manual acknowledgement mode
rabbitmq_queue_get_ack_total{vhost="/",queue="queue3"} 0
rabbitmq_queue_get_ack_total{vhost="/",queue="queue1"} 0
rabbitmq_queue_get_ack_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_queue_get_total counter
# HELP rabbitmq_queue_get_total Total number of messages fetched from a queue with basic.get in automatic acknowledgement mode
rabbitmq_queue_get_total{vhost="/",queue="queue3"} 10
rabbitmq_queue_get_total{vhost="/",queue="queue1"} 0
rabbitmq_queue_get_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_queue_messages_delivered_ack_total counter
# HELP rabbitmq_queue_messages_delivered_ack_total Total number of messages delivered from a queue to consumers in manual acknowledgement mode
rabbitmq_queue_messages_delivered_ack_total{vhost="/",queue="queue3"} 12
rabbitmq_queue_messages_delivered_ack_total{vhost="/",queue="queue1"} 0
rabbitmq_queue_messages_delivered_ack_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_queue_messages_delivered_total counter
# HELP rabbitmq_queue_messages_delivered_total Total number of messages delivered from a queue to consumers in automatic acknowledgement mode
rabbitmq_queue_messages_delivered_total{vhost="/",queue="queue3"} 10
rabbitmq_queue_messages_delivered_total{vhost="/",queue="queue1"} 0
rabbitmq_queue_messages_delivered_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_queue_messages_redelivered_total counter
# HELP rabbitmq_queue_messages_redelivered_total Total number of messages redelivered from a queue to consumers
rabbitmq_queue_messages_redelivered_total{vhost="/",queue="queue3"} 67170
rabbitmq_queue_messages_redelivered_total{vhost="/",queue="queue1"} 0
rabbitmq_queue_messages_redelivered_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_queue_messages_acked_total counter
# HELP rabbitmq_queue_messages_acked_total Total number of messages acknowledged by consumers on a queue
rabbitmq_queue_messages_acked_total{vhost="/",queue="queue3"} 17148315
rabbitmq_queue_messages_acked_total{vhost="/",queue="queue1"} 0
rabbitmq_queue_messages_acked_total{vhost="/",queue="queue2"} 0
# TYPE rabbitmq_queue_get_empty_total counter
# HELP rabbitmq_queue_get_empty_total Total number of times basic.get operations fetched no message on a queue
rabbitmq_queue_get_empty_total{vhost="/",queue="queue3"} 0
rabbitmq_queue_get_empty_total{vhost="/",queue="queue2"} 0
rabbitmq_queue_get_empty_total{vhost="/",queue="queue1"} 0
# TYPE rabbitmq_queue_process_reductions_total counter
# HELP rabbitmq_queue_process_reductions_total Total number of queue process reductions
rabbitmq_queue_process_reductions_total{vhost="/",queue="queue3"} 30020
Expand Down
26 changes: 24 additions & 2 deletions rabbitmq/tests/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,28 @@
'rabbitmq.telemetry.scrape.encoded_size_bytes.sum',
'rabbitmq.telemetry.scrape.size_bytes.count',
'rabbitmq.telemetry.scrape.size_bytes.sum',
'rabbitmq.queue.get.ack.count',
'rabbitmq.queue.get.count',
'rabbitmq.queue.messages.delivered.ack.count',
'rabbitmq.queue.messages.delivered.count',
'rabbitmq.queue.messages.redelivered.count',
'rabbitmq.queue.messages.acked.count',
'rabbitmq.queue.get.empty.count',
}

RABBITMQ_4_0_ADDED = {
'rabbitmq.queue.get.ack.count',
'rabbitmq.queue.get.count',
'rabbitmq.queue.messages.delivered.ack.count',
'rabbitmq.queue.messages.delivered.count',
'rabbitmq.queue.messages.redelivered.count',
'rabbitmq.queue.messages.acked.count',
'rabbitmq.queue.get.empty.count',
}

RABBITMQ_4_0_REMOVED = {
'rabbitmq.process.open_tcp_sockets',
'rabbitmq.process.max_tcp_sockets',
}

SUMMARY_METRICS = {
Expand Down Expand Up @@ -353,7 +375,7 @@
"rabbitmq.alarms.memory_used_watermark",
}

FLAKY_E2E_METRICS = [
FLAKY_E2E_METRICS = {
'rabbitmq.erlang.vm.statistics.run_queues_length',
'rabbitmq.global.consumers',
'rabbitmq.global.messages.acknowledged.count',
Expand All @@ -377,7 +399,7 @@
'rabbitmq.global.messages.unroutable.returned.count',
'rabbitmq.global.publishers',
'rabbitmq.process_start_time_seconds',
]
}


def assert_metric_covered(aggregator):
Expand Down
22 changes: 19 additions & 3 deletions rabbitmq/tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,22 @@
# Licensed under a 3-clause BSD style license (see LICENSE)

import logging
from copy import deepcopy

import pytest
from packaging import version

from datadog_checks.dev.utils import get_metadata_metrics

from .common import CONFIG, OPENMETRICS_CONFIG, requires_management, requires_prometheus
from .metrics import DEFAULT_OPENMETRICS, FLAKY_E2E_METRICS, assert_metric_covered
from .metrics import (
DEFAULT_OPENMETRICS,
FLAKY_E2E_METRICS,
RABBITMQ_4_0_ADDED,
RABBITMQ_4_0_REMOVED,
RABBITMQ_VERSION,
assert_metric_covered,
)

log = logging.getLogger(__file__)

Expand All @@ -27,8 +36,15 @@ def test_rabbitmq_e2e_management(dd_agent_check):
def test_rabbitmq_e2e_openmetrics(dd_agent_check):
aggregator = dd_agent_check(OPENMETRICS_CONFIG, rate=True)
metadata_metrics = get_metadata_metrics()
for metric in DEFAULT_OPENMETRICS:
if metric in FLAKY_E2E_METRICS:
expected_metrics = deepcopy(DEFAULT_OPENMETRICS)
unexpected_metrics = deepcopy(FLAKY_E2E_METRICS)
if RABBITMQ_VERSION == version.parse('4.0'):
expected_metrics |= RABBITMQ_4_0_ADDED
expected_metrics -= RABBITMQ_4_0_REMOVED
else:
unexpected_metrics |= RABBITMQ_4_0_ADDED
for metric in expected_metrics:
if metric in unexpected_metrics:
aggregator.assert_metric(metric, at_least=0)
else:
aggregator.assert_metric(metric)
Expand Down
62 changes: 54 additions & 8 deletions rabbitmq/tests/test_openmetrics.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,29 @@
# (C) Datadog, Inc. 2023-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
import os
from itertools import product
from pathlib import Path
from urllib.parse import urlparse

import pytest
from packaging import version

from datadog_checks.base.errors import ConfigurationError
from datadog_checks.base.types import ServiceCheck
from datadog_checks.dev.http import MockResponse
from datadog_checks.dev.utils import get_metadata_metrics
from datadog_checks.rabbitmq import RabbitMQ

from .common import HERE
from .metrics import AGGREGATED_ONLY_METRICS, DEFAULT_OPENMETRICS, MISSING_OPENMETRICS, SUMMARY_METRICS
from .common import HERE, RABBITMQ_VERSION
from .metrics import (
AGGREGATED_ONLY_METRICS,
DEFAULT_OPENMETRICS,
MISSING_OPENMETRICS,
RABBITMQ_4_0_ADDED,
SUMMARY_METRICS,
)

OM_RESPONSE_FIXTURES = HERE / Path('fixtures')
OM_RESPONSE_FIXTURES = os.path.join(HERE, 'fixtures')
TEST_URL = "http://localhost:15692"
OM_ENDPOINT_TAG = f"endpoint:{TEST_URL}/metrics"
BUILD_INFO_TAGS = [
Expand Down Expand Up @@ -54,7 +61,7 @@ def test_aggregated_endpoint(aggregated_setting, aggregator, dd_run_check, mock_

We expect in this case all the metrics from the '/metrics' endpoint.
"""
mock_http_response(file_path=OM_RESPONSE_FIXTURES / "metrics.txt")
mock_http_response(file_path=os.path.join(OM_RESPONSE_FIXTURES, "metrics.txt"))
prometheus_settings = {'url': TEST_URL, **aggregated_setting}
check = _rmq_om_check(prometheus_settings)
dd_run_check(check)
Expand All @@ -77,7 +84,7 @@ def test_aggregated_endpoint_as_per_object(aggregator, dd_run_check, mock_http_r

We expect all metrics except the ones unique to the `/metrics` endpoint to be collected.
"""
mock_http_response(file_path=OM_RESPONSE_FIXTURES / "per-object.txt")
mock_http_response(file_path=os.path.join(OM_RESPONSE_FIXTURES, "per-object.txt"))
prometheus_settings = {'url': TEST_URL}
check = _rmq_om_check(prometheus_settings)
dd_run_check(check)
Expand Down Expand Up @@ -117,7 +124,7 @@ def test_unaggregated_endpoint(endpoint, fixture_file, expected_metrics, aggrega

We expect in this case only the metrics for the unaggregated endpoint, nothing from '/metrics'.
"""
mock_http_response(file_path=OM_RESPONSE_FIXTURES / fixture_file)
mock_http_response(file_path=os.path.join(OM_RESPONSE_FIXTURES, fixture_file))
check = _rmq_om_check(
{
'url': TEST_URL,
Expand All @@ -142,21 +149,60 @@ def test_unaggregated_endpoint(endpoint, fixture_file, expected_metrics, aggrega
_common_assertions(aggregator)


@pytest.mark.parametrize(
'endpoint, fixture_file, expected_metrics',
[
pytest.param(
'detailed?family=queue_delivery_metrics',
"detailed-queue_delivery_metrics.txt",
RABBITMQ_4_0_ADDED,
id="detailed, query queue_delivery_metrics family",
),
],
)
@pytest.mark.skipif(
RABBITMQ_VERSION < version.parse('4.0'),
reason=f"Skipping test because RABBITMQ_VERSION is {RABBITMQ_VERSION} (not greater than 4.0)",
)
def test_unaggregated_endpoint_v4(
endpoint, fixture_file, expected_metrics, aggregator, dd_run_check, mock_http_response
):
mock_http_response(file_path=os.path.join(OM_RESPONSE_FIXTURES, fixture_file))
check = _rmq_om_check(
{
'url': TEST_URL,
'unaggregated_endpoint': endpoint,
"include_aggregated_endpoint": False,
}
)
dd_run_check(check)

for m in expected_metrics:
aggregator.assert_metric(m)
for tag in IDENTITY_INFO_TAGS:
aggregator.assert_metric_has_tag(m, tag)

for m in (DEFAULT_OPENMETRICS - expected_metrics) | MISSING_OPENMETRICS:
# We check that all metrics that are not in the query don't show up at all.
aggregator.assert_metric(m, at_least=0)


def mock_http_responses(url, **_params):
parsed = urlparse(url)
fname = {
'/metrics': 'metrics.txt',
'/metrics/per-object': 'per-object.txt',
'/metrics/detailed?family=queue_consumer_count': 'detailed-queue_consumer_count.txt',
'/metrics/detailed?family=queue_consumer_count&vhost=test': 'detailed-queue_consumer_count.txt',
'/metrics/detailed?family=queue_delivery_metrics': 'detailed-queue_delivery_metrics.txt',
(
'/metrics/detailed?family=queue_consumer_count' '&family=queue_coarse_metrics'
): 'detailed-queue_coarse_metrics-queue_consumer_count.txt',
(
'/metrics/detailed?family=vhost_status&family=exchange_names&family=exchange_bindings'
): 'detailed-only-metrics.txt',
}[parsed.path + (f"?{parsed.query}" if parsed.query else "")]
with open(OM_RESPONSE_FIXTURES / fname) as fh:
with open(os.path.join(OM_RESPONSE_FIXTURES, fname)) as fh:
return MockResponse(content=fh.read())


Expand Down