Skip to content

Commit d7f7163

Browse files
committed
Fix producer
1 parent b37844d commit d7f7163

File tree

2 files changed

+26
-25
lines changed

2 files changed

+26
-25
lines changed

ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/kafka/consumer/FailedMessageKafkaHandler.kt

+21-21
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,14 @@ class FailedMessageKafkaHandler(
4040
val kafkaErrorQueue: KafkaErrorQueue = config().kafkaErrorQueue,
4141
kafka: Kafka = config().kafka
4242
) {
43-
private var producersFlow: Flow<KafkaProducer<String, ByteArray>> = kafkaProducer(
43+
44+
private var producer = KafkaProducer(
45+
kafka.toProperties(),
46+
StringSerializer(),
47+
ByteArraySerializer()
48+
)
49+
50+
private var producersFlow: Flow<KafkaProducer<String, ByteArray>> = kafkaProducer( // TODO Deprecated
4451
ProducerSettings(
4552
bootstrapServers = kafka.bootstrapServers,
4653
keyDeserializer = StringSerializer(),
@@ -60,12 +67,19 @@ class FailedMessageKafkaHandler(
6067
)
6168
).receive(kafkaErrorQueue.topic)
6269

63-
suspend fun send(record: ReceiverRecord<String, ByteArray>, key: String = record.key(), value: ByteArray = record.value()) { // TODO man trenger vel ikke egentlig value og key om man har record?
64-
record.addHeader(RETRY_AFTER, getNextRetryTime(record)) // TODO add retry logic
70+
suspend fun send(record: ReceiverRecord<String, ByteArray>, key: String = record.key(), value: ByteArray = record.value()) {
71+
record.addHeader(RETRY_AFTER, getNextRetryTime(record))
6572
try {
66-
producersFlow.collect { producer ->
67-
producer.send(ProducerRecord(kafkaErrorQueue.topic, null, key, value, record.headers())).get()
68-
}
73+
val result = producer.send(ProducerRecord(kafkaErrorQueue.topic, null, key, value, record.headers())).get()
74+
logger.info("Wrote to offset:" + result.offset())
75+
producer.close()
76+
// producersFlow.collect { producer ->
77+
// val metadata = producer.send(ProducerRecord(kafkaErrorQueue.topic, null, key, value, record.headers())).get()
78+
// logger.info("Offset on metadata: " + metadata.offset())
79+
// logger.info("Result " + metadata.partition() + " timestamp " + metadata.timestamp())
80+
// producer.commitTransaction()
81+
// producer.close()
82+
// }
6983
logger.info("Message sent successfully to topic ${kafkaErrorQueue.topic}")
7084
} catch (e: Exception) {
7185
logger.info("Failed to send message to ${kafkaErrorQueue.topic} : ${e.message}")
@@ -107,28 +121,14 @@ fun ReceiverRecord<String, ByteArray>.addHeader(key: String, value: String) {
107121
this.headers().add(key, value.toByteArray())
108122
}
109123

110-
fun getRecord2(topic: String, kafka: Kafka, fromOffset: Long = 0, requestedRecords: Int = 1) {
111-
val consumer = KafkaConsumer(
112-
kafka.copy(
113-
groupId = "ebms-provider-retry"
114-
).toProperties(),
115-
StringDeserializer(),
116-
ByteArrayDeserializer()
117-
)
118-
119-
consumer.partitionsFor(topic)
120-
}
121-
122124
fun getRetryRecord(fromOffset: Long = 0, requestedRecords: Int = 1): ReceiverRecord<String, ByteArray>? {
123125
return getRecord(config().kafkaErrorQueue.topic, config().kafka, fromOffset, requestedRecords)
124126
}
125127

126128
fun getRecord(topic: String, kafka: Kafka, fromOffset: Long = 0, requestedRecords: Int = 1): ReceiverRecord<String, ByteArray>? {
127129
return with(
128130
KafkaConsumer(
129-
kafka
130-
// .copy(groupId = "ebms-provider-retry")
131-
.toProperties(),
131+
kafka.toProperties(),
132132
StringDeserializer(),
133133
ByteArrayDeserializer()
134134
)

ebms-async/src/test/kotlin/no/nav/emottak/ebms/async/kafka/KafkaIntegrationTest.kt

+5-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import no.nav.emottak.ebms.async.configuration.config
55
import no.nav.emottak.ebms.async.kafka.consumer.failedMessageQueue
66
import no.nav.emottak.ebms.async.kafka.consumer.getRecord
77
import no.nav.emottak.ebms.async.kafka.consumer.getRetryRecord
8+
import org.junit.jupiter.api.Test
89
import org.testcontainers.shaded.com.google.common.io.Resources
910
import java.util.Properties
1011
import kotlin.io.path.Path
@@ -29,9 +30,9 @@ class KafkaIntegrationTest {
2930
return !Path(Resources.getResource("kafka/kafkaenv-local.properties").path).exists()
3031
}
3132

32-
// @Test
33-
// @Disabled
33+
@Test
3434
fun testGetRecord() {
35+
if (noLocalKafkaEnv()) return
3536
val record = getRecord(
3637
kafkaConfig.kafkaPayloadReceiver.topic,
3738
kafkaConfig.kafka
@@ -41,9 +42,9 @@ class KafkaIntegrationTest {
4142
)
4243
}
4344

44-
// @Test
45-
// @Disabled
45+
@Test
4646
fun leggTilRetry() {
47+
if (noLocalKafkaEnv()) return
4748
runTest {
4849
val record = getRecord(
4950
fromOffset = 9379942,

0 commit comments

Comments
 (0)