Skip to content

Commit a086a82

Browse files
committed
Initial work for adoption of latest semantic conventions for messaging (v1.29.0)
1 parent 3b267cb commit a086a82

File tree

11 files changed

+264
-56
lines changed

11 files changed

+264
-56
lines changed

instrumentation-api-incubator/build.gradle.kts

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@ tasks {
4242
}
4343

4444
val testStableSemconv by registering(Test::class) {
45-
jvmArgs("-Dotel.semconv-stability.opt-in=database")
45+
jvmArgs("-Dotel.semconv-stability.opt-in=database,messaging")
4646
}
4747

4848
val testBothSemconv by registering(Test::class) {
49-
jvmArgs("-Dotel.semconv-stability.opt-in=database/dup")
49+
jvmArgs("-Dotel.semconv-stability.opt-in=database/dup,messaging/dup")
5050
}
5151

5252
check {

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessageOperation.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,33 @@
1313
* that may be used in a messaging system.
1414
*/
1515
public enum MessageOperation {
16+
@Deprecated
1617
PUBLISH,
18+
1719
RECEIVE,
18-
PROCESS;
20+
PROCESS,
21+
CREATE,
22+
SEND,
23+
SETTLE;
1924

2025
/**
2126
* Returns the operation name as defined in <a
2227
* href="https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#operation-names">the
2328
* specification</a>.
29+
*
30+
* @deprecated Use {@link #operationType} instead.
2431
*/
32+
@Deprecated
2533
String operationName() {
2634
return name().toLowerCase(Locale.ROOT);
2735
}
36+
37+
/**
38+
* Returns the operation type as defined in <a
39+
* href="https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#operation-types">the
40+
* specification</a>.
41+
*/
42+
String operationType() {
43+
return name().toLowerCase(Locale.ROOT);
44+
}
2845
}

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java

+78-23
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.opentelemetry.api.common.AttributesBuilder;
1212
import io.opentelemetry.context.Context;
1313
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
14+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
1415
import io.opentelemetry.instrumentation.api.internal.SpanKey;
1516
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
1617
import java.util.List;
@@ -27,35 +28,54 @@
2728
public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
2829
implements AttributesExtractor<REQUEST, RESPONSE>, SpanKeyProvider {
2930

30-
// copied from MessagingIncubatingAttributes
31+
// copied from io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes (stable
32+
// attributes)
33+
private static final AttributeKey<String> MESSAGING_OPERATION_NAME =
34+
AttributeKey.stringKey("messaging.operation.name");
35+
private static final AttributeKey<String> MESSAGING_SYSTEM =
36+
AttributeKey.stringKey("messaging.system");
3137
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
3238
AttributeKey.longKey("messaging.batch.message_count");
33-
private static final AttributeKey<String> MESSAGING_CLIENT_ID =
34-
AttributeKey.stringKey("messaging.client_id");
39+
// Messaging specific
40+
private static final AttributeKey<String> MESSAGING_CONSUMER_GROUP_NAME =
41+
AttributeKey.stringKey("messaging.consumer.group.name");
3542
private static final AttributeKey<Boolean> MESSAGING_DESTINATION_ANONYMOUS =
3643
AttributeKey.booleanKey("messaging.destination.anonymous");
3744
private static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
3845
AttributeKey.stringKey("messaging.destination.name");
39-
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
40-
AttributeKey.stringKey("messaging.destination.partition.id");
46+
// Messaging specific
47+
private static final AttributeKey<String> MESSAGING_DESTINATION_SUBSCRIPTION_NAME =
48+
AttributeKey.stringKey("messaging.destination.subscription.name");
4149
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
4250
AttributeKey.stringKey("messaging.destination.template");
4351
private static final AttributeKey<Boolean> MESSAGING_DESTINATION_TEMPORARY =
4452
AttributeKey.booleanKey("messaging.destination.temporary");
45-
private static final AttributeKey<Long> MESSAGING_MESSAGE_BODY_SIZE =
46-
AttributeKey.longKey("messaging.message.body.size");
53+
private static final AttributeKey<String> MESSAGING_OPERATION_TYPE =
54+
AttributeKey.stringKey("messaging.operation.type");
55+
private static final AttributeKey<String> MESSAGING_CLIENT_ID_STABLE =
56+
AttributeKey.stringKey("messaging.client.id");
57+
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
58+
AttributeKey.stringKey("messaging.destination.partition.id");
4759
private static final AttributeKey<String> MESSAGING_MESSAGE_CONVERSATION_ID =
4860
AttributeKey.stringKey("messaging.message.conversation_id");
49-
private static final AttributeKey<Long> MESSAGING_MESSAGE_ENVELOPE_SIZE =
50-
AttributeKey.longKey("messaging.message.envelope.size");
5161
private static final AttributeKey<String> MESSAGING_MESSAGE_ID =
5262
AttributeKey.stringKey("messaging.message.id");
63+
private static final AttributeKey<Long> MESSAGING_MESSAGE_BODY_SIZE =
64+
AttributeKey.longKey("messaging.message.body.size");
65+
private static final AttributeKey<Long> MESSAGING_MESSAGE_ENVELOPE_SIZE =
66+
AttributeKey.longKey("messaging.message.envelope.size");
67+
68+
// copied from MessagingIncubatingAttributes (old attributes)
69+
@Deprecated
70+
private static final AttributeKey<String> MESSAGING_CLIENT_ID =
71+
AttributeKey.stringKey("messaging.client_id");
72+
73+
@Deprecated
5374
private static final AttributeKey<String> MESSAGING_OPERATION =
5475
AttributeKey.stringKey("messaging.operation");
55-
private static final AttributeKey<String> MESSAGING_SYSTEM =
56-
AttributeKey.stringKey("messaging.system");
5776

5877
static final String TEMP_DESTINATION_NAME = "(temporary)";
78+
static final String ANONYMOUS_DESTINATION_NAME = "(anonymous)";
5979

6080
/**
6181
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}
@@ -89,31 +109,56 @@ public static <REQUEST, RESPONSE> MessagingAttributesExtractorBuilder<REQUEST, R
89109
}
90110

91111
@Override
112+
@SuppressWarnings("deprecation") // using deprecated semconv
92113
public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
93114
internalSet(attributes, MESSAGING_SYSTEM, getter.getSystem(request));
115+
116+
// Old messaging attributes
117+
if (SemconvStability.emitOldMessagingSemconv()) {
118+
internalSet(attributes, MESSAGING_CLIENT_ID, getter.getClientId(request));
119+
if (operation != null) { // in old implementation operation could be null
120+
internalSet(attributes, MESSAGING_OPERATION, operation.operationName());
121+
}
122+
}
123+
124+
// New, stable attributes
125+
if (SemconvStability.emitStableMessagingSemconv()) {
126+
internalSet(attributes, MESSAGING_CLIENT_ID_STABLE, getter.getClientId(request));
127+
internalSet(attributes, MESSAGING_OPERATION_TYPE, operation.operationType());
128+
internalSet(attributes, MESSAGING_OPERATION_NAME, getter.getOperationName(request));
129+
internalSet(attributes, MESSAGING_CONSUMER_GROUP_NAME, getter.getConsumerGroupName(request));
130+
internalSet(
131+
attributes,
132+
MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
133+
getter.getDestinationSubscriptionName(request));
134+
}
135+
136+
// Unchanged attributes from 1.25.0 to stable
94137
boolean isTemporaryDestination = getter.isTemporaryDestination(request);
138+
boolean isAnonymousDestination = getter.isAnonymousDestination(request);
139+
140+
String destination =
141+
isTemporaryDestination
142+
? TEMP_DESTINATION_NAME
143+
: (isAnonymousDestination
144+
? ANONYMOUS_DESTINATION_NAME
145+
: getter.getDestination(request));
146+
internalSet(attributes, MESSAGING_DESTINATION_NAME, destination);
147+
95148
if (isTemporaryDestination) {
96149
internalSet(attributes, MESSAGING_DESTINATION_TEMPORARY, true);
97-
internalSet(attributes, MESSAGING_DESTINATION_NAME, TEMP_DESTINATION_NAME);
98150
} else {
99-
internalSet(attributes, MESSAGING_DESTINATION_NAME, getter.getDestination(request));
100151
internalSet(
101152
attributes, MESSAGING_DESTINATION_TEMPLATE, getter.getDestinationTemplate(request));
102153
}
103-
internalSet(
104-
attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request));
105-
boolean isAnonymousDestination = getter.isAnonymousDestination(request);
106154
if (isAnonymousDestination) {
107155
internalSet(attributes, MESSAGING_DESTINATION_ANONYMOUS, true);
108156
}
109-
internalSet(attributes, MESSAGING_MESSAGE_CONVERSATION_ID, getter.getConversationId(request));
110-
internalSet(attributes, MESSAGING_MESSAGE_BODY_SIZE, getter.getMessageBodySize(request));
157+
111158
internalSet(
112-
attributes, MESSAGING_MESSAGE_ENVELOPE_SIZE, getter.getMessageEnvelopeSize(request));
113-
internalSet(attributes, MESSAGING_CLIENT_ID, getter.getClientId(request));
114-
if (operation != null) {
115-
internalSet(attributes, MESSAGING_OPERATION, operation.operationName());
116-
}
159+
attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request));
160+
161+
internalSet(attributes, MESSAGING_MESSAGE_CONVERSATION_ID, getter.getConversationId(request));
117162
}
118163

119164
@Override
@@ -127,6 +172,10 @@ public void onEnd(
127172
internalSet(
128173
attributes, MESSAGING_BATCH_MESSAGE_COUNT, getter.getBatchMessageCount(request, response));
129174

175+
internalSet(attributes, MESSAGING_MESSAGE_BODY_SIZE, getter.getMessageBodySize(request));
176+
internalSet(
177+
attributes, MESSAGING_MESSAGE_ENVELOPE_SIZE, getter.getMessageEnvelopeSize(request));
178+
130179
for (String name : capturedHeaders) {
131180
List<String> values = getter.getMessageHeader(request, name);
132181
if (!values.isEmpty()) {
@@ -146,12 +195,18 @@ public SpanKey internalGetSpanKey() {
146195
}
147196

148197
switch (operation) {
198+
case CREATE:
199+
return SpanKey.PRODUCER;
200+
case SEND:
201+
return SpanKey.PRODUCER;
149202
case PUBLISH:
150203
return SpanKey.PRODUCER;
151204
case RECEIVE:
152205
return SpanKey.CONSUMER_RECEIVE;
153206
case PROCESS:
154207
return SpanKey.CONSUMER_PROCESS;
208+
case SETTLE:
209+
return SpanKey.CONSUMER_SETTLE;
155210
}
156211
throw new IllegalStateException("Can't possibly happen");
157212
}

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesGetter.java

+35-15
Original file line numberDiff line numberDiff line change
@@ -19,51 +19,71 @@
1919
*/
2020
public interface MessagingAttributesGetter<REQUEST, RESPONSE> {
2121

22-
@Nullable
2322
String getSystem(REQUEST request);
2423

2524
@Nullable
26-
String getDestination(REQUEST request);
25+
default Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response) {
26+
return null;
27+
}
2728

2829
@Nullable
29-
String getDestinationTemplate(REQUEST request);
30-
31-
boolean isTemporaryDestination(REQUEST request);
30+
default String getConsumerGroupName(REQUEST request) {
31+
return null;
32+
}
3233

3334
boolean isAnonymousDestination(REQUEST request);
3435

3536
@Nullable
36-
String getConversationId(REQUEST request);
37+
String getDestination(REQUEST request);
3738

3839
@Nullable
39-
@Deprecated
40-
default Long getMessagePayloadSize(REQUEST request) {
40+
default String getDestinationSubscriptionName(REQUEST request) {
4141
return null;
4242
}
4343

4444
@Nullable
45-
@Deprecated
46-
default Long getMessagePayloadCompressedSize(REQUEST request) {
45+
default String getDestinationTemplate(REQUEST request) {
4746
return null;
4847
}
4948

49+
boolean isTemporaryDestination(REQUEST request);
50+
5051
@Nullable
51-
Long getMessageBodySize(REQUEST request);
52+
default String getOperationName(REQUEST request) {
53+
return null;
54+
}
5255

5356
@Nullable
54-
Long getMessageEnvelopeSize(REQUEST request);
57+
String getClientId(REQUEST request);
58+
59+
@Nullable
60+
default String getDestinationPartitionId(REQUEST request) {
61+
return null;
62+
}
63+
64+
@Nullable
65+
default String getConversationId(REQUEST request) {
66+
return null;
67+
}
5568

5669
@Nullable
5770
String getMessageId(REQUEST request, @Nullable RESPONSE response);
5871

5972
@Nullable
60-
String getClientId(REQUEST request);
73+
Long getMessageBodySize(REQUEST request);
6174

6275
@Nullable
63-
Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response);
76+
Long getMessageEnvelopeSize(REQUEST request);
6477

6578
@Nullable
66-
default String getDestinationPartitionId(REQUEST request) {
79+
@Deprecated
80+
default Long getMessagePayloadSize(REQUEST request) {
81+
return null;
82+
}
83+
84+
@Nullable
85+
@Deprecated
86+
default Long getMessagePayloadCompressedSize(REQUEST request) {
6787
return null;
6888
}
6989

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingSpanNameExtractor.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;
77

88
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
9+
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
10+
import io.opentelemetry.instrumentation.api.semconv.network.ServerAttributesGetter;
11+
import javax.annotation.Nullable;
912

1013
public final class MessagingSpanNameExtractor<REQUEST> implements SpanNameExtractor<REQUEST> {
1114

@@ -19,21 +22,35 @@ public final class MessagingSpanNameExtractor<REQUEST> implements SpanNameExtrac
1922
* @see MessageOperation used to extract {@code <operation name>}.
2023
*/
2124
public static <REQUEST> SpanNameExtractor<REQUEST> create(
22-
MessagingAttributesGetter<REQUEST, ?> getter, MessageOperation operation) {
23-
return new MessagingSpanNameExtractor<>(getter, operation);
25+
MessagingAttributesGetter<REQUEST, ?> getter,
26+
MessageOperation operation,
27+
ServerAttributesGetter<REQUEST> serverAttributesGetter) {
28+
return new MessagingSpanNameExtractor<>(getter, operation, serverAttributesGetter);
2429
}
2530

2631
private final MessagingAttributesGetter<REQUEST, ?> getter;
32+
private final ServerAttributesGetter<REQUEST> serverAttributesGetter;
2733
private final MessageOperation operation;
2834

2935
private MessagingSpanNameExtractor(
30-
MessagingAttributesGetter<REQUEST, ?> getter, MessageOperation operation) {
36+
MessagingAttributesGetter<REQUEST, ?> getter,
37+
MessageOperation operation,
38+
ServerAttributesGetter<REQUEST> serverAttributesGetter) {
3139
this.getter = getter;
40+
this.serverAttributesGetter = serverAttributesGetter;
3241
this.operation = operation;
3342
}
3443

3544
@Override
45+
@SuppressWarnings("deprecation") // using deprecated semconv
3646
public String extract(REQUEST request) {
47+
if (SemconvStability.emitStableMessagingSemconv()) {
48+
String destination = getDestination(request);
49+
if (destination == null) {
50+
return getter.getOperationName(request);
51+
}
52+
return getter.getOperationName(request) + " " + destination;
53+
}
3754
String destinationName =
3855
getter.isTemporaryDestination(request)
3956
? MessagingAttributesExtractor.TEMP_DESTINATION_NAME
@@ -44,4 +61,27 @@ public String extract(REQUEST request) {
4461

4562
return destinationName + " " + operation.operationName();
4663
}
64+
65+
@Nullable
66+
private String getDestination(REQUEST request) {
67+
String destination = null;
68+
if (getter.getDestinationTemplate(request) != null) {
69+
destination = getter.getDestinationTemplate(request);
70+
} else if (getter.isTemporaryDestination(request)) {
71+
destination = MessagingAttributesExtractor.TEMP_DESTINATION_NAME;
72+
} else if (getter.isAnonymousDestination(request)) {
73+
destination = MessagingAttributesExtractor.ANONYMOUS_DESTINATION_NAME;
74+
} else if (getter.getDestination(request) != null) {
75+
destination = getter.getDestination(request);
76+
} else {
77+
if (serverAttributesGetter.getServerAddress(request) != null
78+
&& serverAttributesGetter.getServerPort(request) != null) {
79+
destination =
80+
serverAttributesGetter.getServerAddress(request)
81+
+ ":"
82+
+ serverAttributesGetter.getServerPort(request);
83+
}
84+
}
85+
return destination;
86+
}
4787
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
@ParametersAreNonnullByDefault
2+
package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;
3+
4+
import javax.annotation.ParametersAreNonnullByDefault;

0 commit comments

Comments
 (0)