Skip to content

Commit 2cb3961

Browse files
authoredOct 2, 2024··
try to handle issues in Pekko tracing when the scheduler is used (#12359)
1 parent b003541 commit 2cb3961

File tree

3 files changed

+116
-1
lines changed

3 files changed

+116
-1
lines changed
 

‎instrumentation/pekko/pekko-actor-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkoactor/v1_0/PekkoActorInstrumentationModule.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
2323
return asList(
2424
new PekkoDispatcherInstrumentation(),
2525
new PekkoActorCellInstrumentation(),
26-
new PekkoDefaultSystemMessageQueueInstrumentation());
26+
new PekkoDefaultSystemMessageQueueInstrumentation(),
27+
new PekkoScheduleInstrumentation());
2728
}
2829
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
10+
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
19+
public class PekkoScheduleInstrumentation implements TypeInstrumentation {
20+
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("org.apache.pekko.actor.LightArrayRevolverScheduler");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("schedule")
30+
.and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration")))
31+
.and(takesArgument(1, named("scala.concurrent.duration.FiniteDuration")))
32+
.and(takesArgument(2, named("java.lang.Runnable")))
33+
.and(takesArgument(3, named("scala.concurrent.ExecutionContext"))),
34+
PekkoScheduleInstrumentation.class.getName() + "$ScheduleAdvice");
35+
transformer.applyAdviceToMethod(
36+
named("scheduleOnce")
37+
.and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration")))
38+
.and(takesArgument(1, named("java.lang.Runnable")))
39+
.and(takesArgument(2, named("scala.concurrent.ExecutionContext"))),
40+
PekkoScheduleInstrumentation.class.getName() + "$ScheduleOnceAdvice");
41+
}
42+
43+
@SuppressWarnings("unused")
44+
public static class ScheduleAdvice {
45+
46+
@Advice.OnMethodEnter(suppress = Throwable.class)
47+
public static void enterSchedule(
48+
@Advice.Argument(value = 2, readOnly = false) Runnable runnable) {
49+
Context context = Java8BytecodeBridge.currentContext();
50+
runnable = context.wrap(runnable);
51+
}
52+
}
53+
54+
@SuppressWarnings("unused")
55+
public static class ScheduleOnceAdvice {
56+
57+
@Advice.OnMethodEnter(suppress = Throwable.class)
58+
public static void enterScheduleOnce(
59+
@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
60+
Context context = Java8BytecodeBridge.currentContext();
61+
runnable = context.wrap(runnable);
62+
}
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pekkoactor.v1_0
7+
8+
import io.opentelemetry.api.GlobalOpenTelemetry
9+
import io.opentelemetry.api.trace.Span
10+
import org.apache.pekko.actor.ActorSystem
11+
import org.apache.pekko.pattern.after
12+
import org.assertj.core.api.Assertions.assertThat
13+
import org.junit.jupiter.api.Test
14+
import scala.concurrent.{Await, Future}
15+
import scala.concurrent.duration.DurationInt
16+
17+
class PekkoSchedulerTest {
18+
19+
@Test
20+
def checkThatSpanWorksWithPekkoScheduledEvents(): Unit = {
21+
val system = ActorSystem("my-system")
22+
implicit val executionContext = system.dispatcher
23+
val tracer = GlobalOpenTelemetry.get.getTracer("test-tracer")
24+
val initialSpan = tracer.spanBuilder("test").startSpan()
25+
val scope = initialSpan.makeCurrent()
26+
try {
27+
val futureResult = for {
28+
result1 <- Future {
29+
compareSpanContexts(Span.current(), initialSpan)
30+
1
31+
}
32+
_ = compareSpanContexts(Span.current(), initialSpan)
33+
result2 <- after(200.millis, system.scheduler)(Future.successful(2))
34+
_ = compareSpanContexts(Span.current(), initialSpan)
35+
} yield result1 + result2
36+
assertThat(Await.result(futureResult, 5.seconds)).isEqualTo(3)
37+
} finally {
38+
system.terminate()
39+
scope.close()
40+
initialSpan.end()
41+
}
42+
}
43+
44+
private def compareSpanContexts(span1: Span, span2: Span): Unit = {
45+
assertThat(span1.getSpanContext().getTraceId())
46+
.isEqualTo(span2.getSpanContext().getTraceId())
47+
assertThat(span1.getSpanContext().getSpanId())
48+
.isEqualTo(span2.getSpanContext().getSpanId())
49+
}
50+
}

0 commit comments

Comments
 (0)
Please sign in to comment.