Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduced usage of kotlin-kafka #121

Merged
merged 7 commits into from
Mar 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package no.nav.emottak.ebms.async.kafka

import com.sksamuel.hoplite.Masked
import kotlinx.coroutines.runBlocking
import kotlinx.serialization.json.Json
import no.nav.emottak.utils.config.Kafka
import no.nav.emottak.utils.config.KeystoreLocation
import no.nav.emottak.utils.config.KeystoreType
import no.nav.emottak.utils.config.Masked
import no.nav.emottak.utils.config.SecurityProtocol
import no.nav.emottak.utils.config.TruststoreLocation
import no.nav.emottak.utils.config.TruststoreType
Expand Down Expand Up @@ -36,13 +36,11 @@ class KafkaPublisherClientTest {
fun `Legg 2 meldinger på Kafka`() {
kafkaConsumer.subscribe(listOf(TOPIC))
runBlocking {
kafkaPublisher.send("MSG 1", randomEvent("Event 1").toByteArray())
kafkaPublisher.send("MSG 2", randomEvent("Event 2").toByteArray())
kafkaPublisher.publishMessage(randomEvent("Event 1").toByteArray())
kafkaPublisher.publishMessage(randomEvent("Event 2").toByteArray())
}
val msgs: List<ConsumerRecord<String, ByteArray>> = readRecentMessages()
Assertions.assertEquals(2, msgs.size)
Assertions.assertEquals("MSG 1", msgs.first().key())
Assertions.assertEquals("MSG 2", msgs.last().key())

val firstEventJson = msgs.first().value().decodeToString()
val firstEvent = Json.decodeFromString<Event>(firstEventJson)
Expand All @@ -57,11 +55,10 @@ class KafkaPublisherClientTest {
fun `Legg 1 melding på Kafka`() {
kafkaConsumer.subscribe(listOf(TOPIC))
runBlocking {
kafkaPublisher.send("MSG 3", randomEvent("Ny event 3").toByteArray())
kafkaPublisher.publishMessage(randomEvent("Ny event 3").toByteArray())
}
val msgs: List<ConsumerRecord<String, ByteArray>> = readRecentMessages()
Assertions.assertEquals(1, msgs.size)
Assertions.assertEquals("MSG 3", msgs.first().key())

val firstEventJson = msgs.first().value().decodeToString()
val firstEvent = Json.decodeFromString<Event>(firstEventJson)
Expand Down Expand Up @@ -99,7 +96,7 @@ class KafkaPublisherClientTest {
println("=== Kafka Test Container ===")
KafkaTestContainer.start()
println("KafkaTestContainer.bootstrapServers: ${KafkaTestContainer.bootstrapServers}")
kafkaPublisher = KafkaPublisherClient(TOPIC, kafkaSettings(KafkaTestContainer.bootstrapServers))
kafkaPublisher = KafkaPublisherClient(kafkaSettings(KafkaTestContainer.bootstrapServers))
kafkaConsumer = createConsumer(KafkaTestContainer.bootstrapServers)
}

Expand All @@ -119,7 +116,9 @@ class KafkaPublisherClientTest {
truststoreType = TruststoreType(""),
truststoreLocation = TruststoreLocation(""),
truststorePassword = Masked(""),
groupId = "ebms-provider"
groupId = "ebms-provider",
topic = TOPIC,
eventLoggingProducerActive = false
)

private fun createConsumer(bootstrapServers: String): KafkaConsumer<String, ByteArray> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package no.nav.emottak.ebms.configuration

import no.nav.emottak.utils.config.Kafka
import no.nav.emottak.utils.config.KafkaEventLoggingProducer

data class Config(
val kafka: Kafka,
val kafkaEventLoggingProducer: KafkaEventLoggingProducer,
val kafkaSignalReceiver: KafkaSignalReceiver,
val kafkaSignalProducer: KafkaSignalProducer,
val kafkaPayloadReceiver: KafkaPayloadReceiver,
Expand Down
3 changes: 2 additions & 1 deletion emottak-utils/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ publishing {
dependencies {
testImplementation(kotlin("test"))
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.0")
implementation("org.apache.kafka:kafka-clients:3.9.0")
implementation("io.github.nomisrev:kotlin-kafka:0.4.1")
implementation("com.sksamuel.hoplite:hoplite-core:2.8.2")
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package no.nav.emottak.utils.config

import com.sksamuel.hoplite.Masked
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

data class Kafka(
Expand All @@ -11,10 +15,12 @@ data class Kafka(
val truststoreType: TruststoreType,
val truststoreLocation: TruststoreLocation,
val truststorePassword: Masked,
val groupId: String
val groupId: String,
val topic: String,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synes egentlig at dette feltet burde hete eventTopic i og med at denne Kafka-klassen er tenkt gjenbrukt i alle moduler som benytter Kafka, og som da gjerne har andre topics i tillegg til topic for hendelser.

Copy link
Contributor

@ivanskodje ivanskodje Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Har vi Kafka-topics som ikke er events? 🤔 Hvis vi følger samme tankegang, burde vi da kanskje også bruke navn som eventGroupId, eventTruststoreType, osv.?

Tenker at det allerede er tydelig at en topic er Kafka-relatert, siden den ligger i dataklassen Kafka.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dette Kafka-configobjektet med bootstrapServers, securityProtocol osv, og topic ligger i emottak-utils fordi dette er felles config som er mer eller mindre statisk (det samme i alle moduler). Det som skiller seg ut, er hvilke topics den enkelte modul skal skrive til.

Det jeg mente, var at vi f.eks. i smtp-transport har et eget config-objekt KafkaTopics som inneholder payloadInTopic, signalInTopic, payloadOutTopic og signalOutTopic. Disse navnene er jo ganske tydelige på hva de er, mens Kafka.topic er utydelig på hvilken kø den er.

Tanken her er altså at Kafka sin topic er den ene felles-topic'en som vi har: Køen for hendelser (events). I alle fall slik jeg har oppfattet det. Og da burde feltet også hete eventTopic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jeg merger denne nå så vi får koden ut. Jeg har tatt på meg å flytte hele repoet ut av ebxml-processor til eget repo. Mindre oppgaver som dette kan vi løfte over til nytt repo. Ser for meg litt andre endringer også siden det nå blir et helt selvstendig repo.

val eventLoggingProducerActive: Boolean
)

fun Kafka.toProperties() = Properties()
private fun Kafka.toProperties() = Properties()
.apply {
put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value)
put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value)
Expand All @@ -25,10 +31,13 @@ fun Kafka.toProperties() = Properties()
put(SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword.value)
}

data class KafkaEventLoggingProducer(
val active: Boolean,
val topic: String
)
fun Kafka.toKafkaPublisherSettings(): PublisherSettings<String, ByteArray> =
PublisherSettings(
bootstrapServers = bootstrapServers,
keySerializer = StringSerializer(),
valueSerializer = ByteArraySerializer(),
properties = toProperties()
)

@JvmInline
value class SecurityProtocol(val value: String)
Expand All @@ -45,12 +54,6 @@ value class TruststoreType(val value: String)
@JvmInline
value class TruststoreLocation(val value: String)

// Kopiert fra hoplite (types.kt), for å forhindre unødvendig stor avhengighet:
typealias Masked = Secret
data class Secret(val value: String) {
override fun toString(): String = "****"
}

const val SECURITY_PROTOCOL_CONFIG = "security.protocol"
const val SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"
const val SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,77 +2,16 @@ package no.nav.emottak.utils.events

import no.nav.emottak.utils.events.model.Event
import no.nav.emottak.utils.events.model.EventMessageDetails
import no.nav.emottak.utils.events.model.EventType
import no.nav.emottak.utils.kafka.KafkaPublisherClient
import no.nav.emottak.utils.toJsonString
import java.time.Instant
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid
import org.apache.kafka.clients.producer.RecordMetadata

@OptIn(ExperimentalUuidApi::class)
class EventLoggingService(private val kafkaPublisher: KafkaPublisherClient) {
class EventLoggingService(
private val kafkaPublisherClient: KafkaPublisherClient
) {

suspend fun logEvent(event: Event) {
kafkaPublisher.send(kafkaPublisher.topic, event.toByteArray())
}
suspend fun logEvent(event: Event): Result<RecordMetadata> =
kafkaPublisherClient.publishMessage(event.toByteArray())

suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails) {
kafkaPublisher.send(kafkaPublisher.topic, eventMessageDetails.toByteArray())
}

suspend fun logEventMessageDetails(
requestId: Uuid,
cpaId: String,
conversationId: String,
messageId: String,
refToMessageId: String? = null,
fromPartyId: String,
fromRole: String? = null,
toPartyId: String,
toRole: String? = null,
service: String,
action: String,
sentAt: Instant? = null
) {
logEventMessageDetails(
EventMessageDetails(
requestId = requestId,
cpaId = cpaId,
conversationId = conversationId,
messageId = messageId,
refToMessageId = refToMessageId,
fromPartyId = fromPartyId,
fromRole = fromRole,
toPartyId = toPartyId,
toRole = toRole,
service = service,
action = action,
sentAt = sentAt
)
)
}

suspend fun logEventOK(eventType: EventType, requestId: Uuid, messageId: String, contentId: String? = null, eventData: String? = null) {
logEvent(
Event(
eventType = eventType,
requestId = requestId,
contentId = contentId,
messageId = messageId,
eventData = eventData
)
)
}

suspend fun logEventException(eventType: EventType, requestId: Uuid, messageId: String, ex: Exception, contentId: String? = null) {
logEvent(
Event(
eventType = eventType,
requestId = requestId,
contentId = contentId,
messageId = messageId,
eventData = ex.toJsonString()
)
)
}
suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails): Result<RecordMetadata> =
kafkaPublisherClient.publishMessage(eventMessageDetails.toByteArray())
}
Original file line number Diff line number Diff line change
@@ -1,57 +1,21 @@
package no.nav.emottak.utils.kafka

import io.github.nomisRev.kafka.publisher.KafkaPublisher
import no.nav.emottak.utils.config.Kafka
import no.nav.emottak.utils.config.toProperties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import no.nav.emottak.utils.config.toKafkaPublisherSettings
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.clients.producer.RecordMetadata

class KafkaPublisherClient(val topic: String, val config: Kafka) : AutoCloseable {
// private val log = LoggerFactory.getLogger(this.javaClass)
class KafkaPublisherClient(
private val config: Kafka
) {
private val kafkaPublisher = KafkaPublisher(config.toKafkaPublisherSettings())

companion object {
private var producer: KafkaProducer<String, ByteArray>? = null
}

suspend fun send(key: String, value: ByteArray) {
try {
getProducer().send(
ProducerRecord(topic, key, value)
)
getProducer().flush()
// log.debug("Message ($key) sent successfully to topic ($topic)")
} catch (e: Exception) {
// log.error("Exception while writing message ($key) to queue ($topic)", e)
}
}

private fun getProducer(): KafkaProducer<String, ByteArray> {
if (producer == null) {
producer = createPublisher()
}
return producer!!
}

private fun createPublisher(): KafkaProducer<String, ByteArray> {
val properties = config.toProperties().apply {
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name)
put(ProducerConfig.ACKS_CONFIG, "all")
// Performance
put(ProducerConfig.BUFFER_MEMORY_CONFIG, "16777216")
put(ProducerConfig.BATCH_SIZE_CONFIG, "8192")
put(ProducerConfig.RETRIES_CONFIG, "3")
put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")
suspend fun publishMessage(value: ByteArray): Result<RecordMetadata> =
kafkaPublisher.publishScope {
publishCatching(toProducerRecord(config.topic, value))
}
return KafkaProducer(properties)
}

override fun close() {
try {
getProducer().close()
} catch (_: Exception) {}
}
private fun toProducerRecord(topic: String, content: ByteArray): ProducerRecord<String, ByteArray> =
ProducerRecord<String, ByteArray>(topic, content)
}
5 changes: 1 addition & 4 deletions emottak-utils/src/main/resources/kafka_common.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ kafka {
truststoreType = "JKS"
truststoreLocation = "${KAFKA_TRUSTSTORE_PATH:-}"
truststorePassword = "${KAFKA_CREDSTORE_PASSWORD:-}"
}

kafkaEventLoggingProducer {
active = "${EVENT_LOGGING_PRODUCER:-false}"
topic = "team-emottak.event.log"
eventLoggingProducerActive = "${EVENT_LOGGING_PRODUCER:-false}"
}