
Commit 60e33f8

committed Mar 17, 2025

implement KafkaProducerRepositoryImpl

1 parent 231bdf5 commit 60e33f8

3 files changed: +128 −0
 
@@ -0,0 +1,62 @@
package no.nav.mulighetsrommet.kafka

import kotliquery.Row
import kotliquery.queryOf
import no.nav.common.kafka.producer.feilhandtering.KafkaProducerRepository
import no.nav.common.kafka.producer.feilhandtering.StoredProducerRecord
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.database.createTextArray
import org.intellij.lang.annotations.Language

class KafkaProducerRepositoryImpl(private val db: Database) : KafkaProducerRepository {
    override fun storeRecord(record: StoredProducerRecord): Long = db.session { session ->
        @Language("PostgreSQL")
        val sql = """
            insert into kafka_producer_record (topic, key, value, headers_json) values (?, ?, ?, ?)
            returning id
        """.trimIndent()

        val query = queryOf(sql, record.topic, record.key, record.value, record.headersJson)

        session.single(query) { it.long("id") }!!
    }

    override fun deleteRecords(ids: List<Long>): Unit = db.session { session ->
        @Language("PostgreSQL")
        val sql = """
            delete from kafka_producer_record where id = any(?::bigint[])
        """.trimIndent()

        session.update(queryOf(sql, session.createArrayOf("bigint", ids)))
    }

    override fun getRecords(maxMessages: Int): List<StoredProducerRecord> = db.session { session ->
        @Language("PostgreSQL")
        val sql = """
            select * from kafka_producer_record order by id limit ?
        """.trimIndent()

        val query = queryOf(sql, maxMessages)

        session.list(query) { toStoredProducerRecord(it) }
    }

    override fun getRecords(maxMessages: Int, topics: List<String>): List<StoredProducerRecord> = db.session { session ->
        @Language("PostgreSQL")
        val sql = """
            select * from kafka_producer_record where topic = any(?::text[]) order by id limit ?
        """.trimIndent()

        val query = queryOf(sql, session.createTextArray(topics), maxMessages)

        session.list(query) { toStoredProducerRecord(it) }
    }
}

private fun toStoredProducerRecord(row: Row) = StoredProducerRecord(
    row.long("id"),
    row.string("topic"),
    row.bytes("key"),
    row.bytes("value"),
    row.string("headers_json"),
)
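
For orientation, a minimal usage sketch of the new repository (not part of the commit). It only uses the API shown above; the Database instance, topic name, and payloads are placeholders.

import no.nav.common.kafka.producer.feilhandtering.StoredProducerRecord
import no.nav.mulighetsrommet.database.Database
import no.nav.mulighetsrommet.kafka.KafkaProducerRepositoryImpl

// Minimal usage sketch (illustration only, not part of the commit).
// Assumes `db` is an already configured Database instance; topic and payload values are placeholders.
fun exampleUsage(db: Database) {
    val repository = KafkaProducerRepositoryImpl(db)

    // Store a pending record; the generated primary key from `returning id` is returned.
    val id = repository.storeRecord(
        StoredProducerRecord(
            "example-topic",               // placeholder topic
            "example-key".toByteArray(),   // key bytes, stored in the bytea `key` column
            "example-value".toByteArray(), // value bytes, stored in the bytea `value` column
            "{}",                          // headers serialized as JSON text
        ),
    )

    // Fetch up to 100 of the oldest stored records, optionally filtered by topic.
    val pending = repository.getRecords(100, listOf("example-topic"))

    // Delete records once they have been successfully published.
    repository.deleteRecords(listOf(id))

    println("stored id=$id, pending before delete=${pending.size}")
}
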
@@ -0,0 +1,56 @@
package no.nav.mulighetsrommet.kafka

import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import no.nav.common.kafka.producer.feilhandtering.StoredProducerRecord
import no.nav.mulighetsrommet.database.kotest.extensions.FlywayDatabaseTestListener

class KafkaProducerRepositoryTest : FunSpec({
    val database = extension(FlywayDatabaseTestListener(testDatabaseConfig))

    lateinit var kafkaProducerRepository: KafkaProducerRepositoryImpl

    beforeSpec {
        kafkaProducerRepository = KafkaProducerRepositoryImpl(database.db)
    }

    test("should store records") {
        val records = (0..2).map {
            StoredProducerRecord(
                "topic$it",
                "key$it".toByteArray(),
                "value$it".toByteArray(),
                "{}",
            )
        }

        records.forEach { kafkaProducerRepository.storeRecord(it) }

        database.assertTable("kafka_producer_record").hasNumberOfRows(3)
    }

    test("should delete records") {
        kafkaProducerRepository.deleteRecords(listOf(1, 2))
        database.assertTable("kafka_producer_record").hasNumberOfRows(1)
    }

    test("should get all records") {
        val result = kafkaProducerRepository.getRecords(10)
        result.size shouldBe 1
        result[0].id shouldBe 3
    }

    test("should get records by topics") {
        kafkaProducerRepository.storeRecord(
            StoredProducerRecord(
                "topic1",
                "key1".toByteArray(),
                "value1".toByteArray(),
                "{}",
            ),
        )

        val result = kafkaProducerRepository.getRecords(10, listOf("topic1"))
        result.size shouldBe 1
    }
})

common/kafka/src/test/resources/db/migration/V1__test-database.sql

+10
@@ -24,6 +24,16 @@ create table failed_events
     unique (topic, partition, record_offset)
 );
 
+create table kafka_producer_record
+(
+    id           bigint generated always as identity,
+    topic        text not null,
+    key          bytea,
+    value        bytea,
+    headers_json text,
+    created_at   timestamp default current_timestamp not null
+);
+
 create type topic_type as enum ('CONSUMER', 'PRODUCER');
 
 create table topics