Skip to content

Commit 04a9a1d

Browse files
committed
La til backup app for hendelseloggen
1 parent d5af853 commit 04a9a1d

38 files changed

+1207
-93
lines changed

Diff for: .github/workflows/hendelselogg-backup.yaml

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
name: Hendelselogg-backup

on:
  push:
    paths:
      - 'apps/hendelselogg-backup/**'
      - 'lib/kafka/**'
      - 'lib/hoplite-config/**'
      - '.github/workflows/hendelselogg-backup.yaml'
      - 'gradle/**'
      - 'settings.gradle.kts'
      - 'gradle.properties'
      - 'gradlew'

env:
  IMAGE: europe-north1-docker.pkg.dev/${{ vars.NAIS_MANAGEMENT_PROJECT_ID }}/paw/paw-arbeidssoekerregisteret-hendelselogg-backup

jobs:
  build:
    name: Build and push Docker container
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write
      packages: write
    outputs:
      # Full image ref (IMAGE:VERSION), consumed by the deploy jobs below.
      image: ${{ steps.docker-build-push.outputs.image }}
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-java@v3
        with:
          java-version: 21
          distribution: temurin
          cache: gradle
      - name: Set version
        # Calendar version plus run number/attempt keeps tags unique per run.
        run: echo "VERSION=$(date +'%y.%m.%d').${{ github.run_number }}-${{ github.run_attempt }}" >> $GITHUB_ENV
      - name: Login GAR
        uses: nais/login@v0
        with:
          project_id: ${{ vars.NAIS_MANAGEMENT_PROJECT_ID }}
          identity_provider: ${{ secrets.NAIS_WORKLOAD_IDENTITY_PROVIDER }}
          team: paw
      - name: Build with Gradle
        id: docker-build-push
        working-directory: ./
        run: |
          echo "image=${{ env.IMAGE }}:${{ env.VERSION }}" >> $GITHUB_OUTPUT
          ./gradlew -Pversion=${{ env.VERSION }} -Pimage=${{ env.IMAGE }} :apps:hendelselogg-backup:build :apps:hendelselogg-backup:jib
          echo "DIGEST=$(cat apps/hendelselogg-backup/build/jib-image.digest)" >> $GITHUB_ENV
        env:
          ORG_GRADLE_PROJECT_githubPassword: ${{ secrets.GITHUB_TOKEN }}
      - name: Attest and sign
        # NOTE(review): this action ref arrived mangled as 'nais/[email protected]'
        # (looks like e-mail obfuscation swallowed 'attest-sign@<version>').
        # Restore the intended 'nais/attest-sign@<version>' ref before merging.
        uses: nais/[email protected]
        with:
          image_ref: ${{ env.IMAGE }}@${{ env.DIGEST }}

  deploy-dev:
    name: Deploy to dev-gcp
    needs: build
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write
    steps:
      - uses: actions/checkout@v3
      - uses: nais/deploy/actions/deploy@v2
        env:
          CLUSTER: dev-gcp
          RESOURCE: apps/hendelselogg-backup/nais/nais-dev.yaml
          VAR: image=${{ needs.build.outputs.image }},kafka=nav-dev

  deploy-prod:
    if: github.ref == 'refs/heads/main'
    name: Deploy to prod-gcp
    needs: [build, deploy-dev]
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write
    steps:
      - uses: actions/checkout@v3
      - uses: nais/deploy/actions/deploy@v2
        env:
          TEAM: paw
          CLUSTER: prod-gcp
          # Fix: this commit adds nais/nais-prod.yaml, but the workflow referenced
          # nais/nais.yaml, which is not created here — the prod deploy would fail
          # unless a nais.yaml already exists on disk (confirm; none is visible).
          RESOURCE: apps/hendelselogg-backup/nais/nais-prod.yaml
          VAR: image=${{ needs.build.outputs.image }},kafka=nav-prod

Diff for: apps/hendelselogg-backup/build.gradle.kts

+12-19
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
import org.gradle.configurationcache.extensions.capitalized
2-
import org.jetbrains.kotlin.gradle.tasks.KotlinCompilationTask
31
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
4-
import org.openapitools.generator.gradle.plugin.tasks.GenerateTask
52

63
plugins {
74
kotlin("jvm")
@@ -31,15 +28,18 @@ dependencies {
3128
implementation(exposed.core)
3229
implementation(exposed.jdbc)
3330
implementation(exposed.javaTime)
31+
implementation(postgres.driver)
32+
implementation(flyway.core)
33+
implementation(flyway.postgres)
3434

3535
implementation(jackson.datatypeJsr310)
36+
implementation(jackson.kotlin)
3637

37-
implementation(ktorServer.coreJvm)
38-
39-
testImplementation(ktorServer.testJvm)
4038
testImplementation(testLibs.runnerJunit5)
4139
testImplementation(testLibs.assertionsCore)
4240
testImplementation(testLibs.mockk)
41+
testImplementation(testLibs.testContainers)
42+
testImplementation(testLibs.postgresql)
4343
}
4444

4545
java {
@@ -49,31 +49,24 @@ java {
4949
}
5050

5151
application {
52-
mainClass.set("no.nav.paw.arbeidssokerregisteret.backup.ApplicationKt")
52+
mainClass.set("no.nav.paw.arbeidssokerregisteret.backup.StartAppKt")
5353
}
5454

55-
5655
tasks.withType<KotlinCompile>().configureEach {
5756
compilerOptions {
5857
freeCompilerArgs.add("-Xcontext-receivers")
59-
}
60-
}
61-
62-
tasks.withType(Jar::class) {
63-
manifest {
64-
attributes["Implementation-Version"] = project.version
65-
attributes["Main-Class"] = application.mainClass.get()
66-
attributes["Implementation-Title"] = rootProject.name
58+
allWarningsAsErrors = true
6759
}
6860
}
6961

7062
jib {
7163
from.image = "$baseImage:$jvmMajorVersion"
7264
to.image = "${image ?: project.name}:${project.version}"
7365
container {
74-
environment = mapOf(
75-
"IMAGE_WITH_VERSION" to "${image ?: project.name}:${project.version}"
76-
)
7766
jvmFlags = listOf("-XX:ActiveProcessorCount=4", "-XX:+UseZGC", "-XX:+ZGenerational")
7867
}
7968
}
69+
70+
tasks.named<Test>("test") {
71+
useJUnitPlatform()
72+
}

Diff for: apps/hendelselogg-backup/docker-compose.yaml

+18-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ version: '3.9'
22

33
services:
44
postgres:
5-
image: postgres:14
5+
image: postgres:15
66
ports:
77
- "5432:5432"
88
environment:
@@ -45,3 +45,20 @@ services:
4545
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
4646
CONFLUENT_METRICS_ENABLE: 'true'
4747
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
48+
49+
init-kafka:
50+
image: confluentinc/cp-server:7.3.1
51+
depends_on:
52+
- kafka
53+
entrypoint: [ '/bin/sh', '-c' ]
54+
command: |
55+
"
56+
# blocks until kafka is reachable
57+
kafka-topics --bootstrap-server kafka:29092 --list
58+
59+
echo -e 'Creating kafka topics'
60+
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic paw.arbeidssoker-hendelseslogg-v1 --replication-factor 1 --partitions 6
61+
62+
echo -e 'Successfully created the following topics:'
63+
kafka-topics --bootstrap-server kafka:29092 --list
64+
"

Diff for: apps/hendelselogg-backup/nais/nais-dev.yaml

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
apiVersion: nais.io/v1alpha1
2+
kind: Application
3+
metadata:
4+
name: paw-arbeidssoekerregisteret-hendelselogg-backup
5+
namespace: paw
6+
labels:
7+
team: paw
8+
spec:
9+
image: {{ image }}
10+
port: 8080
11+
replicas:
12+
min: 1
13+
max: 1
14+
liveness:
15+
path: /internal/isAlive
16+
initialDelay: 10
17+
readiness:
18+
path: /internal/isReady
19+
initialDelay: 10
20+
prometheus:
21+
enabled: true
22+
path: /internal/metrics
23+
observability:
24+
autoInstrumentation:
25+
enabled: true
26+
runtime: java
27+
kafka:
28+
pool: {{ kafka }}
29+
streams: false
30+
resources:
31+
limits:
32+
memory: 1024Mi
33+
requests:
34+
memory: 256Mi
35+
cpu: 50m
36+
gcp:
37+
sqlInstances:
38+
- type: POSTGRES_15
39+
databases:
40+
- name: hendelselogg

Diff for: apps/hendelselogg-backup/nais/nais-prod.yaml

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
apiVersion: nais.io/v1alpha1
2+
kind: Application
3+
metadata:
4+
name: paw-arbeidssoekerregisteret-hendelselogg-backup
5+
namespace: paw
6+
labels:
7+
team: paw
8+
spec:
9+
image: {{ image }}
10+
port: 8080
11+
replicas:
12+
min: 1
13+
max: 1
14+
liveness:
15+
path: /internal/isAlive
16+
initialDelay: 10
17+
readiness:
18+
path: /internal/isReady
19+
initialDelay: 10
20+
prometheus:
21+
enabled: true
22+
path: /internal/metrics
23+
observability:
24+
autoInstrumentation:
25+
enabled: true
26+
runtime: java
27+
kafka:
28+
pool: {{ kafka }}
29+
streams: false
30+
resources:
31+
limits:
32+
memory: 1024Mi
33+
requests:
34+
memory: 256Mi
35+
cpu: 50m
36+
gcp:
37+
sqlInstances:
38+
- type: POSTGRES_15
39+
tier: db-custom-1-6144
40+
databases:
41+
- name: hendelselogg
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package no.nav.paw.arbeidssoekerregisteret.backup
2+
3+
import no.nav.paw.arbeidssoekerregisteret.backup.database.getHwm
4+
import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext
5+
import org.apache.kafka.clients.consumer.Consumer
6+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
7+
import org.apache.kafka.common.TopicPartition
8+
import org.jetbrains.exposed.sql.transactions.transaction
9+
import java.util.concurrent.ConcurrentHashMap
10+
11+
/**
 * Kafka rebalance listener that tracks which partitions are currently assigned
 * to [consumer] and, whenever partitions are (re)assigned, seeks each one to
 * the offset just past the stored high-water mark (HWM) read via [getHwm].
 *
 * An HWM row must already exist for every partition (written during init);
 * a missing HWM is treated as a programming error.
 */
class HwmRebalanceListener(
    private val context: ApplicationContext,
    private val consumer: Consumer<*, *>
) : ConsumerRebalanceListener {

    // partition number -> TopicPartition; concurrent map since Kafka invokes
    // this listener from its poll thread while others may read the key set.
    private val assigned = ConcurrentHashMap<Int, TopicPartition>(6)

    /** Snapshot of the partition numbers currently assigned to this consumer. */
    val currentlyAssignedPartitions: Set<Int> get() = assigned.keys

    override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>?) {
        context.logger.info("Revoked: $partitions")
        partitions?.forEach { revoked -> assigned.remove(revoked.partition()) }
    }

    override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>?) {
        with(context) {
            partitions?.forEach { tp -> assigned.putIfAbsent(tp.partition(), tp) }
            val newlyAssigned = partitions ?: emptyList()
            context.logger.info("Assigned partitions $newlyAssigned")
            if (newlyAssigned.isEmpty()) return
            // Read all HWMs in one transaction, then seek outside of it.
            val seekTargets = transaction {
                newlyAssigned.map { tp ->
                    val hwm = requireNotNull(getHwm(tp.partition())) {
                        "No hwm for partition ${tp.partition()}, init not called?"
                    }
                    tp to hwm
                }
            }
            // HWM is the last processed offset, so resume at the one after it.
            seekTargets.forEach { (tp, hwm) -> consumer.seek(tp, hwm + 1) }
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package no.nav.paw.arbeidssoekerregisteret.backup
2+
3+
import io.micrometer.core.instrument.Tag
4+
import io.micrometer.prometheusmetrics.PrometheusConfig
5+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
6+
import no.nav.paw.arbeidssoekerregisteret.backup.database.*
7+
import no.nav.paw.arbeidssoekerregisteret.backup.vo.ApplicationContext
8+
import no.nav.paw.arbeidssokerregisteret.intern.v1.Hendelse
9+
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseDeserializer
10+
import no.nav.paw.arbeidssokerregisteret.intern.v1.HendelseSerializer
11+
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
12+
import no.nav.paw.config.kafka.KAFKA_CONFIG
13+
import no.nav.paw.config.kafka.KafkaConfig
14+
import no.nav.paw.config.kafka.KafkaFactory
15+
import org.apache.kafka.clients.consumer.Consumer
16+
import org.apache.kafka.clients.consumer.ConsumerRecord
17+
import org.apache.kafka.common.serialization.LongDeserializer
18+
import org.jetbrains.exposed.sql.Database
19+
import org.jetbrains.exposed.sql.transactions.transaction
20+
import org.slf4j.LoggerFactory
21+
import java.util.concurrent.atomic.AtomicBoolean
22+
23+
/** Backup consumer version; bumping it yields a fresh consumer group and a full re-read. */
const val CURRENT_VERSION = 1

/** Source topic whose events are backed up to the database. */
const val HENDELSE_TOPIC = "paw.arbeidssoker-hendelseslogg-v1"

/** Versioned group id — a template over const values is itself a compile-time constant. */
const val CONSUMER_GROUP = "arbeidssoekerregisteret-backup-$CURRENT_VERSION"

/** Metric name: number of partitions currently assigned to this instance. */
const val ACTIVE_PARTITIONS_GAUGE = "paw_arbeidssoekerregisteret_backup_active_partitions"

/** Metric name: number of records written to the backup. */
const val RECORD_COUNTER = "paw_arbeidssoekerregisteret_backup_records_written"

/**
 * Wires the application together: connects to and migrates the database,
 * creates the hendelselogg Kafka consumer, registers a shutdown flag, and
 * ensures a high-water mark (HWM) row exists for every partition of
 * [HENDELSE_TOPIC].
 *
 * @return the configured consumer paired with the shared [ApplicationContext]
 *         (logger, consumer version, meter registry, shutdown flag).
 */
fun initApplication(): Pair<Consumer<Long, Hendelse>, ApplicationContext> {
    val logger = LoggerFactory.getLogger("backup-init")
    logger.info("Initializing application...")
    val kafkaConfig = loadNaisOrLocalConfiguration<KafkaConfig>(KAFKA_CONFIG)
    with(loadNaisOrLocalConfiguration<DatabaseConfig>("database_configuration.toml")) {
        val ds = dataSource()
        logger.info("Connection to database($this)...")
        // Idiom fix: 'Database.Companion.connect' -> 'Database.connect';
        // the explicit Companion reference was redundant.
        Database.connect(ds)
        logger.info("Migrating database...")
        migrateDatabase(ds)
    }
    logger.info("Connection to kafka...")
    val consumer = KafkaFactory(kafkaConfig).createConsumer(
        groupId = CONSUMER_GROUP,
        clientId = "client-$CONSUMER_GROUP",
        keyDeserializer = LongDeserializer::class,
        valueDeserializer = HendelseDeserializer::class,
        // Progress is tracked via HWMs in the database, not Kafka commits.
        autoCommit = false,
        autoOffsetReset = "earliest"
    )
    val shutdown = AtomicBoolean(false)
    // Flipped by the JVM shutdown hook; presumably polled by the consume loop
    // so it can stop gracefully — confirm against the caller.
    Runtime.getRuntime().addShutdownHook(Thread {
        shutdown.set(true)
    })
    val context = ApplicationContext(
        logger = LoggerFactory.getLogger("backup-context"),
        consumerVersion = CURRENT_VERSION,
        meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT),
        shutdownCalled = shutdown
    )
    // Ensure an HWM row exists per partition before any assignment happens.
    val partitions = consumer.partitionsFor(HENDELSE_TOPIC).size
    with(context) {
        transaction {
            initHwm(partitions)
        }
    }
    logger.info("Application initialized")
    return consumer to context
}

0 commit comments

Comments
 (0)