Skip to content

Commit 477650e

Browse files
committed
fix spans for consuming messages to not be the "parent" span for following spans
1 parent 4d9372b commit 477650e

File tree

3 files changed

+40
-27
lines changed

3 files changed

+40
-27
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java

+18-14
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@
3737
import javax.jms.MessageListener;
3838
import javax.jms.TextMessage;
3939

40+
import org.apache.pekko.Done;
41+
import org.apache.pekko.actor.ActorRef;
42+
import org.apache.pekko.actor.Props;
43+
import org.apache.pekko.actor.Status;
44+
import org.apache.pekko.japi.pf.ReceiveBuilder;
45+
import org.apache.pekko.pattern.Patterns;
46+
import org.apache.pekko.stream.javadsl.Sink;
4047
import org.apache.qpid.jms.JmsAcknowledgeCallback;
4148
import org.apache.qpid.jms.JmsMessageConsumer;
4249
import org.apache.qpid.jms.message.JmsMessage;
@@ -63,20 +70,12 @@
6370
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
6471
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
6572
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
66-
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
6773
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
74+
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
6875
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
6976
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
7077
import org.eclipse.ditto.internal.utils.tracing.span.TracingSpans;
7178

72-
import org.apache.pekko.Done;
73-
import org.apache.pekko.actor.ActorRef;
74-
import org.apache.pekko.actor.Props;
75-
import org.apache.pekko.actor.Status;
76-
import org.apache.pekko.japi.pf.ReceiveBuilder;
77-
import org.apache.pekko.pattern.Patterns;
78-
import org.apache.pekko.stream.javadsl.Sink;
79-
8079
/**
8180
* Actor which receives message from an AMQP source and forwards them to a {@code MessageMappingProcessorActor}.
8281
*/
@@ -342,10 +341,11 @@ private void messageConsumerFailed(final Status.Failure failure) {
342341
handleAddressStatus(addressStatus);
343342
}
344343

345-
private void handleJmsMessage(final JmsMessage message) {
344+
private void handleJmsMessage(final JmsMessage message) throws JMSException {
346345
Map<String, String> headers = null;
347-
String correlationId = null;
348-
var startedSpan = TracingSpans.emptyStartedSpan(SpanOperationName.of("amqp_consume"));
346+
var startedSpan = TracingSpans.emptyStartedSpan(
347+
SpanOperationName.of("amqp_consume: " + message.getJMSDestination())
348+
);
349349
try {
350350
recordIncomingForRateLimit(message.getJMSMessageID());
351351
if (logger.isDebugEnabled()) {
@@ -358,12 +358,16 @@ private void handleJmsMessage(final JmsMessage message) {
358358
ackType);
359359
}
360360
headers = extractHeadersMapFromJmsMessage(message);
361-
correlationId = headers.get(DittoHeaderDefinition.CORRELATION_ID.getKey());
361+
final String correlationId = headers.get(DittoHeaderDefinition.CORRELATION_ID.getKey());
362362
startedSpan = DittoTracing.newPreparedSpan(headers, startedSpan.getOperationName())
363363
.correlationId(correlationId)
364364
.connectionId(connectionId)
365365
.start();
366-
headers = startedSpan.propagateContext(headers);
366+
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
367+
.toBuilder()
368+
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
369+
.build()
370+
);
367371
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
368372
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
369373
.withAuthorizationContext(source.getAuthorizationContext())

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
2626
import org.apache.kafka.common.header.Header;
27+
import org.apache.pekko.kafka.ConsumerMessage;
2728
import org.eclipse.ditto.base.model.common.ByteBufferUtils;
2829
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
2930
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
@@ -41,8 +42,6 @@
4142
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
4243
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
4344

44-
import org.apache.pekko.kafka.ConsumerMessage;
45-
4645
/**
4746
* Transforms incoming messages from Apache Kafka to {@link org.eclipse.ditto.connectivity.api.ExternalMessage}.
4847
*/
@@ -108,11 +107,16 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
108107
final String correlationId = messageHeaders
109108
.getOrDefault(DittoHeaderDefinition.CORRELATION_ID.getKey(), UUID.randomUUID().toString());
110109

111-
final var startedSpan = DittoTracing.newPreparedSpan(messageHeaders, SpanOperationName.of("kafka_consume"))
112-
.correlationId(correlationId)
110+
final var startedSpan = DittoTracing.newPreparedSpan(messageHeaders,
111+
SpanOperationName.of("kafka_consume: " + consumerRecord.topic())
112+
).correlationId(correlationId)
113113
.connectionId(connectionId)
114114
.start();
115-
messageHeaders = startedSpan.propagateContext(messageHeaders);
115+
messageHeaders = startedSpan.propagateContext(DittoHeaders.of(messageHeaders)
116+
.toBuilder()
117+
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
118+
.build()
119+
);
116120

117121
try {
118122
final String key = consumerRecord.key();

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/rabbitmq/RabbitMQConsumerActor.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222

2323
import javax.annotation.Nullable;
2424

25+
import org.apache.pekko.Done;
26+
import org.apache.pekko.NotUsed;
27+
import org.apache.pekko.actor.Props;
28+
import org.apache.pekko.japi.pf.ReceiveBuilder;
29+
import org.apache.pekko.stream.javadsl.Sink;
2530
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
2631
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
2732
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
@@ -51,12 +56,6 @@
5156
import com.rabbitmq.client.Delivery;
5257
import com.rabbitmq.client.Envelope;
5358

54-
import org.apache.pekko.Done;
55-
import org.apache.pekko.NotUsed;
56-
import org.apache.pekko.actor.Props;
57-
import org.apache.pekko.japi.pf.ReceiveBuilder;
58-
import org.apache.pekko.stream.javadsl.Sink;
59-
6059

6160
/**
6261
* Actor which receives message from an RabbitMQ source and forwards them to a {@code MessageMappingProcessorActor}.
@@ -135,7 +134,9 @@ private void handleDelivery(final Delivery delivery) {
135134
final Envelope envelope = delivery.getEnvelope();
136135
final byte[] body = delivery.getBody();
137136

138-
var startedSpan = TracingSpans.emptyStartedSpan(SpanOperationName.of("rabbitmq_consume"));
137+
var startedSpan = TracingSpans.emptyStartedSpan(
138+
SpanOperationName.of("rabbitmq_consume: " + envelope.getExchange())
139+
);
139140
Map<String, String> headers = null;
140141
try {
141142
@Nullable final String correlationId = properties.getCorrelationId();
@@ -150,7 +151,11 @@ private void handleDelivery(final Delivery delivery) {
150151
.connectionId(connectionId)
151152
.correlationId(correlationId)
152153
.start();
153-
headers = startedSpan.propagateContext(headers);
154+
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
155+
.toBuilder()
156+
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
157+
.build()
158+
);
154159

155160
final ExternalMessageBuilder externalMessageBuilder =
156161
ExternalMessageFactory.newExternalMessageBuilder(headers);

0 commit comments

Comments
 (0)