|
18 | 18 | import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
19 | 19 | import io.opentelemetry.sdk.metrics.data.MetricData;
|
20 | 20 | import io.opentelemetry.sdk.metrics.data.PointData;
|
| 21 | +import java.lang.reflect.Field; |
| 22 | +import java.lang.reflect.Method; |
21 | 23 | import java.nio.charset.StandardCharsets;
|
22 | 24 | import java.time.Duration;
|
23 | 25 | import java.time.Instant;
|
|
33 | 35 | import org.apache.kafka.clients.CommonClientConfigs;
|
34 | 36 | import org.apache.kafka.clients.consumer.ConsumerConfig;
|
35 | 37 | import org.apache.kafka.clients.consumer.KafkaConsumer;
|
36 |
| -import org.apache.kafka.clients.consumer.KafkaConsumerAccess; |
37 | 38 | import org.apache.kafka.clients.producer.KafkaProducer;
|
38 |
| -import org.apache.kafka.clients.producer.KafkaProducerAccess; |
39 | 39 | import org.apache.kafka.clients.producer.ProducerConfig;
|
40 | 40 | import org.apache.kafka.clients.producer.ProducerRecord;
|
41 | 41 | import org.apache.kafka.common.MetricName;
|
42 | 42 | import org.apache.kafka.common.metrics.KafkaMetric;
|
| 43 | +import org.apache.kafka.common.metrics.Metrics; |
43 | 44 | import org.apache.kafka.common.metrics.MetricsReporter;
|
44 | 45 | import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
45 | 46 | import org.apache.kafka.common.serialization.ByteArraySerializer;
|
@@ -139,14 +140,36 @@ protected Map<String, Object> consumerConfig() {
|
139 | 140 |
|
140 | 141 | @Test
|
141 | 142 | void noDuplicateMetricsReporter() {
|
142 |
| - List<MetricsReporter> producerMetricsReporters = |
143 |
| - KafkaProducerAccess.getMetricsReporters(producer); |
| 143 | + List<MetricsReporter> producerMetricsReporters = getMetricsReporters(producer); |
144 | 144 | assertThat(countOpenTelemetryMetricsReporters(producerMetricsReporters)).isEqualTo(1);
|
145 |
| - List<MetricsReporter> consumerMetricsReporters = |
146 |
| - KafkaConsumerAccess.getMetricsReporters(consumer); |
| 145 | + List<MetricsReporter> consumerMetricsReporters = getMetricsReporters(consumer); |
147 | 146 | assertThat(countOpenTelemetryMetricsReporters(consumerMetricsReporters)).isEqualTo(1);
|
148 | 147 | }
|
149 | 148 |
|
| 149 | + private static List<MetricsReporter> getMetricsReporters(Object producerOrConsumer) { |
| 150 | + return getMetricsRegistry(producerOrConsumer).reporters(); |
| 151 | + } |
| 152 | + |
| 153 | + private static Metrics getMetricsRegistry(Object producerOrConsumer) { |
| 154 | + Class<?> clazz = producerOrConsumer.getClass(); |
| 155 | + try { |
| 156 | + Field field = clazz.getDeclaredField("metrics"); |
| 157 | + field.setAccessible(true); |
| 158 | + return (Metrics) field.get(producerOrConsumer); |
| 159 | + } catch (Exception ignored) { |
| 160 | + // Ignore |
| 161 | + } |
| 162 | + try { |
| 163 | + Method method = clazz.getDeclaredMethod("metricsRegistry"); |
| 164 | + method.setAccessible(true); |
| 165 | + return (Metrics) method.invoke(producerOrConsumer); |
| 166 | + } catch (Exception ignored) { |
| 167 | + // Ignore |
| 168 | + } |
| 169 | + throw new IllegalStateException( |
| 170 | + "Failed to get metrics registry from " + producerOrConsumer.getClass().getName()); |
| 171 | + } |
| 172 | + |
150 | 173 | private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> metricsReporters) {
|
151 | 174 | return metricsReporters.stream()
|
152 | 175 | .filter(reporter -> reporter.getClass().getName().endsWith("OpenTelemetryMetricsReporter"))
|
@@ -176,8 +199,6 @@ void observeMetrics() {
|
176 | 199 | "kafka.consumer.join_total",
|
177 | 200 | "kafka.consumer.last_heartbeat_seconds_ago",
|
178 | 201 | "kafka.consumer.last_rebalance_seconds_ago",
|
179 |
| - "kafka.consumer.partition_assigned_latency_avg", |
180 |
| - "kafka.consumer.partition_assigned_latency_max", |
181 | 202 | "kafka.consumer.rebalance_latency_avg",
|
182 | 203 | "kafka.consumer.rebalance_latency_max",
|
183 | 204 | "kafka.consumer.rebalance_latency_total",
|
|
0 commit comments