Skip to content

Commit ff97f6c

Browse files
authored
Implement capturing message headers for aws1 sqs spans (#9824)
1 parent 55681e6 commit ff97f6c

File tree

20 files changed

+311
-103
lines changed

20 files changed

+311
-103
lines changed

instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ConfigPropertiesUtil.java

+18
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55

66
package io.opentelemetry.instrumentation.api.internal;
77

8+
import java.util.Arrays;
9+
import java.util.List;
810
import java.util.Locale;
11+
import java.util.stream.Collectors;
912
import javax.annotation.Nullable;
1013

1114
/**
@@ -40,6 +43,21 @@ public static String getString(String propertyName) {
4043
return System.getenv(toEnvVarName(propertyName));
4144
}
4245

46+
public static List<String> getList(String propertyName, List<String> defaultValue) {
47+
String value = getString(propertyName);
48+
if (value == null) {
49+
return defaultValue;
50+
}
51+
return filterBlanksAndNulls(value.split(","));
52+
}
53+
54+
private static List<String> filterBlanksAndNulls(String[] values) {
55+
return Arrays.stream(values)
56+
.map(String::trim)
57+
.filter(s -> !s.isEmpty())
58+
.collect(Collectors.toList());
59+
}
60+
4361
private static String toEnvVarName(String propertyName) {
4462
return propertyName.toUpperCase(Locale.ROOT).replace('-', '_').replace('.', '_');
4563
}

instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ public class TracingRequestHandler extends RequestHandler2 {
4040
.getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false))
4141
.setMessagingReceiveInstrumentationEnabled(
4242
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
43+
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
4344
.build()
4445
.newRequestHandler();
4546

instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies {
2626
tasks {
2727
withType<Test>().configureEach {
2828
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
29+
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
2930
}
3031

3132
val testReceiveSpansDisabled by registering(Test::class) {

instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java

+5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.instrumentation.awssdk.v1_11.autoconfigure;
77

8+
import static java.util.Collections.emptyList;
9+
810
import com.amazonaws.AmazonWebServiceRequest;
911
import com.amazonaws.Request;
1012
import com.amazonaws.Response;
@@ -26,6 +28,9 @@ public class TracingRequestHandler extends RequestHandler2 {
2628
.setMessagingReceiveInstrumentationEnabled(
2729
ConfigPropertiesUtil.getBoolean(
2830
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
31+
.setCapturedHeaders(
32+
ConfigPropertiesUtil.getList(
33+
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
2934
.build()
3035
.newRequestHandler();
3136

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.awssdk.v1_11;
7+
8+
import com.amazonaws.Request;
9+
10+
abstract class AbstractSqsRequest {
11+
12+
public abstract Request<?> getRequest();
13+
}

instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java

+77-59
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,13 @@
2222
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
2323
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
2424
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
25+
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
2526
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
2627
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
2728
import java.util.ArrayList;
2829
import java.util.Arrays;
2930
import java.util.List;
31+
import java.util.function.Function;
3032
import javax.annotation.Nullable;
3133

3234
final class AwsSdkInstrumenterFactory {
@@ -47,48 +49,73 @@ final class AwsSdkInstrumenterFactory {
4749
httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor);
4850
private static final AwsSdkSpanNameExtractor spanName = new AwsSdkSpanNameExtractor();
4951

50-
static Instrumenter<Request<?>, Response<?>> requestInstrumenter(
51-
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
52+
private final OpenTelemetry openTelemetry;
53+
private final List<String> capturedHeaders;
54+
private final boolean captureExperimentalSpanAttributes;
55+
private final boolean messagingReceiveInstrumentationEnabled;
5256

57+
AwsSdkInstrumenterFactory(
58+
OpenTelemetry openTelemetry,
59+
List<String> capturedHeaders,
60+
boolean captureExperimentalSpanAttributes,
61+
boolean messagingReceiveInstrumentationEnabled) {
62+
this.openTelemetry = openTelemetry;
63+
this.capturedHeaders = capturedHeaders;
64+
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
65+
this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
66+
}
67+
68+
Instrumenter<Request<?>, Response<?>> requestInstrumenter() {
5369
return createInstrumenter(
5470
openTelemetry,
55-
captureExperimentalSpanAttributes,
5671
spanName,
5772
SpanKindExtractor.alwaysClient(),
73+
attributesExtractors(),
5874
emptyList(),
5975
true);
6076
}
6177

62-
static Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter(
63-
OpenTelemetry openTelemetry,
64-
boolean captureExperimentalSpanAttributes,
65-
boolean messagingReceiveInstrumentationEnabled) {
66-
return sqsInstrumenter(
78+
private List<AttributesExtractor<Request<?>, Response<?>>> attributesExtractors() {
79+
return captureExperimentalSpanAttributes
80+
? extendedAttributesExtractors
81+
: defaultAttributesExtractors;
82+
}
83+
84+
private <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> messagingAttributesExtractor(
85+
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
86+
return MessagingAttributesExtractor.builder(getter, operation)
87+
.setCapturedHeaders(capturedHeaders)
88+
.build();
89+
}
90+
91+
Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter() {
92+
MessageOperation operation = MessageOperation.RECEIVE;
93+
SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE;
94+
AttributesExtractor<SqsReceiveRequest, Response<?>> messagingAttributeExtractor =
95+
messagingAttributesExtractor(getter, operation);
96+
97+
return createInstrumenter(
6798
openTelemetry,
68-
MessageOperation.RECEIVE,
69-
captureExperimentalSpanAttributes,
99+
MessagingSpanNameExtractor.create(getter, operation),
100+
SpanKindExtractor.alwaysConsumer(),
101+
toSqsRequestExtractors(attributesExtractors(), Function.identity()),
102+
singletonList(messagingAttributeExtractor),
70103
messagingReceiveInstrumentationEnabled);
71104
}
72105

73-
static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
74-
OpenTelemetry openTelemetry,
75-
boolean captureExperimentalSpanAttributes,
76-
boolean messagingReceiveInstrumentationEnabled) {
106+
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
77107
MessageOperation operation = MessageOperation.PROCESS;
78108
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
109+
AttributesExtractor<SqsProcessRequest, Void> messagingAttributeExtractor =
110+
messagingAttributesExtractor(getter, operation);
79111

80112
InstrumenterBuilder<SqsProcessRequest, Void> builder =
81113
Instrumenter.<SqsProcessRequest, Void>builder(
82114
openTelemetry,
83115
INSTRUMENTATION_NAME,
84116
MessagingSpanNameExtractor.create(getter, operation))
85-
.addAttributesExtractors(
86-
toProcessRequestExtractors(
87-
captureExperimentalSpanAttributes
88-
? extendedAttributesExtractors
89-
: defaultAttributesExtractors))
90-
.addAttributesExtractor(
91-
MessagingAttributesExtractor.builder(getter, operation).build());
117+
.addAttributesExtractors(toSqsRequestExtractors(attributesExtractors(), unused -> null))
118+
.addAttributesExtractor(messagingAttributeExtractor);
92119

93120
if (messagingReceiveInstrumentationEnabled) {
94121
builder.addSpanLinksExtractor(
@@ -101,77 +128,68 @@ static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
101128
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
102129
}
103130

104-
private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
105-
List<AttributesExtractor<Request<?>, Response<?>>> extractors) {
106-
List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
131+
private static <RESPONSE>
132+
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
133+
List<AttributesExtractor<Request<?>, Response<?>>> extractors,
134+
Function<RESPONSE, Response<?>> responseConverter) {
135+
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
107136
for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) {
108137
result.add(
109-
new AttributesExtractor<SqsProcessRequest, Void>() {
138+
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
110139
@Override
111140
public void onStart(
112141
AttributesBuilder attributes,
113142
Context parentContext,
114-
SqsProcessRequest sqsProcessRequest) {
115-
extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
143+
AbstractSqsRequest sqsRequest) {
144+
extractor.onStart(attributes, parentContext, sqsRequest.getRequest());
116145
}
117146

118147
@Override
119148
public void onEnd(
120149
AttributesBuilder attributes,
121150
Context context,
122-
SqsProcessRequest sqsProcessRequest,
123-
@Nullable Void unused,
151+
AbstractSqsRequest sqsRequest,
152+
@Nullable RESPONSE response,
124153
@Nullable Throwable error) {
125-
extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
154+
extractor.onEnd(
155+
attributes,
156+
context,
157+
sqsRequest.getRequest(),
158+
responseConverter.apply(response),
159+
error);
126160
}
127161
});
128162
}
129163
return result;
130164
}
131165

132-
static Instrumenter<Request<?>, Response<?>> producerInstrumenter(
133-
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
134-
return sqsInstrumenter(
135-
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes, true);
136-
}
137-
138-
private static Instrumenter<Request<?>, Response<?>> sqsInstrumenter(
139-
OpenTelemetry openTelemetry,
140-
MessageOperation operation,
141-
boolean captureExperimentalSpanAttributes,
142-
boolean enabled) {
166+
Instrumenter<Request<?>, Response<?>> producerInstrumenter() {
167+
MessageOperation operation = MessageOperation.PUBLISH;
143168
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
144169
AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor =
145-
MessagingAttributesExtractor.builder(getter, operation).build();
170+
messagingAttributesExtractor(getter, operation);
146171

147172
return createInstrumenter(
148173
openTelemetry,
149-
captureExperimentalSpanAttributes,
150174
MessagingSpanNameExtractor.create(getter, operation),
151-
operation == MessageOperation.PUBLISH
152-
? SpanKindExtractor.alwaysProducer()
153-
: SpanKindExtractor.alwaysConsumer(),
175+
SpanKindExtractor.alwaysProducer(),
176+
attributesExtractors(),
154177
singletonList(messagingAttributeExtractor),
155-
enabled);
178+
true);
156179
}
157180

158-
private static Instrumenter<Request<?>, Response<?>> createInstrumenter(
181+
private static <REQUEST, RESPONSE> Instrumenter<REQUEST, RESPONSE> createInstrumenter(
159182
OpenTelemetry openTelemetry,
160-
boolean captureExperimentalSpanAttributes,
161-
SpanNameExtractor<Request<?>> spanNameExtractor,
162-
SpanKindExtractor<Request<?>> spanKindExtractor,
163-
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors,
183+
SpanNameExtractor<REQUEST> spanNameExtractor,
184+
SpanKindExtractor<REQUEST> spanKindExtractor,
185+
List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributeExtractors,
186+
List<AttributesExtractor<REQUEST, RESPONSE>> additionalAttributeExtractors,
164187
boolean enabled) {
165-
return Instrumenter.<Request<?>, Response<?>>builder(
188+
return Instrumenter.<REQUEST, RESPONSE>builder(
166189
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
167-
.addAttributesExtractors(
168-
captureExperimentalSpanAttributes
169-
? extendedAttributesExtractors
170-
: defaultAttributesExtractors)
190+
.addAttributesExtractors(attributeExtractors)
171191
.addAttributesExtractors(additionalAttributeExtractors)
172192
.setEnabled(enabled)
173193
.buildInstrumenter(spanKindExtractor);
174194
}
175-
176-
private AwsSdkInstrumenterFactory() {}
177195
}

instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java

+10-14
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.api.OpenTelemetry;
1212
import io.opentelemetry.context.Context;
1313
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
14+
import java.util.List;
1415

1516
/**
1617
* Entrypoint for instrumenting AWS SDK v1 clients.
@@ -45,30 +46,25 @@ public static AwsSdkTelemetryBuilder builder(OpenTelemetry openTelemetry) {
4546
}
4647

4748
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
48-
private final Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter;
49+
private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
4950
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
5051
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
5152

5253
AwsSdkTelemetry(
5354
OpenTelemetry openTelemetry,
55+
List<String> capturedHeaders,
5456
boolean captureExperimentalSpanAttributes,
5557
boolean messagingReceiveInstrumentationEnabled) {
56-
requestInstrumenter =
57-
AwsSdkInstrumenterFactory.requestInstrumenter(
58-
openTelemetry, captureExperimentalSpanAttributes);
59-
consumerReceiveInstrumenter =
60-
AwsSdkInstrumenterFactory.consumerReceiveInstrumenter(
58+
AwsSdkInstrumenterFactory instrumenterFactory =
59+
new AwsSdkInstrumenterFactory(
6160
openTelemetry,
61+
capturedHeaders,
6262
captureExperimentalSpanAttributes,
6363
messagingReceiveInstrumentationEnabled);
64-
consumerProcessInstrumenter =
65-
AwsSdkInstrumenterFactory.consumerProcessInstrumenter(
66-
openTelemetry,
67-
captureExperimentalSpanAttributes,
68-
messagingReceiveInstrumentationEnabled);
69-
producerInstrumenter =
70-
AwsSdkInstrumenterFactory.producerInstrumenter(
71-
openTelemetry, captureExperimentalSpanAttributes);
64+
requestInstrumenter = instrumenterFactory.requestInstrumenter();
65+
consumerReceiveInstrumenter = instrumenterFactory.consumerReceiveInstrumenter();
66+
consumerProcessInstrumenter = instrumenterFactory.consumerProcessInstrumenter();
67+
producerInstrumenter = instrumenterFactory.producerInstrumenter();
7268
}
7369

7470
/**

instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,36 @@
55

66
package io.opentelemetry.instrumentation.awssdk.v1_11;
77

8+
import static java.util.Collections.emptyList;
9+
810
import com.google.errorprone.annotations.CanIgnoreReturnValue;
911
import io.opentelemetry.api.OpenTelemetry;
12+
import java.util.List;
1013

1114
/** A builder of {@link AwsSdkTelemetry}. */
1215
public class AwsSdkTelemetryBuilder {
1316

1417
private final OpenTelemetry openTelemetry;
1518

19+
private List<String> capturedHeaders = emptyList();
1620
private boolean captureExperimentalSpanAttributes;
1721
private boolean messagingReceiveInstrumentationEnabled;
1822

1923
AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) {
2024
this.openTelemetry = openTelemetry;
2125
}
2226

27+
/**
28+
* Configures the messaging headers that will be captured as span attributes.
29+
*
30+
* @param capturedHeaders A list of messaging header names.
31+
*/
32+
@CanIgnoreReturnValue
33+
public AwsSdkTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
34+
this.capturedHeaders = capturedHeaders;
35+
return this;
36+
}
37+
2338
/**
2439
* Sets whether experimental attributes should be set to spans. These attributes may be changed or
2540
* removed in the future, so only enable this if you know you do not require attributes filled by
@@ -50,6 +65,9 @@ public AwsSdkTelemetryBuilder setMessagingReceiveInstrumentationEnabled(
5065
*/
5166
public AwsSdkTelemetry build() {
5267
return new AwsSdkTelemetry(
53-
openTelemetry, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled);
68+
openTelemetry,
69+
capturedHeaders,
70+
captureExperimentalSpanAttributes,
71+
messagingReceiveInstrumentationEnabled);
5472
}
5573
}

0 commit comments

Comments
 (0)