From 53b649d56b1bd5dea9239ff2c4ebbec44862f2af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Wed, 19 Mar 2025 13:05:49 +0100 Subject: [PATCH 1/7] Introduced usage of kotlin-kafka --- emottak-utils/build.gradle.kts | 3 +- .../nav/emottak/utils/config/KafkaConfig.kt | 27 ++++--- .../utils/events/EventLoggingService.kt | 75 ++----------------- .../utils/kafka/KafkaPublisherClient.kt | 60 +++------------ .../src/main/resources/kafka_common.conf | 5 +- 5 files changed, 37 insertions(+), 133 deletions(-) diff --git a/emottak-utils/build.gradle.kts b/emottak-utils/build.gradle.kts index 90e28bdc..021b9d7d 100644 --- a/emottak-utils/build.gradle.kts +++ b/emottak-utils/build.gradle.kts @@ -35,5 +35,6 @@ publishing { dependencies { testImplementation(kotlin("test")) implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.0") - implementation("org.apache.kafka:kafka-clients:3.9.0") + implementation("io.github.nomisrev:kotlin-kafka:0.4.1") + implementation("com.sksamuel.hoplite:hoplite-core:2.8.2") } diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt index 6bde65d5..cc41a3e8 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt @@ -1,5 +1,9 @@ package no.nav.emottak.utils.config +import com.sksamuel.hoplite.Masked +import io.github.nomisRev.kafka.publisher.PublisherSettings +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringSerializer import java.util.Properties data class Kafka( @@ -11,10 +15,12 @@ data class Kafka( val truststoreType: TruststoreType, val truststoreLocation: TruststoreLocation, val truststorePassword: Masked, - val groupId: String + val groupId: String, + val topic: String, + val eventLoggingProducerActive: Boolean ) -fun Kafka.toProperties() = Properties() +private fun Kafka.toProperties() = Properties() .apply { put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value) put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value) @@ -25,10 +31,13 @@ fun Kafka.toProperties() = Properties() put(SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword.value) } -data class KafkaEventLoggingProducer( - val active: Boolean, - val topic: String -) +fun Kafka.toKafkaPublisherSettings(): PublisherSettings = + PublisherSettings( + bootstrapServers = bootstrapServers, + keySerializer = StringSerializer(), + valueSerializer = ByteArraySerializer(), + properties = toProperties() + ) @JvmInline value class SecurityProtocol(val value: String) @@ -45,12 +54,6 @@ value class TruststoreType(val value: String) @JvmInline value class TruststoreLocation(val value: String) -// Kopiert fra hoplite (types.kt), for å forhindre unødvendig stor avhengighet: -typealias Masked = Secret -data class Secret(val value: String) { - override fun toString(): String = "****" -} - const val SECURITY_PROTOCOL_CONFIG = "security.protocol" const val SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type" const val SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location" diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt index 3e5eef36..57f52811 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt @@ -2,77 +2,16 @@ package no.nav.emottak.utils.events import no.nav.emottak.utils.events.model.Event import no.nav.emottak.utils.events.model.EventMessageDetails -import no.nav.emottak.utils.events.model.EventType import no.nav.emottak.utils.kafka.KafkaPublisherClient -import no.nav.emottak.utils.toJsonString -import java.time.Instant -import kotlin.uuid.ExperimentalUuidApi -import kotlin.uuid.Uuid -@OptIn(ExperimentalUuidApi::class) -class EventLoggingService(private val kafkaPublisher: KafkaPublisherClient) { +class EventLoggingService( + private val kafkaPublisherClient: KafkaPublisherClient +) { - suspend fun logEvent(event: Event) { - kafkaPublisher.send(kafkaPublisher.topic, event.toByteArray()) + private suspend fun logEvent(event: Event) { + kafkaPublisherClient.publishMessage(event.toByteArray()) } - suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails) { - kafkaPublisher.send(kafkaPublisher.topic, eventMessageDetails.toByteArray()) - } - - suspend fun logEventMessageDetails( - requestId: Uuid, - cpaId: String, - conversationId: String, - messageId: String, - refToMessageId: String? = null, - fromPartyId: String, - fromRole: String? = null, - toPartyId: String, - toRole: String? = null, - service: String, - action: String, - sentAt: Instant? = null - ) { - logEventMessageDetails( - EventMessageDetails( - requestId = requestId, - cpaId = cpaId, - conversationId = conversationId, - messageId = messageId, - refToMessageId = refToMessageId, - fromPartyId = fromPartyId, - fromRole = fromRole, - toPartyId = toPartyId, - toRole = toRole, - service = service, - action = action, - sentAt = sentAt - ) - ) - } - - suspend fun logEventOK(eventType: EventType, requestId: Uuid, messageId: String, contentId: String? = null, eventData: String? = null) { - logEvent( - Event( - eventType = eventType, - requestId = requestId, - contentId = contentId, - messageId = messageId, - eventData = eventData - ) - ) - } - - suspend fun logEventException(eventType: EventType, requestId: Uuid, messageId: String, ex: Exception, contentId: String? = null) { - logEvent( - Event( - eventType = eventType, - requestId = requestId, - contentId = contentId, - messageId = messageId, - eventData = ex.toJsonString() - ) - ) - } + private suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails) = + kafkaPublisherClient.publishMessage(eventMessageDetails.toByteArray()) } diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt index c4d83bf8..bd8c7bcb 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt @@ -1,57 +1,21 @@ package no.nav.emottak.utils.kafka +import io.github.nomisRev.kafka.publisher.KafkaPublisher import no.nav.emottak.utils.config.Kafka -import no.nav.emottak.utils.config.toProperties -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig +import no.nav.emottak.utils.config.toKafkaPublisherSettings import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.clients.producer.RecordMetadata -class KafkaPublisherClient(val topic: String, val config: Kafka) : AutoCloseable { - // private val log = LoggerFactory.getLogger(this.javaClass) +class KafkaPublisherClient( + private val config: Kafka +) { + private val kafkaPublisher = KafkaPublisher(config.toKafkaPublisherSettings()) - companion object { - private var producer: KafkaProducer? = null - } - - suspend fun send(key: String, value: ByteArray) { - try { - getProducer().send( - ProducerRecord(topic, key, value) - ) - getProducer().flush() - // log.debug("Message ($key) sent successfully to topic ($topic)") - } catch (e: Exception) { - // log.error("Exception while writing message ($key) to queue ($topic)", e) - } - } - - private fun getProducer(): KafkaProducer { - if (producer == null) { - producer = createPublisher() - } - return producer!! - } - - private fun createPublisher(): KafkaProducer { - val properties = config.toProperties().apply { - put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) - put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name) - put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name) - put(ProducerConfig.ACKS_CONFIG, "all") - // Performance - put(ProducerConfig.BUFFER_MEMORY_CONFIG, "16777216") - put(ProducerConfig.BATCH_SIZE_CONFIG, "8192") - put(ProducerConfig.RETRIES_CONFIG, "3") - put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000") + suspend fun publishMessage(value: ByteArray): Result = + kafkaPublisher.publishScope { + publishCatching(toProducerRecord(config.topic, value)) } - return KafkaProducer(properties) - } - override fun close() { - try { - getProducer().close() - } catch (_: Exception) {} - } + private fun toProducerRecord(topic: String, content: ByteArray) = + ProducerRecord(topic, content) } diff --git a/emottak-utils/src/main/resources/kafka_common.conf b/emottak-utils/src/main/resources/kafka_common.conf index 73102db7..68599f55 100644 --- a/emottak-utils/src/main/resources/kafka_common.conf +++ b/emottak-utils/src/main/resources/kafka_common.conf @@ -8,9 +8,6 @@ kafka { truststoreType = "JKS" truststoreLocation = "${KAFKA_TRUSTSTORE_PATH:-}" truststorePassword = "${KAFKA_CREDSTORE_PASSWORD:-}" -} - -kafkaEventLoggingProducer { - active = "${EVENT_LOGGING_PRODUCER:-false}" topic = "team-emottak.event.log" + eventLoggingProducerActive = "${EVENT_LOGGING_PRODUCER:-false}" } From 6008a43986153254459fea1023d8c433bee17ddc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Wed, 19 Mar 2025 13:13:37 +0100 Subject: [PATCH 2/7] Added explicit return type --- .../no/nav/emottak/utils/events/EventLoggingService.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt index 57f52811..afcc4b8d 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt @@ -3,15 +3,15 @@ package no.nav.emottak.utils.events import no.nav.emottak.utils.events.model.Event import no.nav.emottak.utils.events.model.EventMessageDetails import no.nav.emottak.utils.kafka.KafkaPublisherClient +import org.apache.kafka.clients.producer.RecordMetadata class EventLoggingService( private val kafkaPublisherClient: KafkaPublisherClient ) { - private suspend fun logEvent(event: Event) { + private suspend fun logEvent(event: Event): Result = kafkaPublisherClient.publishMessage(event.toByteArray()) - } - private suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails) = + private suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails): Result = kafkaPublisherClient.publishMessage(eventMessageDetails.toByteArray()) } From 914c4bfc31491d37a642d0963fd2ffcdeeb1bca9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Wed, 19 Mar 2025 13:52:52 +0100 Subject: [PATCH 3/7] Added producer record return type --- .../kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt index bd8c7bcb..e8de8ba5 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt @@ -16,6 +16,6 @@ class KafkaPublisherClient( publishCatching(toProducerRecord(config.topic, value)) } - private fun toProducerRecord(topic: String, content: ByteArray) = + private fun toProducerRecord(topic: String, content: ByteArray): ProducerRecord = ProducerRecord(topic, content) } From ea97a7fe96305b67b68f00573234447026c4a60c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Wed, 19 Mar 2025 14:08:08 +0100 Subject: [PATCH 4/7] Fixed test --- .../async/kafka/KafkaPublisherClientTest.kt | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt b/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt index 9c022bd1..94ed2349 100644 --- a/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt +++ b/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt @@ -1,11 +1,11 @@ package no.nav.emottak.ebms.async.kafka +import com.sksamuel.hoplite.Masked import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import no.nav.emottak.utils.config.Kafka import no.nav.emottak.utils.config.KeystoreLocation import no.nav.emottak.utils.config.KeystoreType -import no.nav.emottak.utils.config.Masked import no.nav.emottak.utils.config.SecurityProtocol import no.nav.emottak.utils.config.TruststoreLocation import no.nav.emottak.utils.config.TruststoreType @@ -36,13 +36,11 @@ class KafkaPublisherClientTest { fun `Legg 2 meldinger på Kafka`() { kafkaConsumer.subscribe(listOf(TOPIC)) runBlocking { - kafkaPublisher.send("MSG 1", randomEvent("Event 1").toByteArray()) - kafkaPublisher.send("MSG 2", randomEvent("Event 2").toByteArray()) + kafkaPublisher.publishMessage(randomEvent("Event 1").toByteArray()) + kafkaPublisher.publishMessage(randomEvent("Event 2").toByteArray()) } val msgs: List> = readRecentMessages() Assertions.assertEquals(2, msgs.size) - Assertions.assertEquals("MSG 1", msgs.first().key()) - Assertions.assertEquals("MSG 2", msgs.last().key()) val firstEventJson = msgs.first().value().decodeToString() val firstEvent = Json.decodeFromString(firstEventJson) @@ -57,11 +55,10 @@ class KafkaPublisherClientTest { fun `Legg 1 melding på Kafka`() { kafkaConsumer.subscribe(listOf(TOPIC)) runBlocking { - kafkaPublisher.send("MSG 3", randomEvent("Ny event 3").toByteArray()) + kafkaPublisher.publishMessage(randomEvent("Ny event 3").toByteArray()) } val msgs: List> = readRecentMessages() Assertions.assertEquals(1, msgs.size) - Assertions.assertEquals("MSG 3", msgs.first().key()) val firstEventJson = msgs.first().value().decodeToString() val firstEvent = Json.decodeFromString(firstEventJson) @@ -99,7 +96,7 @@ class KafkaPublisherClientTest { println("=== Kafka Test Container ===") KafkaTestContainer.start() println("KafkaTestContainer.bootstrapServers: ${KafkaTestContainer.bootstrapServers}") - kafkaPublisher = KafkaPublisherClient(TOPIC, kafkaSettings(KafkaTestContainer.bootstrapServers)) + kafkaPublisher = KafkaPublisherClient(kafkaSettings(KafkaTestContainer.bootstrapServers)) kafkaConsumer = createConsumer(KafkaTestContainer.bootstrapServers) } @@ -119,7 +116,9 @@ class KafkaPublisherClientTest { truststoreType = TruststoreType(""), truststoreLocation = TruststoreLocation(""), truststorePassword = Masked(""), - groupId = "ebms-provider" + groupId = "ebms-provider", + topic = TOPIC, + eventLoggingProducerActive = false ) private fun createConsumer(bootstrapServers: String): KafkaConsumer { From 55b5ef4c57fc927c8d6c32fe977370ce8f0f57f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Wed, 19 Mar 2025 14:23:30 +0100 Subject: [PATCH 5/7] Removed redundant code --- .../src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt index 99003c1f..c3de922c 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt @@ -5,7 +5,6 @@ import no.nav.emottak.utils.config.KafkaEventLoggingProducer data class Config( val kafka: Kafka, - val kafkaEventLoggingProducer: KafkaEventLoggingProducer, val kafkaSignalReceiver: KafkaSignalReceiver, val kafkaSignalProducer: KafkaSignalProducer, val kafkaPayloadReceiver: KafkaPayloadReceiver, From a8a5d506cd83306efe6e251dcec00da7b8ab0ce0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Wed, 19 Mar 2025 14:27:01 +0100 Subject: [PATCH 6/7] Removed unused import --- .../src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt index c3de922c..e7848dfe 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt @@ -1,7 +1,6 @@ package no.nav.emottak.ebms.configuration import no.nav.emottak.utils.config.Kafka -import no.nav.emottak.utils.config.KafkaEventLoggingProducer data class Config( val kafka: Kafka, From 8f3798a9b939ddc5c7640966a041e6e115bc6142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristian=20Fr=C3=B8hlich?= <186453+kfh@users.noreply.github.com> Date: Mon, 24 Mar 2025 12:34:25 +0100 Subject: [PATCH 7/7] Made functions public --- .../kotlin/no/nav/emottak/utils/events/EventLoggingService.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt index afcc4b8d..963f024d 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt @@ -9,9 +9,9 @@ class EventLoggingService( private val kafkaPublisherClient: KafkaPublisherClient ) { - private suspend fun logEvent(event: Event): Result = + suspend fun logEvent(event: Event): Result = kafkaPublisherClient.publishMessage(event.toByteArray()) - private suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails): Result = + suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails): Result = kafkaPublisherClient.publishMessage(eventMessageDetails.toByteArray()) }