Skip to content

Commit e274205

Browse files
authored
[Pulsar] Support Pulsar Client send message with transaction. (#12731)
1 parent e27a76a commit e274205

File tree

5 files changed

+99
-3
lines changed

5 files changed

+99
-3
lines changed

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarInstrumentationModule.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
2525
new ProducerImplInstrumentation(),
2626
new MessageInstrumentation(),
2727
new MessageListenerInstrumentation(),
28-
new SendCallbackInstrumentation());
28+
new SendCallbackInstrumentation(),
29+
new TransactionImplInstrumentation());
2930
}
3031
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
11+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
12+
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons;
16+
import java.util.concurrent.CompletableFuture;
17+
import net.bytebuddy.asm.Advice;
18+
import net.bytebuddy.description.type.TypeDescription;
19+
import net.bytebuddy.matcher.ElementMatcher;
20+
21+
public class TransactionImplInstrumentation implements TypeInstrumentation {
22+
@Override
23+
public ElementMatcher<TypeDescription> typeMatcher() {
24+
return named("org.apache.pulsar.client.impl.transaction.TransactionImpl");
25+
}
26+
27+
@Override
28+
public void transform(TypeTransformer transformer) {
29+
transformer.applyAdviceToMethod(
30+
named("registerProducedTopic")
31+
.and(isPublic())
32+
.and(takesArguments(1))
33+
.and(takesArgument(0, String.class)),
34+
TransactionImplInstrumentation.class.getName() + "$RegisterProducedTopicAdvice");
35+
}
36+
37+
@SuppressWarnings("unused")
38+
public static class RegisterProducedTopicAdvice {
39+
40+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
41+
public static void after(@Advice.Return(readOnly = false) CompletableFuture<Void> future) {
42+
future = PulsarSingletons.wrap(future);
43+
}
44+
}
45+
}

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java

+18
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,24 @@ private static Context startAndEndConsumerReceive(
210210
timer.now());
211211
}
212212

213+
public static CompletableFuture<Void> wrap(CompletableFuture<Void> future) {
214+
Context parent = Context.current();
215+
CompletableFuture<Void> result = new CompletableFuture<>();
216+
future.whenComplete(
217+
(unused, t) ->
218+
runWithContext(
219+
parent,
220+
() -> {
221+
if (t != null) {
222+
result.completeExceptionally(t);
223+
} else {
224+
result.complete(null);
225+
}
226+
}));
227+
228+
return result;
229+
}
230+
213231
public static CompletableFuture<Message<?>> wrap(
214232
CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
215233
boolean listenerContextActive = MessageListenerContext.isProcessing();

instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/AbstractPulsarClientTest.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,17 @@ static void beforeAll() throws PulsarClientException {
8989
new PulsarContainer(DEFAULT_IMAGE_NAME)
9090
.withEnv("PULSAR_MEM", "-Xmx128m")
9191
.withLogConsumer(new Slf4jLogConsumer(logger))
92-
.withStartupTimeout(Duration.ofMinutes(2));
92+
.withStartupTimeout(Duration.ofMinutes(2))
93+
.withTransactions();
9394
pulsar.start();
9495

9596
brokerHost = pulsar.getHost();
9697
brokerPort = pulsar.getMappedPort(6650);
97-
client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build();
98+
client =
99+
PulsarClient.builder()
100+
.serviceUrl(pulsar.getPulsarBrokerUrl())
101+
.enableTransaction(true)
102+
.build();
98103
admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
99104
}
100105

instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java

+27
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.pulsar.client.api.Messages;
2828
import org.apache.pulsar.client.api.Schema;
2929
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.transaction.Transaction;
3031
import org.junit.jupiter.api.Test;
3132

3233
class PulsarClientTest extends AbstractPulsarClientTest {
@@ -671,4 +672,30 @@ void testConsumePartitionedTopicUsingBatchReceive() throws Exception {
671672
});
672673
}));
673674
}
675+
676+
@Test
677+
void testSendMessageWithTxn() throws Exception {
678+
String topic = "persistent://public/default/testSendMessageWithTxn";
679+
admin.topics().createNonPartitionedTopic(topic);
680+
producer =
681+
client
682+
.newProducer(Schema.STRING)
683+
.topic(topic)
684+
.sendTimeout(0, TimeUnit.SECONDS)
685+
.enableBatching(false)
686+
.create();
687+
Transaction transaction =
688+
client.newTransaction().withTransactionTimeout(15, TimeUnit.SECONDS).build().get();
689+
testing.runWithSpan("parent1", () -> producer.newMessage(transaction).value("test1").send());
690+
transaction.commit();
691+
692+
testing.waitAndAssertTraces(
693+
trace ->
694+
trace.hasSpansSatisfyingExactly(
695+
span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
696+
span ->
697+
span.hasName(topic + " publish")
698+
.hasKind(SpanKind.PRODUCER)
699+
.hasParent(trace.getSpan(0))));
700+
}
674701
}

0 commit comments

Comments
 (0)