Skip to content

Commit 37ec8ed

Browse files
authored
chore(observability): make kafka message handling distinct spans (#228)
1 parent 67b062e commit 37ec8ed

File tree

2 files changed

+69
-42
lines changed

2 files changed

+69
-42
lines changed

build.gradle.kts

+2-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ val kotlinVersion = "1.9.22"
2222
val testContainerVersion = "1.19.6"
2323
val ktfmtVersion = "0.44"
2424
val snappyJavaVersion = "1.1.10.5"
25-
25+
val opentelemetryVersion = "2.3.0"
2626

2727
plugins {
2828
id("application")
@@ -78,6 +78,7 @@ dependencies {
7878

7979
implementation("ch.qos.logback:logback-classic:$logbackVersion")
8080
implementation("net.logstash.logback:logstash-logback-encoder:$logstashEncoderVersion")
81+
implementation("io.opentelemetry.instrumentation:opentelemetry-instrumentation-annotations:$opentelemetryVersion")
8182

8283
testImplementation("org.jetbrains.kotlin:kotlin-test:$kotlinVersion")
8384
testImplementation("org.amshove.kluent:kluent:$kluentVersion")

src/main/kotlin/no/nav/syfo/syfosmvarsel/Bootstrap.kt

+67-41
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import com.fasterxml.jackson.databind.ObjectMapper
55
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
66
import com.fasterxml.jackson.module.kotlin.readValue
77
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
8+
import io.opentelemetry.api.trace.Span
9+
import io.opentelemetry.instrumentation.annotations.WithSpan
810
import java.time.Duration
911
import java.util.UUID
1012
import kotlinx.coroutines.CoroutineScope
@@ -28,6 +30,7 @@ import no.nav.syfo.syfosmvarsel.util.KafkaFactory.Companion.getBrukernotifikasjo
2830
import no.nav.syfo.syfosmvarsel.util.KafkaFactory.Companion.getKafkaStatusConsumerAiven
2931
import no.nav.syfo.syfosmvarsel.util.KafkaFactory.Companion.getNyKafkaAivenConsumer
3032
import no.nav.syfo.util.util.Unbounded
33+
import org.apache.kafka.clients.consumer.ConsumerRecord
3134
import org.apache.kafka.clients.consumer.KafkaConsumer
3235
import org.slf4j.Logger
3336
import org.slf4j.LoggerFactory
@@ -94,7 +97,7 @@ fun createListener(
9497
log.error(
9598
"En uhåndtert feil oppstod, applikasjonen restarter {}",
9699
fields(e.loggingMeta),
97-
e.cause
100+
e.cause,
98101
)
99102
} finally {
100103
applicationState.alive = false
@@ -118,15 +121,15 @@ fun launchListeners(
118121
nyKafkaConsumerAiven,
119122
nySykmeldingService,
120123
avvistSykmeldingService,
121-
environment
124+
environment,
122125
)
123126
}
124127

125128
createListener(applicationState) {
126129
blockingApplicationLogicStatusendringAiven(
127130
applicationState,
128131
statusendringService,
129-
kafkaStatusConsumerAiven
132+
kafkaStatusConsumerAiven,
130133
)
131134
}
132135
}
@@ -143,33 +146,46 @@ suspend fun blockingApplicationLogicNySykmelding(
143146
.poll(Duration.ofMillis(1000))
144147
.filterNot { it.value() == null }
145148
.forEach {
146-
val receivedSykmelding: ReceivedSykmelding = objectMapper.readValue(it.value())
147-
148-
val loggingMeta =
149-
LoggingMeta(
150-
mottakId = receivedSykmelding.navLogId,
151-
orgNr = receivedSykmelding.legekontorOrgNr,
152-
msgId = receivedSykmelding.msgId,
153-
sykmeldingId = receivedSykmelding.sykmelding.id,
154-
)
155-
wrapExceptions(loggingMeta) {
156-
when (it.topic()) {
157-
environment.avvistSykmeldingTopicAiven ->
158-
avvistSykmeldingService.opprettVarselForAvvisteSykmeldinger(
159-
receivedSykmelding,
160-
loggingMeta
161-
)
162-
else ->
163-
nySykmeldingService.opprettVarselForNySykmelding(
164-
receivedSykmelding,
165-
loggingMeta
166-
)
167-
}
168-
}
149+
handleNySykmelding(it, environment, avvistSykmeldingService, nySykmeldingService)
169150
}
170151
}
171152
}
172153

154+
@WithSpan
155+
private suspend fun handleNySykmelding(
156+
message: ConsumerRecord<String, String>,
157+
environment: Environment,
158+
avvistSykmeldingService: AvvistSykmeldingService,
159+
nySykmeldingService: NySykmeldingService
160+
) {
161+
val receivedSykmelding: ReceivedSykmelding = objectMapper.readValue(message.value())
162+
163+
val currentSpan = Span.current()
164+
currentSpan.setAttribute("sykmeldingId", receivedSykmelding.sykmelding.id)
165+
166+
val loggingMeta =
167+
LoggingMeta(
168+
mottakId = receivedSykmelding.navLogId,
169+
orgNr = receivedSykmelding.legekontorOrgNr,
170+
msgId = receivedSykmelding.msgId,
171+
sykmeldingId = receivedSykmelding.sykmelding.id,
172+
)
173+
wrapExceptions(loggingMeta) {
174+
when (message.topic()) {
175+
environment.avvistSykmeldingTopicAiven ->
176+
avvistSykmeldingService.opprettVarselForAvvisteSykmeldinger(
177+
receivedSykmelding,
178+
loggingMeta,
179+
)
180+
else ->
181+
nySykmeldingService.opprettVarselForNySykmelding(
182+
receivedSykmelding,
183+
loggingMeta,
184+
)
185+
}
186+
}
187+
}
188+
173189
fun blockingApplicationLogicStatusendringAiven(
174190
applicationState: ApplicationState,
175191
statusendringService: StatusendringService,
@@ -180,21 +196,31 @@ fun blockingApplicationLogicStatusendringAiven(
180196
.poll(Duration.ofMillis(1000))
181197
.filter { it.value() != null }
182198
.filter { it.key().erUuid() }
183-
.forEach {
184-
val sykmeldingStatusKafkaMessageDTO: SykmeldingStatusKafkaMessageDTO = it.value()
185-
try {
186-
log.info(
187-
"Mottatt statusmelding fra aiven ${sykmeldingStatusKafkaMessageDTO.kafkaMetadata.sykmeldingId}"
188-
)
189-
statusendringService.handterStatusendring(sykmeldingStatusKafkaMessageDTO)
190-
} catch (e: Exception) {
191-
log.error(
192-
"Noe gikk galt ved behandling av statusendring fra aiven for sykmelding med id {}",
193-
sykmeldingStatusKafkaMessageDTO.kafkaMetadata.sykmeldingId
194-
)
195-
throw e
196-
}
197-
}
199+
.forEach { handleStatusEndring(it, statusendringService) }
200+
}
201+
}
202+
203+
@WithSpan
204+
private fun handleStatusEndring(
205+
it: ConsumerRecord<String, SykmeldingStatusKafkaMessageDTO>,
206+
statusendringService: StatusendringService
207+
) {
208+
val sykmeldingStatusKafkaMessageDTO: SykmeldingStatusKafkaMessageDTO = it.value()
209+
210+
val currentSpan = Span.current()
211+
currentSpan.setAttribute("sykmeldingId", sykmeldingStatusKafkaMessageDTO.event.sykmeldingId)
212+
213+
try {
214+
log.info(
215+
"Mottatt statusmelding fra aiven ${sykmeldingStatusKafkaMessageDTO.kafkaMetadata.sykmeldingId}",
216+
)
217+
statusendringService.handterStatusendring(sykmeldingStatusKafkaMessageDTO)
218+
} catch (e: Exception) {
219+
log.error(
220+
"Noe gikk galt ved behandling av statusendring fra aiven for sykmelding med id {}",
221+
sykmeldingStatusKafkaMessageDTO.kafkaMetadata.sykmeldingId,
222+
)
223+
throw e
198224
}
199225
}
200226

0 commit comments

Comments
 (0)