@@ -12,7 +12,6 @@ import io.ktor.util.KtorDsl
12
12
import kotlinx.coroutines.Dispatchers
13
13
import kotlinx.coroutines.Job
14
14
import kotlinx.coroutines.launch
15
- import no.nav.paw.config.kafka.asSequence
16
15
import no.nav.paw.kafkakeygenerator.listener.NoopConsumerRebalanceListener
17
16
import no.nav.paw.kafkakeygenerator.utils.buildApplicationLogger
18
17
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
@@ -21,12 +20,13 @@ import org.apache.kafka.clients.consumer.KafkaConsumer
21
20
import java.time.Duration
22
21
import java.util.concurrent.atomic.AtomicBoolean
23
22
23
+ private val logger = buildApplicationLogger
24
24
val KafkaConsumerReady : EventDefinition <Application > = EventDefinition ()
25
25
26
26
@KtorDsl
27
- class KafkaConsumerPluginConfig <K , V > {
28
- var consumeFunction: ((Sequence < ConsumerRecords <K , V > >) -> Unit )? = null
29
- var successFunction: ((Unit ) -> Unit )? = null
27
+ class KafkaConsumerPluginConfig <K , V , R > {
28
+ var consumeFunction: ((ConsumerRecords <K , V >) -> Unit )? = null
29
+ var successFunction: ((ConsumerRecords < K , V > ) -> Unit )? = null
30
30
var errorFunction: ((throwable: Throwable ) -> Unit )? = null
31
31
var kafkaConsumer: KafkaConsumer <K , V >? = null
32
32
var kafkaTopics: Collection <String >? = null
@@ -40,15 +40,26 @@ class KafkaConsumerPluginConfig<K, V> {
40
40
}
41
41
}
42
42
43
- fun <K , V > kafkaConsumerPlugin (): ApplicationPlugin <KafkaConsumerPluginConfig <K , V >> =
43
+ private fun <K , V > KafkaConsumer <K , V >.defaultSuccessFunction (records : ConsumerRecords <K , V >) {
44
+ if (! records.isEmpty) {
45
+ logger.debug(" Kafka Consumer success. {} records processed" , records.count())
46
+ this .commitSync()
47
+ }
48
+ }
49
+
50
+ private fun defaultErrorFunction (throwable : Throwable ) {
51
+ logger.error(" Kafka Consumer failed" , throwable)
52
+ throw throwable
53
+ }
54
+
55
+ fun <K , V > kafkaConsumerPlugin (): ApplicationPlugin <KafkaConsumerPluginConfig <K , V , Unit >> =
44
56
createApplicationPlugin(KafkaConsumerPluginConfig .PLUGIN_NAME , ::KafkaConsumerPluginConfig ) {
45
57
application.log.info(" Oppretter {}" , KafkaConsumerPluginConfig .PLUGIN_NAME )
46
- val logger = buildApplicationLogger
47
- val consumeFunction = requireNotNull(pluginConfig.consumeFunction) { " ConsumeFunction er null" }
48
- val successFunction = pluginConfig.successFunction ? : { logger.debug(" Kafka Consumer poll fullførte" ) }
49
- val errorFunction = pluginConfig.errorFunction ? : { logger.error(" Kafka Consumer poll feilet" ) }
50
- val kafkaConsumer = requireNotNull(pluginConfig.kafkaConsumer) { " KafkaConsumer er null" }
51
58
val kafkaTopics = requireNotNull(pluginConfig.kafkaTopics) { " KafkaTopics er null" }
59
+ val kafkaConsumer = requireNotNull(pluginConfig.kafkaConsumer) { " KafkaConsumer er null" }
60
+ val consumeFunction = requireNotNull(pluginConfig.consumeFunction) { " ConsumeFunction er null" }
61
+ val successFunction = pluginConfig.successFunction ? : kafkaConsumer::defaultSuccessFunction
62
+ val errorFunction = pluginConfig.errorFunction ? : ::defaultErrorFunction
52
63
val pollTimeout = pluginConfig.pollTimeout ? : Duration .ofMillis(100 )
53
64
val closeTimeout = pluginConfig.closeTimeout ? : Duration .ofSeconds(1 )
54
65
val rebalanceListener = pluginConfig.rebalanceListener ? : NoopConsumerRebalanceListener ()
@@ -72,15 +83,15 @@ fun <K, V> kafkaConsumerPlugin(): ApplicationPlugin<KafkaConsumerPluginConfig<K,
72
83
on(MonitoringEvent (KafkaConsumerReady )) { application ->
73
84
consumeJob = application.launch(Dispatchers .IO ) {
74
85
logger.info(" Kafka Consumer starter" )
75
- kafkaConsumer
76
- .asSequence(
77
- stop = shutdownFlag,
78
- pollTimeout = pollTimeout,
79
- closeTimeout = closeTimeout
80
- )
81
- .runCatching(consumeFunction )
82
- .mapCatching { kafkaConsumer.commitSync() }
83
- .fold(onSuccess = successFunction, onFailure = errorFunction)
86
+ while ( ! shutdownFlag.get()) {
87
+ try {
88
+ val records = kafkaConsumer.poll(pollTimeout)
89
+ consumeFunction(records)
90
+ successFunction(records)
91
+ } catch (throwable : Throwable ) {
92
+ errorFunction(throwable )
93
+ }
94
+ }
84
95
logger.info(" Kafka Consumer avsluttet" )
85
96
}
86
97
}
0 commit comments