Skip to content

Commit b9f62d7

Browse files
authored
Konsumerer JOARK hendelser (#111)
* Legger til listener for joark journalføringshendelser. Signed-off-by: Ramin Esfandiari <[email protected]>
1 parent 0aa44ac commit b9f62d7

File tree

13 files changed

+187
-24
lines changed

13 files changed

+187
-24
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ out/
3333
.vscode/
3434

3535
generated-pdf-*.pdf
36+
*.hprof

build.gradle.kts

+15-2
Original file line numberDiff line numberDiff line change
@@ -44,17 +44,28 @@ ext["mock-oauth2-server.version"] = "0.3.3"
4444
ext["nimbus.jose.jwt.version"] = "9.10"
4545

4646
repositories {
47-
mavenCentral()
4847
maven {
4948
name = "github-package-registry-navikt"
50-
url = uri("https://maven.pkg.github.com/navikt/maven-releas")
49+
url = uri("https://maven.pkg.github.com/navikt/legacy-avhengigheter")
50+
credentials {
51+
username = project.findProperty("gpr.user") as String? ?: System.getenv("GITHUB_USERNAME")
52+
password = project.findProperty("gpr.key") as String? ?: System.getenv("GITHUB_TOKEN")
53+
}
54+
}
55+
mavenCentral()
56+
57+
maven {
58+
name = "confluent"
59+
url = uri("https://packages.confluent.io/maven/")
5160
}
5261
}
5362

5463
dependencies {
5564

5665
// NAV
5766
implementation("no.nav.security:token-validation-spring:$tokenValidationVersion")
67+
//implementation("no.nav.dok:dok-journalfoering-hendelse-v1:0.0.3")
68+
implementation("no.nav.syfo.schemas:dok-journalfoering-hendelse-v1:67a9be4476b63b7247cfacfaf821ab656bd2a952")
5869
testImplementation("no.nav.security:token-validation-spring-test:$tokenValidationVersion")
5970
testImplementation("no.nav.security:mock-oauth2-server:0.3.3") // TODO: 09/06/2021 fjern når tokenValidationVersion oppdateres til 1.3.8
6071
implementation("com.nimbusds:nimbus-jose-jwt:9.10") // TODO: 09/06/2021 fjern når tokenValidationVersion oppdateres til 1.3.8
@@ -114,6 +125,8 @@ dependencies {
114125

115126
//Kafka
116127
implementation("org.springframework.kafka:spring-kafka")
128+
implementation("io.confluent:kafka-connect-avro-converter:$confluentVersion")
129+
implementation("io.confluent:kafka-avro-serializer:$confluentVersion")
117130
testImplementation("org.springframework.kafka:spring-kafka-test")
118131

119132
// PDF

nais/dev-gcp.json

+3-4
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
"https://sif-innsyn-api.dev.nav.no"
1010
],
1111
"externalHosts": [
12-
"secretmanager.googleapis.com",
13-
"oauth2.googleapis.com",
14-
"www.googleapis.com",
1512
"login.microsoftonline.com",
16-
"k9-selvbetjening-oppslag.dev-fss-pub.nais.io"
13+
"k9-selvbetjening-oppslag.dev-fss-pub.nais.io",
14+
"kafka-schema-registry.nais-q.adeo.no"
1715
],
1816
"database": {
1917
"name": "sif-innsyn-db",
@@ -30,6 +28,7 @@
3028
"SPRING_PROFILES_ACTIVE": "dev-gcp",
3129
"KAFKA_ONPREM_SERVERS": "b27apvl00045.preprod.local:8443,b27apvl00046.preprod.local:8443,b27apvl00047.preprod.local:8443",
3230
"KAFKA_ONPREM_CONSUMER_AUTO_OFFSET_RESET": "latest",
31+
"KAFKA_ONPREM_CONSUMER_SCHEMA_REGISTRY_URL": "https://kafka-schema-registry.nais-q.adeo.no",
3332
"KAFKA_AIVEN_CONSUMER_AUTO_OFFSET_RESET": "latest",
3433
"NO_NAV_SECURITY_JWT_ISSUER_LOGINSERVICE_COOKIE_NAME": "selvbetjening-idtoken",
3534
"NO_NAV_SECURITY_CORS_ALLOWED_ORIGINS": "https://sif-innsyn.dev.nav.no",

nais/prod-gcp.json

+3-4
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
"https://sif-innsyn-api.nav.no"
1010
],
1111
"externalHosts": [
12-
"secretmanager.googleapis.com",
13-
"oauth2.googleapis.com",
14-
"www.googleapis.com",
1512
"login.microsoftonline.com",
16-
"k9-selvbetjening-oppslag.prod-fss-pub.nais.io"
13+
"k9-selvbetjening-oppslag.prod-fss-pub.nais.io",
14+
"kafka-schema-registry.nais.adeo.no"
1715
],
1816
"database": {
1917
"name": "sif-innsyn-db",
@@ -30,6 +28,7 @@
3028
"SPRING_PROFILES_ACTIVE": "prod-gcp",
3129
"KAFKA_ONPREM_SERVERS": "a01apvl00145.adeo.no:8443,a01apvl00146.adeo.no:8443,a01apvl00147.adeo.no:8443,a01apvl00148.adeo.no:8443,a01apvl00149.adeo.no:8443,a01apvl00150.adeo.no:8443",
3230
"KAFKA_ONPREM_CONSUMER_AUTO_OFFSET_RESET": "none",
31+
"KAFKA_ONPREM_CONSUMER_SCHEMA_REGISTRY_URL": "https://kafka-schema-registry.nais.adeo.no",
3332
"NO_NAV_SECURITY_JWT_ISSUER_LOGINSERVICE_COOKIE_NAME": "selvbetjening-idtoken",
3433
"NO_NAV_SECURITY_CORS_ALLOWED_ORIGINS": "https://www.nav.no",
3534
"NO_NAV_GATEWAYS_K9_SELVBETJENING_OPPSLAG": "https://k9-selvbetjening-oppslag.prod-fss-pub.nais.io",

src/main/kotlin/no/nav/sifinnsynapi/config/kafka/CommonKafkaConfig.kt

+19-12
Original file line numberDiff line numberDiff line change
@@ -84,11 +84,14 @@ class CommonKafkaConfig {
8484
}
8585

8686
fun kafkaTemplate(producerFactory: ProducerFactory<String, String>, kafkaConfigProps: KafkaConfigProperties) =
87-
KafkaTemplate(producerFactory).apply {
88-
setTransactionIdPrefix(kafkaConfigProps.producer.transactionIdPrefix)
89-
}
87+
KafkaTemplate(producerFactory).apply {
88+
setTransactionIdPrefix(kafkaConfigProps.producer.transactionIdPrefix)
89+
}
9090

91-
fun kafkaTransactionManager(producerFactory: ProducerFactory<String, String>, kafkaConfigProps: KafkaConfigProperties) =
91+
fun kafkaTransactionManager(
92+
producerFactory: ProducerFactory<String, String>,
93+
kafkaConfigProps: KafkaConfigProperties
94+
) =
9295
KafkaTransactionManager(producerFactory).apply {
9396
setTransactionIdPrefix(kafkaConfigProps.producer.transactionIdPrefix)
9497
}
@@ -126,6 +129,7 @@ class CommonKafkaConfig {
126129
val correlationId = topicEntry.metadata.correlationId
127130
MDCUtil.toMDC(Constants.CORRELATION_ID, correlationId)
128131
MDCUtil.toMDC(Constants.NAV_CONSUMER_ID, clientId)
132+
MDCUtil.toMDC(Constants.JOURNALPOST_ID, topicEntry.journalførtMelding.journalpostId)
129133

130134
val søker = JSONObject(topicEntry.melding).getJSONObject("søker")
131135
when (søknadRepository.existsSøknadDAOByAktørIdAndJournalpostId(
@@ -159,17 +163,20 @@ class CommonKafkaConfig {
159163
factory.containerProperties.authorizationExceptionRetryInterval = Duration.ofSeconds(10L)
160164

161165
//https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#after-rollback
162-
val defaultAfterRollbackProcessor =
163-
DefaultAfterRollbackProcessor<String, String>(
164-
recoverer(logger),
165-
FixedBackOff(retryInterval, Long.MAX_VALUE)
166-
)
167-
defaultAfterRollbackProcessor.setClassifications(mapOf(), true)
168-
factory.setAfterRollbackProcessor(defaultAfterRollbackProcessor)
166+
167+
factory.setAfterRollbackProcessor(defaultAfterRollbackProsessor(logger, retryInterval))
169168
return factory
170169
}
171170

172-
private fun recoverer(logger: Logger) = BiConsumer { cr: ConsumerRecord<*, *>, ex: Exception ->
171+
private fun defaultAfterRollbackProsessor(logger: Logger, retryInterval: Long) =
172+
DefaultAfterRollbackProcessor<String, String>(
173+
defaultRecoverer(logger), FixedBackOff(retryInterval, Long.MAX_VALUE)
174+
).apply {
175+
setClassifications(mapOf(), true)
176+
}
177+
178+
179+
fun defaultRecoverer(logger: Logger) = BiConsumer { cr: ConsumerRecord<*, *>, ex: Exception ->
173180
logger.error("Retry attempts exhausted for ${cr.topic()}-${cr.partition()}@${cr.offset()}", ex)
174181
}
175182
}

src/main/kotlin/no/nav/sifinnsynapi/config/kafka/CustomKafkaProperties.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,8 @@ data class KafkaConsumerProperties(
2727
val isolationLevel: String,
2828
val retryInterval: Long,
2929
val keyDeserializer: String,
30-
val valueDeserializer: String
30+
val valueDeserializer: String,
31+
val schemaRegistryUrl: String
3132
)
3233

3334
data class KafkaProducerProperties(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package no.nav.sifinnsynapi.config.kafka
2+
3+
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
4+
import no.nav.joarkjournalfoeringhendelser.JournalfoeringHendelseRecord
5+
import no.nav.sifinnsynapi.config.kafka.CommonKafkaConfig.Companion.defaultRecoverer
6+
import no.nav.sifinnsynapi.util.Constants
7+
import no.nav.sifinnsynapi.util.MDCUtil
8+
import org.apache.kafka.clients.consumer.ConsumerConfig
9+
import org.slf4j.Logger
10+
import org.slf4j.LoggerFactory
11+
import org.springframework.context.annotation.Bean
12+
import org.springframework.context.annotation.Configuration
13+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
14+
import org.springframework.kafka.core.ConsumerFactory
15+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
16+
import org.springframework.kafka.listener.ContainerProperties
17+
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor
18+
import org.springframework.util.backoff.FixedBackOff
19+
import java.time.Duration
20+
21+
@Configuration
22+
internal class JoarkKafkaConfig(
23+
private val kafkaClusterProperties: KafkaClusterProperties
24+
) {
25+
companion object {
26+
private val logger = LoggerFactory.getLogger(JoarkKafkaConfig::class.java)
27+
28+
const val TEMA_NYTT_OMS = "OMS"
29+
const val MOTTAKS_KANAL_NAV_NO = "NAV_NO"
30+
const val ENDELIG_JOURNALFØRT = "EndeligJournalført"
31+
}
32+
33+
@Bean
34+
fun joarkConsumerFactory(): DefaultKafkaConsumerFactory<Long, JournalfoeringHendelseRecord> {
35+
val consumerProps = kafkaClusterProperties.onprem.consumer
36+
return DefaultKafkaConsumerFactory(
37+
mutableMapOf<String, Any>(
38+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to consumerProps.enableAutoCommit,
39+
ConsumerConfig.GROUP_ID_CONFIG to consumerProps.groupId,
40+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to consumerProps.autoOffsetReset,
41+
ConsumerConfig.ISOLATION_LEVEL_CONFIG to consumerProps.isolationLevel,
42+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to consumerProps.keyDeserializer,
43+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
44+
KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG to consumerProps.schemaRegistryUrl,
45+
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG to "true"
46+
) + CommonKafkaConfig.commonConfig(kafkaClusterProperties.onprem)
47+
)
48+
}
49+
50+
@Bean
51+
fun joarkKafkaJsonListenerContainerFactor(joarkConsumerFactory: ConsumerFactory<Long, JournalfoeringHendelseRecord>) =
52+
ConcurrentKafkaListenerContainerFactory<Long, JournalfoeringHendelseRecord>().apply {
53+
this.consumerFactory = joarkConsumerFactory
54+
55+
// https://docs.spring.io/spring-kafka/reference/html/#listener-container
56+
containerProperties.authorizationExceptionRetryInterval = Duration.ofSeconds(10L)
57+
58+
// https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#delivery-header
59+
containerProperties.isDeliveryAttemptHeader = true
60+
61+
// https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#committing-offsets
62+
containerProperties.ackMode = ContainerProperties.AckMode.RECORD;
63+
64+
// https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#exactly-once
65+
containerProperties.eosMode = ContainerProperties.EOSMode.BETA
66+
67+
setRecordFilterStrategy {
68+
val hendelse = it.value()
69+
when {
70+
hendelse.temaNytt == TEMA_NYTT_OMS && hendelse.mottaksKanal == MOTTAKS_KANAL_NAV_NO && hendelse.hendelsesType == ENDELIG_JOURNALFØRT -> {
71+
MDCUtil.toMDC(Constants.JOURNALPOST_ID, hendelse.journalpostId)
72+
false
73+
}
74+
else -> true
75+
}
76+
}
77+
78+
setAfterRollbackProcessor(
79+
defaultAfterRollbackProsessor(
80+
logger,
81+
kafkaClusterProperties.onprem.consumer.retryInterval
82+
)
83+
)
84+
}
85+
86+
private fun defaultAfterRollbackProsessor(logger: Logger, retryInterval: Long) =
87+
DefaultAfterRollbackProcessor<Long, JournalfoeringHendelseRecord>(
88+
defaultRecoverer(logger), FixedBackOff(retryInterval, Long.MAX_VALUE)
89+
).apply {
90+
setClassifications(mapOf(), true)
91+
}
92+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package no.nav.sifinnsynapi.konsument.dokumentjournalforing
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper
4+
import no.nav.joarkjournalfoeringhendelser.JournalfoeringHendelseRecord
5+
import org.apache.kafka.clients.consumer.ConsumerRecord
6+
import org.slf4j.LoggerFactory
7+
import org.springframework.kafka.annotation.KafkaListener
8+
import org.springframework.messaging.handler.annotation.Payload
9+
import org.springframework.stereotype.Service
10+
11+
@Service
12+
class JoarkHendelseKonsument {
13+
14+
companion object {
15+
private val logger = LoggerFactory.getLogger(JoarkHendelseKonsument::class.java)
16+
}
17+
18+
@KafkaListener(
19+
topics = ["#{'\${topic.listener.dok-journalfoering-v1.navn}'}"],
20+
id = "#{'\${topic.listener.dok-journalfoering-v1.id}'}",
21+
groupId = "#{'\${kafka.onprem.consumer.group-id}'}",
22+
containerFactory = "joarkKafkaJsonListenerContainerFactor",
23+
autoStartup = "#{'\${topic.listener.dok-journalfoering-v1.bryter}'}"
24+
)
25+
fun konsumer(
26+
@Payload cr: ConsumerRecord<Long, JournalfoeringHendelseRecord>
27+
) {
28+
logger.info("Mottatt journalføringshendelse med status: {}", cr.value().hendelsesType)
29+
}
30+
}

src/main/kotlin/no/nav/sifinnsynapi/util/Constants.kt

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package no.nav.sifinnsynapi.util
33
object Constants {
44
const val NAV_CONSUMER_ID = "Nav-Consumer-Id"
55
const val CORRELATION_ID = "correlation_id"
6+
const val JOURNALPOST_ID = "journalpost_id"
67
const val CALL_ID = "callId"
78
const val X_CORRELATION_ID = "X-Correlation-ID"
89
}

src/main/resources/application-dev-gcp.yml

+9
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,12 @@ no.nav.security.jwt.issuer:
3939
loginservice:
4040
discoveryUrl: ${LOGINSERVICE_IDPORTEN_DISCOVERY_URL} # settes av configmap: loginservice-idporten i naiserator.yml
4141
accepted_audience: ${LOGINSERVICE_IDPORTEN_AUDIENCE} # settes av configmap: loginservice-idporten i naiserator.yml
42+
43+
topic:
44+
listener:
45+
# topic.listener.dok-journalfoering-v1
46+
dok-journalfoering-v1:
47+
id: dok-journalfoering-v1-listener
48+
navn: aapen-dok-journalfoering-v1-q1
49+
bryter: true
50+
dry-run: false

src/main/resources/application.yml

+9
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ topic:
115115
bryter: true
116116
dry-run: false
117117

118+
# topic.listener.dok-journalfoering-v1
119+
dok-journalfoering-v1:
120+
id: dok-journalfoering-v1-listener
121+
navn: aapen-dok-journalfoering-v1-p
122+
bryter: true
123+
dry-run: false
124+
118125
management:
119126
endpoints:
120127
web:
@@ -164,6 +171,7 @@ kafka:
164171
retry-interval: 60_000 # Egendefinert property
165172
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
166173
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
174+
schemaRegistryUrl: # overstyres fra nais/<cluster>.json
167175

168176
producer:
169177
client-id: ${HOSTNAME}
@@ -182,6 +190,7 @@ kafka:
182190
retry-interval: 60_000 # Egendefinert property
183191
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
184192
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
193+
schemaRegistryUrl: ${KAFKA_SCHEMA_REGISTRY}
185194

186195
producer:
187196
client-id: ${HOSTNAME}

src/main/resources/logback-spring.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
class="ch.qos.logback.core.ConsoleAppender">
1010
<layout class="ch.qos.logback.classic.PatternLayout">
1111
<Pattern>
12-
%d{yyyy-MM-dd HH:mm:ss} %X{correlation_id} [%thread] %-5level %logger{70} - %msg%n
12+
%d{yyyy-MM-dd HH:mm:ss} %X{correlation_id} %X{journalpost_id} [%thread] %-5level %logger{70} - %msg%n
1313
</Pattern>
1414
</layout>
1515
</appender>

src/test/resources/application-test.yml

+2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ kafka:
5151
retry-interval: 5_000 # Egendefinert property
5252
group-id: sif-innsyn-api-onprem
5353
auto-offset-reset: earliest
54+
schema-registry-url: mock://localhost
5455
producer:
5556
client-id: sif-innsyn-api-onprem
5657

@@ -60,6 +61,7 @@ kafka:
6061
retry-interval: 5_000 # Egendefinert property
6162
group-id: sif-innsyn-api-aiven
6263
auto-offset-reset: earliest
64+
schema-registry-url: mock://localhost
6365
producer:
6466
client-id: sif-innsyn-api-aiven
6567

0 commit comments

Comments
 (0)