diff --git a/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt b/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt index 9c022bd1..94ed2349 100644 --- a/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt +++ b/ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt @@ -1,11 +1,11 @@ package no.nav.emottak.ebms.async.kafka +import com.sksamuel.hoplite.Masked import kotlinx.coroutines.runBlocking import kotlinx.serialization.json.Json import no.nav.emottak.utils.config.Kafka import no.nav.emottak.utils.config.KeystoreLocation import no.nav.emottak.utils.config.KeystoreType -import no.nav.emottak.utils.config.Masked import no.nav.emottak.utils.config.SecurityProtocol import no.nav.emottak.utils.config.TruststoreLocation import no.nav.emottak.utils.config.TruststoreType @@ -36,13 +36,11 @@ class KafkaPublisherClientTest { fun `Legg 2 meldinger på Kafka`() { kafkaConsumer.subscribe(listOf(TOPIC)) runBlocking { - kafkaPublisher.send("MSG 1", randomEvent("Event 1").toByteArray()) - kafkaPublisher.send("MSG 2", randomEvent("Event 2").toByteArray()) + kafkaPublisher.publishMessage(randomEvent("Event 1").toByteArray()) + kafkaPublisher.publishMessage(randomEvent("Event 2").toByteArray()) } val msgs: List> = readRecentMessages() Assertions.assertEquals(2, msgs.size) - Assertions.assertEquals("MSG 1", msgs.first().key()) - Assertions.assertEquals("MSG 2", msgs.last().key()) val firstEventJson = msgs.first().value().decodeToString() val firstEvent = Json.decodeFromString(firstEventJson) @@ -57,11 +55,10 @@ class KafkaPublisherClientTest { fun `Legg 1 melding på Kafka`() { kafkaConsumer.subscribe(listOf(TOPIC)) runBlocking { - kafkaPublisher.send("MSG 3", randomEvent("Ny event 3").toByteArray()) + kafkaPublisher.publishMessage(randomEvent("Ny event 3").toByteArray()) } val msgs: List> = readRecentMessages() Assertions.assertEquals(1, msgs.size) - Assertions.assertEquals("MSG 3", msgs.first().key()) val firstEventJson = msgs.first().value().decodeToString() val firstEvent = Json.decodeFromString(firstEventJson) @@ -99,7 +96,7 @@ class KafkaPublisherClientTest { println("=== Kafka Test Container ===") KafkaTestContainer.start() println("KafkaTestContainer.bootstrapServers: ${KafkaTestContainer.bootstrapServers}") - kafkaPublisher = KafkaPublisherClient(TOPIC, kafkaSettings(KafkaTestContainer.bootstrapServers)) + kafkaPublisher = KafkaPublisherClient(kafkaSettings(KafkaTestContainer.bootstrapServers)) kafkaConsumer = createConsumer(KafkaTestContainer.bootstrapServers) } @@ -119,7 +116,9 @@ class KafkaPublisherClientTest { truststoreType = TruststoreType(""), truststoreLocation = TruststoreLocation(""), truststorePassword = Masked(""), - groupId = "ebms-provider" + groupId = "ebms-provider", + topic = TOPIC, + eventLoggingProducerActive = false ) private fun createConsumer(bootstrapServers: String): KafkaConsumer { diff --git a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt index 99003c1f..e7848dfe 100644 --- a/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt +++ b/ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt @@ -1,11 +1,9 @@ package no.nav.emottak.ebms.configuration import no.nav.emottak.utils.config.Kafka -import no.nav.emottak.utils.config.KafkaEventLoggingProducer data class Config( val kafka: Kafka, - val kafkaEventLoggingProducer: KafkaEventLoggingProducer, val kafkaSignalReceiver: KafkaSignalReceiver, val kafkaSignalProducer: KafkaSignalProducer, val kafkaPayloadReceiver: KafkaPayloadReceiver, diff --git a/emottak-utils/build.gradle.kts b/emottak-utils/build.gradle.kts index 90e28bdc..021b9d7d 100644 --- a/emottak-utils/build.gradle.kts +++ b/emottak-utils/build.gradle.kts @@ -35,5 +35,6 @@ publishing { dependencies { testImplementation(kotlin("test")) implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.0") - implementation("org.apache.kafka:kafka-clients:3.9.0") + implementation("io.github.nomisrev:kotlin-kafka:0.4.1") + implementation("com.sksamuel.hoplite:hoplite-core:2.8.2") } diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt index 6bde65d5..cc41a3e8 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt @@ -1,5 +1,9 @@ package no.nav.emottak.utils.config +import com.sksamuel.hoplite.Masked +import io.github.nomisRev.kafka.publisher.PublisherSettings +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.kafka.common.serialization.StringSerializer import java.util.Properties data class Kafka( @@ -11,10 +15,12 @@ data class Kafka( val truststoreType: TruststoreType, val truststoreLocation: TruststoreLocation, val truststorePassword: Masked, - val groupId: String + val groupId: String, + val topic: String, + val eventLoggingProducerActive: Boolean ) -fun Kafka.toProperties() = Properties() +private fun Kafka.toProperties() = Properties() .apply { put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value) put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value) @@ -25,10 +31,13 @@ fun Kafka.toProperties() = Properties() put(SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword.value) } -data class KafkaEventLoggingProducer( - val active: Boolean, - val topic: String -) +fun Kafka.toKafkaPublisherSettings(): PublisherSettings = + PublisherSettings( + bootstrapServers = bootstrapServers, + keySerializer = StringSerializer(), + valueSerializer = ByteArraySerializer(), + properties = toProperties() + ) @JvmInline value class SecurityProtocol(val value: String) @@ -45,12 +54,6 @@ value class TruststoreType(val value: String) @JvmInline value class TruststoreLocation(val value: String) -// Kopiert fra hoplite (types.kt), for å forhindre unødvendig stor avhengighet: -typealias Masked = Secret -data class Secret(val value: String) { - override fun toString(): String = "****" -} - const val SECURITY_PROTOCOL_CONFIG = "security.protocol" const val SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type" const val SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location" diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt index 3e5eef36..963f024d 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt @@ -2,77 +2,16 @@ package no.nav.emottak.utils.events import no.nav.emottak.utils.events.model.Event import no.nav.emottak.utils.events.model.EventMessageDetails -import no.nav.emottak.utils.events.model.EventType import no.nav.emottak.utils.kafka.KafkaPublisherClient -import no.nav.emottak.utils.toJsonString -import java.time.Instant -import kotlin.uuid.ExperimentalUuidApi -import kotlin.uuid.Uuid +import org.apache.kafka.clients.producer.RecordMetadata -@OptIn(ExperimentalUuidApi::class) -class EventLoggingService(private val kafkaPublisher: KafkaPublisherClient) { +class EventLoggingService( + private val kafkaPublisherClient: KafkaPublisherClient +) { - suspend fun logEvent(event: Event) { - kafkaPublisher.send(kafkaPublisher.topic, event.toByteArray()) - } + suspend fun logEvent(event: Event): Result = + kafkaPublisherClient.publishMessage(event.toByteArray()) - suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails) { - kafkaPublisher.send(kafkaPublisher.topic, eventMessageDetails.toByteArray()) - } - - suspend fun logEventMessageDetails( - requestId: Uuid, - cpaId: String, - conversationId: String, - messageId: String, - refToMessageId: String? = null, - fromPartyId: String, - fromRole: String? = null, - toPartyId: String, - toRole: String? = null, - service: String, - action: String, - sentAt: Instant? = null - ) { - logEventMessageDetails( - EventMessageDetails( - requestId = requestId, - cpaId = cpaId, - conversationId = conversationId, - messageId = messageId, - refToMessageId = refToMessageId, - fromPartyId = fromPartyId, - fromRole = fromRole, - toPartyId = toPartyId, - toRole = toRole, - service = service, - action = action, - sentAt = sentAt - ) - ) - } - - suspend fun logEventOK(eventType: EventType, requestId: Uuid, messageId: String, contentId: String? = null, eventData: String? = null) { - logEvent( - Event( - eventType = eventType, - requestId = requestId, - contentId = contentId, - messageId = messageId, - eventData = eventData - ) - ) - } - - suspend fun logEventException(eventType: EventType, requestId: Uuid, messageId: String, ex: Exception, contentId: String? = null) { - logEvent( - Event( - eventType = eventType, - requestId = requestId, - contentId = contentId, - messageId = messageId, - eventData = ex.toJsonString() - ) - ) - } + suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails): Result = + kafkaPublisherClient.publishMessage(eventMessageDetails.toByteArray()) } diff --git a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt index c4d83bf8..e8de8ba5 100644 --- a/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt +++ b/emottak-utils/src/main/kotlin/no/nav/emottak/utils/kafka/KafkaPublisherClient.kt @@ -1,57 +1,21 @@ package no.nav.emottak.utils.kafka +import io.github.nomisRev.kafka.publisher.KafkaPublisher import no.nav.emottak.utils.config.Kafka -import no.nav.emottak.utils.config.toProperties -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.producer.ProducerConfig +import no.nav.emottak.utils.config.toKafkaPublisherSettings import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.kafka.common.serialization.StringSerializer +import org.apache.kafka.clients.producer.RecordMetadata -class KafkaPublisherClient(val topic: String, val config: Kafka) : AutoCloseable { - // private val log = LoggerFactory.getLogger(this.javaClass) +class KafkaPublisherClient( + private val config: Kafka +) { + private val kafkaPublisher = KafkaPublisher(config.toKafkaPublisherSettings()) - companion object { - private var producer: KafkaProducer? = null - } - - suspend fun send(key: String, value: ByteArray) { - try { - getProducer().send( - ProducerRecord(topic, key, value) - ) - getProducer().flush() - // log.debug("Message ($key) sent successfully to topic ($topic)") - } catch (e: Exception) { - // log.error("Exception while writing message ($key) to queue ($topic)", e) - } - } - - private fun getProducer(): KafkaProducer { - if (producer == null) { - producer = createPublisher() - } - return producer!! - } - - private fun createPublisher(): KafkaProducer { - val properties = config.toProperties().apply { - put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers) - put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name) - put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name) - put(ProducerConfig.ACKS_CONFIG, "all") - // Performance - put(ProducerConfig.BUFFER_MEMORY_CONFIG, "16777216") - put(ProducerConfig.BATCH_SIZE_CONFIG, "8192") - put(ProducerConfig.RETRIES_CONFIG, "3") - put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000") + suspend fun publishMessage(value: ByteArray): Result = + kafkaPublisher.publishScope { + publishCatching(toProducerRecord(config.topic, value)) } - return KafkaProducer(properties) - } - override fun close() { - try { - getProducer().close() - } catch (_: Exception) {} - } + private fun toProducerRecord(topic: String, content: ByteArray): ProducerRecord = + ProducerRecord(topic, content) } diff --git a/emottak-utils/src/main/resources/kafka_common.conf b/emottak-utils/src/main/resources/kafka_common.conf index 73102db7..68599f55 100644 --- a/emottak-utils/src/main/resources/kafka_common.conf +++ b/emottak-utils/src/main/resources/kafka_common.conf @@ -8,9 +8,6 @@ kafka { truststoreType = "JKS" truststoreLocation = "${KAFKA_TRUSTSTORE_PATH:-}" truststorePassword = "${KAFKA_CREDSTORE_PASSWORD:-}" -} - -kafkaEventLoggingProducer { - active = "${EVENT_LOGGING_PRODUCER:-false}" topic = "team-emottak.event.log" + eventLoggingProducerActive = "${EVENT_LOGGING_PRODUCER:-false}" }