Skip to content

Commit 1995852

Browse files
authored
Instrument ContextPropagationOperator to bridge lib/agent calls (#4786)
* Instrument ContextPropagationOperator to bridge lib/agent calls * more tests * clean up * up * lint * more lint * make runWithContext(Flux, ..) public * lint
1 parent ebe4c65 commit 1995852

File tree

6 files changed

+309
-5
lines changed

6 files changed

+309
-5
lines changed

instrumentation/opentelemetry-api/opentelemetry-api-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/opentelemetryapi/context/AgentContextStorage.java

+8
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,10 @@ public static io.opentelemetry.context.Context getAgentContext(Context applicati
128128
return io.opentelemetry.context.Context.root();
129129
}
130130

131+
public static Context toApplicationContext(io.opentelemetry.context.Context agentContext) {
132+
return new AgentContextWrapper(agentContext);
133+
}
134+
131135
public static Context newContextWrapper(
132136
io.opentelemetry.context.Context agentContext, Context applicationContext) {
133137
if (applicationContext instanceof AgentContextWrapper) {
@@ -227,6 +231,10 @@ private static class AgentContextWrapper implements Context {
227231
final io.opentelemetry.context.Context agentContext;
228232
final Context applicationContext;
229233

234+
AgentContextWrapper(io.opentelemetry.context.Context agentContext) {
235+
this(agentContext, agentContext.get(APPLICATION_CONTEXT));
236+
}
237+
230238
AgentContextWrapper(io.opentelemetry.context.Context agentContext, Context applicationContext) {
231239
if (applicationContext instanceof AgentContextWrapper) {
232240
throw new IllegalStateException("Expected unwrapped context");

instrumentation/reactor-3.1/javaagent/build.gradle.kts

+7-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ muzzle {
77
group.set("io.projectreactor")
88
module.set("reactor-core")
99
versions.set("[3.1.0.RELEASE,)")
10+
extraDependency("io.opentelemetry:opentelemetry-api:1.0.0")
1011
assertInverse.set(true)
1112
}
1213
}
@@ -18,11 +19,15 @@ tasks.withType<Test>().configureEach {
1819

1920
dependencies {
2021
implementation(project(":instrumentation:reactor-3.1:library"))
22+
library("io.projectreactor:reactor-core:3.1.0.RELEASE")
23+
24+
implementation(project(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:javaagent"))
25+
26+
compileOnly(project(":javaagent-tooling"))
2127
compileOnly(project(":instrumentation-api-annotation-support"))
28+
compileOnly(project(path = ":opentelemetry-api-shaded-for-instrumenting", configuration = "shadow"))
2229

23-
testLibrary("io.projectreactor:reactor-core:3.1.0.RELEASE")
2430
testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")
25-
2631
testImplementation(project(":instrumentation:reactor-3.1:testing"))
2732
testImplementation("io.opentelemetry:opentelemetry-extension-annotations")
2833

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.reactor;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
9+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
10+
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
11+
import static net.bytebuddy.matcher.ElementMatchers.named;
12+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
13+
import static net.bytebuddy.matcher.ElementMatchers.returns;
14+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
15+
16+
import application.io.opentelemetry.context.Context;
17+
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
18+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
19+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
20+
import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage;
21+
import net.bytebuddy.asm.Advice;
22+
import net.bytebuddy.description.type.TypeDescription;
23+
import net.bytebuddy.matcher.ElementMatcher;
24+
25+
public class ContextPropagationOperatorInstrumentation implements TypeInstrumentation {
26+
@Override
27+
public ElementMatcher<TypeDescription> typeMatcher() {
28+
return named("application.io.opentelemetry.instrumentation.reactor.ContextPropagationOperator");
29+
}
30+
31+
@Override
32+
public void transform(TypeTransformer transformer) {
33+
transformer.applyAdviceToMethod(
34+
isMethod()
35+
.and(isPublic())
36+
.and(isStatic())
37+
.and(named("storeOpenTelemetryContext"))
38+
.and(takesArgument(0, named("reactor.util.context.Context")))
39+
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
40+
.and(returns(named("reactor.util.context.Context"))),
41+
ContextPropagationOperatorInstrumentation.class.getName() + "$StoreAdvice");
42+
transformer.applyAdviceToMethod(
43+
isMethod()
44+
.and(isPublic())
45+
.and(isStatic())
46+
.and(named("getOpenTelemetryContext"))
47+
.and(takesArgument(0, named("reactor.util.context.Context")))
48+
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
49+
.and(returns(named("application.io.opentelemetry.context.Context"))),
50+
ContextPropagationOperatorInstrumentation.class.getName() + "$GetAdvice");
51+
transformer.applyAdviceToMethod(
52+
isMethod()
53+
.and(isPublic())
54+
.and(isStatic())
55+
.and(named("runWithContext"))
56+
.and(
57+
takesArgument(
58+
0, namedOneOf("reactor.core.publisher.Mono", "reactor.core.publisher.Flux")))
59+
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
60+
.and(returns(namedOneOf("reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))),
61+
ContextPropagationOperatorInstrumentation.class.getName() + "$RunWithAdvice");
62+
}
63+
64+
@SuppressWarnings("unused")
65+
public static class StoreAdvice {
66+
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnDefaultValue.class)
67+
public static boolean methodEnter() {
68+
return false;
69+
}
70+
71+
@Advice.OnMethodExit(suppress = Throwable.class)
72+
public static void methodExit(
73+
@Advice.Argument(0) reactor.util.context.Context reactorContext,
74+
@Advice.Argument(1) Context applicationContext,
75+
@Advice.Return(readOnly = false) reactor.util.context.Context updatedReactorContext) {
76+
updatedReactorContext =
77+
ContextPropagationOperator.storeOpenTelemetryContext(
78+
reactorContext, AgentContextStorage.getAgentContext(applicationContext));
79+
}
80+
}
81+
82+
@SuppressWarnings("unused")
83+
public static class GetAdvice {
84+
@Advice.OnMethodEnter(skipOn = Advice.OnDefaultValue.class)
85+
public static boolean methodEnter() {
86+
return false;
87+
}
88+
89+
@Advice.OnMethodExit(suppress = Throwable.class)
90+
public static void methodExit(
91+
@Advice.Argument(0) reactor.util.context.Context reactorContext,
92+
@Advice.Argument(1) Context defaultContext,
93+
@Advice.Return(readOnly = false) Context applicationContext) {
94+
95+
io.opentelemetry.context.Context agentContext =
96+
ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null);
97+
if (agentContext == null) {
98+
applicationContext = defaultContext;
99+
} else {
100+
applicationContext = AgentContextStorage.toApplicationContext(agentContext);
101+
}
102+
}
103+
}
104+
105+
@SuppressWarnings("unused")
106+
public static class RunWithAdvice {
107+
@Advice.OnMethodEnter
108+
public static void methodEnter(
109+
@Advice.FieldValue(value = "enabled", readOnly = false) boolean enabled) {
110+
enabled = true;
111+
}
112+
}
113+
}

instrumentation/reactor-3.1/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/reactor/ReactorInstrumentationModule.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
package io.opentelemetry.javaagent.instrumentation.reactor;
77

8-
import static java.util.Collections.singletonList;
8+
import static java.util.Arrays.asList;
99

1010
import com.google.auto.service.AutoService;
1111
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
@@ -21,6 +21,6 @@ public ReactorInstrumentationModule() {
2121

2222
@Override
2323
public List<TypeInstrumentation> typeInstrumentations() {
24-
return singletonList(new HooksInstrumentation());
24+
return asList(new HooksInstrumentation(), new ContextPropagationOperatorInstrumentation());
2525
}
2626
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
import io.opentelemetry.api.GlobalOpenTelemetry
7+
import io.opentelemetry.api.trace.Span
8+
import io.opentelemetry.api.trace.SpanKind
9+
import io.opentelemetry.context.Context
10+
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
11+
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
12+
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
13+
import reactor.core.publisher.Flux
14+
import reactor.core.publisher.Mono
15+
import reactor.test.StepVerifier
16+
17+
import java.time.Duration
18+
19+
class ContextPropagationOperatorInstrumentationTest extends AgentInstrumentationSpecification {
20+
def "store and get context"() {
21+
22+
def reactorContext = reactor.util.context.Context.empty()
23+
def traceContext = Context.root()
24+
setup:
25+
runWithSpan("parent") { ->
26+
reactorContext = ContextPropagationOperator.storeOpenTelemetryContext(reactorContext, Context.current())
27+
traceContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null)
28+
assert traceContext != null
29+
Span.fromContext(traceContext).setAttribute("foo", "bar")
30+
}
31+
32+
expect:
33+
assert reactorContext.stream().count() == 1
34+
assertTraces(1) {
35+
trace(0, 1) {
36+
span(0) {
37+
name "parent"
38+
kind SpanKind.INTERNAL
39+
hasNoParent()
40+
41+
attributes {
42+
"foo" "bar"
43+
}
44+
}
45+
}
46+
}
47+
}
48+
49+
def "get missing context"() {
50+
def traceContext = Context.root()
51+
setup:
52+
runWithSpan("parent") { ->
53+
assert ContextPropagationOperator.getOpenTelemetryContext(reactor.util.context.Context.empty(), null) == null
54+
traceContext = ContextPropagationOperator.getOpenTelemetryContext(reactor.util.context.Context.empty(), Context.current())
55+
Span.fromContext(traceContext).setAttribute("foo", "bar")
56+
}
57+
58+
expect:
59+
assertTraces(1) {
60+
trace(0, 1) {
61+
span(0) {
62+
name "parent"
63+
kind SpanKind.INTERNAL
64+
hasNoParent()
65+
66+
attributes {
67+
"foo" "bar"
68+
}
69+
}
70+
}
71+
}
72+
}
73+
74+
def "run Mono with context forces it to become current"() {
75+
setup:
76+
def result = Mono.defer({ ->
77+
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
78+
def outer = Mono.defer({ -> new TracedWithSpan().mono(Mono.just("Value") )});
79+
return ContextPropagationOperator
80+
.runWithContext(outer, Context.current().with(span))
81+
.doFinally({ i -> span.end() })
82+
})
83+
84+
StepVerifier.create(result)
85+
.expectNext("Value")
86+
.verifyComplete()
87+
88+
expect:
89+
assertTraces(1) {
90+
trace(0, 2) {
91+
span(0) {
92+
name "parent"
93+
kind SpanKind.INTERNAL
94+
hasNoParent()
95+
}
96+
span(1) {
97+
name "TracedWithSpan.mono"
98+
kind SpanKind.INTERNAL
99+
childOf span(0)
100+
attributes {
101+
}
102+
}
103+
}
104+
}
105+
}
106+
107+
def "run Flux with context forces it to become current"() {
108+
setup:
109+
def result = Flux.defer({ ->
110+
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
111+
def outer = Flux.defer({ -> new TracedWithSpan().flux(Flux.just("Value") )});
112+
return ContextPropagationOperator
113+
.runWithContext(outer, Context.current().with(span))
114+
.doFinally({ i -> span.end() })
115+
})
116+
117+
StepVerifier.create(result)
118+
.expectNext("Value")
119+
.verifyComplete()
120+
121+
expect:
122+
assertTraces(1) {
123+
trace(0, 2) {
124+
span(0) {
125+
name "parent"
126+
kind SpanKind.INTERNAL
127+
hasNoParent()
128+
}
129+
span(1) {
130+
name "TracedWithSpan.flux"
131+
kind SpanKind.INTERNAL
132+
childOf span(0)
133+
attributes {
134+
}
135+
}
136+
}
137+
}
138+
}
139+
140+
def "store context forces it to become current"() {
141+
setup:
142+
def result = Mono.defer({ ->
143+
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
144+
145+
Mono.delay(Duration.ofMillis(1))
146+
.flatMap({ t ->
147+
// usual trick to force this to run under new TracingSubscriber with context written in the next call
148+
new TracedWithSpan().mono(Mono.just("Value"))
149+
})
150+
.subscriberContext({ ctx ->
151+
ContextPropagationOperator.storeOpenTelemetryContext(ctx, Context.current().with(span))
152+
})
153+
.doFinally({ i -> span.end() })
154+
})
155+
156+
StepVerifier.create(result)
157+
.expectNext("Value")
158+
.verifyComplete()
159+
160+
expect:
161+
assertTraces(1) {
162+
trace(0, 2) {
163+
span(0) {
164+
name "parent"
165+
kind SpanKind.INTERNAL
166+
hasNoParent()
167+
}
168+
span(1) {
169+
name "TracedWithSpan.mono"
170+
kind SpanKind.INTERNAL
171+
childOf span(0)
172+
attributes {
173+
}
174+
}
175+
}
176+
}
177+
}
178+
}

instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/ContextPropagationOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingConte
131131
}
132132

133133
/** Forces Flux to run in traceContext scope. */
134-
static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
134+
public static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
135135
if (!enabled) {
136136
return publisher;
137137
}

0 commit comments

Comments
 (0)