Skip to content

Commit 6bdf1f8

Browse files
committed
wip
1 parent c8c4a34 commit 6bdf1f8

File tree

10 files changed

+1411
-1214
lines changed

10 files changed

+1411
-1214
lines changed

src/main/kotlin/no/nav/syfo/Environment.kt

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ data class Environment(
2121
val syketilfelleScope: String = getEnvVar("SYKETILLFELLE_SCOPE"),
2222
val electorPath: String = getEnvVar("ELECTOR_PATH"),
2323
val dbUrl: String = getEnvVar("NAIS_DATABASE_JDBC_URL"),
24+
val syncTopic: String = "team-esyfo.dinesykmeldte-lest-topic",
2425
)
2526

2627
fun getEnvVar(varName: String, defaultValue: String? = null) =

src/main/kotlin/no/nav/syfo/kafka/KafkaUtils.kt

+1-4
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,10 @@ class KafkaUtils {
3333
}
3434
}
3535

36-
fun <T> createKafkaProducer(applicationName: String, clientId: String): KafkaProducer<String, T> =
36+
fun <T> createKafkaProducer(clientId: String): KafkaProducer<String, T> =
3737
KafkaProducer(
3838
KafkaUtils.getKafkaConfig(clientId)
3939
.toProducerConfig(
40-
"$applicationName-producer",
4140
JacksonKafkaSerializer::class,
4241
StringSerializer::class,
4342
),
@@ -57,13 +56,11 @@ fun Properties.toConsumerConfig(
5756
}
5857

5958
fun Properties.toProducerConfig(
60-
groupId: String,
6159
valueSerializer: KClass<out Serializer<out Any>>,
6260
keySerializer: KClass<out Serializer<out Any>> = StringSerializer::class
6361
): Properties =
6462
Properties().also {
6563
it.putAll(this)
66-
it[ConsumerConfig.GROUP_ID_CONFIG] = groupId
6764
it[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = valueSerializer.java
6865
it[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = keySerializer.java
6966
}

src/main/kotlin/no/nav/syfo/minesykmeldte/MineSykmeldteService.kt

+54-9
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,18 @@ import no.nav.syfo.sykmelding.db.SykmeldtDbModel
4040
import no.nav.syfo.sykmelding.model.sykmelding.arbeidsgiver.BehandlerAGDTO
4141
import no.nav.syfo.sykmelding.model.sykmelding.arbeidsgiver.SykmeldingsperiodeAGDTO
4242
import no.nav.syfo.sykmelding.model.sykmelding.model.PeriodetypeDTO
43+
import no.nav.syfo.synchendelse.SyncHendelse
44+
import no.nav.syfo.synchendelse.SyncHendelseType
4345
import no.nav.syfo.util.logger
4446
import no.nav.syfo.util.securelog
4547
import no.nav.syfo.util.toFormattedNameString
48+
import org.apache.kafka.clients.producer.KafkaProducer
49+
import org.apache.kafka.clients.producer.ProducerRecord
4650

4751
class MineSykmeldteService(
4852
private val mineSykmeldteDb: MineSykmeldteDb,
53+
private val kafkaProducer: KafkaProducer<String, SyncHendelse>,
54+
private val syncTopic: String,
4955
) {
5056
private val log = logger()
5157

@@ -118,7 +124,7 @@ class MineSykmeldteService(
118124
Dialogmote(
119125
hendelseId = it.hendelseId,
120126
tekst = it.tekst
121-
?: throw IllegalStateException("Dialogmøte uten tekst: ${it.id}"),
127+
?: throw IllegalStateException("Dialogmøte uten tekst: ${it.id}"),
122128
mottatt = it.mottatt,
123129
)
124130
}
@@ -151,7 +157,7 @@ class MineSykmeldteService(
151157
.mapNotNull { sykmeldt ->
152158
mapNullableSoknad(
153159
sykmeldt,
154-
getHendlersforSoknad(hendelserMap, sykmeldtEntry, sykmeldt)
160+
getHendlersforSoknad(hendelserMap, sykmeldtEntry, sykmeldt),
155161
)
156162
}
157163
}
@@ -171,19 +177,54 @@ class MineSykmeldteService(
171177
}
172178

173179
suspend fun markSykmeldingRead(sykmeldingId: String, lederFnr: String): Boolean {
174-
return mineSykmeldteDb.markSykmeldingRead(sykmeldingId, lederFnr)
180+
val ids = mineSykmeldteDb.markSykmeldingRead(sykmeldingId, lederFnr)
181+
kafkaProducer
182+
.send(ProducerRecord(syncTopic, SyncHendelse(ids, type = SyncHendelseType.SYKMELDING)))
183+
.get()
184+
return ids.isNotEmpty()
175185
}
176186

177187
suspend fun markSoknadRead(soknadId: String, lederFnr: String): Boolean {
178-
return mineSykmeldteDb.markSoknadRead(soknadId, lederFnr)
188+
val ids = mineSykmeldteDb.markSoknadRead(soknadId, lederFnr)
189+
kafkaProducer
190+
.send(ProducerRecord(syncTopic, SyncHendelse(ids, type = SyncHendelseType.SOKNAD)))
191+
.get()
192+
return ids.isNotEmpty()
179193
}
180194

181195
suspend fun markHendelseRead(hendelseId: UUID, lederFnr: String): Boolean {
182-
return mineSykmeldteDb.markHendelseRead(hendelseId, lederFnr)
196+
val ids = mineSykmeldteDb.markHendelseRead(hendelseId, lederFnr)
197+
kafkaProducer
198+
.send(ProducerRecord(syncTopic, SyncHendelse(ids, type = SyncHendelseType.HENDELSE)))
199+
.get()
200+
return ids.isNotEmpty()
183201
}
184202

185203
suspend fun markAllSykmeldingerAndSoknaderRead(lederFnr: String) {
186-
mineSykmeldteDb.markAllSykmeldingAndSoknadAsRead(lederFnr)
204+
val sykmeldingIdsAndSoknadIds = mineSykmeldteDb.markAllSykmeldingAndSoknadAsRead(lederFnr)
205+
val sykmeldingJob =
206+
kafkaProducer.send(
207+
ProducerRecord(
208+
syncTopic,
209+
SyncHendelse(
210+
sykmeldingIdsAndSoknadIds.sykmeldingIds,
211+
type = SyncHendelseType.SYKMELDING,
212+
),
213+
),
214+
)
215+
val soknadJob =
216+
kafkaProducer.send(
217+
ProducerRecord(
218+
syncTopic,
219+
SyncHendelse(
220+
sykmeldingIdsAndSoknadIds.soknadIds,
221+
type = SyncHendelseType.SOKNAD,
222+
),
223+
),
224+
)
225+
226+
sykmeldingJob.get()
227+
soknadJob.get()
187228
}
188229
}
189230

@@ -223,7 +264,7 @@ private fun Pair<SykmeldtDbModel, SoknadDbModel>.toSoknad(): Soknad {
223264
tom = soknadDb.tom,
224265
lest = soknadDb.lest,
225266
sendtDato = soknadDb.sykepengesoknad.sendtArbeidsgiver
226-
?: throw IllegalStateException("Søknad uten sendt dato: ${soknadDb.soknadId}"),
267+
?: throw IllegalStateException("Søknad uten sendt dato: ${soknadDb.soknadId}"),
227268
sendtTilNavDato = soknadDb.sykepengesoknad.sendtNav,
228269
korrigererSoknadId = soknadDb.sykepengesoknad.korrigerer,
229270
korrigertBySoknadId = soknadDb.sykepengesoknad.korrigertAv,
@@ -238,7 +279,7 @@ private fun Pair<SykmeldtDbModel, SoknadDbModel>.toSoknad(): Soknad {
238279
sp.tag != "TIL_SLUTT" &&
239280
sp.tag != "VAER_KLAR_OVER_AT"
240281
}
241-
.map { it.toSporsmal() }
282+
.map { it.toSporsmal() },
242283
)
243284
}
244285

@@ -330,19 +371,22 @@ private fun SykmeldingsperiodeAGDTO.toSykmeldingPeriode(): Periode =
330371
)
331372
},
332373
)
374+
333375
PeriodetypeDTO.AVVENTENDE ->
334376
Avventende(
335377
this.fom,
336378
this.tom,
337379
tilrettelegging = this.innspillTilArbeidsgiver,
338380
)
381+
339382
PeriodetypeDTO.BEHANDLINGSDAGER ->
340383
Behandlingsdager(
341384
this.fom,
342385
this.tom,
343386
this.behandlingsdager
344387
?: throw IllegalStateException("Behandlingsdager without behandlingsdager"),
345388
)
389+
346390
PeriodetypeDTO.GRADERT -> {
347391
val gradering = this.gradert
348392
requireNotNull(gradering) { "Gradert periode uten gradert-data burde ikke eksistere" }
@@ -354,6 +398,7 @@ private fun SykmeldingsperiodeAGDTO.toSykmeldingPeriode(): Periode =
354398
gradering.reisetilskudd,
355399
)
356400
}
401+
357402
PeriodetypeDTO.REISETILSKUDD ->
358403
Reisetilskudd(
359404
this.fom,
@@ -380,7 +425,7 @@ fun safeParseHendelseEnum(oppgavetype: String): HendelseType {
380425
HendelseType.valueOf(oppgavetype)
381426
} catch (e: Exception) {
382427
securelog.error(
383-
"Ukjent oppgave av type $oppgavetype er ikke håndtert i applikasjonen. Mangler vi implementasjon?"
428+
"Ukjent oppgave av type $oppgavetype er ikke håndtert i applikasjonen. Mangler vi implementasjon?",
384429
)
385430
HendelseType.UNKNOWN
386431
}

src/main/kotlin/no/nav/syfo/minesykmeldte/db/MineSykmeldteDb.kt

+46-22
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import no.nav.syfo.sykmelding.db.SykmeldingDbModel
1515
import no.nav.syfo.sykmelding.db.SykmeldtDbModel
1616
import no.nav.syfo.util.objectMapper
1717

18+
data class SykmeldingAndSoknadIds(
19+
val sykmeldingIds: List<String>,
20+
val soknadIds: List<String>,
21+
)
22+
1823
class MineSykmeldteDb(private val database: DatabaseInterface) {
1924
suspend fun getMineSykmeldte(lederFnr: String): List<MinSykmeldtDbModel> =
2025
withContext(Dispatchers.IO) {
@@ -140,7 +145,7 @@ class MineSykmeldteDb(private val database: DatabaseInterface) {
140145
}
141146
}
142147

143-
suspend fun markSykmeldingRead(sykmeldingId: String, lederFnr: String): Boolean =
148+
suspend fun markSykmeldingRead(sykmeldingId: String, lederFnr: String): List<String> =
144149
withContext(Dispatchers.IO) {
145150
database.connection.use { connection ->
146151
val updated =
@@ -157,14 +162,14 @@ class MineSykmeldteDb(private val database: DatabaseInterface) {
157162
.use { ps ->
158163
ps.setString(1, sykmeldingId)
159164
ps.setString(2, lederFnr)
160-
ps.executeUpdate() > 0
165+
ps.executeQuery().toList { getString("sykmelding_id") }
161166
}
162167
connection.commit()
163168
updated
164169
}
165170
}
166171

167-
suspend fun markSoknadRead(soknadId: String, lederFnr: String): Boolean =
172+
suspend fun markSoknadRead(soknadId: String, lederFnr: String): List<String> =
168173
withContext(Dispatchers.IO) {
169174
database.connection.use { connection ->
170175
val updated =
@@ -181,14 +186,14 @@ class MineSykmeldteDb(private val database: DatabaseInterface) {
181186
.use { ps ->
182187
ps.setString(1, soknadId)
183188
ps.setString(2, lederFnr)
184-
ps.executeUpdate() > 0
189+
ps.executeQuery().toList { getString("soknad_id") }
185190
}
186191
connection.commit()
187192
updated
188193
}
189194
}
190195

191-
suspend fun markHendelseRead(hendelseId: UUID, lederFnr: String): Boolean =
196+
suspend fun markHendelseRead(hendelseId: UUID, lederFnr: String): List<String> =
192197
withContext(Dispatchers.IO) {
193198
database.connection.use { connection ->
194199
val updated =
@@ -200,40 +205,59 @@ class MineSykmeldteDb(private val database: DatabaseInterface) {
200205
WHERE (narmesteleder.pasient_fnr = hendelser.pasient_fnr AND narmesteleder.orgnummer = hendelser.orgnummer)
201206
AND hendelser.hendelse_id = ?
202207
AND narmesteleder.leder_fnr = ?
208+
returning hendelser.hendelse_id
203209
""",
204210
)
205211
.use { ps ->
206212
ps.setTimestamp(1, Timestamp.from(Instant.now()))
207213
ps.setObject(2, hendelseId)
208214
ps.setString(3, lederFnr)
209-
ps.executeUpdate() > 0
215+
ps.executeQuery().toList { getString("hendelse_id") }
210216
}
211217
connection.commit()
212218
updated
213219
}
214220
}
215221

216-
suspend fun markAllSykmeldingAndSoknadAsRead(lederFnr: String) =
217-
withContext(Dispatchers.IO) {
222+
suspend fun markAllSykmeldingAndSoknadAsRead(lederFnr: String): SykmeldingAndSoknadIds {
223+
return withContext(Dispatchers.IO) {
218224
database.connection.use { connection ->
219-
connection
220-
.prepareStatement(
221-
"""
222-
update sykmelding s set lest = TRUE
223-
from narmesteleder nl where s.pasient_fnr = nl.pasient_fnr and nl.leder_fnr = ?;
224-
update soknad s set lest = TRUE
225-
from narmesteleder nl where s.pasient_fnr = nl.pasient_fnr and nl.leder_fnr = ?;
225+
val sykmeldingIds =
226+
connection
227+
.prepareStatement(
228+
"""
229+
update sykmelding s set lest = TRUE
230+
from narmesteleder nl where s.pasient_fnr = nl.pasient_fnr and nl.leder_fnr = ? returning s.sykmelding_id;
231+
226232
"""
227-
.trimIndent(),
228-
)
229-
.use { ps ->
230-
ps.setString(1, lederFnr)
231-
ps.setString(2, lederFnr)
232-
ps.executeUpdate()
233-
}
233+
.trimIndent()
234+
)
235+
.use { ps ->
236+
ps.setString(1, lederFnr)
237+
ps.executeQuery().toList { getString("sykmelding_id") }
238+
}
239+
val soknadIds =
240+
connection
241+
.prepareStatement(
242+
"""
243+
update soknad s set lest = TRUE
244+
from narmesteleder nl where s.pasient_fnr = nl.pasient_fnr and nl.leder_fnr = ? returning s.soknad_id;
245+
"""
246+
.trimIndent()
247+
)
248+
.use { ps ->
249+
ps.setString(1, lederFnr)
250+
ps.executeQuery().toList { getString("soknad_id") }
251+
}
252+
234253
connection.commit()
254+
SykmeldingAndSoknadIds(
255+
sykmeldingIds = sykmeldingIds,
256+
soknadIds = soknadIds,
257+
)
235258
}
236259
}
260+
}
237261
}
238262

239263
private fun ResultSet.toHendelseDbModels() =

src/main/kotlin/no/nav/syfo/plugins/DependencyInjection.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import no.nav.syfo.soknad.db.SoknadDb
4242
import no.nav.syfo.syketilfelle.client.SyfoSyketilfelleClient
4343
import no.nav.syfo.sykmelding.SykmeldingService
4444
import no.nav.syfo.sykmelding.db.SykmeldingDb
45+
import no.nav.syfo.synchendelse.SyncHendelse
4546
import no.nav.syfo.util.AuthConfiguration
4647
import no.nav.syfo.util.getWellKnownTokenX
4748
import no.nav.syfo.virksomhet.api.VirksomhetService
@@ -66,10 +67,15 @@ fun Application.configureDependencies() {
6667
databaseModule(),
6768
servicesModule(),
6869
commonKafkaConsumer(),
70+
hendelseKafkaProducer(),
6971
)
7072
}
7173
}
7274

75+
fun hendelseKafkaProducer() = module {
76+
single { createKafkaProducer<SyncHendelse>("dinesykmeldte-sync-hendelse-producer") }
77+
}
78+
7379
private fun servicesModule() = module {
7480
single { AccessTokenClient(env().aadAccessTokenUrl, env().clientId, env().clientSecret, get()) }
7581
single { PdlClient(get(), env().pdlGraphqlPath) }
@@ -84,12 +90,12 @@ private fun servicesModule() = module {
8490
single {
8591
val nlResponseProducer =
8692
NLResponseProducer(
87-
createKafkaProducer(env().applicationName, "syfo-narmesteleder-producer"),
93+
createKafkaProducer("syfo-narmesteleder-producer"),
8894
env().nlResponseTopic
8995
)
9096
NarmestelederService(NarmestelederDb(get()), nlResponseProducer)
9197
}
92-
single { MineSykmeldteService(MineSykmeldteDb(get())) }
98+
single { MineSykmeldteService(MineSykmeldteDb(get()), get(), env().syncTopic) }
9399
single { LeaderElection(get(), env().electorPath) }
94100
single { DeleteDataService(DeleteDataDb(get()), get()) }
95101
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package no.nav.syfo.synchendelse
2+
3+
enum class SyncHendelseType {
4+
SYKMELDING,
5+
SOKNAD,
6+
HENDELSE
7+
}
8+
9+
enum class SyncSource {
10+
ESYFO,
11+
TSM,
12+
}
13+
14+
data class SyncHendelse(
15+
val id: List<String>,
16+
val type: SyncHendelseType,
17+
val source: SyncSource = SyncSource.ESYFO,
18+
)

0 commit comments

Comments
 (0)