Skip to content

Commit 8ffd3e6

Browse files
bekreftelse-backup-app
1 parent 5fa2a7c commit 8ffd3e6

34 files changed

+2120
-1
lines changed
+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
name: bekreftelse-backup
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
- dev/*
8+
paths:
9+
- 'apps/bekreftelse-backup/**'
10+
- 'lib/**'
11+
- 'domain/**'
12+
- '.github/workflows/bekreftelse-backup.yaml'
13+
- 'gradle/**'
14+
- 'settings.gradle.kts'
15+
- 'gradle.properties'
16+
- 'gradlew'
17+
- 'gradlew.bat'
18+
19+
env:
20+
IMAGE: europe-north1-docker.pkg.dev/${{ vars.NAIS_MANAGEMENT_PROJECT_ID }}/paw/paw-arbeidssoekerregisteret-bekreftelse-backup
21+
jobs:
22+
build:
23+
name: Build
24+
runs-on: ubuntu-latest
25+
timeout-minutes: 10
26+
permissions:
27+
contents: read
28+
id-token: write
29+
packages: write
30+
outputs:
31+
image: ${{ steps.docker-build-push.outputs.image }}
32+
steps:
33+
- name: Checkout
34+
uses: actions/checkout@v4
35+
- name: Setup Java
36+
uses: actions/setup-java@v4
37+
with:
38+
java-version: 21
39+
distribution: temurin
40+
cache: gradle
41+
- name: Set module
42+
run: echo "MODULE=bekreftelse-backup" >> $GITHUB_ENV
43+
- name: Set version
44+
run: echo "VERSION=$(date +'%y.%m.%d').${{ github.run_number }}-${{ github.run_attempt }}" >> $GITHUB_ENV
45+
- name: Login GAR
46+
uses: nais/login@v0
47+
with:
48+
project_id: ${{ vars.NAIS_MANAGEMENT_PROJECT_ID }}
49+
identity_provider: ${{ secrets.NAIS_WORKLOAD_IDENTITY_PROVIDER }}
50+
team: paw
51+
- name: Build with Gradle
52+
id: docker-build-push
53+
working-directory: ./
54+
run: |
55+
echo "image=${{ env.IMAGE }}:${{ env.VERSION }}" >> $GITHUB_OUTPUT
56+
echo -Pversion=${{ env.VERSION }} -Pimage=${{ env.IMAGE }} :apps:${{ env.MODULE }}:build :apps:${{ env.MODULE }}:jib
57+
./gradlew -Pversion=${{ env.VERSION }} -Pimage=${{ env.IMAGE }} :apps:${{ env.MODULE }}:build :apps:${{ env.MODULE }}:jib
58+
echo "DIGEST=$(cat apps/${{ env.MODULE }}/build/jib-image.digest)" >> $GITHUB_ENV
59+
env:
60+
ORG_GRADLE_PROJECT_githubPassword: ${{ secrets.GITHUB_TOKEN }}
61+
- name: Attest and sign image
62+
uses: nais/[email protected]
63+
with:
64+
image_ref: ${{ env.IMAGE }}@${{ env.DIGEST }}
65+
66+
deploy-dev:
67+
if: github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/heads/dev')
68+
name: Deploy to dev-gcp
69+
needs:
70+
- build
71+
permissions:
72+
contents: read
73+
id-token: write
74+
runs-on: ubuntu-latest
75+
steps:
76+
- name: Checkout
77+
uses: actions/checkout@v4
78+
- name: Deploy to GCP
79+
uses: nais/deploy/actions/deploy@v2
80+
env:
81+
CLUSTER: dev-gcp
82+
RESOURCE: apps/bekreftelse-backup/nais/nais-dev.yaml
83+
VAR: image=${{ needs.build.outputs.image }},kafka=nav-dev
84+
85+
# deploy-prod:
86+
# if: github.ref == 'refs/heads/main'
87+
# name: Deploy to prod-gcp
88+
# needs:
89+
# - build
90+
# - deploy-dev
91+
# permissions:
92+
# contents: read
93+
# id-token: write
94+
# runs-on: ubuntu-latest
95+
# steps:
96+
# - name: Checkout
97+
# uses: actions/checkout@v4
98+
# - name: Deploy to GCP
99+
# uses: nais/deploy/actions/deploy@v2
100+
# env:
101+
# TEAM: paw
102+
# CLUSTER: prod-gcp
103+
# RESOURCE: apps/bekreftelse-backup/nais/nais-prod.yaml
104+
# VAR: image=${{ needs.build.outputs.image }},kafka=nav-prod

apps/bekreftelse-backup/README.md

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
# Hendelselogg backup
2+
Denne applikasjonen lagrer forløpende innholdet i topicene 'paw.arbeidssoker-bekreftelse-hendelseslogg-v2', 'paw.arbeidssoker-bekreftelse-v1' og 'paw.arbeidssoker-bekreftelse-paavegneav-v2' til postgres. Dette gjøres for å sikre oss mot feilaktig sletting av alt eller deler av innholdet i denne topicen. NAIS platformen kjører daglig backup av Postgres datarbaser.
3+
4+
## Løsnings beskrivelse
5+
Applikasjonen kjører uten å committe offset til kafka. Istedenfor brukes en egen HWM tabell som som oppdateres i samme transaksjon som dataen blir skrevet til databasen. Det er også lagt inn støtte for versjonering slik at ved å endre 'InitApplication.CURRENT_VERSION' vil løsningen starte på nytt og lagre data på nytt uten å røre eksisterende data.
6+
7+
### Database oversikt:
8+
9+
```sql
10+
CREATE TABLE bekreftelse_hendelser
11+
(
12+
version smallint NOT NULL,
13+
kafka_partition smallint NOT NULL,
14+
kafka_offset bigint NOT NULL,
15+
record_key bigint NOT NULL,
16+
arbeidssoeker_id bigint NOT NULL,
17+
traceparent varchar(58),
18+
data jsonb NOT NULL,
19+
primary key (version, kafka_partition, kafka_offset)
20+
);
21+
22+
CREATE TABLE bekreftelser
23+
(
24+
version smallint NOT NULL,
25+
kafka_partition smallint NOT NULL,
26+
kafka_offset bigint NOT NULL,
27+
record_key bigint NOT NULL,
28+
traceparent varchar(58),
29+
data bytea NOT NULL,
30+
primary key (version, kafka_partition, kafka_offset)
31+
);
32+
33+
CREATE TABLE bekreftelse_paa_vegne_av
34+
(
35+
version smallint NOT NULL,
36+
kafka_partition smallint NOT NULL,
37+
kafka_offset bigint NOT NULL,
38+
record_key bigint NOT NULL,
39+
traceparent varchar(58),
40+
data bytea NOT NULL,
41+
primary key (version, kafka_partition, kafka_offset)
42+
);
43+
44+
CREATE TABLE bekreftelse_hwm
45+
(
46+
version smallint NOT NULL,
47+
kafka_partition smallint NOT NULL,
48+
kafka_offset bigint NOT NULL,
49+
kafka_topic varchar(255) NOT NULL,
50+
primary key (version, kafka_partition, kafka_topic)
51+
);
52+
53+
-- indexes
54+
CREATE INDEX bekreftelse_hendelser_data_idx ON bekreftelse_hendelser USING GIN (data jsonb_path_ops);
55+
CREATE INDEX bekreftelse_hendelser_arbeidssoeker_id_idx ON bekreftelse_hendelser (arbeidssoeker_id);
56+
57+
CREATE INDEX bekreftelser_data_idx ON bekreftelser USING GIN (data bytea_ops);
58+
CREATE INDEX bekreftelse_paa_vegne_av_data_idx ON bekreftelse_paa_vegne_av USING GIN (data bytea_ops);
59+
60+
```
+90
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
2+
3+
plugins {
4+
kotlin("jvm")
5+
id("org.openapi.generator")
6+
application
7+
id("jib-distroless")
8+
}
9+
10+
val jvmMajorVersion: String by project
11+
12+
dependencies {
13+
implementation(project(":domain:bekreftelse-interne-hendelser"))
14+
implementation(project(":domain:bekreftelse-paavegneav-avro-schema"))
15+
implementation(project(":domain:bekreftelsesmelding-avro-schema"))
16+
implementation(project(":lib:kafka"))
17+
implementation(project(":lib:hoplite-config"))
18+
implementation(project(":lib:kafka-key-generator-client"))
19+
20+
implementation(libs.arrow.core.core)
21+
implementation(libs.bundles.ktorServerWithNettyAndMicrometer)
22+
implementation(libs.ktor.server.cors)
23+
implementation(libs.ktor.server.swagger)
24+
implementation(libs.ktor.server.callId)
25+
implementation(libs.ktor.server.statusPages)
26+
implementation(libs.ktor.server.contentNegotiation)
27+
implementation(libs.ktor.client.core)
28+
implementation(libs.ktor.client.contentNegotiation)
29+
implementation(libs.ktor.serialization.jvm)
30+
implementation(libs.ktor.serialization.jackson)
31+
implementation(libs.nav.security.tokenValidationKtorV3)
32+
implementation(libs.nav.common.tokenClient)
33+
implementation(libs.nav.common.tokenClient)
34+
implementation(libs.nav.common.auditLog)
35+
implementation(libs.nav.common.log)
36+
37+
implementation(libs.micrometer.registryPrometheus)
38+
implementation(libs.opentelemetry.annotations)
39+
implementation(libs.hoplite.core)
40+
implementation(libs.hoplite.toml)
41+
implementation(libs.nav.common.auditLog)
42+
implementation(libs.nav.common.log)
43+
implementation(libs.logbackClassic)
44+
implementation(libs.logstashLogbackEncoder)
45+
implementation(libs.kafka.clients)
46+
implementation(libs.exposed.core)
47+
implementation(libs.exposed.jdbc)
48+
implementation(libs.exposed.javaTime)
49+
implementation(libs.database.postgres.driver)
50+
implementation(libs.database.flyway.core)
51+
implementation(libs.database.flyway.postgres)
52+
53+
implementation(libs.jackson.datatypeJsr310)
54+
implementation(libs.jackson.kotlin)
55+
56+
testImplementation(libs.test.junit5.runner)
57+
testImplementation(libs.test.kotest.assertionsCore)
58+
testImplementation(libs.test.mockk.core)
59+
testImplementation(libs.test.testContainers.core)
60+
testImplementation(libs.test.testContainers.postgresql)
61+
testImplementation(libs.ktor.server.test.host)
62+
}
63+
64+
java {
65+
toolchain {
66+
languageVersion.set(JavaLanguageVersion.of(jvmMajorVersion))
67+
}
68+
}
69+
70+
application {
71+
mainClass.set("no.nav.paw.arbeidssoekerregisteret.bekreftelse.backup.StartAppKt")
72+
}
73+
74+
tasks.withType<KotlinCompile>().configureEach {
75+
compilerOptions {
76+
allWarningsAsErrors = true
77+
}
78+
}
79+
80+
tasks.named<Test>("test") {
81+
useJUnitPlatform()
82+
}
83+
84+
sourceSets {
85+
main {
86+
kotlin {
87+
srcDir("${layout.buildDirectory.get()}/generated/src/main/kotlin")
88+
}
89+
}
90+
}
+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
apiVersion: nais.io/v1alpha1
2+
kind: Application
3+
metadata:
4+
name: paw-arbeidssoekerregisteret-bekreftelse-backup
5+
namespace: paw
6+
labels:
7+
team: paw
8+
spec:
9+
image: {{ image }}
10+
port: 8080
11+
replicas:
12+
min: 1
13+
max: 1
14+
liveness:
15+
path: /internal/isAlive
16+
initialDelay: 10
17+
readiness:
18+
path: /internal/isReady
19+
initialDelay: 10
20+
prometheus:
21+
enabled: true
22+
path: /internal/metrics
23+
observability:
24+
autoInstrumentation:
25+
enabled: true
26+
runtime: java
27+
kafka:
28+
pool: {{ kafka }}
29+
azure:
30+
application:
31+
enabled: true
32+
resources:
33+
limits:
34+
memory: 1024Mi
35+
requests:
36+
memory: 256Mi
37+
cpu: 25m
38+
gcp:
39+
sqlInstances:
40+
- type: POSTGRES_17
41+
tier: db-custom-1-3840
42+
databases:
43+
- name: bekreftelse-backup
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
apiVersion: nais.io/v1alpha1
2+
kind: Application
3+
metadata:
4+
name: paw-arbeidssoekerregisteret-bekreftelse-backup
5+
namespace: paw
6+
labels:
7+
team: paw
8+
spec:
9+
image: {{ image }}
10+
port: 8080
11+
replicas:
12+
min: 1
13+
max: 1
14+
liveness:
15+
path: /internal/isAlive
16+
initialDelay: 10
17+
readiness:
18+
path: /internal/isReady
19+
initialDelay: 10
20+
prometheus:
21+
enabled: true
22+
path: /internal/metrics
23+
observability:
24+
autoInstrumentation:
25+
enabled: true
26+
runtime: java
27+
kafka:
28+
pool: {{ kafka }}
29+
azure:
30+
application:
31+
enabled: true
32+
resources:
33+
limits:
34+
memory: 1024Mi
35+
requests:
36+
memory: 384Mi
37+
cpu: 40m
38+
gcp:
39+
sqlInstances:
40+
- type: POSTGRES_17
41+
tier: db-custom-1-6144
42+
databases:
43+
- name: bekreftelse-backup
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package no.nav.paw.arbeidssoekerregisteret.bekreftelse.backup
2+
3+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.backup.database.getHwm
4+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.backup.database.txContext
5+
import no.nav.paw.arbeidssoekerregisteret.bekreftelse.backup.vo.ApplicationContext
6+
import org.apache.kafka.clients.consumer.Consumer
7+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
8+
import org.apache.kafka.common.TopicPartition
9+
import org.jetbrains.exposed.sql.transactions.transaction
10+
import java.util.concurrent.ConcurrentHashMap
11+
12+
class HwmRebalanceListener(
13+
private val context: ApplicationContext,
14+
private val consumer: Consumer<*, *>,
15+
private val topic: String
16+
) : ConsumerRebalanceListener {
17+
18+
private val currentPartitions = ConcurrentHashMap<Int, TopicPartition>(6)
19+
20+
val currentlyAssignedPartitions: Set<Int> get() = currentPartitions.keys
21+
22+
private val txContext = txContext(context)
23+
24+
override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>?) {
25+
context.logger.info("Revoked: $partitions for topic $topic")
26+
partitions?.forEach { partition ->
27+
currentPartitions.remove(partition.partition())
28+
}
29+
}
30+
31+
override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>?) {
32+
partitions?.forEach { partition ->
33+
currentPartitions.putIfAbsent(partition.partition(), partition)
34+
}
35+
val assignedPartitions = partitions ?: emptyList()
36+
context.logger.info("Assigned partitions $assignedPartitions for topic $topic")
37+
if (assignedPartitions.isNotEmpty()) {
38+
val seekTo = transaction {
39+
assignedPartitions.map { partition ->
40+
val offset = requireNotNull(txContext().getHwm(partition.partition(), topic)) {
41+
"No hwm for partition ${partition.partition()} on topic $topic, init not called?"
42+
}
43+
partition to offset
44+
}
45+
}
46+
seekTo.forEach { (partition, offset) ->
47+
consumer.seek(partition, offset + 1)
48+
}
49+
}
50+
}
51+
}

0 commit comments

Comments
 (0)