Skip to content

Commit 11b59d0

Browse files
authored
pekko: fix spans on server timeout (#13435)
1 parent 01cce80 commit 11b59d0

12 files changed

+346
-72
lines changed

instrumentation/pekko/pekko-http-1.0/javaagent/build.gradle.kts

-2
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ tasks {
9191
jvmArgs("--add-exports=java.base/sun.security.util=ALL-UNNAMED")
9292
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
9393

94-
jvmArgs("-Dio.opentelemetry.javaagent.shaded.io.opentelemetry.context.enableStrictContext=false")
95-
9694
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
9795
}
9896

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/GraphInterpreterInstrumentation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public static class PushAdvice {
3535
public static Scope onEnter(@Advice.Argument(0) GraphInterpreter.Connection connection) {
3636
// processPush is called when execution passes to application or server. Here we propagate the
3737
// context to the application code.
38-
Context context = PekkoFlowWrapper.getContext(connection.outHandler());
38+
Context context = PekkoFlowWrapper.getAndRemoveContext(connection.outHandler());
3939
if (context != null) {
4040
return context.makeCurrent();
4141
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
7+
8+
import static net.bytebuddy.matcher.ElementMatchers.named;
9+
import static net.bytebuddy.matcher.ElementMatchers.returns;
10+
11+
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
12+
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
13+
import net.bytebuddy.asm.Advice;
14+
import net.bytebuddy.description.type.TypeDescription;
15+
import net.bytebuddy.matcher.ElementMatcher;
16+
import org.apache.pekko.http.scaladsl.model.HttpRequest;
17+
import org.apache.pekko.http.scaladsl.model.HttpResponse;
18+
import org.apache.pekko.stream.scaladsl.BidiFlow;
19+
20+
public class HttpServerBluePrintInstrumentation implements TypeInstrumentation {
21+
@Override
22+
public ElementMatcher<TypeDescription> typeMatcher() {
23+
return named("org.apache.pekko.http.impl.engine.server.HttpServerBluePrint$");
24+
}
25+
26+
@Override
27+
public void transform(TypeTransformer transformer) {
28+
transformer.applyAdviceToMethod(
29+
named("requestPreparation")
30+
.and(returns(named("org.apache.pekko.stream.scaladsl.BidiFlow"))),
31+
this.getClass().getName() + "$PekkoBindAndHandleAdvice");
32+
}
33+
34+
@SuppressWarnings("unused")
35+
public static class PekkoBindAndHandleAdvice {
36+
37+
@Advice.AssignReturned.ToReturned
38+
@Advice.OnMethodExit(suppress = Throwable.class)
39+
public static BidiFlow<HttpResponse, ?, ?, HttpRequest, ?> wrapHandler(
40+
@Advice.Return BidiFlow<HttpResponse, ?, ?, HttpRequest, ?> handler) {
41+
42+
return PekkoHttpServerTracer.wrap(handler);
43+
}
44+
}
45+
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/PekkoFlowWrapper.java

+15-64
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,9 @@
55

66
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
77

8-
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
9-
108
import io.opentelemetry.context.Context;
11-
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
12-
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route.PekkoRouteHolder;
139
import java.util.ArrayDeque;
14-
import java.util.Deque;
15-
import java.util.List;
16-
import org.apache.pekko.http.javadsl.model.HttpHeader;
10+
import java.util.Queue;
1711
import org.apache.pekko.http.scaladsl.model.HttpRequest;
1812
import org.apache.pekko.http.scaladsl.model.HttpResponse;
1913
import org.apache.pekko.stream.Attributes;
@@ -42,14 +36,12 @@ public class PekkoFlowWrapper
4236
return handler.join(new PekkoFlowWrapper());
4337
}
4438

45-
public static Context getContext(OutHandler outHandler) {
39+
public static Context getAndRemoveContext(OutHandler outHandler) {
4640
if (outHandler instanceof TracingLogic.ApplicationOutHandler) {
4741
// We have multiple requests here only when requests are pipelined on the same connection.
48-
// It appears that these requests are processed one by one so processing next request won't
49-
// be started before the first one has returned a response, because of this the first request
50-
// in the queue is always the one that is currently being processed.
51-
TracingRequest request =
52-
((TracingLogic.ApplicationOutHandler) outHandler).getRequests().peek();
42+
// We remove the context off of the queue so that the next request can get its context.
43+
PekkoTracingRequest request =
44+
((TracingLogic.ApplicationOutHandler) outHandler).getRequests().poll();
5345
if (request != null) {
5446
return request.context;
5547
}
@@ -69,7 +61,7 @@ public GraphStageLogic createLogic(Attributes attributes) {
6961
}
7062

7163
private class TracingLogic extends GraphStageLogic {
72-
private final Deque<TracingRequest> requests = new ArrayDeque<>();
64+
private final Queue<PekkoTracingRequest> requests = new ArrayDeque<>();
7365

7466
public TracingLogic() {
7567
super(shape);
@@ -112,18 +104,17 @@ public void onDownstreamFinish(Throwable cause) {
112104
@Override
113105
public void onPush() {
114106
HttpRequest request = grab(requestIn);
115-
116-
TracingRequest tracingRequest = TracingRequest.EMPTY;
117-
Context parentContext = currentContext();
118-
if (PekkoHttpServerSingletons.instrumenter().shouldStart(parentContext, request)) {
119-
Context context =
120-
PekkoHttpServerSingletons.instrumenter().start(parentContext, request);
121-
context = PekkoRouteHolder.init(context);
122-
tracingRequest = new TracingRequest(context, request);
107+
PekkoTracingRequest tracingRequest =
108+
request
109+
.getAttribute(PekkoTracingRequest.ATTR_KEY)
110+
.orElse(PekkoTracingRequest.EMPTY);
111+
if (tracingRequest != PekkoTracingRequest.EMPTY) {
112+
// Remove HttpRequest attribute before passing it to user code
113+
request = (HttpRequest) request.removeAttribute(PekkoTracingRequest.ATTR_KEY);
123114
}
124115
// event if span wasn't started we need to push TracingRequest to match response
125116
// with request
126-
requests.push(tracingRequest);
117+
requests.add(tracingRequest);
127118

128119
push(requestOut, request);
129120
}
@@ -146,40 +137,11 @@ public void onUpstreamFailure(Throwable exception) {
146137
@Override
147138
public void onPush() {
148139
HttpResponse response = grab(responseIn);
149-
150-
TracingRequest tracingRequest = requests.poll();
151-
if (tracingRequest != null && tracingRequest != TracingRequest.EMPTY) {
152-
// pekko response is immutable so the customizer just captures the added headers
153-
PekkoHttpResponseMutator responseMutator = new PekkoHttpResponseMutator();
154-
HttpServerResponseCustomizerHolder.getCustomizer()
155-
.customize(tracingRequest.context, response, responseMutator);
156-
// build a new response with the added headers
157-
List<HttpHeader> headers = responseMutator.getHeaders();
158-
if (!headers.isEmpty()) {
159-
response = (HttpResponse) response.addHeaders(headers);
160-
}
161-
162-
PekkoHttpServerSingletons.instrumenter()
163-
.end(tracingRequest.context, tracingRequest.request, response, null);
164-
}
165140
push(responseOut, response);
166141
}
167142

168143
@Override
169144
public void onUpstreamFailure(Throwable exception) {
170-
TracingRequest tracingRequest;
171-
while ((tracingRequest = requests.poll()) != null) {
172-
if (tracingRequest == TracingRequest.EMPTY) {
173-
continue;
174-
}
175-
PekkoHttpServerSingletons.instrumenter()
176-
.end(
177-
tracingRequest.context,
178-
tracingRequest.request,
179-
PekkoHttpServerSingletons.errorResponse(),
180-
exception);
181-
}
182-
183145
fail(responseOut, exception);
184146
}
185147

@@ -191,20 +153,9 @@ public void onUpstreamFinish() {
191153
}
192154

193155
abstract class ApplicationOutHandler extends AbstractOutHandler {
194-
Deque<TracingRequest> getRequests() {
156+
Queue<PekkoTracingRequest> getRequests() {
195157
return requests;
196158
}
197159
}
198160
}
199-
200-
private static class TracingRequest {
201-
static final TracingRequest EMPTY = new TracingRequest(null, null);
202-
final Context context;
203-
final HttpRequest request;
204-
205-
TracingRequest(Context context, HttpRequest request) {
206-
this.context = context;
207-
this.request = request;
208-
}
209-
}
210161
}

instrumentation/pekko/pekko-http-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/server/PekkoHttpServerInstrumentationModule.java

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public String getModuleGroup() {
3838
public List<TypeInstrumentation> typeInstrumentations() {
3939
return asList(
4040
new HttpExtServerInstrumentation(),
41+
new HttpServerBluePrintInstrumentation(),
4142
new GraphInterpreterInstrumentation(),
4243
new PekkoHttpServerSourceInstrumentation());
4344
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server;
7+
8+
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
9+
import static io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.PekkoHttpServerSingletons.instrumenter;
10+
11+
import io.opentelemetry.context.Context;
12+
import io.opentelemetry.javaagent.bootstrap.http.HttpServerResponseCustomizerHolder;
13+
import io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0.server.route.PekkoRouteHolder;
14+
import java.util.ArrayDeque;
15+
import java.util.List;
16+
import java.util.Queue;
17+
import org.apache.pekko.http.javadsl.model.HttpHeader;
18+
import org.apache.pekko.http.scaladsl.model.HttpRequest;
19+
import org.apache.pekko.http.scaladsl.model.HttpResponse;
20+
import org.apache.pekko.stream.Attributes;
21+
import org.apache.pekko.stream.BidiShape;
22+
import org.apache.pekko.stream.Inlet;
23+
import org.apache.pekko.stream.Outlet;
24+
import org.apache.pekko.stream.scaladsl.BidiFlow;
25+
import org.apache.pekko.stream.stage.AbstractInHandler;
26+
import org.apache.pekko.stream.stage.AbstractOutHandler;
27+
import org.apache.pekko.stream.stage.GraphStage;
28+
import org.apache.pekko.stream.stage.GraphStageLogic;
29+
30+
public class PekkoHttpServerTracer
31+
extends GraphStage<BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest>> {
32+
private final Inlet<HttpRequest> requestIn = Inlet.create("otel.requestIn");
33+
private final Outlet<HttpRequest> requestOut = Outlet.create("otel.requestOut");
34+
private final Inlet<HttpResponse> responseIn = Inlet.create("otel.responseIn");
35+
private final Outlet<HttpResponse> responseOut = Outlet.create("otel.responseOut");
36+
37+
private final BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape =
38+
BidiShape.of(responseIn, responseOut, requestIn, requestOut);
39+
40+
public static BidiFlow<HttpResponse, ?, ?, HttpRequest, ?> wrap(
41+
BidiFlow<HttpResponse, ?, ?, HttpRequest, ?> handler) {
42+
return BidiFlow.fromGraph(new PekkoHttpServerTracer()).atopMat(handler, (a, b) -> b);
43+
}
44+
45+
@Override
46+
public BidiShape<HttpResponse, HttpResponse, HttpRequest, HttpRequest> shape() {
47+
return shape;
48+
}
49+
50+
@Override
51+
public GraphStageLogic createLogic(Attributes attributes) {
52+
return new TracingLogic();
53+
}
54+
55+
private class TracingLogic extends GraphStageLogic {
56+
private final Queue<PekkoTracingRequest> requests = new ArrayDeque<>();
57+
58+
public TracingLogic() {
59+
super(shape);
60+
61+
// server pulls response, pass response from user code to server
62+
setHandler(
63+
responseOut,
64+
new AbstractOutHandler() {
65+
@Override
66+
public void onPull() {
67+
pull(responseIn);
68+
}
69+
70+
@Override
71+
public void onDownstreamFinish(Throwable cause) {
72+
cancel(responseIn);
73+
}
74+
});
75+
76+
// user code pulls request, pass request from server to user code
77+
setHandler(
78+
requestOut,
79+
new AbstractOutHandler() {
80+
@Override
81+
public void onPull() {
82+
pull(requestIn);
83+
}
84+
85+
@Override
86+
public void onDownstreamFinish(Throwable cause) {
87+
// Invoked on errors. Don't complete this stage to allow error-capturing
88+
cancel(requestIn);
89+
}
90+
});
91+
92+
// new request from server
93+
setHandler(
94+
requestIn,
95+
new AbstractInHandler() {
96+
@Override
97+
public void onPush() {
98+
HttpRequest request = grab(requestIn);
99+
PekkoTracingRequest tracingRequest = PekkoTracingRequest.EMPTY;
100+
Context parentContext = currentContext();
101+
if (instrumenter().shouldStart(parentContext, request)) {
102+
Context context = instrumenter().start(parentContext, request);
103+
context = PekkoRouteHolder.init(context);
104+
tracingRequest = new PekkoTracingRequest(context, request);
105+
request =
106+
(HttpRequest)
107+
request.addAttribute(PekkoTracingRequest.ATTR_KEY, tracingRequest);
108+
}
109+
// event if span wasn't started we need to push TracingRequest to match response
110+
// with request
111+
requests.add(tracingRequest);
112+
113+
push(requestOut, request);
114+
}
115+
116+
@Override
117+
public void onUpstreamFinish() {
118+
complete(requestOut);
119+
}
120+
121+
@Override
122+
public void onUpstreamFailure(Throwable exception) {
123+
fail(requestOut, exception);
124+
}
125+
});
126+
127+
// response from user code
128+
setHandler(
129+
responseIn,
130+
new AbstractInHandler() {
131+
@Override
132+
public void onPush() {
133+
HttpResponse response = grab(responseIn);
134+
135+
PekkoTracingRequest tracingRequest = requests.poll();
136+
if (tracingRequest != null && tracingRequest != PekkoTracingRequest.EMPTY) {
137+
// pekko response is immutable so the customizer just captures the added headers
138+
PekkoHttpResponseMutator responseMutator = new PekkoHttpResponseMutator();
139+
HttpServerResponseCustomizerHolder.getCustomizer()
140+
.customize(tracingRequest.context, response, responseMutator);
141+
// build a new response with the added headers
142+
List<HttpHeader> headers = responseMutator.getHeaders();
143+
if (!headers.isEmpty()) {
144+
response = (HttpResponse) response.addHeaders(headers);
145+
}
146+
147+
instrumenter().end(tracingRequest.context, tracingRequest.request, response, null);
148+
}
149+
push(responseOut, response);
150+
}
151+
152+
@Override
153+
public void onUpstreamFailure(Throwable exception) {
154+
// End the span for the request that failed
155+
PekkoTracingRequest tracingRequest = requests.poll();
156+
if (tracingRequest != null && tracingRequest != PekkoTracingRequest.EMPTY) {
157+
instrumenter()
158+
.end(
159+
tracingRequest.context,
160+
tracingRequest.request,
161+
PekkoHttpServerSingletons.errorResponse(),
162+
exception);
163+
}
164+
165+
fail(responseOut, exception);
166+
}
167+
168+
@Override
169+
public void onUpstreamFinish() {
170+
// End any ongoing spans, though there should be none.
171+
PekkoTracingRequest tracingRequest;
172+
while ((tracingRequest = requests.poll()) != null) {
173+
if (tracingRequest == PekkoTracingRequest.EMPTY) {
174+
continue;
175+
}
176+
instrumenter()
177+
.end(
178+
tracingRequest.context,
179+
tracingRequest.request,
180+
PekkoHttpServerSingletons.errorResponse(),
181+
null);
182+
}
183+
completeStage();
184+
}
185+
});
186+
}
187+
}
188+
}

0 commit comments

Comments
 (0)