Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IS: Refactor oppfolgingsoppgaveservice #528

Merged
merged 1 commit into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/kotlin/no/nav/syfo/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import no.nav.syfo.personstatus.infrastructure.clients.oppfolgingsoppgave.Oppfol
import no.nav.syfo.personstatus.infrastructure.clients.pdl.PdlClient
import no.nav.syfo.personstatus.infrastructure.clients.veiledertilgang.VeilederTilgangskontrollClient
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
import no.nav.syfo.personstatus.infrastructure.clients.veileder.VeilederClient
import no.nav.syfo.personstatus.infrastructure.cronjob.launchCronjobModule
import no.nav.syfo.personstatus.infrastructure.database.database
Expand Down Expand Up @@ -105,6 +106,7 @@ fun main() {
lateinit var personBehandlendeEnhetService: PersonBehandlendeEnhetService
lateinit var personoversiktStatusService: PersonoversiktStatusService
lateinit var oppfolgingstilfelleService: OppfolgingstilfelleService
lateinit var oppfolgingsoppgaveService: OppfolgingsoppgaveService

val applicationEnvironment = applicationEnvironment {
log = logger
Expand Down Expand Up @@ -141,6 +143,10 @@ fun main() {
personoversiktStatusRepository = personoversiktStatusRepository,
behandlendeEnhetClient = behandlendeEnhetClient,
)
oppfolgingsoppgaveService = OppfolgingsoppgaveService(
personBehandlendeEnhetService = personBehandlendeEnhetService,
personOversiktStatusRepository = personoversiktStatusRepository,
)
apiModule(
applicationState = applicationState,
database = database,
Expand Down Expand Up @@ -168,6 +174,7 @@ fun main() {
personoversiktStatusService = personoversiktStatusService,
personBehandlendeEnhetService = personBehandlendeEnhetService,
oppfolgingstilfelleService = oppfolgingstilfelleService,
oppfolgingsoppgaveService = oppfolgingsoppgaveService,
)
launchCronjobModule(
applicationState = applicationState,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package no.nav.syfo.identhendelse.kafka

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import no.nav.syfo.identhendelse.IdenthendelseService
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
import no.nav.syfo.personstatus.infrastructure.COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
Expand All @@ -19,25 +18,23 @@ class IdenthendelseConsumerService(
override val pollDurationInMillis: Long = 1000

override suspend fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, GenericRecord>) {
runBlocking {
try {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
records.forEach { record ->
if (record.value() != null) {
identhendelseService.handleIdenthendelse(record.value().toKafkaIdenthendelseDTO())
} else {
log.warn("Identhendelse: Value of ConsumerRecord from topic $PDL_AKTOR_TOPIC is null, probably due to a tombstone. Contact the owner of the topic if an error is suspected")
COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE.increment()
}
try {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
records.forEach { record ->
if (record.value() != null) {
identhendelseService.handleIdenthendelse(record.value().toKafkaIdenthendelseDTO())
} else {
log.warn("Identhendelse: Value of ConsumerRecord from topic $PDL_AKTOR_TOPIC is null, probably due to a tombstone. Contact the owner of the topic if an error is suspected")
COUNT_KAFKA_CONSUMER_PDL_AKTOR_TOMBSTONE.increment()
}
kafkaConsumer.commitSync()
}
} catch (ex: Exception) {
log.warn("Error running kafka consumer for pdl-aktor, unsubscribing and waiting $DELAY_ON_ERROR_SECONDS seconds for retry")
kafkaConsumer.unsubscribe()
delay(DELAY_ON_ERROR_SECONDS.seconds)
kafkaConsumer.commitSync()
}
} catch (ex: Exception) {
log.warn("Error running kafka consumer for pdl-aktor, unsubscribing and waiting $DELAY_ON_ERROR_SECONDS seconds for retry")
kafkaConsumer.unsubscribe()
delay(DELAY_ON_ERROR_SECONDS.seconds)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ interface IPersonOversiktStatusRepository {
personstatus: PersonOversiktStatus,
oppfolgingstilfelle: Oppfolgingstilfelle,
)

fun updateOppfolgingsoppgave(personIdent: PersonIdent, isActive: Boolean): Result<Int>
}
Original file line number Diff line number Diff line change
@@ -1,70 +1,44 @@
package no.nav.syfo.personstatus.application.oppfolgingsoppgave

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import no.nav.syfo.personstatus.infrastructure.database.DatabaseInterface
import no.nav.syfo.personstatus.application.IPersonOversiktStatusRepository
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
import no.nav.syfo.personstatus.domain.PersonIdent
import no.nav.syfo.personstatus.infrastructure.database.queries.createPersonOversiktStatus
import no.nav.syfo.personstatus.domain.PersonOversiktStatus
import no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave.COUNT_KAFKA_CONSUMER_TRENGER_OPPFOLGING_READ
import no.nav.syfo.personstatus.infrastructure.database.queries.getPersonOversiktStatusList
import no.nav.syfo.personstatus.infrastructure.database.queries.updateOppfolgingsoppgave
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import kotlin.collections.filter
import kotlin.collections.firstOrNull
import kotlin.collections.forEach
import kotlin.jvm.java
import kotlin.use

class OppfolgingsoppgaveService(
private val database: DatabaseInterface,
private val personBehandlendeEnhetService: PersonBehandlendeEnhetService,
private val personOversiktStatusRepository: IPersonOversiktStatusRepository,
) {
fun processOppfolgingsoppgave(records: List<OppfolgingsoppgaveRecord>) {
createOrUpdatePersonOversiktStatus(records)
records.filter { it.isActive }.forEach {
updateBehandlendeEnhet(PersonIdent(it.personIdent))
}
}

private fun createOrUpdatePersonOversiktStatus(records: List<OppfolgingsoppgaveRecord>) {
database.connection.use { connection ->
records.forEach { oppfolgingsOppgave ->
val existingPersonOversiktStatus = connection.getPersonOversiktStatusList(
fnr = oppfolgingsOppgave.personIdent,
).firstOrNull()

if (existingPersonOversiktStatus == null) {
val personoversiktStatus = PersonOversiktStatus(
fnr = oppfolgingsOppgave.personIdent,
isAktivOppfolgingsoppgave = oppfolgingsOppgave.isActive,
)
connection.createPersonOversiktStatus(
commit = false,
personOversiktStatus = personoversiktStatus,
)
} else {
connection.updateOppfolgingsoppgave(oppfolgingsoppgave = oppfolgingsOppgave)
}

log.info("Received oppfolgingsOppgave with uuid=${oppfolgingsOppgave.uuid}")
COUNT_KAFKA_CONSUMER_TRENGER_OPPFOLGING_READ.increment()
}
connection.commit()
fun createOrUpdatePersonOversiktStatus(personIdent: PersonIdent, isActiveOppfolgingsoppgave: Boolean) {
val existingPersonOversiktStatus =
personOversiktStatusRepository.getPersonOversiktStatus(personIdent)
if (existingPersonOversiktStatus == null) {
val personoversiktStatus = PersonOversiktStatus(
fnr = personIdent.value,
isAktivOppfolgingsoppgave = isActiveOppfolgingsoppgave,
)
personOversiktStatusRepository.createPersonOversiktStatus(
personOversiktStatus = personoversiktStatus,
)
} else {
personOversiktStatusRepository.updateOppfolgingsoppgave(
personIdent = personIdent,
isActive = isActiveOppfolgingsoppgave,
)
}

COUNT_KAFKA_CONSUMER_TRENGER_OPPFOLGING_READ.increment()
}

private fun updateBehandlendeEnhet(personIdent: PersonIdent) {
suspend fun updateBehandlendeEnhet(personIdent: PersonIdent) {
try {
runBlocking {
launch(Dispatchers.IO) {
personBehandlendeEnhetService.updateBehandlendeEnhet(personIdent = personIdent)
log.info("Updated Behandlende Enhet of person after received active oppfolgingsoppgave")
}
}
personBehandlendeEnhetService.updateBehandlendeEnhet(personIdent = personIdent)
log.info("Updated Behandlende Enhet of person after received active oppfolgingsoppgave")
} catch (ex: Exception) {
log.error(
"Exception caught while attempting to update Behandlende Enhet of person after received active oppfolgingsoppgave",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const val queryGetPersonOversiktStatusList =
WHERE fnr = ?
"""

@Deprecated("Use getPersonOversiktStatus in PersonOversiktStatusRepository")
fun Connection.getPersonOversiktStatusList(
fnr: String,
): List<PPersonOversiktStatus> =
Expand All @@ -25,6 +26,7 @@ fun Connection.getPersonOversiktStatusList(
it.executeQuery().toList { toPPersonOversiktStatus() }
}

@Deprecated("Use getPersonOversiktStatus in PersonOversiktStatusRepository")
fun DatabaseInterface.getPersonOversiktStatusList(
fnr: String,
): List<PPersonOversiktStatus> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,28 @@ class PersonOversiktStatusRepository(private val database: DatabaseInterface) :
}
}

override fun updateOppfolgingsoppgave(personIdent: PersonIdent, isActiveOppfolgingsoppgave: Boolean): Result<Int> {
return try {
database.connection.use { connection ->
val rowsAffected = connection.prepareStatement(UPDATE_PERSON_OPPFOLGINGSOPPGAVE).use {
it.setBoolean(1, isActiveOppfolgingsoppgave)
it.setTimestamp(2, Timestamp.from(Instant.now()))
it.setString(3, personIdent.value)
it.executeUpdate()
}
if (rowsAffected == 1) {
connection.commit()
Result.success(rowsAffected)
} else {
connection.rollback()
Result.failure(RuntimeException("updateOppfolgingsoppgave failed, expected single row to be updated."))
}
}
} catch (e: SQLException) {
Result.failure(e)
}
}

companion object {
private const val GET_PERSON_OVERSIKT_STATUS =
"""
Expand Down Expand Up @@ -603,6 +625,13 @@ class PersonOversiktStatusRepository(private val database: DatabaseInterface) :
RETURNING id
"""

private const val UPDATE_PERSON_OPPFOLGINGSOPPGAVE =
"""
UPDATE PERSON_OVERSIKT_STATUS
SET trenger_oppfolging = ?, sist_endret = ?
WHERE fnr = ?
"""

private const val SEARCH_PERSON_BASE_QUERY =
"SELECT * FROM PERSON_OVERSIKT_STATUS p WHERE (p.oppfolgingstilfelle_end + INTERVAL '16 DAY' >= now() OR $AKTIV_OPPGAVE_WHERE_CLAUSE)"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import no.nav.syfo.pdlpersonhendelse.kafka.launchKafkaTaskPersonhendelse
import no.nav.syfo.personoppgavehendelse.kafka.launchKafkaTaskPersonoppgavehendelse
import no.nav.syfo.personstatus.application.PersonoversiktStatusService
import no.nav.syfo.personstatus.application.OppfolgingstilfelleService
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
import no.nav.syfo.personstatus.infrastructure.kafka.behandlendeenhet.BehandlendeEnhetConsumer
import no.nav.syfo.personstatus.infrastructure.kafka.manglendemedvirkning.ManglendeMedvirkningVurderingConsumer
import no.nav.syfo.personstatus.infrastructure.kafka.meroppfolging.SenOppfolgingKandidatStatusConsumer
Expand All @@ -26,6 +27,7 @@ fun launchKafkaModule(
personoversiktStatusService: PersonoversiktStatusService,
personBehandlendeEnhetService: PersonBehandlendeEnhetService,
oppfolgingstilfelleService: OppfolgingstilfelleService,
oppfolgingsoppgaveService: OppfolgingsoppgaveService,
) {
launchKafkaTaskPersonoppgavehendelse(
applicationState = applicationState,
Expand Down Expand Up @@ -66,7 +68,7 @@ fun launchKafkaModule(
launchOppfolgingsoppgaveConsumer(
applicationState = applicationState,
kafkaEnvironment = environment.kafka,
personBehandlendeEnhetService = personBehandlendeEnhetService,
oppfolgingsoppgaveService = oppfolgingsoppgaveService,
)
launchKafkaTaskFriskTilArbeidVedtak(
applicationState = applicationState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,48 @@ package no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave

import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveRecord
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
import no.nav.syfo.personstatus.domain.PersonIdent
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.slf4j.LoggerFactory
import java.time.Duration
import kotlin.collections.map
import kotlin.collections.requireNoNulls

class OppfolgingsoppgaveConsumer(
private val oppfolgingsoppgaveService: OppfolgingsoppgaveService,
) : KafkaConsumerService<OppfolgingsoppgaveRecord> {

override val pollDurationInMillis: Long = 1000

override suspend fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, OppfolgingsoppgaveRecord>) {
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
override suspend fun pollAndProcessRecords(consumer: KafkaConsumer<String, OppfolgingsoppgaveRecord>) {
val records = consumer.poll(Duration.ofMillis(pollDurationInMillis))
if (records.count() > 0) {
processRecords(records)
kafkaConsumer.commitSync()
consumer.commitSync()
}
}

private fun processRecords(
consumerRecords: ConsumerRecords<String, OppfolgingsoppgaveRecord>,
) {
val validRecords = consumerRecords.requireNoNulls()
oppfolgingsoppgaveService.processOppfolgingsoppgave(
records = validRecords.map { it.value() }
)
private suspend fun processRecords(records: ConsumerRecords<String, OppfolgingsoppgaveRecord>) {
val (tombstoneRecords, validRecords) = records.partition { it.value() == null }
if (tombstoneRecords.isNotEmpty()) {
val numberOfTombstones = tombstoneRecords.size
log.error("Value of $numberOfTombstones ConsumerRecord are null, most probably due to a tombstone. Contact the owner of the topic if an error is suspected")
}
validRecords.forEach { record ->
val personident = PersonIdent(record.value().personIdent)
oppfolgingsoppgaveService.createOrUpdatePersonOversiktStatus(
personIdent = personident,
isActiveOppfolgingsoppgave = record.value().isActive
)
if (record.value().isActive) {
oppfolgingsoppgaveService.updateBehandlendeEnhet(
personIdent = personident
)
}
}
}

companion object {
private val log = LoggerFactory.getLogger(this::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave

import no.nav.syfo.ApplicationState
import no.nav.syfo.personstatus.infrastructure.database.database
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaEnvironment
import no.nav.syfo.personstatus.infrastructure.kafka.kafkaAivenConsumerConfig
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveRecord
import no.nav.syfo.personstatus.application.oppfolgingsoppgave.OppfolgingsoppgaveService
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaEnvironment
import no.nav.syfo.personstatus.infrastructure.kafka.kafkaAivenConsumerConfig
import no.nav.syfo.personstatus.infrastructure.kafka.launchKafkaTask
import no.nav.syfo.util.configuredJacksonMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
Expand All @@ -22,12 +20,8 @@ const val HUSKELAPP_TOPIC =
fun launchOppfolgingsoppgaveConsumer(
applicationState: ApplicationState,
kafkaEnvironment: KafkaEnvironment,
personBehandlendeEnhetService: PersonBehandlendeEnhetService,
oppfolgingsoppgaveService: OppfolgingsoppgaveService,
) {
val oppfolgingsoppgaveService = OppfolgingsoppgaveService(
database = database,
personBehandlendeEnhetService = personBehandlendeEnhetService,
)
val oppfolgingsoppgaveConsumer = OppfolgingsoppgaveConsumer(
oppfolgingsoppgaveService = oppfolgingsoppgaveService,
)
Expand Down
Loading
Loading