Skip to content

Commit 072c32e

Browse files
authored
Merge pull request #1842 from navikt/stopp-consumers-og-schedulers-ved-ktor-stop
Stopp consumers og schedulers ved ktor stopp
2 parents d7f3435 + e4c3699 commit 072c32e

File tree

38 files changed

+1183
-924
lines changed

38 files changed

+1183
-924
lines changed

bootstrap/src/main/resources/logback-audit-config-local.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</encoder>
77
</appender>
88

9-
<logger name="auditLogger" level="INFO" additivity="false">
9+
<logger name="auditLogger" level="DEBUG" additivity="false">
1010
<appender-ref ref="auditLogger"/>
1111
</logger>
1212
</included>

bootstrap/src/main/resources/logback-local.xml

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
</encoder>
1010
</appender>
1111

12+
<logger name="no.nav" level="DEBUG"/>
13+
1214
<root level="INFO">
1315
<appender-ref ref="STDOUT"/>
1416
</root>

bootstrap/src/main/resources/logback-secure-config-local.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</encoder>
77
</appender>
88

9-
<logger name="sikkerLogg" level="INFO" additivity="false">
9+
<logger name="sikkerLogg" level="DEBUG" additivity="false">
1010
<appender-ref ref="secureAppender"/>
1111
</logger>
1212
</included>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package no.nav.su.se.bakover.common.infrastructure
2+
3+
import no.nav.su.se.bakover.common.infrastructure.consumer.StoppableConsumer
4+
import no.nav.su.se.bakover.common.infrastructure.job.StoppableJob
5+
6+
data class JobberOgConsumers(
7+
private val jobs: Iterable<StoppableJob>,
8+
private val consumers: Iterable<StoppableConsumer>,
9+
) {
10+
fun stop() {
11+
jobs.forEach { it.stop() }
12+
consumers.forEach { it.stop() }
13+
}
14+
15+
operator fun plus(other: JobberOgConsumers): JobberOgConsumers {
16+
return JobberOgConsumers(
17+
jobs = jobs + other.jobs,
18+
consumers = consumers + other.consumers,
19+
)
20+
}
21+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package no.nav.su.se.bakover.common.infrastructure.consumer
2+
3+
interface StoppableConsumer {
4+
/**
5+
* Unikt navn for consumer. Kan være samme som topic navn.
6+
*/
7+
val consumerName: String
8+
9+
/**
10+
* Idempotent. Forventer bare at ingen nye jobber blir startet. Pågående kjører ferdig.
11+
*/
12+
fun stop()
13+
}

common/infrastructure/src/main/kotlin/no/nav/su/se/bakover/common/infrastructure/jobs/RunJobCheck.kt common/infrastructure/src/main/kotlin/no/nav/su/se/bakover/common/infrastructure/job/RunJobCheck.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package no.nav.su.se.bakover.common.infrastructure.jobs
1+
package no.nav.su.se.bakover.common.infrastructure.job
22

33
import no.nav.su.se.bakover.common.domain.tid.zoneIdOslo
44
import no.nav.su.se.bakover.common.infrastructure.config.ApplicationConfig
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package no.nav.su.se.bakover.common.infrastructure.job
2+
3+
import arrow.core.Either
4+
import no.nav.su.se.bakover.common.CorrelationId
5+
import no.nav.su.se.bakover.common.infrastructure.correlation.withCorrelationId
6+
import no.nav.su.se.bakover.common.sikkerLogg
7+
import org.jetbrains.kotlin.utils.addToStdlib.ifTrue
8+
import org.slf4j.Logger
9+
import java.time.Duration
10+
import java.util.Date
11+
import java.util.Timer
12+
import java.util.TimerTask
13+
import kotlin.concurrent.fixedRateTimer
14+
15+
interface StoppableJob {
16+
val jobName: String
17+
18+
/**
19+
* Idempotent. Forventer bare at ingen nye jobber blir startet. Pågående kjører ferdig.
20+
*/
21+
fun stop()
22+
}
23+
24+
/**
25+
* Starter en jobb som venter [initialDelay] før den kjører jobben i et fast [intervall].
26+
*Vil starte en daemon thread som betyr at VMen ikke vil vente på denne tråden for å avslutte.
27+
*
28+
* @param job Wrappes i en correlationId og en try-catch for å logge eventuelle feil.
29+
* @param runJobCheck Liste av RunJobCheck som må returnere true for at jobben skal kjøre. Ved tom liste, kjøres jobben alltid.
30+
*/
31+
fun startStoppableJob(
32+
jobName: String,
33+
initialDelay: Duration,
34+
intervall: Duration,
35+
log: Logger,
36+
runJobCheck: List<RunJobCheck>,
37+
job: (CorrelationId) -> Unit,
38+
): StoppableJob {
39+
log.info("Starter skeduleringsjobb '$jobName'. Intervall: hvert ${intervall.toMinutes()}. minutt. Initial delay: ${initialDelay.toMinutes()} minutt(er)")
40+
return startStoppableJob(
41+
jobName = jobName,
42+
log = log,
43+
runJobCheck = runJobCheck,
44+
job = job,
45+
) {
46+
fixedRateTimer(
47+
name = jobName,
48+
daemon = true,
49+
initialDelay = initialDelay.toMillis(),
50+
period = intervall.toMillis(),
51+
action = it,
52+
)
53+
}
54+
}
55+
56+
/**
57+
* Starter en jobb som venter til et gitt tidspunkt ([startAt]) før den kjører jobben i et fast [intervall].
58+
* Vil starte en daemon thread som betyr at VMen ikke vil vente på denne tråden for å avslutte.
59+
*
60+
* @param job Wrappes i en correlationId og en try-catch for å logge eventuelle feil.
61+
* @param runJobCheck Liste av RunJobCheck som må returnere true for at jobben skal kjøre. Default er en tom liste. Ved tom liste, kjøres jobben alltid.
62+
*/
63+
fun startStoppableJob(
64+
jobName: String,
65+
startAt: Date,
66+
intervall: Duration,
67+
log: Logger,
68+
runJobCheck: List<RunJobCheck>,
69+
job: (CorrelationId) -> Unit,
70+
): StoppableJob {
71+
log.info("Starter skeduleringsjobb '$jobName'. Intervall: hvert ${intervall.toMinutes()}. minutt. Starter kl. $startAt.")
72+
return startStoppableJob(
73+
jobName = jobName,
74+
log = log,
75+
runJobCheck = runJobCheck,
76+
job = job,
77+
) {
78+
fixedRateTimer(
79+
name = jobName,
80+
daemon = true,
81+
startAt = startAt,
82+
period = intervall.toMillis(),
83+
action = it,
84+
)
85+
}
86+
}
87+
88+
private fun startStoppableJob(
89+
jobName: String,
90+
log: Logger,
91+
runJobCheck: List<RunJobCheck>,
92+
job: (CorrelationId) -> Unit,
93+
scheduleJob: (TimerTask.() -> Unit) -> Timer,
94+
): StoppableJob {
95+
return scheduleJob {
96+
Either.catch {
97+
runJobCheck.shouldRun().ifTrue {
98+
log.debug("Kjører skeduleringsjobb '$jobName'.")
99+
withCorrelationId { job(it) }
100+
log.debug("Fullførte skeduleringsjobb '$jobName'.")
101+
}
102+
?: log.debug("Skeduleringsjobb '$jobName' kjører ikke pga. startKriterier i runJobCheck. Eksempelvis er vi ikke leader pod.")
103+
}.onLeft {
104+
log.error(
105+
"Skeduleringsjobb '$jobName' feilet. Se sikkerlog for mer kontekst.",
106+
RuntimeException("Trigger stacktrace for enklere debug."),
107+
)
108+
sikkerLogg.error("Skeduleringsjobb '$jobName' feilet med stacktrace:", it)
109+
}
110+
}.let { timer ->
111+
object : StoppableJob {
112+
override val jobName = jobName
113+
override fun stop() {
114+
Either.catch {
115+
timer.cancel()
116+
}.onRight {
117+
log.info("Skeduleringsjobb '$jobName' stoppet. Pågående kjøringer ferdigstilles.")
118+
}.onLeft {
119+
log.error("Skeduleringsjobb '$jobName': Feil ved kall til stop()/kanseller Timer.", it)
120+
}
121+
}
122+
}
123+
}
124+
}

common/infrastructure/src/test/kotlin/no/nav/su/se/bakover/common/infrastructure/jobs/RunCheckTest.kt

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import io.kotest.matchers.shouldBe
66
import no.nav.su.se.bakover.common.domain.tid.november
77
import no.nav.su.se.bakover.common.domain.tid.oktober
88
import no.nav.su.se.bakover.common.infrastructure.config.ApplicationConfig
9+
import no.nav.su.se.bakover.common.infrastructure.job.RunCheckFactory
10+
import no.nav.su.se.bakover.common.infrastructure.job.shouldRun
911
import no.nav.su.se.bakover.common.nais.LeaderPodLookupFeil
1012
import no.nav.su.se.bakover.test.fixedClock
1113
import org.junit.jupiter.api.Test
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,34 @@
11
package no.nav.su.se.bakover.presentation.job
22

3-
import arrow.core.Either
4-
import no.nav.su.se.bakover.common.infrastructure.correlation.withCorrelationId
5-
import no.nav.su.se.bakover.common.infrastructure.jobs.RunCheckFactory
6-
import no.nav.su.se.bakover.common.infrastructure.jobs.shouldRun
3+
import no.nav.su.se.bakover.common.infrastructure.job.RunCheckFactory
4+
import no.nav.su.se.bakover.common.infrastructure.job.StoppableJob
5+
import no.nav.su.se.bakover.common.infrastructure.job.startStoppableJob
76
import no.nav.su.se.bakover.dokument.application.consumer.DistribuerDokumentHendelserKonsument
87
import no.nav.su.se.bakover.dokument.application.consumer.JournalførDokumentHendelserKonsument
9-
import org.jetbrains.kotlin.utils.addToStdlib.ifTrue
108
import org.slf4j.LoggerFactory
119
import java.time.Duration
12-
import kotlin.concurrent.fixedRateTimer
1310

1411
class DokumentJobber(
15-
private val initialDelay: Duration,
16-
private val intervall: Duration,
17-
private val runCheckFactory: RunCheckFactory,
18-
private val journalførtDokumentHendelserKonsument: JournalførDokumentHendelserKonsument,
19-
private val distribuerDokumentHendelserKonsument: DistribuerDokumentHendelserKonsument,
20-
) {
21-
private val log = LoggerFactory.getLogger(this::class.java)
22-
private val jobName = "Dokument"
23-
fun schedule() {
24-
log.info("Starter skeduleringsjobb '$jobName' med intervall: ${intervall.toMinutes()} min")
25-
fixedRateTimer(
26-
name = jobName,
27-
daemon = true,
28-
initialDelay = initialDelay.toMillis(),
29-
period = intervall.toMillis(),
30-
) {
31-
listOf(
32-
runCheckFactory.leaderPod(),
33-
).shouldRun().ifTrue {
34-
Either.catch {
35-
withCorrelationId { correlationId ->
36-
journalførtDokumentHendelserKonsument.journalførDokumenter(correlationId)
37-
distribuerDokumentHendelserKonsument.distribuer(correlationId)
38-
}
39-
}.mapLeft {
40-
log.error("Skeduleringsjobb '$jobName' feilet med stacktrace:", it)
41-
}
42-
}
12+
private val stoppableJob: StoppableJob,
13+
) : StoppableJob by stoppableJob {
14+
companion object {
15+
fun startJob(
16+
initialDelay: Duration,
17+
intervall: Duration,
18+
runCheckFactory: RunCheckFactory,
19+
journalførtDokumentHendelserKonsument: JournalførDokumentHendelserKonsument,
20+
distribuerDokumentHendelserKonsument: DistribuerDokumentHendelserKonsument,
21+
): DokumentJobber {
22+
return startStoppableJob(
23+
jobName = "Dokument",
24+
initialDelay = initialDelay,
25+
intervall = intervall,
26+
log = LoggerFactory.getLogger(DokumentJobber::class.java),
27+
runJobCheck = listOf(runCheckFactory.leaderPod()),
28+
) { correlationId ->
29+
journalførtDokumentHendelserKonsument.journalførDokumenter(correlationId)
30+
distribuerDokumentHendelserKonsument.distribuer(correlationId)
31+
}.let { DokumentJobber(it) }
4332
}
4433
}
4534
}
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,54 @@
11
package no.nav.su.se.bakover.kontrollsamtale.infrastructure.jobs
22

33
import arrow.core.Either
4-
import no.nav.su.se.bakover.common.infrastructure.correlation.withCorrelationId
5-
import no.nav.su.se.bakover.common.infrastructure.jobs.RunCheckFactory
6-
import no.nav.su.se.bakover.common.infrastructure.jobs.shouldRun
4+
import no.nav.su.se.bakover.common.infrastructure.job.RunCheckFactory
5+
import no.nav.su.se.bakover.common.infrastructure.job.StoppableJob
6+
import no.nav.su.se.bakover.common.infrastructure.job.startStoppableJob
7+
import no.nav.su.se.bakover.common.sikkerLogg
78
import no.nav.su.se.bakover.kontrollsamtale.domain.KontrollsamtaleService
8-
import org.jetbrains.kotlin.utils.addToStdlib.ifTrue
99
import org.slf4j.LoggerFactory
1010
import java.time.Duration
1111
import java.util.Date
12-
import kotlin.concurrent.fixedRateTimer
1312

1413
/**
1514
* Jobb som første dag i hver måned sender ut innkallelse til kontrollsamtaler
1615
*/
1716
class KontrollsamtaleinnkallingJob(
18-
private val kontrollsamtaleService: KontrollsamtaleService,
19-
private val starttidspunkt: Date,
20-
private val periode: Duration,
21-
private val runCheckFactory: RunCheckFactory,
22-
) {
23-
private val log = LoggerFactory.getLogger(this::class.java)
17+
private val stoppableJob: StoppableJob,
18+
) : StoppableJob by stoppableJob {
19+
companion object {
2420

25-
private val jobName = "Utsendelse av kontrollsamtaleinnkallelser"
26-
27-
fun schedule() {
28-
log.info("Starter skeduleringsjobb '$jobName' med periode: $periode og starttidspunkt: $starttidspunkt")
29-
fixedRateTimer(
30-
name = jobName,
31-
daemon = true,
32-
startAt = starttidspunkt,
33-
period = periode.toMillis(),
34-
) {
35-
Either.catch {
36-
log.debug("Kjører skeduleringsjobb '$jobName'")
37-
listOf(runCheckFactory.leaderPod())
38-
.shouldRun()
39-
.ifTrue {
40-
withCorrelationId {
41-
kontrollsamtaleService.hentPlanlagteKontrollsamtaler().map { kontrollsamtale ->
42-
kontrollsamtaleService.kallInn(kontrollsamtale)
43-
}
44-
}
21+
fun startJob(
22+
kontrollsamtaleService: KontrollsamtaleService,
23+
starttidspunkt: Date,
24+
periode: Duration,
25+
runCheckFactory: RunCheckFactory,
26+
): KontrollsamtaleinnkallingJob {
27+
val log = LoggerFactory.getLogger(KontrollsamtaleinnkallingJob::class.java)
28+
val jobName = "Utsendelse av kontrollsamtaleinnkallelser"
29+
return startStoppableJob(
30+
jobName = jobName,
31+
startAt = starttidspunkt,
32+
intervall = periode,
33+
log = log,
34+
runJobCheck = listOf(runCheckFactory.leaderPod()),
35+
) {
36+
kontrollsamtaleService.hentPlanlagteKontrollsamtaler().map { kontrollsamtale ->
37+
// Vi ønsker ikke å la en feil i en enkelt kontrollsamtale hindre resten av jobben i å kjøre.
38+
Either.catch {
39+
kontrollsamtaleService.kallInn(kontrollsamtale)
40+
}.onLeft {
41+
log.error(
42+
"Job '$jobName' kunne ikke kalle inn til kontrollsamtale. Se sikkerlogg for mer kontekst.",
43+
RuntimeException("Trigger stacktrace for enklere debug."),
44+
)
45+
sikkerLogg.error(
46+
"Job '$jobName' kunne ikke kalle inn til kontrollsamtale: $kontrollsamtale",
47+
it,
48+
)
4549
}
46-
log.debug("Fullførte skeduleringsjobb '$jobName'")
47-
}.mapLeft {
48-
log.error("Skeduleringsjobb '$jobName' feilet med stacktrace:", it)
49-
}
50+
}
51+
}.let { KontrollsamtaleinnkallingJob(it) }
5052
}
5153
}
5254
}

0 commit comments

Comments
 (0)