Skip to content

Commit 274530e

Browse files
committed
Lagt til mer metrikker og tester
1 parent 3895254 commit 274530e

File tree

6 files changed

+254
-32
lines changed

6 files changed

+254
-32
lines changed

apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/topology/OpplysningerTopology.kt

+34-17
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import no.nav.paw.config.kafka.streams.genericProcess
1212
import org.apache.kafka.common.serialization.Serdes
1313
import org.apache.kafka.streams.StreamsBuilder
1414
import org.apache.kafka.streams.Topology
15-
import org.apache.kafka.streams.kstream.Consumed
1615
import org.apache.kafka.streams.processor.PunctuationType
1716
import org.apache.kafka.streams.state.Stores
1817
import org.apache.kafka.streams.state.TimestampedKeyValueStore
@@ -37,22 +36,19 @@ private fun StreamsBuilder.addOpplysningerStateStore() {
3736
this.addStateStore(
3837
Stores.timestampedKeyValueStoreBuilder(
3938
Stores.persistentKeyValueStore(kafkaStreamsProperties.opplysningerStore),
40-
Serdes.Long(),
39+
Serdes.String(),
4140
buildOpplysningerOmArbeidssoekerAvroSerde()
4241
)
4342
)
4443
}
4544

4645
context(ApplicationContext)
47-
private fun StreamsBuilder.addOpplysningerKStream(meterRegistry: MeterRegistry) {
46+
fun StreamsBuilder.addOpplysningerKStream(meterRegistry: MeterRegistry) {
4847
logger.info("Oppretter KStream for opplysninger om arbeidssøker")
4948
val kafkaStreamsProperties = properties.kafkaStreams
5049

5150
this
52-
.stream(
53-
kafkaStreamsProperties.opplysningerTopic,
54-
Consumed.with(Serdes.Long(), buildOpplysningerOmArbeidssoekerAvroSerde())
55-
)
51+
.stream<Long, OpplysningerOmArbeidssoeker>(kafkaStreamsProperties.opplysningerTopic)
5652
.peek { key, _ ->
5753
logger.debug("Mottok event på {} med key {}", kafkaStreamsProperties.opplysningerTopic, key)
5854
meterRegistry.tellMottatteOpplysninger()
@@ -61,10 +57,20 @@ private fun StreamsBuilder.addOpplysningerKStream(meterRegistry: MeterRegistry)
6157
stateStoreNames = arrayOf(kafkaStreamsProperties.opplysningerStore),
6258
punctuation = buildPunctuation(meterRegistry)
6359
) { record ->
64-
val stateStore: TimestampedKeyValueStore<Long, OpplysningerOmArbeidssoeker> =
60+
val opplysninger = record.value()
61+
val stateStore: TimestampedKeyValueStore<String, OpplysningerOmArbeidssoeker> =
6562
getStateStore(kafkaStreamsProperties.opplysningerStore)
66-
logger.debug("Lagrer opplysninger for periode {}", record.value().periodeId)
67-
stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), Instant.now().toEpochMilli()))
63+
val lagretTidspunkt = Instant.now()
64+
logger.info(
65+
"Lagrer opplysninger {} for periode {} med tidspunkt {}",
66+
opplysninger.id,
67+
opplysninger.periodeId,
68+
lagretTidspunkt
69+
)
70+
stateStore.put(
71+
opplysninger.id.toString(),
72+
ValueAndTimestamp.make(opplysninger, lagretTidspunkt.toEpochMilli())
73+
)
6874
}
6975
}
7076

@@ -82,26 +88,37 @@ private fun buildPunctuation(meterRegistry: MeterRegistry): Punctuation<Long, Op
8288
val antallTotalt = AtomicLong(0)
8389
val histogram = mutableMapOf<UUID, AtomicLong>()
8490

85-
val stateStore: TimestampedKeyValueStore<Long, OpplysningerOmArbeidssoeker> =
91+
val stateStore: TimestampedKeyValueStore<String, OpplysningerOmArbeidssoeker> =
8692
getStateStore(kafkaStreamsProperties.opplysningerStore)
8793
for (keyValue in stateStore.all()) {
8894
antallTotalt.incrementAndGet()
95+
96+
val opplysninger = keyValue.value.value()
8997
val lagretTidspunkt = Instant.ofEpochMilli(keyValue.value.timestamp())
90-
val utloepTidspunkt = Instant.now().minus(kafkaStreamsProperties.opplysningerLagretTidsperiode)
98+
val utloepTidspunkt = timestamp.minus(kafkaStreamsProperties.opplysningerLagretTidsperiode)
9199
if (utloepTidspunkt.isAfter(lagretTidspunkt)
92100
) {
93-
logger.debug(
94-
"Sletter opplysninger for periode {} fordi de har vært lagret mer enn {}m (utløp {} > lagret {})",
95-
keyValue.value.value().periodeId,
101+
logger.info(
102+
"Sletter opplysninger {} for periode {} fordi de har vært lagret mer enn {}m (utløp {} > lagret {})",
103+
opplysninger.id,
104+
opplysninger.periodeId,
96105
kafkaStreamsProperties.opplysningerLagretTidsperiode.toMinutes(),
97106
utloepTidspunkt,
98107
lagretTidspunkt
99108
)
100-
stateStore.delete(keyValue.key)
109+
stateStore.delete(opplysninger.id.toString())
101110
continue
111+
} else {
112+
logger.debug(
113+
"Opplysninger {} for periode {} har vært lagret mindre enn {}m (utløp {} < lagret {})",
114+
opplysninger.id,
115+
opplysninger.periodeId,
116+
kafkaStreamsProperties.opplysningerLagretTidsperiode.toMinutes(),
117+
utloepTidspunkt,
118+
lagretTidspunkt
119+
)
102120
}
103121

104-
val opplysninger = keyValue.value.value()
105122
val antall = histogram[opplysninger.periodeId]
106123
if (antall != null) {
107124
antall.incrementAndGet()

apps/opplysninger-aggregering/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfigTest.kt

-15
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package no.nav.paw.arbeidssoekerregisteret.properties
2+
3+
import io.kotest.core.spec.style.FreeSpec
4+
import io.kotest.matchers.shouldNotBe
5+
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
6+
7+
class ApplicationPropertiesTest : FreeSpec({
8+
"Skal laste config" {
9+
val properties = loadNaisOrLocalConfiguration<ApplicationProperties>(APPLICATION_CONFIG_FILE_NAME)
10+
properties.kafka shouldNotBe null
11+
properties.kafkaStreams shouldNotBe null
12+
}
13+
})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package no.nav.paw.arbeidssoekerregisteret.topology
2+
3+
import io.confluent.kafka.schemaregistry.testutil.MockSchemaRegistry
4+
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig
5+
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
6+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
7+
import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties
8+
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
9+
import org.apache.avro.specific.SpecificRecord
10+
import org.apache.kafka.common.serialization.Serdes
11+
import org.apache.kafka.streams.StreamsConfig
12+
import org.apache.kafka.streams.state.KeyValueStore
13+
import org.slf4j.Logger
14+
import org.slf4j.LoggerFactory
15+
import java.util.*
16+
17+
open class CommonTestContext {
18+
19+
val logger: Logger = LoggerFactory.getLogger("no.nav.paw.test")
20+
val applicationProperties =
21+
loadNaisOrLocalConfiguration<ApplicationProperties>("test_application_configuration.toml")
22+
val meterRegistry = SimpleMeterRegistry()
23+
24+
val kafkaStreamProperties = Properties().apply {
25+
this[StreamsConfig.APPLICATION_ID_CONFIG] = applicationProperties.kafka.applicationIdPrefix
26+
this[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = applicationProperties.kafka.brokers
27+
this[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.Long().javaClass
28+
this[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = MockSchemaRegistryAvroSerde<SpecificRecord>().javaClass
29+
this[KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS] = "true"
30+
this[KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = applicationProperties.kafka.schemaRegistry?.url
31+
}
32+
33+
class MockSchemaRegistryAvroSerde<T : SpecificRecord> :
34+
SpecificAvroSerde<T>(MockSchemaRegistry.getClientForScope("test")) {
35+
init {
36+
this.configure(
37+
mapOf(
38+
KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS to "true",
39+
KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG to "mock://dummy:1234"
40+
), false
41+
)
42+
}
43+
}
44+
45+
fun <K, V> KeyValueStore<K, V>.size(): Int {
46+
var count = 0
47+
for (keyValue in all()) {
48+
count++
49+
}
50+
return count
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package no.nav.paw.arbeidssoekerregisteret.topology
2+
3+
import io.kotest.core.spec.style.FreeSpec
4+
import io.kotest.matchers.shouldBe
5+
import io.kotest.matchers.shouldNotBe
6+
import io.kotest.matchers.types.shouldBeInstanceOf
7+
import no.nav.paw.arbeidssoekerregisteret.context.ApplicationContext
8+
import no.nav.paw.arbeidssokerregisteret.api.v1.AvviksType
9+
import no.nav.paw.arbeidssokerregisteret.api.v1.Beskrivelse
10+
import no.nav.paw.arbeidssokerregisteret.api.v1.BeskrivelseMedDetaljer
11+
import no.nav.paw.arbeidssokerregisteret.api.v1.Bruker
12+
import no.nav.paw.arbeidssokerregisteret.api.v1.BrukerType
13+
import no.nav.paw.arbeidssokerregisteret.api.v1.Helse
14+
import no.nav.paw.arbeidssokerregisteret.api.v1.JaNeiVetIkke
15+
import no.nav.paw.arbeidssokerregisteret.api.v1.Jobbsituasjon
16+
import no.nav.paw.arbeidssokerregisteret.api.v1.Metadata
17+
import no.nav.paw.arbeidssokerregisteret.api.v1.TidspunktFraKilde
18+
import no.nav.paw.arbeidssokerregisteret.api.v2.Annet
19+
import no.nav.paw.arbeidssokerregisteret.api.v4.OpplysningerOmArbeidssoeker
20+
import no.nav.paw.arbeidssokerregisteret.api.v4.Utdanning
21+
import org.apache.kafka.common.serialization.Serde
22+
import org.apache.kafka.common.serialization.Serdes
23+
import org.apache.kafka.streams.StreamsBuilder
24+
import org.apache.kafka.streams.TestInputTopic
25+
import org.apache.kafka.streams.TopologyTestDriver
26+
import org.apache.kafka.streams.state.KeyValueStore
27+
import org.apache.kafka.streams.state.Stores
28+
import org.apache.kafka.streams.state.ValueAndTimestamp
29+
import java.time.Duration
30+
import java.time.Instant
31+
import java.util.*
32+
33+
class OpplysningerKStreamTest : FreeSpec({
34+
35+
with(TestContext()) {
36+
"Testsuite for aggregering av opplysinger om arbeidssøker" - {
37+
38+
"Skal lagre opplysinger ved mottak" {
39+
val id = UUID.randomUUID()
40+
val periodeId = UUID.randomUUID()
41+
val ident = "01017012345"
42+
val key = 1L
43+
val opplysninger = buildOpplysninger(id, periodeId, ident)
44+
opplysingerTopic.pipeInput(key, opplysninger)
45+
46+
stateStore.size() shouldBe 1
47+
val valueAndTimestamp =
48+
stateStore.get(id.toString()).shouldBeInstanceOf<ValueAndTimestamp<OpplysningerOmArbeidssoeker>>()
49+
valueAndTimestamp shouldNotBe null
50+
valueAndTimestamp.value() shouldNotBe null
51+
valueAndTimestamp.value().id shouldBe opplysninger.id
52+
valueAndTimestamp.value().periodeId shouldBe opplysninger.periodeId
53+
valueAndTimestamp.value().sendtInnAv.utfoertAv.id shouldBe opplysninger.sendtInnAv.utfoertAv.id
54+
}
55+
56+
"Skal slette opplysninger etter 60 minutter" {
57+
testDriver.advanceWallClockTime(Duration.ofMinutes(65)) // 60 min++
58+
59+
stateStore.size() shouldBe 0
60+
}
61+
}
62+
}
63+
64+
}) {
65+
private class TestContext : CommonTestContext() {
66+
67+
val sourceKeySerde: Serde<Long> = Serdes.Long()
68+
val sourceValueSerde: Serde<OpplysningerOmArbeidssoeker> = MockSchemaRegistryAvroSerde()
69+
val storeKeySerde: Serde<String> = Serdes.String()
70+
71+
val testDriver = with(ApplicationContext(logger, applicationProperties)) {
72+
StreamsBuilder().apply {
73+
addStateStore(
74+
Stores.timestampedKeyValueStoreBuilder(
75+
Stores.inMemoryKeyValueStore(properties.kafkaStreams.opplysningerStore),
76+
storeKeySerde,
77+
sourceValueSerde
78+
)
79+
)
80+
addOpplysningerKStream(meterRegistry)
81+
}.build()
82+
}.let { TopologyTestDriver(it, kafkaStreamProperties) }
83+
84+
val stateStore: KeyValueStore<String, ValueAndTimestamp<OpplysningerOmArbeidssoeker>> = testDriver
85+
.getTimestampedKeyValueStore(applicationProperties.kafkaStreams.opplysningerStore)
86+
87+
val opplysingerTopic: TestInputTopic<Long, OpplysningerOmArbeidssoeker> = testDriver.createInputTopic(
88+
applicationProperties.kafkaStreams.opplysningerTopic,
89+
sourceKeySerde.serializer(),
90+
sourceValueSerde.serializer()
91+
)
92+
93+
fun buildOpplysninger(id: UUID, periodeId: UUID, ident: String): OpplysningerOmArbeidssoeker {
94+
return OpplysningerOmArbeidssoeker(
95+
id,
96+
periodeId,
97+
buildMetadata(ident),
98+
buildUtdanning(),
99+
buildHelse(),
100+
buildJobbsituasjon(),
101+
buildAnnet()
102+
)
103+
}
104+
105+
fun buildMetadata(ident: String): Metadata {
106+
return Metadata(
107+
Instant.now(),
108+
buildBruker(ident),
109+
"test",
110+
"test",
111+
TidspunktFraKilde(Instant.now(), AvviksType.UKJENT_VERDI)
112+
)
113+
}
114+
115+
fun buildBruker(ident: String): Bruker {
116+
return Bruker(BrukerType.SLUTTBRUKER, ident)
117+
}
118+
119+
fun buildUtdanning(): Utdanning {
120+
return Utdanning("69", JaNeiVetIkke.JA, JaNeiVetIkke.JA)
121+
}
122+
123+
fun buildHelse(): Helse {
124+
return Helse(JaNeiVetIkke.NEI)
125+
}
126+
127+
fun buildJobbsituasjon(): Jobbsituasjon {
128+
val beskrivelse = BeskrivelseMedDetaljer(
129+
Beskrivelse.ANNET, mapOf(
130+
"stilling" to "test",
131+
"stilling_styrk08" to "test"
132+
)
133+
)
134+
return Jobbsituasjon(listOf(beskrivelse))
135+
}
136+
137+
fun buildAnnet(): Annet {
138+
return Annet(JaNeiVetIkke.NEI)
139+
}
140+
}
141+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
[kafka]
2+
brokers = "dummy:1234"
3+
applicationIdPrefix = "test"
4+
5+
[kafka.schemaRegistry]
6+
url = "mock://dummy:1234"
7+
8+
[kafkaStreams]
9+
shutDownTimeout = "PT1S"
10+
opplysingerStreamIdSuffix = "opplysninger-stream-v1"
11+
opplysningerTopic = "paw.opplysninger-om-arbeidssoeker-v1"
12+
opplysningerStore = "opplysninger-store"
13+
opplysningerPunctuatorSchedule = "PT10M"
14+
opplysningerLagretTidsperiode = "PT1H"

0 commit comments

Comments
 (0)