Skip to content

Commit 24d191b

Browse files
robsundaybreedx-splk
authored andcommitted
JMX Scraper: Kafka server, producer and consumer YAMLs and integration tests added (open-telemetry#1670)
1 parent 20dd724 commit 24d191b

File tree

13 files changed

+986
-71
lines changed

13 files changed

+986
-71
lines changed

jmx-metrics/src/integrationTest/java/io/opentelemetry/contrib/jmxmetrics/target_systems/KafkaIntegrationTest.java

+31-28
Original file line numberDiff line numberDiff line change
@@ -140,51 +140,51 @@ List<Consumer<Metric>> kafkaBrokerAssertions() {
140140
metric,
141141
"kafka.partition.count",
142142
"The number of partitions on the broker",
143-
"{partitions}"),
143+
"{partition}"),
144144
metric ->
145145
assertGauge(
146146
metric,
147147
"kafka.partition.offline",
148148
"The number of partitions offline",
149-
"{partitions}"),
149+
"{partition}"),
150150
metric ->
151151
assertGauge(
152152
metric,
153153
"kafka.partition.under_replicated",
154154
"The number of under replicated partitions",
155-
"{partitions}"),
155+
"{partition}"),
156156
metric ->
157157
assertSumWithAttributes(
158158
metric,
159159
"kafka.isr.operation.count",
160160
"The number of in-sync replica shrink and expand operations",
161-
"{operations}",
161+
"{operation}",
162162
attrs -> attrs.containsOnly(entry("operation", "shrink")),
163163
attrs -> attrs.containsOnly(entry("operation", "expand"))),
164164
metric ->
165165
assertGauge(
166166
metric,
167167
"kafka.controller.active.count",
168-
"controller is active on broker",
169-
"{controllers}"),
168+
"For KRaft mode, the number of active controllers in the cluster. For ZooKeeper, indicates whether the broker is the controller broker.",
169+
"{controller}"),
170170
metric ->
171171
assertSum(
172172
metric,
173173
"kafka.leader.election.rate",
174-
"leader election rate - increasing indicates broker failures",
175-
"{elections}"),
174+
"Leader election rate - increasing indicates broker failures",
175+
"{election}"),
176176
metric ->
177177
assertGauge(
178178
metric,
179179
"kafka.max.lag",
180-
"max lag in messages between follower and leader replicas",
181-
"{messages}"),
180+
"Max lag in messages between follower and leader replicas",
181+
"{message}"),
182182
metric ->
183183
assertSum(
184184
metric,
185185
"kafka.unclean.election.rate",
186-
"unclean leader election rate - increasing indicates broker failures",
187-
"{elections}"));
186+
"Unclean leader election rate - increasing indicates broker failures",
187+
"{election}"));
188188
}
189189

190190
static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest {
@@ -235,52 +235,52 @@ void endToEnd() {
235235
metric,
236236
"kafka.consumer.bytes-consumed-rate",
237237
"The average number of bytes consumed per second",
238-
"by",
238+
"By",
239239
topics),
240240
metric ->
241241
assertKafkaGauge(
242242
metric,
243243
"kafka.consumer.fetch-rate",
244244
"The number of fetch requests for all topics per second",
245-
"1"),
245+
"{request}"),
246246
metric ->
247247
assertKafkaGauge(
248248
metric,
249249
"kafka.consumer.fetch-size-avg",
250250
"The average number of bytes fetched per request",
251-
"by",
251+
"By",
252252
topics),
253253
metric ->
254254
assertKafkaGauge(
255255
metric,
256256
"kafka.consumer.records-consumed-rate",
257257
"The average number of records consumed per second",
258-
"1",
258+
"{record}",
259259
topics),
260260
metric ->
261261
assertKafkaGauge(
262262
metric,
263263
"kafka.consumer.records-lag-max",
264264
"Number of messages the consumer lags behind the producer",
265-
"1"),
265+
"{record}"),
266266
metric ->
267267
assertKafkaGauge(
268268
metric,
269269
"kafka.consumer.total.bytes-consumed-rate",
270270
"The average number of bytes consumed for all topics per second",
271-
"by"),
271+
"By"),
272272
metric ->
273273
assertKafkaGauge(
274274
metric,
275275
"kafka.consumer.total.fetch-size-avg",
276276
"The average number of bytes fetched per request for all topics",
277-
"by"),
277+
"By"),
278278
metric ->
279279
assertKafkaGauge(
280280
metric,
281281
"kafka.consumer.total.records-consumed-rate",
282282
"The average number of records consumed for all topics per second",
283-
"1"));
283+
"{record}"));
284284
}
285285
}
286286

@@ -300,14 +300,14 @@ void endToEnd() {
300300
metric,
301301
"kafka.producer.byte-rate",
302302
"The average number of bytes sent per second for a topic",
303-
"by",
303+
"By",
304304
topics),
305305
metric ->
306306
assertKafkaGauge(
307307
metric,
308308
"kafka.producer.compression-rate",
309309
"The average compression rate of record batches for a topic",
310-
"1",
310+
"{ratio}",
311311
topics),
312312
metric ->
313313
assertKafkaGauge(
@@ -320,27 +320,27 @@ void endToEnd() {
320320
metric,
321321
"kafka.producer.outgoing-byte-rate",
322322
"The average number of outgoing bytes sent per second to all servers",
323-
"by"),
323+
"By"),
324324
metric ->
325325
assertKafkaGauge(
326326
metric,
327327
"kafka.producer.record-error-rate",
328328
"The average per-second number of record sends that resulted in errors for a topic",
329-
"1",
329+
"{record}",
330330
topics),
331331
metric ->
332332
assertKafkaGauge(
333333
metric,
334334
"kafka.producer.record-retry-rate",
335335
"The average per-second number of retried record sends for a topic",
336-
"1",
336+
"{record}",
337337
topics),
338338
metric ->
339339
assertKafkaGauge(
340340
metric,
341341
"kafka.producer.record-send-rate",
342342
"The average number of records sent per second for a topic",
343-
"1",
343+
"{record}",
344344
topics),
345345
metric ->
346346
assertKafkaGauge(
@@ -353,10 +353,13 @@ void endToEnd() {
353353
metric,
354354
"kafka.producer.request-rate",
355355
"The average number of requests sent per second",
356-
"1"),
356+
"{request}"),
357357
metric ->
358358
assertKafkaGauge(
359-
metric, "kafka.producer.response-rate", "Responses received per second", "1"));
359+
metric,
360+
"kafka.producer.response-rate",
361+
"Responses received per second",
362+
"{response}"));
360363
}
361364
}
362365

jmx-metrics/src/main/resources/target-systems/kafka-consumer.groovy

+8-8
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,45 @@
1616

1717
def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics")
1818
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate",
19-
"The number of fetch requests for all topics per second", "1",
19+
"The number of fetch requests for all topics per second", "{request}",
2020
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
2121
"fetch-rate", otel.&doubleValueCallback)
2222

2323
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max",
24-
"Number of messages the consumer lags behind the producer", "1",
24+
"Number of messages the consumer lags behind the producer", "{record}",
2525
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
2626
"records-lag-max", otel.&doubleValueCallback)
2727

2828
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate",
29-
"The average number of bytes consumed for all topics per second", "by",
29+
"The average number of bytes consumed for all topics per second", "By",
3030
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
3131
"bytes-consumed-rate", otel.&doubleValueCallback)
3232

3333
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg",
34-
"The average number of bytes fetched per request for all topics", "by",
34+
"The average number of bytes fetched per request for all topics", "By",
3535
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
3636
"fetch-size-avg", otel.&doubleValueCallback)
3737

3838
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate",
39-
"The average number of records consumed for all topics per second", "1",
39+
"The average number of records consumed for all topics per second", "{record}",
4040
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
4141
"records-consumed-rate", otel.&doubleValueCallback)
4242

4343
def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics")
4444
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate",
45-
"The average number of bytes consumed per second", "by",
45+
"The average number of bytes consumed per second", "By",
4646
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
4747
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
4848
"bytes-consumed-rate", otel.&doubleValueCallback)
4949

5050
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg",
51-
"The average number of bytes fetched per request", "by",
51+
"The average number of bytes fetched per request", "By",
5252
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
5353
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
5454
"fetch-size-avg", otel.&doubleValueCallback)
5555

5656
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.records-consumed-rate",
57-
"The average number of records consumed per second", "1",
57+
"The average number of records consumed per second", "{record}",
5858
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
5959
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
6060
"records-consumed-rate", otel.&doubleValueCallback)

jmx-metrics/src/main/resources/target-systems/kafka-producer.groovy

+8-8
Original file line numberDiff line numberDiff line change
@@ -20,45 +20,45 @@ otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg",
2020
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
2121
"io-wait-time-ns-avg", otel.&doubleValueCallback)
2222
otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate",
23-
"The average number of outgoing bytes sent per second to all servers", "by",
23+
"The average number of outgoing bytes sent per second to all servers", "By",
2424
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
2525
"outgoing-byte-rate", otel.&doubleValueCallback)
2626
otel.instrument(producerMetrics, "kafka.producer.request-latency-avg",
2727
"The average request latency", "ms",
2828
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
2929
"request-latency-avg", otel.&doubleValueCallback)
3030
otel.instrument(producerMetrics, "kafka.producer.request-rate",
31-
"The average number of requests sent per second", "1",
31+
"The average number of requests sent per second", "{request}",
3232
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
3333
"request-rate", otel.&doubleValueCallback)
3434
otel.instrument(producerMetrics, "kafka.producer.response-rate",
35-
"Responses received per second", "1",
35+
"Responses received per second", "{response}",
3636
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
3737
"response-rate", otel.&doubleValueCallback)
3838

3939
def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics")
4040
otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate",
41-
"The average number of bytes sent per second for a topic", "by",
41+
"The average number of bytes sent per second for a topic", "By",
4242
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
4343
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
4444
"byte-rate", otel.&doubleValueCallback)
4545
otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate",
46-
"The average compression rate of record batches for a topic", "1",
46+
"The average compression rate of record batches for a topic", "{ratio}",
4747
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
4848
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
4949
"compression-rate", otel.&doubleValueCallback)
5050
otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate",
51-
"The average per-second number of record sends that resulted in errors for a topic", "1",
51+
"The average per-second number of record sends that resulted in errors for a topic", "{record}",
5252
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
5353
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
5454
"record-error-rate", otel.&doubleValueCallback)
5555
otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate",
56-
"The average per-second number of retried record sends for a topic", "1",
56+
"The average per-second number of retried record sends for a topic", "{record}",
5757
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
5858
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
5959
"record-retry-rate", otel.&doubleValueCallback)
6060
otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate",
61-
"The average number of records sent per second for a topic", "1",
61+
"The average number of records sent per second for a topic", "{record}",
6262
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
6363
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
6464
"record-send-rate", otel.&doubleValueCallback)

0 commit comments

Comments
 (0)