Skip to content

Commit ab0bedd

Browse files
committed
Lagt til Kafka Streams
1 parent ef17f3e commit ab0bedd

File tree

14 files changed

+110
-19
lines changed

14 files changed

+110
-19
lines changed

Diff for: apps/bekreftelse-hendelsefilter/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ dependencies {
1212
implementation(project(":lib:error-handling"))
1313
implementation(project(":lib:kafka-streams"))
1414
implementation(project(":domain:bekreftelsesmelding-avro-schema"))
15+
implementation(project(":domain:bekreftelse-paavegneav-avro-schema"))
1516

1617
// Server
1718
implementation(libs.bundles.ktorServerWithNettyAndMicrometer)

Diff for: apps/bekreftelse-hendelsefilter/nais/nais-dev.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ spec:
1313
value: "paw.arbeidssoker-bekreftelse-beta-v3"
1414
- name: KAFKA_BEKREFTELSE_PAAVEGNEAV_TARGET_TOPIC
1515
value: "paw.arbeidssoker-bekreftelse-paavegneav-beta-v2"
16-
- name: KAFKA_BEKREFTELSE_TEAMDAGPENGER_SOURCE_TOPIC
16+
- name: KAFKA_TEAMDAGPENGER_BEKREFTELSE_SOURCE_TOPIC
1717
value: "paw.arbeidssoker-bekreftelse-teamdagpenger-beta-v1"
18-
- name: KAFKA_BEKREFTELSE_PAAVEGNEAV_TEAMDAGPENGER_SOURCE_TOPIC
18+
- name: KAFKA_TEAMDAGPENGER_BEKREFTELSE_PAAVEGNEAV_SOURCE_TOPIC
1919
value: "paw.arbeidssoker-bekreftelse-paavegneav-teamdagpenger-beta-v1"
2020
replicas:
2121
min: 2

Diff for: apps/bekreftelse-hendelsefilter/src/main/kotlin/no/nav/paw/bekreftelse/config/ApplicationConfig.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ data class ApplicationConfig(
77
)
88

99
data class KafkaTopologyConfig(
10+
val applicationIdSuffix: String,
1011
val bekreftelseTargetTopic: String,
1112
val bekreftelsePaaVegneAvTargetTopic: String,
12-
val bekreftelseTeamDagpengerSourceTopic: String,
13-
val bekreftelsePaaVegneAvTeamDagpengerSourceTopic: String
13+
val teamDagpengerBekreftelseSourceTopic: String,
14+
val teamDagpengerBekreftelsePaaVegneAvSourceTopic: String
1415
)

Diff for: apps/bekreftelse-hendelsefilter/src/main/kotlin/no/nav/paw/bekreftelse/context/ApplicationContext.kt

+19-4
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,51 @@
11
package no.nav.paw.bekreftelse.context
22

3+
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
34
import io.micrometer.prometheusmetrics.PrometheusConfig
45
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
56
import no.nav.paw.bekreftelse.config.APPLICATION_CONFIG
67
import no.nav.paw.bekreftelse.config.ApplicationConfig
78
import no.nav.paw.bekreftelse.config.SERVER_CONFIG
89
import no.nav.paw.bekreftelse.config.ServerConfig
10+
import no.nav.paw.bekreftelse.topology.buildKafkaStreamsList
911
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
1012
import no.nav.paw.health.repository.HealthIndicatorRepository
11-
import no.nav.paw.kafka.config.KAFKA_CONFIG_WITH_SCHEME_REG
13+
import no.nav.paw.kafka.config.KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG
1214
import no.nav.paw.kafka.config.KafkaConfig
15+
import no.nav.paw.kafka.factory.KafkaStreamsFactory
16+
import org.apache.kafka.common.serialization.Serdes
17+
import org.apache.kafka.streams.KafkaStreams
1318

1419
data class ApplicationContext(
1520
val serverConfig: ServerConfig,
1621
val applicationConfig: ApplicationConfig,
1722
val prometheusMeterRegistry: PrometheusMeterRegistry,
18-
val healthIndicatorRepository: HealthIndicatorRepository
23+
val healthIndicatorRepository: HealthIndicatorRepository,
24+
val kafkaStreamsList: List<KafkaStreams>
1925
) {
2026
companion object {
2127
fun create(): ApplicationContext {
2228
val serverConfig = loadNaisOrLocalConfiguration<ServerConfig>(SERVER_CONFIG)
2329
val applicationConfig = loadNaisOrLocalConfiguration<ApplicationConfig>(APPLICATION_CONFIG)
24-
val kafkaConfig = loadNaisOrLocalConfiguration<KafkaConfig>(KAFKA_CONFIG_WITH_SCHEME_REG)
30+
val kafkaConfig = loadNaisOrLocalConfiguration<KafkaConfig>(KAFKA_STREAMS_CONFIG_WITH_SCHEME_REG)
2531

2632
val prometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
2733
val healthIndicatorRepository = HealthIndicatorRepository()
2834

35+
val applicationIdSuffix = applicationConfig.kafkaTopology.applicationIdSuffix
36+
val kafkaStreamsFactory = KafkaStreamsFactory(applicationIdSuffix, kafkaConfig)
37+
.withDefaultKeySerde(Serdes.Long()::class)
38+
.withDefaultValueSerde(SpecificAvroSerde::class)
39+
.addPrometheusMeterRegistryToConfig(prometheusMeterRegistry)
40+
41+
val kafkaStreams = kafkaStreamsFactory.buildKafkaStreamsList(applicationConfig, healthIndicatorRepository)
42+
2943
return ApplicationContext(
3044
serverConfig,
3145
applicationConfig,
3246
prometheusMeterRegistry,
33-
healthIndicatorRepository
47+
healthIndicatorRepository,
48+
kafkaStreams
3449
)
3550
}
3651
}

Diff for: apps/bekreftelse-hendelsefilter/src/main/kotlin/no/nav/paw/bekreftelse/plugins/Kafka.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import no.nav.paw.kafka.plugin.KafkaStreamsPlugin
99
fun Application.configureKafka(applicationContext: ApplicationContext) {
1010
with(applicationContext) {
1111
install(KafkaStreamsPlugin) {
12-
kafkaStreams = listOf()
12+
kafkaStreams = kafkaStreamsList
1313
}
1414
}
1515
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package no.nav.paw.bekreftelse.topology
2+
3+
import no.nav.paw.bekreftelse.config.ApplicationConfig
4+
import no.nav.paw.error.handler.withApplicationTerminatingExceptionHandler
5+
import no.nav.paw.health.listener.withHealthIndicatorStateListener
6+
import no.nav.paw.health.model.LivenessHealthIndicator
7+
import no.nav.paw.health.model.ReadinessHealthIndicator
8+
import no.nav.paw.health.repository.HealthIndicatorRepository
9+
import no.nav.paw.kafka.factory.KafkaStreamsFactory
10+
import org.apache.kafka.streams.KafkaStreams
11+
import org.apache.kafka.streams.StreamsConfig
12+
import org.apache.kafka.streams.Topology
13+
14+
fun KafkaStreamsFactory.buildKafkaStreamsList(
15+
applicationConfig: ApplicationConfig,
16+
healthIndicatorRepository: HealthIndicatorRepository
17+
): List<KafkaStreams> {
18+
val bekreftelseTopologyList = buildBekreftelseKafkaTopologyList(applicationConfig)
19+
val bekreftelsePaaVegneAvTopologyList = buildBekreftelsePaaVegneAvKafkaTopologyList(applicationConfig)
20+
val kafkaTopologyList = bekreftelseTopologyList + bekreftelsePaaVegneAvTopologyList
21+
return kafkaTopologyList.map { buildKafkaStreams(healthIndicatorRepository, it) }
22+
}
23+
24+
fun KafkaStreamsFactory.buildKafkaStreams(
25+
healthIndicatorRepository: HealthIndicatorRepository,
26+
kafkaTopology: Topology
27+
): KafkaStreams {
28+
return KafkaStreams(
29+
kafkaTopology,
30+
StreamsConfig(properties)
31+
)
32+
.withHealthIndicatorStateListener(
33+
healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator()),
34+
healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator())
35+
)
36+
.withApplicationTerminatingExceptionHandler()
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package no.nav.paw.bekreftelse.topology
2+
3+
import no.nav.paw.bekreftelse.config.ApplicationConfig
4+
import no.nav.paw.bekreftelse.melding.v1.Bekreftelse
5+
import org.apache.kafka.streams.StreamsBuilder
6+
import org.apache.kafka.streams.Topology
7+
8+
fun buildBekreftelseKafkaTopologyList(applicationConfig: ApplicationConfig): List<Topology> =
9+
with(applicationConfig.kafkaTopology) {
10+
listOf(
11+
buildKafkaTopology(teamDagpengerBekreftelseSourceTopic, bekreftelseTargetTopic)
12+
)
13+
}
14+
15+
fun buildBekreftelsePaaVegneAvKafkaTopologyList(applicationConfig: ApplicationConfig): List<Topology> =
16+
with(applicationConfig.kafkaTopology) {
17+
listOf(
18+
buildKafkaTopology(teamDagpengerBekreftelsePaaVegneAvSourceTopic, bekreftelseTargetTopic)
19+
)
20+
}
21+
22+
private fun buildKafkaTopology(
23+
sourceTopic: String,
24+
targetTopic: String
25+
): Topology = StreamsBuilder().apply {
26+
stream<Long, Bekreftelse>(sourceTopic)
27+
.to(targetTopic)
28+
}.build()
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[kafkaTopology]
2+
applicationIdSuffix = "v1"
23
bekreftelseTargetTopic = "paw.arbeidssoker-bekreftelse-v1"
34
bekreftelsePaaVegneAvTargetTopic = "paw.arbeidssoker-bekreftelse-paavegneav-v1"
4-
bekreftelseTeamDagpengerSourceTopic = "paw.arbeidssoker-bekreftelse-teamdagpenger-v1"
5-
bekreftelsePaaVegneAvTeamDagpengerSourceTopic = "paw.arbeidssoker-bekreftelse-paavegneav-teamdagpenger-v1"
5+
teamDagpengerBekreftelseSourceTopic = "paw.arbeidssoker-bekreftelse-teamdagpenger-v1"
6+
teamDagpengerBekreftelsePaaVegneAvSourceTopic = "paw.arbeidssoker-bekreftelse-paavegneav-teamdagpenger-v1"

Diff for: apps/bekreftelse-hendelsefilter/src/main/resources/local/kafka_configuration_schemareg.toml

-3
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
brokers = "localhost:9092"
2+
applicationIdPrefix = "paw.paw-arbeidssoekerregisteret-bekreftelse-hendelsefilter"
3+
4+
[schemaRegistry]
5+
url = "http://localhost:8082"
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[kafkaTopology]
2+
applicationIdSuffix = "v1"
23
bekreftelseTargetTopic = "${KAFKA_BEKREFTELSE_TARGET_TOPIC}"
34
bekreftelsePaaVegneAvTargetTopic = "${KAFKA_BEKREFTELSE_PAAVEGNEAV_TARGET_TOPIC}"
4-
bekreftelseTeamDagpengerSourceTopic = "${KAFKA_BEKREFTELSE_TEAMDAGPENGER_SOURCE_TOPIC}"
5-
bekreftelsePaaVegneAvTeamDagpengerSourceTopic = "${KAFKA_BEKREFTELSE_PAAVEGNEAV_TEAMDAGPENGER_SOURCE_TOPIC}"
5+
teamDagpengerBekreftelseSourceTopic = "${KAFKA_TEAMDAGPENGER_BEKREFTELSE_SOURCE_TOPIC}"
6+
teamDagpengerBekreftelsePaaVegneAvSourceTopic = "${KAFKA_TEAMDAGPENGER_BEKREFTELSE_PAAVEGNEAV_SOURCE_TOPIC}"

Diff for: docker/kafka/docker-compose.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,12 @@ services:
5353
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssokerperioder-v1 --replication-factor 1 --partitions 1
5454
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.opplysninger-om-arbeidssoeker-v1 --replication-factor 1 --partitions 1
5555
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-profilering-v1 --replication-factor 1 --partitions 1
56-
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-bekreftelse-v1 --replication-factor 1 --partitions 1
5756
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-hendelseslogg-v1 --replication-factor 1 --partitions 1
57+
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-bekreftelse-v1 --replication-factor 1 --partitions 1
58+
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-bekreftelse-paavegneav-v1 --replication-factor 1 --partitions 1
5859
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-bekreftelse-hendelseslogg-v1 --replication-factor 1 --partitions 1
60+
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-bekreftelse-teamdagpenger-v1 --replication-factor 1 --partitions 1
61+
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-bekreftelse-paavegneav-teamdagpenger-v1 --replication-factor 1 --partitions 1
5962
6063
echo -e 'Successfully created the following topics:'
6164
kafka-topics --bootstrap-server kafka:29092 --list

Diff for: lib/error-handling/src/main/kotlin/no/nav/paw/error/handler/KafkaExceptionHandler.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import org.slf4j.LoggerFactory
77

88
private val logger: Logger = LoggerFactory.getLogger("no.nav.paw.logger.error.kafka")
99

10-
fun KafkaStreams.withApplicationTerminatingExceptionHandler() {
10+
fun KafkaStreams.withApplicationTerminatingExceptionHandler(): KafkaStreams {
1111
this.setUncaughtExceptionHandler(createApplicationTerminatingExceptionHandler())
12+
return this
1213
}
1314

1415
fun createApplicationTerminatingExceptionHandler() = StreamsUncaughtExceptionHandler { throwable ->

Diff for: lib/error-handling/src/main/kotlin/no/nav/paw/health/listener/KafkaStreamsStatusListener.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ private val logger = LoggerFactory.getLogger("no.nav.paw.logger.health.kafka")
1010
fun KafkaStreams.withHealthIndicatorStateListener(
1111
livenessIndicator: LivenessHealthIndicator,
1212
readinessIndicator: ReadinessHealthIndicator
13-
) {
13+
): KafkaStreams {
1414
this.setStateListener(createHealthIndicatorStateListener(livenessIndicator, readinessIndicator))
15+
return this
1516
}
1617

1718
fun createHealthIndicatorStateListener(

0 commit comments

Comments
 (0)