
Dev/feilkoe headers #112

Status: Merged (31 commits, Apr 4, 2025)
Commits (31)
b3867e3
Init commit
RettIProd Mar 4, 2025
501c6d8
Resolve conflict
RettIProd Mar 4, 2025
5e59172
fix test setup
RettIProd Mar 4, 2025
0bcf452
Logging and active queue
RettIProd Mar 4, 2025
29d6b2e
Limit
RettIProd Mar 4, 2025
f18b0bd
Endpoint for simulating things on the error queue
RettIProd Mar 5, 2025
ba756d5
Curly bracket
RettIProd Mar 5, 2025
d3af0a8
Get path params
RettIProd Mar 5, 2025
0753c45
Getrecord config
RettIProd Mar 6, 2025
190a9d9
Bootstrap servers not set in props
RettIProd Mar 6, 2025
6617a97
Debug
RettIProd Mar 19, 2025
c050c35
Test
RettIProd Mar 19, 2025
64975e1
merge conflict
RettIProd Mar 19, 2025
db4d56c
Integration test
RettIProd Mar 19, 2025
9cb3a70
Permissions for async
RettIProd Mar 20, 2025
2621350
Merge branch 'main' into dev/feilkoe-headers
RettIProd Mar 20, 2025
7d46ab0
Disable local tests
RettIProd Mar 20, 2025
b37844d
Disable local tests
RettIProd Mar 20, 2025
d7f7163
Fix producer
RettIProd Mar 20, 2025
a0a5e8f
Disable test
RettIProd Mar 20, 2025
ba3fc33
Fix deprecated kafka setup
RettIProd Mar 26, 2025
1b80d83
Basic RetryQueue logic
RettIProd Mar 27, 2025
73bf16d
Merge branch 'main' into dev/feilkoe-headers
RettIProd Mar 27, 2025
007260e
Pull in util change
RettIProd Mar 27, 2025
225fb97
Fix retry time check
RettIProd Mar 27, 2025
dc2e0dd
Reason added to headers
RettIProd Mar 27, 2025
18252be
Test with asserts for error scenario
RettIProd Apr 2, 2025
e619e1d
Merge branch 'main' into dev/feilkoe-headers
RettIProd Apr 2, 2025
6673865
Test with asserts for error scenario
RettIProd Apr 2, 2025
68cd22a
fix retrycount blank
RettIProd Apr 2, 2025
c311193
Merge branch 'main' into dev/feilkoe-headers
RettIProd Apr 4, 2025
2 changes: 2 additions & 0 deletions .nais/ebms-async-dev.yaml
@@ -82,6 +82,8 @@ spec:
value: "true"
- name: EBMS_PAYLOAD_PRODUCER
value: "true"
- name: EBMS_RETRY_QUEUE
value: "true"
- name: VAULT_JDBC_URL
value: jdbc:postgresql://b27dbvl033.preprod.local:5432/
- name: CPA_REPO_URL
28 changes: 28 additions & 0 deletions .nais/kafka/kafka-dev.yaml
@@ -57,3 +57,31 @@ spec:
- access: read
application: emottak-kafka-manager
team: team-emottak

---

apiVersion: kafka.nais.io/v1
kind: Topic
metadata:
labels:
team: team-emottak
name: ebxml.retry
namespace: team-emottak
spec:
pool: nav-dev
config:
cleanupPolicy: compact
maxMessageBytes: 1048588
minimumInSyncReplicas: 2
partitions: 1
replication: 3
retentionBytes: -1
retentionHours: 168
segmentHours: 168
acl:
- access: readwrite
application: ebms-async
team: team-emottak
- access: read
application: emottak-kafka-manager
team: team-emottak
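
For reference, a minimal sketch of how a producer could publish a failed message to the new ebxml.retry topic with diagnostic headers. The header names (reason, retryCount) and the serializer choices here are assumptions made for illustration; the PR's actual RetryQueue code is not part of this file.

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun sendToRetryTopic(connectionProps: Properties, key: String, payload: ByteArray, reason: String) {
    val props = Properties().apply {
        putAll(connectionProps) // bootstrap servers, SSL settings, etc.
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer::class.java.name)
    }
    KafkaProducer<String, ByteArray>(props).use { producer ->
        val record = ProducerRecord("ebxml.retry", key, payload)
        // Hypothetical header names; the PR adds a failure reason and a retry count to the headers.
        record.headers().add("reason", reason.toByteArray())
        record.headers().add("retryCount", "1".toByteArray())
        producer.send(record).get()
    }
}

Because the topic uses cleanupPolicy: compact, the record key decides which messages survive compaction, so keys should stay unique for every message that must be retained.
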
111 changes: 90 additions & 21 deletions ebms-async/src/main/kotlin/no/nav/emottak/ebms/async/App.kt
@@ -7,9 +7,14 @@ import arrow.continuations.SuspendApp
import arrow.continuations.ktor.server
import arrow.core.raise.result
import arrow.fx.coroutines.resourceScope
import io.ktor.http.HttpStatusCode
import io.ktor.server.application.Application
import io.ktor.server.auth.authenticate
import io.ktor.server.netty.Netty
import io.ktor.server.response.respondText
import io.ktor.server.routing.Route
import io.ktor.server.routing.Routing
import io.ktor.server.routing.get
import io.ktor.server.routing.routing
import io.ktor.utils.io.CancellationException
import io.micrometer.prometheusmetrics.PrometheusConfig
@@ -28,6 +33,8 @@ import no.nav.emottak.ebms.SendInClient
import no.nav.emottak.ebms.SmtpTransportClient
import no.nav.emottak.ebms.async.configuration.Config
import no.nav.emottak.ebms.async.configuration.config
import no.nav.emottak.ebms.async.kafka.consumer.failedMessageQueue
import no.nav.emottak.ebms.async.kafka.consumer.getRecord
import no.nav.emottak.ebms.async.kafka.consumer.startPayloadReceiver
import no.nav.emottak.ebms.async.kafka.consumer.startSignalReceiver
import no.nav.emottak.ebms.async.kafka.producer.EbmsMessageProducer
@@ -48,7 +55,9 @@ import no.nav.emottak.ebms.registerRootEndpoint
import no.nav.emottak.ebms.scopedAuthHttpClient
import no.nav.emottak.ebms.sendin.SendInService
import no.nav.emottak.ebms.validation.DokumentValidator
import no.nav.emottak.message.util.isProdEnv
import org.slf4j.LoggerFactory
import kotlin.uuid.ExperimentalUuidApi

val log = LoggerFactory.getLogger("no.nav.emottak.ebms.async.App")

@@ -81,6 +90,15 @@ fun main() = SuspendApp {
ebmsPayloadProducer = ebmsPayloadProducer
)

val payloadMessageProcessorProvider = payloadMessageProcessorProvider(
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository,
dokumentValidator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
payloadMessageResponder = payloadMessageResponder
)

result {
resourceScope {
launchSignalReceiver(
@@ -90,20 +108,16 @@
)
launchPayloadReceiver(
config = config,
dokumentValidator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
payloadMessageResponder = payloadMessageResponder,
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository
payloadMessageProcessorProvider = payloadMessageProcessorProvider
)

server(
Netty,
port = 8080,
module = {
ebmsProviderModule(
payloadRepository = payloadRepository
payloadRepository = payloadRepository,
payloadProcessorProvider = payloadMessageProcessorProvider
)
}
).also { it.engineConfig.maxChunkSize = 100000 }
@@ -119,26 +133,36 @@ fun main() = SuspendApp {
}
}

private fun CoroutineScope.launchPayloadReceiver(
config: Config,
fun payloadMessageProcessorProvider(
ebmsMessageDetailsRepository: EbmsMessageDetailsRepository,
dokumentValidator: DokumentValidator,
processingService: ProcessingService,
ebmsSignalProducer: EbmsMessageProducer,
smtpTransportClient: SmtpTransportClient,
payloadMessageResponder: PayloadMessageResponder,
ebmsMessageDetailsRepository: EbmsMessageDetailsRepository
payloadMessageResponder: PayloadMessageResponder

): () -> PayloadMessageProcessor = {
PayloadMessageProcessor(
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository,
validator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
payloadMessageResponder
)
}

private fun CoroutineScope.launchPayloadReceiver(
config: Config,
payloadMessageProcessorProvider: () -> PayloadMessageProcessor
) {
if (config.kafkaPayloadReceiver.active) {
launch(Dispatchers.IO) {
val payloadMessageProcessor = PayloadMessageProcessor(
validator = dokumentValidator,
processingService = processingService,
ebmsSignalProducer = ebmsSignalProducer,
smtpTransportClient = smtpTransportClient,
ebmsMessageDetailsRepository = ebmsMessageDetailsRepository,
payloadMessageResponder = payloadMessageResponder
startPayloadReceiver(
config.kafkaPayloadReceiver.topic,
config.kafka,
payloadMessageProcessorProvider.invoke()
)
startPayloadReceiver(config.kafkaPayloadReceiver.topic, config.kafka, payloadMessageProcessor)
}
}
}
@@ -160,7 +184,8 @@ private fun CoroutineScope.launchSignalReceiver(
}

fun Application.ebmsProviderModule(
payloadRepository: PayloadRepository
payloadRepository: PayloadRepository,
payloadProcessorProvider: () -> PayloadMessageProcessor
) {
val appMicrometerRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)

Expand All @@ -173,9 +198,53 @@ fun Application.ebmsProviderModule(
registerHealthEndpoints()
registerPrometheusEndpoint(appMicrometerRegistry)
registerNavCheckStatus()

if (!isProdEnv()) {
simulateError()
}
retryErrors(payloadProcessorProvider)
authenticate(AZURE_AD_AUTH) {
getPayloads(payloadRepository)
}
}
}

const val RETRY_LIMIT = "retryLimit"

@OptIn(ExperimentalUuidApi::class)
fun Routing.retryErrors(
payloadMessageProcessorProvider: () -> PayloadMessageProcessor
): Route =
get("/api/retry/{$RETRY_LIMIT}") {
if (!config().kafkaErrorQueue.active) {
call.respondText(status = HttpStatusCode.ServiceUnavailable, text = "Retry not active.")
return@get
}
failedMessageQueue.consumeRetryQueue(
payloadMessageProcessorProvider.invoke(),
limit = (call.parameters[RETRY_LIMIT])?.toInt() ?: 10
)
call.respondText(
status = HttpStatusCode.OK,
text = "Retry processing started with limit ${call.parameters[RETRY_LIMIT] ?: "default"}"
)
}

const val KAFKA_OFFSET = "offset"

fun Route.simulateError(): Route = get("/api/forceretry/{$KAFKA_OFFSET}") {
CoroutineScope(Dispatchers.IO).launch() {
if (config().kafkaErrorQueue.active) {
val record = getRecord(
config()
.kafkaPayloadReceiver.topic,
config().kafka
.copy(groupId = "ebms-provider-retry"),
(call.parameters[KAFKA_OFFSET])?.toLong() ?: 0
)
failedMessageQueue.sendToRetry(
record = record ?: throw Exception("No Record found. Offset: ${call.parameters[KAFKA_OFFSET]}"),
reason = "Simulated Error"
)
}
}
}
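
The two new routes can be exercised over HTTP once the application is up. A sketch using the Ktor client: localhost:8080 matches the embedded Netty server above, while the limit 10 and offset 42 are arbitrary example values and the ktor-client-cio dependency is assumed.

import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.request.get
import io.ktor.client.statement.bodyAsText
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    HttpClient(CIO).use { client ->
        // Re-process up to 10 messages from the retry queue (responds 503 if the error queue is inactive).
        val retry = client.get("http://localhost:8080/api/retry/10")
        println("${retry.status}: ${retry.bodyAsText()}")

        // Non-prod only: push the record at offset 42 of the payload topic onto the retry queue.
        val force = client.get("http://localhost:8080/api/forceretry/42")
        println("${force.status}: ${force.bodyAsText()}")
    }
}
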
@@ -1,6 +1,7 @@
package no.nav.emottak.ebms.async.configuration

import com.sksamuel.hoplite.Masked
import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG
import org.apache.kafka.common.config.SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
@@ -15,7 +16,8 @@ data class Config(
val kafkaSignalReceiver: KafkaSignalReceiver,
val kafkaSignalProducer: KafkaSignalProducer,
val kafkaPayloadReceiver: KafkaPayloadReceiver,
val kafkaPayloadProducer: KafkaPayloadProducer
val kafkaPayloadProducer: KafkaPayloadProducer,
val kafkaErrorQueue: KafkaErrorQueue
)

@JvmInline
@@ -53,6 +55,11 @@ data class KafkaPayloadProducer(
val topic: String
)

data class KafkaErrorQueue(
val active: Boolean,
val topic: String
)

data class Kafka(
val bootstrapServers: String,
val securityProtocol: SecurityProtocol,
@@ -62,11 +69,22 @@ data class Kafka(
val truststoreType: TruststoreType,
val truststoreLocation: TruststoreLocation,
val truststorePassword: Masked,
val groupId: String
val groupId: String,
val properties: Properties = Properties().apply {
put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value)
put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value)
put(SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation.value)
put(SSL_KEYSTORE_PASSWORD_CONFIG, keystorePassword.value)
put(SSL_TRUSTSTORE_TYPE_CONFIG, truststoreType.value)
put(SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation.value)
put(SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword.value)
}
)

fun Kafka.toProperties() = Properties()
.apply {
put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(SECURITY_PROTOCOL_CONFIG, securityProtocol.value)
put(SSL_KEYSTORE_TYPE_CONFIG, keystoreType.value)
put(SSL_KEYSTORE_LOCATION_CONFIG, keystoreLocation.value)
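
With the new properties field, Kafka now carries a prebuilt java.util.Properties holding the connection and SSL settings, alongside the existing toProperties() helper. A sketch of how a consumer for the error queue could be built on top of it; the deserializer choices and the explicit group id wiring are assumptions, not taken from this PR.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.Properties

fun errorQueueConsumer(config: Config): KafkaConsumer<String, ByteArray> {
    val props = Properties().apply {
        putAll(config.kafka.properties) // bootstrap servers, security protocol, key/trust stores
        put(ConsumerConfig.GROUP_ID_CONFIG, config.kafka.groupId)
        put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
        put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java.name)
    }
    return KafkaConsumer<String, ByteArray>(props).also {
        it.subscribe(listOf(config.kafkaErrorQueue.topic))
    }
}
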
@@ -29,7 +29,7 @@ suspend fun startPayloadReceiver(topic: String, kafka: Kafka, payloadMessageProc
KafkaReceiver(receiverSettings)
.receive(topic)
.map { record ->
payloadMessageProcessor.process(record.key(), record.value())
payloadMessageProcessor.process(record)
record.offset.acknowledge()
}.collect()
}