Skip to content

Commit 9142925

Browse files
committed
lagre statusmeldinger som utgående meldinger
utgående meldinger blir prosessert og sendt på topic via KafkaProducerRecordProcessor
1 parent 60e33f8 commit 9142925

File tree

12 files changed

+354
-91
lines changed

12 files changed

+354
-91
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apiVersion: kafka.nais.io/v1
2+
kind: Topic
3+
metadata:
4+
name: tiltaksokonomi.bestilling-status-v1
5+
namespace: team-mulighetsrommet
6+
labels:
7+
team: team-mulighetsrommet
8+
spec:
9+
pool: nav-dev
10+
config:
11+
cleanupPolicy: compact
12+
minimumInSyncReplicas: 2
13+
partitions: 2
14+
replication: 3
15+
retentionHours: 1
16+
segmentHours: 1
17+
acl:
18+
- team: team-mulighetsrommet
19+
application: tiltaksokonomi
20+
access: readwrite
21+
- team: team-mulighetsrommet
22+
application: mulighetsrommet-api
23+
access: read
24+
- team: team-mulighetsrommet
25+
application: mulighetsrommet-kafka-manager
26+
access: read
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
apiVersion: kafka.nais.io/v1
2+
kind: Topic
3+
metadata:
4+
name: tiltaksokonomi.faktura-status-v1
5+
namespace: team-mulighetsrommet
6+
labels:
7+
team: team-mulighetsrommet
8+
spec:
9+
pool: nav-dev
10+
config:
11+
cleanupPolicy: compact
12+
minimumInSyncReplicas: 2
13+
partitions: 2
14+
replication: 3
15+
retentionHours: 1
16+
segmentHours: 1
17+
acl:
18+
- team: team-mulighetsrommet
19+
application: tiltaksokonomi
20+
access: readwrite
21+
- team: team-mulighetsrommet
22+
application: mulighetsrommet-api
23+
access: read
24+
- team: team-mulighetsrommet
25+
application: mulighetsrommet-kafka-manager
26+
access: read

mulighetsrommet-tiltaksokonomi/build.gradle.kts

+3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ dependencies {
2929
implementation(projects.common.ktorClients)
3030
implementation(projects.common.metrics)
3131
implementation(projects.common.tiltaksokonomiClient)
32+
implementation(projects.common.tasks)
33+
34+
implementation(libs.shedlock.jdbc)
3235

3336
// Cache
3437
implementation(libs.caffeine)

mulighetsrommet-tiltaksokonomi/src/main/kotlin/no/nav/tiltak/okonomi/Application.kt

+48-14
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ package no.nav.tiltak.okonomi
33
import io.ktor.server.application.*
44
import io.ktor.server.engine.*
55
import io.ktor.server.netty.*
6+
import net.javacrumbs.shedlock.provider.jdbc.JdbcLockProvider
7+
import no.nav.common.job.leader_election.ShedLockLeaderElectionClient
8+
import no.nav.common.kafka.producer.feilhandtering.KafkaProducerRecordProcessor
9+
import no.nav.common.kafka.producer.util.KafkaProducerClientBuilder
610
import no.nav.mulighetsrommet.brreg.BrregClient
711
import no.nav.mulighetsrommet.database.Database
812
import no.nav.mulighetsrommet.database.FlywayMigrationManager
913
import no.nav.mulighetsrommet.env.NaisEnv
1014
import no.nav.mulighetsrommet.kafka.KafkaConsumerOrchestrator
15+
import no.nav.mulighetsrommet.kafka.KafkaProducerRepositoryImpl
1116
import no.nav.mulighetsrommet.ktor.plugins.configureMonitoring
1217
import no.nav.mulighetsrommet.tokenprovider.CachedTokenProvider
1318
import no.nav.tiltak.okonomi.api.configureApi
@@ -44,34 +49,39 @@ fun Application.configure(config: AppConfig) {
4449
configureMonitoring({ db.isHealthy() })
4550
configureHTTP()
4651

47-
val okonomiDb = OkonomiDatabase(db)
48-
49-
val cachedTokenProvider = CachedTokenProvider.init(config.auth.azure.audience, config.auth.azure.tokenEndpointUrl, config.auth.azure.privateJwk)
52+
val cachedTokenProvider = CachedTokenProvider.init(
53+
config.auth.azure.audience,
54+
config.auth.azure.tokenEndpointUrl,
55+
config.auth.azure.privateJwk,
56+
)
5057

5158
val oebs = OebsPoApClient(
5259
engine = config.httpClientEngine,
5360
baseUrl = config.clients.oebsPoAp.url,
5461
tokenProvider = cachedTokenProvider.withScope(config.clients.oebsPoAp.scope),
5562
)
63+
5664
val brreg = BrregClient(config.httpClientEngine)
57-
val okonomi = OkonomiService(okonomiDb, oebs, brreg)
58-
val kafka = configureKafka(config.kafka, db, okonomi)
5965

60-
configureApi(kafka, okonomiDb, okonomi)
66+
val okonomiDb = OkonomiDatabase(db)
6167

62-
monitor.subscribe(ApplicationStarted) {
63-
kafka.enableFailedRecordProcessor()
64-
}
68+
val okonomi = OkonomiService(
69+
topics = config.kafka.topics,
70+
db = okonomiDb,
71+
oebs = oebs,
72+
brreg = brreg,
73+
)
6574

66-
monitor.subscribe(ApplicationStopPreparing) {
67-
kafka.disableFailedRecordProcessor()
68-
kafka.stopPollingTopicChanges()
75+
val kafka = configureKafka(config.kafka, db, okonomi)
6976

77+
configureApi(kafka, okonomiDb, okonomi)
78+
79+
monitor.subscribe(ApplicationStopped) {
7080
db.close()
7181
}
7282
}
7383

74-
fun configureKafka(
84+
private fun Application.configureKafka(
7585
config: KafkaConfig,
7686
db: Database,
7787
okonomi: OkonomiService,
@@ -81,9 +91,33 @@ fun configureKafka(
8191
okonomi = okonomi,
8292
)
8393

84-
return KafkaConsumerOrchestrator(
94+
val producerClient = KafkaProducerClientBuilder.builder<ByteArray, ByteArray?>()
95+
.withProperties(config.producerPropertiesPreset)
96+
.build()
97+
val shedLockLeaderElectionClient = ShedLockLeaderElectionClient(JdbcLockProvider(db.getDatasource()))
98+
val producerRecordProcessor = KafkaProducerRecordProcessor(
99+
KafkaProducerRepositoryImpl(db),
100+
producerClient,
101+
shedLockLeaderElectionClient,
102+
)
103+
104+
val kafkaConsumerOrchestrator = KafkaConsumerOrchestrator(
85105
consumerPreset = config.consumerPropertiesPreset,
86106
db = db,
87107
consumers = listOf(bestilling),
88108
)
109+
110+
monitor.subscribe(ApplicationStarted) {
111+
kafkaConsumerOrchestrator.enableFailedRecordProcessor()
112+
producerRecordProcessor.start()
113+
}
114+
115+
monitor.subscribe(ApplicationStopPreparing) {
116+
kafkaConsumerOrchestrator.disableFailedRecordProcessor()
117+
kafkaConsumerOrchestrator.stopPollingTopicChanges()
118+
producerRecordProcessor.close()
119+
shedLockLeaderElectionClient.close()
120+
}
121+
122+
return kafkaConsumerOrchestrator
89123
}

mulighetsrommet-tiltaksokonomi/src/main/kotlin/no/nav/tiltak/okonomi/ApplicationConfig.kt

+7
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,16 @@ data class AppConfig(
2020

2121
data class KafkaConfig(
2222
val consumerPropertiesPreset: Properties,
23+
val producerPropertiesPreset: Properties,
24+
val topics: KafkaTopics,
2325
val clients: KafkaClients,
2426
)
2527

28+
data class KafkaTopics(
29+
val bestillingStatus: String,
30+
val fakturaStatus: String,
31+
)
32+
2633
data class KafkaClients(
2734
val okonomiBestillingConsumer: KafkaTopicConsumer.Config,
2835
)

mulighetsrommet-tiltaksokonomi/src/main/kotlin/no/nav/tiltak/okonomi/ApplicationConfigDev.kt

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ val ApplicationConfigDev = AppConfig(
3030
),
3131
kafka = KafkaConfig(
3232
consumerPropertiesPreset = KafkaPropertiesPreset.aivenDefaultConsumerProperties("team-mulighetsrommet.tiltaksokonomi.v1"),
33+
producerPropertiesPreset = KafkaPropertiesPreset.aivenByteProducerProperties("team-mulighetsrommet.tiltaksokonomi.v1"),
34+
topics = KafkaTopics(
35+
bestillingStatus = "team-mulighetsrommet.tiltaksokonomi-bestilling-status-v1",
36+
fakturaStatus = "team-mulighetsrommet.tiltaksokonomi-faktura-status-v1",
37+
),
3338
clients = KafkaClients(
3439
okonomiBestillingConsumer = KafkaTopicConsumer.Config(
3540
id = "bestilling",

mulighetsrommet-tiltaksokonomi/src/main/kotlin/no/nav/tiltak/okonomi/ApplicationConfigLocal.kt

+11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import no.nav.mulighetsrommet.ktor.respondJson
1111
import no.nav.mulighetsrommet.tokenprovider.createMockRSAKey
1212
import no.nav.tiltak.okonomi.oebs.OebsPoApClient
1313
import org.apache.kafka.common.serialization.ByteArrayDeserializer
14+
import org.apache.kafka.common.serialization.ByteArraySerializer
1415
import org.intellij.lang.annotations.Language
1516

1617
val mockClientEngine = createMockEngine {
@@ -70,6 +71,16 @@ val ApplicationConfigLocal = AppConfig(
7071
.withBrokerUrl("localhost:29092")
7172
.withDeserializers(ByteArrayDeserializer::class.java, ByteArrayDeserializer::class.java)
7273
.build(),
74+
producerPropertiesPreset = KafkaPropertiesBuilder.producerBuilder()
75+
.withBaseProperties()
76+
.withProducerId("tiltaksokonomi.v1")
77+
.withBrokerUrl("localhost:29092")
78+
.withSerializers(ByteArraySerializer::class.java, ByteArraySerializer::class.java)
79+
.build(),
80+
topics = KafkaTopics(
81+
bestillingStatus = "tiltaksokonomi-bestilling-status-v1",
82+
fakturaStatus = "tiltaksokonomi-faktura-status-v1",
83+
),
7384
clients = KafkaClients(
7485
okonomiBestillingConsumer = KafkaTopicConsumer.Config(
7586
id = "bestilling",

mulighetsrommet-tiltaksokonomi/src/main/kotlin/no/nav/tiltak/okonomi/db/OkonomiDatabase.kt

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import kotliquery.Session
44
import no.nav.mulighetsrommet.database.Database
55
import no.nav.tiltak.okonomi.db.queries.BestillingQueries
66
import no.nav.tiltak.okonomi.db.queries.FakturaQueries
7+
import no.nav.tiltak.okonomi.db.queries.KafkaProducerRecordQueries
78
import no.nav.tiltak.okonomi.db.queries.TiltakKonteringQueries
89
import javax.sql.DataSource
910

@@ -38,5 +39,6 @@ class QueryContext(val session: Session) {
3839
val bestilling = BestillingQueries(session)
3940
val faktura = FakturaQueries(session)
4041
val kontering = TiltakKonteringQueries(session)
42+
val kafkaProducerRecord = KafkaProducerRecordQueries(session)
4143
}
4244
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package no.nav.tiltak.okonomi.db.queries
2+
3+
import kotlinx.serialization.json.Json
4+
import kotliquery.Session
5+
import kotliquery.queryOf
6+
import no.nav.common.kafka.producer.util.ProducerUtils
7+
import no.nav.tiltak.okonomi.api.BestillingStatus
8+
import no.nav.tiltak.okonomi.api.FakturaStatus
9+
import org.apache.kafka.clients.producer.ProducerRecord
10+
import org.intellij.lang.annotations.Language
11+
12+
class KafkaProducerRecordQueries(private val session: Session) {
13+
14+
fun insertBestillingStatus(topic: String, status: BestillingStatus) {
15+
val record = ProducerRecord(
16+
topic,
17+
status.bestillingsnummer.toByteArray(),
18+
Json.encodeToString(status).toByteArray(),
19+
)
20+
21+
insert(record)
22+
}
23+
24+
fun insertFakturaStatus(topic: String, status: FakturaStatus) {
25+
val record = ProducerRecord(
26+
topic,
27+
status.fakturanummer.toByteArray(),
28+
Json.encodeToString(status).toByteArray(),
29+
)
30+
31+
insert(record)
32+
}
33+
34+
private fun insert(record: ProducerRecord<ByteArray?, ByteArray?>) {
35+
val storedRecord = ProducerUtils.mapToStoredRecord(record)
36+
37+
@Language("PostgreSQL")
38+
val sql = """
39+
insert into kafka_producer_record (topic, key, value, headers_json) values (?, ?, ?, ?)
40+
""".trimIndent()
41+
42+
val query = queryOf(sql, storedRecord.topic, storedRecord.key, storedRecord.value, storedRecord.headersJson)
43+
44+
session.execute(query)
45+
}
46+
}

0 commit comments

Comments
 (0)