Transform the record value before sending it to DLT #1914
Replies: 3 comments 5 replies
-
There are (at least) a couple of solutions; probably the simplest is to add a You could also implement a custom /**
* Subclasses can override this method to customize the producer record to send to the
* DLQ. The default implementation simply copies the key and value from the consumer
* record and adds the headers. The timestamp is not set (the original timestamp is in
* one of the headers). IMPORTANT: if the partition in the {@link TopicPartition} is
* less than 0, it must be set to null in the {@link ProducerRecord}.
* @param record the failed record
* @param topicPartition the {@link TopicPartition} returned by the destination
* resolver.
* @param headers the headers - original record headers plus DLT headers.
* @param key the key to use instead of the consumer record key.
* @param value the value to use instead of the consumer record value.
* @return the producer record to send.
* @see KafkaHeaders
*/
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) { |
Beta Was this translation helpful? Give feedback.
-
Sorry - typo public interface ProducerInterceptor<K, V> extends Configurable {
/**
* This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
* {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
* get serialized and partition is assigned (if partition is not specified in ProducerRecord).
* <p>
* This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
* key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
* not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
* same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
* as expected.
* <p>
* Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
* Most often, it should be the same topic/partition from 'record'.
* <p>
* Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
* <p>
* Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
* specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
* in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
* previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
* the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
* of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
* modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
* is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
* or otherwise the client.
*
* @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
* @return producer record to send to topic/partition
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
... |
Beta Was this translation helpful? Give feedback.
-
The |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I am using the
@RetryableTopic
to have retry and dlt topics. Everything works well except that I need to transform the initial record value to a new type before sending it to the dlt topic, and I am having some issue finding a solution.I was thinking about creating my own
SeekToCurrentErrorHandler
by extending it and transforming eachrecords
before continuing, or using aRecordInterceptor
.If someone has a better solution I'm all ears 😅.
Beta Was this translation helpful? Give feedback.
All reactions