Infinite retry by using DeadLetterPublishingRecoverer with TracingProducerFactory #89

@fahtom94

Hi, I found an exception that causes infinite retries when DeadLetterPublishingRecoverer is used together with TracingProducerFactory. If a TracingProducerFactory is injected into the DeadLetterPublishingRecoverer, handling an error in the consumer throws java.lang.UnsupportedOperationException, and the message is then published to the DLQ over and over. This happens because TracingProducerFactory does not implement org.springframework.kafka.core.ProducerFactory#getConfigurationProperties, so the call falls through to the interface's default implementation, which throws.
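
The mechanism described above can be illustrated without Kafka on the classpath. The following is only a sketch under stated assumptions, not Spring Kafka code: `ConfigSource` is a hypothetical stand-in for `ProducerFactory` (only the two relevant methods are modelled), `NonOverridingFactory` stands in for a factory that leaves the default method untouched, and `readSendTimeoutMs` is a hypothetical stand-in for the timeout lookup seen in the stack trace. The `delivery.timeout.ms` key and the fallback value are assumptions made for illustration. It shows one possible workaround: a wrapper that overrides the configuration method and delegates everything else.

```kotlin
// Hypothetical stand-in for org.springframework.kafka.core.ProducerFactory.
// The default method throws, mirroring the behaviour reported in this issue.
interface ConfigSource {
    fun create(): String

    fun configurationProperties(): Map<String, Any> =
        throw UnsupportedOperationException("This implementation doesn't support this method")
}

// Models a factory that does not override the default method.
class NonOverridingFactory : ConfigSource {
    override fun create(): String = "producer"
}

// Workaround sketch: delegate creation, but supply the configuration explicitly.
class ConfigAwareFactory(
    private val delegate: ConfigSource,
    private val props: Map<String, Any>
) : ConfigSource {
    override fun create(): String = delegate.create()
    override fun configurationProperties(): Map<String, Any> = props
}

// Hypothetical stand-in for the recoverer's send-timeout lookup.
fun readSendTimeoutMs(source: ConfigSource): Long =
    source.configurationProperties()["delivery.timeout.ms"]?.toString()?.toLong() ?: 120_000L

fun main() {
    val broken = NonOverridingFactory()
    // The lookup fails because the default method throws.
    println(runCatching { readSendTimeoutMs(broken) }.exceptionOrNull())

    // The wrapper answers the lookup and still delegates producer creation.
    val fixed = ConfigAwareFactory(broken, mapOf("delivery.timeout.ms" to 30_000))
    println(readSendTimeoutMs(fixed))
}
```

In a real application the same idea would mean wrapping the tracing factory in a class that overrides `getConfigurationProperties`, or fixing the override in TracingProducerFactory itself.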

Here is an example of the kafkaListenerContainerFactory configuration:

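import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.springframework.context.annotation.Bean
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.SeekToCurrentErrorHandler

// consumerFactory(), concurrency and exponentialBackoff() are defined elsewhere in the configuration class.
@Bean
fun kafkaListenerContainerFactory(template: KafkaTemplate<String, String>): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = consumerFactory()
    factory.setConcurrency(concurrency.toInt())
    factory.setErrorHandler(
        SeekToCurrentErrorHandler(
            // The recoverer publishes failed records through the injected template.
            DeadLetterPublishingRecoverer(
                template,
                { record: ConsumerRecord<*, *>?, ex: Exception? ->
                    // A negative partition means no partition is set when publishing.
                    TopicPartition("test_topic", -1)
                }
            ),
            exponentialBackoff()
        )
    )
    return factory
}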

Exception stack trace:

{"log":"{\"ts\":\"2021-09-25T09:21:37.836Z\",\"level\":\"ERROR\",\"logger_name\":\"org.springframework.kafka.listener.SeekToCurrentErrorHandler\",\"message\":\"Failed to determine if this record (test_record-2@0) should be recovererd, including in seeks\",\"root_stack_trace_element\":{\"class_name\":\"org.springframework.kafka.core.ProducerFactory\",\"method_name\":\"getConfigurationProperties\"},\"stack_trace\":\"java.lang.UnsupportedOperationException: This implementation doesn't support this method\\n\
at org.springframework.kafka.core.ProducerFactory.getConfigurationProperties(ProducerFactory.java:119)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.determineSendTimeout(DeadLetterPublishingRecoverer.java:507)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:488)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:480)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:382)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:351)\\n\
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:323)\\n\
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:227)\\n\
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:182)\\n\
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:105)\\n\
at java.base/java.util.ArrayList.forEach(Unknown Source)\\n\
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:102)\\n\
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:205)\\n\
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2360)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2229)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2143)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2025)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1707)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1274)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1266)\\n\
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161)\\n\
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)\\n\
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)\\n\
