Skip to content

Commit 23f7ab3

Browse files
committed
Lagt til konsumering av varsel hendelser
1 parent c7c820f commit 23f7ab3

28 files changed

+626
-328
lines changed

apps/bekreftelse-min-side-oppgaver/build.gradle.kts

+5-2
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ val jvmMajorVersion: String by project
88

99
dependencies {
1010
implementation(project(":lib:hoplite-config"))
11+
implementation(project(":lib:logging"))
12+
implementation(project(":lib:serialization"))
13+
implementation(project(":lib:error-handling"))
14+
implementation(project(":lib:metrics"))
1115
implementation(project(":lib:kafka-streams"))
1216
implementation(project(":domain:bekreftelse-interne-hendelser"))
13-
implementation(project(":lib:error-handling"))
1417
implementation(project(":domain:main-avro-schema"))
1518
implementation(libs.jackson.datatypeJsr310)
1619
implementation(libs.jackson.kotlin)
@@ -25,7 +28,7 @@ dependencies {
2528
implementation(libs.avro.kafkaStreamsSerde)
2629
implementation(libs.bundles.ktorServerWithNettyAndMicrometer)
2730
implementation(libs.micrometer.registryPrometheus)
28-
implementation("no.nav.tms.varsel:kotlin-builder:1.1.0")
31+
implementation("no.nav.tms.varsel:kotlin-builder:1.1.0") // TODO: Legg til i libs.versions.toml
2932
testImplementation(libs.kafka.streams.test)
3033
testImplementation(libs.bundles.testLibsWithUnitTesting)
3134
testImplementation(project(":lib:kafka-key-generator-client"))

apps/bekreftelse-min-side-oppgaver/nais/nais-dev.yaml

+9-9
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,17 @@ metadata:
66
labels:
77
team: paw
88
spec:
9-
azure:
10-
application:
11-
enabled: true
129
image: {{ image }}
1310
port: 8080
1411
replicas:
1512
min: 1
1613
max: 1
14+
resources:
15+
limits:
16+
memory: 512Mi
17+
requests:
18+
memory: 256Mi
19+
cpu: 25m
1720
env:
1821
- name: KAFKA_KEYS_SCOPE
1922
value: api://dev-gcp.paw.paw-kafka-key-generator/.default
@@ -25,6 +28,9 @@ spec:
2528
readiness:
2629
path: /internal/isReady
2730
initialDelay: 10
31+
azure:
32+
application:
33+
enabled: true
2834
prometheus:
2935
enabled: true
3036
path: /internal/metrics
@@ -35,12 +41,6 @@ spec:
3541
kafka:
3642
pool: {{ kafka }}
3743
streams: true
38-
resources:
39-
limits:
40-
memory: 512Mi
41-
requests:
42-
memory: 256Mi
43-
cpu: 25m
4444
accessPolicy:
4545
outbound:
4646
rules:

apps/bekreftelse-min-side-oppgaver/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/bekreftelse/minsideoppgaver/GenericJacksonSerde.kt

-26
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,52 @@
11
package no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver
22

3-
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
4-
import io.ktor.server.application.*
5-
import io.ktor.server.engine.*
6-
import io.ktor.server.metrics.micrometer.*
7-
import io.ktor.server.netty.*
8-
import io.ktor.server.routing.*
9-
import io.micrometer.core.instrument.binder.MeterBinder
10-
import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics
11-
import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics
3+
import io.ktor.server.application.Application
4+
import io.ktor.server.engine.addShutdownHook
5+
import io.ktor.server.engine.embeddedServer
6+
import io.ktor.server.netty.Netty
7+
import io.ktor.server.routing.routing
128
import io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics
13-
import io.micrometer.core.instrument.binder.system.ProcessorMetrics
14-
import io.micrometer.prometheusmetrics.PrometheusConfig
15-
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
16-
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic.applicationTopology
17-
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic.varselbygger.VarselMeldingBygger
18-
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.config.kafkaTopics
19-
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.config.minSideVarselKonfigurasjon
20-
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.vo.InternTilstand
21-
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.vo.StateStoreName
22-
import no.nav.paw.config.env.currentRuntimeEnvironment
23-
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
24-
import no.nav.paw.kafka.config.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG
25-
import no.nav.paw.kafka.config.KafkaConfig
26-
import no.nav.paw.kafka.factory.KafkaStreamsFactory
27-
import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler
28-
import no.nav.paw.health.listener.withHealthIndicatorStateListener
29-
import no.nav.paw.health.model.HealthStatus
30-
import no.nav.paw.health.model.LivenessHealthIndicator
31-
import no.nav.paw.health.model.ReadinessHealthIndicator
32-
import no.nav.paw.health.repository.HealthIndicatorRepository
9+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.context.ApplicationContext
10+
import no.nav.paw.config.env.appNameOrDefaultForLocal
3311
import no.nav.paw.health.route.healthRoutes
34-
import org.apache.kafka.common.serialization.Serdes
35-
import org.apache.kafka.streams.KafkaStreams
36-
import org.apache.kafka.streams.StreamsBuilder
37-
import org.apache.kafka.streams.state.Stores
38-
import org.slf4j.LoggerFactory
12+
import no.nav.paw.kafka.plugin.installKafkaStreamsPlugins
13+
import no.nav.paw.logging.logger.buildApplicationLogger
14+
import no.nav.paw.metrics.plugin.installMetricsPlugin
15+
import no.nav.paw.metrics.route.metricsRoutes
3916

40-
const val APP_SUFFIX = "beta-v2"
41-
val STATE_STORE_NAME: StateStoreName = StateStoreName("internal_state")
42-
43-
val appLogger = LoggerFactory.getLogger("main")
4417
fun main() {
45-
appLogger.info("Starter...")
46-
val kafkaTopics = kafkaTopics()
47-
val streamsBuilder = StreamsBuilder()
48-
.addStateStore(
49-
Stores.keyValueStoreBuilder(
50-
Stores.persistentKeyValueStore(STATE_STORE_NAME.value),
51-
Serdes.UUID(),
52-
jacksonSerde<InternTilstand>()
53-
)
54-
)
55-
val kafkaConfig = loadNaisOrLocalConfiguration<KafkaConfig>(KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG)
56-
val streamsFactory = KafkaStreamsFactory(APP_SUFFIX, kafkaConfig)
57-
.withDefaultKeySerde(Serdes.Long()::class)
58-
.withDefaultValueSerde(SpecificAvroSerde::class)
59-
.withExactlyOnce()
60-
val stream = KafkaStreams(streamsBuilder.applicationTopology(
61-
varselMeldingBygger = VarselMeldingBygger(
62-
runtimeEnvironment = currentRuntimeEnvironment,
63-
minSideVarselKonfigurasjon = minSideVarselKonfigurasjon()
64-
),
65-
kafkaTopics = kafkaTopics,
66-
stateStoreName = STATE_STORE_NAME), streamsFactory.properties
67-
)
68-
stream.withApplicationTerminatingExceptionHandler()
18+
val logger = buildApplicationLogger
19+
20+
val applicationContext = ApplicationContext.build()
21+
val appName = applicationContext.serverConfig.runtimeEnvironment.appNameOrDefaultForLocal()
6922

70-
val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
71-
val healthIndicatorRepository = HealthIndicatorRepository()
72-
val livenessHealthIndicator = healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator(initialStatus = HealthStatus.UNKNOWN))
73-
val readinessHealthIndicator = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator(initialStatus = HealthStatus.UNKNOWN))
74-
stream.withHealthIndicatorStateListener(
75-
livenessIndicator = livenessHealthIndicator,
76-
readinessIndicator = readinessHealthIndicator
77-
)
78-
appLogger.info("Starter KafkaStreams...")
79-
stream.start()
80-
appLogger.info("Starter Ktor...")
81-
embeddedServer(Netty, port = 8080) {
82-
configureMetrics(
83-
registry,
84-
KafkaStreamsMetrics(stream)
23+
with(applicationContext.serverConfig) {
24+
logger.info("Starter $appName med hostname $host og port $port")
25+
26+
embeddedServer(factory = Netty, port = port) {
27+
module(applicationContext)
28+
}.apply {
29+
addShutdownHook {
30+
logger.info("Avslutter $appName")
31+
stop(gracePeriodMillis, timeoutMillis)
32+
}
33+
start(wait = true)
34+
}
35+
}
36+
}
37+
38+
fun Application.module(applicationContext: ApplicationContext) {
39+
with(applicationContext) {
40+
installMetricsPlugin(
41+
meterRegistry = prometheusMeterRegistry,
42+
additionalMeterBinders = kafkaStreamsList.map { KafkaStreamsMetrics(it) }
43+
)
44+
installKafkaStreamsPlugins(
45+
kafkaStreamsList = kafkaStreamsList,
8546
)
8647
routing {
8748
healthRoutes(healthIndicatorRepository)
49+
metricsRoutes(prometheusMeterRegistry)
8850
}
89-
}.start(wait = true)
90-
appLogger.info("Avslutter...")
91-
}
92-
93-
fun Application.configureMetrics(
94-
registry: PrometheusMeterRegistry,
95-
vararg additionalBinders: MeterBinder
96-
) {
97-
install(MicrometerMetrics) {
98-
this.registry = registry
99-
meterBinders = listOf(
100-
JvmMemoryMetrics(),
101-
JvmGcMetrics(),
102-
ProcessorMetrics()
103-
) + additionalBinders
10451
}
10552
}

apps/bekreftelse-min-side-oppgaver/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/bekreftelse/minsideoppgaver/applogic/GenererOppgaveMelding.kt

-39
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic
2+
3+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic.varselbygger.OppgaveMelding
4+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic.varselbygger.VarselMeldingBygger
5+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.vo.InternTilstand
6+
import no.nav.paw.arbeidssokerregisteret.api.v1.Periode
7+
import no.nav.paw.bekreftelse.internehendelser.BekreftelseHendelse
8+
import no.nav.paw.bekreftelse.internehendelser.BekreftelseMeldingMottatt
9+
import no.nav.paw.bekreftelse.internehendelser.BekreftelseTilgjengelig
10+
import no.nav.paw.bekreftelse.internehendelser.PeriodeAvsluttet
11+
12+
fun Periode.asInternTilstand(
13+
gjeldeneTilstand: InternTilstand?,
14+
): InternTilstand =
15+
when {
16+
gjeldeneTilstand == null -> {
17+
InternTilstand(
18+
periodeId = id,
19+
ident = identitetsnummer,
20+
bekreftelser = emptyList()
21+
)
22+
}
23+
24+
gjeldeneTilstand.ident == identitetsnummer -> gjeldeneTilstand
25+
else -> gjeldeneTilstand.copy(ident = identitetsnummer)
26+
}
27+
28+
fun InternTilstand.asOppgaveMeldinger(
29+
hendelse: BekreftelseHendelse?,
30+
varselMeldingBygger: VarselMeldingBygger
31+
): Pair<InternTilstand?, List<OppgaveMelding>> =
32+
when (hendelse) {
33+
is PeriodeAvsluttet -> {
34+
null to bekreftelser
35+
.map(varselMeldingBygger::avsluttOppgave)
36+
}
37+
38+
is BekreftelseMeldingMottatt -> {
39+
if (bekreftelser.contains(hendelse.bekreftelseId)) {
40+
this.copy(bekreftelser = bekreftelser - hendelse.bekreftelseId) to
41+
listOf(varselMeldingBygger.avsluttOppgave(hendelse.bekreftelseId))
42+
43+
} else {
44+
this to emptyList()
45+
}
46+
}
47+
48+
is BekreftelseTilgjengelig -> {
49+
this.copy(bekreftelser = bekreftelser + hendelse.bekreftelseId) to
50+
listOf(varselMeldingBygger.opprettOppgave(ident, hendelse))
51+
52+
}
53+
54+
else -> this to emptyList()
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic
2+
3+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.vo.VarselHendelse
4+
import no.nav.paw.serialization.kafka.JacksonSerde
5+
6+
class VarselHendelseJsonSerde : JacksonSerde<VarselHendelse>(VarselHendelse::class)

apps/bekreftelse-min-side-oppgaver/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/bekreftelse/minsideoppgaver/applogic/LagrePeriodeInfo.kt

-21
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.applogic
2+
3+
import io.micrometer.core.instrument.MeterRegistry
4+
import io.micrometer.core.instrument.Tag
5+
import io.micrometer.core.instrument.Tags
6+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.minsideoppgaver.vo.VarselHendelse
7+
8+
private const val METRIC_PREFIX = "paw_bekreftelse_min_side_oppgaver"
9+
10+
fun MeterRegistry.varselHendelseCounter(
11+
hendelse: VarselHendelse
12+
) {
13+
counter(
14+
"${METRIC_PREFIX}_antall_operasjoner",
15+
Tags.of(
16+
Tag.of("source", "kafka"),
17+
Tag.of("target", "metrics"),
18+
Tag.of("action", "count"),
19+
Tag.of("event.topic", "min-side.aapen-varsel-hendelse-v1"),
20+
Tag.of("event.name", hendelse.eventName.value),
21+
Tag.of("event.status", hendelse.status?.value ?: "null"),
22+
Tag.of("event.type", hendelse.varseltype.value),
23+
Tag.of("event.channel", hendelse.kanal?.value ?: "null"),
24+
Tag.of("event.renotifikasjon", hendelse.renotifikasjon?.toString() ?: "null"),
25+
Tag.of("event.sendtSomBatch", hendelse.sendtSomBatch?.toString() ?: "null"),
26+
Tag.of("event.namespace", hendelse.namespace),
27+
Tag.of("event.app", hendelse.appnavn),
28+
)
29+
).increment()
30+
}

0 commit comments

Comments
 (0)