Skip to content

Commit d6a8100

Browse files
Add support for spring-pulsar 1.0
1 parent 9e7edab commit d6a8100

13 files changed

+448
-9
lines changed

.fossa.yml

+3
Original file line numberDiff line numberDiff line change
@@ -892,6 +892,9 @@ targets:
892892
- type: gradle
893893
path: ./
894894
target: ':instrumentation:spring:spring-kafka-2.7:library'
895+
- type: gradle
896+
path: ./
897+
target: ':instrumentation:spring:spring-pulsar-1.0:javaagent'
895898
- type: gradle
896899
path: ./
897900
target: ':instrumentation:spring:spring-rabbit-1.0:javaagent'
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Comparing source compatibility of opentelemetry-instrumentation-annotations-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-annotations-2.12.0.jar
1+
Comparing source compatibility of opentelemetry-instrumentation-annotations-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-annotations-2.13.0.jar
22
No changes.
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,2 @@
1-
Comparing source compatibility of opentelemetry-instrumentation-api-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-api-2.12.0.jar
2-
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.instrumentation.api.semconv.util.SpanNames (not serializable)
3-
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
4-
+++ NEW SUPERCLASS: java.lang.Object
5-
+++ NEW METHOD: PUBLIC(+) STATIC(+) java.lang.String fromMethod(java.lang.reflect.Method)
6-
+++ NEW METHOD: PUBLIC(+) STATIC(+) java.lang.String fromMethod(java.lang.Class<?>, java.lang.String)
1+
Comparing source compatibility of opentelemetry-instrumentation-api-2.14.0-SNAPSHOT.jar against opentelemetry-instrumentation-api-2.13.0.jar
2+
No changes.
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Comparing source compatibility of opentelemetry-spring-boot-autoconfigure-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-autoconfigure-2.12.0.jar
1+
Comparing source compatibility of opentelemetry-spring-boot-autoconfigure-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-autoconfigure-2.13.0.jar
22
No changes.
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Comparing source compatibility of opentelemetry-spring-boot-starter-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-starter-2.12.0.jar
1+
Comparing source compatibility of opentelemetry-spring-boot-starter-2.14.0-SNAPSHOT.jar against opentelemetry-spring-boot-starter-2.13.0.jar
22
No changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
plugins {
2+
id("otel.javaagent-instrumentation")
3+
}
4+
5+
muzzle {
6+
pass {
7+
group.set("org.springframework.pulsar")
8+
module.set("spring-pulsar")
9+
versions.set("[1.2.0,]")
10+
}
11+
}
12+
13+
dependencies {
14+
library("org.springframework.pulsar:spring-pulsar:1.2.0")
15+
16+
testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
17+
testImplementation("org.testcontainers:pulsar")
18+
19+
testLibrary("org.springframework.pulsar:spring-pulsar:1.2.0")
20+
testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4")
21+
testLibrary("org.springframework.boot:spring-boot-starter:3.2.4")
22+
}
23+
24+
val latestDepTest = findProperty("testLatestDeps") as Boolean
25+
26+
// spring 6 (spring boot 3) requires java 17
27+
if (latestDepTest) {
28+
otelJava {
29+
minJavaVersionSupported.set(JavaVersion.VERSION_17)
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import static io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0.SpringPulsarSingletons.instrumenter;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
12+
13+
import io.opentelemetry.context.Context;
14+
import io.opentelemetry.context.Scope;
15+
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
16+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
17+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
18+
import net.bytebuddy.asm.Advice;
19+
import net.bytebuddy.description.type.TypeDescription;
20+
import net.bytebuddy.matcher.ElementMatcher;
21+
import org.apache.pulsar.client.api.Message;
22+
23+
public class DefaultPulsarMessageListenerContainerInstrumentation implements TypeInstrumentation {
24+
@Override
25+
public ElementMatcher<TypeDescription> typeMatcher() {
26+
return named(
27+
"org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener");
28+
}
29+
30+
@Override
31+
public void transform(TypeTransformer transformer) {
32+
transformer.applyAdviceToMethod(
33+
named("dispatchMessageToListener")
34+
.and(takesArguments(3))
35+
.and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))),
36+
getClass().getName() + "$DispatchMessageToListenerAdvice");
37+
}
38+
39+
@SuppressWarnings("unused")
40+
public static class DispatchMessageToListenerAdvice {
41+
@Advice.OnMethodEnter(suppress = Throwable.class)
42+
public static void onEnter(
43+
@Advice.Argument(0) Message<?> message,
44+
@Advice.Local("otelContext") Context context,
45+
@Advice.Local("otelScope") Scope scope) {
46+
Context parentContext = Java8BytecodeBridge.currentContext();
47+
if (instrumenter().shouldStart(parentContext, message)) {
48+
context = instrumenter().start(parentContext, message);
49+
scope = context.makeCurrent();
50+
}
51+
}
52+
53+
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
54+
public static void onExit(
55+
@Advice.Argument(0) Message<?> message,
56+
@Advice.Local("otelContext") Context context,
57+
@Advice.Local("otelScope") Scope scope,
58+
@Advice.Thrown Throwable throwable) {
59+
if (scope == null) {
60+
return;
61+
}
62+
scope.close();
63+
instrumenter().end(context, message, null, throwable);
64+
}
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import io.opentelemetry.context.propagation.TextMapGetter;
9+
import javax.annotation.Nullable;
10+
import org.apache.pulsar.client.api.Message;
11+
12+
enum MessageHeaderGetter implements TextMapGetter<Message<?>> {
13+
INSTANCE;
14+
15+
@Override
16+
public Iterable<String> keys(Message<?> carrier) {
17+
return carrier.getProperties().keySet();
18+
}
19+
20+
@Nullable
21+
@Override
22+
public String get(@Nullable Message<?> carrier, String key) {
23+
return carrier == null ? null : carrier.getProperties().get(key);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import static java.util.Collections.singletonList;
9+
10+
import com.google.auto.service.AutoService;
11+
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import java.util.List;
14+
15+
@AutoService(InstrumentationModule.class)
16+
public class SpringPulsarInstrumentationModule extends InstrumentationModule {
17+
public SpringPulsarInstrumentationModule() {
18+
super("spring-pulsar", "spring-pulsar-1.0");
19+
}
20+
21+
@Override
22+
public List<TypeInstrumentation> typeInstrumentations() {
23+
return singletonList(new DefaultPulsarMessageListenerContainerInstrumentation());
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import static java.util.Collections.emptyList;
9+
import static java.util.Collections.singletonList;
10+
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
12+
import java.util.List;
13+
import javax.annotation.Nullable;
14+
import org.apache.pulsar.client.api.Message;
15+
16+
enum SpringPulsarMessageAttributesGetter implements MessagingAttributesGetter<Message<?>, Void> {
17+
INSTANCE;
18+
19+
@Override
20+
public String getSystem(Message<?> message) {
21+
return "pulsar";
22+
}
23+
24+
@Override
25+
@Nullable
26+
public String getDestination(Message<?> message) {
27+
return message.getTopicName();
28+
}
29+
30+
@Nullable
31+
@Override
32+
public String getDestinationTemplate(Message<?> message) {
33+
return null;
34+
}
35+
36+
@Override
37+
public boolean isTemporaryDestination(Message<?> message) {
38+
return false;
39+
}
40+
41+
@Override
42+
public boolean isAnonymousDestination(Message<?> message) {
43+
return false;
44+
}
45+
46+
@Override
47+
@Nullable
48+
public String getConversationId(Message<?> message) {
49+
return null;
50+
}
51+
52+
@Override
53+
public Long getMessageBodySize(Message<?> message) {
54+
return (long) message.size();
55+
}
56+
57+
@Nullable
58+
@Override
59+
public Long getMessageEnvelopeSize(Message<?> message) {
60+
return null;
61+
}
62+
63+
@Override
64+
@Nullable
65+
public String getMessageId(Message<?> message, @Nullable Void unused) {
66+
if (message.getMessageId() != null) {
67+
return message.getMessageId().toString();
68+
}
69+
70+
return null;
71+
}
72+
73+
@Nullable
74+
@Override
75+
public String getClientId(Message<?> message) {
76+
return null;
77+
}
78+
79+
@Nullable
80+
@Override
81+
public Long getBatchMessageCount(Message<?> message, @Nullable Void unused) {
82+
return null;
83+
}
84+
85+
@Override
86+
public List<String> getMessageHeader(Message<?> message, String name) {
87+
String value = message.getProperty(name);
88+
return value != null ? singletonList(value) : emptyList();
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry;
9+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
10+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
11+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
12+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
13+
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
14+
import org.apache.pulsar.client.api.Message;
15+
16+
public final class SpringPulsarSingletons {
17+
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-pulsar-1.0";
18+
private static final Instrumenter<Message<?>, Void> INSTRUMENTER;
19+
20+
static {
21+
SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE;
22+
MessageOperation operation = MessageOperation.PROCESS;
23+
24+
INSTRUMENTER =
25+
Instrumenter.<Message<?>, Void>builder(
26+
GlobalOpenTelemetry.get(),
27+
INSTRUMENTATION_NAME,
28+
MessagingSpanNameExtractor.create(getter, operation))
29+
.addAttributesExtractor(
30+
MessagingAttributesExtractor.builder(getter, operation)
31+
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
32+
.build())
33+
.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
34+
}
35+
36+
public static Instrumenter<Message<?>, Void> instrumenter() {
37+
return INSTRUMENTER;
38+
}
39+
40+
private SpringPulsarSingletons() {}
41+
}

0 commit comments

Comments
 (0)