Skip to content

Commit 428d622

Browse files
committed
La til basis oppsett for kafka-key-maintenance modul
1 parent 075e2ab commit 428d622

File tree

15 files changed

+671
-0
lines changed

15 files changed

+671
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
2+
3+
plugins {
4+
kotlin("jvm")
5+
application
6+
id("com.google.cloud.tools.jib")
7+
}
8+
9+
val baseImage: String by project
10+
val jvmMajorVersion: String by project
11+
12+
val image: String? by project
13+
14+
dependencies {
15+
implementation(project(":domain:interne-hendelser"))
16+
implementation(project(":domain:pdl-aktoer-schema"))
17+
implementation(project(":lib:kafka"))
18+
implementation(project(":lib:hoplite-config"))
19+
implementation(project(":lib:kafka-key-generator-client"))
20+
21+
implementation(libs.arrow.core.core)
22+
implementation(libs.bundles.ktorServerWithNettyAndMicrometer)
23+
implementation(libs.ktor.server.cors)
24+
implementation(libs.ktor.server.swagger)
25+
implementation(libs.ktor.server.callId)
26+
implementation(libs.ktor.server.statusPages)
27+
implementation(libs.ktor.server.contentNegotiation)
28+
implementation(libs.ktor.client.core)
29+
implementation(libs.ktor.client.contentNegotiation)
30+
implementation(libs.ktor.serialization.jvm)
31+
implementation(libs.ktor.serialization.jackson)
32+
implementation(libs.nav.security.tokenValidationKtorV2)
33+
implementation(libs.nav.common.tokenClient)
34+
implementation(libs.nav.common.tokenClient)
35+
implementation(libs.nav.common.auditLog)
36+
implementation(libs.nav.common.log)
37+
38+
implementation(libs.micrometer.registryPrometheus)
39+
implementation(libs.opentelemetry.annotations)
40+
implementation(libs.hoplite.core)
41+
implementation(libs.hoplite.toml)
42+
implementation(libs.nav.common.auditLog)
43+
implementation(libs.nav.common.log)
44+
implementation(libs.logbackClassic)
45+
implementation(libs.logstashLogbackEncoder)
46+
implementation(libs.kafka.clients)
47+
implementation(libs.avro.core)
48+
implementation(libs.avro.kafkaSerializer)
49+
implementation(libs.exposed.core)
50+
implementation(libs.exposed.jdbc)
51+
implementation(libs.exposed.javaTime)
52+
implementation(libs.database.postgres.driver)
53+
implementation(libs.database.flyway.core)
54+
implementation(libs.database.flyway.postgres)
55+
56+
implementation(libs.jackson.datatypeJsr310)
57+
implementation(libs.jackson.kotlin)
58+
59+
testImplementation(libs.test.junit5.runner)
60+
testImplementation(libs.test.kotest.assertionsCore)
61+
testImplementation(libs.test.mockk.core)
62+
testImplementation(libs.test.testContainers.core)
63+
testImplementation(libs.test.testContainers.postgresql)
64+
testImplementation(libs.ktor.server.testJvm)
65+
}
66+
67+
java {
68+
toolchain {
69+
languageVersion.set(JavaLanguageVersion.of(jvmMajorVersion))
70+
}
71+
}
72+
73+
application {
74+
mainClass.set("no.nav.paw.arbeidssokerregisteret.backup.StartAppKt")
75+
}
76+
77+
tasks.withType<KotlinCompile>().configureEach {
78+
compilerOptions {
79+
allWarningsAsErrors = true
80+
}
81+
}
82+
83+
jib {
84+
from.image = "$baseImage:$jvmMajorVersion"
85+
to.image = "${image ?: project.name}:${project.version}"
86+
container {
87+
jvmFlags = listOf("-XX:ActiveProcessorCount=4", "-XX:+UseZGC", "-XX:+ZGenerational")
88+
}
89+
}
90+
91+
tasks.named<Test>("test") {
92+
useJUnitPlatform()
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
version: '3.9'
2+
3+
services:
4+
database:
5+
image: postgres:16
6+
ports:
7+
- "5432:5432"
8+
environment:
9+
- POSTGRES_USER=admin
10+
- POSTGRES_PASSWORD=admin
11+
- POSTGRES_DB=appdb
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package no.nav.paw.kafkakeymaintenance
2+
3+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
4+
import org.slf4j.Logger
5+
import java.util.concurrent.atomic.AtomicBoolean
6+
7+
@JvmRecord
8+
data class ApplicationContext(
9+
val consumerVersion: Int,
10+
val logger: Logger,
11+
val meterRegistry: PrometheusMeterRegistry,
12+
val shutdownCalled: AtomicBoolean = AtomicBoolean(false),
13+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package no.nav.paw.kafkakeymaintenance.db
2+
3+
import javax.sql.DataSource
4+
5+
data class DatabaseConfig(
6+
val name: String,
7+
val host: String,
8+
val port: Int,
9+
val username: String,
10+
val password: String,
11+
val jdbc: String?
12+
) {
13+
val url get() = jdbc ?: "jdbc:postgresql://$host:$port/$name?user=$username&password=$password"
14+
15+
override fun toString(): String {
16+
return if (jdbc != null) {
17+
"DatabaseConfig(name='$name', jdbcUrl='***')"
18+
} else {
19+
"DatabaseConfig(name='$name', host='$host', port=$port, username='$username', password='${if (password.isBlank()) "null" else "***"}')"
20+
}
21+
}
22+
}
23+
24+
fun DatabaseConfig.dataSource(): DataSource {
25+
val dataSource = org.postgresql.ds.PGSimpleDataSource()
26+
dataSource.setURL(url)
27+
return dataSource
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package no.nav.paw.kafkakeymaintenance.db
2+
3+
import org.flywaydb.core.Flyway
4+
5+
fun migrateDatabase(dataSource: javax.sql.DataSource) {
6+
Flyway.configure()
7+
.dataSource(dataSource)
8+
.locations("classpath:db/migration")
9+
.baselineOnMigrate(true)
10+
.load()
11+
.migrate()
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package no.nav.paw.kafkakeymaintenance.kafka
2+
3+
import java.time.Instant
4+
5+
@JvmInline
6+
value class Topic(val value: String)
7+
fun topic(value: String): Topic = Topic(value)
8+
9+
@JvmInline
10+
value class PartitionTime(val value: Instant)
11+
fun partitionTime(value: Instant): PartitionTime = PartitionTime(value)
12+
13+
@JvmInline
14+
value class PartitionLastUpdated(val value: Instant)
15+
fun partitionLastUpdated(value: Instant): PartitionLastUpdated = PartitionLastUpdated(value)
16+
17+
fun hwmInfo(topic: Topic, partition: Int, offset: Long): HwmInfo = Hwm(
18+
topic = topic,
19+
partition = partition,
20+
offset = offset
21+
)
22+
23+
fun topicPartitionMetadata(
24+
topic: Topic,
25+
partition: Int,
26+
offset: Long,
27+
time: PartitionTime,
28+
lastUpdated: PartitionLastUpdated
29+
): TopicPartitionMetadata = topicPartitionMetadata(
30+
topic = topic,
31+
partition = partition,
32+
offset = offset,
33+
time = time,
34+
lastUpdated = lastUpdated
35+
)
36+
37+
fun progressInfo(time: PartitionTime, lastUpdated: PartitionLastUpdated): ProgressInfo =
38+
SimpleProgressInfo(time, lastUpdated)
39+
40+
interface HwmInfo {
41+
val topic: Topic
42+
val partition: Int
43+
val offset: Long
44+
}
45+
46+
interface ProgressInfo {
47+
val time: PartitionTime
48+
val lastUpdated: PartitionLastUpdated
49+
}
50+
51+
private data class Hwm(
52+
override val topic: Topic,
53+
override val partition: Int,
54+
override val offset: Long
55+
): HwmInfo
56+
57+
data class TopicPartitionMetadata(
58+
override val topic: Topic,
59+
override val partition: Int,
60+
override val offset: Long,
61+
override val time: PartitionTime,
62+
override val lastUpdated: PartitionLastUpdated
63+
): HwmInfo, ProgressInfo
64+
65+
private class SimpleProgressInfo(
66+
override val time: PartitionTime,
67+
override val lastUpdated: PartitionLastUpdated
68+
): ProgressInfo
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package no.nav.paw.kafkakeymaintenance.kafka
2+
3+
import no.nav.paw.kafkakeymaintenance.ApplicationContext
4+
import org.jetbrains.exposed.sql.*
5+
import org.jetbrains.exposed.sql.SqlExpressionBuilder.wrap
6+
import java.time.Instant
7+
8+
data class TransactionContext(
9+
val appContext: ApplicationContext,
10+
val transaction: Transaction
11+
)
12+
13+
fun txContext(applicationContext: ApplicationContext): Transaction.() -> TransactionContext = {
14+
TransactionContext(applicationContext, this)
15+
}
16+
17+
fun TransactionContext.initHwm(topic: Topic, partitionCount: Int) {
18+
(0 until partitionCount)
19+
.filter { getHwm(topic, it) == null }
20+
.forEach { insertHwm(topic, it, -1) }
21+
}
22+
23+
fun TransactionContext.getHwm(topic: Topic, partition: Int): Long? =
24+
HwmTable
25+
.selectAll()
26+
.where {
27+
(HwmTable.topic eq topic.value) and
28+
(HwmTable.partition eq partition) and
29+
(HwmTable.version eq appContext.consumerVersion)
30+
}
31+
.singleOrNull()?.get(HwmTable.offset)
32+
33+
fun TransactionContext.getTopicPartitionMetadata(topic: Topic, partition: Int): TopicPartitionMetadata? =
34+
HwmTable
35+
.selectAll()
36+
.where {
37+
(HwmTable.topic eq topic.value) and
38+
(HwmTable.partition eq partition) and
39+
(HwmTable.version eq appContext.consumerVersion)
40+
}
41+
.singleOrNull()
42+
?.let {
43+
TopicPartitionMetadata(
44+
topic = Topic(it[HwmTable.topic]),
45+
partition = it[HwmTable.partition],
46+
offset = it[HwmTable.offset],
47+
time = partitionTime(it[HwmTable.time]),
48+
lastUpdated = partitionLastUpdated(it[HwmTable.lastUpdated])
49+
)
50+
}
51+
52+
fun TransactionContext.getAllHwms(): List<HwmInfo> =
53+
HwmTable
54+
.selectAll()
55+
.where { HwmTable.version eq appContext.consumerVersion }
56+
.map {
57+
hwmInfo(
58+
topic = Topic(it[HwmTable.topic]),
59+
partition = it[HwmTable.partition],
60+
offset = it[HwmTable.offset]
61+
)
62+
}
63+
64+
fun TransactionContext.insertHwm(
65+
topic: Topic,
66+
partition: Int,
67+
offset: Long,
68+
time: Instant = Instant.EPOCH,
69+
lastUpdated: Instant = Instant.EPOCH
70+
) {
71+
HwmTable.insert {
72+
it[HwmTable.topic] = topic.value
73+
it[version] = appContext.consumerVersion
74+
it[HwmTable.partition] = partition
75+
it[HwmTable.offset] = offset
76+
it[HwmTable.time] = time
77+
it[HwmTable.lastUpdated] = lastUpdated
78+
}
79+
}
80+
81+
fun TransactionContext.updateHwm(
82+
topic: Topic,
83+
partition: Int,
84+
offset: Long,
85+
time: Instant,
86+
lastUpdated: Instant
87+
): Boolean =
88+
HwmTable
89+
.update({
90+
(HwmTable.topic eq topic.value) and
91+
(HwmTable.partition eq partition) and
92+
(HwmTable.offset less offset) and
93+
(HwmTable.version eq appContext.consumerVersion)
94+
}
95+
) {
96+
it[HwmTable.offset] = offset
97+
it[HwmTable.time] = HwmTable.time.max(time)
98+
it[HwmTable.lastUpdated] = lastUpdated
99+
} == 1
100+
101+
infix fun <T> ExpressionWithColumnType<T>.max(t: T): Greatest<T> = Greatest(
102+
expr1 = this,
103+
expr2 = wrap(t),
104+
columnType = this.columnType
105+
)
106+
107+
class Greatest<T>(
108+
expr1: Expression<T>,
109+
expr2: Expression<T>,
110+
columnType: IColumnType<T & Any>
111+
) : CustomFunction<T>(
112+
functionName = "greatest",
113+
columnType = columnType,
114+
expr1,
115+
expr2
116+
)

0 commit comments

Comments
 (0)