From 65b4f3bcda32246749352f9c590e885482f4a552 Mon Sep 17 00:00:00 2001 From: robsunday Date: Fri, 24 Jan 2025 13:26:17 +0100 Subject: [PATCH 1/3] Kafka broker, producer and consumer YAMLs and tests added --- .../TargetSystemIntegrationTest.java | 41 ++- .../kafka/KafkaConsumerIntegrationTest.java | 146 +++++++++++ .../kafka/KafkaContainerFactory.java | 66 +++++ .../kafka/KafkaIntegrationTest.java | 244 ++++++++++++++++++ .../kafka/KafkaProducerIntegrationTest.java | 163 ++++++++++++ .../contrib/jmxscraper/JmxScraper.java | 2 +- .../src/main/resources/kafka-consumer.yaml | 44 ++++ .../src/main/resources/kafka-producer.yaml | 46 ++++ jmx-scraper/src/main/resources/kafka.yaml | 215 +++++++++++++++ 9 files changed, 962 insertions(+), 5 deletions(-) create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java create mode 100644 jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java create mode 100644 jmx-scraper/src/main/resources/kafka-consumer.yaml create mode 100644 jmx-scraper/src/main/resources/kafka-producer.yaml create mode 100644 jmx-scraper/src/main/resources/kafka.yaml diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java index ddfc0567f..9c58c07fc 100644 --- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/TargetSystemIntegrationTest.java @@ -22,6 +22,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -56,6 +57,7 @@ public abstract class TargetSystemIntegrationTest { private static Network network; private static OtlpGrpcServer otlpServer; + private Collection> prerequisiteContainers; private GenericContainer target; private JmxScraperContainer scraper; @@ -86,12 +88,23 @@ static void afterAll() { @AfterEach void afterEach() { + if (scraper != null && scraper.isRunning()) { + scraper.stop(); + } + if (target != null && target.isRunning()) { target.stop(); } - if (scraper != null && scraper.isRunning()) { - scraper.stop(); + + if (prerequisiteContainers != null) { + prerequisiteContainers.forEach( + container -> { + if (container.isRunning()) { + container.stop(); + } + }); } + if (otlpServer != null) { otlpServer.reset(); } @@ -103,14 +116,31 @@ protected String scraperBaseImage() { @Test void endToEndTest(@TempDir Path tmpDir) { + startContainers(tmpDir); + verifyMetrics(); + } + + protected void startContainers(Path tmpDir) { + prerequisiteContainers = createPrerequisiteContainers(); target = createTargetContainer(JMX_PORT) .withLogConsumer(new Slf4jLogConsumer(targetSystemLogger)) .withNetwork(network) .withNetworkAliases(TARGET_SYSTEM_NETWORK_ALIAS); + + // If there are any containers that must be 
started before target then initialize them. + // Then make target depending on them, so it is started after dependencies + for (GenericContainer container : prerequisiteContainers) { + container.withNetwork(network); + target.dependsOn(container); + } + + // Target container must be running before scraper container is customized. + // It is necessary to allow interactions with the container, like file copying etc. target.start(); + // Create and initialize scraper container scraper = new JmxScraperContainer(otlpEndpoint, scraperBaseImage()) .withLogConsumer(new Slf4jLogConsumer(jmxScraperLogger)) @@ -119,14 +149,13 @@ void endToEndTest(@TempDir Path tmpDir) { scraper = customizeScraperContainer(scraper, target, tmpDir); scraper.start(); - - verifyMetrics(); } protected void verifyMetrics() { MetricsVerifier metricsVerifier = createMetricsVerifier(); await() .atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted( () -> { List receivedMetrics = otlpServer.getMetrics(); @@ -158,6 +187,10 @@ protected JmxScraperContainer customizeScraperContainer( return scraper; } + protected Collection> createPrerequisiteContainers() { + return Collections.emptyList(); + } + private static class OtlpGrpcServer extends ServerExtension { private final BlockingQueue metricRequests = diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java new file mode 100644 index 000000000..245020b46 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java @@ -0,0 +1,146 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaConsumerContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; + +import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; +import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class KafkaConsumerIntegrationTest extends TargetSystemIntegrationTest { + + @Override + protected Collection> createPrerequisiteContainers() { + GenericContainer zookeeper = + createZookeeperContainer() + .withLogConsumer(new 
Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))) + .withNetworkAliases("zookeeper"); + + GenericContainer kafka = + createKafkaContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka"))) + .withNetworkAliases("kafka") + .dependsOn(zookeeper); + + GenericContainer kafkaProducer = + createKafkaProducerContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka-producer"))) + .withNetworkAliases("kafka-producer") + .dependsOn(kafka); + + return Arrays.asList(zookeeper, kafka, kafkaProducer); + } + + @Override + protected GenericContainer createTargetContainer(int jmxPort) { + return createKafkaConsumerContainer() + .withEnv("JMX_PORT", Integer.toString(jmxPort)) + .withExposedPorts(jmxPort) + .waitingFor(Wait.forListeningPorts(jmxPort)); + } + + @Override + protected JmxScraperContainer customizeScraperContainer( + JmxScraperContainer scraper, GenericContainer target, Path tempDir) { + return scraper.withTargetSystem("kafka-consumer"); + } + + @Override + protected MetricsVerifier createMetricsVerifier() { + return MetricsVerifier.create() + .add( + "kafka.consumer.fetch-rate", + metric -> + metric + .hasDescription("The number of fetch requests for all topics per second") + .hasUnit("{request}") + .isGauge() + .hasDataPointsWithOneAttribute( + attributeWithAnyValue("client.id"))) // changed to follow semconv + .add( + "kafka.consumer.records-lag-max", + metric -> + metric + .hasDescription("Number of messages the consumer lags behind the producer") + .hasUnit("{message}") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.consumer.total.bytes-consumed-rate", + metric -> + metric + .hasDescription( + "The average number of bytes consumed for all topics per second") + .hasUnit("By") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.consumer.total.fetch-size-avg", + metric -> + metric + .hasDescription( + "The average number of bytes fetched per request for all topics") + .hasUnit("By") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.consumer.total.records-consumed-rate", + metric -> + metric + .hasDescription( + "The average number of records consumed for all topics per second") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.consumer.bytes-consumed-rate", + metric -> + metric + .hasDescription("The average number of bytes consumed per second") + .hasUnit("By") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), + attribute("topic", "test-topic-1")))) + .add( + "kafka.consumer.fetch-size-avg", + metric -> + metric + .hasDescription("The average number of bytes fetched per request") + .hasUnit("By") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), + attribute("topic", "test-topic-1")))) + .add( + "kafka.consumer.records-consumed-rate", + metric -> + metric + .hasDescription("The average number of records consumed per second") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), + attribute("topic", "test-topic-1")))); + } +} diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java 
b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java new file mode 100644 index 000000000..fc2d9eb09 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java @@ -0,0 +1,66 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import java.time.Duration; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class KafkaContainerFactory { + private static final int KAFKA_PORT = 9092; + private static final String KAFKA_BROKER = "kafka:" + KAFKA_PORT; + private static final String KAFKA_DOCKER_IMAGE = "bitnami/kafka:2.8.1"; + + private KafkaContainerFactory() {} + + public static GenericContainer createZookeeperContainer() { + return new GenericContainer<>("zookeeper:3.5") + .withStartupTimeout(Duration.ofMinutes(2)) + .waitingFor(Wait.forListeningPort()); + } + + public static GenericContainer createKafkaContainer() { + return new GenericContainer<>(KAFKA_DOCKER_IMAGE) + .withEnv("KAFKA_CFG_ZOOKEEPER_CONNECT", "zookeeper:2181") + .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") // Removed in 3.5.1 + .withStartupTimeout(Duration.ofMinutes(2)) + .withExposedPorts(KAFKA_PORT) + // .waitingFor(Wait.forListeningPorts(KAFKA_PORT)); + .waitingFor( + Wait.forLogMessage(".*KafkaServer.*started \\(kafka.server.KafkaServer\\).*", 1)); + } + + public static GenericContainer createKafkaProducerContainer() { + return new GenericContainer<>(KAFKA_DOCKER_IMAGE) + // .withCopyFileToContainer( + // MountableFile.forClasspathResource("kafka-producer.sh"), + // "/usr/bin/kafka-producer.sh") + // .withCommand("/usr/bin/kafka-producer.sh") + .withCommand( + "sh", + "-c", + "echo 'Sending messages to test-topic-1'; " + + "i=1; while true; do echo \"Message $i\"; sleep .25; i=$((i+1)); done | /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server " + + KAFKA_BROKER + + " --topic test-topic-1;") + .withStartupTimeout(Duration.ofMinutes(2)) + .waitingFor(Wait.forLogMessage(".*Welcome to the Bitnami kafka container.*", 1)); + } + + public static GenericContainer createKafkaConsumerContainer() { + return new GenericContainer<>(KAFKA_DOCKER_IMAGE) + .withCommand( + "kafka-console-consumer.sh", + "--bootstrap-server", + KAFKA_BROKER, + "--whitelist", + "test-topic-.*", + "--max-messages", + "100") + .withStartupTimeout(Duration.ofMinutes(2)) + .waitingFor(Wait.forListeningPort()); + } +} diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java new file mode 100644 index 000000000..0d089f4c8 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java @@ -0,0 +1,244 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup; +import static 
io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; + +import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcherGroup; +import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; +import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; + +public class KafkaIntegrationTest extends TargetSystemIntegrationTest { + @Override + protected Collection> createPrerequisiteContainers() { + GenericContainer zookeeper = + createZookeeperContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))) + .withNetworkAliases("zookeeper"); + + return Collections.singletonList(zookeeper); + } + + @Override + protected GenericContainer createTargetContainer(int jmxPort) { + return createKafkaContainer().withEnv("JMX_PORT", Integer.toString(jmxPort)); + } + + @Override + protected JmxScraperContainer customizeScraperContainer( + JmxScraperContainer scraper, GenericContainer target, Path tempDir) { + return scraper.withTargetSystem("kafka"); + } + + @Override + protected MetricsVerifier createMetricsVerifier() { + AttributeMatcherGroup[] requestTypes = { + // attribute values are changed from lowercase to PascalCase + // because it is impossible to make them lowercase with YAML config + // TODO: What about const value attributes defined in YAML that are lowercase? 
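+    // For illustration (based on the kafka.yaml rules added in this change): the scraper config uses
+    //   metricAttribute:
+    //     type: param(request)
+    // which copies the raw `request` key from the MBean ObjectName, e.g.
+    //   kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer -> type=FetchConsumer
+    // so the broker's PascalCase spelling is preserved as-is in the attribute value.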
+ attributeGroup(attribute("type", "Produce")), + attributeGroup(attribute("type", "FetchFollower")), + attributeGroup(attribute("type", "FetchConsumer")) + }; + + return MetricsVerifier.create() + .add( + "kafka.message.count", + metric -> + metric + .hasDescription("The number of messages received by the broker") + .hasUnit("{message}") + .isCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.request.count", + metric -> + metric + .hasDescription("The number of requests received by the broker") + .hasUnit("{request}") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("type", "produce")), + attributeGroup(attribute("type", "fetch")))) + .add( + "kafka.request.failed", + metric -> + metric + .hasDescription("The number of requests to the broker resulting in a failure") + .hasUnit("{request}") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("type", "produce")), + attributeGroup(attribute("type", "fetch")))) + .add( + "kafka.network.io", + metric -> + metric + .hasDescription("The bytes received or sent by the broker") + .hasUnit("By") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("direction", "in")), + attributeGroup(attribute("direction", "out")))) + .add( + "kafka.purgatory.size", + metric -> + metric + .hasDescription("The number of requests waiting in purgatory") + .hasUnit("{request}") + .isUpDownCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("type", "Produce")), + attributeGroup(attribute("type", "Fetch")))) + .add( + "kafka.request.time.total", + metric -> + metric + .hasDescription("The total time the broker has taken to service requests") + .hasUnit("ms") + .isCounter() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.time.50p", + metric -> + metric + .hasDescription( + "The 50th percentile time the broker has taken to service requests") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.time.99p", + metric -> + metric + .hasDescription( + "The 99th percentile time the broker has taken to service requests") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.time.avg", + metric -> + metric + .hasDescription("The average time the broker has taken to service requests") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithAttributes(requestTypes)) + .add( + "kafka.request.queue", + metric -> + metric + .hasDescription("Size of the request queue") + .hasUnit("{request}") + .isUpDownCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.partition.count", + metric -> + metric + .hasDescription("The number of partitions on the broker") + .hasUnit("{partition}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.partition.offline", + metric -> + metric + .hasDescription("The number of partitions offline") + .hasUnit("{partition}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.partition.under_replicated", + metric -> + metric + .hasDescription("The number of under replicated partitions") + .hasUnit("{partition}") + .isGauge() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.isr.operation.count", + metric -> + metric + .hasDescription("The number of in-sync replica shrink and expand operations") + .hasUnit("{operation}") + .isCounter() + .hasDataPointsWithAttributes( + attributeGroup(attribute("operation", "shrink")), + attributeGroup(attribute("operation", "expand")))) + .add( + "kafka.controller.active.count", + 
metric -> + metric + .hasDescription("The number of controllers active on the broker") // CHANGED + .hasUnit("{controller}") + .isUpDownCounter() // CHANGED + .hasDataPointsWithoutAttributes()) + .add( + "kafka.leader.election.count", // CHANGED from "kafka.leader.election.rate" + metric -> + metric + .hasDescription("The leader election count") + .hasUnit("{election}") + .isCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.leader.election.unclean.count", // CHANGED from "kafka.unclean.election.rate" + metric -> + metric + .hasDescription( + "Unclean leader election count - increasing indicates broker failures") // CHANGED + .hasUnit("{election}") + .isCounter() + .hasDataPointsWithoutAttributes()) + .add( + "kafka.lag.max", // CHANGED from "kafka.max.lag" + metric -> + metric + .hasDescription( + "The max lag in messages between follower and leader replicas") // CHANGED + .hasUnit("{message}") + .isGauge() + .hasDataPointsWithoutAttributes()) + + // TODO: Find out how to force Kafka to generate these metrics + // .add( + // "kafka.logs.flush.count", + // metric -> + // metric + // .hasDescription("Log flush count") + // .hasUnit("{flush}") + // .isCounter() + // .hasDataPointsWithoutAttributes()) + // .add( + // "kafka.logs.flush.time.median", + // metric -> + // metric + // .hasDescription("Log flush time - 50th percentile") + // .hasUnit("ms") + // .isGauge() + // .hasDataPointsWithoutAttributes()) + // .add( + // "kafka.logs.flush.time.99p", + // metric -> + // metric + // .hasDescription("Log flush time - 99th percentile") + // .hasUnit("ms") + // .isGauge() + // .hasDataPointsWithoutAttributes()) + ; + } +} diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java new file mode 100644 index 000000000..791c9d9c1 --- /dev/null +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java @@ -0,0 +1,163 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.contrib.jmxscraper.target_systems.kafka; + +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attribute; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeGroup; +import static io.opentelemetry.contrib.jmxscraper.assertions.DataPointAttributes.attributeWithAnyValue; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createKafkaProducerContainer; +import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; + +import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; +import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class KafkaProducerIntegrationTest extends 
TargetSystemIntegrationTest { + + @Override + protected Collection> createPrerequisiteContainers() { + GenericContainer zookeeper = + createZookeeperContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("zookeeper"))) + .withNetworkAliases("zookeeper"); + + GenericContainer kafka = + createKafkaContainer() + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("kafka"))) + .withNetworkAliases("kafka") + .dependsOn(zookeeper); + + return Arrays.asList(zookeeper, kafka); + } + + @Override + protected GenericContainer createTargetContainer(int jmxPort) { + return createKafkaProducerContainer() + .withEnv("JMX_PORT", Integer.toString(jmxPort)) + .withExposedPorts(jmxPort) + .waitingFor(Wait.forListeningPorts(jmxPort)); + // .waitingFor(Wait.forLogMessage(".*Sending messages to test-topic-1.*", 1)); + // Container is started and ready for serve JMX metrics once it begins sending messages. + } + + @Override + protected JmxScraperContainer customizeScraperContainer( + JmxScraperContainer scraper, GenericContainer target, Path tempDir) { + return scraper.withTargetSystem("kafka-producer"); + } + + @Override + protected MetricsVerifier createMetricsVerifier() { + String topic = "test-topic-1"; + + return MetricsVerifier.create() + .add( + "kafka.producer.io-wait-time-ns-avg", + metric -> + metric + .hasDescription( + "The average length of time the I/O thread spent waiting for a socket ready for reads or writes") + .hasUnit("ns") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.producer.outgoing-byte-rate", + metric -> + metric + .hasDescription( + "The average number of outgoing bytes sent per second to all servers") + .hasUnit("By") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.producer.request-latency-avg", + metric -> + metric + .hasDescription("The average request latency") + .hasUnit("ms") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.producer.request-rate", + metric -> + metric + .hasDescription("The average number of requests sent per second") + .hasUnit("{request}") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .add( + "kafka.producer.response-rate", + metric -> + metric + .hasDescription("Responses received per second") + .hasUnit("{response}") + .isGauge() + .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + + // Per topic metrics + .add( + "kafka.producer.byte-rate", + metric -> + metric + .hasDescription("The average number of bytes sent per second for a topic") + .hasUnit("By") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), attribute("topic", topic)))) + .add( + "kafka.producer.compression-rate", + metric -> + metric + .hasDescription( + "The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size") + .hasUnit("{ratio}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), attribute("topic", topic)))) + .add( + "kafka.producer.record-error-rate", + metric -> + metric + .hasDescription( + "The average per-second number of record sends that resulted in errors for a topic") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), attribute("topic", topic)))) + .add( + 
"kafka.producer.record-retry-rate", + metric -> + metric + .hasDescription( + "The average per-second number of retried record sends for a topic") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), attribute("topic", topic)))) + .add( + "kafka.producer.record-send-rate", + metric -> + metric + .hasDescription("The average number of records sent per second for a topic") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithAttributes( + attributeGroup( + attributeWithAnyValue("client.id"), attribute("topic", topic)))); + } +} diff --git a/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java b/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java index fe82c698d..b8ea617bb 100644 --- a/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java +++ b/jmx-scraper/src/main/java/io/opentelemetry/contrib/jmxscraper/JmxScraper.java @@ -185,7 +185,7 @@ private static void addRulesForSystem(String system, MetricConfiguration conf) { RuleParser parserInstance = RuleParser.get(); parserInstance.addMetricDefsTo(conf, inputStream, system); } else { - throw new IllegalArgumentException("No support for system" + system); + throw new IllegalArgumentException("No support for system " + system); } } catch (Exception e) { throw new IllegalStateException("Error while loading rules for system " + system, e); diff --git a/jmx-scraper/src/main/resources/kafka-consumer.yaml b/jmx-scraper/src/main/resources/kafka-consumer.yaml new file mode 100644 index 000000000..b227559bb --- /dev/null +++ b/jmx-scraper/src/main/resources/kafka-consumer.yaml @@ -0,0 +1,44 @@ +--- +# Kafka Consumer metrics +rules: + + - bean: kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics + metricAttribute: + client.id: param(client-id) + prefix: kafka.consumer. + type: gauge + mapping: + fetch-rate: + desc: The number of fetch requests for all topics per second + unit: '{request}' + records-lag-max: + desc: Number of messages the consumer lags behind the producer + unit: '{message}' + bytes-consumed-rate: + metric: total.bytes-consumed-rate + desc: The average number of bytes consumed for all topics per second + unit: 'By' + fetch-size-avg: + metric: total.fetch-size-avg + desc: The average number of bytes fetched per request for all topics + unit: 'By' + records-consumed-rate: + metric: total.records-consumed-rate + desc: The average number of records consumed for all topics per second + unit: '{record}' + + - bean: kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics + metricAttribute: + client.id: param(client-id) + topic: param(topic) + prefix: kafka.consumer. + unit: By + type: gauge + mapping: + bytes-consumed-rate: + desc: The average number of bytes consumed per second + fetch-size-avg: + desc: The average number of bytes fetched per request + records-consumed-rate: + desc: The average number of records consumed per second + unit: '{record}' diff --git a/jmx-scraper/src/main/resources/kafka-producer.yaml b/jmx-scraper/src/main/resources/kafka-producer.yaml new file mode 100644 index 000000000..f556e50f2 --- /dev/null +++ b/jmx-scraper/src/main/resources/kafka-producer.yaml @@ -0,0 +1,46 @@ +--- +# Kafka Producer metrics +rules: + - bean: kafka.producer:client-id=*,type=producer-metrics + metricAttribute: + client.id: param(client-id) + prefix: kafka.producer. 
+ type: gauge + mapping: + io-wait-time-ns-avg: + desc: The average length of time the I/O thread spent waiting for a socket ready for reads or writes + unit: ns + outgoing-byte-rate: + desc: The average number of outgoing bytes sent per second to all servers + unit: By + request-latency-avg: + desc: The average request latency + unit: ms + request-rate: + desc: The average number of requests sent per second + unit: '{request}' + response-rate: + desc: Responses received per second + unit: '{response}' + + # per topic metrics + - bean: kafka.producer:client-id=*,topic=*,type=producer-topic-metrics + metricAttribute: + client.id: param(client-id) + topic: param(topic) + prefix: kafka.producer. + unit: '{record}' + type: gauge + mapping: + byte-rate: + desc: The average number of bytes sent per second for a topic + unit: By + compression-rate: + desc: The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size + unit: '{ratio}' + record-error-rate: + desc: The average per-second number of record sends that resulted in errors for a topic + record-retry-rate: + desc: The average per-second number of retried record sends for a topic + record-send-rate: + desc: The average number of records sent per second for a topic diff --git a/jmx-scraper/src/main/resources/kafka.yaml b/jmx-scraper/src/main/resources/kafka.yaml new file mode 100644 index 000000000..b70da6654 --- /dev/null +++ b/jmx-scraper/src/main/resources/kafka.yaml @@ -0,0 +1,215 @@ +--- +rules: + # Broker metrics + + - bean: kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec + mapping: + Count: + metric: kafka.message.count + type: counter + desc: The number of messages received by the broker + unit: "{message}" + + # TODO: optimize kafka.request.count - use refs + - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec + metricAttribute: + # TODO: Should we convert all constant values to be PascalCase for consistency with values from bean params? 
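+      # For illustration: const(...) emits the literal value written here (lowercase "fetch"/"produce"),
+      # while param(...) (used for the RequestMetrics and DelayedOperationPurgatory beans below) reuses
+      # the value from the MBean ObjectName and therefore keeps the broker's PascalCase spelling.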
+ type: const(fetch) + mapping: + Count: + metric: kafka.request.count + type: counter + desc: The number of requests received by the broker + unit: "{request}" + + - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec + metricAttribute: + type: const(produce) + mapping: + Count: + metric: kafka.request.count + type: counter + desc: The number of requests received by the broker + unit: "{request}" + + # TODO: optimize kafka.request.failed - use refs + - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec + metricAttribute: + type: const(fetch) + mapping: + Count: + metric: kafka.request.failed + type: counter + desc: The number of requests to the broker resulting in a failure + unit: "{request}" + + - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec + metricAttribute: + type: const(produce) + mapping: + Count: + metric: kafka.request.failed + type: counter + desc: The number of requests to the broker resulting in a failure + unit: "{request}" + + - beans: + - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce + - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer + - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchFollower + metricAttribute: + type: param(request) + unit: ms + mapping: + Count: + metric: kafka.request.time.total + type: counter + desc: The total time the broker has taken to service requests + 50thPercentile: + metric: kafka.request.time.50p + type: gauge + desc: The 50th percentile time the broker has taken to service requests + 99thPercentile: + metric: kafka.request.time.99p + type: gauge + desc: The 99th percentile time the broker has taken to service requests + # Added + Mean: + metric: kafka.request.time.avg + type: gauge + desc: The average time the broker has taken to service requests + + - bean: kafka.network:type=RequestChannel,name=RequestQueueSize + mapping: + Value: + metric: kafka.request.queue + type: updowncounter + desc: Size of the request queue + unit: "{request}" + + # TODO: optimize kafka.network.io - use refs + - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec + metricAttribute: + direction: const(in) + mapping: + Count: + metric: kafka.network.io + type: counter + desc: The bytes received or sent by the broker + unit: By + + - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec + metricAttribute: + direction: const(out) + mapping: + Count: + metric: kafka.network.io + type: counter + desc: The bytes received or sent by the broker + unit: By + + - beans: + - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce + - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch + metricAttribute: + type: param(delayedOperation) + mapping: + Value: + metric: kafka.purgatory.size + type: updowncounter + desc: The number of requests waiting in purgatory + unit: "{request}" + + - bean: kafka.server:type=ReplicaManager,name=PartitionCount + mapping: + Value: + metric: kafka.partition.count + type: gauge + desc: The number of partitions on the broker + unit: "{partition}" + + - bean: kafka.controller:type=KafkaController,name=OfflinePartitionsCount + mapping: + Value: + metric: kafka.partition.offline + type: gauge + desc: The number of partitions offline + unit: "{partition}" + + - bean: kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions + mapping: + Value: + metric: kafka.partition.under_replicated + type: gauge + desc: The number of under 
replicated partitions + unit: "{partition}" + + - bean: kafka.server:type=ReplicaManager,name=IsrShrinksPerSec + metricAttribute: + operation: const(shrink) + mapping: + Count: + metric: kafka.isr.operation.count + type: counter + desc: The number of in-sync replica shrink and expand operations + unit: "{operation}" + + - bean: kafka.server:type=ReplicaManager,name=IsrExpandsPerSec + metricAttribute: + operation: const(expand) + mapping: + Count: + metric: kafka.isr.operation.count + type: counter + desc: The number of in-sync replica shrink and expand operations + unit: "{operation}" + + - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica + mapping: + Value: + metric: kafka.lag.max + desc: The max lag in messages between follower and leader replicas + unit: "{message}" + + - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount + mapping: + Value: + metric: kafka.controller.active.count + type: updowncounter + desc: The number of controllers active on the broker + unit: "{controller}" + + - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs + mapping: + Count: + metric: kafka.leader.election.count + type: counter + desc: The leader election count + unit: "{election}" + + - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec + mapping: + Count: + metric: kafka.leader.election.unclean.count + type: counter + desc: Unclean leader election count - increasing indicates broker failures + unit: "{election}" + + # Log metrics + + - bean: kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs + unit: ms + type: gauge + prefix: kafka.logs.flush. + mapping: + Count: + metric: count + unit: '{flush}' + type: counter + desc: Log flush count + 50thPercentile: + metric: time.50p + desc: Log flush time - 50th percentile + 99thPercentile: + metric: time.99p + desc: Log flush time - 99th percentile From 31c4fc4a6d51718ada4f816db73528b4f1cfbee5 Mon Sep 17 00:00:00 2001 From: robsunday Date: Tue, 4 Feb 2025 17:11:51 +0100 Subject: [PATCH 2/3] Units updated in groovy files Code cleanups Other code review related changes --- .../target_systems/KafkaIntegrationTest.java | 51 +++++++++--------- .../target-systems/kafka-consumer.groovy | 16 +++--- .../target-systems/kafka-producer.groovy | 16 +++--- .../resources/target-systems/kafka.groovy | 28 +++++----- .../kafka/KafkaConsumerIntegrationTest.java | 44 +++++++--------- .../kafka/KafkaContainerFactory.java | 5 -- .../kafka/KafkaIntegrationTest.java | 3 -- .../kafka/KafkaProducerIntegrationTest.java | 36 +++++-------- .../src/main/resources/kafka-consumer.yaml | 17 +++--- .../src/main/resources/kafka-producer.yaml | 8 +-- jmx-scraper/src/main/resources/kafka.yaml | 52 +++++++++---------- 11 files changed, 129 insertions(+), 147 deletions(-) diff --git a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java index a54909b67..8e9fc4acb 100644 --- a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java +++ b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java @@ -140,25 +140,25 @@ List> kafkaBrokerAssertions() { metric, "kafka.partition.count", "The number of partitions on the broker", - "{partitions}"), + "{partition}"), metric -> assertGauge( metric, "kafka.partition.offline", 
"The number of partitions offline", - "{partitions}"), + "{partition}"), metric -> assertGauge( metric, "kafka.partition.under_replicated", "The number of under replicated partitions", - "{partitions}"), + "{partition}"), metric -> assertSumWithAttributes( metric, "kafka.isr.operation.count", "The number of in-sync replica shrink and expand operations", - "{operations}", + "{operation}", attrs -> attrs.containsOnly(entry("operation", "shrink")), attrs -> attrs.containsOnly(entry("operation", "expand"))), metric -> @@ -166,25 +166,25 @@ List> kafkaBrokerAssertions() { metric, "kafka.controller.active.count", "controller is active on broker", - "{controllers}"), + "{controller}"), metric -> assertSum( metric, "kafka.leader.election.rate", "leader election rate - increasing indicates broker failures", - "{elections}"), + "{election}"), metric -> assertGauge( metric, "kafka.max.lag", "max lag in messages between follower and leader replicas", - "{messages}"), + "{message}"), metric -> assertSum( metric, "kafka.unclean.election.rate", "unclean leader election rate - increasing indicates broker failures", - "{elections}")); + "{election}")); } static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest { @@ -235,52 +235,52 @@ void endToEnd() { metric, "kafka.consumer.bytes-consumed-rate", "The average number of bytes consumed per second", - "by", + "By", topics), metric -> assertKafkaGauge( metric, "kafka.consumer.fetch-rate", "The number of fetch requests for all topics per second", - "1"), + "{request}"), metric -> assertKafkaGauge( metric, "kafka.consumer.fetch-size-avg", "The average number of bytes fetched per request", - "by", + "By", topics), metric -> assertKafkaGauge( metric, "kafka.consumer.records-consumed-rate", "The average number of records consumed per second", - "1", + "{record}", topics), metric -> assertKafkaGauge( metric, "kafka.consumer.records-lag-max", "Number of messages the consumer lags behind the producer", - "1"), + "{record}"), metric -> assertKafkaGauge( metric, "kafka.consumer.total.bytes-consumed-rate", "The average number of bytes consumed for all topics per second", - "by"), + "By"), metric -> assertKafkaGauge( metric, "kafka.consumer.total.fetch-size-avg", "The average number of bytes fetched per request for all topics", - "by"), + "By"), metric -> assertKafkaGauge( metric, "kafka.consumer.total.records-consumed-rate", "The average number of records consumed for all topics per second", - "1")); + "{record}")); } } @@ -300,14 +300,14 @@ void endToEnd() { metric, "kafka.producer.byte-rate", "The average number of bytes sent per second for a topic", - "by", + "By", topics), metric -> assertKafkaGauge( metric, "kafka.producer.compression-rate", "The average compression rate of record batches for a topic", - "1", + "{ratio}", topics), metric -> assertKafkaGauge( @@ -320,27 +320,27 @@ void endToEnd() { metric, "kafka.producer.outgoing-byte-rate", "The average number of outgoing bytes sent per second to all servers", - "by"), + "By"), metric -> assertKafkaGauge( metric, "kafka.producer.record-error-rate", "The average per-second number of record sends that resulted in errors for a topic", - "1", + "{record}", topics), metric -> assertKafkaGauge( metric, "kafka.producer.record-retry-rate", "The average per-second number of retried record sends for a topic", - "1", + "{record}", topics), metric -> assertKafkaGauge( metric, "kafka.producer.record-send-rate", "The average number of records sent per second for a topic", - "1", + "{record}", topics), metric -> 
assertKafkaGauge( @@ -353,10 +353,13 @@ void endToEnd() { metric, "kafka.producer.request-rate", "The average number of requests sent per second", - "1"), + "{request}"), metric -> assertKafkaGauge( - metric, "kafka.producer.response-rate", "Responses received per second", "1")); + metric, + "kafka.producer.response-rate", + "Responses received per second", + "{response}")); } } diff --git a/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy b/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy index a2b8e3a74..dcd05a78e 100644 --- a/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy +++ b/jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy @@ -16,45 +16,45 @@ def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics") otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate", - "The number of fetch requests for all topics per second", "1", + "The number of fetch requests for all topics per second", "{request}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "fetch-rate", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max", - "Number of messages the consumer lags behind the producer", "1", + "Number of messages the consumer lags behind the producer", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "records-lag-max", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate", - "The average number of bytes consumed for all topics per second", "by", + "The average number of bytes consumed for all topics per second", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "bytes-consumed-rate", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg", - "The average number of bytes fetched per request for all topics", "by", + "The average number of bytes fetched per request for all topics", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "fetch-size-avg", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate", - "The average number of records consumed for all topics per second", "1", + "The average number of records consumed for all topics per second", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "records-consumed-rate", otel.&doubleValueCallback) def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics") otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate", - "The average number of bytes consumed per second", "by", + "The average number of bytes consumed per second", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "bytes-consumed-rate", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg", - "The average number of bytes fetched per request", "by", + "The average number of bytes fetched per request", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "fetch-size-avg", otel.&doubleValueCallback) otel.instrument(consumerFetchManagerMetricsByTopic, 
"kafka.consumer.records-consumed-rate", - "The average number of records consumed per second", "1", + "The average number of records consumed per second", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "records-consumed-rate", otel.&doubleValueCallback) diff --git a/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy b/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy index 1d9583f32..2b3d99154 100644 --- a/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy +++ b/jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy @@ -20,7 +20,7 @@ otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "io-wait-time-ns-avg", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate", - "The average number of outgoing bytes sent per second to all servers", "by", + "The average number of outgoing bytes sent per second to all servers", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "outgoing-byte-rate", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.request-latency-avg", @@ -28,37 +28,37 @@ otel.instrument(producerMetrics, "kafka.producer.request-latency-avg", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "request-latency-avg", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.request-rate", - "The average number of requests sent per second", "1", + "The average number of requests sent per second", "{request}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "request-rate", otel.&doubleValueCallback) otel.instrument(producerMetrics, "kafka.producer.response-rate", - "Responses received per second", "1", + "Responses received per second", "{response}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }], "response-rate", otel.&doubleValueCallback) def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics") otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate", - "The average number of bytes sent per second for a topic", "by", + "The average number of bytes sent per second for a topic", "By", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "byte-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate", - "The average compression rate of record batches for a topic", "1", + "The average compression rate of record batches for a topic", "{ratio}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "compression-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate", - "The average per-second number of record sends that resulted in errors for a topic", "1", + "The average per-second number of record sends that resulted in errors for a topic", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "record-error-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate", - "The average per-second number of retried record sends for a 
topic", "1", + "The average per-second number of retried record sends for a topic", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "record-retry-rate", otel.&doubleValueCallback) otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate", - "The average number of records sent per second for a topic", "1", + "The average number of records sent per second for a topic", "{record}", ["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }, "topic" : { mbean -> mbean.name().getKeyProperty("topic") }], "record-send-rate", otel.&doubleValueCallback) diff --git a/jmx-metrics/src/main/resources/target-systems/kafka.groovy b/jmx-metrics/src/main/resources/target-systems/kafka.groovy index 456f28573..695feeab3 100644 --- a/jmx-metrics/src/main/resources/target-systems/kafka.groovy +++ b/jmx-metrics/src/main/resources/target-systems/kafka.groovy @@ -18,7 +18,7 @@ def messagesInPerSec = otel.mbean("kafka.server:type=BrokerTopicMetrics,name=Mes otel.instrument(messagesInPerSec, "kafka.message.count", "The number of messages received by the broker", - "{messages}", + "{message}", "Count", otel.&longCounterCallback) def requests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec", @@ -26,7 +26,7 @@ def requests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=TotalProd otel.instrument(requests, "kafka.request.count", "The number of requests received by the broker", - "{requests}", + "{request}", [ "type" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "TotalProduceRequestsPerSec": @@ -45,7 +45,7 @@ def failedRequests = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=Fai otel.instrument(failedRequests, "kafka.request.failed", "The number of requests to the broker resulting in a failure", - "{requests}", + "{request}", [ "type" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "FailedProduceRequestsPerSec": @@ -100,7 +100,7 @@ def network = otel.mbeans(["kafka.server:type=BrokerTopicMetrics,name=BytesInPer otel.instrument(network, "kafka.network.io", "The bytes received or sent by the broker", - "by", + "By", [ "state" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "BytesInPerSec": @@ -119,7 +119,7 @@ def purgatorySize = otel.mbeans(["kafka.server:type=DelayedOperationPurgatory,na otel.instrument(purgatorySize, "kafka.purgatory.size", "The number of requests waiting in purgatory", - "{requests}", + "{request}", [ "type" : { mbean -> mbean.name().getKeyProperty("delayedOperation").toLowerCase() }, ], @@ -129,21 +129,21 @@ def partitionCount = otel.mbean("kafka.server:type=ReplicaManager,name=Partition otel.instrument(partitionCount, "kafka.partition.count", "The number of partitions on the broker", - "{partitions}", + "{partition}", "Value", otel.&longValueCallback) def partitionOffline = otel.mbean("kafka.controller:type=KafkaController,name=OfflinePartitionsCount") otel.instrument(partitionOffline, "kafka.partition.offline", "The number of partitions offline", - "{partitions}", + "{partition}", "Value", otel.&longValueCallback) def partitionUnderReplicated = otel.mbean("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions") otel.instrument(partitionUnderReplicated, "kafka.partition.under_replicated", "The number of under replicated partitions", - "{partitions}", + "{partition}", "Value", otel.&longValueCallback) def isrOperations = 
otel.mbeans(["kafka.server:type=ReplicaManager,name=IsrShrinksPerSec", @@ -151,7 +151,7 @@ def isrOperations = otel.mbeans(["kafka.server:type=ReplicaManager,name=IsrShrin otel.instrument(isrOperations, "kafka.isr.operation.count", "The number of in-sync replica shrink and expand operations", - "{operations}", + "{operation}", [ "operation" : { mbean -> switch(mbean.name().getKeyProperty("name")) { case "IsrShrinksPerSec": @@ -168,23 +168,23 @@ otel.instrument(isrOperations, def maxLag = otel.mbean("kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica") otel.instrument(maxLag, "kafka.max.lag", "max lag in messages between follower and leader replicas", - "{messages}", "Value", otel.&longValueCallback) + "{message}", "Value", otel.&longValueCallback) def activeControllerCount = otel.mbean("kafka.controller:type=KafkaController,name=ActiveControllerCount") otel.instrument(activeControllerCount, "kafka.controller.active.count", "controller is active on broker", - "{controllers}", "Value", otel.&longValueCallback) + "{controller}", "Value", otel.&longValueCallback) def leaderElectionRate = otel.mbean("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs") otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "leader election rate - increasing indicates broker failures", - "{elections}", "Count", otel.&longCounterCallback) + "{election}", "Count", otel.&longCounterCallback) def uncleanLeaderElections = otel.mbean("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec") otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "unclean leader election rate - increasing indicates broker failures", - "{elections}", "Count", otel.&longCounterCallback) + "{election}", "Count", otel.&longCounterCallback) def requestQueueSize = otel.mbean("kafka.network:type=RequestChannel,name=RequestQueueSize") otel.instrument(requestQueueSize, "kafka.request.queue", "size of the request queue", - "{requests}", "Value", otel.&longValueCallback) + "{request}", "Value", otel.&longValueCallback) def logFlushRate = otel.mbean("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs") otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "log flush count", diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java index 245020b46..ed61aabbb 100644 --- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaConsumerIntegrationTest.java @@ -14,6 +14,7 @@ import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher; import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; import java.nio.file.Path; @@ -64,6 +65,10 @@ protected JmxScraperContainer customizeScraperContainer( @Override protected MetricsVerifier createMetricsVerifier() { + // TODO: change to follow semconv + AttributeMatcher clientIdAttribute = attributeWithAnyValue("client-id"); + AttributeMatcher 
topicAttribute = attribute("topic", "test-topic-1"); + return MetricsVerifier.create() .add( "kafka.consumer.fetch-rate", @@ -72,16 +77,7 @@ protected MetricsVerifier createMetricsVerifier() { .hasDescription("The number of fetch requests for all topics per second") .hasUnit("{request}") .isGauge() - .hasDataPointsWithOneAttribute( - attributeWithAnyValue("client.id"))) // changed to follow semconv - .add( - "kafka.consumer.records-lag-max", - metric -> - metric - .hasDescription("Number of messages the consumer lags behind the producer") - .hasUnit("{message}") - .isGauge() - .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .hasDataPointsWithOneAttribute(clientIdAttribute)) .add( "kafka.consumer.total.bytes-consumed-rate", metric -> @@ -90,7 +86,7 @@ protected MetricsVerifier createMetricsVerifier() { "The average number of bytes consumed for all topics per second") .hasUnit("By") .isGauge() - .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .hasDataPointsWithOneAttribute(clientIdAttribute)) .add( "kafka.consumer.total.fetch-size-avg", metric -> @@ -99,7 +95,7 @@ protected MetricsVerifier createMetricsVerifier() { "The average number of bytes fetched per request for all topics") .hasUnit("By") .isGauge() - .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .hasDataPointsWithOneAttribute(clientIdAttribute)) .add( "kafka.consumer.total.records-consumed-rate", metric -> @@ -108,7 +104,15 @@ protected MetricsVerifier createMetricsVerifier() { "The average number of records consumed for all topics per second") .hasUnit("{record}") .isGauge() - .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id"))) + .hasDataPointsWithOneAttribute(clientIdAttribute)) + .add( + "kafka.consumer.records-lag-max", + metric -> + metric + .hasDescription("Number of messages the consumer lags behind the producer") + .hasUnit("{record}") + .isGauge() + .hasDataPointsWithOneAttribute(clientIdAttribute)) .add( "kafka.consumer.bytes-consumed-rate", metric -> @@ -116,10 +120,7 @@ protected MetricsVerifier createMetricsVerifier() { .hasDescription("The average number of bytes consumed per second") .hasUnit("By") .isGauge() - .hasDataPointsWithAttributes( - attributeGroup( - attributeWithAnyValue("client.id"), - attribute("topic", "test-topic-1")))) + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) .add( "kafka.consumer.fetch-size-avg", metric -> @@ -127,10 +128,7 @@ protected MetricsVerifier createMetricsVerifier() { .hasDescription("The average number of bytes fetched per request") .hasUnit("By") .isGauge() - .hasDataPointsWithAttributes( - attributeGroup( - attributeWithAnyValue("client.id"), - attribute("topic", "test-topic-1")))) + .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute))) .add( "kafka.consumer.records-consumed-rate", metric -> @@ -139,8 +137,6 @@ protected MetricsVerifier createMetricsVerifier() { .hasUnit("{record}") .isGauge() .hasDataPointsWithAttributes( - attributeGroup( - attributeWithAnyValue("client.id"), - attribute("topic", "test-topic-1")))); + attributeGroup(clientIdAttribute, topicAttribute))); } } diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java index fc2d9eb09..8eb9432a5 100644 --- 
a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaContainerFactory.java @@ -28,17 +28,12 @@ public static GenericContainer createKafkaContainer() { .withEnv("ALLOW_PLAINTEXT_LISTENER", "yes") // Removed in 3.5.1 .withStartupTimeout(Duration.ofMinutes(2)) .withExposedPorts(KAFKA_PORT) - // .waitingFor(Wait.forListeningPorts(KAFKA_PORT)); .waitingFor( Wait.forLogMessage(".*KafkaServer.*started \\(kafka.server.KafkaServer\\).*", 1)); } public static GenericContainer createKafkaProducerContainer() { return new GenericContainer<>(KAFKA_DOCKER_IMAGE) - // .withCopyFileToContainer( - // MountableFile.forClasspathResource("kafka-producer.sh"), - // "/usr/bin/kafka-producer.sh") - // .withCommand("/usr/bin/kafka-producer.sh") .withCommand( "sh", "-c", diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java index 0d089f4c8..e20485e46 100644 --- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java @@ -46,9 +46,6 @@ protected JmxScraperContainer customizeScraperContainer( @Override protected MetricsVerifier createMetricsVerifier() { AttributeMatcherGroup[] requestTypes = { - // attribute values are changed from lowercase to PascalCase - // because it is impossible to make them lowercase with YAML config - // TODO: What about const value attributes defined in YAML that are lowercase? attributeGroup(attribute("type", "Produce")), attributeGroup(attribute("type", "FetchFollower")), attributeGroup(attribute("type", "FetchConsumer")) diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java index 791c9d9c1..155cb9fc5 100644 --- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java +++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaProducerIntegrationTest.java @@ -13,6 +13,7 @@ import static io.opentelemetry.contrib.jmxscraper.target_systems.kafka.KafkaContainerFactory.createZookeeperContainer; import io.opentelemetry.contrib.jmxscraper.JmxScraperContainer; +import io.opentelemetry.contrib.jmxscraper.assertions.AttributeMatcher; import io.opentelemetry.contrib.jmxscraper.target_systems.MetricsVerifier; import io.opentelemetry.contrib.jmxscraper.target_systems.TargetSystemIntegrationTest; import java.nio.file.Path; @@ -47,8 +48,6 @@ protected GenericContainer createTargetContainer(int jmxPort) { .withEnv("JMX_PORT", Integer.toString(jmxPort)) .withExposedPorts(jmxPort) .waitingFor(Wait.forListeningPorts(jmxPort)); - // .waitingFor(Wait.forLogMessage(".*Sending messages to test-topic-1.*", 1)); - // Container is started and ready for serve JMX metrics once it begins sending messages. 
   }
 
   @Override
@@ -59,7 +58,9 @@ protected JmxScraperContainer customizeScraperContainer(
 
   @Override
   protected MetricsVerifier createMetricsVerifier() {
-    String topic = "test-topic-1";
+    // TODO: change to follow semconv
+    AttributeMatcher clientIdAttribute = attributeWithAnyValue("client-id");
+    AttributeMatcher topicAttribute = attribute("topic", "test-topic-1");
 
     return MetricsVerifier.create()
         .add(
@@ -70,7 +71,7 @@ protected MetricsVerifier createMetricsVerifier() {
                         "The average length of time the I/O thread spent waiting for a socket ready for reads or writes")
                     .hasUnit("ns")
                     .isGauge()
-                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
+                    .hasDataPointsWithOneAttribute(clientIdAttribute))
         .add(
             "kafka.producer.outgoing-byte-rate",
             metric ->
@@ -79,7 +80,7 @@ protected MetricsVerifier createMetricsVerifier() {
                         "The average number of outgoing bytes sent per second to all servers")
                     .hasUnit("By")
                     .isGauge()
-                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
+                    .hasDataPointsWithOneAttribute(clientIdAttribute))
         .add(
             "kafka.producer.request-latency-avg",
             metric ->
@@ -87,7 +88,7 @@ protected MetricsVerifier createMetricsVerifier() {
                     .hasDescription("The average request latency")
                     .hasUnit("ms")
                     .isGauge()
-                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
+                    .hasDataPointsWithOneAttribute(clientIdAttribute))
         .add(
             "kafka.producer.request-rate",
             metric ->
@@ -95,7 +96,7 @@ protected MetricsVerifier createMetricsVerifier() {
                     .hasDescription("The average number of requests sent per second")
                     .hasUnit("{request}")
                     .isGauge()
-                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
+                    .hasDataPointsWithOneAttribute(clientIdAttribute))
         .add(
             "kafka.producer.response-rate",
             metric ->
@@ -103,7 +104,7 @@ protected MetricsVerifier createMetricsVerifier() {
                     .hasDescription("Responses received per second")
                     .hasUnit("{response}")
                     .isGauge()
-                    .hasDataPointsWithOneAttribute(attributeWithAnyValue("client.id")))
+                    .hasDataPointsWithOneAttribute(clientIdAttribute))
 
         // Per topic metrics
         .add(
@@ -113,9 +114,7 @@ protected MetricsVerifier createMetricsVerifier() {
                     .hasDescription("The average number of bytes sent per second for a topic")
                     .hasUnit("By")
                     .isGauge()
-                    .hasDataPointsWithAttributes(
-                        attributeGroup(
-                            attributeWithAnyValue("client.id"), attribute("topic", topic))))
+                    .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
         .add(
             "kafka.producer.compression-rate",
             metric ->
@@ -124,9 +123,7 @@ protected MetricsVerifier createMetricsVerifier() {
                         "The average compression rate of record batches for a topic, defined as the average ratio of the compressed batch size divided by the uncompressed size")
                     .hasUnit("{ratio}")
                     .isGauge()
-                    .hasDataPointsWithAttributes(
-                        attributeGroup(
-                            attributeWithAnyValue("client.id"), attribute("topic", topic))))
+                    .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
         .add(
             "kafka.producer.record-error-rate",
             metric ->
@@ -135,9 +132,7 @@ protected MetricsVerifier createMetricsVerifier() {
                         "The average per-second number of record sends that resulted in errors for a topic")
                     .hasUnit("{record}")
                     .isGauge()
-                    .hasDataPointsWithAttributes(
-                        attributeGroup(
-                            attributeWithAnyValue("client.id"), attribute("topic", topic))))
+                    .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
         .add(
             "kafka.producer.record-retry-rate",
             metric ->
@@ -146,9 +141,7 @@ protected MetricsVerifier createMetricsVerifier() {
                         "The average per-second number of retried record sends for a topic")
                     .hasUnit("{record}")
                     .isGauge()
-                    .hasDataPointsWithAttributes(
-                        attributeGroup(
-                            attributeWithAnyValue("client.id"), attribute("topic", topic))))
+                    .hasDataPointsWithAttributes(attributeGroup(clientIdAttribute, topicAttribute)))
         .add(
             "kafka.producer.record-send-rate",
             metric ->
@@ -157,7 +150,6 @@ protected MetricsVerifier createMetricsVerifier() {
                     .hasUnit("{record}")
                     .isGauge()
                     .hasDataPointsWithAttributes(
-                        attributeGroup(
-                            attributeWithAnyValue("client.id"), attribute("topic", topic))));
+                        attributeGroup(clientIdAttribute, topicAttribute)));
   }
 }
diff --git a/jmx-scraper/src/main/resources/kafka-consumer.yaml b/jmx-scraper/src/main/resources/kafka-consumer.yaml
index b227559bb..0324c4b19 100644
--- a/jmx-scraper/src/main/resources/kafka-consumer.yaml
+++ b/jmx-scraper/src/main/resources/kafka-consumer.yaml
@@ -4,41 +4,42 @@ rules:
 
   - bean: kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics
     metricAttribute:
-      client.id: param(client-id)
+      client-id: param(client-id)
     prefix: kafka.consumer.
     type: gauge
     mapping:
       fetch-rate:
         desc: The number of fetch requests for all topics per second
         unit: '{request}'
-      records-lag-max:
-        desc: Number of messages the consumer lags behind the producer
-        unit: '{message}'
       bytes-consumed-rate:
         metric: total.bytes-consumed-rate
         desc: The average number of bytes consumed for all topics per second
-        unit: 'By'
+        unit: By
       fetch-size-avg:
         metric: total.fetch-size-avg
         desc: The average number of bytes fetched per request for all topics
-        unit: 'By'
+        unit: By
       records-consumed-rate:
         metric: total.records-consumed-rate
         desc: The average number of records consumed for all topics per second
         unit: '{record}'
+      records-lag-max:
+        desc: Number of messages the consumer lags behind the producer
+        unit: '{record}'
 
   - bean: kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics
     metricAttribute:
-      client.id: param(client-id)
+      client-id: param(client-id)
       topic: param(topic)
     prefix: kafka.consumer.
-    unit: By
     type: gauge
     mapping:
       bytes-consumed-rate:
         desc: The average number of bytes consumed per second
+        unit: By
       fetch-size-avg:
         desc: The average number of bytes fetched per request
+        unit: By
       records-consumed-rate:
         desc: The average number of records consumed per second
         unit: '{record}'
diff --git a/jmx-scraper/src/main/resources/kafka-producer.yaml b/jmx-scraper/src/main/resources/kafka-producer.yaml
index f556e50f2..f3d5ff69c 100644
--- a/jmx-scraper/src/main/resources/kafka-producer.yaml
+++ b/jmx-scraper/src/main/resources/kafka-producer.yaml
@@ -3,7 +3,7 @@ rules:
 
   - bean: kafka.producer:client-id=*,type=producer-metrics
     metricAttribute:
-      client.id: param(client-id)
+      client-id: param(client-id)
     prefix: kafka.producer.
     type: gauge
     mapping:
@@ -26,10 +26,9 @@ rules:
   # per topic metrics
   - bean: kafka.producer:client-id=*,topic=*,type=producer-topic-metrics
     metricAttribute:
-      client.id: param(client-id)
+      client-id: param(client-id)
       topic: param(topic)
     prefix: kafka.producer.
-    unit: '{record}'
     type: gauge
     mapping:
       byte-rate:
@@ -40,7 +39,10 @@ rules:
         unit: '{ratio}'
       record-error-rate:
         desc: The average per-second number of record sends that resulted in errors for a topic
+        unit: '{record}'
       record-retry-rate:
         desc: The average per-second number of retried record sends for a topic
+        unit: '{record}'
       record-send-rate:
         desc: The average number of records sent per second for a topic
+        unit: '{record}'
diff --git a/jmx-scraper/src/main/resources/kafka.yaml b/jmx-scraper/src/main/resources/kafka.yaml
index b70da6654..dbee26e80 100644
--- a/jmx-scraper/src/main/resources/kafka.yaml
+++ b/jmx-scraper/src/main/resources/kafka.yaml
@@ -10,48 +10,45 @@ rules:
         desc: The number of messages received by the broker
         unit: "{message}"
 
-  # TODO: optimize kafka.request.count - use refs
   - bean: kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec
     metricAttribute:
-      # TODO: Should we convert all constant values to be PascalCase for consistency with values from bean params?
       type: const(fetch)
     mapping:
       Count:
-        metric: kafka.request.count
-        type: counter
-        desc: The number of requests received by the broker
-        unit: "{request}"
+        metric: &metric kafka.request.count
+        type: &type counter
+        desc: &desc The number of requests received by the broker
+        unit: &unit "{request}"
 
   - bean: kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec
     metricAttribute:
       type: const(produce)
     mapping:
       Count:
-        metric: kafka.request.count
-        type: counter
-        desc: The number of requests received by the broker
-        unit: "{request}"
+        metric: *metric
+        type: *type
+        desc: *desc
+        unit: *unit
 
-  # TODO: optimize kafka.request.failed - use refs
   - bean: kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec
     metricAttribute:
       type: const(fetch)
     mapping:
       Count:
-        metric: kafka.request.failed
-        type: counter
-        desc: The number of requests to the broker resulting in a failure
-        unit: "{request}"
+        metric: &metric kafka.request.failed
+        type: &type counter
+        desc: &desc The number of requests to the broker resulting in a failure
+        unit: &unit "{request}"
 
   - bean: kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec
     metricAttribute:
       type: const(produce)
     mapping:
       Count:
-        metric: kafka.request.failed
-        type: counter
-        desc: The number of requests to the broker resulting in a failure
-        unit: "{request}"
+        metric: *metric
+        type: *type
+        desc: *desc
+        unit: *unit
 
   - beans:
       - kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce
@@ -87,26 +84,25 @@ rules:
         desc: Size of the request queue
         unit: "{request}"
 
-  # TODO: optimize kafka.network.io - use refs
   - bean: kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
     metricAttribute:
       direction: const(in)
     mapping:
       Count:
-        metric: kafka.network.io
-        type: counter
-        desc: The bytes received or sent by the broker
-        unit: By
+        metric: &metric kafka.network.io
+        type: &type counter
+        desc: &desc The bytes received or sent by the broker
+        unit: &unit By
 
   - bean: kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
     metricAttribute:
       direction: const(out)
     mapping:
      Count:
-        metric: kafka.network.io
-        type: counter
-        desc: The bytes received or sent by the broker
-        unit: By
+        metric: *metric
+        type: *type
+        desc: *desc
+        unit: *unit
 
   - beans:
       - kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce

From 665c390149fb8c1d0e1c1a625a12b8a92005c4cc Mon Sep 17 00:00:00 2001
From: robsunday
Date: Thu, 13 Feb 2025 17:14:02 +0100
Subject: [PATCH 3/3] JMX Scraper: reverted some metric name changes
 JMX Metrics: metric descriptions updated to match common convention

---
 .../target_systems/KafkaIntegrationTest.java  |  8 ++--
 .../resources/target-systems/kafka.groovy     | 16 +++----
 .../kafka/KafkaIntegrationTest.java           | 44 ++++---------
 jmx-scraper/src/main/resources/kafka.yaml     | 16 ++++---
 4 files changed, 30 insertions(+), 54 deletions(-)

diff --git a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java
index 8e9fc4acb..f206ff1f8 100644
--- a/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java
+++ b/jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java
@@ -165,25 +165,25 @@ List<Consumer<Metric>> kafkaBrokerAssertions() {
             assertGauge(
                 metric,
                 "kafka.controller.active.count",
-                "controller is active on broker",
+                "For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.",
                 "{controller}"),
         metric ->
             assertSum(
                 metric,
                 "kafka.leader.election.rate",
-                "leader election rate - increasing indicates broker failures",
+                "Leader election rate - increasing indicates broker failures",
                 "{election}"),
         metric ->
             assertGauge(
                 metric,
                 "kafka.max.lag",
-                "max lag in messages between follower and leader replicas",
+                "Max lag in messages between follower and leader replicas",
                 "{message}"),
         metric ->
             assertSum(
                 metric,
                 "kafka.unclean.election.rate",
-                "unclean leader election rate - increasing indicates broker failures",
+                "Unclean leader election rate - increasing indicates broker failures",
                 "{election}"));
   }
 
diff --git a/jmx-metrics/src/main/resources/target-systems/kafka.groovy b/jmx-metrics/src/main/resources/target-systems/kafka.groovy
index 695feeab3..c759e688d 100644
--- a/jmx-metrics/src/main/resources/target-systems/kafka.groovy
+++ b/jmx-metrics/src/main/resources/target-systems/kafka.groovy
@@ -167,29 +167,29 @@ otel.instrument(isrOperations,
 
 def maxLag = otel.mbean("kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica")
-otel.instrument(maxLag, "kafka.max.lag", "max lag in messages between follower and leader replicas",
+otel.instrument(maxLag, "kafka.max.lag", "Max lag in messages between follower and leader replicas",
   "{message}", "Value", otel.&longValueCallback)
 
 def activeControllerCount = otel.mbean("kafka.controller:type=KafkaController,name=ActiveControllerCount")
-otel.instrument(activeControllerCount, "kafka.controller.active.count", "controller is active on broker",
+otel.instrument(activeControllerCount, "kafka.controller.active.count", "For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.",
   "{controller}", "Value", otel.&longValueCallback)
 
 def leaderElectionRate = otel.mbean("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs")
-otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "leader election rate - increasing indicates broker failures",
+otel.instrument(leaderElectionRate, "kafka.leader.election.rate", "Leader election rate - increasing indicates broker failures",
   "{election}", "Count", otel.&longCounterCallback)
 
 def uncleanLeaderElections = otel.mbean("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec")
-otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "unclean leader election rate - increasing indicates broker failures",
+otel.instrument(uncleanLeaderElections, "kafka.unclean.election.rate", "Unclean leader election rate - increasing indicates broker failures",
   "{election}", "Count", otel.&longCounterCallback)
 
 def requestQueueSize = otel.mbean("kafka.network:type=RequestChannel,name=RequestQueueSize")
-otel.instrument(requestQueueSize, "kafka.request.queue", "size of the request queue",
+otel.instrument(requestQueueSize, "kafka.request.queue", "Size of the request queue",
   "{request}", "Value", otel.&longValueCallback)
 
 def logFlushRate = otel.mbean("kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs")
-otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "log flush count",
+otel.instrument(logFlushRate, "kafka.logs.flush.time.count", "Log flush count",
   "ms", "Count", otel.&longCounterCallback)
-otel.instrument(logFlushRate, "kafka.logs.flush.time.median", "log flush time - 50th percentile",
+otel.instrument(logFlushRate, "kafka.logs.flush.time.median", "Log flush time - 50th percentile",
   "ms", "50thPercentile", otel.&doubleValueCallback)
-otel.instrument(logFlushRate, "kafka.logs.flush.time.99p", "log flush time - 99th percentile",
+otel.instrument(logFlushRate, "kafka.logs.flush.time.99p", "Log flush time - 99th percentile",
   "ms", "99thPercentile", otel.&doubleValueCallback)
diff --git a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java
index e20485e46..f59040509 100644
--- a/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java
+++ b/jmx-scraper/src/integrationTest/java/io/opentelemetry/contrib/jmxscraper/target_systems/kafka/KafkaIntegrationTest.java
@@ -96,7 +96,7 @@ protected MetricsVerifier createMetricsVerifier() {
                 metric
                     .hasDescription("The number of requests waiting in purgatory")
                     .hasUnit("{request}")
-                    .isUpDownCounter()
+                    .isGauge()
                     .hasDataPointsWithAttributes(
                         attributeGroup(attribute("type", "Produce")),
                         attributeGroup(attribute("type", "Fetch"))))
@@ -140,7 +140,7 @@ protected MetricsVerifier createMetricsVerifier() {
                 metric
                     .hasDescription("Size of the request queue")
                     .hasUnit("{request}")
-                    .isUpDownCounter()
+                    .isGauge()
                     .hasDataPointsWithoutAttributes())
         .add(
             "kafka.partition.count",
@@ -180,12 +180,13 @@ protected MetricsVerifier createMetricsVerifier() {
             "kafka.controller.active.count",
             metric ->
                 metric
-                    .hasDescription("The number of controllers active on the broker") // CHANGED
+                    .hasDescription(
+                        "For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.")
                     .hasUnit("{controller}")
-                    .isUpDownCounter() // CHANGED
+                    .isGauge()
                     .hasDataPointsWithoutAttributes())
         .add(
-            "kafka.leader.election.count", // CHANGED from "kafka.leader.election.rate"
+            "kafka.leader.election.rate",
             metric ->
                 metric
                     .hasDescription("The leader election count")
@@ -193,7 +194,7 @@ protected MetricsVerifier createMetricsVerifier() {
                     .isCounter()
                     .hasDataPointsWithoutAttributes())
         .add(
-            "kafka.leader.election.unclean.count", // CHANGED from "kafka.unclean.election.rate"
+            "kafka.unclean.election.rate",
             metric ->
                 metric
                     .hasDescription(
@@ -202,40 +203,13 @@ protected MetricsVerifier createMetricsVerifier() {
                         "Unclean leader election count - increasing indicates broker failures")
                     .hasUnit("{election}")
                     .isCounter()
                     .hasDataPointsWithoutAttributes())
         .add(
-            "kafka.lag.max", // CHANGED from "kafka.max.lag"
+            "kafka.max.lag",
             metric ->
                 metric
                     .hasDescription(
                         "The max lag in messages between follower and leader replicas") // CHANGED
                     .hasUnit("{message}")
                     .isGauge()
-                    .hasDataPointsWithoutAttributes())
-
-        // TODO: Find out how to force Kafka to generate these metrics
-        //        .add(
-        //            "kafka.logs.flush.count",
-        //            metric ->
-        //                metric
-        //                    .hasDescription("Log flush count")
-        //                    .hasUnit("{flush}")
-        //                    .isCounter()
-        //                    .hasDataPointsWithoutAttributes())
-        //        .add(
-        //            "kafka.logs.flush.time.median",
-        //            metric ->
-        //                metric
-        //                    .hasDescription("Log flush time - 50th percentile")
-        //                    .hasUnit("ms")
-        //                    .isGauge()
-        //                    .hasDataPointsWithoutAttributes())
-        //        .add(
-        //            "kafka.logs.flush.time.99p",
-        //            metric ->
-        //                metric
-        //                    .hasDescription("Log flush time - 99th percentile")
-        //                    .hasUnit("ms")
-        //                    .isGauge()
-        //                    .hasDataPointsWithoutAttributes())
-        ;
+                    .hasDataPointsWithoutAttributes());
   }
 }
diff --git a/jmx-scraper/src/main/resources/kafka.yaml b/jmx-scraper/src/main/resources/kafka.yaml
index dbee26e80..cf737aa56 100644
--- a/jmx-scraper/src/main/resources/kafka.yaml
+++ b/jmx-scraper/src/main/resources/kafka.yaml
@@ -80,7 +80,7 @@ rules:
     mapping:
       Value:
         metric: kafka.request.queue
-        type: updowncounter
+        type: gauge
         desc: Size of the request queue
         unit: "{request}"
 
@@ -112,7 +112,7 @@ rules:
     mapping:
       Value:
         metric: kafka.purgatory.size
-        type: updowncounter
+        type: gauge
         desc: The number of requests waiting in purgatory
         unit: "{request}"
 
@@ -163,22 +163,24 @@ rules:
 
   - bean: kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica
     mapping:
       Value:
-        metric: kafka.lag.max
+        metric: kafka.max.lag
+        type: gauge
         desc: The max lag in messages between follower and leader replicas
         unit: "{message}"
+
   - bean: kafka.controller:type=KafkaController,name=ActiveControllerCount
     mapping:
       Value:
         metric: kafka.controller.active.count
-        type: updowncounter
-        desc: The number of controllers active on the broker
+        type: gauge
+        desc: For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.
         unit: "{controller}"
 
   - bean: kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs
     mapping:
       Count:
-        metric: kafka.leader.election.count
+        metric: kafka.leader.election.rate
         type: counter
         desc: The leader election count
         unit: "{election}"
@@ -186,7 +188,7 @@ rules:
   - bean: kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec
     mapping:
       Count:
-        metric: kafka.leader.election.unclean.count
+        metric: kafka.unclean.election.rate
         type: counter
         desc: Unclean leader election count - increasing indicates broker failures
         unit: "{election}"