Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for spring-pulsar 1.0 #13320

Merged
merged 11 commits into from
Mar 11, 2025
3 changes: 3 additions & 0 deletions .fossa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,9 @@ targets:
- type: gradle
path: ./
target: ':instrumentation:spring:spring-kafka-2.7:library'
- type: gradle
path: ./
target: ':instrumentation:spring:spring-pulsar-1.0:javaagent'
- type: gradle
path: ./
target: ':instrumentation:spring:spring-rabbit-1.0:javaagent'
Expand Down
1 change: 1 addition & 0 deletions docs/supported-libraries.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ These are the supported libraries and frameworks:
| [Spring Integration](https://spring.io/projects/spring-integration) | 4.1+ (not including 6.0+ yet) | [opentelemetry-spring-integration-4.1](../instrumentation/spring/spring-integration-4.1/library) | [Messaging Spans] |
| [Spring JMS](https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms) | 2.0+ | N/A | [Messaging Spans] |
| [Spring Kafka](https://spring.io/projects/spring-kafka) | 2.7+ | [opentelemetry-spring-kafka-2.7](../instrumentation/spring/spring-kafka-2.7/library) | [Messaging Spans] |
| [Spring Pulsar](https://spring.io/projects/spring-pulsar) | 1.0+ | | [Messaging Spans] |
| [Spring RabbitMQ](https://spring.io/projects/spring-amqp) | 1.0+ | N/A | [Messaging Spans] |
| [Spring RestTemplate](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/client/package-summary.html) | 3.1+ | [opentelemetry-spring-web-3.1](../instrumentation/spring/spring-web/spring-web-3.1/library) | [HTTP Client Spans], [HTTP Client Metrics] |
| [Spring RMI](https://docs.spring.io/spring-framework/docs/4.0.x/javadoc-api/org/springframework/remoting/rmi/package-summary.html) | 4.0+ | N/A | [RPC Client Spans], [RPC Server Spans] |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ public static CompletableFuture<Messages<?>> wrapBatch(
(messages, throwable) -> {
Context context =
startAndEndConsumerReceive(parent, messages, timer, consumer, throwable);
// injected context is used in the spring-pulsar instrumentation
messages.forEach(message -> VirtualFieldStore.inject(message, context));
runWithContext(
context,
() -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
plugins {
id("otel.javaagent-instrumentation")
}

muzzle {
pass {
group.set("org.springframework.pulsar")
module.set("spring-pulsar")
versions.set("[1.0.0,)")
assertInverse.set(true)
excludeInstrumentationName("pulsar-2.8")
}
}

dependencies {
library("org.springframework.pulsar:spring-pulsar:1.0.0")
implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))

testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))

testImplementation(project(":instrumentation:spring:spring-pulsar-1.0:testing"))

testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4")
testLibrary("org.springframework.boot:spring-boot-starter:3.2.4")
}

val latestDepTest = findProperty("testLatestDeps") as Boolean

testing {
suites {
val testReceiveSpansDisabled by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:spring:spring-pulsar-1.0:testing"))

if (latestDepTest) {
implementation("org.springframework.pulsar:spring-pulsar:latest.release")
implementation("org.springframework.boot:spring-boot-starter-test:latest.release")
implementation("org.springframework.boot:spring-boot-starter:latest.release")
} else {
implementation("org.springframework.pulsar:spring-pulsar:1.0.0")
implementation("org.springframework.boot:spring-boot-starter-test:3.2.4")
implementation("org.springframework.boot:spring-boot-starter:3.2.4")
}
}

targets {
all {
testTask.configure {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
}
}
}
}
}
}

tasks {
test {
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)

jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}

check {
dependsOn(testing.suites)
}
}

// spring 6 requires java 17
otelJava {
minJavaVersionSupported.set(JavaVersion.VERSION_17)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import static io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0.SpringPulsarSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Message;

public class DefaultPulsarMessageListenerContainerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named(
"org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("dispatchMessageToListener")
.and(takesArguments(3).or(takesArguments(2)))
.and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))),
getClass().getName() + "$DispatchMessageToListenerAdvice");
}

@SuppressWarnings("unused")
public static class DispatchMessageToListenerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) Message<?> message,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Context parentContext = VirtualFieldStore.extract(message);
if (instrumenter().shouldStart(parentContext, message)) {
context = instrumenter().start(parentContext, message);
scope = context.makeCurrent();
}
}

@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
public static void onExit(
@Advice.Argument(0) Message<?> message,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope,
@Advice.Thrown Throwable throwable) {
if (scope == null) {
return;
}
scope.close();
instrumenter().end(context, message, null, throwable);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import io.opentelemetry.context.propagation.TextMapGetter;
import javax.annotation.Nullable;
import org.apache.pulsar.client.api.Message;

enum MessageHeaderGetter implements TextMapGetter<Message<?>> {
INSTANCE;

@Override
public Iterable<String> keys(Message<?> carrier) {
return carrier.getProperties().keySet();
}

@Nullable
@Override
public String get(@Nullable Message<?> carrier, String key) {
return carrier == null ? null : carrier.getProperties().get(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static java.util.Collections.singletonList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(InstrumentationModule.class)
public class SpringPulsarInstrumentationModule extends InstrumentationModule {
public SpringPulsarInstrumentationModule() {
super("spring-pulsar", "spring-pulsar-1.0");
}

@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// added in 1.0.0
return hasClassesNamed(
"org.springframework.pulsar.annotation.PulsarListenerConsumerBuilderCustomizer");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new DefaultPulsarMessageListenerContainerInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;

import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pulsar.client.api.Message;

enum SpringPulsarMessageAttributesGetter implements MessagingAttributesGetter<Message<?>, Void> {
INSTANCE;

@Override
public String getSystem(Message<?> message) {
return "pulsar";
}

@Override
@Nullable
public String getDestination(Message<?> message) {
return message.getTopicName();
}

@Nullable
@Override
public String getDestinationTemplate(Message<?> message) {
return null;
}

@Override
public boolean isTemporaryDestination(Message<?> message) {
return false;
}

@Override
public boolean isAnonymousDestination(Message<?> message) {
return false;
}

@Override
@Nullable
public String getConversationId(Message<?> message) {
return null;
}

@Override
public Long getMessageBodySize(Message<?> message) {
return (long) message.size();
}

@Nullable
@Override
public Long getMessageEnvelopeSize(Message<?> message) {
return null;
}

@Override
@Nullable
public String getMessageId(Message<?> message, @Nullable Void unused) {
if (message.getMessageId() != null) {
return message.getMessageId().toString();
}

return null;
}

@Nullable
@Override
public String getClientId(Message<?> message) {
return null;
}

@Nullable
@Override
public Long getBatchMessageCount(Message<?> message, @Nullable Void unused) {
return null;
}

@Override
public List<String> getMessageHeader(Message<?> message, String name) {
String value = message.getProperty(name);
return value != null ? singletonList(value) : emptyList();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import org.apache.pulsar.client.api.Message;

public final class SpringPulsarSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-pulsar-1.0";
private static final Instrumenter<Message<?>, Void> INSTRUMENTER;

static {
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE;
MessageOperation operation = MessageOperation.PROCESS;
boolean messagingReceiveInstrumentationEnabled =
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();

InstrumenterBuilder<Message<?>, Void> builder =
Instrumenter.<Message<?>, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build());
if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
new PropagatorBasedSpanLinksExtractor<>(
openTelemetry.getPropagators().getTextMapPropagator(), MessageHeaderGetter.INSTANCE));
INSTRUMENTER = builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
} else {
INSTRUMENTER = builder.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
}
}

public static Instrumenter<Message<?>, Void> instrumenter() {
return INSTRUMENTER;
}

private SpringPulsarSingletons() {}
}
Loading
Loading