Skip to content

Commit 4ebaf70

Browse files
Add support for spring-pulsar 1.0 (#13320)
Co-authored-by: Lauri Tulmin <[email protected]>
1 parent df111a4 commit 4ebaf70

File tree

14 files changed

+611
-0
lines changed

14 files changed

+611
-0
lines changed

.fossa.yml

+3
Original file line numberDiff line numberDiff line change
@@ -895,6 +895,9 @@ targets:
895895
- type: gradle
896896
path: ./
897897
target: ':instrumentation:spring:spring-kafka-2.7:library'
898+
- type: gradle
899+
path: ./
900+
target: ':instrumentation:spring:spring-pulsar-1.0:javaagent'
898901
- type: gradle
899902
path: ./
900903
target: ':instrumentation:spring:spring-rabbit-1.0:javaagent'

docs/supported-libraries.md

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ These are the supported libraries and frameworks:
138138
| [Spring Integration](https://spring.io/projects/spring-integration) | 4.1+ (not including 6.0+ yet) | [opentelemetry-spring-integration-4.1](../instrumentation/spring/spring-integration-4.1/library) | [Messaging Spans] |
139139
| [Spring JMS](https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms) | 2.0+ | N/A | [Messaging Spans] |
140140
| [Spring Kafka](https://spring.io/projects/spring-kafka) | 2.7+ | [opentelemetry-spring-kafka-2.7](../instrumentation/spring/spring-kafka-2.7/library) | [Messaging Spans] |
141+
| [Spring Pulsar](https://spring.io/projects/spring-pulsar) | 1.0+ | | [Messaging Spans] |
141142
| [Spring RabbitMQ](https://spring.io/projects/spring-amqp) | 1.0+ | N/A | [Messaging Spans] |
142143
| [Spring RestTemplate](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/client/package-summary.html) | 3.1+ | [opentelemetry-spring-web-3.1](../instrumentation/spring/spring-web/spring-web-3.1/library) | [HTTP Client Spans], [HTTP Client Metrics] |
143144
| [Spring RMI](https://docs.spring.io/spring-framework/docs/4.0.x/javadoc-api/org/springframework/remoting/rmi/package-summary.html) | 4.0+ | N/A | [RPC Client Spans], [RPC Server Spans] |

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java

+2
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,8 @@ public static CompletableFuture<Messages<?>> wrapBatch(
263263
(messages, throwable) -> {
264264
Context context =
265265
startAndEndConsumerReceive(parent, messages, timer, consumer, throwable);
266+
// injected context is used in the spring-pulsar instrumentation
267+
messages.forEach(message -> VirtualFieldStore.inject(message, context));
266268
runWithContext(
267269
context,
268270
() -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("org.springframework.pulsar")
8+
module.set("spring-pulsar")
9+
versions.set("[1.0.0,)")
10+
assertInverse.set(true)
11+
excludeInstrumentationName("pulsar-2.8")
12+
}
13+
}
14+
15+
dependencies {
16+
library("org.springframework.pulsar:spring-pulsar:1.0.0")
17+
implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
18+
19+
testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
20+
21+
testImplementation(project(":instrumentation:spring:spring-pulsar-1.0:testing"))
22+
23+
testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4")
24+
testLibrary("org.springframework.boot:spring-boot-starter:3.2.4")
25+
}
26+
27+
val latestDepTest = findProperty("testLatestDeps") as Boolean
28+
29+
testing {
30+
suites {
31+
val testReceiveSpansDisabled by registering(JvmTestSuite::class) {
32+
dependencies {
33+
implementation(project(":instrumentation:spring:spring-pulsar-1.0:testing"))
34+
35+
if (latestDepTest) {
36+
implementation("org.springframework.pulsar:spring-pulsar:latest.release")
37+
implementation("org.springframework.boot:spring-boot-starter-test:latest.release")
38+
implementation("org.springframework.boot:spring-boot-starter:latest.release")
39+
} else {
40+
implementation("org.springframework.pulsar:spring-pulsar:1.0.0")
41+
implementation("org.springframework.boot:spring-boot-starter-test:3.2.4")
42+
implementation("org.springframework.boot:spring-boot-starter:3.2.4")
43+
}
44+
}
45+
46+
targets {
47+
all {
48+
testTask.configure {
49+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
50+
51+
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
52+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
53+
}
54+
}
55+
}
56+
}
57+
}
58+
}
59+
60+
tasks {
61+
test {
62+
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
63+
64+
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
65+
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
66+
}
67+
68+
check {
69+
dependsOn(testing.suites)
70+
}
71+
}
72+
73+
// spring 6 requires java 17
74+
otelJava {
75+
minJavaVersionSupported.set(JavaVersion.VERSION_17)
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0.SpringPulsarSingletons.instrumenter;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
12+
13+
import io.opentelemetry.context.Context;
14+
import io.opentelemetry.context.Scope;
15+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
16+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
17+
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore;
18+
import net.bytebuddy.asm.Advice;
19+
import net.bytebuddy.description.type.TypeDescription;
20+
import net.bytebuddy.matcher.ElementMatcher;
21+
import org.apache.pulsar.client.api.Message;
22+
23+
public class DefaultPulsarMessageListenerContainerInstrumentation implements TypeInstrumentation {
24+
@Override
25+
public ElementMatcher<TypeDescription> typeMatcher() {
26+
return named(
27+
"org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener");
28+
}
29+
30+
@Override
31+
public void transform(TypeTransformer transformer) {
32+
transformer.applyAdviceToMethod(
33+
named("dispatchMessageToListener")
34+
.and(takesArguments(3).or(takesArguments(2)))
35+
.and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))),
36+
getClass().getName() + "$DispatchMessageToListenerAdvice");
37+
}
38+
39+
@SuppressWarnings("unused")
40+
public static class DispatchMessageToListenerAdvice {
41+
@Advice.OnMethodEnter(suppress = Throwable.class)
42+
public static void onEnter(
43+
@Advice.Argument(0) Message<?> message,
44+
@Advice.Local("otelContext") Context context,
45+
@Advice.Local("otelScope") Scope scope) {
46+
Context parentContext = VirtualFieldStore.extract(message);
47+
if (instrumenter().shouldStart(parentContext, message)) {
48+
context = instrumenter().start(parentContext, message);
49+
scope = context.makeCurrent();
50+
}
51+
}
52+
53+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
54+
public static void onExit(
55+
@Advice.Argument(0) Message<?> message,
56+
@Advice.Local("otelContext") Context context,
57+
@Advice.Local("otelScope") Scope scope,
58+
@Advice.Thrown Throwable throwable) {
59+
if (scope == null) {
60+
return;
61+
}
62+
scope.close();
63+
instrumenter().end(context, message, null, throwable);
64+
}
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import io.opentelemetry.context.propagation.TextMapGetter;
9+
import javax.annotation.Nullable;
10+
import org.apache.pulsar.client.api.Message;
11+
12+
enum MessageHeaderGetter implements TextMapGetter<Message<?>> {
13+
INSTANCE;
14+
15+
@Override
16+
public Iterable<String> keys(Message<?> carrier) {
17+
return carrier.getProperties().keySet();
18+
}
19+
20+
@Nullable
21+
@Override
22+
public String get(@Nullable Message<?> carrier, String key) {
23+
return carrier == null ? null : carrier.getProperties().get(key);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
9+
import static java.util.Collections.singletonList;
10+
11+
import com.google.auto.service.AutoService;
12+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import java.util.List;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
17+
@AutoService(InstrumentationModule.class)
18+
public class SpringPulsarInstrumentationModule extends InstrumentationModule {
19+
public SpringPulsarInstrumentationModule() {
20+
super("spring-pulsar", "spring-pulsar-1.0");
21+
}
22+
23+
@Override
24+
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
25+
// added in 1.0.0
26+
return hasClassesNamed(
27+
"org.springframework.pulsar.annotation.PulsarListenerConsumerBuilderCustomizer");
28+
}
29+
30+
@Override
31+
public List<TypeInstrumentation> typeInstrumentations() {
32+
return singletonList(new DefaultPulsarMessageListenerContainerInstrumentation());
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import static java.util.Collections.emptyList;
9+
import static java.util.Collections.singletonList;
10+
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
12+
import java.util.List;
13+
import javax.annotation.Nullable;
14+
import org.apache.pulsar.client.api.Message;
15+
16+
enum SpringPulsarMessageAttributesGetter implements MessagingAttributesGetter<Message<?>, Void> {
17+
INSTANCE;
18+
19+
@Override
20+
public String getSystem(Message<?> message) {
21+
return "pulsar";
22+
}
23+
24+
@Override
25+
@Nullable
26+
public String getDestination(Message<?> message) {
27+
return message.getTopicName();
28+
}
29+
30+
@Nullable
31+
@Override
32+
public String getDestinationTemplate(Message<?> message) {
33+
return null;
34+
}
35+
36+
@Override
37+
public boolean isTemporaryDestination(Message<?> message) {
38+
return false;
39+
}
40+
41+
@Override
42+
public boolean isAnonymousDestination(Message<?> message) {
43+
return false;
44+
}
45+
46+
@Override
47+
@Nullable
48+
public String getConversationId(Message<?> message) {
49+
return null;
50+
}
51+
52+
@Override
53+
public Long getMessageBodySize(Message<?> message) {
54+
return (long) message.size();
55+
}
56+
57+
@Nullable
58+
@Override
59+
public Long getMessageEnvelopeSize(Message<?> message) {
60+
return null;
61+
}
62+
63+
@Override
64+
@Nullable
65+
public String getMessageId(Message<?> message, @Nullable Void unused) {
66+
if (message.getMessageId() != null) {
67+
return message.getMessageId().toString();
68+
}
69+
70+
return null;
71+
}
72+
73+
@Nullable
74+
@Override
75+
public String getClientId(Message<?> message) {
76+
return null;
77+
}
78+
79+
@Nullable
80+
@Override
81+
public Long getBatchMessageCount(Message<?> message, @Nullable Void unused) {
82+
return null;
83+
}
84+
85+
@Override
86+
public List<String> getMessageHeader(Message<?> message, String name) {
87+
String value = message.getProperty(name);
88+
return value != null ? singletonList(value) : emptyList();
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.api.OpenTelemetry;
10+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
12+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
13+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
15+
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
16+
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
17+
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
18+
import org.apache.pulsar.client.api.Message;
19+
20+
public final class SpringPulsarSingletons {
21+
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-pulsar-1.0";
22+
private static final Instrumenter<Message<?>, Void> INSTRUMENTER;
23+
24+
static {
25+
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
26+
SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE;
27+
MessageOperation operation = MessageOperation.PROCESS;
28+
boolean messagingReceiveInstrumentationEnabled =
29+
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
30+
31+
InstrumenterBuilder<Message<?>, Void> builder =
32+
Instrumenter.<Message<?>, Void>builder(
33+
openTelemetry,
34+
INSTRUMENTATION_NAME,
35+
MessagingSpanNameExtractor.create(getter, operation))
36+
.addAttributesExtractor(
37+
MessagingAttributesExtractor.builder(getter, operation)
38+
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
39+
.build());
40+
if (messagingReceiveInstrumentationEnabled) {
41+
builder.addSpanLinksExtractor(
42+
new PropagatorBasedSpanLinksExtractor<>(
43+
openTelemetry.getPropagators().getTextMapPropagator(), MessageHeaderGetter.INSTANCE));
44+
INSTRUMENTER = builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
45+
} else {
46+
INSTRUMENTER = builder.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
47+
}
48+
}
49+
50+
public static Instrumenter<Message<?>, Void> instrumenter() {
51+
return INSTRUMENTER;
52+
}
53+
54+
private SpringPulsarSingletons() {}
55+
}

0 commit comments

Comments
 (0)