diff --git a/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/kafka/producer/EbmsMessageProducer.kt b/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/kafka/producer/EbmsMessageProducer.kt index 68b70913..8482570a 100644 --- a/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/kafka/producer/EbmsMessageProducer.kt +++ b/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/kafka/producer/EbmsMessageProducer.kt @@ -7,6 +7,7 @@ import no.nav.emottak.ebms.async.configuration.Kafka import no.nav.emottak.ebms.async.configuration.toProperties import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.header.Header import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.kafka.common.serialization.StringSerializer import org.slf4j.LoggerFactory @@ -24,15 +25,30 @@ class EbmsMessageProducer(private val topic: String, kafka: Kafka) { ) ) - suspend fun publishMessage(key: String, value: ByteArray): Result = - kafkaPublisher.publishScope { - publishCatching(toProducerRecord(topic, key, value)) - }.onSuccess { + suspend fun publishMessage( + key: String, + value: ByteArray, + headers: List
= emptyList() + ): Result = kafkaPublisher.publishScope { + publishCatching(toProducerRecord(topic, key, value, headers)) + } + .onSuccess { log.info("Message sent successfully to topic $topic with key $key") - }.onFailure { + } + .onFailure { log.error("Failed to send message to topic $topic with key $key", it) } - private fun toProducerRecord(topic: String, key: String, content: ByteArray): ProducerRecord = - ProducerRecord(topic, key, content) + private fun toProducerRecord( + topic: String, + key: String, + content: ByteArray, + headers: List
+ ): ProducerRecord = ProducerRecord( + topic, + null, + key, + content, + headers + ) } diff --git a/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/processing/PayloadMessageProcessor.kt b/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/processing/PayloadMessageProcessor.kt index 252f85e8..26d3e9e9 100644 --- a/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/processing/PayloadMessageProcessor.kt +++ b/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/processing/PayloadMessageProcessor.kt @@ -9,6 +9,7 @@ import no.nav.emottak.ebms.async.kafka.consumer.failedMessageQueue import no.nav.emottak.ebms.async.kafka.producer.EbmsMessageProducer import no.nav.emottak.ebms.async.log import no.nav.emottak.ebms.async.persistence.repository.EbmsMessageDetailsRepository +import no.nav.emottak.ebms.async.util.toHeaders import no.nav.emottak.ebms.model.signer import no.nav.emottak.ebms.processing.ProcessingService import no.nav.emottak.ebms.util.marker @@ -49,7 +50,8 @@ class PayloadMessageProcessor( getDocumentBuilder().parse(ByteArrayInputStream(content)) }, retrievePayloads(requestId) - ).transform().takeIf { it is PayloadMessage } ?: throw RuntimeException("Cannot process message as payload message: $requestId") + ).transform().takeIf { it is PayloadMessage } + ?: throw RuntimeException("Cannot process message as payload message: $requestId") return ebmsMessage as PayloadMessage } @@ -83,6 +85,7 @@ class PayloadMessageProcessor( log.debug(it.marker(), "Starting SendIn for $service") payloadMessageResponder.respond(it) } + else -> { log.debug(it.marker(), "Skipping SendIn for $service") } @@ -147,7 +150,12 @@ class PayloadMessageProcessor( val markers = ebMSDocument.messageHeader().marker() try { log.info(markers, "Sending message to Kafka queue") - ebmsSignalProducer.publishMessage(ebMSDocument.requestId, ebMSDocument.dokument.asByteArray()) + ebmsSignalProducer.publishMessage( + ebMSDocument.requestId, + ebMSDocument.dokument.asByteArray(), + signalResponderEmails.toHeaders() + + ) } catch (e: Exception) { log.error(markers, "Exception occurred while sending message to Kafka queue", e) } diff --git a/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/util/KafkaUtil.kt b/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/util/KafkaUtil.kt new file mode 100644 index 00000000..9d626459 --- /dev/null +++ b/ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/util/KafkaUtil.kt @@ -0,0 +1,14 @@ +package no.nav.emottak.ebms.async.util + +import no.nav.emottak.message.model.EmailAddress +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.RecordHeader + +private const val EMAIL_ADDRESSES = "emailAddresses" + +fun List.toHeaders(): List
= listOf( + RecordHeader( + EMAIL_ADDRESSES, + joinToString(",") { it.emailAddress }.toByteArray() + ) +)