Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation for the latest Messaging Semantic conventions. #13192

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
a086a82
Initial work for adoption of latest semantic conventions for messagin…
cbos Jan 28, 2025
138cce0
Update MessagingSpanNameExtractorTest
cbos Jan 30, 2025
3ecd05d
Update MessagingSpanNameExtractor javadoc
cbos Jan 30, 2025
f9d51fa
Adapt new MessagingAttributesExtractor and updated MessagingSpanNameE…
cbos Feb 4, 2025
79c0d1e
Merge branch 'main' into messaging_semconv_update
cbos Feb 6, 2025
d1594a1
Fix NullPointerException which caused spans not to be created
cbos Feb 11, 2025
118bf1b
Add operation as get the operation name, otherwise the default is 'nu…
cbos Feb 11, 2025
1cf86e5
Do not use producer span as parent in case of stable conventions
cbos Feb 11, 2025
bacd3d3
Make Process messages as link in case of stable messaging
cbos Feb 12, 2025
91ae29b
Add generic Baggage propagator which can be used in the same situatio…
cbos Feb 27, 2025
aa83bad
Merge branch 'open-telemetry:main' into messaging_semconv_update
cbos Feb 27, 2025
fcfd92b
Add optional (experimental) process span on consuming a message. This…
cbos Mar 3, 2025
4a3ac25
Fix failing MessagingSpanNameExtractorTest
cbos Mar 3, 2025
b2dd7c5
Fix failing MessagingSpanNameExtractorTest
cbos Mar 3, 2025
eab1903
Merge branch 'open-telemetry:main' into messaging_semconv_update
cbos Mar 3, 2025
18f46bc
Fix failing AwsSdk instrumented tests
cbos Mar 3, 2025
7d26666
Fix failing AwsSdk instrumented tests
cbos Mar 3, 2025
f18f981
Fix failing MessagingSpanNameExtractorTest when running with both old…
cbos Mar 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions instrumentation-api-incubator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ tasks {
}

val testStableSemconv by registering(Test::class) {
jvmArgs("-Dotel.semconv-stability.opt-in=database")
jvmArgs("-Dotel.semconv-stability.opt-in=database,messaging")
}

val testBothSemconv by registering(Test::class) {
jvmArgs("-Dotel.semconv-stability.opt-in=database/dup")
jvmArgs("-Dotel.semconv-stability.opt-in=database/dup,messaging/dup")
}

check {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,33 @@
* that may be used in a messaging system.
*/
public enum MessageOperation {
@Deprecated
PUBLISH,

RECEIVE,
PROCESS;
PROCESS,
CREATE,
SEND,
SETTLE;

/**
* Returns the operation name as defined in <a
* href="https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#operation-names">the
* specification</a>.
*
* @deprecated Use {@link #operationType} instead.
*/
@Deprecated
String operationName() {
return name().toLowerCase(Locale.ROOT);
}

/**
* Returns the operation type as defined in <a
* href="https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-spans.md#operation-types">the
* specification</a>.
*/
String operationType() {
return name().toLowerCase(Locale.ROOT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.internal.SemconvStability;
import io.opentelemetry.instrumentation.api.internal.SpanKey;
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
import java.util.List;
Expand All @@ -27,35 +28,54 @@
public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
implements AttributesExtractor<REQUEST, RESPONSE>, SpanKeyProvider {

// copied from MessagingIncubatingAttributes
// copied from io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes (stable
// attributes)
private static final AttributeKey<String> MESSAGING_OPERATION_NAME =
AttributeKey.stringKey("messaging.operation.name");
private static final AttributeKey<String> MESSAGING_SYSTEM =
AttributeKey.stringKey("messaging.system");
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
AttributeKey.longKey("messaging.batch.message_count");
private static final AttributeKey<String> MESSAGING_CLIENT_ID =
AttributeKey.stringKey("messaging.client_id");
// Messaging specific
private static final AttributeKey<String> MESSAGING_CONSUMER_GROUP_NAME =
AttributeKey.stringKey("messaging.consumer.group.name");
private static final AttributeKey<Boolean> MESSAGING_DESTINATION_ANONYMOUS =
AttributeKey.booleanKey("messaging.destination.anonymous");
private static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
AttributeKey.stringKey("messaging.destination.name");
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
AttributeKey.stringKey("messaging.destination.partition.id");
// Messaging specific
private static final AttributeKey<String> MESSAGING_DESTINATION_SUBSCRIPTION_NAME =
AttributeKey.stringKey("messaging.destination.subscription.name");
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
AttributeKey.stringKey("messaging.destination.template");
private static final AttributeKey<Boolean> MESSAGING_DESTINATION_TEMPORARY =
AttributeKey.booleanKey("messaging.destination.temporary");
private static final AttributeKey<Long> MESSAGING_MESSAGE_BODY_SIZE =
AttributeKey.longKey("messaging.message.body.size");
private static final AttributeKey<String> MESSAGING_OPERATION_TYPE =
AttributeKey.stringKey("messaging.operation.type");
private static final AttributeKey<String> MESSAGING_CLIENT_ID_STABLE =
AttributeKey.stringKey("messaging.client.id");
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
AttributeKey.stringKey("messaging.destination.partition.id");
private static final AttributeKey<String> MESSAGING_MESSAGE_CONVERSATION_ID =
AttributeKey.stringKey("messaging.message.conversation_id");
private static final AttributeKey<Long> MESSAGING_MESSAGE_ENVELOPE_SIZE =
AttributeKey.longKey("messaging.message.envelope.size");
private static final AttributeKey<String> MESSAGING_MESSAGE_ID =
AttributeKey.stringKey("messaging.message.id");
private static final AttributeKey<Long> MESSAGING_MESSAGE_BODY_SIZE =
AttributeKey.longKey("messaging.message.body.size");
private static final AttributeKey<Long> MESSAGING_MESSAGE_ENVELOPE_SIZE =
AttributeKey.longKey("messaging.message.envelope.size");

// copied from MessagingIncubatingAttributes (old attributes)
@Deprecated
private static final AttributeKey<String> MESSAGING_CLIENT_ID =
AttributeKey.stringKey("messaging.client_id");

@Deprecated
private static final AttributeKey<String> MESSAGING_OPERATION =
AttributeKey.stringKey("messaging.operation");
private static final AttributeKey<String> MESSAGING_SYSTEM =
AttributeKey.stringKey("messaging.system");

static final String TEMP_DESTINATION_NAME = "(temporary)";
static final String ANONYMOUS_DESTINATION_NAME = "(anonymous)";

/**
* Creates the messaging attributes extractor for the given {@link MessageOperation operation}
Expand Down Expand Up @@ -89,31 +109,57 @@ public static <REQUEST, RESPONSE> MessagingAttributesExtractorBuilder<REQUEST, R
}

@Override
@SuppressWarnings("deprecation") // using deprecated semconv
public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
internalSet(attributes, MESSAGING_SYSTEM, getter.getSystem(request));

// Old messaging attributes
if (SemconvStability.emitOldMessagingSemconv()) {
internalSet(attributes, MESSAGING_CLIENT_ID, getter.getClientId(request));
if (operation != null) { // in old implementation operation could be null
internalSet(attributes, MESSAGING_OPERATION, operation.operationName());
}
}

// New, stable attributes
if (SemconvStability.emitStableMessagingSemconv()) {
internalSet(attributes, MESSAGING_CLIENT_ID_STABLE, getter.getClientId(request));
internalSet(attributes, MESSAGING_OPERATION_TYPE, operation.operationType());
internalSet(
attributes, MESSAGING_OPERATION_NAME, getter.getOperationName(request, operation));
internalSet(attributes, MESSAGING_CONSUMER_GROUP_NAME, getter.getConsumerGroupName(request));
internalSet(
attributes,
MESSAGING_DESTINATION_SUBSCRIPTION_NAME,
getter.getDestinationSubscriptionName(request));
}

// Unchanged attributes from 1.25.0 to stable
boolean isTemporaryDestination = getter.isTemporaryDestination(request);
boolean isAnonymousDestination = getter.isAnonymousDestination(request);

String destination =
isTemporaryDestination
? TEMP_DESTINATION_NAME
: (isAnonymousDestination
? ANONYMOUS_DESTINATION_NAME
: getter.getDestination(request));
internalSet(attributes, MESSAGING_DESTINATION_NAME, destination);

if (isTemporaryDestination) {
internalSet(attributes, MESSAGING_DESTINATION_TEMPORARY, true);
internalSet(attributes, MESSAGING_DESTINATION_NAME, TEMP_DESTINATION_NAME);
} else {
internalSet(attributes, MESSAGING_DESTINATION_NAME, getter.getDestination(request));
internalSet(
attributes, MESSAGING_DESTINATION_TEMPLATE, getter.getDestinationTemplate(request));
}
internalSet(
attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request));
boolean isAnonymousDestination = getter.isAnonymousDestination(request);
if (isAnonymousDestination) {
internalSet(attributes, MESSAGING_DESTINATION_ANONYMOUS, true);
}
internalSet(attributes, MESSAGING_MESSAGE_CONVERSATION_ID, getter.getConversationId(request));
internalSet(attributes, MESSAGING_MESSAGE_BODY_SIZE, getter.getMessageBodySize(request));

internalSet(
attributes, MESSAGING_MESSAGE_ENVELOPE_SIZE, getter.getMessageEnvelopeSize(request));
internalSet(attributes, MESSAGING_CLIENT_ID, getter.getClientId(request));
if (operation != null) {
internalSet(attributes, MESSAGING_OPERATION, operation.operationName());
}
attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request));

internalSet(attributes, MESSAGING_MESSAGE_CONVERSATION_ID, getter.getConversationId(request));
}

@Override
Expand All @@ -127,6 +173,10 @@ public void onEnd(
internalSet(
attributes, MESSAGING_BATCH_MESSAGE_COUNT, getter.getBatchMessageCount(request, response));

internalSet(attributes, MESSAGING_MESSAGE_BODY_SIZE, getter.getMessageBodySize(request));
internalSet(
attributes, MESSAGING_MESSAGE_ENVELOPE_SIZE, getter.getMessageEnvelopeSize(request));

for (String name : capturedHeaders) {
List<String> values = getter.getMessageHeader(request, name);
if (!values.isEmpty()) {
Expand All @@ -140,18 +190,25 @@ public void onEnd(
* any time.
*/
@Override
@SuppressWarnings("deprecation") // using deprecated semconv
public SpanKey internalGetSpanKey() {
if (operation == null) {
return null;
}

switch (operation) {
case CREATE:
return SpanKey.PRODUCER;
case SEND:
return SpanKey.PRODUCER;
case PUBLISH:
return SpanKey.PRODUCER;
case RECEIVE:
return SpanKey.CONSUMER_RECEIVE;
case PROCESS:
return SpanKey.CONSUMER_PROCESS;
case SETTLE:
return SpanKey.CONSUMER_SETTLE;
}
throw new IllegalStateException("Can't possibly happen");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,71 @@
*/
public interface MessagingAttributesGetter<REQUEST, RESPONSE> {

@Nullable
String getSystem(REQUEST request);

@Nullable
String getDestination(REQUEST request);
default Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response) {
return null;
}

@Nullable
String getDestinationTemplate(REQUEST request);

boolean isTemporaryDestination(REQUEST request);
default String getConsumerGroupName(REQUEST request) {
return null;
}

boolean isAnonymousDestination(REQUEST request);

@Nullable
String getConversationId(REQUEST request);
String getDestination(REQUEST request);

@Nullable
@Deprecated
default Long getMessagePayloadSize(REQUEST request) {
default String getDestinationSubscriptionName(REQUEST request) {
return null;
}

@Nullable
@Deprecated
default Long getMessagePayloadCompressedSize(REQUEST request) {
default String getDestinationTemplate(REQUEST request) {
return null;
}

boolean isTemporaryDestination(REQUEST request);

@Nullable
Long getMessageBodySize(REQUEST request);
default String getOperationName(REQUEST request, MessageOperation operation) {
return operation.operationType();
}

@Nullable
Long getMessageEnvelopeSize(REQUEST request);
String getClientId(REQUEST request);

@Nullable
default String getDestinationPartitionId(REQUEST request) {
return null;
}

@Nullable
default String getConversationId(REQUEST request) {
return null;
}

@Nullable
String getMessageId(REQUEST request, @Nullable RESPONSE response);

@Nullable
String getClientId(REQUEST request);
Long getMessageBodySize(REQUEST request);

@Nullable
Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response);
Long getMessageEnvelopeSize(REQUEST request);

@Nullable
default String getDestinationPartitionId(REQUEST request) {
@Deprecated
default Long getMessagePayloadSize(REQUEST request) {
return null;
}

@Nullable
@Deprecated
default Long getMessagePayloadCompressedSize(REQUEST request) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_ADDRESS;
import static io.opentelemetry.semconv.NetworkAttributes.NETWORK_PEER_PORT;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;

import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import javax.annotation.Nullable;

public final class MessagingNetworkAttributesExtractor<REQUEST, RESPONSE>
implements AttributesExtractor<REQUEST, RESPONSE> {

/**
* Creates the network attributes extractor.
*
* @see InstrumenterBuilder#addAttributesExtractor(AttributesExtractor)
*/
public static <REQUEST, RESPONSE> MessagingNetworkAttributesExtractor<REQUEST, RESPONSE> create(
MessagingNetworkAttributesGetter<REQUEST, RESPONSE> getter) {
return new MessagingNetworkAttributesExtractor<>(getter);
}

private final MessagingNetworkAttributesGetter<REQUEST, RESPONSE> getter;

MessagingNetworkAttributesExtractor(MessagingNetworkAttributesGetter<REQUEST, RESPONSE> getter) {
this.getter = getter;
}

@Override
public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST request) {
attributes.put(NETWORK_PEER_ADDRESS, getter.getNetworkPeerAddress(request, null));
Integer networkPeerPort = getter.getNetworkPeerPort(request, null);
if (networkPeerPort != null) {
attributes.put(NETWORK_PEER_PORT, networkPeerPort);
}

attributes.put(SERVER_ADDRESS, getter.getServerAddress(request));
Integer serverPort = getter.getServerPort(request);
if (serverPort != null) {
attributes.put(SERVER_PORT, serverPort);
}
}

@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
REQUEST request,
@Nullable RESPONSE response,
@Nullable Throwable error) {}
}
Loading
Loading