Skip to content

Commit 3b7c225

Browse files
authored
Pulsar: use span links when receive telemetry is enabled (#10650)
1 parent 678750f commit 3b7c225

File tree

8 files changed

+960
-435
lines changed

8 files changed

+960
-435
lines changed

instrumentation/pulsar/pulsar-2.8/javaagent/build.gradle.kts

+20-1
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,28 @@ dependencies {
1919
testImplementation("org.apache.pulsar:pulsar-client-admin:2.8.0")
2020
}
2121

22+
tasks {
23+
val testReceiveSpanDisabled by registering(Test::class) {
24+
filter {
25+
includeTestsMatching("PulsarClientSuppressReceiveSpansTest")
26+
}
27+
include("**/PulsarClientSuppressReceiveSpansTest.*")
28+
}
29+
30+
test {
31+
filter {
32+
excludeTestsMatching("PulsarClientSuppressReceiveSpansTest")
33+
}
34+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
35+
}
36+
37+
check {
38+
dependsOn(testReceiveSpanDisabled)
39+
}
40+
}
41+
2242
tasks.withType<Test>().configureEach {
2343
// TODO run tests both with and without experimental span attributes
2444
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
25-
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
2645
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
2746
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.MessageListenerContext;
14+
import net.bytebuddy.asm.Advice;
15+
import net.bytebuddy.description.type.TypeDescription;
16+
import net.bytebuddy.matcher.ElementMatcher;
17+
18+
public class ConsumerBaseInstrumentation implements TypeInstrumentation {
19+
20+
@Override
21+
public ElementMatcher<TypeDescription> typeMatcher() {
22+
return named("org.apache.pulsar.client.impl.ConsumerBase")
23+
.or(named("org.apache.pulsar.client.impl.MultiTopicsConsumerImpl"));
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
// these methods receive a message and pass it on to a message listener
29+
// we instrument them so that the span for the receive operation could be suppressed
30+
transformer.applyAdviceToMethod(
31+
named("triggerListener").and(takesArguments(0)).or(named("receiveMessageFromConsumer")),
32+
this.getClass().getName() + "$TriggerListenerAdvice");
33+
}
34+
35+
@SuppressWarnings("unused")
36+
public static class TriggerListenerAdvice {
37+
38+
@Advice.OnMethodEnter(suppress = Throwable.class)
39+
public static void onEnter() {
40+
MessageListenerContext.startProcessing();
41+
}
42+
43+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
44+
public static void onExit() {
45+
MessageListenerContext.endProcessing();
46+
}
47+
}
48+
}

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public PulsarInstrumentationModule() {
2020
@Override
2121
public List<TypeInstrumentation> typeInstrumentations() {
2222
return Arrays.asList(
23+
new ConsumerBaseInstrumentation(),
2324
new ConsumerImplInstrumentation(),
2425
new ProducerImplInstrumentation(),
2526
new MessageInstrumentation(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;
7+
8+
/**
9+
* Helper class used to determine whether message is going to be processed by a listener. If we know
10+
* that message is going to be passed to a message listener, that would produce a span for the
11+
* "process" operation, we are going to suppress the span from the message "receive" operation.
12+
*/
13+
public final class MessageListenerContext {
14+
private static final ThreadLocal<Boolean> processing = new ThreadLocal<>();
15+
16+
private MessageListenerContext() {}
17+
18+
/** Call on entry to a method that will pass the received message to a message listener. */
19+
public static void startProcessing() {
20+
processing.set(Boolean.TRUE);
21+
}
22+
23+
public static void endProcessing() {
24+
processing.remove();
25+
}
26+
27+
/** Returns true if we expect a received message to be passed to a listener. */
28+
public static boolean isProcessing() {
29+
return processing.get() != null;
30+
}
31+
}

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java

+53-25
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
1919
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
2020
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
21+
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
2122
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
23+
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
2224
import io.opentelemetry.instrumentation.api.internal.Timer;
2325
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesExtractor;
2426
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
@@ -38,6 +40,8 @@ public final class PulsarSingletons {
3840
TELEMETRY.getPropagators().getTextMapPropagator();
3941
private static final List<String> capturedHeaders =
4042
ExperimentalConfig.get().getMessagingHeaders();
43+
private static final boolean receiveInstrumentationEnabled =
44+
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
4145

4246
private static final Instrumenter<PulsarRequest, Void> CONSUMER_PROCESS_INSTRUMENTER =
4347
createConsumerProcessInstrumenter();
@@ -64,15 +68,23 @@ private static Instrumenter<PulsarRequest, Void> createConsumerReceiveInstrument
6468
MessagingAttributesGetter<PulsarRequest, Void> getter =
6569
PulsarMessagingAttributesGetter.INSTANCE;
6670

67-
return Instrumenter.<PulsarRequest, Void>builder(
68-
TELEMETRY,
69-
INSTRUMENTATION_NAME,
70-
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
71-
.addAttributesExtractor(
72-
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
73-
.addAttributesExtractor(
74-
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
75-
.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
71+
InstrumenterBuilder<PulsarRequest, Void> instrumenterBuilder =
72+
Instrumenter.<PulsarRequest, Void>builder(
73+
TELEMETRY,
74+
INSTRUMENTATION_NAME,
75+
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
76+
.addAttributesExtractor(
77+
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
78+
.addAttributesExtractor(
79+
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));
80+
81+
if (receiveInstrumentationEnabled) {
82+
return instrumenterBuilder
83+
.addSpanLinksExtractor(
84+
new PropagatorBasedSpanLinksExtractor<>(PROPAGATOR, MessageTextMapGetter.INSTANCE))
85+
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
86+
}
87+
return instrumenterBuilder.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
7688
}
7789

7890
private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceiveInstrumenter() {
@@ -87,24 +99,29 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive
8799
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
88100
.addAttributesExtractor(
89101
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
90-
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
91-
.addSpanLinksExtractor(
92-
new PulsarBatchRequestSpanLinksExtractor(
93-
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()))
102+
.addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
94103
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
95104
}
96105

97106
private static Instrumenter<PulsarRequest, Void> createConsumerProcessInstrumenter() {
98107
MessagingAttributesGetter<PulsarRequest, Void> getter =
99108
PulsarMessagingAttributesGetter.INSTANCE;
100109

101-
return Instrumenter.<PulsarRequest, Void>builder(
102-
TELEMETRY,
103-
INSTRUMENTATION_NAME,
104-
MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
105-
.addAttributesExtractor(
106-
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS))
107-
.buildInstrumenter();
110+
InstrumenterBuilder<PulsarRequest, Void> instrumenterBuilder =
111+
Instrumenter.<PulsarRequest, Void>builder(
112+
TELEMETRY,
113+
INSTRUMENTATION_NAME,
114+
MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
115+
.addAttributesExtractor(
116+
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS));
117+
118+
if (receiveInstrumentationEnabled) {
119+
SpanLinksExtractor<PulsarRequest> spanLinksExtractor =
120+
new PropagatorBasedSpanLinksExtractor<>(PROPAGATOR, MessageTextMapGetter.INSTANCE);
121+
instrumenterBuilder.addSpanLinksExtractor(spanLinksExtractor);
122+
return instrumenterBuilder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
123+
}
124+
return instrumenterBuilder.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
108125
}
109126

110127
private static Instrumenter<PulsarRequest, Void> createProducerInstrumenter() {
@@ -146,12 +163,17 @@ public static Context startAndEndConsumerReceive(
146163
if (!CONSUMER_RECEIVE_INSTRUMENTER.shouldStart(parent, request)) {
147164
return null;
148165
}
149-
// startAndEnd not supports extract trace context from carrier
150-
// start not supports custom startTime
151-
// extract trace context by using TEXT_MAP_PROPAGATOR here.
166+
if (!receiveInstrumentationEnabled) {
167+
// suppress receive span when receive telemetry is not enabled and message is going to be
168+
// processed by a listener
169+
if (MessageListenerContext.isProcessing()) {
170+
return null;
171+
}
172+
parent = PROPAGATOR.extract(parent, request, MessageTextMapGetter.INSTANCE);
173+
}
152174
return InstrumenterUtil.startAndEnd(
153175
CONSUMER_RECEIVE_INSTRUMENTER,
154-
PROPAGATOR.extract(parent, request, MessageTextMapGetter.INSTANCE),
176+
parent,
155177
request,
156178
null,
157179
throwable,
@@ -185,11 +207,17 @@ private static Context startAndEndConsumerReceive(
185207

186208
public static CompletableFuture<Message<?>> wrap(
187209
CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
210+
boolean listenerContextActive = MessageListenerContext.isProcessing();
188211
Context parent = Context.current();
189212
CompletableFuture<Message<?>> result = new CompletableFuture<>();
190213
future.whenComplete(
191214
(message, throwable) -> {
192-
Context context = startAndEndConsumerReceive(parent, message, timer, consumer, throwable);
215+
// we create a "receive" span when receive telemetry is enabled or when we know that
216+
// this message will not be passed to a listener that would create the "process" span
217+
Context context =
218+
receiveInstrumentationEnabled || !listenerContextActive
219+
? startAndEndConsumerReceive(parent, message, timer, consumer, throwable)
220+
: parent;
193221
runWithContext(
194222
context,
195223
() -> {

0 commit comments

Comments
 (0)