Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dev/feilkoe headers #112

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open

Dev/feilkoe headers #112

wants to merge 26 commits into from

Conversation

RettIProd
Copy link
Contributor

No description provided.

@RettIProd RettIProd marked this pull request as ready for review March 20, 2025 15:04
@RettIProd RettIProd requested a review from a team as a code owner March 20, 2025 15:04
Copy link
Contributor

@kfh kfh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noen kommentarer elles ser jo dette ut som en god start 👍

): Route =
get("/api/retry/{$RETRY_LIMIT}") {
resourceScope {
CoroutineScope(Dispatchers.IO).launch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Når du oppretter et CoroutineScope her på denne måten er det greit å være klar over at dette blir et helt frittstående scope som ikke er linket til resourceScope'et. Joda, det vil bli tatt ned når jvm'en shutter ned, men ikke på en clean måte.

Jeg snakket jo litt om dette for noen uker siden (i en delingstime) hvordan en kan linke frittstående scopes til omsluttende ressurs-scope så scopet i seg selv blir en ressurs / avhengighet på lik linje med alle andre ressurser og avhengigheter. Fordelen er selvsagt at scopet blir inkludert i livssyklusen til ressurs-scopet i SuspendApp og tatt ned og opprettet på en clean måte.

Her er linken til utility funksjonen som gjør dette for deg: https://github.com/navikt/smtp-transport/blob/main/src/main/kotlin/no/nav/emottak/util/ResourceUtil.kt

Sikkert noe vi burde putte inn i felles-biblioteket på et tidspunkt.


fun Route.simulateError(): Route = get("/api/forceretry/{$KAFKA_OFFSET}") {
resourceScope {
CoroutineScope(Dispatchers.IO).launch {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Samme som over

if (config().kafkaErrorQueue.active) {
failedMessageQueue.receive(
payloadMessageProcessorProvider.invoke(),
limit = (call.parameters[RETRY_LIMIT] as String).toInt()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Litt flisespikkeri, men syntes koden blir mer ryddig hvis du løfter call... ut til en egen val

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hvorfor synes du det?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tipper fordi blir koden mer verbose. I prinsipp er jeg helt enig, men akkurat i dette eksemplet tenker jeg at det ikke gir så mye verdi å bruke mer tid på denne linjen

suspend fun send(record: ReceiverRecord<String, ByteArray>, key: String = record.key(), value: ByteArray = record.value()) {
record.addHeader(RETRY_AFTER, getNextRetryTime(record))
try {
val result = producer.send(ProducerRecord(kafkaErrorQueue.topic, null, key, value, record.headers())).get()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jeg tenker at her bør du bruke KafkaPublisher fra kotlin-kafka-biblioteket så får du et publisherScope ut av boksen. Dette vil forenkle og gjøre koden mer tydelig.


fun getNextRetryTime(record: ReceiverRecord<String, ByteArray>): String {
return DateTime.now().plusMinutes(5)
.toString() // TODO create retry strategy
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ref retry strategi, kanskje du bør se litt på https://arrow-kt.io/learn/resilience/retry-and-repeat/

ByteArrayDeserializer()
)
) {
partitionsFor(topic)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Det er mye som foregår her, hva med å splitte dette opp i et par mindre funksjoner for å tydeliggjøre hva som faktisk skjer

}
}

fun getReceiverRecord(consumerRecord: ConsumerRecord<String, ByteArray>?): ReceiverRecord<String, ByteArray>? {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Det blir veldig mye kode inne i denne funksjonen og det går utover lesbarheten. Kan du ikke trekke klassen ut ?

Copy link
Contributor Author

@RettIProd RettIProd Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skjønner hva du mener, om man bare er interessert i flyten er det ikke interessant eller nødvendig med støyen en plutselig klassedefinisjon skaper men jeg har en annen vinkling.
Klassen eksisterer kun i denne contexten og trenger ikke eksistere i annet scope, ved å flytte den "forsøpler" den lesbarheten til resten av koden "forøvrig". Ved å se den her vet man at den kun eksisterer her umiddelbart. Ved å se den frittstående vil man tro den kan brukes flere steder og har et "udefinert eksistensområde" i koden. Det kan være vel så kognitivt belastende for en leser.

import kotlin.io.path.Path
import kotlin.io.path.exists

class KafkaIntegrationTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Veldig tynn integrasjonstest. savner feil-scenarioer

}
}
if (!config().kafkaErrorQueue.active) {
call.respondText(status = HttpStatusCode.ServiceUnavailable, text = "Retry not active.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Selv om Retry i teksten her refererer til API-endepunktet, kan teksten lett misforstås. Kanskje det er bedre å tydeliggjøre at det er endepunktet som ikke er aktivt, i stedet for å bare kalle det "Retry"

Copy link
Contributor Author

@RettIProd RettIProd Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Det er faktisk retry som er inaktivt, ikke endepunktet. ;)
Altså kafka-køen er ikke aktiv. Så man kan ikke få gjort noe retrys.

Comment on lines 47 to 51
keySerializer = StringSerializer(),
valueSerializer = ByteArraySerializer(),
acknowledgments = Acks.All,
properties = kafka.toProperties()
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dette er en stor forbering! 👍 ⚡

record: ReceiverRecord<String, ByteArray>,
key: String = record.key(),
value: ByteArray = record.value()
) {
record.addHeader(RETRY_AFTER, getNextRetryTime(record))
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nå som jeg tenker på det - hva sier du til at vi bruker Arrow her også, så vi slipper de litt kjipe try-catchene?

Comment on lines 83 to 112
suspend fun consumeRetryQueue( // TODO refine retry logic
payloadMessageProcessor: PayloadMessageProcessor,
limit: Int = 10 // TODO default limit to offset
) {
// TODO DefaultKafkaReceiver is too constrainted so need own impl for custom logic
val consumer: Flow<ReceiverRecord<String, ByteArray>> =
errorTopicKafkaReceiver.receive(kafkaErrorQueue.topic)

logger.debug("Reading from error queue")
var counter = 0
consumerFlow.map { record ->
consumer.map { record ->
counter++
if (counter > limit) {
throw Exception("Error queue limit exceeded: $limit") // TODO fjern dette
logger.info("Kafka retryQueue Limit reached: $limit")
return@map
}
record.offset.acknowledge()
record.retryCounter()
payloadMessageProcessor.process(record)
record.offset.acknowledge()
if (DateTime.parse(
String(record.headers().lastHeader(RETRY_AFTER).value())
).isAfter(DateTime.now())
) {
payloadMessageProcessor.process(record)
} else {
logger.info("${record.key()} is not retryable yet.")
failedMessageQueue.sendToRetry(record)
}
record.offset.commit()
}.collect()
}
Copy link
Contributor

@ivanskodje ivanskodje Mar 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Siden vi vet at denne delen trenger mer arbeid, hva tenker du om å legge til en enkel unit test som kan hjelpe oss å oppdage hvis vi ubevisst ødelegger noe etter refaktorering? Det ville gjort det mye tryggere å rydde opp her, og samtidig gjøre terskelen til å rydde opp her lavere

@@ -26,10 +29,15 @@ class KafkaIntegrationTest {
val kafkaConfig = config()

fun noLocalKafkaEnv(): Boolean {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

En liten forenkling du kan vurdere her: Hva med å kalle denne hasLocalKafkaEnv i stedet for noLocalKafkaEnv? Det gjør det litt enklere å lese, siden vi slipper å tenke i det negative. 😅

suspend fun receive(payloadMessageProcessor: PayloadMessageProcessor, limit: Int = 10) { // TODO limit til offset
suspend fun consumeRetryQueue( // TODO refine retry logic
payloadMessageProcessor: PayloadMessageProcessor,
limit: Int = 10 // TODO default limit to offset
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TODO default limit to offset jeg forstår ikke denne kommentaren. Kan vi forbedre den eller ta den vekk hvis du ikke heller gjør det?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tenkte limiten er siste offset på tidspunktet man starter processen. Men man kan ikke innhente siste offset vha kotlin-kafka sin DefaultKafkaReceiver. Det blir en større jobb isåfall som er litt tentativ. Så lagde TODO

@ivanskodje ivanskodje requested a review from a team March 27, 2025 12:31
@RettIProd RettIProd requested review from ivanskodje and kfh March 27, 2025 14:39
@RettIProd
Copy link
Contributor Author

Mer kan selvsagt gjøres men tenker det er nok foreløpig. Har testet det i dev vha. retry endepunktet
https://ebms-async-fss.intern.dev.nav.no/api/retry/5
Feilende meldinger vil også bli lagt på køen. De vil ikke bli prosessert automatisk ihvertfall foreløpig.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants