Skip to content

Commit ee7aa8a

Browse files
Add ContextPropagation runtime util + captureContext operator (reactor#3145)
This commit introduces a `ContextPropagation` runtime-detection utility class as well as a `contextCapture()` operator. If context-propagation isn't on the classpath, this operator is NO-OP. If context-propagation is on the classpath however, the operator will use it to capture relevant ThreadLocals during the subscription phase (at the point a contextWrite would be effected) and store it in the ContextView visible from upstream of the operator. Co-authored-by: Rossen Stoyanchev <[email protected]>
1 parent 5379c6f commit ee7aa8a

File tree

7 files changed

+412
-9
lines changed

7 files changed

+412
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
[[context.propagation]]
2+
= Context-Propagation Support
3+
4+
Since 3.5.0, Reactor-Core embeds support for the `io.micrometer:context-propagation` SPI.
5+
This library is intended as a means to easily adapt between various implementations of the concept of a Context, of which
6+
`ContextView`/`Context` is an example, and between `ThreadLocal` variables as well.
7+
8+
`ReactorContextAccessor` allows the Context-Propagation library to understand Reactor `Context` and `Contextview`.
9+
It implements the SPI and is loaded via `java.util.ServiceLoader`.
10+
No user action is required, other than having a dependency on both reactor-core and `io.micrometer:context-propagation`. The `ReactorContextAccessor` class is public but shouldn't generally be accessed by user code.
11+
12+
On top of that, Reactor-Core 3.5.0 also introduces the `contextCapture` operator that transparently deals with `ContextSnapshot`s if the library is available at runtime, for users' convenience.
13+
14+
== `contextCapture` Operator
15+
This operator can be used when one needs to capture `ThreadLocal` value(s) at subscription time and reflect these values in the Reactor `Context` for the benefit of upstream operators.
16+
It relies on the `context-propagation` library and notably the registered `ThreadLocalAccessor`(s) to discover relevant ThreadLocal values.
17+
18+
This is a convenient alternative to `contextWrite` which uses the `context-propagation` API to obtain a `ContextSnapshot` and then uses that snapshot to populate the Reactor `Context`.
19+
20+
As a result, if there were any ThreadLocal values during subscription phase, for which there is a registered `ThreadLocalAccessor`, their values would now be stored in the Reactor `Context` and visible
21+
at runtime in upstream operators.
22+
23+
====
24+
[source,java]
25+
----
26+
//assuming TL is known to Context-Propagation as key TLKEY.
27+
static final ThreadLocal<String> TL = new ThreadLocal<>();
28+
29+
//in the main thread, TL is set to "HELLO"
30+
TL.set("HELLO");
31+
32+
Mono.deferContextual(ctx ->
33+
Mono.delay(Duration.ofSeconds(1))
34+
//we're now in another thread, TL is not set
35+
.map(v -> "delayed ctx[" + TLKEY + "]=" + ctx.getOrDefault(TLKEY, "not found") + ", TL=" + TL.get())
36+
)
37+
.contextCapture()
38+
.block(); // returns "delayed ctx[TLKEY]=HELLO, TL=null"
39+
----
40+
====

docs/asciidoc/advancedFeatures.adoc

+3-9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ This chapter covers advanced features and concepts of Reactor, including the fol
1111
* <<scheduler-factory>>
1212
* <<hooks>>
1313
* <<context>>
14+
* <<context.propagation>>
1415
* <<null-safety>>
1516
* <<cleanup>>
1617

@@ -841,15 +842,6 @@ TIP: In order to read from the `Context` without misleading users into thinking
841842
while data is running through the pipeline, only the `ContextView` is exposed by the operators above.
842843
In case one needs to use one of the remaining APIs that still require a `Context`, one can use `Context.of(contextView)` for conversion.
843844

844-
[[context.propagation]]
845-
=== Micrometer Context-Propagation Support
846-
Since 3.5.0, Reactor-Core embeds basic support for the `io.micrometer:context-propagation` SPI.
847-
This library is intended as a mean to easily adapt between various implementations of the concept of a Context, of which
848-
`ContextView`/`Context` is an example, and between `ThreadLocal` variables as well.
849-
850-
`ReactorContextAccessor` is one implementation of this SPI that is loaded via `ServiceLoader`. No user action is required,
851-
other than depending on reactor-core and `context-propagation` (the class is public but shouldn't generally be accessed by user code).
852-
853845
=== Simple `Context` Examples
854846

855847
The examples in this section are meant as ways to better understand some of the caveats of
@@ -1088,6 +1080,8 @@ public void contextForLibraryReactivePut() {
10881080
----
10891081
====
10901082

1083+
include::advanced-contextPropagation.adoc[leveloffset=1]
1084+
10911085
[[cleanup]]
10921086
== Dealing with Objects that Need Cleanup
10931087

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.core.publisher;
18+
19+
import java.util.function.Function;
20+
import java.util.function.Predicate;
21+
22+
import io.micrometer.context.ContextRegistry;
23+
import io.micrometer.context.ContextSnapshot;
24+
25+
import reactor.util.annotation.Nullable;
26+
import reactor.util.context.Context;
27+
28+
/**
29+
* Utility private class to detect if the <a href="https://github.com/micrometer-metrics/context-propagation">context-propagation library</a> is on the classpath and to offer
30+
* ContextSnapshot support to {@link Flux} and {@link Mono}.
31+
*
32+
* @author Simon Baslé
33+
*/
34+
final class ContextPropagation {
35+
36+
static final boolean isContextPropagationAvailable;
37+
38+
static final Predicate<Object> PREDICATE_TRUE = v -> true;
39+
static final Function<Context, Context> NO_OP = c -> c;
40+
static final Function<Context, Context> WITH_GLOBAL_REGISTRY_NO_PREDICATE;
41+
42+
static {
43+
boolean contextPropagation;
44+
try {
45+
Class.forName("io.micrometer.context.ContextRegistry", false, ContextPropagation.class.getClassLoader());
46+
contextPropagation = true;
47+
}
48+
catch (Throwable t) {
49+
contextPropagation = false;
50+
}
51+
isContextPropagationAvailable = contextPropagation;
52+
if (contextPropagation) {
53+
WITH_GLOBAL_REGISTRY_NO_PREDICATE = new ContextCaptureFunction(PREDICATE_TRUE, ContextRegistry.getInstance());
54+
}
55+
else {
56+
WITH_GLOBAL_REGISTRY_NO_PREDICATE = NO_OP;
57+
}
58+
}
59+
60+
/**
61+
* Is Micrometer {@code context-propagation} API on the classpath?
62+
*
63+
* @return true if context-propagation is available at runtime, false otherwise
64+
*/
65+
static boolean isContextPropagationAvailable() {
66+
return isContextPropagationAvailable;
67+
}
68+
69+
/**
70+
* Create a support function that takes a snapshot of thread locals and merges them with the
71+
* provided {@link Context}, resulting in a new {@link Context} which includes entries
72+
* captured from threadLocals by the Context-Propagation API.
73+
*
74+
* @return the {@link Context} augmented with captured entries
75+
*/
76+
public static Function<Context, Context> contextCapture() {
77+
if (!isContextPropagationAvailable) {
78+
return NO_OP;
79+
}
80+
return WITH_GLOBAL_REGISTRY_NO_PREDICATE;
81+
}
82+
83+
/**
84+
* Create a support function that takes a snapshot of thread locals and merges them with the
85+
* provided {@link Context}, resulting in a new {@link Context} which includes entries
86+
* captured from threadLocals by the Context-Propagation API.
87+
* <p>
88+
* The provided {@link Predicate} is used on keys associated to said thread locals
89+
* by the Context-Propagation API to filter which entries should be captured in the
90+
* first place.
91+
*
92+
* @param captureKeyPredicate a {@link Predicate} used on keys to determine if each entry
93+
* should be injected into the new {@link Context}
94+
* @return a {@link Function} augmenting {@link Context} with captured entries
95+
*/
96+
public static Function<Context, Context> contextCapture(Predicate<Object> captureKeyPredicate) {
97+
if (!isContextPropagationAvailable) {
98+
return NO_OP;
99+
}
100+
return new ContextCaptureFunction(captureKeyPredicate, null);
101+
}
102+
103+
//the Function indirection allows tests to directly assert code in this class rather than static methods
104+
static final class ContextCaptureFunction implements Function<Context, Context> {
105+
106+
final Predicate<Object> capturePredicate;
107+
final ContextRegistry registry;
108+
109+
ContextCaptureFunction(Predicate<Object> capturePredicate, @Nullable ContextRegistry registry) {
110+
this.capturePredicate = capturePredicate;
111+
this.registry = registry != null ? registry : ContextRegistry.getInstance();
112+
}
113+
114+
@Override
115+
public Context apply(Context target) {
116+
return ContextSnapshot.captureAllUsing(capturePredicate, this.registry).updateContext(target);
117+
}
118+
}
119+
120+
}

reactor-core/src/main/java/reactor/core/publisher/Flux.java

+21
Original file line numberDiff line numberDiff line change
@@ -4029,6 +4029,27 @@ public final Flux<T> concatWith(Publisher<? extends T> other) {
40294029
return concat(this, other);
40304030
}
40314031

4032+
/**
4033+
* If <a href="https://github.com/micrometer-metrics/context-propagation">context-propagation library</a>
4034+
* is on the classpath, this is a convenience shortcut to capture thread local values during the
4035+
* subscription phase and put them in the {@link Context} that is visible upstream of this operator.
4036+
* <p>
4037+
* As a result this operator should generally be used as close as possible to the end of
4038+
* the chain / subscription point.
4039+
* <p>
4040+
* If context-propagation is not available at runtime, this operator simply returns the current {@link Flux}
4041+
* instance.
4042+
*
4043+
* @return a new {@link Flux} where context-propagation API has been used to capture entries and
4044+
* inject them into the {@link Context}
4045+
*/
4046+
public final Flux<T> contextCapture() {
4047+
if (!ContextPropagation.isContextPropagationAvailable()) {
4048+
return this;
4049+
}
4050+
return onAssembly(new FluxContextWrite<>(this, ContextPropagation.contextCapture()));
4051+
}
4052+
40324053
/**
40334054
* Enrich the {@link Context} visible from downstream for the benefit of upstream
40344055
* operators, by making all values from the provided {@link ContextView} visible on top

reactor-core/src/main/java/reactor/core/publisher/Mono.java

+21
Original file line numberDiff line numberDiff line change
@@ -2227,6 +2227,27 @@ public final Flux<T> concatWith(Publisher<? extends T> other) {
22272227
return Flux.concat(this, other);
22282228
}
22292229

2230+
/**
2231+
* If <a href="https://github.com/micrometer-metrics/context-propagation">context-propagation library</a>
2232+
* is on the classpath, this is a convenience shortcut to capture thread local values during the
2233+
* subscription phase and put them in the {@link Context} that is visible upstream of this operator.
2234+
* <p>
2235+
* As a result this operator should generally be used as close as possible to the end of
2236+
* the chain / subscription point.
2237+
* <p>
2238+
* If context-propagation is not available at runtime, this operator simply returns the current {@link Mono}
2239+
* instance.
2240+
*
2241+
* @return a new {@link Flux} where context-propagation API has been used to capture entries and
2242+
* inject them into the {@link Context}
2243+
*/
2244+
public final Mono<T> contextCapture() {
2245+
if (!ContextPropagation.isContextPropagationAvailable()) {
2246+
return this;
2247+
}
2248+
return onAssembly(new MonoContextWrite<>(this, ContextPropagation.contextCapture()));
2249+
}
2250+
22302251
/**
22312252
* Enrich the {@link Context} visible from downstream for the benefit of upstream
22322253
* operators, by making all values from the provided {@link ContextView} visible on top
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.core.publisher;
18+
19+
import java.util.function.Predicate;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import static org.assertj.core.api.Assertions.assertThat;
24+
25+
/**
26+
* For tests that actually assert things when context-propagation is available, see
27+
* {@code withMicrometer} test set.
28+
* @author Simon Baslé
29+
*/
30+
class ContextPropagationNotThereSmokeTest {
31+
32+
@Test
33+
void contextPropagationIsNotAvailable() {
34+
assertThat(ContextPropagation.isContextPropagationAvailable()).isFalse();
35+
}
36+
37+
@Test
38+
void contextCaptureIsNoOp() {
39+
assertThat(ContextPropagation.contextCapture()).as("without predicate").isSameAs(ContextPropagation.NO_OP);
40+
assertThat(ContextPropagation.contextCapture(v -> true)).as("with predicate").isSameAs(ContextPropagation.NO_OP);
41+
}
42+
43+
@Test
44+
void contextCaptureFluxApiIsNoOp() {
45+
Flux<Integer> source = Flux.empty();
46+
assertThat(source.contextCapture()).isSameAs(source);
47+
}
48+
49+
@Test
50+
void contextCaptureMonoApiIsNoOp() {
51+
Mono<Integer> source = Mono.empty();
52+
assertThat(source.contextCapture()).isSameAs(source);
53+
}
54+
55+
}

0 commit comments

Comments
 (0)