5
5
6
6
package io .opentelemetry .testing ;
7
7
8
+ import java .lang .reflect .Method ;
8
9
import org .apache .kafka .clients .admin .NewTopic ;
9
10
import org .springframework .beans .factory .ObjectProvider ;
10
11
import org .springframework .boot .SpringBootConfiguration ;
15
16
import org .springframework .kafka .config .TopicBuilder ;
16
17
import org .springframework .kafka .core .ConsumerFactory ;
17
18
import org .springframework .kafka .listener .ConcurrentMessageListenerContainer ;
19
+ import org .springframework .util .backoff .BackOff ;
20
+ import org .springframework .util .backoff .FixedBackOff ;
18
21
19
22
@ SpringBootConfiguration
20
23
@ EnableAutoConfiguration
@@ -50,7 +53,13 @@ public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
50
53
ConcurrentKafkaListenerContainerFactory <String , String > factory =
51
54
new ConcurrentKafkaListenerContainerFactory <>();
52
55
// do not retry failed records
53
- factory .setBatchErrorHandler (new DoNothingBatchErrorHandler ());
56
+ try {
57
+ Class .forName ("org.springframework.kafka.listener.BatchErrorHandler" );
58
+ ErrorHandlerSetter .setBatchErrorHandler (factory );
59
+ } catch (ClassNotFoundException ignored ) {
60
+ // org.springframework.kafka.listener.BatchErrorHandler is missing in latest
61
+ setCommonErrorHandler (factory );
62
+ }
54
63
factory .setConsumerFactory (consumerFactory );
55
64
factory .setBatchListener (true );
56
65
factory .setAutoStartup (true );
@@ -68,11 +77,34 @@ public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
68
77
ConcurrentKafkaListenerContainerFactory <String , String > factory =
69
78
new ConcurrentKafkaListenerContainerFactory <>();
70
79
// do not retry failed records
71
- factory .setErrorHandler (new DoNothingErrorHandler ());
80
+ try {
81
+ Class .forName ("org.springframework.kafka.listener.ErrorHandler" );
82
+ ErrorHandlerSetter .setErrorHandler (factory );
83
+ } catch (ClassNotFoundException ignored ) {
84
+ // org.springframework.kafka.listener.ErrorHandler is missing in latest
85
+ setCommonErrorHandler (factory );
86
+ }
72
87
factory .setConsumerFactory (consumerFactory );
73
88
factory .setBatchListener (false );
74
89
factory .setAutoStartup (true );
75
90
customizerProvider .ifAvailable (factory ::setContainerCustomizer );
76
91
return factory ;
77
92
}
93
+
94
+ private static void setCommonErrorHandler (
95
+ ConcurrentKafkaListenerContainerFactory <String , String > factory ) {
96
+ try {
97
+ Class <?> handlerClass =
98
+ Class .forName ("org.springframework.kafka.listener.CommonErrorHandler" );
99
+ Class <?> defaultHandlerClass =
100
+ Class .forName ("org.springframework.kafka.listener.DefaultErrorHandler" );
101
+ BackOff backOff = new FixedBackOff (0 , 0 );
102
+ Object handler =
103
+ defaultHandlerClass .getDeclaredConstructor (BackOff .class ).newInstance (backOff );
104
+ Method method = factory .getClass ().getMethod ("setCommonErrorHandler" , handlerClass );
105
+ method .invoke (factory , handler );
106
+ } catch (Exception exception ) {
107
+ // ignored
108
+ }
109
+ }
78
110
}
0 commit comments