1
+ package no.nav.paw.health.listener
2
+
3
+ import io.kotest.core.spec.style.FreeSpec
4
+ import io.kotest.matchers.shouldBe
5
+ import io.mockk.mockkClass
6
+ import no.nav.paw.health.model.HealthStatus
7
+ import no.nav.paw.health.model.LivenessHealthIndicator
8
+ import no.nav.paw.health.model.ReadinessHealthIndicator
9
+ import no.nav.paw.health.model.getAggregatedStatus
10
+ import no.nav.paw.health.repository.HealthIndicatorRepository
11
+ import org.apache.kafka.streams.KafkaStreams
12
+
13
+ class KafkaStreamsStatusListenerTest : FreeSpec ({
14
+
15
+ " Kafka Streams Status Listener skal returnere korrekt helsesjekk-status" {
16
+ val kafkaStreams = mockkClass(KafkaStreams ::class)
17
+ val healthIndicatorRepository = HealthIndicatorRepository ()
18
+
19
+ val liveness = healthIndicatorRepository.addLivenessIndicator(LivenessHealthIndicator ())
20
+ val readiness = healthIndicatorRepository.addReadinessIndicator(ReadinessHealthIndicator ())
21
+
22
+ val listener = kafkaStreams.withHealthIndicatorStateListener(liveness, readiness)
23
+
24
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNKNOWN
25
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
26
+
27
+ listener.onChange(KafkaStreams .State .CREATED , KafkaStreams .State .CREATED )
28
+
29
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
30
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
31
+
32
+ listener.onChange(KafkaStreams .State .RUNNING , KafkaStreams .State .RUNNING )
33
+
34
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
35
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
36
+
37
+ listener.onChange(KafkaStreams .State .REBALANCING , KafkaStreams .State .REBALANCING )
38
+
39
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
40
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
41
+
42
+ listener.onChange(KafkaStreams .State .PENDING_ERROR , KafkaStreams .State .PENDING_ERROR )
43
+
44
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
45
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .HEALTHY
46
+
47
+ listener.onChange(KafkaStreams .State .ERROR , KafkaStreams .State .ERROR )
48
+
49
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
50
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
51
+
52
+ listener.onChange(KafkaStreams .State .PENDING_SHUTDOWN , KafkaStreams .State .PENDING_SHUTDOWN )
53
+
54
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
55
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
56
+
57
+ listener.onChange(KafkaStreams .State .NOT_RUNNING , KafkaStreams .State .NOT_RUNNING )
58
+
59
+ healthIndicatorRepository.getReadinessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
60
+ healthIndicatorRepository.getLivenessIndicators().getAggregatedStatus() shouldBe HealthStatus .UNHEALTHY
61
+ }
62
+ })
0 commit comments