Skip to content

Commit 8d58f5d

Browse files
committed
Endret til å benytte KTable join
1 parent 9f3e176 commit 8d58f5d

File tree

11 files changed

+180
-101
lines changed

11 files changed

+180
-101
lines changed

apps/min-side-varsler/nais/nais-dev.yaml

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ spec:
1818
cpu: 200m
1919
memory: 256Mi
2020
env:
21+
- name: KAFKA_PERIODE_STREAM_SUFFIX
22+
value: periode-beta-v1
2123
- name: KAFKA_BEKREFTELSE_STREAM_SUFFIX
22-
value: bekreftelse-beta-v3
24+
value: bekreftelse-beta-v4
2325
- name: KAFKA_VARSEL_HENDELSE_STREAM_SUFFIX
2426
value: varsel-hendelser-beta-v1
2527
- name: KAFKA_PAW_ARBEIDSOKERPERIODE_TOPIC

apps/min-side-varsler/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/KafkaTopologyConfig.kt

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const val KAFKA_TOPICS_CONFIG = "kafka_topology_config.toml"
66

77
data class KafkaTopologyConfig(
88
val shutdownTimeout: Duration = Duration.ofSeconds(5),
9+
val periodeStreamSuffix: String,
910
val bekreftelseStreamSuffix: String,
1011
val varselHendelseStreamSuffix: String,
1112
val periodeTopic: String,

apps/min-side-varsler/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/context/ApplicationContext.kt

+39-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import no.nav.paw.arbeidssoekerregisteret.repository.PeriodeRepository
1515
import no.nav.paw.arbeidssoekerregisteret.repository.VarselRepository
1616
import no.nav.paw.arbeidssoekerregisteret.service.VarselService
1717
import no.nav.paw.arbeidssoekerregisteret.topology.bekreftelseKafkaTopology
18+
import no.nav.paw.arbeidssoekerregisteret.topology.periodeKafkaTopology
1819
import no.nav.paw.arbeidssoekerregisteret.topology.varselHendelserKafkaTopology
1920
import no.nav.paw.arbeidssoekerregisteret.utils.VarselHendelseJsonSerde
2021
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
@@ -68,6 +69,13 @@ data class ApplicationContext(
6869
varselMeldingBygger = varselMeldingBygger
6970
)
7071

72+
val periodeKafkaStreams = buildPeriodeKafkaStreams(
73+
kafkaConfig = kafkaConfig,
74+
kafkaTopicsConfig = kafkaTopicsConfig,
75+
meterRegistry = prometheusMeterRegistry,
76+
healthIndicatorRepository = healthIndicatorRepository,
77+
varselService = varselService
78+
)
7179
val bekreftelseKafkaStreams = buildBekreftelseKafkaStreams(
7280
serverConfig = serverConfig,
7381
kafkaConfig = kafkaConfig,
@@ -90,13 +98,43 @@ data class ApplicationContext(
9098
dataSource = dataSource,
9199
prometheusMeterRegistry = prometheusMeterRegistry,
92100
healthIndicatorRepository = healthIndicatorRepository,
93-
kafkaStreamsList = listOf(bekreftelseKafkaStreams),
101+
kafkaStreamsList = listOf(
102+
periodeKafkaStreams,
103+
bekreftelseKafkaStreams
104+
),
94105
kafkaStreamsShutdownTimeout = kafkaTopicsConfig.shutdownTimeout
95106
)
96107
}
97108
}
98109
}
99110

111+
private fun buildPeriodeKafkaStreams(
112+
kafkaConfig: KafkaConfig,
113+
kafkaTopicsConfig: KafkaTopologyConfig,
114+
meterRegistry: MeterRegistry,
115+
healthIndicatorRepository: HealthIndicatorRepository,
116+
varselService: VarselService
117+
): KafkaStreams {
118+
val kafkaTopology = StreamsBuilder()
119+
.periodeKafkaTopology(
120+
kafkaTopicsConfig = kafkaTopicsConfig,
121+
meterRegistry = meterRegistry,
122+
varselService = varselService
123+
).build()
124+
val kafkaStreamsFactory = KafkaStreamsFactory(kafkaTopicsConfig.periodeStreamSuffix, kafkaConfig)
125+
.withDefaultKeySerde(Serdes.Long()::class)
126+
.withDefaultValueSerde(SpecificAvroSerde::class)
127+
.withExactlyOnce()
128+
return KafkaStreams(kafkaTopology, kafkaStreamsFactory.properties)
129+
.withApplicationTerminatingExceptionHandler()
130+
.withHealthIndicatorStateListener(
131+
livenessIndicator = healthIndicatorRepository
132+
.addLivenessIndicator(LivenessHealthIndicator(initialStatus = HealthStatus.UNKNOWN)),
133+
readinessIndicator = healthIndicatorRepository
134+
.addReadinessIndicator(ReadinessHealthIndicator(initialStatus = HealthStatus.UNKNOWN))
135+
)
136+
}
137+
100138
private fun buildBekreftelseKafkaStreams(
101139
serverConfig: ServerConfig,
102140
kafkaConfig: KafkaConfig,

apps/min-side-varsler/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/service/VarselService.kt

+14-28
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ class VarselService(
4646
}
4747
}
4848

49-
fun mottaBekreftelseHendelse(hendelse: BekreftelseHendelse): List<OppgaveMelding> = transaction {
49+
fun mottaBekreftelseHendelse(value: Pair<Periode?, BekreftelseHendelse?>): List<OppgaveMelding> = transaction {
50+
val (periode, hendelse) = value
5051
when (hendelse) {
5152
is BekreftelseTilgjengelig -> {
52-
val periodeRow = periodeRepository.findByPeriodeId(hendelse.periodeId)
53-
if (periodeRow != null) {
53+
if (periode != null) {
5454
val varselRow = varselRepository.findByVarselId(hendelse.bekreftelseId)
5555
if (varselRow != null) {
5656
logger.warn(
@@ -74,15 +74,15 @@ class VarselService(
7474

7575
listOf(
7676
varselMeldingBygger.opprettOppgave(
77-
periodeRow.identitetsnummer,
77+
periode.identitetsnummer,
7878
hendelse.bekreftelseId,
7979
hendelse.gjelderTil
8080
)
8181
)
8282
}
8383
} else {
8484
logger.warn(
85-
"Fant ingen aktiv periode for hendelse {} og periode {}",
85+
"Ingen periode mottatt for hendelse {} med periode {}",
8686
hendelse.hendelseType,
8787
hendelse.periodeId
8888
)
@@ -109,44 +109,30 @@ class VarselService(
109109
}
110110

111111
is PeriodeAvsluttet -> {
112-
logger.debug(
113-
"Avlutter og sletter alle varsler for hendelse {} og periode {}",
114-
hendelse.hendelseType,
115-
hendelse.periodeId
116-
)
117-
118-
val periodeRow = periodeRepository.findByPeriodeId(hendelse.periodeId)
119112
val varselRows = varselRepository.findByPeriodeId(hendelse.periodeId)
120-
121-
if (periodeRow == null) {
122-
logger.warn(
123-
"Fant ingen periode for hendelse {} og periode {}",
124-
hendelse.hendelseType,
125-
hendelse.periodeId
126-
)
127-
meterRegistry.bekreftelseHendelseCounter("fail", hendelse)
128-
} else {
129-
meterRegistry.bekreftelseHendelseCounter("delete", hendelse)
130-
}
131-
132113
if (varselRows.isEmpty()) {
133114
logger.warn(
134115
"Fant ingen varsler for hendelse {} og periode {}",
135116
hendelse.hendelseType,
136117
hendelse.periodeId
137118
)
138-
periodeRepository.deleteByPeriodeId(hendelse.periodeId)
139119
emptyList()
140120
} else {
121+
logger.debug(
122+
"Avlutter og sletter alle varsler for hendelse {} og periode {}",
123+
hendelse.hendelseType,
124+
hendelse.periodeId
125+
)
141126
varselRepository.deleteByPeriodeId(hendelse.periodeId)
142-
periodeRepository.deleteByPeriodeId(hendelse.periodeId)
143127
varselRows.map { varselMeldingBygger.avsluttOppgave(it.varselId) }
144128
}
145129
}
146130

147131
else -> {
148-
logger.debug("Ignorerer hendelse {}", hendelse.hendelseType)
149-
meterRegistry.bekreftelseHendelseCounter("ignore", hendelse)
132+
if (hendelse != null) {
133+
logger.debug("Ignorerer hendelse {}", hendelse.hendelseType)
134+
meterRegistry.bekreftelseHendelseCounter("ignore", hendelse)
135+
}
150136
emptyList()
151137
}
152138
}

apps/min-side-varsler/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/topology/Topology.kt

+32-10
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,18 @@ import no.nav.paw.arbeidssoekerregisteret.utils.periodeCounter
1010
import no.nav.paw.arbeidssoekerregisteret.utils.varselCounter
1111
import no.nav.paw.arbeidssoekerregisteret.utils.varselHendelseCounter
1212
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
13+
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
1314
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelseSerde
1415
import no.nav.paw.config.env.RuntimeEnvironment
1516
import no.nav.paw.config.env.namespaceOrDefaultForLocal
16-
import no.nav.paw.logging.logger.buildNamedLogger
1717
import org.apache.kafka.common.serialization.Serdes
1818
import org.apache.kafka.streams.KeyValue
1919
import org.apache.kafka.streams.StreamsBuilder
2020
import org.apache.kafka.streams.kstream.Consumed
2121
import org.apache.kafka.streams.kstream.Produced
22+
import org.apache.kafka.streams.kstream.ValueJoiner
2223

23-
private val logger = buildNamedLogger("bekreftelse.varsler.topology")
24-
25-
fun StreamsBuilder.bekreftelseKafkaTopology(
26-
runtimeEnvironment: RuntimeEnvironment,
24+
fun StreamsBuilder.periodeKafkaTopology(
2725
kafkaTopicsConfig: KafkaTopologyConfig,
2826
meterRegistry: MeterRegistry,
2927
varselService: VarselService
@@ -34,11 +32,35 @@ fun StreamsBuilder.bekreftelseKafkaTopology(
3432
.foreach { _, periode ->
3533
varselService.mottaPeriode(periode)
3634
}
35+
}
36+
return this
37+
}
38+
39+
class BekreftelseValueJoiner : ValueJoiner<BekreftelseHendelse, Periode, Pair<Periode?, BekreftelseHendelse?>> {
40+
override fun apply(bekreftelse: BekreftelseHendelse?, periode: Periode?): Pair<Periode?, BekreftelseHendelse?> {
41+
return periode to bekreftelse
42+
}
43+
}
3744

38-
stream(bekreftelseHendelseTopic, Consumed.with(Serdes.Long(), BekreftelseHendelseSerde()))
39-
.peek { _, hendelse -> meterRegistry.bekreftelseHendelseCounter("read", hendelse) }
40-
.flatMapValues { _, hendelse ->
41-
varselService.mottaBekreftelseHendelse(hendelse)
45+
fun StreamsBuilder.bekreftelseKafkaTopology(
46+
runtimeEnvironment: RuntimeEnvironment,
47+
kafkaTopicsConfig: KafkaTopologyConfig,
48+
meterRegistry: MeterRegistry,
49+
varselService: VarselService
50+
): StreamsBuilder {
51+
with(kafkaTopicsConfig) {
52+
val periodeTable = table<Long, Periode>(periodeTopic)
53+
val bekreftelseStream = stream(
54+
bekreftelseHendelseTopic,
55+
Consumed.with(Serdes.Long(), BekreftelseHendelseSerde())
56+
)
57+
58+
bekreftelseStream.leftJoin(periodeTable, BekreftelseValueJoiner())
59+
.peek { _, (_, hendelse) ->
60+
if (hendelse != null) meterRegistry.bekreftelseHendelseCounter("read", hendelse)
61+
}
62+
.flatMapValues { _, value ->
63+
varselService.mottaBekreftelseHendelse(value)
4264
}
4365
.peek { _, melding -> meterRegistry.varselCounter(runtimeEnvironment, melding) }
4466
.filter { _, _ -> false } // TODO: Disable utsending av varsler
@@ -57,7 +79,7 @@ fun StreamsBuilder.varselHendelserKafkaTopology(
5779
with(kafkaTopicsConfig) {
5880
stream(tmsVarselHendelseTopic, Consumed.with(Serdes.String(), VarselHendelseJsonSerde()))
5981
.filter { _, hendelse -> hendelse.namespace == runtimeEnvironment.namespaceOrDefaultForLocal() }
60-
.peek { _, hendelse -> meterRegistry.varselHendelseCounter(hendelse) }
82+
.peek { _, hendelse -> meterRegistry.varselHendelseCounter("read", hendelse) }
6183
.filter { _, hendelse -> hendelse.varseltype == VarselType.OPPGAVE }
6284
.foreach { _, hendelse ->
6385
varselService.mottaVarselHendelse(hendelse)

0 commit comments

Comments
 (0)