Skip to content

Commit b840485

Browse files
committed
IS-3142: Add consumer for updating oppfolgingsenhet changes
1 parent 51ada41 commit b840485

14 files changed

+183
-58
lines changed

src/main/kotlin/no/nav/syfo/personstatus/application/IPersonOversiktStatusRepository.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ interface IPersonOversiktStatusRepository {
3030

3131
fun getVeilederTilknytningHistorikk(personident: PersonIdent): List<VeilederTildelingHistorikkDTO>
3232

33-
fun getPersonerWithOppgaveAndOldEnhet(): List<Pair<PersonIdent, String?>>
33+
fun getPersonerWithOppgaveAndOldEnhet(): List<PersonIdent>
3434

3535
fun getPersonerWithVeilederTildelingAndOldOppfolgingstilfelle(): List<PersonOversiktStatus>
3636

src/main/kotlin/no/nav/syfo/personstatus/application/PersonBehandlendeEnhetService.kt

+8-8
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,21 @@ class PersonBehandlendeEnhetService(
88
private val personoversiktStatusRepository: IPersonOversiktStatusRepository,
99
private val behandlendeEnhetClient: BehandlendeEnhetClient,
1010
) {
11-
fun getPersonerToCheckForUpdatedEnhet(): List<Pair<PersonIdent, String?>> =
11+
fun getPersonerToCheckForUpdatedEnhet(): List<PersonIdent> =
1212
personoversiktStatusRepository.getPersonerWithOppgaveAndOldEnhet()
1313

14-
suspend fun updateBehandlendeEnhet(
15-
personIdent: PersonIdent,
16-
tildeltEnhet: String? = null,
17-
) {
18-
val maybeNewBehandlendeEnhet = behandlendeEnhetClient.getEnhet(
14+
suspend fun updateBehandlendeEnhet(personIdent: PersonIdent) {
15+
val behandlendeEnhet = behandlendeEnhetClient.getEnhet(
1916
callId = UUID.randomUUID().toString(),
2017
personIdent = personIdent,
2118
)
22-
if (maybeNewBehandlendeEnhet != null && maybeNewBehandlendeEnhet.enhetId != tildeltEnhet) {
19+
val tildeltEnhet = personoversiktStatusRepository.getPersonOversiktStatus(personIdent)?.enhet
20+
val isEnhetUpdate = behandlendeEnhet != null && behandlendeEnhet.enhetId != tildeltEnhet
21+
22+
if (isEnhetUpdate) {
2323
personoversiktStatusRepository.updatePersonTildeltEnhetAndRemoveTildeltVeileder(
2424
personIdent = personIdent,
25-
enhetId = maybeNewBehandlendeEnhet.enhetId,
25+
enhetId = behandlendeEnhet.enhetId,
2626
)
2727
} else {
2828
personoversiktStatusRepository.updatePersonTildeltEnhetUpdatedAt(

src/main/kotlin/no/nav/syfo/personstatus/application/oppfolgingsoppgave/OppfolgingsoppgaveService.kt

+3-13
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import no.nav.syfo.personstatus.infrastructure.database.queries.updateOppfolging
1414
import org.slf4j.Logger
1515
import org.slf4j.LoggerFactory
1616
import kotlin.collections.filter
17-
import kotlin.collections.first
1817
import kotlin.collections.firstOrNull
1918
import kotlin.collections.forEach
2019
import kotlin.jvm.java
@@ -27,10 +26,7 @@ class OppfolgingsoppgaveService(
2726
fun processOppfolgingsoppgave(records: List<OppfolgingsoppgaveRecord>) {
2827
createOrUpdatePersonOversiktStatus(records)
2928
records.filter { it.isActive }.forEach {
30-
val personOversiktStatus = database.getPersonOversiktStatusList(
31-
fnr = it.personIdent,
32-
).first()
33-
updateBehandlendeEnhet(PersonIdent(it.personIdent), personOversiktStatus.enhet)
29+
updateBehandlendeEnhet(PersonIdent(it.personIdent))
3430
}
3531
}
3632

@@ -61,17 +57,11 @@ class OppfolgingsoppgaveService(
6157
}
6258
}
6359

64-
private fun updateBehandlendeEnhet(
65-
personIdent: PersonIdent,
66-
existingEnhet: String?,
67-
) {
60+
private fun updateBehandlendeEnhet(personIdent: PersonIdent) {
6861
try {
6962
runBlocking {
7063
launch(Dispatchers.IO) {
71-
personBehandlendeEnhetService.updateBehandlendeEnhet(
72-
personIdent = personIdent,
73-
tildeltEnhet = existingEnhet
74-
)
64+
personBehandlendeEnhetService.updateBehandlendeEnhet(personIdent = personIdent)
7565
log.info("Updated Behandlende Enhet of person after received active oppfolgingsoppgave")
7666
}
7767
}

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/cronjob/behandlendeenhet/PersonBehandlendeEnhetCronjob.kt

+2-6
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,9 @@ class PersonBehandlendeEnhetCronjob(
2222
val result = CronjobResult()
2323

2424
personBehandlendeEnhetService.getPersonerToCheckForUpdatedEnhet()
25-
.forEach { personIdentTildeltEnhetPair ->
25+
.forEach { personident ->
2626
try {
27-
val (personIdent, tildeltEnhet) = personIdentTildeltEnhetPair
28-
personBehandlendeEnhetService.updateBehandlendeEnhet(
29-
personIdent = personIdent,
30-
tildeltEnhet = tildeltEnhet,
31-
)
27+
personBehandlendeEnhetService.updateBehandlendeEnhet(personIdent = personident)
3228
result.updated++
3329
COUNT_CRONJOB_PERSON_BEHANDLENDE_ENHET_UPDATE.increment()
3430
} catch (ex: Exception) {

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/database/repository/PersonOversiktStatusRepository.kt

+4-8
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,10 @@ class PersonOversiktStatusRepository(private val database: DatabaseInterface) :
237237
}
238238
}
239239

240-
override fun getPersonerWithOppgaveAndOldEnhet(): List<Pair<PersonIdent, String?>> =
240+
override fun getPersonerWithOppgaveAndOldEnhet(): List<PersonIdent> =
241241
database.connection.use { connection ->
242242
connection.prepareStatement(GET_PERSONER_WITH_OPPGAVE_AND_OLD_ENHET).use {
243-
it.executeQuery().toList {
244-
Pair(
245-
PersonIdent(getString("fnr")),
246-
getString("tildelt_enhet"),
247-
)
248-
}
243+
it.executeQuery().toList { PersonIdent(getString("fnr")) }
249244
}
250245
}
251246

@@ -334,7 +329,8 @@ class PersonOversiktStatusRepository(private val database: DatabaseInterface) :
334329
}
335330

336331
return results.map {
337-
val personOppfolgingstilfelleVirksomhetList = personOppfolgingstilfelleVirksomhetMap[it.id]?.toPersonOppfolgingstilfelleVirksomhet() ?: emptyList()
332+
val personOppfolgingstilfelleVirksomhetList =
333+
personOppfolgingstilfelleVirksomhetMap[it.id]?.toPersonOppfolgingstilfelleVirksomhet() ?: emptyList()
338334
it.toPersonOversiktStatus(personOppfolgingstilfelleVirksomhetList = personOppfolgingstilfelleVirksomhetList)
339335
}
340336
}

src/main/kotlin/no/nav/syfo/personstatus/infrastructure/kafka/KafkaModule.kt

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import no.nav.syfo.pdlpersonhendelse.kafka.launchKafkaTaskPersonhendelse
1414
import no.nav.syfo.personoppgavehendelse.kafka.launchKafkaTaskPersonoppgavehendelse
1515
import no.nav.syfo.personstatus.application.PersonoversiktStatusService
1616
import no.nav.syfo.personstatus.application.OppfolgingstilfelleService
17+
import no.nav.syfo.personstatus.infrastructure.kafka.behandlendeenhet.BehandlendeEnhetConsumer
1718
import no.nav.syfo.personstatus.infrastructure.kafka.manglendemedvirkning.ManglendeMedvirkningVurderingConsumer
1819
import no.nav.syfo.personstatus.infrastructure.kafka.meroppfolging.SenOppfolgingKandidatStatusConsumer
1920
import no.nav.syfo.personstatus.infrastructure.kafka.oppfolgingsoppgave.launchOppfolgingsoppgaveConsumer
@@ -85,4 +86,10 @@ fun launchKafkaModule(
8586
applicationState = applicationState,
8687
kafkaEnvironment = environment.kafka,
8788
)
89+
90+
BehandlendeEnhetConsumer(personBehandlendeEnhetService = personBehandlendeEnhetService)
91+
.start(
92+
applicationState = applicationState,
93+
kafkaEnvironment = environment.kafka,
94+
)
8895
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package no.nav.syfo.personstatus.infrastructure.kafka.behandlendeenhet
2+
3+
import no.nav.syfo.ApplicationState
4+
import no.nav.syfo.personstatus.application.PersonBehandlendeEnhetService
5+
import no.nav.syfo.personstatus.domain.PersonIdent
6+
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaConsumerService
7+
import no.nav.syfo.personstatus.infrastructure.kafka.KafkaEnvironment
8+
import no.nav.syfo.personstatus.infrastructure.kafka.kafkaAivenConsumerConfig
9+
import no.nav.syfo.personstatus.infrastructure.kafka.launchKafkaTask
10+
import no.nav.syfo.util.configuredJacksonMapper
11+
import org.apache.kafka.clients.consumer.ConsumerConfig
12+
import org.apache.kafka.clients.consumer.ConsumerRecords
13+
import org.apache.kafka.clients.consumer.KafkaConsumer
14+
import org.apache.kafka.common.serialization.Deserializer
15+
import org.slf4j.LoggerFactory
16+
import java.time.Duration
17+
import java.time.OffsetDateTime
18+
import java.util.Properties
19+
20+
class BehandlendeEnhetConsumer(
21+
private val personBehandlendeEnhetService: PersonBehandlendeEnhetService,
22+
) : KafkaConsumerService<BehandlendeEnhetUpdateRecord> {
23+
24+
override val pollDurationInMillis: Long = 1000
25+
26+
override suspend fun pollAndProcessRecords(consumer: KafkaConsumer<String, BehandlendeEnhetUpdateRecord>) {
27+
val records = consumer.poll(Duration.ofMillis(pollDurationInMillis))
28+
if (records.count() > 0) {
29+
log.info("BehandlendeEnhetConsumer trace: Received ${records.count()} records")
30+
processRecords(records = records)
31+
consumer.commitSync()
32+
}
33+
}
34+
35+
private suspend fun processRecords(records: ConsumerRecords<String, BehandlendeEnhetUpdateRecord>) =
36+
records.requireNoNulls().map { record ->
37+
personBehandlendeEnhetService.updateBehandlendeEnhet(
38+
PersonIdent(record.value().personident)
39+
)
40+
}
41+
42+
fun start(applicationState: ApplicationState, kafkaEnvironment: KafkaEnvironment) {
43+
val consumerProperties = Properties().apply {
44+
putAll(kafkaAivenConsumerConfig(kafkaEnvironment = kafkaEnvironment))
45+
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = BehandlendeEnhetUpdateRecordDeserializer::class.java.canonicalName
46+
}
47+
launchKafkaTask(
48+
applicationState = applicationState,
49+
kafkaConsumerService = this,
50+
consumerProperties = consumerProperties,
51+
topic = TOPIC,
52+
)
53+
}
54+
55+
companion object {
56+
private val log = LoggerFactory.getLogger(this::class.java)
57+
private const val TOPIC = "teamsykefravr.behandlendeenhet"
58+
}
59+
}
60+
61+
data class BehandlendeEnhetUpdateRecord(
62+
val personident: String,
63+
val updatedAt: OffsetDateTime,
64+
)
65+
66+
class BehandlendeEnhetUpdateRecordDeserializer : Deserializer<BehandlendeEnhetUpdateRecord> {
67+
private val mapper = configuredJacksonMapper()
68+
override fun deserialize(topic: String, data: ByteArray): BehandlendeEnhetUpdateRecord =
69+
mapper.readValue(data, BehandlendeEnhetUpdateRecord::class.java)
70+
}

src/test/kotlin/no/nav/syfo/cronjob/behandlendeenhet/PersonBehandlendeEnhetCronjobSpek.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ object PersonBehandlendeEnhetCronjobSpek : Spek({
9393
val pPersonOversiktStatus = pPersonOversiktStatusList.first()
9494

9595
pPersonOversiktStatus.enhet shouldNotBeEqualTo firstEnhet
96-
pPersonOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
96+
pPersonOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
9797
pPersonOversiktStatus.tildeltEnhetUpdatedAt.shouldNotBeNull()
9898
pPersonOversiktStatus.tildeltEnhetUpdatedAt!!.toInstant()
9999
.toEpochMilli() shouldBeGreaterThan tildeltEnhetUpdatedAtBeforeUpdate.toInstant()
@@ -171,7 +171,7 @@ object PersonBehandlendeEnhetCronjobSpek : Spek({
171171

172172
val pPersonOversiktStatus = pPersonOversiktStatusList.first()
173173

174-
pPersonOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
174+
pPersonOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
175175
pPersonOversiktStatus.tildeltEnhetUpdatedAt.shouldNotBeNull()
176176
pPersonOversiktStatus.tildeltEnhetUpdatedAt shouldNotBeEqualTo tildeltEnhetUpdatedAtBeforeUpdate
177177
pPersonOversiktStatus.veilederIdent.shouldBeNull()

src/test/kotlin/no/nav/syfo/personstatus/api/v2/BehandlerdialogPersonoversiktStatusApiV2Spek.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ object BehandlerdialogPersonoversiktStatusApiV2Spek : Spek({
5858
response.status shouldBeEqualTo HttpStatusCode.OK
5959
val personOversiktStatus = response.body<List<PersonOversiktStatusDTO>>().first()
6060
personOversiktStatus.fnr shouldBeEqualTo oversikthendelseBehandlerdialogSvarMottatt.personident
61-
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
61+
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
6262
personOversiktStatus.behandlerdialogUbehandlet shouldBeEqualTo true
6363
}
6464
}
@@ -87,7 +87,7 @@ object BehandlerdialogPersonoversiktStatusApiV2Spek : Spek({
8787

8888
val personOversiktStatus = response.body<List<PersonOversiktStatusDTO>>().first()
8989
personOversiktStatus.fnr shouldBeEqualTo oversikthendelseBehandlerdialogUbesvartMottatt.personident
90-
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
90+
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
9191
personOversiktStatus.behandlerdialogUbehandlet shouldBeEqualTo true
9292
}
9393
}
@@ -122,7 +122,7 @@ object BehandlerdialogPersonoversiktStatusApiV2Spek : Spek({
122122

123123
val personOversiktStatus = response.body<List<PersonOversiktStatusDTO>>().first()
124124
personOversiktStatus.fnr shouldBeEqualTo oversikthendelseBehandlerdialogSvarMottatt.personident
125-
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
125+
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
126126
personOversiktStatus.behandlerdialogUbehandlet shouldBeEqualTo true
127127
}
128128
}
@@ -151,7 +151,7 @@ object BehandlerdialogPersonoversiktStatusApiV2Spek : Spek({
151151

152152
val personOversiktStatus = response.body<List<PersonOversiktStatusDTO>>().first()
153153
personOversiktStatus.fnr shouldBeEqualTo oversikthendelseBehandlerdialogAvvistMottatt.personident
154-
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
154+
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
155155
personOversiktStatus.behandlerdialogUbehandlet shouldBeEqualTo true
156156
}
157157
}

src/test/kotlin/no/nav/syfo/personstatus/api/v2/DialogmotekandidatPersonoversiktStatusApiV2Spek.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ object DialogmotekandidatPersonoversiktStatusApiV2Spek : Spek({
8888
personOversiktStatus.shouldNotBeNull()
8989
personOversiktStatus.veilederIdent shouldBeEqualTo null
9090
personOversiktStatus.fnr shouldBeEqualTo ARBEIDSTAKER_FNR
91-
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
91+
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
9292
personOversiktStatus.motebehovUbehandlet.shouldBeNull()
9393
personOversiktStatus.oppfolgingsplanLPSBistandUbehandlet.shouldBeNull()
9494
personOversiktStatus.dialogmotesvarUbehandlet shouldBeEqualTo false
@@ -112,7 +112,7 @@ object DialogmotekandidatPersonoversiktStatusApiV2Spek : Spek({
112112
personOversiktStatus.shouldNotBeNull()
113113
personOversiktStatus.veilederIdent shouldBeEqualTo null
114114
personOversiktStatus.fnr shouldBeEqualTo ARBEIDSTAKER_FNR
115-
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO().enhetId
115+
personOversiktStatus.enhet shouldBeEqualTo behandlendeEnhetDTO.enhetId
116116
personOversiktStatus.motebehovUbehandlet.shouldBeNull()
117117
personOversiktStatus.oppfolgingsplanLPSBistandUbehandlet.shouldBeNull()
118118
personOversiktStatus.dialogmotesvarUbehandlet shouldBeEqualTo false

0 commit comments

Comments
 (0)