Skip to content

Commit 27e8201

Browse files
authored
Rework context propagation to redisson async callback (open-telemetry#5748)
* Rework context propagation to redisson async callback * add comments
1 parent 4815f1e commit 27e8201

File tree

6 files changed

+175
-18
lines changed

6 files changed

+175
-18
lines changed

instrumentation/redisson-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/CompletableFutureWrapper.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,58 @@ public final class CompletableFutureWrapper<T> extends CompletableFuture<T>
1313
implements PromiseWrapper<T> {
1414
private volatile EndOperationListener<T> endOperationListener;
1515

16-
private CompletableFutureWrapper(CompletableFuture<T> delegate, Context context) {
16+
private CompletableFutureWrapper(CompletableFuture<T> delegate) {
1717
this.whenComplete(
1818
(result, error) -> {
1919
EndOperationListener<T> endOperationListener = this.endOperationListener;
2020
if (endOperationListener != null) {
2121
endOperationListener.accept(result, error);
2222
}
23-
try (Scope ignored = context.makeCurrent()) {
24-
if (error != null) {
25-
delegate.completeExceptionally(error);
26-
} else {
27-
delegate.complete(result);
28-
}
23+
if (error != null) {
24+
delegate.completeExceptionally(error);
25+
} else {
26+
delegate.complete(result);
2927
}
3028
});
3129
}
3230

31+
/**
32+
* Wrap {@link CompletableFuture} so that {@link EndOperationListener}, that is used to end the
33+
* span, could be attached to it.
34+
*/
3335
public static <T> CompletableFuture<T> wrap(CompletableFuture<T> delegate) {
3436
if (delegate instanceof CompletableFutureWrapper) {
3537
return delegate;
3638
}
3739

38-
return new CompletableFutureWrapper<>(delegate, Context.current());
40+
return new CompletableFutureWrapper<>(delegate);
41+
}
42+
43+
/**
44+
* Wrap {@link CompletableFuture} to run callbacks with the context that was current at the time
45+
* this method was called.
46+
*
47+
* <p>This method should be called on, or as close as possible to, the {@link CompletableFuture}
48+
* that is returned to the user to ensure that the callbacks added by user are run in appropriate
49+
* context.
50+
*/
51+
public static <T> CompletableFuture<T> wrapContext(CompletableFuture<T> future) {
52+
Context context = Context.current();
53+
// when input future is completed, complete result future with context that was current
54+
// at the time when the future was wrapped
55+
CompletableFuture<T> result = new CompletableFuture<>();
56+
future.whenComplete(
57+
(T value, Throwable throwable) -> {
58+
try (Scope ignored = context.makeCurrent()) {
59+
if (throwable != null) {
60+
result.completeExceptionally(throwable);
61+
} else {
62+
result.complete(value);
63+
}
64+
}
65+
});
66+
67+
return result;
3968
}
4069

4170
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.redisson;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import org.redisson.misc.RPromise;
17+
18+
public class RedisCommandAsyncServiceInstrumentation implements TypeInstrumentation {
19+
@Override
20+
public ElementMatcher<TypeDescription> typeMatcher() {
21+
return named("org.redisson.command.CommandAsyncService");
22+
}
23+
24+
@Override
25+
public void transform(TypeTransformer transformer) {
26+
// used before 3.16.8
27+
transformer.applyAdviceToMethod(
28+
named("async").and(takesArgument(5, named("org.redisson.misc.RPromise"))),
29+
this.getClass().getName() + "$WrapPromiseAdvice");
30+
}
31+
32+
@SuppressWarnings("unused")
33+
public static class WrapPromiseAdvice {
34+
35+
@Advice.OnMethodEnter(suppress = Throwable.class)
36+
public static void onEnter(@Advice.Argument(value = 5, readOnly = false) RPromise<?> promise) {
37+
promise = RedissonPromiseWrapper.wrapContext(promise);
38+
}
39+
}
40+
}

instrumentation/redisson-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/RedisCommandDataInstrumentation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void transform(TypeTransformer transformer) {
3434
this.getClass().getName() + "$WrapPromiseAdvice");
3535
// since 3.16.8
3636
transformer.applyAdviceToMethod(
37-
isConstructor().and(takesArgument(0, named("java.util.concurrent.CompletableFuture"))),
37+
isConstructor().and(takesArgument(0, CompletableFuture.class)),
3838
this.getClass().getName() + "$WrapCompletableFutureAdvice");
3939
}
4040

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.redisson;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
9+
import static net.bytebuddy.matcher.ElementMatchers.named;
10+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
11+
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
13+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
14+
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.CompletionStage;
16+
import net.bytebuddy.asm.Advice;
17+
import net.bytebuddy.description.type.TypeDescription;
18+
import net.bytebuddy.matcher.ElementMatcher;
19+
20+
public class RedissonCompletableFutureWrapperInstrumentation implements TypeInstrumentation {
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("org.redisson.misc.CompletableFutureWrapper");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
// used since 3.16.8
29+
transformer.applyAdviceToMethod(
30+
isConstructor().and(takesArgument(0, CompletionStage.class)),
31+
this.getClass().getName() + "$WrapCompletionStageAdvice");
32+
transformer.applyAdviceToMethod(
33+
isConstructor().and(takesArgument(0, CompletableFuture.class)),
34+
this.getClass().getName() + "$WrapCompletableFutureAdvice");
35+
}
36+
37+
@SuppressWarnings("unused")
38+
public static class WrapCompletableFutureAdvice {
39+
40+
@Advice.OnMethodEnter(suppress = Throwable.class)
41+
public static void onEnter(
42+
@Advice.Argument(value = 0, readOnly = false) CompletableFuture<?> completableFuture) {
43+
completableFuture = CompletableFutureWrapper.wrapContext(completableFuture);
44+
}
45+
}
46+
47+
@SuppressWarnings("unused")
48+
public static class WrapCompletionStageAdvice {
49+
50+
@Advice.OnMethodEnter(suppress = Throwable.class)
51+
public static void onEnter(
52+
@Advice.Argument(value = 0, readOnly = false) CompletionStage<?> completionStage) {
53+
completionStage = CompletableFutureWrapper.wrapContext(completionStage.toCompletableFuture());
54+
}
55+
}
56+
}

instrumentation/redisson-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/RedissonInstrumentationModule.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ public RedissonInstrumentationModule() {
2121

2222
@Override
2323
public List<TypeInstrumentation> typeInstrumentations() {
24-
return asList(new RedisConnectionInstrumentation(), new RedisCommandDataInstrumentation());
24+
return asList(
25+
new RedisConnectionInstrumentation(),
26+
new RedisCommandDataInstrumentation(),
27+
new RedisCommandAsyncServiceInstrumentation(),
28+
new RedissonCompletableFutureWrapperInstrumentation());
2529
}
2630
}

instrumentation/redisson-3.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/redisson/RedissonPromiseWrapper.java

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,29 +13,57 @@
1313
public class RedissonPromiseWrapper<T> extends RedissonPromise<T> implements PromiseWrapper<T> {
1414
private volatile EndOperationListener<T> endOperationListener;
1515

16-
private RedissonPromiseWrapper(RPromise<T> delegate, Context context) {
16+
private RedissonPromiseWrapper(RPromise<T> delegate) {
1717
this.whenComplete(
1818
(result, error) -> {
1919
EndOperationListener<T> endOperationListener = this.endOperationListener;
2020
if (endOperationListener != null) {
2121
endOperationListener.accept(result, error);
2222
}
23-
try (Scope ignored = context.makeCurrent()) {
24-
if (error != null) {
25-
delegate.tryFailure(error);
26-
} else {
27-
delegate.trySuccess(result);
28-
}
23+
if (error != null) {
24+
delegate.tryFailure(error);
25+
} else {
26+
delegate.trySuccess(result);
2927
}
3028
});
3129
}
3230

31+
/**
32+
* Wrap {@link RPromise} so that {@link EndOperationListener}, that is used to end the span, could
33+
* be attached to it.
34+
*/
3335
public static <T> RPromise<T> wrap(RPromise<T> delegate) {
3436
if (delegate instanceof RedissonPromiseWrapper) {
3537
return delegate;
3638
}
3739

38-
return new RedissonPromiseWrapper<>(delegate, Context.current());
40+
return new RedissonPromiseWrapper<>(delegate);
41+
}
42+
43+
/**
44+
* Wrap {@link RPromise} to run callbacks with the context that was current at the time this
45+
* method was called.
46+
*
47+
* <p>This method should be called on, or as close as possible to, the {@link RPromise} that is
48+
* returned to the user to ensure that the callbacks added by user are run in appropriate context.
49+
*/
50+
public static <T> RPromise<T> wrapContext(RPromise<T> promise) {
51+
Context context = Context.current();
52+
// when returned promise is completed, complete input promise with context that was current
53+
// at the time when the promise was wrapped
54+
RPromise<T> result = new RedissonPromise<T>();
55+
result.whenComplete(
56+
(value, error) -> {
57+
try (Scope ignored = context.makeCurrent()) {
58+
if (error != null) {
59+
promise.tryFailure(error);
60+
} else {
61+
promise.trySuccess(value);
62+
}
63+
}
64+
});
65+
66+
return result;
3967
}
4068

4169
@Override

0 commit comments

Comments
 (0)