@@ -3,6 +3,7 @@ package no.nav.paw.bekreftelsetjeneste.topology
3
3
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
4
4
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
5
5
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
6
+ import no.nav.paw.bekreftelse.internehendelser.RegisterGracePeriodeUtloeptEtterEksternInnsamling
6
7
import no.nav.paw.bekreftelse.paavegneav.v1.PaaVegneAv
7
8
import no.nav.paw.bekreftelsetjeneste.config.BekreftelseKonfigurasjon
8
9
import no.nav.paw.bekreftelsetjeneste.config.KafkaTopologyConfig
@@ -25,6 +26,7 @@ import java.time.Instant
25
26
import java.util.*
26
27
27
28
fun StreamsBuilder.byggBekreftelsePaaVegneAvStroem (
29
+ deaktiverUtmeldingVedStopp : Boolean = false,
28
30
bekreftelseKonfigurasjon : BekreftelseKonfigurasjon ,
29
31
registry : PrometheusMeterRegistry ,
30
32
kafkaTopologyConfig : KafkaTopologyConfig ,
@@ -52,7 +54,7 @@ fun StreamsBuilder.byggBekreftelsePaaVegneAvStroem(
52
54
bekreftelseTilstand = bekreftelseTilstand,
53
55
paaVegneAvTilstand = paaVegneAvTilstand,
54
56
paaVegneAvHendelse = message
55
- ).map { handling ->
57
+ ).mapNotNull { handling ->
56
58
when (handling) {
57
59
is SendHendelse -> handling.hendelse
58
60
is SkrivPaaVegneAvTilstand -> paaVegneAvTilstandStateStore.put(handling.id, handling.value)
@@ -62,6 +64,7 @@ fun StreamsBuilder.byggBekreftelsePaaVegneAvStroem(
62
64
}.filterIsInstance<BekreftelseHendelse >()
63
65
}
64
66
.flatMapValues { _, value -> value }
67
+ .filter { _, value -> (! deaktiverUtmeldingVedStopp || value !is RegisterGracePeriodeUtloeptEtterEksternInnsamling ) }
65
68
.peek { _, value -> registry.tellBekreftelseUtgaaendeHendelse(value) }
66
69
.to(
67
70
kafkaTopologyConfig.bekreftelseHendelseloggTopic,
0 commit comments