Skip to content

Commit f909a2c

Browse files
committed
La inn støtte for å hpndtere feilretting av startet tidspunkt
1 parent c76b84a commit f909a2c

File tree

4 files changed

+157
-55
lines changed

4 files changed

+157
-55
lines changed

apps/hendelseprosessor/src/main/kotlin/no/nav/paw/arbeidssokerregisteret/app/funksjoner/Filter.kt

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1+
@file:OptIn(ExperimentalContracts::class)
2+
13
package no.nav.paw.arbeidssokerregisteret.app.funksjoner
24

35
import io.opentelemetry.api.common.AttributeKey
46
import io.opentelemetry.api.common.Attributes
57
import io.opentelemetry.api.trace.Span
6-
import io.opentelemetry.api.trace.SpanKind
7-
import io.opentelemetry.instrumentation.annotations.WithSpan
88
import no.nav.paw.arbeidssokerregisteret.app.StreamHendelse
99
import no.nav.paw.arbeidssokerregisteret.app.tilstand.InternTilstandOgHendelse
1010
import no.nav.paw.arbeidssokerregisteret.app.tilstand.GjeldeneTilstand
1111
import no.nav.paw.arbeidssokerregisteret.intern.v1.Avsluttet
1212
import no.nav.paw.arbeidssokerregisteret.intern.v1.Startet
13+
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.AvviksType
1314
import org.slf4j.LoggerFactory
15+
import kotlin.contracts.ExperimentalContracts
16+
import kotlin.contracts.contract
1417

1518
fun ignorerDuplikatStartOgStopp(
1619
@Suppress("UNUSED_PARAMETER") recordKey: Long,
@@ -19,7 +22,7 @@ fun ignorerDuplikatStartOgStopp(
1922
val (_, tilstand, hendelse) = tilstandOgHendelse
2023
return when (tilstand?.gjeldeneTilstand) {
2124
null -> hendelse.erIkke<Avsluttet>()
22-
GjeldeneTilstand.STARTET -> hendelse.erIkke<Startet>()
25+
GjeldeneTilstand.STARTET -> hendelse.erIkke<Startet>() || hendelse.erGyldigFeilrettingAvStartTid()
2326
GjeldeneTilstand.AVSLUTTET -> hendelse.erIkke<Avsluttet>()
2427
GjeldeneTilstand.AVVIST -> hendelse.erIkke<Avsluttet>()
2528
else -> false
@@ -37,7 +40,17 @@ fun ignorerDuplikatStartOgStopp(
3740
}
3841
}
3942

40-
inline fun <reified A : StreamHendelse> StreamHendelse.erIkke(): Boolean = this !is A
43+
@OptIn(ExperimentalContracts::class)
44+
inline fun <reified A : StreamHendelse> StreamHendelse.erIkke(): Boolean {
45+
contract {
46+
returns(false) implies (this@erIkke is A)
47+
}
48+
return this !is A
49+
}
50+
51+
fun Startet.erGyldigFeilrettingAvStartTid(): Boolean =
52+
metadata.tidspunktFraKilde?.avviksType == AvviksType.TIDSPUNKT_KORRIGERT &&
53+
metadata.tidspunktFraKilde?.tidspunkt != null
4154

4255
fun ignorerAvsluttetForAnnenPeriode(
4356
@Suppress("UNUSED_PARAMETER") recordKey: Long,
@@ -85,4 +98,4 @@ fun ignorerOpphoerteIdenter(
8598
)
8699
}
87100
}
88-
}
101+
}

apps/hendelseprosessor/src/main/kotlin/no/nav/paw/arbeidssokerregisteret/app/funksjoner/StartPeriode.kt

+107-25
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,120 @@
11
package no.nav.paw.arbeidssokerregisteret.app.funksjoner
22

3+
import io.opentelemetry.api.common.AttributeKey.stringKey
4+
import io.opentelemetry.api.common.Attributes
5+
import io.opentelemetry.api.trace.Span
36
import no.nav.paw.arbeidssokerregisteret.api.v4.OpplysningerOmArbeidssoeker
47
import no.nav.paw.arbeidssokerregisteret.app.tilstand.*
58
import no.nav.paw.arbeidssokerregisteret.intern.v1.Startet
9+
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.AvviksType
10+
import no.nav.paw.arbeidssokerregisteret.intern.v1.vo.TidspunktFraKilde
611
import org.slf4j.LoggerFactory
712
import java.time.Duration
813
import java.time.Instant
914
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode as ApiPeriode
15+
object StartPeriodeOtelHendelser {
16+
val starter = "startet_ny_periode"
17+
val nyttTidspunkt = "startet_nytt_tidspunkt"
18+
val ignorertFeilretting = "startet_feilretting_ignorert"
19+
val aarsakTilIgnorertKey = stringKey("arrsak_til_ignorert")
20+
}
1021

22+
fun FunctionContext<TilstandV1?, Long>.startPeriode(
23+
window: Duration,
24+
hendelse: Startet
25+
): InternTilstandOgApiTilstander {
26+
val (startetPeriode: Periode?, nyTilstand: TilstandV1, opplysninger: OpplysningerOmArbeidssoeker?) =
27+
if (tilstand?.gjeldenePeriode != null) {
28+
if (hendelse.erGyldigFeilrettingAvStartTid()) {
29+
val nyttTidspunkt = hendelse.metadata.tidspunktFraKilde?.tidspunkt ?: throw IllegalStateException("Tidspunkt fra kilde er null")
30+
oppdaterTidspunktForEksiterendePeriode(
31+
hendelseScope = scope,
32+
nyttTidspunkt = nyttTidspunkt,
33+
hendelse = hendelse,
34+
tilstand = tilstand,
35+
gjeldenePeriode = tilstand.gjeldenePeriode
36+
).let { (oppdatertPeriode, nyTilstand) ->
37+
Triple(oppdatertPeriode, nyTilstand, null)
38+
}
39+
} else {
40+
throw IllegalStateException("Gjeldene periode er ikke null. Kan ikke starte ny periode.")
41+
}
42+
} else {
43+
Span.current().addEvent(StartPeriodeOtelHendelser.starter)
44+
val (startetPeriode, nyTilstand) = opprettNyPeriode(hendelse, tilstand)
45+
val opplysninger = tilstand?.sisteOpplysningerOmArbeidssoeker
46+
?.takeIf { window.isWithinWindow(it.metadata.tidspunkt, hendelse.metadata.tidspunkt) }
47+
?.let { intern ->
48+
OpplysningerOmArbeidssoeker(
49+
intern.id,
50+
startetPeriode.id,
51+
intern.metadata.api(),
52+
intern.utdanning?.api(),
53+
intern.helse?.api(),
54+
intern.jobbsituasjon.api(),
55+
intern.annet?.api()
56+
)
57+
}
58+
Triple(startetPeriode, nyTilstand, opplysninger)
59+
}
60+
return InternTilstandOgApiTilstander(
61+
id = scope.id,
62+
tilstand = nyTilstand,
63+
nyOpplysningerOmArbeidssoekerTilstand = opplysninger,
64+
nyPeriodeTilstand = startetPeriode?.let {
65+
ApiPeriode(
66+
startetPeriode.id,
67+
startetPeriode.identitetsnummer,
68+
startetPeriode.startet.api(),
69+
startetPeriode.avsluttet?.api()
70+
)
71+
}
72+
)
73+
}
1174

12-
fun FunctionContext<TilstandV1?, Long>.startPeriode(window: Duration, hendelse: Startet): InternTilstandOgApiTilstander {
13-
if (tilstand?.gjeldenePeriode != null) throw IllegalStateException("Gjeldene periode er ikke null. Kan ikke starte ny periode.")
75+
fun oppdaterTidspunktForEksiterendePeriode(
76+
hendelseScope: HendelseScope<Long>,
77+
nyttTidspunkt: Instant,
78+
hendelse: Startet,
79+
tilstand: TilstandV1,
80+
gjeldenePeriode: Periode
81+
): Pair<Periode?, TilstandV1> {
82+
val orginaltStartTidspunkt = gjeldenePeriode.startet.tidspunkt
83+
if (nyttTidspunkt.isAfter(orginaltStartTidspunkt)) {
84+
Span.current().addEvent(
85+
StartPeriodeOtelHendelser.ignorertFeilretting,
86+
Attributes.of(
87+
StartPeriodeOtelHendelser.aarsakTilIgnorertKey,
88+
"nytt_tidspunkt_er_etter_orginalt_tidspunkt"
89+
)
90+
)
91+
return null to tilstand
92+
} else {
93+
Span.current().addEvent(StartPeriodeOtelHendelser.nyttTidspunkt)
94+
val oppdatertPeriode = gjeldenePeriode.copy(
95+
startet = gjeldenePeriode.startet.copy(
96+
tidspunktFraKilde = TidspunktFraKilde(
97+
tidspunkt = nyttTidspunkt,
98+
avviksType = AvviksType.TIDSPUNKT_KORRIGERT
99+
),
100+
utfoertAv = hendelse.metadata.utfoertAv,
101+
aarsak = hendelse.metadata.aarsak,
102+
kilde = hendelse.metadata.kilde
103+
)
104+
)
105+
val nyTilstand = tilstand.copy(
106+
gjeldenePeriode = oppdatertPeriode,
107+
hendelseScope = hendelseScope
108+
)
109+
return oppdatertPeriode to nyTilstand
110+
}
111+
112+
}
113+
114+
private fun FunctionContext<TilstandV1?, Long>.opprettNyPeriode(
115+
hendelse: Startet,
116+
tilstand: TilstandV1?
117+
): Pair<Periode, TilstandV1> {
14118
val startetPeriode = Periode(
15119
id = hendelse.hendelseId,
16120
identitetsnummer = hendelse.identitetsnummer,
@@ -35,29 +139,7 @@ fun FunctionContext<TilstandV1?, Long>.startPeriode(window: Duration, hendelse:
35139
sisteOpplysningerOmArbeidssoeker = null,
36140
forrigeOpplysningerOmArbeidssoeker = null
37141
)
38-
return InternTilstandOgApiTilstander(
39-
id = scope.id,
40-
tilstand = nyTilstand,
41-
nyOpplysningerOmArbeidssoekerTilstand = tilstand?.sisteOpplysningerOmArbeidssoeker
42-
?.takeIf { window.isWithinWindow(it.metadata.tidspunkt, hendelse.metadata.tidspunkt) }
43-
?.let { intern ->
44-
OpplysningerOmArbeidssoeker(
45-
intern.id,
46-
startetPeriode.id,
47-
intern.metadata.api(),
48-
intern.utdanning?.api(),
49-
intern.helse?.api(),
50-
intern.jobbsituasjon.api(),
51-
intern.annet?.api()
52-
)
53-
},
54-
nyPeriodeTilstand = ApiPeriode(
55-
startetPeriode.id,
56-
startetPeriode.identitetsnummer,
57-
startetPeriode.startet.api(),
58-
startetPeriode.avsluttet?.api()
59-
)
60-
)
142+
return startetPeriode to nyTilstand
61143
}
62144

63145
val windowLogger = LoggerFactory.getLogger("window_check")

apps/hendelseprosessor/src/test/kotlin/no/nav/paw/arbeidssokerregisteret/app/ApplikasjonsTest.kt

+30-24
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@ import org.apache.kafka.common.serialization.Serdes
2323
import org.apache.kafka.streams.TopologyTestDriver
2424
import java.time.Duration
2525
import java.time.Instant
26+
import java.time.temporal.ChronoUnit
2627
import java.util.*
28+
import no.nav.paw.arbeidssokerregisteret.api.v1.AvviksType as AvroAvviksType
2729
import no.nav.paw.arbeidssokerregisteret.api.v1.Beskrivelse.ER_PERMITTERT as API_ER_PERMITTERT
30+
import no.nav.paw.arbeidssokerregisteret.api.v1.BrukerType as AvroBrukerType
2831
import no.nav.paw.arbeidssokerregisteret.api.v1.JaNeiVetIkke.JA as ApiJa
2932
import no.nav.paw.arbeidssokerregisteret.api.v1.JaNeiVetIkke.NEI as ApiNei
3033

@@ -126,37 +129,35 @@ class ApplikasjonsTest : FreeSpec({
126129
periodeTopic.isEmpty shouldBe true
127130
opplysningerOmArbeidssoekerTopic.isEmpty shouldBe true
128131
}
129-
130-
"Når vi mottar en 'startet' med korrigert tidspunkt for en akrive periode skal vi oppdatere perioden".config(
131-
enabled = false // Funsjonalitet er ikke implementert
132-
) {
133-
val duplikatStart = Startet(
134-
hendelseId = UUID.randomUUID(),
135-
id = 1L,
136-
identitetsnummer = identitetnummer,
137-
metadata = Metadata(
138-
tidspunkt = Instant.now(),
139-
utfoertAv = Bruker(type = BrukerType.VEILEDER, id = "en_som_fikser_ting"),
140-
kilde = "unit-test2",
141-
aarsak = "tekniske feil i systemet",
142-
tidspunktFraKilde = TidspunktFraKilde(
143-
startet.metadata.tidspunkt - 10.days,
144-
AvviksType.TIDSPUNKT_KORRIGERT
145-
)
132+
val feilrettetStart = Startet(
133+
hendelseId = UUID.randomUUID(),
134+
id = 1L,
135+
identitetsnummer = identitetnummer,
136+
metadata = Metadata(
137+
tidspunkt = Instant.now(),
138+
utfoertAv = Bruker(type = BrukerType.VEILEDER, id = "en_som_fikser_ting"),
139+
kilde = "unit-test2",
140+
aarsak = "teknisk feil i systemet",
141+
tidspunktFraKilde = TidspunktFraKilde(
142+
startet.metadata.tidspunkt - 10.days,
143+
AvviksType.TIDSPUNKT_KORRIGERT
146144
)
147145
)
148-
eventlogTopic.pipeInput(key, duplikatStart)
146+
)
147+
"Når vi mottar en 'startet' med korrigert tidspunkt for en akrive periode skal vi oppdatere perioden" {
148+
eventlogTopic.pipeInput(key, feilrettetStart)
149149
periodeTopic.isEmpty shouldBe false
150150
val periodeKv = periodeTopic.readKeyValue()
151151
val periode = periodeKv.value
152-
periode.startet.tidspunkt shouldBe startet.metadata.tidspunkt
153-
periode.startet.tidspunktFraKilde?.avviksType shouldBe AvviksType.TIDSPUNKT_KORRIGERT
154-
periode.startet.tidspunktFraKilde?.tidspunkt shouldBe duplikatStart.metadata.tidspunktFraKilde?.tidspunkt
152+
periode.startet.tidspunkt.truncatedTo(ChronoUnit.MILLIS) shouldBe startet.metadata.tidspunkt.truncatedTo(ChronoUnit.MILLIS)
153+
periode.startet.tidspunktFraKilde?.avviksType shouldBe AvroAvviksType.TIDSPUNKT_KORRIGERT
154+
periode.startet.tidspunktFraKilde?.tidspunkt?.truncatedTo(ChronoUnit.MILLIS) shouldBe
155+
feilrettetStart.metadata.tidspunktFraKilde?.tidspunkt?.truncatedTo(ChronoUnit.MILLIS)
155156
periode.id shouldBe startet.hendelseId
156157
periode.identitetsnummer shouldBe startet.identitetsnummer
157-
periode.startet.utfoertAv.type shouldBe BrukerType.VEILEDER
158+
periode.startet.utfoertAv.type shouldBe AvroBrukerType.VEILEDER
158159
periode.startet.utfoertAv.id shouldBe "en_som_fikser_ting"
159-
periode.startet.aarsak shouldBe "tekniske feil i systemet"
160+
periode.startet.aarsak shouldBe "teknisk feil i systemet"
160161
periode.startet.kilde shouldBe "unit-test2"
161162
periodeTopic.isEmpty shouldBe true
162163
}
@@ -221,7 +222,12 @@ class ApplikasjonsTest : FreeSpec({
221222
val avsluttetPeriode = periodeTopic.readKeyValue()
222223
verifiserPeriodeOppMotStartetOgStoppetHendelser(
223224
forventetKafkaKey = key,
224-
startet = startet,
225+
startet = feilrettetStart.copy(
226+
metadata = feilrettetStart.metadata.copy(
227+
tidspunkt = startet.metadata.tidspunkt,
228+
),
229+
hendelseId = startet.hendelseId
230+
),
225231
avsluttet = stoppet,
226232
mottattRecord = avsluttetPeriode
227233
)

apps/hendelseprosessor/src/test/kotlin/no/nav/paw/arbeidssokerregisteret/app/TestUtils.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ fun verifiserApiMetadataMotInternMetadata(
7070
mottattApiMetadata.aarsak shouldBe forventedeMetadataVerdier.aarsak
7171
mottattApiMetadata.utfoertAv.type.name shouldBe forventedeMetadataVerdier.utfoertAv.type.name
7272
mottattApiMetadata.utfoertAv.id shouldBe forventedeMetadataVerdier.utfoertAv.id
73-
mottattApiMetadata.tidspunktFraKilde?.tidspunkt shouldBe forventedeMetadataVerdier.tidspunktFraKilde?.tidspunkt
73+
mottattApiMetadata.tidspunktFraKilde?.tidspunkt?.truncatedTo(ChronoUnit.MILLIS) shouldBe
74+
forventedeMetadataVerdier.tidspunktFraKilde?.tidspunkt?.truncatedTo(ChronoUnit.MILLIS)
7475
when (forventedeMetadataVerdier.tidspunktFraKilde?.avviksType) {
7576
FORSINKELSE -> mottattApiMetadata.tidspunktFraKilde?.avviksType shouldBe AvviksType.FORSINKELSE
7677
RETTING -> mottattApiMetadata.tidspunktFraKilde?.avviksType shouldBe AvviksType.RETTING

0 commit comments

Comments
 (0)