Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/feilkoe headers #112

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .nais/ebms-async-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ spec:
value: "true"
- name: EBMS_PAYLOAD_PRODUCER
value: "true"
- name: EBMS_RETRY_QUEUE
value: "true"
- name: VAULT_JDBC_URL
value: jdbc:postgresql://b27dbvl033.preprod.local:5432/
- name: CPA_REPO_URL
Expand Down
28 changes: 28 additions & 0 deletions .nais/kafka/kafka-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,31 @@ spec:
- access: read
application: emottak-kafka-manager
team: team-emottak

---

apiVersion: kafka.nais.io/v1
kind: Topic
metadata:
labels:
team: team-emottak
name: ebxml.retry
namespace: team-emottak
spec:
pool: nav-dev
config:
cleanupPolicy: compact
maxMessageBytes: 1048588
minimumInSyncReplicas: 2
partitions: 1
replication: 3
retentionBytes: -1
retentionHours: 168
segmentHours: 168
acl:
- access: readwrite
application: ebms-async
team: team-emottak
- access: read
application: emottak-kafka-manager
team: team-emottak
106 changes: 85 additions & 21 deletions ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/App.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import arrow.fx.coroutines.resourceScope
import io.ktor.server.application.Application
import io.ktor.server.auth.authenticate
import io.ktor.server.netty.Netty
import io.ktor.server.routing.Route
import io.ktor.server.routing.Routing
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import io.ktor.utils.io.CancellationException
import io.micrometer.prometheusmetrics.PrometheusConfig
Expand All @@ -28,6 +31,8 @@ import no.nav.emottak.ebms.SendInClient
import no.nav.emottak.ebms.SmtpTransportClient
import no.nav.emottak.ebms.async.configuration.Config
import no.nav.emottak.ebms.async.configuration.config
import no.nav.emottak.ebms.async.kafka.consumer.failedMessageQueue
import no.nav.emottak.ebms.async.kafka.consumer.getRecord
import no.nav.emottak.ebms.async.kafka.consumer.startPayloadReceiver
import no.nav.emottak.ebms.async.kafka.consumer.startSignalReceiver
import no.nav.emottak.ebms.async.kafka.producer.EbmsMessageProducer
Expand All @@ -48,7 +53,9 @@ import no.nav.emottak.ebms.registerRootEndpoint
import no.nav.emottak.ebms.scopedAuthHttpClient
import no.nav.emottak.ebms.sendin.SendInService
import no.nav.emottak.ebms.validation.DokumentValidator
import no.nav.emottak.utils.isProdEnv
import org.slf4j.LoggerFactory
import kotlin.uuid.ExperimentalUuidApi

val log = LoggerFactory.getLogger("no.nav.emottak.ebms.async.App")

Expand Down Expand Up @@ -81,6 +88,15 @@ fun main() = SuspendApp {
ebmsPayloadProducer = ebmsPayloadProducer
)

val payloadMessageProcessorProvider = payloadMessageProcessorProvider(
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository,
dokumentValidator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
payloadMessageResponder = payloadMessageResponder
)

result {
resourceScope {
launchSignalReceiver(
Expand All @@ -90,20 +106,16 @@ fun main() = SuspendApp {
)
launchPayloadReceiver(
config = config,
dokumentValidator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
payloadMessageResponder = payloadMessageResponder,
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository
payloadMessageProcessorProvider = payloadMessageProcessorProvider
)

server(
Netty,
port = 8080,
module = {
ebmsProviderModule(
payloadRepository = payloadRepository
payloadRepository = payloadRepository,
payloadProcessorProvider = payloadMessageProcessorProvider
)
}
).also { it.engineConfig.maxChunkSize = 100000 }
Expand All @@ -119,26 +131,36 @@ fun main() = SuspendApp {
}
}

private fun CoroutineScope.launchPayloadReceiver(
config: Config,
fun payloadMessageProcessorProvider(
ebmsMessageDetailsRepository: EbmsMessageDetailsRepository,
dokumentValidator: DokumentValidator,
processingService: ProcessingService,
ebmsSignalProducer: EbmsMessageProducer,
smtpTransportClient: SmtpTransportClient,
payloadMessageResponder: PayloadMessageResponder,
ebmsMessageDetailsRepository: EbmsMessageDetailsRepository
payloadMessageResponder: PayloadMessageResponder

): () -> PayloadMessageProcessor = {
PayloadMessageProcessor(
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository,
validator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
payloadMessageResponder
)
}

private fun CoroutineScope.launchPayloadReceiver(
config: Config,
payloadMessageProcessorProvider: () -> PayloadMessageProcessor
) {
if (config.kafkaPayloadReceiver.active) {
launch(Dispatchers.IO) {
val payloadMessageProcessor = PayloadMessageProcessor(
validator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository,
payloadMessageResponder = payloadMessageResponder
startPayloadReceiver(
config.kafkaPayloadReceiver.topic,
config.kafka,
payloadMessageProcessorProvider.invoke()
)
startPayloadReceiver(config.kafkaPayloadReceiver.topic, config.kafka, payloadMessageProcessor)
}
}
}
Expand All @@ -160,7 +182,8 @@ private fun CoroutineScope.launchSignalReceiver(
}

fun Application.ebmsProviderModule(
payloadRepository: PayloadRepository
payloadRepository: PayloadRepository,
payloadProcessorProvider: () -> PayloadMessageProcessor
) {
val appMicrometerRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)

Expand All @@ -173,9 +196,50 @@ fun Application.ebmsProviderModule(
registerHealthEndpoints()
registerPrometheusEndpoint(appMicrometerRegistry)
registerNavCheckStatus()

if (!isProdEnv()) {
simulateError()
}
retryErrors(payloadProcessorProvider)
authenticate(AZURE_AD_AUTH) {
getPayloads(payloadRepository)
}
}
}

const val RETRY_LIMIT = "retryLimit"

@OptIn(ExperimentalUuidApi::class)
fun Routing.retryErrors(
payloadMessageProcessorProvider: () -> PayloadMessageProcessor
): Route =
get("/api/retry/{$RETRY_LIMIT}") {
resourceScope {
CoroutineScope(Dispatchers.IO).launch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Når du oppretter et CoroutineScope her på denne måten er det greit å være klar over at dette blir et helt frittstående scope som ikke er linket til resourceScope'et. Joda, det vil bli tatt ned når jvm'en shutter ned, men ikke på en clean måte.

Jeg snakket jo litt om dette for noen uker siden (i en delingstime) hvordan en kan linke frittstående scopes til omsluttende ressurs-scope så scopet i seg selv blir en ressurs / avhengighet på lik linje med alle andre ressurser og avhengigheter. Fordelen er selvsagt at scopet blir inkludert i livssyklusen til ressurs-scopet i SuspendApp og tatt ned og opprettet på en clean måte.

Her er linken til utility funksjonen som gjør dette for deg: https://github.com/navikt/smtp-transport/blob/main/src/main/kotlin/no/nav/emottak/util/ResourceUtil.kt

Sikkert noe vi burde putte inn i felles-biblioteket på et tidspunkt.

if (config().kafkaErrorQueue.active) {
failedMessageQueue.receive(
payloadMessageProcessorProvider.invoke(),
limit = (call.parameters[RETRY_LIMIT] as String).toInt()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Litt flisespikkeri, men syntes koden blir mer ryddig hvis du løfter call... ut til en egen val

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hvorfor synes du det?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tipper fordi blir koden mer verbose. I prinsipp er jeg helt enig, men akkurat i dette eksemplet tenker jeg at det ikke gir så mye verdi å bruke mer tid på denne linjen

)
}
}
}
}

const val KAFKA_OFFSET = "offset"

fun Route.simulateError(): Route = get("/api/forceretry/{$KAFKA_OFFSET}") {
resourceScope {
CoroutineScope(Dispatchers.IO).launch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Samme som over

if (config().kafkaErrorQueue.active) {
val record = getRecord(
config()
.kafkaPayloadReceiver.topic,
config().kafka
.copy(groupId = "ebms-provider-retry"),
(call.parameters[KAFKA_OFFSET] as String).toLong()
)
failedMessageQueue.send(record = record ?: throw Exception("No Record found. Offset: ${call.parameters[KAFKA_OFFSET]}"))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package no.nav.emottak.ebms.async.configuration

import com.sksamuel.hoplite.Masked
import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
Expand All @@ -15,7 +16,8 @@ data class Config(
val kafkaSignalReceiver: KafkaSignalReceiver,
val kafkaSignalProducer: KafkaSignalProducer,
val kafkaPayloadReceiver: KafkaPayloadReceiver,
val kafkaPayloadProducer: KafkaPayloadProducer
val kafkaPayloadProducer: KafkaPayloadProducer,
val kafkaErrorQueue: KafkaErrorQueue
)

@JvmInline
Expand Down Expand Up @@ -53,6 +55,11 @@ data class KafkaPayloadProducer(
val topic: String
)

data class KafkaErrorQueue(
val active: Boolean,
val topic: String
)

data class Kafka(
val bootstrapServers: String,
val securityProtocol: SecurityProtocol,
Expand All @@ -67,6 +74,7 @@ data class Kafka(

fun Kafka.toProperties() = Properties()
.apply {
put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value)
put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value)
put(SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation.value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ suspend fun startPayloadReceiver(topic: String, kafka: Kafka, payloadMessageProc
KafkaReceiver(receiverSettings)
.receive(topic)
.map { record ->
payloadMessageProcessor.process(record.key(), record.value())
payloadMessageProcessor.process(record)
record.offset.acknowledge()
}.collect()
}
Loading
Loading