Skip to content

Commit 10c852c

Browse files
committed
KafkaProducerRecordProcessor
Tar hensyn til rekkefølge per topic meldinger blir skrevet til db. Selve repository laget som sørger for å hente i riktig rekkefølge.
1 parent 71b113c commit 10c852c

File tree

18 files changed

+311
-449
lines changed

18 files changed

+311
-449
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package no.nav.mulighetsrommet.kafka
2+
3+
import kotlinx.coroutines.*
4+
import no.nav.common.kafka.producer.KafkaProducerClient
5+
import no.nav.common.kafka.util.KafkaUtils
6+
import org.apache.kafka.clients.producer.ProducerRecord
7+
import org.apache.kafka.clients.producer.RecordMetadata
8+
import org.slf4j.Logger
9+
import org.slf4j.LoggerFactory
10+
import java.util.*
11+
import java.util.concurrent.CountDownLatch
12+
import java.util.function.Consumer
13+
14+
interface ProducerRecordRepository {
15+
fun getRecords(): List<ProducerRecordDbo>
16+
fun deleteRecords(ids: List<Long>)
17+
}
18+
19+
class KafkaProducerRecordProcessor(
20+
private val repository: ProducerRecordRepository,
21+
private val producerClient: KafkaProducerClient<ByteArray, ByteArray>,
22+
) {
23+
private val pollTimeout: Long = 3000
24+
25+
private val log: Logger = LoggerFactory.getLogger(this.javaClass)
26+
private var job: Job? = null
27+
28+
fun start() {
29+
if (job != null) return
30+
31+
job = CoroutineScope(Dispatchers.Default).launch {
32+
while (isActive) {
33+
recordHandler()
34+
}
35+
}.apply {
36+
invokeOnCompletion {
37+
log.info("Shutting down Kafka producer")
38+
producerClient.close()
39+
}
40+
}
41+
}
42+
43+
fun close() {
44+
job?.cancel()
45+
job = null
46+
}
47+
48+
private suspend fun recordHandler() {
49+
try {
50+
val records = repository.getRecords()
51+
52+
if (records.isNotEmpty()) {
53+
publishRecords(records)
54+
} else {
55+
delay(pollTimeout)
56+
}
57+
} catch (e: Exception) {
58+
log.error("Failed to process kafka producer records", e)
59+
delay(pollTimeout)
60+
}
61+
}
62+
63+
private fun publishRecords(records: List<ProducerRecordDbo>) {
64+
val idsToDelete = Collections.synchronizedList(mutableListOf<Long>())
65+
66+
val latch = CountDownLatch(records.size)
67+
68+
records.forEach(
69+
Consumer { record ->
70+
producerClient.send(record.toProducerRecord()) { _: RecordMetadata?, exception: java.lang.Exception? ->
71+
latch.countDown()
72+
if (exception != null) {
73+
log.warn("Failed to send record to topic ${record.topic}", exception)
74+
} else {
75+
idsToDelete.add(record.id)
76+
}
77+
}
78+
},
79+
)
80+
81+
producerClient.producer.flush()
82+
latch.await()
83+
84+
repository.deleteRecords(idsToDelete)
85+
}
86+
}
87+
88+
fun ProducerRecordDbo.toProducerRecord(): ProducerRecord<ByteArray, ByteArray> {
89+
val headers = KafkaUtils.jsonToHeaders(headersJson)
90+
91+
return ProducerRecord(
92+
topic,
93+
null,
94+
null,
95+
key,
96+
value,
97+
headers,
98+
)
99+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package no.nav.mulighetsrommet.kafka
2+
3+
import kotliquery.Row
4+
import kotliquery.Session
5+
import kotliquery.queryOf
6+
import no.nav.common.kafka.util.KafkaUtils
7+
import org.apache.kafka.clients.producer.ProducerRecord
8+
import org.intellij.lang.annotations.Language
9+
10+
class KafkaProducerRecordQueries(private val session: Session) {
11+
fun insert(record: ProducerRecord<ByteArray, ByteArray?>) {
12+
@Language("PostgreSQL")
13+
val sql = """
14+
insert into kafka_producer_record (topic, key, value, headers_json) values (?, ?, ?, ?)
15+
""".trimIndent()
16+
17+
val query = queryOf(sql, record.topic(), record.key(), record.value(), KafkaUtils.headersToJson(record.headers()))
18+
19+
session.execute(query)
20+
}
21+
22+
fun deleteRecords(ids: List<Long>) {
23+
@Language("PostgreSQL")
24+
val sql = """
25+
delete from kafka_producer_record where id = any(?::bigint[])
26+
""".trimIndent()
27+
28+
session.update(queryOf(sql, session.createArrayOf("bigint", ids)))
29+
}
30+
31+
fun getRecords(): List<ProducerRecordDbo> {
32+
@Language("PostgreSQL")
33+
val sql = """
34+
select distinct on (topic) * from kafka_producer_record order by topic, id
35+
""".trimIndent()
36+
37+
val query = queryOf(sql)
38+
39+
return session.list(query) { it.toProducerRecordDbo() }
40+
}
41+
}
42+
43+
fun Row.toProducerRecordDbo() = ProducerRecordDbo(
44+
long("id"),
45+
string("topic"),
46+
bytes("key"),
47+
bytes("value"),
48+
stringOrNull("headers_json"),
49+
)
50+
51+
data class ProducerRecordDbo(
52+
val id: Long? = 0,
53+
val topic: String,
54+
val key: ByteArray,
55+
val value: ByteArray?,
56+
val headersJson: String?,
57+
)

common/kafka/src/main/kotlin/no/nav/mulighetsrommet/kafka/KafkaProducerRepositoryImpl.kt

-62
This file was deleted.

common/kafka/src/test/kotlin/no/nav/mulighetsrommet/kafka/KafkaProducerRepositoryTest.kt

-56
This file was deleted.

mulighetsrommet-api/src/main/kotlin/no/nav/mulighetsrommet/api/ApiDatabase.kt

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import no.nav.mulighetsrommet.api.utbetaling.db.UtbetalingQueries
2020
import no.nav.mulighetsrommet.api.veilederflate.VeilederJoyrideQueries
2121
import no.nav.mulighetsrommet.api.veilederflate.VeilederflateTiltakQueries
2222
import no.nav.mulighetsrommet.database.Database
23+
import no.nav.mulighetsrommet.kafka.KafkaProducerRecordQueries
2324
import no.nav.mulighetsrommet.notifications.NotificationQueries
2425
import no.nav.mulighetsrommet.utdanning.db.UtdanningQueries
2526
import javax.sql.DataSource
@@ -72,5 +73,6 @@ class QueryContext(val session: Session) {
7273
val totrinnskontroll = TotrinnskontrollQueries(session)
7374
val veilderTiltak = VeilederflateTiltakQueries(session)
7475
val veilederJoyride = VeilederJoyrideQueries(session)
76+
val kafkaProducerRecord = KafkaProducerRecordQueries(session)
7577
}
7678
}

mulighetsrommet-api/src/main/kotlin/no/nav/mulighetsrommet/api/plugins/DependencyInjection.kt

+19-4
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ import no.nav.mulighetsrommet.brreg.BrregClient
7979
import no.nav.mulighetsrommet.database.Database
8080
import no.nav.mulighetsrommet.database.DatabaseConfig
8181
import no.nav.mulighetsrommet.kafka.KafkaConsumerOrchestrator
82+
import no.nav.mulighetsrommet.kafka.KafkaProducerRecordProcessor
83+
import no.nav.mulighetsrommet.kafka.ProducerRecordDbo
84+
import no.nav.mulighetsrommet.kafka.ProducerRecordRepository
8285
import no.nav.mulighetsrommet.metrics.Metrikker
8386
import no.nav.mulighetsrommet.notifications.NotificationTask
8487
import no.nav.mulighetsrommet.oppgaver.OppgaverService
@@ -138,7 +141,6 @@ private fun kafka(appConfig: AppConfig) = module {
138141
.withMetrics(Metrikker.appMicrometerRegistry)
139142
.build()
140143
}
141-
142144
single {
143145
ArenaMigreringTiltaksgjennomforingerV1KafkaProducer(
144146
get(),
@@ -180,6 +182,21 @@ private fun kafka(appConfig: AppConfig) = module {
180182
consumers = consumers,
181183
)
182184
}
185+
single {
186+
val db = get<ApiDatabase>()
187+
val repository = object : ProducerRecordRepository {
188+
override fun getRecords(): List<ProducerRecordDbo> {
189+
return db.session { queries.kafkaProducerRecord.getRecords() }
190+
}
191+
override fun deleteRecords(ids: List<Long>) {
192+
return db.session { queries.kafkaProducerRecord.deleteRecords(ids) }
193+
}
194+
}
195+
KafkaProducerRecordProcessor(
196+
repository = repository,
197+
producerClient = get(),
198+
)
199+
}
183200
}
184201

185202
private fun services(appConfig: AppConfig) = module {
@@ -367,7 +384,7 @@ private fun services(appConfig: AppConfig) = module {
367384
single { TilsagnService(appConfig.okonomi, get(), get()) }
368385
single { AltinnRettigheterService(get(), get()) }
369386
single { OppgaverService(get()) }
370-
single { OkonomiBestillingService(appConfig.kafka.clients.okonomiBestilling, get(), get()) }
387+
single { OkonomiBestillingService(appConfig.kafka.clients.okonomiBestilling, get()) }
371388
single { ArrangorFlateService(get(), get()) }
372389
}
373390

@@ -403,7 +420,6 @@ private fun tasks(config: TaskConfig) = module {
403420
)
404421
val updateApentForPamelding = UpdateApentForPamelding(config.updateApentForPamelding, get(), get())
405422
val notificationTask: NotificationTask by inject()
406-
val okonomi: OkonomiBestillingService by inject()
407423
val generateValidationReport: GenerateValidationReport by inject()
408424
val initialLoadGjennomforinger: InitialLoadGjennomforinger by inject()
409425
val initialLoadTiltakstyper: InitialLoadTiltakstyper by inject()
@@ -418,7 +434,6 @@ private fun tasks(config: TaskConfig) = module {
418434
.create(
419435
db.getDatasource(),
420436
notificationTask.task,
421-
okonomi.task,
422437
generateValidationReport.task,
423438
initialLoadGjennomforinger.task,
424439
initialLoadTiltakstyper.task,

0 commit comments

Comments
 (0)