Skip to content

Commit 4adc99a

Browse files
committed
fix spans for consuming messages to not be the "parent" span for following spans
1 parent 4d9372b commit 4adc99a

File tree

3 files changed

+29
-20
lines changed

3 files changed

+29
-20
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@
3737
import javax.jms.MessageListener;
3838
import javax.jms.TextMessage;
3939

40+
import org.apache.pekko.Done;
41+
import org.apache.pekko.actor.ActorRef;
42+
import org.apache.pekko.actor.Props;
43+
import org.apache.pekko.actor.Status;
44+
import org.apache.pekko.japi.pf.ReceiveBuilder;
45+
import org.apache.pekko.pattern.Patterns;
46+
import org.apache.pekko.stream.javadsl.Sink;
4047
import org.apache.qpid.jms.JmsAcknowledgeCallback;
4148
import org.apache.qpid.jms.JmsMessageConsumer;
4249
import org.apache.qpid.jms.message.JmsMessage;
@@ -63,20 +70,12 @@
6370
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
6471
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
6572
import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.InfoProviderFactory;
66-
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
6773
import org.eclipse.ditto.internal.utils.config.InstanceIdentifierSupplier;
74+
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
6875
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
6976
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
7077
import org.eclipse.ditto.internal.utils.tracing.span.TracingSpans;
7178

72-
import org.apache.pekko.Done;
73-
import org.apache.pekko.actor.ActorRef;
74-
import org.apache.pekko.actor.Props;
75-
import org.apache.pekko.actor.Status;
76-
import org.apache.pekko.japi.pf.ReceiveBuilder;
77-
import org.apache.pekko.pattern.Patterns;
78-
import org.apache.pekko.stream.javadsl.Sink;
79-
8079
/**
8180
* Actor which receives message from an AMQP source and forwards them to a {@code MessageMappingProcessorActor}.
8281
*/
@@ -363,7 +362,11 @@ private void handleJmsMessage(final JmsMessage message) {
363362
.correlationId(correlationId)
364363
.connectionId(connectionId)
365364
.start();
366-
headers = startedSpan.propagateContext(headers);
365+
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
366+
.toBuilder()
367+
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
368+
.build()
369+
);
367370
final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(headers);
368371
final ExternalMessage externalMessage = extractPayloadFromMessage(message, builder)
369372
.withAuthorizationContext(source.getAuthorizationContext())

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaMessageTransformer.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
2626
import org.apache.kafka.common.header.Header;
27+
import org.apache.pekko.kafka.ConsumerMessage;
2728
import org.eclipse.ditto.base.model.common.ByteBufferUtils;
2829
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
2930
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
@@ -41,8 +42,6 @@
4142
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
4243
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
4344

44-
import org.apache.pekko.kafka.ConsumerMessage;
45-
4645
/**
4746
* Transforms incoming messages from Apache Kafka to {@link org.eclipse.ditto.connectivity.api.ExternalMessage}.
4847
*/
@@ -112,7 +111,11 @@ public TransformationResult transform(final ConsumerRecord<String, ByteBuffer> c
112111
.correlationId(correlationId)
113112
.connectionId(connectionId)
114113
.start();
115-
messageHeaders = startedSpan.propagateContext(messageHeaders);
114+
messageHeaders = startedSpan.propagateContext(DittoHeaders.of(messageHeaders)
115+
.toBuilder()
116+
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
117+
.build()
118+
);
116119

117120
try {
118121
final String key = consumerRecord.key();

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/rabbitmq/RabbitMQConsumerActor.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222

2323
import javax.annotation.Nullable;
2424

25+
import org.apache.pekko.Done;
26+
import org.apache.pekko.NotUsed;
27+
import org.apache.pekko.actor.Props;
28+
import org.apache.pekko.japi.pf.ReceiveBuilder;
29+
import org.apache.pekko.stream.javadsl.Sink;
2530
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
2631
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
2732
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
@@ -51,12 +56,6 @@
5156
import com.rabbitmq.client.Delivery;
5257
import com.rabbitmq.client.Envelope;
5358

54-
import org.apache.pekko.Done;
55-
import org.apache.pekko.NotUsed;
56-
import org.apache.pekko.actor.Props;
57-
import org.apache.pekko.japi.pf.ReceiveBuilder;
58-
import org.apache.pekko.stream.javadsl.Sink;
59-
6059

6160
/**
6261
* Actor which receives message from an RabbitMQ source and forwards them to a {@code MessageMappingProcessorActor}.
@@ -150,7 +149,11 @@ private void handleDelivery(final Delivery delivery) {
150149
.connectionId(connectionId)
151150
.correlationId(correlationId)
152151
.start();
153-
headers = startedSpan.propagateContext(headers);
152+
headers = startedSpan.propagateContext(DittoHeaders.of(headers)
153+
.toBuilder()
154+
.removeHeader(DittoHeaderDefinition.W3C_TRACEPARENT.getKey())
155+
.build()
156+
);
154157

155158
final ExternalMessageBuilder externalMessageBuilder =
156159
ExternalMessageFactory.newExternalMessageBuilder(headers);

0 commit comments

Comments
 (0)