|
8 | 8 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
9 | 9 | import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
10 | 10 |
|
| 11 | +import io.opentelemetry.api.OpenTelemetry; |
11 | 12 | import io.opentelemetry.api.trace.SpanKind;
|
12 | 13 | import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
|
13 | 14 | import io.opentelemetry.spring.smoketest.AbstractSpringStarterSmokeTest;
|
14 | 15 | import io.opentelemetry.spring.smoketest.OtelSpringStarterSmokeTestApplication;
|
15 | 16 | import io.opentelemetry.spring.smoketest.SpringSmokeOtelConfiguration;
|
| 17 | +import java.time.Duration; |
| 18 | +import org.apache.kafka.clients.admin.NewTopic; |
| 19 | +import org.apache.kafka.clients.consumer.ConsumerRecord; |
16 | 20 | import org.assertj.core.api.AbstractLongAssert;
|
17 | 21 | import org.assertj.core.api.AbstractStringAssert;
|
18 | 22 | import org.junit.jupiter.api.Test;
|
19 | 23 | import org.springframework.beans.factory.annotation.Autowired;
|
20 | 24 | import org.springframework.boot.test.context.SpringBootTest;
|
21 | 25 | import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
|
| 26 | +import org.springframework.context.annotation.Bean; |
| 27 | +import org.springframework.context.annotation.Configuration; |
| 28 | +import org.springframework.kafka.annotation.KafkaListener; |
| 29 | +import org.springframework.kafka.config.TopicBuilder; |
22 | 30 | import org.springframework.kafka.core.KafkaTemplate;
|
23 | 31 | import org.testcontainers.containers.KafkaContainer;
|
24 | 32 | import org.testcontainers.containers.wait.strategy.Wait;
|
25 | 33 | import org.testcontainers.junit.jupiter.Container;
|
26 | 34 | import org.testcontainers.junit.jupiter.Testcontainers;
|
27 | 35 | import org.testcontainers.utility.DockerImageName;
|
28 |
| -import java.time.Duration; |
29 | 36 |
|
30 | 37 | @Testcontainers
|
31 | 38 | @SpringBootTest(
|
32 |
| - classes = {OtelSpringStarterSmokeTestApplication.class, SpringSmokeOtelConfiguration.class}, |
33 |
| - webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { |
| 39 | + classes = { |
| 40 | + OtelSpringStarterSmokeTestApplication.class, |
| 41 | + SpringSmokeOtelConfiguration.class, |
| 42 | + KafkaSpringStarterSmokeTest.KafkaConfig.class |
| 43 | + }, |
| 44 | + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, |
| 45 | + properties = { |
34 | 46 | "spring.kafka.consumer.auto-offset-reset=earliest",
|
35 | 47 | "spring.kafka.consumer.linger-ms=10",
|
36 | 48 | "spring.kafka.admin.operation-timeout=5m",
|
37 | 49 | "spring.kafka.listener.idle-between-polls=1000",
|
38 | 50 | "spring.kafka.producer.transaction-id-prefix=test-"
|
39 |
| - |
40 |
| -}) |
| 51 | + }) |
41 | 52 | public class KafkaSpringStarterSmokeTest extends AbstractSpringStarterSmokeTest {
|
42 | 53 |
|
43 | 54 | @Container @ServiceConnection
|
44 |
| - static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10")) |
45 |
| - .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m") |
46 |
| - .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) |
47 |
| - .withStartupTimeout(Duration.ofMinutes(1)); |
| 55 | + static KafkaContainer kafka = |
| 56 | + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10")) |
| 57 | + .withEnv("KAFKA_HEAP_OPTS", "-Xmx256m") |
| 58 | + .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) |
| 59 | + .withStartupTimeout(Duration.ofMinutes(1)); |
48 | 60 |
|
49 | 61 | @Autowired private KafkaTemplate<String, String> kafkaTemplate;
|
50 | 62 |
|
@@ -119,4 +131,25 @@ void shouldInstrumentProducerAndConsumer() {
|
119 | 131 | stringAssert -> stringAssert.startsWith("consumer"))),
|
120 | 132 | span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
|
121 | 133 | }
|
| 134 | + |
| 135 | + @Configuration |
| 136 | + public static class KafkaConfig { |
| 137 | + |
| 138 | + @Autowired OpenTelemetry openTelemetry; |
| 139 | + |
| 140 | + @Bean |
| 141 | + public NewTopic testTopic() { |
| 142 | + return TopicBuilder.name("testTopic").partitions(1).replicas(1).build(); |
| 143 | + } |
| 144 | + |
| 145 | + @KafkaListener(id = "testListener", topics = "testTopic") |
| 146 | + public void listener(ConsumerRecord<String, String> record) { |
| 147 | + openTelemetry |
| 148 | + .getTracer("consumer", "1.0") |
| 149 | + .spanBuilder("consumer") |
| 150 | + .setSpanKind(SpanKind.CONSUMER) |
| 151 | + .startSpan() |
| 152 | + .end(); |
| 153 | + } |
| 154 | + } |
122 | 155 | }
|
0 commit comments