3
3
* SPDX-License-Identifier: Apache-2.0
4
4
*/
5
5
6
- package io .opentelemetry .instrumentation . spring .autoconfigure . instrumentation . kafka ;
6
+ package io .opentelemetry .spring .smoketest ;
7
7
8
8
import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .equalTo ;
9
9
import static io .opentelemetry .sdk .testing .assertj .OpenTelemetryAssertions .satisfies ;
10
10
11
11
import io .opentelemetry .api .OpenTelemetry ;
12
12
import io .opentelemetry .api .trace .SpanKind ;
13
- import io .opentelemetry .instrumentation .testing .junit .LibraryInstrumentationExtension ;
14
13
import io .opentelemetry .semconv .incubating .MessagingIncubatingAttributes ;
15
- import java .time .Duration ;
16
14
import org .apache .kafka .clients .admin .NewTopic ;
17
15
import org .apache .kafka .clients .consumer .ConsumerRecord ;
18
16
import org .assertj .core .api .AbstractLongAssert ;
19
17
import org .assertj .core .api .AbstractStringAssert ;
20
- import org .junit .jupiter .api .AfterAll ;
21
- import org .junit .jupiter .api .BeforeAll ;
22
- import org .junit .jupiter .api .BeforeEach ;
23
18
import org .junit .jupiter .api .Test ;
24
- import org .junit .jupiter .api .extension .RegisterExtension ;
25
- import org .springframework .boot .autoconfigure .AutoConfigurations ;
26
- import org .springframework .boot .autoconfigure .kafka .KafkaAutoConfiguration ;
27
- import org .springframework .boot .test .context .runner .ApplicationContextRunner ;
28
- import org .springframework .context .ConfigurableApplicationContext ;
19
+ import org .springframework .beans .factory .annotation .Autowired ;
29
20
import org .springframework .context .annotation .Bean ;
30
21
import org .springframework .context .annotation .Configuration ;
31
22
import org .springframework .kafka .annotation .KafkaListener ;
32
23
import org .springframework .kafka .config .TopicBuilder ;
33
24
import org .springframework .kafka .core .KafkaTemplate ;
34
- import org .testcontainers .containers .KafkaContainer ;
35
- import org .testcontainers .containers .wait .strategy .Wait ;
36
- import org .testcontainers .utility .DockerImageName ;
37
25
38
- class KafkaIntegrationTest {
26
+ abstract class AbstractKafkaSpringStarterSmokeTest extends AbstractSpringStarterSmokeTest {
39
27
40
- @ RegisterExtension
41
- static final LibraryInstrumentationExtension testing = LibraryInstrumentationExtension .create ();
42
-
43
- static KafkaContainer kafka ;
44
-
45
- private ApplicationContextRunner contextRunner ;
46
-
47
- @ BeforeAll
48
- static void setUpKafka () {
49
- kafka =
50
- new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:6.2.10" ))
51
- .withEnv ("KAFKA_HEAP_OPTS" , "-Xmx256m" )
52
- .waitingFor (Wait .forLogMessage (".*started \\ (kafka.server.KafkaServer\\ ).*" , 1 ))
53
- .withStartupTimeout (Duration .ofMinutes (1 ));
54
- kafka .start ();
55
- }
56
-
57
- @ AfterAll
58
- static void tearDownKafka () {
59
- kafka .stop ();
60
- }
61
-
62
- @ BeforeEach
63
- void setUpContext () {
64
- contextRunner =
65
- new ApplicationContextRunner ()
66
- .withConfiguration (
67
- AutoConfigurations .of (
68
- KafkaAutoConfiguration .class ,
69
- KafkaInstrumentationAutoConfiguration .class ,
70
- TestConfig .class ))
71
- .withBean ("openTelemetry" , OpenTelemetry .class , testing ::getOpenTelemetry )
72
- .withPropertyValues (
73
- "spring.kafka.bootstrap-servers=" + kafka .getBootstrapServers (),
74
- "spring.kafka.consumer.auto-offset-reset=earliest" ,
75
- "spring.kafka.consumer.linger-ms=10" ,
76
- "spring.kafka.listener.idle-between-polls=1000" ,
77
- "spring.kafka.producer.transaction-id-prefix=test-" );
78
- }
28
+ @ Autowired protected KafkaTemplate <String , String > kafkaTemplate ;
79
29
80
30
@ Test
81
31
void shouldInstrumentProducerAndConsumer () {
82
- contextRunner .run (KafkaIntegrationTest ::runShouldInstrumentProducerAndConsumer );
83
- }
84
-
85
- // In kafka 2 ops.send is deprecated. We are using it to avoid reflection because kafka 3 also has
86
- // ops.send, although with different return type.
87
- @ SuppressWarnings ({"unchecked" , "deprecation" })
88
- private static void runShouldInstrumentProducerAndConsumer (
89
- ConfigurableApplicationContext applicationContext ) {
90
- KafkaTemplate <String , String > kafkaTemplate = applicationContext .getBean (KafkaTemplate .class );
32
+ testing .clearAllExportedData (); // ignore data from application startup
91
33
92
34
testing .runWithSpan (
93
35
"producer" ,
94
36
() -> {
95
37
kafkaTemplate .executeInTransaction (
96
38
ops -> {
97
- ops .send ("testTopic" , "10" , "testSpan" );
39
+ // return type is incompatible between Spring Boot 2 and 3
40
+ try {
41
+ ops .getClass ()
42
+ .getDeclaredMethod ("send" , String .class , Object .class , Object .class )
43
+ .invoke (ops , "testTopic" , "10" , "testSpan" );
44
+ } catch (Exception e ) {
45
+ throw new IllegalStateException (e );
46
+ }
98
47
return 0 ;
99
48
});
100
49
});
@@ -128,7 +77,7 @@ private static void runShouldInstrumentProducerAndConsumer(
128
77
span .hasName ("testTopic process" )
129
78
.hasKind (SpanKind .CONSUMER )
130
79
.hasParent (trace .getSpan (1 ))
131
- .hasAttributesSatisfyingExactly (
80
+ .hasAttributesSatisfying (
132
81
equalTo (MessagingIncubatingAttributes .MESSAGING_SYSTEM , "kafka" ),
133
82
equalTo (
134
83
MessagingIncubatingAttributes .MESSAGING_DESTINATION_NAME ,
@@ -155,7 +104,9 @@ private static void runShouldInstrumentProducerAndConsumer(
155
104
}
156
105
157
106
@ Configuration
158
- static class TestConfig {
107
+ public static class KafkaConfig {
108
+
109
+ @ Autowired OpenTelemetry openTelemetry ;
159
110
160
111
@ Bean
161
112
public NewTopic testTopic () {
@@ -164,7 +115,12 @@ public NewTopic testTopic() {
164
115
165
116
@ KafkaListener (id = "testListener" , topics = "testTopic" )
166
117
public void listener (ConsumerRecord <String , String > record ) {
167
- testing .runWithSpan ("consumer" , () -> {});
118
+ openTelemetry
119
+ .getTracer ("consumer" , "1.0" )
120
+ .spanBuilder ("consumer" )
121
+ .setSpanKind (SpanKind .CONSUMER )
122
+ .startSpan ()
123
+ .end ();
168
124
}
169
125
}
170
126
}
0 commit comments