-
Notifications
You must be signed in to change notification settings - Fork 64
Description
Hello,
I'm using opentracing-kafka-spring version 0.1.15. I'm trying to set custom span names for producers and consumers. Making it work for producers was very easy and worked immediately, and I assumed it would be the same for consumers, but alas it was not.
After a little bit of investigating, I found that buildAndFinishChildSpan is never called (https://github.com/opentracing-contrib/java-kafka-client/blob/0.1.15/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaUtils.java#L115-L139). Spans are still being created though, I can clearly see them in the logs, but they appear as KafkaListener_xxx, which is not what I want.
Also, it's not impossible that there is a concept that I don't understand properly :).
Here is how I configure the library:
package com.mycompany;
import com.mycompany.common.eventhandler.eventmessage.EventMessage;
import io.opentracing.Tracer;
import io.opentracing.contrib.kafka.spring.TracingConsumerFactory;
import io.opentracing.contrib.kafka.spring.TracingKafkaAspect;
import io.opentracing.contrib.kafka.spring.TracingProducerFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
@Configuration
public class KafkaTracingConfig {
private final KafkaProperties kafkaProperties;
private final Tracer tracer;
// Sets the span name to the event name
private final BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
(operationName, consumerRecord) -> "CONSUME_" + ((EventMessage<?>)consumerRecord.value()).getName();
private final BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
(operationName, producerRecord) -> "PRODUCE_" + ((EventMessage<?>)producerRecord.value()).getName();
public KafkaTracingConfig(KafkaProperties kafkaProperties, Tracer tracer) {
this.kafkaProperties = kafkaProperties;
this.tracer = tracer;
}
@Bean
public ConsumerFactory<String, EventMessage<?>> consumerFactory() {
return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer, consumerSpanNameProvider);
}
@Bean
public ProducerFactory<String, EventMessage<?>> producerFactory() {
return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer, producerSpanNameProvider);
}
@Bean
public KafkaTemplate<String, EventMessage<?>> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public TracingKafkaAspect tracingKafkaAspect() {
return new TracingKafkaAspect(tracer);
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildConsumerProperties());
return props;
}
private Map<String, Object> producerProps() {
Map<String, Object> props = new HashMap<>(kafkaProperties.buildProducerProperties());
return props;
}
}
And here is a simplified version of one of our consumers:
package com.mycompany.eventhandler.consumer;
import com.mycompany.common.eventhandler.eventmessage.EventMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class StationReservationConsumer {
@KafkaListener(topics = {"myTopic"}, id = "anId")
public void consumeMyTopic(@Payload EventMessage<?> eventMessage, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) int offset, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// blablabla
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
log.info("Kafka reading unknown: " + object);
}
}
Thanks and have a nice day !