diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index c7d6817975ee..15ff0536dd8f 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -108,7 +108,7 @@ void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception { .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( - processAttributes("10", greeting, testHeaders)), + processAttributes("10", greeting, testHeaders, false)), span -> span.hasName("processing").hasParent(trace.getSpan(1)))); } @@ -152,7 +152,8 @@ void testPassThroughTombstone() .hasKind(SpanKind.CONSUMER) .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(processAttributes(null, null, false)))); + .hasAttributesSatisfyingExactly( + processAttributes(null, null, false, false)))); } @DisplayName("test records(TopicPartition) kafka consume") @@ -203,6 +204,7 @@ void testRecordsWithTopicPartitionKafkaConsume() .hasKind(SpanKind.CONSUMER) .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(processAttributes(null, greeting, false)))); + .hasAttributesSatisfyingExactly( + processAttributes(null, greeting, false, false)))); } } diff --git 
a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientPropagationDisabledTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientPropagationDisabledTest.java index e3ff36a08f98..aad9e01bf019 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientPropagationDisabledTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientPropagationDisabledTest.java @@ -64,7 +64,8 @@ void testReadRemoteContextWhenPropagationIsDisabled() throws InterruptedExceptio span.hasName(SHARED_TOPIC + " process") .hasKind(SpanKind.CONSUMER) .hasLinks(Collections.emptyList()) - .hasAttributesSatisfyingExactly(processAttributes(null, message, false)), + .hasAttributesSatisfyingExactly( + processAttributes(null, message, false, false)), span -> span.hasName("processing").hasParent(trace.getSpan(0)))); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientSuppressReceiveSpansTest.java index 1e8208310a34..8001b5007672 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientSuppressReceiveSpansTest.java +++ 
b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientSuppressReceiveSpansTest.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaClientPropagationBaseTest; import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.concurrent.ExecutionException; @@ -33,8 +34,19 @@ void testKafkaProduceAndConsume() throws InterruptedException { testing.runWithSpan( "parent", () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, 10, greeting); + producerRecord + .headers() + // adding baggage header in w3c baggage format + .add( + "baggage", + "test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8)) + .add( + "baggage", + "test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8)); producer.send( - new ProducerRecord<>(SHARED_TOPIC, 10, greeting), + producerRecord, (meta, ex) -> { if (ex == null) { testing.runWithSpan("producer callback", () -> {}); @@ -70,7 +82,8 @@ void testKafkaProduceAndConsume() throws InterruptedException { span.hasName(SHARED_TOPIC + " process") .hasKind(SpanKind.CONSUMER) .hasParent(trace.getSpan(1)) - .hasAttributesSatisfyingExactly(processAttributes("10", greeting, false)), + .hasAttributesSatisfyingExactly( + processAttributes("10", greeting, false, true)), span -> span.hasName("processing") .hasKind(SpanKind.INTERNAL) @@ -108,7 +121,8 @@ void testPassThroughTombstone() span.hasName(SHARED_TOPIC + " process") .hasKind(SpanKind.CONSUMER) .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(processAttributes(null, null, false)))); + .hasAttributesSatisfyingExactly( + processAttributes(null, null, false, false)))); } @Test @@ -146,6 +160,7 @@ void 
testRecordsWithTopicPartitionKafkaConsume() span.hasName(SHARED_TOPIC + " process") .hasKind(SpanKind.CONSUMER) .hasParent(trace.getSpan(0)) - .hasAttributesSatisfyingExactly(processAttributes(null, greeting, false)))); + .hasAttributesSatisfyingExactly( + processAttributes(null, greeting, false, false)))); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java index decf68c18b7b..566af7589de5 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaClientBaseTest.java @@ -216,7 +216,7 @@ protected static List receiveAttributes(boolean testHeaders) @SuppressWarnings("deprecation") // using deprecated semconv protected static List processAttributes( - String messageKey, String messageValue, boolean testHeaders) { + String messageKey, String messageValue, boolean testHeaders, boolean testMultiBaggage) { List assertions = new ArrayList<>( Arrays.asList( @@ -249,6 +249,11 @@ protected static List processAttributes( AttributeKey.stringArrayKey("messaging.header.test_message_header"), Collections.singletonList("test"))); } + + if (testMultiBaggage) { + assertions.add(equalTo(AttributeKey.stringKey("test-baggage-key-1"), "test-baggage-value-1")); + assertions.add(equalTo(AttributeKey.stringKey("test-baggage-key-2"), "test-baggage-value-2")); + } return assertions; } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java 
b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java index e1de24630529..809b8a89de3b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java @@ -10,6 +10,7 @@ import io.opentelemetry.instrumentation.kafka.internal.KafkaClientBaseTest; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -48,8 +49,19 @@ void testInterceptors() throws InterruptedException { testing.runWithSpan( "parent", () -> { + ProducerRecord producerRecord = + new ProducerRecord<>(SHARED_TOPIC, greeting); + producerRecord + .headers() + // adding baggage header in w3c baggage format + .add( + "baggage", + "test-baggage-key-1=test-baggage-value-1".getBytes(StandardCharsets.UTF_8)) + .add( + "baggage", + "test-baggage-key-2=test-baggage-value-2".getBytes(StandardCharsets.UTF_8)); producer.send( - new ProducerRecord<>(SHARED_TOPIC, greeting), + producerRecord, (meta, ex) -> { if (ex == null) { testing.runWithSpan("producer callback", () -> {}); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java index 7c80e7837a5e..049020a2d074 100644 --- 
a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsSuppressReceiveSpansTest.java @@ -15,6 +15,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanKind; import java.nio.charset.StandardCharsets; import org.assertj.core.api.AbstractLongAssert; @@ -59,7 +60,13 @@ void assertTraces() { equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test"), satisfies( MESSAGING_CLIENT_ID, - stringAssert -> stringAssert.startsWith("consumer"))), + stringAssert -> stringAssert.startsWith("consumer")), + equalTo( + AttributeKey.stringKey("test-baggage-key-1"), + "test-baggage-value-1"), + equalTo( + AttributeKey.stringKey("test-baggage-key-2"), + "test-baggage-value-2")), span -> span.hasName("process child") .hasKind(SpanKind.INTERNAL) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java index 714719165101..3cbda1c280d9 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/KafkaConsumerRecordGetter.java @@ -5,14 +5,15 @@ package io.opentelemetry.instrumentation.kafka.internal; 
-import io.opentelemetry.context.propagation.TextMapGetter; +import io.opentelemetry.context.propagation.internal.ExtendedTextMapGetter; import java.nio.charset.StandardCharsets; +import java.util.Iterator; import java.util.stream.Collectors; import java.util.stream.StreamSupport; import javax.annotation.Nullable; import org.apache.kafka.common.header.Header; -enum KafkaConsumerRecordGetter implements TextMapGetter<KafkaProcessRequest> { +enum KafkaConsumerRecordGetter implements ExtendedTextMapGetter<KafkaProcessRequest> { INSTANCE; @Override @@ -35,4 +36,11 @@ public String get(@Nullable KafkaProcessRequest carrier, String key) { } return new String(value, StandardCharsets.UTF_8); } + + @Override + public Iterator<String> getAll(@Nullable KafkaProcessRequest carrier, String key) { + return StreamSupport.stream(carrier.getRecord().headers().headers(key).spliterator(), false) + .map(header -> new String(header.value(), StandardCharsets.UTF_8)) + .iterator(); + } }