Skip to content

Commit 46c1b1e

Browse files
committed
IS: Refactor oppfolgingsoppgaveservice
1 parent 9fa04ad commit 46c1b1e

File tree

10 files changed

+120
-99
lines changed

10 files changed

+120
-99
lines changed

src/main/kotlin/no/nav/syfo/App.kt

+7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import no.nav.syfo.personstatus.infrastructure.clients.oppfolgingsoppgave.Oppfol
2121
import no.nav.syfo.personstatus.infrastructure.clients.pdl.PdlClient
2222
import no.nav.syfo.personstatus.infrastructure.clients.veiledertilgang.VeilederTilgangskontrollClient
2323
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
24+
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
2425
import no.nav.syfo.personstatus.infrastructure.clients.veileder.VeilederClient
2526
import no.nav.syfo.personstatus.infrastructure.cronjob.launchCronjobModule
2627
import no.nav.syfo.personstatus.infrastructure.database.database
@@ -105,6 +106,7 @@ fun main() {
105106
lateinit var personBehandlendeEnhetService: PersonBehandlendeEnhetService
106107
lateinit var personoversiktStatusService: PersonoversiktStatusService
107108
lateinit var oppfolgingstilfelleService: OppfolgingstilfelleService
109+
lateinit var oppfolgingsoppgaveService: OppfolgingsoppgaveService
108110

109111
val applicationEnvironment = applicationEnvironment {
110112
log = logger
@@ -141,6 +143,10 @@ fun main() {
141143
personoversiktStatusRepository = personoversiktStatusRepository,
142144
behandlendeEnhetClient = behandlendeEnhetClient,
143145
)
146+
oppfolgingsoppgaveService = OppfolgingsoppgaveService(
147+
personBehandlendeEnhetService = personBehandlendeEnhetService,
148+
personOversiktStatusRepository = personoversiktStatusRepository,
149+
)
144150
apiModule(
145151
applicationState = applicationState,
146152
database = database,
@@ -168,6 +174,7 @@ fun main() {
168174
personoversiktStatusService = personoversiktStatusService,
169175
personBehandlendeEnhetService = personBehandlendeEnhetService,
170176
oppfolgingstilfelleService = oppfolgingstilfelleService,
177+
oppfolgingsoppgaveService = oppfolgingsoppgaveService,
171178
)
172179
launchCronjobModule(
173180
applicationState = applicationState,

src/main/kotlin/no/nav/syfo/identhendelse/kafka/IdenthendelseConsumerService.kt

+15-18
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package no.nav.syfo.identhendelse.kafka
22

33
import kotlinx.coroutines.delay
4-
import kotlinx.coroutines.runBlocking
54
import no.nav.syfo.identhendelse.IdenthendelseService
6-
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
75
import no.nav.syfo.personstatus.infrastructure.COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE
6+
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
87
import org.apache.avro.generic.GenericData
98
import org.apache.avro.generic.GenericRecord
109
import org.apache.kafka.clients.consumer.KafkaConsumer
@@ -19,25 +18,23 @@ class IdenthendelseConsumerService(
1918
override val pollDurationInMillis: Long = 1000
2019

2120
override suspend fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, GenericRecord>) {
22-
runBlocking {
23-
try {
24-
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
25-
if (records.count() > 0) {
26-
records.forEach { record ->
27-
if (record.value() != null) {
28-
identhendelseService.handleIdenthendelse(record.value().toKafkaIdenthendelseDTO())
29-
} else {
30-
log.warn("Identhendelse: Value of ConsumerRecord from topic $PDL_AKTOR_TOPIC is null, probably due to a tombstone. Contact the owner of the topic if an error is suspected")
31-
COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE.increment()
32-
}
21+
try {
22+
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
23+
if (records.count() > 0) {
24+
records.forEach { record ->
25+
if (record.value() != null) {
26+
identhendelseService.handleIdenthendelse(record.value().toKafkaIdenthendelseDTO())
27+
} else {
28+
log.warn("Identhendelse: Value of ConsumerRecord from topic $PDL_AKTOR_TOPIC is null, probably due to a tombstone. Contact the owner of the topic if an error is suspected")
29+
COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE.increment()
3330
}
34-
kafkaConsumer.commitSync()
3531
}
36-
} catch (ex: Exception) {
37-
log.warn("Error running kafka consumer for pdl-aktor, unsubscribing and waiting $DELAY_ON_ERROR_SECONDS seconds for retry")
38-
kafkaConsumer.unsubscribe()
39-
delay(DELAY_ON_ERROR_SECONDS.seconds)
32+
kafkaConsumer.commitSync()
4033
}
34+
} catch (ex: Exception) {
35+
log.warn("Error running kafka consumer for pdl-aktor, unsubscribing and waiting $DELAY_ON_ERROR_SECONDS seconds for retry")
36+
kafkaConsumer.unsubscribe()
37+
delay(DELAY_ON_ERROR_SECONDS.seconds)
4138
}
4239
}
4340

src/main/kotlin/no/nav/syfo/personstatus/application/IPersonOversiktStatusRepository.kt

+2
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,6 @@ interface IPersonOversiktStatusRepository {
4848
personstatus: PersonOversiktStatus,
4949
oppfolgingstilfelle: Oppfolgingstilfelle,
5050
)
51+
52+
fun updateOppfolgingsoppgave(personIdent: PersonIdent, isActive: Boolean): Result<Int>
5153
}

src/main/kotlin/no/nav/syfo/personstatus/application/oppfolgingsoppgave/OppfolgingsoppgaveService.kt

+23-49
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,44 @@
11
package no.nav.syfo.personstatus.application.oppfolgingsoppgave
22

3-
import kotlinx.coroutines.Dispatchers
4-
import kotlinx.coroutines.launch
5-
import kotlinx.coroutines.runBlocking
6-
import no.nav.syfo.personstatus.infrastructure.database.DatabaseInterface
3+
import no.nav.syfo.personstatus.application.IPersonOversiktStatusRepository
74
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
85
import no.nav.syfo.personstatus.domain.PersonIdent
9-
import no.nav.syfo.personstatus.infrastructure.database.queries.createPersonOversiktStatus
106
import no.nav.syfo.personstatus.domain.PersonOversiktStatus
117
import no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave.COUNT_KAFKA_CONSUMER_TRENGER_OPPFOLGING_READ
12-
import no.nav.syfo.personstatus.infrastructure.database.queries.getPersonOversiktStatusList
13-
import no.nav.syfo.personstatus.infrastructure.database.queries.updateOppfolgingsoppgave
148
import org.slf4j.Logger
159
import org.slf4j.LoggerFactory
16-
import kotlin.collections.filter
17-
import kotlin.collections.firstOrNull
18-
import kotlin.collections.forEach
1910
import kotlin.jvm.java
20-
import kotlin.use
2111

2212
class OppfolgingsoppgaveService(
23-
private val database: DatabaseInterface,
2413
private val personBehandlendeEnhetService: PersonBehandlendeEnhetService,
14+
private val personOversiktStatusRepository: IPersonOversiktStatusRepository,
2515
) {
26-
fun processOppfolgingsoppgave(records: List<OppfolgingsoppgaveRecord>) {
27-
createOrUpdatePersonOversiktStatus(records)
28-
records.filter { it.isActive }.forEach {
29-
updateBehandlendeEnhet(PersonIdent(it.personIdent))
30-
}
31-
}
32-
33-
private fun createOrUpdatePersonOversiktStatus(records: List<OppfolgingsoppgaveRecord>) {
34-
database.connection.use { connection ->
35-
records.forEach { oppfolgingsOppgave ->
36-
val existingPersonOversiktStatus = connection.getPersonOversiktStatusList(
37-
fnr = oppfolgingsOppgave.personIdent,
38-
).firstOrNull()
39-
40-
if (existingPersonOversiktStatus == null) {
41-
val personoversiktStatus = PersonOversiktStatus(
42-
fnr = oppfolgingsOppgave.personIdent,
43-
isAktivOppfolgingsoppgave = oppfolgingsOppgave.isActive,
44-
)
45-
connection.createPersonOversiktStatus(
46-
commit = false,
47-
personOversiktStatus = personoversiktStatus,
48-
)
49-
} else {
50-
connection.updateOppfolgingsoppgave(oppfolgingsoppgave = oppfolgingsOppgave)
51-
}
5216

53-
log.info("Received oppfolgingsOppgave with uuid=${oppfolgingsOppgave.uuid}")
54-
COUNT_KAFKA_CONSUMER_TRENGER_OPPFOLGING_READ.increment()
55-
}
56-
connection.commit()
17+
fun createOrUpdatePersonOversiktStatus(personIdent: PersonIdent, isActiveOppfolgingsoppgave: Boolean) {
18+
val existingPersonOversiktStatus =
19+
personOversiktStatusRepository.getPersonOversiktStatus(personIdent)
20+
if (existingPersonOversiktStatus == null) {
21+
val personoversiktStatus = PersonOversiktStatus(
22+
fnr = personIdent.value,
23+
isAktivOppfolgingsoppgave = isActiveOppfolgingsoppgave,
24+
)
25+
personOversiktStatusRepository.createPersonOversiktStatus(
26+
personOversiktStatus = personoversiktStatus,
27+
)
28+
} else {
29+
personOversiktStatusRepository.updateOppfolgingsoppgave(
30+
personIdent = personIdent,
31+
isActive = isActiveOppfolgingsoppgave,
32+
)
5733
}
34+
35+
COUNT_KAFKA_CONSUMER_TRENGER_OPPFOLGING_READ.increment()
5836
}
5937

60-
private fun updateBehandlendeEnhet(personIdent: PersonIdent) {
38+
suspend fun updateBehandlendeEnhet(personIdent: PersonIdent) {
6139
try {
62-
runBlocking {
63-
launch(Dispatchers.IO) {
64-
personBehandlendeEnhetService.updateBehandlendeEnhet(personIdent = personIdent)
65-
log.info("Updated Behandlende Enhet of person after received active oppfolgingsoppgave")
66-
}
67-
}
40+
personBehandlendeEnhetService.updateBehandlendeEnhet(personIdent = personIdent)
41+
log.info("Updated Behandlende Enhet of person after received active oppfolgingsoppgave")
6842
} catch (ex: Exception) {
6943
log.error(
7044
"Exception caught while attempting to update Behandlende Enhet of person after received active oppfolgingsoppgave",

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/database/queries/getFromPersonOversiktStatus.kt

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const val queryGetPersonOversiktStatusList =
1717
WHERE fnr = ?
1818
"""
1919

20+
// Deprecated: Use getPersonOversiktStatus in PersonOversiktStatusRepository instead
2021
fun Connection.getPersonOversiktStatusList(
2122
fnr: String,
2223
): List<PPersonOversiktStatus> =

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/database/repository/PersonOversiktStatusRepository.kt

+29
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,28 @@ class PersonOversiktStatusRepository(private val database: DatabaseInterface) :
411411
}
412412
}
413413

414+
override fun updateOppfolgingsoppgave(personIdent: PersonIdent, isActiveOppfolgingsoppgave: Boolean): Result<Int> {
415+
return try {
416+
database.connection.use { connection ->
417+
val rowsAffected = connection.prepareStatement(UPDATE_PERSON_OPPFOLGINGSOPPGAVE).use {
418+
it.setBoolean(1, isActiveOppfolgingsoppgave)
419+
it.setTimestamp(2, Timestamp.from(Instant.now()))
420+
it.setString(3, personIdent.value)
421+
it.executeUpdate()
422+
}
423+
if (rowsAffected == 1) {
424+
connection.commit()
425+
Result.success(rowsAffected)
426+
} else {
427+
connection.rollback()
428+
Result.failure(RuntimeException("updateOppfolgingsoppgave failed, expected single row to be updated."))
429+
}
430+
}
431+
} catch (e: SQLException) {
432+
Result.failure(e)
433+
}
434+
}
435+
414436
companion object {
415437
private const val GET_PERSON_OVERSIKT_STATUS =
416438
"""
@@ -603,6 +625,13 @@ class PersonOversiktStatusRepository(private val database: DatabaseInterface) :
603625
RETURNING id
604626
"""
605627

628+
private const val UPDATE_PERSON_OPPFOLGINGSOPPGAVE =
629+
"""
630+
UPDATE PERSON_OVERSIKT_STATUS
631+
SET trenger_oppfolging = ?, sist_endret = ?
632+
WHERE fnr = ?
633+
"""
634+
606635
private const val SEARCH_PERSON_BASE_QUERY =
607636
"SELECT * FROM PERSON_OVERSIKT_STATUS p WHERE (p.oppfolgingstilfelle_end + INTERVAL '16 DAY' >= now() OR $AKTIV_OPPGAVE_WHERE_CLAUSE)"
608637

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/kafka/KafkaModule.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import no.nav.syfo.pdlpersonhendelse.kafka.launchKafkaTaskPersonhendelse
1414
import no.nav.syfo.personoppgavehendelse.kafka.launchKafkaTaskPersonoppgavehendelse
1515
import no.nav.syfo.personstatus.application.PersonoversiktStatusService
1616
import no.nav.syfo.personstatus.application.OppfolgingstilfelleService
17+
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
1718
import no.nav.syfo.personstatus.infrastructure.kafka.behandlendeenhet.BehandlendeEnhetConsumer
1819
import no.nav.syfo.personstatus.infrastructure.kafka.manglendemedvirkning.ManglendeMedvirkningVurderingConsumer
1920
import no.nav.syfo.personstatus.infrastructure.kafka.meroppfolging.SenOppfolgingKandidatStatusConsumer
@@ -26,6 +27,7 @@ fun launchKafkaModule(
2627
personoversiktStatusService: PersonoversiktStatusService,
2728
personBehandlendeEnhetService: PersonBehandlendeEnhetService,
2829
oppfolgingstilfelleService: OppfolgingstilfelleService,
30+
oppfolgingsoppgaveService: OppfolgingsoppgaveService,
2931
) {
3032
launchKafkaTaskPersonoppgavehendelse(
3133
applicationState = applicationState,
@@ -66,7 +68,7 @@ fun launchKafkaModule(
6668
launchOppfolgingsoppgaveConsumer(
6769
applicationState = applicationState,
6870
kafkaEnvironment = environment.kafka,
69-
personBehandlendeEnhetService = personBehandlendeEnhetService,
71+
oppfolgingsoppgaveService = oppfolgingsoppgaveService,
7072
)
7173
launchKafkaTaskFriskTilArbeidVedtak(
7274
applicationState = applicationState,

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/kafka/oppfolgingsoppgave/OppfolgingsoppgaveConsumer.kt

+27-12
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,48 @@ package no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave
22

33
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveRecord
44
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
5+
import no.nav.syfo.personstatus.domain.PersonIdent
56
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
67
import org.apache.kafka.clients.consumer.ConsumerRecords
78
import org.apache.kafka.clients.consumer.KafkaConsumer
9+
import org.slf4j.LoggerFactory
810
import java.time.Duration
9-
import kotlin.collections.map
10-
import kotlin.collections.requireNoNulls
1111

1212
class OppfolgingsoppgaveConsumer(
1313
private val oppfolgingsoppgaveService: OppfolgingsoppgaveService,
1414
) : KafkaConsumerService<OppfolgingsoppgaveRecord> {
1515

1616
override val pollDurationInMillis: Long = 1000
1717

18-
override suspend fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, OppfolgingsoppgaveRecord>) {
19-
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
18+
override suspend fun pollAndProcessRecords(consumer: KafkaConsumer<String, OppfolgingsoppgaveRecord>) {
19+
val records = consumer.poll(Duration.ofMillis(pollDurationInMillis))
2020
if (records.count() > 0) {
2121
processRecords(records)
22-
kafkaConsumer.commitSync()
22+
consumer.commitSync()
2323
}
2424
}
2525

26-
private fun processRecords(
27-
consumerRecords: ConsumerRecords<String, OppfolgingsoppgaveRecord>,
28-
) {
29-
val validRecords = consumerRecords.requireNoNulls()
30-
oppfolgingsoppgaveService.processOppfolgingsoppgave(
31-
records = validRecords.map { it.value() }
32-
)
26+
private suspend fun processRecords(records: ConsumerRecords<String, OppfolgingsoppgaveRecord>) {
27+
val (tombstoneRecords, validRecords) = records.partition { it.value() == null }
28+
if (tombstoneRecords.isNotEmpty()) {
29+
val numberOfTombstones = tombstoneRecords.size
30+
log.error("Value of $numberOfTombstones ConsumerRecord are null, most probably due to a tombstone. Contact the owner of the topic if an error is suspected")
31+
}
32+
validRecords.forEach { record ->
33+
val personident = PersonIdent(record.value().personIdent)
34+
oppfolgingsoppgaveService.createOrUpdatePersonOversiktStatus(
35+
personIdent = personident,
36+
isActiveOppfolgingsoppgave = record.value().isActive
37+
)
38+
if (record.value().isActive) {
39+
oppfolgingsoppgaveService.updateBehandlendeEnhet(
40+
personIdent = personident
41+
)
42+
}
43+
}
44+
}
45+
46+
companion object {
47+
private val log = LoggerFactory.getLogger(this::class.java)
3348
}
3449
}

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/kafka/oppfolgingsoppgave/OppfolgingsoppgaveTask.kt

+3-9
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave
22

33
import no.nav.syfo.ApplicationState
4-
import no.nav.syfo.personstatus.infrastructure.database.database
5-
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaEnvironment
6-
import no.nav.syfo.personstatus.infrastructure.kafka.kafkaAivenConsumerConfig
7-
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
84
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveRecord
95
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
6+
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaEnvironment
7+
import no.nav.syfo.personstatus.infrastructure.kafka.kafkaAivenConsumerConfig
108
import no.nav.syfo.personstatus.infrastructure.kafka.launchKafkaTask
119
import no.nav.syfo.util.configuredJacksonMapper
1210
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -22,12 +20,8 @@ const val HUSKELAPP_TOPIC =
2220
fun launchOppfolgingsoppgaveConsumer(
2321
applicationState: ApplicationState,
2422
kafkaEnvironment: KafkaEnvironment,
25-
personBehandlendeEnhetService: PersonBehandlendeEnhetService,
23+
oppfolgingsoppgaveService: OppfolgingsoppgaveService,
2624
) {
27-
val oppfolgingsoppgaveService = OppfolgingsoppgaveService(
28-
database = database,
29-
personBehandlendeEnhetService = personBehandlendeEnhetService,
30-
)
3125
val oppfolgingsoppgaveConsumer = OppfolgingsoppgaveConsumer(
3226
oppfolgingsoppgaveService = oppfolgingsoppgaveService,
3327
)

0 commit comments

Comments
 (0)