Skip to content

Commit 63c12ae

Browse files
authored
Merge pull request #121 from navikt/feat/introduce-kotlin-kafka
Introduced usage of kotlin-kafka
2 parents 559912a + 8f3798a commit 63c12ae

File tree

7 files changed

+46
-145
lines changed

7 files changed

+46
-145
lines changed

ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaPublisherClientTest.kt

+8-9
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package no.nav.emottak.ebms.async.kafka
22

3+
import com.sksamuel.hoplite.Masked
34
import kotlinx.coroutines.runBlocking
45
import kotlinx.serialization.json.Json
56
import no.nav.emottak.utils.config.Kafka
67
import no.nav.emottak.utils.config.KeystoreLocation
78
import no.nav.emottak.utils.config.KeystoreType
8-
import no.nav.emottak.utils.config.Masked
99
import no.nav.emottak.utils.config.SecurityProtocol
1010
import no.nav.emottak.utils.config.TruststoreLocation
1111
import no.nav.emottak.utils.config.TruststoreType
@@ -36,13 +36,11 @@ class KafkaPublisherClientTest {
3636
fun `Legg 2 meldinger på Kafka`() {
3737
kafkaConsumer.subscribe(listOf(TOPIC))
3838
runBlocking {
39-
kafkaPublisher.send("MSG 1", randomEvent("Event 1").toByteArray())
40-
kafkaPublisher.send("MSG 2", randomEvent("Event 2").toByteArray())
39+
kafkaPublisher.publishMessage(randomEvent("Event 1").toByteArray())
40+
kafkaPublisher.publishMessage(randomEvent("Event 2").toByteArray())
4141
}
4242
val msgs: List<ConsumerRecord<String, ByteArray>> = readRecentMessages()
4343
Assertions.assertEquals(2, msgs.size)
44-
Assertions.assertEquals("MSG 1", msgs.first().key())
45-
Assertions.assertEquals("MSG 2", msgs.last().key())
4644

4745
val firstEventJson = msgs.first().value().decodeToString()
4846
val firstEvent = Json.decodeFromString<Event>(firstEventJson)
@@ -57,11 +55,10 @@ class KafkaPublisherClientTest {
5755
fun `Legg 1 melding på Kafka`() {
5856
kafkaConsumer.subscribe(listOf(TOPIC))
5957
runBlocking {
60-
kafkaPublisher.send("MSG 3", randomEvent("Ny event 3").toByteArray())
58+
kafkaPublisher.publishMessage(randomEvent("Ny event 3").toByteArray())
6159
}
6260
val msgs: List<ConsumerRecord<String, ByteArray>> = readRecentMessages()
6361
Assertions.assertEquals(1, msgs.size)
64-
Assertions.assertEquals("MSG 3", msgs.first().key())
6562

6663
val firstEventJson = msgs.first().value().decodeToString()
6764
val firstEvent = Json.decodeFromString<Event>(firstEventJson)
@@ -99,7 +96,7 @@ class KafkaPublisherClientTest {
9996
println("=== Kafka Test Container ===")
10097
KafkaTestContainer.start()
10198
println("KafkaTestContainer.bootstrapServers: ${KafkaTestContainer.bootstrapServers}")
102-
kafkaPublisher = KafkaPublisherClient(TOPIC, kafkaSettings(KafkaTestContainer.bootstrapServers))
99+
kafkaPublisher = KafkaPublisherClient(kafkaSettings(KafkaTestContainer.bootstrapServers))
103100
kafkaConsumer = createConsumer(KafkaTestContainer.bootstrapServers)
104101
}
105102

@@ -119,7 +116,9 @@ class KafkaPublisherClientTest {
119116
truststoreType = TruststoreType(""),
120117
truststoreLocation = TruststoreLocation(""),
121118
truststorePassword = Masked(""),
122-
groupId = "ebms-provider"
119+
groupId = "ebms-provider",
120+
topic = TOPIC,
121+
eventLoggingProducerActive = false
123122
)
124123

125124
private fun createConsumer(bootstrapServers: String): KafkaConsumer<String, ByteArray> {

ebms-provider/src/main/kotlin/no/nav/emottak/ebms/configuration/Config.kt

-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package no.nav.emottak.ebms.configuration
22

33
import no.nav.emottak.utils.config.Kafka
4-
import no.nav.emottak.utils.config.KafkaEventLoggingProducer
54

65
data class Config(
76
val kafka: Kafka,
8-
val kafkaEventLoggingProducer: KafkaEventLoggingProducer,
97
val kafkaSignalReceiver: KafkaSignalReceiver,
108
val kafkaSignalProducer: KafkaSignalProducer,
119
val kafkaPayloadReceiver: KafkaPayloadReceiver,

emottak-utils/build.gradle.kts

+2-1
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,6 @@ publishing {
3535
dependencies {
3636
testImplementation(kotlin("test"))
3737
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.8.0")
38-
implementation("org.apache.kafka:kafka-clients:3.9.0")
38+
implementation("io.github.nomisrev:kotlin-kafka:0.4.1")
39+
implementation("com.sksamuel.hoplite:hoplite-core:2.8.2")
3940
}

emottak-utils/src/main/kotlin/no/nav/emottak/utils/config/KafkaConfig.kt

+15-12
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package no.nav.emottak.utils.config
22

3+
import com.sksamuel.hoplite.Masked
4+
import io.github.nomisRev.kafka.publisher.PublisherSettings
5+
import org.apache.kafka.common.serialization.ByteArraySerializer
6+
import org.apache.kafka.common.serialization.StringSerializer
37
import java.util.Properties
48

59
data class Kafka(
@@ -11,10 +15,12 @@ data class Kafka(
1115
val truststoreType: TruststoreType,
1216
val truststoreLocation: TruststoreLocation,
1317
val truststorePassword: Masked,
14-
val groupId: String
18+
val groupId: String,
19+
val topic: String,
20+
val eventLoggingProducerActive: Boolean
1521
)
1622

17-
fun Kafka.toProperties() = Properties()
23+
private fun Kafka.toProperties() = Properties()
1824
.apply {
1925
put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value)
2026
put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value)
@@ -25,10 +31,13 @@ fun Kafka.toProperties() = Properties()
2531
put(SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword.value)
2632
}
2733

28-
data class KafkaEventLoggingProducer(
29-
val active: Boolean,
30-
val topic: String
31-
)
34+
fun Kafka.toKafkaPublisherSettings(): PublisherSettings<String, ByteArray> =
35+
PublisherSettings(
36+
bootstrapServers = bootstrapServers,
37+
keySerializer = StringSerializer(),
38+
valueSerializer = ByteArraySerializer(),
39+
properties = toProperties()
40+
)
3241

3342
@JvmInline
3443
value class SecurityProtocol(val value: String)
@@ -45,12 +54,6 @@ value class TruststoreType(val value: String)
4554
@JvmInline
4655
value class TruststoreLocation(val value: String)
4756

48-
// Kopiert fra hoplite (types.kt), for å forhindre unødvendig stor avhengighet:
49-
typealias Masked = Secret
50-
data class Secret(val value: String) {
51-
override fun toString(): String = "****"
52-
}
53-
5457
const val SECURITY_PROTOCOL_CONFIG = "security.protocol"
5558
const val SSL_KEYSTORE_TYPE_CONFIG = "ssl.keystore.type"
5659
const val SSL_KEYSTORE_LOCATION_CONFIG = "ssl.keystore.location"

emottak-utils/src/main/kotlin/no/nav/emottak/utils/events/EventLoggingService.kt

+8-69
Original file line numberDiff line numberDiff line change
@@ -2,77 +2,16 @@ package no.nav.emottak.utils.events
22

33
import no.nav.emottak.utils.events.model.Event
44
import no.nav.emottak.utils.events.model.EventMessageDetails
5-
import no.nav.emottak.utils.events.model.EventType
65
import no.nav.emottak.utils.kafka.KafkaPublisherClient
7-
import no.nav.emottak.utils.toJsonString
8-
import java.time.Instant
9-
import kotlin.uuid.ExperimentalUuidApi
10-
import kotlin.uuid.Uuid
6+
import org.apache.kafka.clients.producer.RecordMetadata
117

12-
@OptIn(ExperimentalUuidApi::class)
13-
class EventLoggingService(private val kafkaPublisher: KafkaPublisherClient) {
8+
class EventLoggingService(
9+
private val kafkaPublisherClient: KafkaPublisherClient
10+
) {
1411

15-
suspend fun logEvent(event: Event) {
16-
kafkaPublisher.send(kafkaPublisher.topic, event.toByteArray())
17-
}
12+
suspend fun logEvent(event: Event): Result<RecordMetadata> =
13+
kafkaPublisherClient.publishMessage(event.toByteArray())
1814

19-
suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails) {
20-
kafkaPublisher.send(kafkaPublisher.topic, eventMessageDetails.toByteArray())
21-
}
22-
23-
suspend fun logEventMessageDetails(
24-
requestId: Uuid,
25-
cpaId: String,
26-
conversationId: String,
27-
messageId: String,
28-
refToMessageId: String? = null,
29-
fromPartyId: String,
30-
fromRole: String? = null,
31-
toPartyId: String,
32-
toRole: String? = null,
33-
service: String,
34-
action: String,
35-
sentAt: Instant? = null
36-
) {
37-
logEventMessageDetails(
38-
EventMessageDetails(
39-
requestId = requestId,
40-
cpaId = cpaId,
41-
conversationId = conversationId,
42-
messageId = messageId,
43-
refToMessageId = refToMessageId,
44-
fromPartyId = fromPartyId,
45-
fromRole = fromRole,
46-
toPartyId = toPartyId,
47-
toRole = toRole,
48-
service = service,
49-
action = action,
50-
sentAt = sentAt
51-
)
52-
)
53-
}
54-
55-
suspend fun logEventOK(eventType: EventType, requestId: Uuid, messageId: String, contentId: String? = null, eventData: String? = null) {
56-
logEvent(
57-
Event(
58-
eventType = eventType,
59-
requestId = requestId,
60-
contentId = contentId,
61-
messageId = messageId,
62-
eventData = eventData
63-
)
64-
)
65-
}
66-
67-
suspend fun logEventException(eventType: EventType, requestId: Uuid, messageId: String, ex: Exception, contentId: String? = null) {
68-
logEvent(
69-
Event(
70-
eventType = eventType,
71-
requestId = requestId,
72-
contentId = contentId,
73-
messageId = messageId,
74-
eventData = ex.toJsonString()
75-
)
76-
)
77-
}
15+
suspend fun logEventMessageDetails(eventMessageDetails: EventMessageDetails): Result<RecordMetadata> =
16+
kafkaPublisherClient.publishMessage(eventMessageDetails.toByteArray())
7817
}
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,21 @@
11
package no.nav.emottak.utils.kafka
22

3+
import io.github.nomisRev.kafka.publisher.KafkaPublisher
34
import no.nav.emottak.utils.config.Kafka
4-
import no.nav.emottak.utils.config.toProperties
5-
import org.apache.kafka.clients.producer.KafkaProducer
6-
import org.apache.kafka.clients.producer.ProducerConfig
5+
import no.nav.emottak.utils.config.toKafkaPublisherSettings
76
import org.apache.kafka.clients.producer.ProducerRecord
8-
import org.apache.kafka.common.serialization.ByteArraySerializer
9-
import org.apache.kafka.common.serialization.StringSerializer
7+
import org.apache.kafka.clients.producer.RecordMetadata
108

11-
class KafkaPublisherClient(val topic: String, val config: Kafka) : AutoCloseable {
12-
// private val log = LoggerFactory.getLogger(this.javaClass)
9+
class KafkaPublisherClient(
10+
private val config: Kafka
11+
) {
12+
private val kafkaPublisher = KafkaPublisher(config.toKafkaPublisherSettings())
1313

14-
companion object {
15-
private var producer: KafkaProducer<String, ByteArray>? = null
16-
}
17-
18-
suspend fun send(key: String, value: ByteArray) {
19-
try {
20-
getProducer().send(
21-
ProducerRecord(topic, key, value)
22-
)
23-
getProducer().flush()
24-
// log.debug("Message ($key) sent successfully to topic ($topic)")
25-
} catch (e: Exception) {
26-
// log.error("Exception while writing message ($key) to queue ($topic)", e)
27-
}
28-
}
29-
30-
private fun getProducer(): KafkaProducer<String, ByteArray> {
31-
if (producer == null) {
32-
producer = createPublisher()
33-
}
34-
return producer!!
35-
}
36-
37-
private fun createPublisher(): KafkaProducer<String, ByteArray> {
38-
val properties = config.toProperties().apply {
39-
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
40-
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
41-
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name)
42-
put(ProducerConfig.ACKS_CONFIG, "all")
43-
// Performance
44-
put(ProducerConfig.BUFFER_MEMORY_CONFIG, "16777216")
45-
put(ProducerConfig.BATCH_SIZE_CONFIG, "8192")
46-
put(ProducerConfig.RETRIES_CONFIG, "3")
47-
put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000")
14+
suspend fun publishMessage(value: ByteArray): Result<RecordMetadata> =
15+
kafkaPublisher.publishScope {
16+
publishCatching(toProducerRecord(config.topic, value))
4817
}
49-
return KafkaProducer(properties)
50-
}
5118

52-
override fun close() {
53-
try {
54-
getProducer().close()
55-
} catch (_: Exception) {}
56-
}
19+
private fun toProducerRecord(topic: String, content: ByteArray): ProducerRecord<String, ByteArray> =
20+
ProducerRecord<String, ByteArray>(topic, content)
5721
}

emottak-utils/src/main/resources/kafka_common.conf

+1-4
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,6 @@ kafka {
88
truststoreType = "JKS"
99
truststoreLocation = "${KAFKA_TRUSTSTORE_PATH:-}"
1010
truststorePassword = "${KAFKA_CREDSTORE_PASSWORD:-}"
11-
}
12-
13-
kafkaEventLoggingProducer {
14-
active = "${EVENT_LOGGING_PRODUCER:-false}"
1511
topic = "team-emottak.event.log"
12+
eventLoggingProducerActive = "${EVENT_LOGGING_PRODUCER:-false}"
1613
}

0 commit comments

Comments
 (0)