Skip to content

Commit 2db1ce8

Browse files
committed
correct spans when receive telemetry is enabled
1 parent de5f599 commit 2db1ce8

File tree

6 files changed

+65
-30
lines changed

6 files changed

+65
-30
lines changed

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java

+2
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ public static CompletableFuture<Messages<?>> wrapBatch(
263263
(messages, throwable) -> {
264264
Context context =
265265
startAndEndConsumerReceive(parent, messages, timer, consumer, throwable);
266+
// injected context is used in the spring-pulsar instrumentation
267+
messages.forEach(message -> VirtualFieldStore.inject(message, context));
266268
runWithContext(
267269
context,
268270
() -> {

instrumentation/spring/spring-pulsar-1.2/javaagent/build.gradle.kts

+17-3
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ muzzle {
1212

1313
dependencies {
1414
library("org.springframework.pulsar:spring-pulsar:1.2.0")
15+
implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
1516

1617
testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
18+
1719
testImplementation(project(":instrumentation:spring:spring-pulsar-1.2:testing"))
18-
testLibrary("org.springframework.pulsar:spring-pulsar:1.2.0")
1920

21+
testLibrary("org.springframework.pulsar:spring-pulsar:1.2.0")
2022
testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4")
2123
testLibrary("org.springframework.boot:spring-boot-starter:3.2.4")
2224
}
@@ -30,15 +32,27 @@ testing {
3032
implementation("org.springframework.boot:spring-boot-starter-test:3.2.4")
3133
implementation("org.springframework.boot:spring-boot-starter:3.2.4")
3234
}
35+
36+
targets {
37+
all {
38+
testTask.configure {
39+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
40+
41+
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
42+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
43+
}
44+
}
45+
}
3346
}
3447
}
3548
}
3649

3750
tasks {
38-
withType<Test>().configureEach {
51+
test {
52+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
53+
3954
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
4055
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
41-
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
4256
}
4357

4458
check {

instrumentation/spring/spring-pulsar-1.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_2/DefaultPulsarMessageListenerContainerInstrumentation.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212

1313
import io.opentelemetry.context.Context;
1414
import io.opentelemetry.context.Scope;
15-
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
1615
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
1716
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
17+
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore;
1818
import net.bytebuddy.asm.Advice;
1919
import net.bytebuddy.description.type.TypeDescription;
2020
import net.bytebuddy.matcher.ElementMatcher;
@@ -43,7 +43,7 @@ public static void onEnter(
4343
@Advice.Argument(0) Message<?> message,
4444
@Advice.Local("otelContext") Context context,
4545
@Advice.Local("otelScope") Scope scope) {
46-
Context parentContext = Java8BytecodeBridge.currentContext();
46+
Context parentContext = VirtualFieldStore.extract(message);
4747
if (instrumenter().shouldStart(parentContext, message)) {
4848
context = instrumenter().start(parentContext, message);
4949
scope = context.makeCurrent();

instrumentation/spring/spring-pulsar-1.2/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_2/SpringPulsarSingletons.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_2;
77

88
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.api.OpenTelemetry;
910
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
1011
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
1112
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
1213
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
15+
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
16+
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
1317
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
1418
import org.apache.pulsar.client.api.Message;
1519

@@ -18,19 +22,29 @@ public final class SpringPulsarSingletons {
1822
private static final Instrumenter<Message<?>, Void> INSTRUMENTER;
1923

2024
static {
25+
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
2126
SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE;
2227
MessageOperation operation = MessageOperation.PROCESS;
28+
boolean messagingReceiveInstrumentationEnabled =
29+
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
2330

24-
INSTRUMENTER =
31+
InstrumenterBuilder<Message<?>, Void> builder =
2532
Instrumenter.<Message<?>, Void>builder(
26-
GlobalOpenTelemetry.get(),
33+
openTelemetry,
2734
INSTRUMENTATION_NAME,
2835
MessagingSpanNameExtractor.create(getter, operation))
2936
.addAttributesExtractor(
3037
MessagingAttributesExtractor.builder(getter, operation)
3138
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
32-
.build())
33-
.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
39+
.build());
40+
if (messagingReceiveInstrumentationEnabled) {
41+
builder.addSpanLinksExtractor(
42+
new PropagatorBasedSpanLinksExtractor<>(
43+
openTelemetry.getPropagators().getTextMapPropagator(), MessageHeaderGetter.INSTANCE));
44+
INSTRUMENTER = builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
45+
} else {
46+
INSTRUMENTER = builder.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
47+
}
3448
}
3549

3650
public static Instrumenter<Message<?>, Void> instrumenter() {

instrumentation/spring/spring-pulsar-1.2/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/pulsar/v1_2/SpringPulsarTest.java

+21-12
Original file line numberDiff line numberDiff line change
@@ -11,33 +11,42 @@
1111
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
1212

1313
import io.opentelemetry.instrumentation.spring.pulsar.v1_0.AbstractSpringPulsarTest;
14+
import io.opentelemetry.sdk.trace.data.LinkData;
15+
import io.opentelemetry.sdk.trace.data.SpanData;
16+
import java.util.concurrent.atomic.AtomicReference;
1417

1518
class SpringPulsarTest extends AbstractSpringPulsarTest {
1619

1720
@Override
1821
protected void assertSpringPulsar() {
22+
AtomicReference<SpanData> producer = new AtomicReference<>();
23+
1924
testing.waitAndAssertSortedTraces(
2025
orderByRootSpanKind(INTERNAL, CONSUMER),
2126
trace ->
2227
trace.hasSpansSatisfyingExactly(
2328
span -> span.hasName("parent").hasNoParent(),
24-
span ->
25-
span.hasName(OTEL_TOPIC + " publish")
26-
.hasKind(PRODUCER)
27-
.hasParent(trace.getSpan(0))
28-
.hasAttributesSatisfyingExactly(publishAttributes()),
29-
span ->
30-
span.hasName(String.format("%s process", OTEL_TOPIC))
31-
.hasKind(CONSUMER)
32-
.hasParent(trace.getSpan(1))
33-
.hasAttributesSatisfyingExactly(processAttributes()),
34-
span -> span.hasName("consumer").hasParent(trace.getSpan(2))),
29+
span -> {
30+
span.hasName(OTEL_TOPIC + " publish")
31+
.hasKind(PRODUCER)
32+
.hasParent(trace.getSpan(0))
33+
.hasAttributesSatisfyingExactly(publishAttributes());
34+
35+
producer.set(trace.getSpan(1));
36+
}),
3537
trace ->
3638
trace.hasSpansSatisfyingExactly(
3739
span ->
3840
span.hasName(String.format("%s receive", OTEL_TOPIC))
3941
.hasKind(CONSUMER)
4042
.hasNoParent()
41-
.hasAttributesSatisfyingExactly(receiveAttributes())));
43+
.hasAttributesSatisfyingExactly(receiveAttributes()),
44+
span ->
45+
span.hasName(String.format("%s process", OTEL_TOPIC))
46+
.hasKind(CONSUMER)
47+
.hasParent(trace.getSpan(0))
48+
.hasLinks(LinkData.create(producer.get().getSpanContext()))
49+
.hasAttributesSatisfyingExactly(processAttributes()),
50+
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
4251
}
4352
}

instrumentation/spring/spring-pulsar-1.2/testing/src/main/java/io/opentelemetry/instrumentation/spring/pulsar/v1_0/AbstractSpringPulsarTest.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
2222
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
2323
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
24-
import java.net.InetAddress;
25-
import java.net.UnknownHostException;
2624
import java.time.Duration;
2725
import java.util.Arrays;
2826
import java.util.HashMap;
@@ -59,7 +57,6 @@ public abstract class AbstractSpringPulsarTest {
5957
static ConfigurableApplicationContext applicationContext;
6058
static PulsarTemplate<String> pulsarTemplate;
6159
static PulsarClient client;
62-
static String ip;
6360
static CountDownLatch latch = new CountDownLatch(1);
6461
static final String OTEL_SUBSCRIPTION = "otel-subscription";
6562
protected static String brokerHost;
@@ -68,7 +65,7 @@ public abstract class AbstractSpringPulsarTest {
6865

6966
@BeforeAll
7067
@SuppressWarnings("unchecked")
71-
static void setUp() throws PulsarClientException, UnknownHostException {
68+
static void setUp() throws PulsarClientException {
7269
pulsarContainer =
7370
new PulsarContainer(DEFAULT_IMAGE_NAME)
7471
.withEnv("PULSAR_MEM", "-Xmx128m")
@@ -87,11 +84,10 @@ static void setUp() throws PulsarClientException, UnknownHostException {
8784
pulsarTemplate = applicationContext.getBean(PulsarTemplate.class);
8885

8986
client = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
90-
ip = InetAddress.getByName(pulsarContainer.getHost()).getHostAddress();
9187
}
9288

9389
@Test
94-
void testSpringPulsar() throws PulsarClientException, InterruptedException {
90+
void testSpringPulsar() throws InterruptedException {
9591
testing.runWithSpan(
9692
"parent",
9793
() -> {
@@ -103,12 +99,12 @@ void testSpringPulsar() throws PulsarClientException, InterruptedException {
10399

104100
@AfterAll
105101
static void teardown() {
106-
if (pulsarContainer != null) {
107-
pulsarContainer.stop();
108-
}
109102
if (applicationContext != null) {
110103
applicationContext.close();
111104
}
105+
if (pulsarContainer != null) {
106+
pulsarContainer.stop();
107+
}
112108
}
113109

114110
protected abstract void assertSpringPulsar();

0 commit comments

Comments
 (0)