Skip to content

Commit 05fc637

Browse files
authored
add Akka Scheduler context propagation (#12373)
1 parent 2cb3961 commit 05fc637

File tree

3 files changed

+114
-1
lines changed

3 files changed

+114
-1
lines changed

instrumentation/akka/akka-actor-2.3/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/akkaactor/AkkaActorInstrumentationModule.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
2323
return asList(
2424
new AkkaDispatcherInstrumentation(),
2525
new AkkaActorCellInstrumentation(),
26-
new AkkaDefaultSystemMessageQueueInstrumentation());
26+
new AkkaDefaultSystemMessageQueueInstrumentation(),
27+
new AkkaScheduleInstrumentation());
2728
}
2829
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.akkaactor;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
10+
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
14+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
15+
import net.bytebuddy.asm.Advice;
16+
import net.bytebuddy.description.type.TypeDescription;
17+
import net.bytebuddy.matcher.ElementMatcher;
18+
19+
public class AkkaScheduleInstrumentation implements TypeInstrumentation {
20+
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("akka.actor.LightArrayRevolverScheduler");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("schedule")
30+
.and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration")))
31+
.and(takesArgument(1, named("scala.concurrent.duration.FiniteDuration")))
32+
.and(takesArgument(2, named("java.lang.Runnable")))
33+
.and(takesArgument(3, named("scala.concurrent.ExecutionContext"))),
34+
AkkaScheduleInstrumentation.class.getName() + "$ScheduleAdvice");
35+
transformer.applyAdviceToMethod(
36+
named("scheduleOnce")
37+
.and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration")))
38+
.and(takesArgument(1, named("java.lang.Runnable")))
39+
.and(takesArgument(2, named("scala.concurrent.ExecutionContext"))),
40+
AkkaScheduleInstrumentation.class.getName() + "$ScheduleOnceAdvice");
41+
}
42+
43+
@SuppressWarnings("unused")
44+
public static class ScheduleAdvice {
45+
46+
@Advice.OnMethodEnter(suppress = Throwable.class)
47+
public static void enterSchedule(
48+
@Advice.Argument(value = 2, readOnly = false) Runnable runnable) {
49+
Context context = Java8BytecodeBridge.currentContext();
50+
runnable = context.wrap(runnable);
51+
}
52+
}
53+
54+
@SuppressWarnings("unused")
55+
public static class ScheduleOnceAdvice {
56+
57+
@Advice.OnMethodEnter(suppress = Throwable.class)
58+
public static void enterScheduleOnce(
59+
@Advice.Argument(value = 1, readOnly = false) Runnable runnable) {
60+
Context context = Java8BytecodeBridge.currentContext();
61+
runnable = context.wrap(runnable);
62+
}
63+
}
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.akkaactor
7+
8+
import akka.pattern.after
9+
import io.opentelemetry.api.GlobalOpenTelemetry
10+
import io.opentelemetry.api.trace.Span
11+
import org.assertj.core.api.Assertions.assertThat
12+
import org.junit.jupiter.api.Test
13+
import scala.concurrent.{Await, Future}
14+
import scala.concurrent.duration.DurationInt
15+
16+
class AkkaSchedulerTest {
17+
18+
@Test
19+
def checkThatSpanWorksWithAkkaScheduledEvents(): Unit = {
20+
val system = AkkaActors.system
21+
implicit val executionContext = system.dispatcher
22+
val tracer = GlobalOpenTelemetry.get.getTracer("test-tracer")
23+
val initialSpan = tracer.spanBuilder("test").startSpan()
24+
val scope = initialSpan.makeCurrent()
25+
try {
26+
val futureResult = for {
27+
result1 <- Future {
28+
compareSpanContexts(Span.current(), initialSpan)
29+
1
30+
}
31+
_ = compareSpanContexts(Span.current(), initialSpan)
32+
result2 <- after(200.millis, system.scheduler)(Future.successful(2))
33+
_ = compareSpanContexts(Span.current(), initialSpan)
34+
} yield result1 + result2
35+
assertThat(Await.result(futureResult, 5.seconds)).isEqualTo(3)
36+
} finally {
37+
scope.close()
38+
initialSpan.end()
39+
}
40+
}
41+
42+
private def compareSpanContexts(span1: Span, span2: Span): Unit = {
43+
assertThat(span1.getSpanContext().getTraceId())
44+
.isEqualTo(span2.getSpanContext().getTraceId())
45+
assertThat(span1.getSpanContext().getSpanId())
46+
.isEqualTo(span2.getSpanContext().getSpanId())
47+
}
48+
}

0 commit comments

Comments
 (0)