Skip to content

Commit 85664fe

Browse files
Setup dummy dialogmote-status-endring kafka consumer with spek (#126)
1 parent 7883507 commit 85664fe

22 files changed

+339
-62
lines changed

.nais/naiserator-dev.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,5 @@ spec:
106106
value: "true"
107107
- name: TOGGLE_KAFKA_DIALOGMOTEKANDIDAT_PROCESSING_ENABLED
108108
value: "false"
109+
- name: TOGGLE_KAFKA_DIALOGMOTE_STATUSENDRING_PROCESSING_ENABLED
110+
value: "false"

.nais/naiserator-prod.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,5 @@ spec:
106106
value: "true"
107107
- name: TOGGLE_KAFKA_DIALOGMOTEKANDIDAT_PROCESSING_ENABLED
108108
value: "false"
109+
- name: TOGGLE_KAFKA_DIALOGMOTE_STATUSENDRING_PROCESSING_ENABLED
110+
value: "false"

README.md

+37
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,43 @@ $ docker-compose up
1818

1919
* Run the `main()` function in `SyfooversiktApplication.kt`
2020

21+
## Download packages from Github Package Registry
22+
23+
Certain packages (isdialogmote-schema) must be downloaded from Github Package Registry, which requires
24+
authentication. The packages can be downloaded via build.gradle:
25+
26+
```
27+
val githubUser: String by project
28+
val githubPassword: String by project
29+
repositories {
30+
maven {
31+
url = uri("https://maven.pkg.github.com/navikt/isdialogmote-schema")
32+
credentials {
33+
username = githubUser
34+
password = githubPassword
35+
}
36+
}
37+
}
38+
```
39+
40+
`githubUser` and `githubPassword` are properties that are set in `~/.gradle/gradle.properties`:
41+
42+
```
43+
githubUser=x-access-token
44+
githubPassword=<token>
45+
```
46+
47+
Where `<token>` is a personal access token with scope `read:packages`(and SSO enabled).
48+
49+
The variables can alternatively be configured as environment variables or used in the command lines:
50+
51+
* `ORG_GRADLE_PROJECT_githubUser`
52+
* `ORG_GRADLE_PROJECT_githubPassword`
53+
54+
```
55+
./gradlew -PgithubUser=x-access-token -PgithubPassword=[token]
56+
```
57+
2158
### Connect to the db from terminal:
2259

2360
To connect and run queries directly against the db run:

build.gradle.kts

+14
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ group = "no.nav.syfo"
55
version = "1.0-SNAPSHOT"
66

77
object Versions {
8+
const val confluent = "7.0.1"
89
const val flyway = "8.5.9"
910
const val hikari = "5.0.1"
11+
const val isdialogmoteSchema = "1.0.5"
1012
const val jackson = "2.13.1"
1113
const val jedis = "4.2.2"
1214
const val kafka = "3.1.0"
@@ -30,10 +32,19 @@ plugins {
3032
id("org.jlleitschuh.gradle.ktlint") version "10.2.1"
3133
}
3234

35+
val githubUser: String by project
36+
val githubPassword: String by project
3337
repositories {
3438
mavenCentral()
3539
maven(url = "https://packages.confluent.io/maven/")
3640
maven(url = "https://jitpack.io")
41+
maven {
42+
url = uri("https://maven.pkg.github.com/navikt/isdialogmote-schema")
43+
credentials {
44+
username = githubUser
45+
password = githubPassword
46+
}
47+
}
3748
}
3849

3950
dependencies {
@@ -76,6 +87,9 @@ dependencies {
7687
exclude(group = "log4j")
7788
}
7889
implementation("org.apache.kafka:kafka_2.13:${Versions.kafka}", excludeLog4j)
90+
implementation("io.confluent:kafka-avro-serializer:${Versions.confluent}", excludeLog4j)
91+
implementation("io.confluent:kafka-schema-registry:${Versions.confluent}", excludeLog4j)
92+
implementation("no.nav.syfo.dialogmote.avro:isdialogmote-schema:${Versions.isdialogmoteSchema}")
7993
testImplementation("no.nav:kafka-embedded-env:${Versions.kafkaEmbedded}", excludeLog4j)
8094

8195
testImplementation("com.nimbusds:nimbus-jose-jwt:${Versions.nimbusjosejwt}")

src/main/kotlin/no/nav/syfo/App.kt

+2-16
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ import no.nav.syfo.application.api.authentication.getWellKnown
1212
import no.nav.syfo.application.database.database
1313
import no.nav.syfo.application.database.databaseModule
1414
import no.nav.syfo.cronjob.launchCronjobModule
15-
import no.nav.syfo.dialogmotekandidat.kafka.launchKafkaTaskDialogmotekandidatEndring
16-
import no.nav.syfo.oppfolgingstilfelle.kafka.launchKafkaTaskOppfolgingstilfellePerson
17-
import no.nav.syfo.personstatus.kafka.launchOversiktHendelseKafkaTask
15+
import no.nav.syfo.kafka.launchKafkaModule
1816
import org.slf4j.LoggerFactory
1917
import java.util.concurrent.TimeUnit
2018

@@ -58,22 +56,10 @@ fun main() {
5856
applicationEngineEnvironment.monitor.subscribe(ApplicationStarted) { application ->
5957
applicationState.ready = true
6058
application.environment.log.info("Application is ready")
61-
launchOversiktHendelseKafkaTask(
59+
launchKafkaModule(
6260
applicationState = applicationState,
6361
environment = environment,
6462
)
65-
if (environment.kafkaOppfolgingstilfellePersonProcessingEnabled) {
66-
launchKafkaTaskOppfolgingstilfellePerson(
67-
applicationState = applicationState,
68-
kafkaEnvironment = environment.kafka,
69-
)
70-
}
71-
if (environment.kafkaDialogmotekandidatProcessingEnabled) {
72-
launchKafkaTaskDialogmotekandidatEndring(
73-
applicationState = applicationState,
74-
kafkaEnvironment = environment.kafka,
75-
)
76-
}
7763
launchCronjobModule(
7864
applicationState = applicationState,
7965
database = database,

src/main/kotlin/no/nav/syfo/application/ApplicationEnvironment.kt

+4
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,13 @@ data class Environment(
3838
aivenKeystoreLocation = getEnvVar("KAFKA_KEYSTORE_PATH"),
3939
aivenSecurityProtocol = "SSL",
4040
aivenTruststoreLocation = getEnvVar("KAFKA_TRUSTSTORE_PATH"),
41+
aivenSchemaRegistryUrl = getEnvVar("KAFKA_SCHEMA_REGISTRY"),
42+
aivenRegistryUser = getEnvVar("KAFKA_SCHEMA_REGISTRY_USER"),
43+
aivenRegistryPassword = getEnvVar("KAFKA_SCHEMA_REGISTRY_PASSWORD"),
4144
),
4245
val kafkaOppfolgingstilfellePersonProcessingEnabled: Boolean = getEnvVar("TOGGLE_KAFKA_OPPFOLGINGSTILFELLE_PERSON_PROCESSING_ENABLED").toBoolean(),
4346
val kafkaDialogmotekandidatProcessingEnabled: Boolean = getEnvVar("TOGGLE_KAFKA_DIALOGMOTEKANDIDAT_PROCESSING_ENABLED").toBoolean(),
47+
val kafkaDialogmoteStatusendringProcessingEnabled: Boolean = getEnvVar("TOGGLE_KAFKA_DIALOGMOTE_STATUSENDRING_PROCESSING_ENABLED").toBoolean(),
4448

4549
val clients: ClientsEnvironment = ClientsEnvironment(
4650
isproxy = ClientEnvironment(

src/main/kotlin/no/nav/syfo/application/kafka/KafkaEnvironment.kt

+3
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,7 @@ data class KafkaEnvironment(
66
val aivenKeystoreLocation: String,
77
val aivenSecurityProtocol: String,
88
val aivenTruststoreLocation: String,
9+
val aivenSchemaRegistryUrl: String,
10+
val aivenRegistryUser: String,
11+
val aivenRegistryPassword: String,
912
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package no.nav.syfo.dialogmotestatusendring.domain
2+
3+
enum class DialogmoteStatusendringType {
4+
INNKALT,
5+
AVLYST,
6+
FERDIGSTILT,
7+
NYTT_TID_STED,
8+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package no.nav.syfo.dialogmotestatusendring.kafka
2+
3+
import io.confluent.kafka.serializers.KafkaAvroDeserializer
4+
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
5+
import no.nav.syfo.application.kafka.KafkaEnvironment
6+
import no.nav.syfo.application.kafka.kafkaAivenConsumerConfig
7+
import org.apache.kafka.clients.consumer.ConsumerConfig
8+
import java.util.*
9+
10+
fun kafkaDialogmoteStatusendringConsumerConfig(
11+
kafkaEnvironment: KafkaEnvironment,
12+
): Properties {
13+
return Properties().apply {
14+
putAll(kafkaAivenConsumerConfig(kafkaEnvironment))
15+
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java.canonicalName
16+
17+
this[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = kafkaEnvironment.aivenSchemaRegistryUrl
18+
this[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
19+
this[KafkaAvroDeserializerConfig.USER_INFO_CONFIG] = "${kafkaEnvironment.aivenRegistryUser}:${kafkaEnvironment.aivenRegistryPassword}"
20+
this[KafkaAvroDeserializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE] = "USER_INFO"
21+
}
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package no.nav.syfo.dialogmotestatusendring.kafka
2+
3+
import no.nav.syfo.application.ApplicationState
4+
import no.nav.syfo.application.kafka.KafkaEnvironment
5+
import no.nav.syfo.kafka.launchKafkaTask
6+
7+
const val DIALOGMOTE_STATUSENDRING_TOPIC = "teamsykefravr.isdialogmote-dialogmote-statusendring"
8+
9+
fun launchKafkaTaskDialogmoteStatusendring(
10+
applicationState: ApplicationState,
11+
kafkaEnvironment: KafkaEnvironment,
12+
) {
13+
val kafkaDialogmoteStatusendringService = KafkaDialogmoteStatusendringService()
14+
val consumerProperties = kafkaDialogmoteStatusendringConsumerConfig(
15+
kafkaEnvironment = kafkaEnvironment,
16+
)
17+
18+
launchKafkaTask(
19+
applicationState = applicationState,
20+
topic = DIALOGMOTE_STATUSENDRING_TOPIC,
21+
consumerProperties = consumerProperties,
22+
kafkaConsumerService = kafkaDialogmoteStatusendringService
23+
)
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package no.nav.syfo.dialogmotestatusendring.kafka
2+
3+
import no.nav.syfo.dialogmote.avro.KDialogmoteStatusEndring
4+
import no.nav.syfo.kafka.KafkaConsumerService
5+
import org.apache.kafka.clients.consumer.ConsumerRecords
6+
import org.apache.kafka.clients.consumer.KafkaConsumer
7+
import org.slf4j.LoggerFactory
8+
import java.time.Duration
9+
10+
class KafkaDialogmoteStatusendringService() : KafkaConsumerService<KDialogmoteStatusEndring> {
11+
override val pollDurationInMillis: Long = 1000
12+
13+
override fun pollAndProcessRecords(kafkaConsumer: KafkaConsumer<String, KDialogmoteStatusEndring>) {
14+
val records = kafkaConsumer.poll(Duration.ofMillis(pollDurationInMillis))
15+
if (records.count() > 0) {
16+
processRecords(records)
17+
kafkaConsumer.commitSync()
18+
}
19+
}
20+
21+
private fun processRecords(records: ConsumerRecords<String, KDialogmoteStatusEndring>) {
22+
records.forEach { consumerRecord ->
23+
log.info("Received ${KDialogmoteStatusEndring::class.java.simpleName} record with key: ${consumerRecord.key()}")
24+
}
25+
}
26+
27+
companion object {
28+
private val log = LoggerFactory.getLogger(KafkaDialogmoteStatusendringService::class.java)
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package no.nav.syfo.kafka
2+
3+
import no.nav.syfo.application.ApplicationState
4+
import no.nav.syfo.application.Environment
5+
import no.nav.syfo.dialogmotekandidat.kafka.launchKafkaTaskDialogmotekandidatEndring
6+
import no.nav.syfo.dialogmotestatusendring.kafka.launchKafkaTaskDialogmoteStatusendring
7+
import no.nav.syfo.oppfolgingstilfelle.kafka.launchKafkaTaskOppfolgingstilfellePerson
8+
import no.nav.syfo.personstatus.kafka.launchOversiktHendelseKafkaTask
9+
10+
fun launchKafkaModule(
11+
applicationState: ApplicationState,
12+
environment: Environment,
13+
) {
14+
launchOversiktHendelseKafkaTask(
15+
applicationState = applicationState,
16+
environment = environment,
17+
)
18+
if (environment.kafkaOppfolgingstilfellePersonProcessingEnabled) {
19+
launchKafkaTaskOppfolgingstilfellePerson(
20+
applicationState = applicationState,
21+
kafkaEnvironment = environment.kafka,
22+
)
23+
}
24+
if (environment.kafkaDialogmotekandidatProcessingEnabled) {
25+
launchKafkaTaskDialogmotekandidatEndring(
26+
applicationState = applicationState,
27+
kafkaEnvironment = environment.kafka,
28+
)
29+
}
30+
if (environment.kafkaDialogmoteStatusendringProcessingEnabled) {
31+
launchKafkaTaskDialogmoteStatusendring(
32+
applicationState = applicationState,
33+
kafkaEnvironment = environment.kafka,
34+
)
35+
}
36+
}

src/test/kotlin/no/nav/syfo/cronjob/behandlendeenhet/PersonBehandlendeEnhetCronjobSpek.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,18 @@ object PersonBehandlendeEnhetCronjobSpek : Spek({
3838

3939
val personBehandlendeEnhetCronjob = internalMockEnvironment.personBehandlendeEnhetCronjob
4040

41-
val oversiktHendelseService = internalMockEnvironment.kafkaOversiktHendelseService
42-
val kafkaOppfolgingstilfellePersonService = internalMockEnvironment.kafkaOppfolgingstilfellePersonService
41+
val oversiktHendelseService = TestKafkaModule.kafkaOversiktHendelseService
42+
val kafkaOppfolgingstilfellePersonService = TestKafkaModule.kafkaOppfolgingstilfellePersonService
4343

4444
val mockKafkaConsumerOppfolgingstilfellePerson =
45-
internalMockEnvironment.kafkaConsumerOppfolgingstilfellePerson
45+
TestKafkaModule.kafkaConsumerOppfolgingstilfellePerson
4646

4747
val oppfolgingstilfellePersonTopicPartition = oppfolgingstilfellePersonTopicPartition()
4848
val personIdentDefault = PersonIdent(ARBEIDSTAKER_FNR)
4949

50-
val kafkaDialogmotekandidatEndringService = internalMockEnvironment.kafkaDialogmotekandidatEndringService
50+
val kafkaDialogmotekandidatEndringService = TestKafkaModule.kafkaDialogmotekandidatEndringService
5151
val mockKafkaConsumerDialogmotekandidatEndring =
52-
internalMockEnvironment.kafkaConsumerDialogmotekandidatEndring
52+
TestKafkaModule.kafkaConsumerDialogmotekandidatEndring
5353
val dialogmoteKandidatTopicPartition = dialogmotekandidatEndringTopicPartition()
5454
val kafkaDialogmotekandidatEndringStoppunkt = generateKafkaDialogmotekandidatEndringStoppunkt(
5555
personIdent = personIdentDefault.value,

src/test/kotlin/no/nav/syfo/cronjob/virksomhetsnavn/PersonOppfolgingstilfelleVirksomhetsnavnCronjobSpek.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,18 @@ object PersonOppfolgingstilfelleVirksomhetsnavnCronjobSpek : Spek({
3838
val personOppfolgingstilfelleVirksomhetnavnCronjob =
3939
internalMockEnvironment.personOppfolgingstilfelleVirksomhetnavnCronjob
4040

41-
val oversiktHendelseService = internalMockEnvironment.kafkaOversiktHendelseService
42-
val kafkaOppfolgingstilfellePersonService = internalMockEnvironment.kafkaOppfolgingstilfellePersonService
41+
val oversiktHendelseService = TestKafkaModule.kafkaOversiktHendelseService
42+
val kafkaOppfolgingstilfellePersonService = TestKafkaModule.kafkaOppfolgingstilfellePersonService
4343

4444
val mockKafkaConsumerOppfolgingstilfellePerson =
45-
internalMockEnvironment.kafkaConsumerOppfolgingstilfellePerson
45+
TestKafkaModule.kafkaConsumerOppfolgingstilfellePerson
4646

4747
val oppfolgingstilfellePersonTopicPartition = oppfolgingstilfellePersonTopicPartition()
4848
val personIdentDefault = PersonIdent(ARBEIDSTAKER_FNR)
4949

50-
val kafkaDialogmotekandidatEndringService = internalMockEnvironment.kafkaDialogmotekandidatEndringService
50+
val kafkaDialogmotekandidatEndringService = TestKafkaModule.kafkaDialogmotekandidatEndringService
5151
val mockKafkaConsumerDialogmotekandidatEndring =
52-
internalMockEnvironment.kafkaConsumerDialogmotekandidatEndring
52+
TestKafkaModule.kafkaConsumerDialogmotekandidatEndring
5353
val dialogmoteKandidatTopicPartition = dialogmotekandidatEndringTopicPartition()
5454
val kafkaDialogmotekandidatEndringStoppunkt = generateKafkaDialogmotekandidatEndringStoppunkt(
5555
personIdent = personIdentDefault.value,

src/test/kotlin/no/nav/syfo/dialogmotekandidat/kafka/KafkaDialogmotekandidatEndringServiceSpek.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ class KafkaDialogmotekandidatEndringServiceSpek : Spek({
2222
val externalMockEnvironment = ExternalMockEnvironment.instance
2323
val database = externalMockEnvironment.database
2424

25-
val internalMockEnvironment = InternalMockEnvironment.instance
26-
val kafkaDialogmotekandidatEndringService = internalMockEnvironment.kafkaDialogmotekandidatEndringService
27-
val mockKafkaConsumerDialogmotekandidatEndring = internalMockEnvironment.kafkaConsumerDialogmotekandidatEndring
25+
val kafkaDialogmotekandidatEndringService = TestKafkaModule.kafkaDialogmotekandidatEndringService
26+
val mockKafkaConsumerDialogmotekandidatEndring = TestKafkaModule.kafkaConsumerDialogmotekandidatEndring
2827

2928
val dialogmoteKandidatTopicPartition = dialogmotekandidatEndringTopicPartition()
3029
val kafkaDialogmotekandidatEndringStoppunktYesterday = generateKafkaDialogmotekandidatEndringStoppunkt(

0 commit comments

Comments
 (0)