Skip to content

Commit 7e414b1

Browse files
committedMar 19, 2025··
La til telling av utgående hendelser fra periode stream
1 parent 28169a3 commit 7e414b1

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed
 

‎apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/PeriodeStream.kt

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package no.nav.paw.bekreftelsetjeneste.topology
22

3+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
34
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
45
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
56
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
67
import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet
78
import no.nav.paw.bekreftelsetjeneste.config.ApplicationConfig
9+
import no.nav.paw.bekreftelsetjeneste.metrics.tellBekreftelseUtgaaendeHendelse
810
import no.nav.paw.bekreftelsetjeneste.tilstand.BekreftelseTilstand
911
import no.nav.paw.bekreftelsetjeneste.tilstand.opprettBekreftelseTilstand
1012
import no.nav.paw.kafka.processor.genericProcess
@@ -19,6 +21,7 @@ import java.util.*
1921
import kotlin.jvm.optionals.getOrNull
2022

2123
fun StreamsBuilder.buildPeriodeStream(
24+
prometheusMeterRegistry: PrometheusMeterRegistry,
2225
applicationConfig: ApplicationConfig,
2326
kafaKeysClient: KafkaKeysClient
2427
) {
@@ -38,7 +41,9 @@ fun StreamsBuilder.buildPeriodeStream(
3841
periode.avsluttet() -> Action.DeleteStateAndEmit(arbeidsoekerId, periode)
3942
currentState == null -> Action.UpdateState(
4043
opprettBekreftelseTilstand(
41-
kafkaPartition = requireNotNull(recordMetadata().getOrNull()?.partition()) { "Forventer at kafka.partition er satt"},
44+
kafkaPartition = requireNotNull(
45+
recordMetadata().getOrNull()?.partition()
46+
) { "Forventer at kafka.partition er satt" },
4247
id = arbeidsoekerId,
4348
key = kafkaKey,
4449
periode = periode
@@ -73,7 +78,8 @@ fun StreamsBuilder.buildPeriodeStream(
7378
Action.DoNothing -> {}
7479
is Action.UpdateState -> keyValueStore.put(action.state.periode.periodeId, action.state)
7580
}
76-
}.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), BekreftelseHendelseSerde()))
81+
}.peek { _, value -> prometheusMeterRegistry.tellBekreftelseUtgaaendeHendelse(value) }
82+
.to(bekreftelseHendelseloggTopic, Produced.with(Serdes.Long(), BekreftelseHendelseSerde()))
7783
}
7884
}
7985

‎apps/bekreftelse-tjeneste/src/main/kotlin/no/nav/paw/bekreftelsetjeneste/topology/Topology.kt

+5-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ typealias PaaVegneAvTilstandStateStore = KeyValueStore<UUID, PaaVegneAvTilstand>
1717
fun StreamsBuilder.buildTopology(
1818
applicationContext: ApplicationContext
1919
): Topology {
20-
buildPeriodeStream(applicationContext.applicationConfig, applicationContext.kafkaKeysClient)
20+
buildPeriodeStream(
21+
prometheusMeterRegistry = applicationContext.prometheusMeterRegistry,
22+
applicationConfig = applicationContext.applicationConfig,
23+
kafaKeysClient = applicationContext.kafkaKeysClient
24+
)
2125
buildBekreftelseStream(
2226
prometheusMeterRegistry = applicationContext.prometheusMeterRegistry,
2327
applicationConfig = applicationContext.applicationConfig,

0 commit comments

Comments
 (0)
Please sign in to comment.