-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathIdenthendelseConsumerService.kt
61 lines (56 loc) · 2.67 KB
/
IdenthendelseConsumerService.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package no.nav.syfo.identhendelse.kafka
import kotlinx.coroutines.delay
import no.nav.syfo.identhendelse.IdenthendelseService
import no.nav.syfo.personstatus.infrastructure.COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import kotlin.time.Duration.Companion.seconds
/**
 * Kafka consumer service for identity-change events ("identhendelser").
 *
 * Each record with a non-null value is converted with [toKafkaIdenthendelseDTO] and handed to
 * [IdenthendelseService.handleIdenthendelse]. Records with a null value are logged and counted
 * via [COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE].
 */
class IdenthendelseConsumerService(
    private val identhendelseService: IdenthendelseService,
) : KafkaConsumerService<GenericRecord> {

    /** Timeout, in milliseconds, passed to each `poll` call. */
    override val pollDurationInMillis: Long = 1000

    /**
     * Polls [kafkaConsumer] once and processes the returned batch.
     *
     * Offsets are committed synchronously only when the batch is non-empty and every record in it
     * has been processed without throwing.
     *
     * Any [Exception] raised while polling, processing or committing is logged (including its
     * stack trace); the consumer is then unsubscribed and the coroutine suspends for
     * [DELAY_ON_ERROR_SECONDS] seconds before returning.
     *
     * @param kafkaConsumer the consumer to poll; presumably re-subscribed by the caller after an
     *   error-triggered unsubscribe — verify against the caller.
     */
    override suspend fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, GenericRecord>) {
        try {
            val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
            if (records.count() > 0) {
                records.forEach { record ->
                    val value = record.value()
                    if (value != null) {
                        identhendelseService.handleIdenthendelse(value.toKafkaIdenthendelseDTO())
                    } else {
                        log.warn("Identhendelse: Value of ConsumerRecord from topic $PDL_AKTOR_TOPIC is null, probably due to a tombstone. Contact the owner of the topic if an error is suspected")
                        COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE.increment()
                    }
                }
                // Commit once per batch, after all records have been handled.
                kafkaConsumer.commitSync()
            }
        } catch (ex: Exception) {
            // NOTE(review): `Exception` also covers CancellationException; if one is thrown inside
            // the try block it ends up here and is logged as a consumer error — confirm that is acceptable.
            // Pass the exception to the logger so the cause and stack trace are not lost.
            log.warn(
                "Error running kafka consumer for pdl-aktor, unsubscribing and waiting $DELAY_ON_ERROR_SECONDS seconds for retry",
                ex,
            )
            kafkaConsumer.unsubscribe()
            delay(DELAY_ON_ERROR_SECONDS.seconds)
        }
    }

    companion object {
        /** Number of seconds to wait after a failed poll/processing round. */
        private const val DELAY_ON_ERROR_SECONDS = 60L
        private val log: Logger = LoggerFactory.getLogger("no.nav.syfo.identhendelse")
    }
}
/**
 * Converts this Avro record into a [KafkaIdenthendelseDTO].
 *
 * Reads the `identifikatorer` field as an Avro array of records and maps each element's
 * `idnummer`, `gjeldende` and `type` fields (via their string representations) to an [Identifikator].
 *
 * @return a DTO holding one [Identifikator] per element of `identifikatorer`.
 * @throws IllegalStateException if an element's `type` is not one of
 *   `FOLKEREGISTERIDENT`, `AKTORID` or `NPID`.
 */
fun GenericRecord.toKafkaIdenthendelseDTO(): KafkaIdenthendelseDTO {
    val avroIdentifikatorer = get("identifikatorer") as GenericData.Array<GenericRecord>
    val identifikatorer = avroIdentifikatorer.map { avroIdent ->
        // Fields are read in the same order as the DTO's constructor arguments.
        val idnummer = avroIdent.get("idnummer").toString()
        val gjeldende = avroIdent.get("gjeldende").toString().toBoolean()
        val identType = when (avroIdent.get("type").toString()) {
            "FOLKEREGISTERIDENT" -> IdentType.FOLKEREGISTERIDENT
            "AKTORID" -> IdentType.AKTORID
            "NPID" -> IdentType.NPID
            else -> error("Har mottatt ident med ukjent type")
        }
        Identifikator(
            idnummer = idnummer,
            gjeldende = gjeldende,
            type = identType,
        )
    }
    return KafkaIdenthendelseDTO(identifikatorer)
}