Skip to content

Commit 06bd699

Browse files
authored
Fix kafka latest dep tests (#13544)
1 parent 3b6d6e3 commit 06bd699

File tree

9 files changed

+80
-24
lines changed

9 files changed

+80
-24
lines changed

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java

+3-6
Original file line number | Diff line number | Diff line change
@@ -65,8 +65,7 @@ void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception {
6565
});
6666

6767
awaitUntilConsumerIsReady();
68-
@SuppressWarnings("PreferJavaTimeOverload")
69-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
68+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
7069
assertThat(records.count()).isEqualTo(1);
7170

7271
// iterate over records to generate spans
@@ -118,8 +117,7 @@ void testPassThroughTombstone()
118117
throws ExecutionException, InterruptedException, TimeoutException {
119118
producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS);
120119
awaitUntilConsumerIsReady();
121-
@SuppressWarnings("PreferJavaTimeOverload")
122-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
120+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
123121
assertThat(records.count()).isEqualTo(1);
124122

125123
// iterate over records to generate spans
@@ -168,8 +166,7 @@ void testRecordsWithTopicPartitionKafkaConsume()
168166
testing.waitForTraces(1);
169167

170168
awaitUntilConsumerIsReady();
171-
@SuppressWarnings("PreferJavaTimeOverload")
172-
ConsumerRecords<?, ?> consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis());
169+
ConsumerRecords<?, ?> consumerRecords = poll(Duration.ofSeconds(5));
173170
List<? extends ConsumerRecord<?, ?>> recordsInPartition =
174171
consumerRecords.records(KafkaClientBaseTest.topicPartition);
175172
assertThat(recordsInPartition.size()).isEqualTo(1);

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientPropagationDisabledTest.java

+1-2
Original file line number | Diff line number | Diff line change
@@ -41,8 +41,7 @@ void testReadRemoteContextWhenPropagationIsDisabled() throws InterruptedExceptio
4141

4242
awaitUntilConsumerIsReady();
4343

44-
@SuppressWarnings("PreferJavaTimeOverload")
45-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
44+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
4645
assertThat(records.count()).isEqualTo(1);
4746

4847
// iterate over records to generate spans

instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientSuppressReceiveSpansTest.java

+3-6
Original file line number | Diff line number | Diff line change
@@ -58,8 +58,7 @@ void testKafkaProduceAndConsume() throws InterruptedException {
5858

5959
awaitUntilConsumerIsReady();
6060
// check that the message was received
61-
@SuppressWarnings("PreferJavaTimeOverload")
62-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
61+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
6362
for (ConsumerRecord<?, ?> record : records) {
6463
testing.runWithSpan(
6564
"processing",
@@ -99,8 +98,7 @@ void testPassThroughTombstone()
9998
throws ExecutionException, InterruptedException, TimeoutException {
10099
producer.send(new ProducerRecord<>(SHARED_TOPIC, null)).get(5, TimeUnit.SECONDS);
101100
awaitUntilConsumerIsReady();
102-
@SuppressWarnings("PreferJavaTimeOverload")
103-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
101+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
104102
assertThat(records.count()).isEqualTo(1);
105103

106104
// iterate over records to generate spans
@@ -136,8 +134,7 @@ void testRecordsWithTopicPartitionKafkaConsume()
136134
testing.waitForTraces(1);
137135

138136
awaitUntilConsumerIsReady();
139-
@SuppressWarnings("PreferJavaTimeOverload")
140-
ConsumerRecords<?, ?> consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis());
137+
ConsumerRecords<?, ?> consumerRecords = poll(Duration.ofSeconds(5));
141138
List<? extends ConsumerRecord<?, ?>> recordsInPartition =
142139
consumerRecords.records(KafkaClientBaseTest.topicPartition);
143140
assertThat(recordsInPartition.size()).isEqualTo(1);

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/AbstractOpenTelemetryMetricsReporterTest.java

+6-6
Original file line number | Diff line number | Diff line change
@@ -268,8 +268,8 @@ void observeMetrics() {
268268
"kafka.consumer.io_time_ns_avg",
269269
"kafka.consumer.io_wait_ratio",
270270
"kafka.consumer.io_wait_time_ns_avg",
271-
"kafka.consumer.io_waittime_total",
272-
"kafka.consumer.iotime_total",
271+
// "kafka.consumer.io_waittime_total",
272+
// "kafka.consumer.iotime_total",
273273
"kafka.consumer.last_poll_seconds_ago",
274274
"kafka.consumer.network_io_rate",
275275
"kafka.consumer.network_io_total",
@@ -300,7 +300,7 @@ void observeMetrics() {
300300
"kafka.producer.buffer_exhausted_total",
301301
"kafka.producer.buffer_total_bytes",
302302
"kafka.producer.bufferpool_wait_ratio",
303-
"kafka.producer.bufferpool_wait_time_total",
303+
// "kafka.producer.bufferpool_wait_time_total",
304304
"kafka.producer.compression_rate_avg",
305305
"kafka.producer.connection_close_rate",
306306
"kafka.producer.connection_close_total",
@@ -317,8 +317,8 @@ void observeMetrics() {
317317
"kafka.producer.io_time_ns_avg",
318318
"kafka.producer.io_wait_ratio",
319319
"kafka.producer.io_wait_time_ns_avg",
320-
"kafka.producer.io_waittime_total",
321-
"kafka.producer.iotime_total",
320+
// "kafka.producer.io_waittime_total",
321+
// "kafka.producer.iotime_total",
322322
"kafka.producer.metadata_age",
323323
"kafka.producer.network_io_rate",
324324
"kafka.producer.network_io_total",
@@ -404,7 +404,7 @@ private static void consumeRecords() {
404404
consumer.subscribe(TOPICS);
405405
Instant stopTime = Instant.now().plusSeconds(10);
406406
while (Instant.now().isBefore(stopTime)) {
407-
consumer.poll(1_000);
407+
KafkaTestUtil.poll(consumer, Duration.ofSeconds(1));
408408
}
409409
}
410410

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java

+6-2
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.clients.admin.NewTopic;
3838
import org.apache.kafka.clients.consumer.Consumer;
3939
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
40+
import org.apache.kafka.clients.consumer.ConsumerRecords;
4041
import org.apache.kafka.clients.consumer.KafkaConsumer;
4142
import org.apache.kafka.clients.producer.KafkaProducer;
4243
import org.apache.kafka.clients.producer.Producer;
@@ -147,13 +148,12 @@ void cleanupClass() {
147148
kafka.stop();
148149
}
149150

150-
@SuppressWarnings("PreferJavaTimeOverload")
151151
public void awaitUntilConsumerIsReady() throws InterruptedException {
152152
if (consumerReady.await(0, TimeUnit.SECONDS)) {
153153
return;
154154
}
155155
for (int i = 0; i < 10; i++) {
156-
consumer.poll(0);
156+
poll(Duration.ofMillis(100));
157157
if (consumerReady.await(1, TimeUnit.SECONDS)) {
158158
break;
159159
}
@@ -164,6 +164,10 @@ public void awaitUntilConsumerIsReady() throws InterruptedException {
164164
consumer.seekToBeginning(Collections.emptyList());
165165
}
166166

167+
public ConsumerRecords<Integer, String> poll(Duration duration) {
168+
return KafkaTestUtil.poll(consumer, duration);
169+
}
170+
167171
@SuppressWarnings("deprecation") // using deprecated semconv
168172
protected static List<AttributeAssertion> sendAttributes(
169173
String messageKey, String messageValue, boolean testHeaders) {

instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientPropagationBaseTest.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ void testClientHeaderPropagationManualConfig() throws InterruptedException {
2626

2727
awaitUntilConsumerIsReady();
2828
// check that the message was received
29-
ConsumerRecords<?, ?> records = consumer.poll(Duration.ofSeconds(5).toMillis());
29+
ConsumerRecords<?, ?> records = poll(Duration.ofSeconds(5));
3030
assertThat(records.count()).isEqualTo(1);
3131
for (ConsumerRecord<?, ?> record : records) {
3232
assertThat(record.headers().iterator().hasNext()).isEqualTo(producerPropagationEnabled);
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal;
7+
8+
import java.lang.reflect.Method;
9+
import java.time.Duration;
10+
import org.apache.kafka.clients.consumer.Consumer;
11+
import org.apache.kafka.clients.consumer.ConsumerRecords;
12+
13+
@SuppressWarnings("OtelInternalJavadoc")
14+
public class KafkaTestUtil {
15+
private static final Method consumerPollDurationMethod = getConsumerPollDurationMethod();
16+
17+
private static Method getConsumerPollDurationMethod() {
18+
try {
19+
return Consumer.class.getMethod("poll", Duration.class);
20+
} catch (NoSuchMethodException e) {
21+
return null;
22+
}
23+
}
24+
25+
@SuppressWarnings("unchecked")
26+
public static <K, V> ConsumerRecords<K, V> poll(Consumer<K, V> consumer, Duration duration) {
27+
// not present in early versions
28+
if (consumerPollDurationMethod != null) {
29+
try {
30+
return (ConsumerRecords<K, V>) consumerPollDurationMethod.invoke(consumer, duration);
31+
} catch (Exception exception) {
32+
throw new IllegalStateException(exception);
33+
}
34+
}
35+
// not present in 4.x
36+
return consumer.poll(duration.toMillis());
37+
}
38+
39+
private KafkaTestUtil() {}
40+
}

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/build.gradle.kts

+1
Original file line number | Diff line number | Diff line change
@@ -3,6 +3,7 @@ plugins {
33
}
44

55
dependencies {
6+
compileOnly(project(":muzzle"))
67
compileOnly("org.apache.kafka:kafka-clients:0.11.0.0")
78
compileOnly("com.google.auto.value:auto-value-annotations")
89
annotationProcessor("com.google.auto.value:auto-value")

instrumentation/kafka/kafka-clients/kafka-clients-common-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaPropagation.java

+19-1
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
99
import io.opentelemetry.context.Context;
10+
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
1011
import org.apache.kafka.clients.ApiVersions;
1112
import org.apache.kafka.clients.producer.ProducerRecord;
1213
import org.apache.kafka.common.record.RecordBatch;
@@ -18,6 +19,7 @@
1819
public final class KafkaPropagation {
1920

2021
private static final KafkaHeadersSetter SETTER = KafkaHeadersSetter.INSTANCE;
22+
private static final boolean hasMaxUsableProduceMagic = hasMaxUsableProduceMagic();
2123

2224
// Do not inject headers for batch versions below 2
2325
// This is how similar check is being done in Kafka client itself:
@@ -27,7 +29,23 @@ public final class KafkaPropagation {
2729
// headers attempt to read messages that were produced by clients > 0.11 and the magic
2830
// value of the broker(s) is >= 2
2931
public static boolean shouldPropagate(ApiVersions apiVersions) {
30-
return apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2;
32+
return !hasMaxUsableProduceMagic
33+
|| maxUsableProduceMagic(apiVersions) >= RecordBatch.MAGIC_VALUE_V2;
34+
}
35+
36+
@NoMuzzle
37+
private static byte maxUsableProduceMagic(ApiVersions apiVersions) {
38+
return apiVersions.maxUsableProduceMagic();
39+
}
40+
41+
private static boolean hasMaxUsableProduceMagic() {
42+
try {
43+
// missing in kafka 4.x
44+
ApiVersions.class.getMethod("maxUsableProduceMagic");
45+
return true;
46+
} catch (NoSuchMethodException e) {
47+
return false;
48+
}
3149
}
3250

3351
public static <K, V> ProducerRecord<K, V> propagateContext(

0 commit comments

Comments (0)