Skip to content

Commit f3ab114

Browse files
committed
Oppdatert avhengigheter, lagt til async lib
1 parent 1caf6ab commit f3ab114

File tree

11 files changed

+183
-42
lines changed

11 files changed

+183
-42
lines changed

Diff for: apps/bekreftelse-api/src/test/kotlin/no/nav/paw/bekreftelse/api/producer/KafkaTestDataProducer.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,5 @@ fun main() {
3737
gjelderTil = Instant.now().plus(Duration.ofDays(14)),
3838
)
3939

40-
kafkaProducer.send(ProducerRecord(topic, key, value))
40+
kafkaProducer.send(ProducerRecord(topic, key, value)).get()
4141
}

Diff for: gradle/libs.versions.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ bekreftelseSchemaVersion = "1.25.03.11.31-1"
88
arrowVersion = "2.0.1"
99
arrowJacksonIntegrationVersion = "0.15.1"
1010
comSksamuelHopliteVersion = "2.9.0"
11-
graphqlClientVersion = "8.3.0"
11+
graphqlClientVersion = "8.4.0"
1212
orgApacheKafkaVersion = "3.9.0"
1313
ioConfluentKafkaVersion = "7.9.0"
1414
orgApacheAvroVersion = "1.12.0"
1515
comFasterxmlJacksonVersion = "2.18.3"
1616
kotlinExposedVersion = "0.60.0"
1717
logstashVersion = "8.0"
18-
logbackVersion = "1.5.17"
18+
logbackVersion = "1.5.18"
1919
kotestVersion = "5.9.1"
2020
mockkVersion = "1.13.17"
2121
testContainersVersion = "1.20.6"

Diff for: lib/async/build.gradle.kts

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
plugins {
2+
kotlin("jvm")
3+
}
4+
5+
val jvmMajorVersion: String by project
6+
7+
dependencies {
8+
compileOnly(libs.kotlinx.coroutines.core)
9+
compileOnly(libs.logbackClassic)
10+
11+
testImplementation(libs.test.junit5.runner)
12+
testImplementation(libs.test.kotest.assertionsCore)
13+
}
14+
15+
java {
16+
toolchain {
17+
languageVersion.set(JavaLanguageVersion.of(jvmMajorVersion))
18+
}
19+
}
20+
21+
tasks.withType<Test>().configureEach {
22+
useJUnitPlatform()
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package no.nav.paw.async.runner
2+
3+
interface AsyncRunner<T, R> {
4+
fun run(task: () -> T, onFailure: (Throwable) -> Unit, onSuccess: (T) -> Unit): R
5+
fun abort(onAbort: () -> Unit)
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package no.nav.paw.async.runner
2+
3+
import kotlinx.coroutines.CoroutineDispatcher
4+
import kotlinx.coroutines.CoroutineScope
5+
import kotlinx.coroutines.Dispatchers
6+
import kotlinx.coroutines.Job
7+
import kotlinx.coroutines.launch
8+
import org.slf4j.LoggerFactory
9+
import java.util.concurrent.atomic.AtomicBoolean
10+
import java.util.concurrent.atomic.AtomicReference
11+
12+
open class CoroutineAsyncRunner<T>(
13+
private val coroutineScope: CoroutineScope = CoroutineScope(Dispatchers.IO),
14+
private val coroutineDispatcher: CoroutineDispatcher = Dispatchers.IO,
15+
private val keepRunning: AtomicBoolean = AtomicBoolean(true)
16+
) : AsyncRunner<T, Job> {
17+
private val logger = LoggerFactory.getLogger(this.javaClass)
18+
private val jobRef: AtomicReference<Job> = AtomicReference(Job())
19+
20+
override fun run(task: () -> T, onFailure: (Throwable) -> Unit, onSuccess: (T) -> Unit): Job {
21+
logger.info("Running coroutine async function")
22+
jobRef.set(coroutineScope.launch(coroutineDispatcher) {
23+
do {
24+
try {
25+
val result = task()
26+
onSuccess(result)
27+
} catch (throwable: Throwable) {
28+
onFailure(throwable)
29+
}
30+
} while (keepRunning.get())
31+
})
32+
return jobRef.get()
33+
}
34+
35+
override fun abort(onAbort: () -> Unit) {
36+
logger.info("Aborting coroutine async function")
37+
keepRunning.set(false)
38+
jobRef.get().cancel()
39+
onAbort()
40+
}
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package no.nav.paw.async.runner
2+
3+
import org.slf4j.LoggerFactory
4+
import java.time.Duration
5+
import java.util.*
6+
import java.util.concurrent.atomic.AtomicBoolean
7+
8+
open class ScheduledAsyncRunner<T>(
9+
private val interval: Duration,
10+
private val delay: Duration = Duration.ZERO,
11+
private val keepRunning: AtomicBoolean = AtomicBoolean(true)
12+
) : AsyncRunner<T, Unit> {
13+
private val logger = LoggerFactory.getLogger(this.javaClass)
14+
private val timer = Timer()
15+
16+
override fun run(task: () -> T, onFailure: (Throwable) -> Unit, onSuccess: (T) -> Unit) {
17+
logger.info("Starting scheduled async function with interval {} and delay {}", interval, delay)
18+
val timerTask = ScheduledTimerTask(task, onSuccess, onFailure, keepRunning)
19+
timer.scheduleAtFixedRate(timerTask, delay.toMillis(), interval.toMillis())
20+
}
21+
22+
override fun abort(onAbort: () -> Unit) {
23+
logger.info("Aborting scheduled async function")
24+
keepRunning.set(false)
25+
timer.cancel()
26+
onAbort()
27+
}
28+
29+
class ScheduledTimerTask<T>(
30+
private val task: () -> T,
31+
private val onSuccess: (T) -> Unit,
32+
private val onFailure: (Throwable) -> Unit,
33+
private val keepRunning: AtomicBoolean = AtomicBoolean(true)
34+
) : TimerTask() {
35+
private val logger = LoggerFactory.getLogger(this.javaClass)
36+
37+
override fun run() {
38+
logger.info("Running scheduled async function")
39+
do {
40+
try {
41+
val result = task()
42+
onSuccess(result)
43+
} catch (throwable: Throwable) {
44+
onFailure(throwable)
45+
}
46+
} while (keepRunning.get())
47+
}
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package no.nav.paw.async.runner
2+
3+
import org.slf4j.LoggerFactory
4+
import java.util.concurrent.CompletableFuture
5+
import java.util.concurrent.ExecutorService
6+
import java.util.concurrent.Executors
7+
import java.util.concurrent.Future
8+
import java.util.concurrent.atomic.AtomicBoolean
9+
import java.util.concurrent.atomic.AtomicReference
10+
11+
open class ThreadPoolAsyncRunner<T>(
12+
private val executorService: ExecutorService = Executors.newSingleThreadExecutor(),
13+
private val keepRunning: AtomicBoolean = AtomicBoolean(true),
14+
private val meyInterruptIfRunning: AtomicBoolean = AtomicBoolean(true)
15+
) : AsyncRunner<T, Future<*>> {
16+
private val logger = LoggerFactory.getLogger(this.javaClass)
17+
private val futureRef: AtomicReference<Future<*>> = AtomicReference(CompletableFuture<Nothing>())
18+
19+
override fun run(task: () -> T, onFailure: (Throwable) -> Unit, onSuccess: (T) -> Unit): Future<*> {
20+
logger.info("Running thread pool async function")
21+
futureRef.set(executorService.submit {
22+
do {
23+
try {
24+
val result = task()
25+
onSuccess(result)
26+
} catch (throwable: Throwable) {
27+
onFailure(throwable)
28+
}
29+
} while (keepRunning.get())
30+
})
31+
return futureRef.get()
32+
}
33+
34+
override fun abort(onAbort: () -> Unit) {
35+
logger.info("Aborting thread pool async function")
36+
keepRunning.set(false)
37+
futureRef.get().cancel(meyInterruptIfRunning.get())
38+
onAbort()
39+
}
40+
}

Diff for: lib/scheduling/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ plugins {
33
}
44

55
dependencies {
6+
api(project(":lib:async"))
67
compileOnly(libs.ktor.server.core)
78
compileOnly(libs.kotlinx.coroutines.core)
89

Diff for: lib/scheduling/src/main/kotlin/no/nav/paw/scheduling/plugin/ScheduledTaskPlugin.kt

+8-27
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,17 @@ import io.ktor.server.application.ApplicationStopping
88
import io.ktor.server.application.createApplicationPlugin
99
import io.ktor.server.application.hooks.MonitoringEvent
1010
import io.ktor.server.application.log
11-
import kotlinx.coroutines.CoroutineDispatcher
12-
import kotlinx.coroutines.Dispatchers
13-
import kotlinx.coroutines.Job
14-
import kotlinx.coroutines.launch
15-
import org.slf4j.LoggerFactory
11+
import no.nav.paw.async.runner.ScheduledAsyncRunner
1612
import java.time.Duration
17-
import java.util.*
1813

19-
private val logger = LoggerFactory.getLogger("no.nav.paw.logger.scheduling")
2014
private const val PLUGIN_NAME_SUFFIX = "ScheduledTaskPlugin"
2115

2216
class ScheduledTaskPluginConfig {
2317
var task: (() -> Unit)? = null
18+
var interval: Duration? = null
2419
var delay: Duration? = null
25-
var period: Duration? = null
2620
var startEvent: EventDefinition<Application>? = null
2721
var stopEvent: EventDefinition<Application>? = null
28-
var coroutineDispatcher: CoroutineDispatcher? = null
2922
}
3023

3124
@Suppress("FunctionName")
@@ -34,30 +27,18 @@ fun ScheduledTaskPlugin(pluginInstance: Any): ApplicationPlugin<ScheduledTaskPlu
3427
return createApplicationPlugin(pluginName, ::ScheduledTaskPluginConfig) {
3528
application.log.info("Installerer {}", pluginName)
3629
val task = requireNotNull(pluginConfig.task) { "Task er null" }
30+
val interval = requireNotNull(pluginConfig.interval) { "Interval er null" }
3731
val delay = requireNotNull(pluginConfig.delay) { "Delay er null" }
38-
val period = requireNotNull(pluginConfig.period) { "Period er null" }
3932
val startEvent = pluginConfig.startEvent ?: ApplicationStarted
4033
val stopEvent = pluginConfig.stopEvent ?: ApplicationStopping
41-
val coroutineDispatcher = pluginConfig.coroutineDispatcher ?: Dispatchers.IO
42-
var job: Job? = null
43-
val timer = Timer()
44-
val timerTask = object : TimerTask() {
45-
override fun run() {
46-
logger.info("Running scheduled task {}", pluginInstance)
47-
task()
48-
}
49-
}
34+
val asyncRunner = ScheduledAsyncRunner<Unit>(interval, delay)
5035

51-
on(MonitoringEvent(startEvent)) { application ->
52-
application.log.info("Scheduling task {} at delay: {}, period {}", pluginInstance, delay, period)
53-
job = application.launch(coroutineDispatcher) {
54-
timer.scheduleAtFixedRate(timerTask, delay.toMillis(), period.toMillis())
55-
}
36+
on(MonitoringEvent(startEvent)) {
37+
asyncRunner.run(task, {}) {}
5638
}
5739

58-
on(MonitoringEvent(stopEvent)) { _ ->
59-
application.log.info("Canceling scheduled task {}", pluginInstance)
60-
job?.cancel()
40+
on(MonitoringEvent(stopEvent)) {
41+
asyncRunner.abort {}
6142
}
6243
}
6344
}

Diff for: lib/scheduling/src/main/kotlin/no/nav/paw/scheduling/plugin/ScheduledTaskPluginInstaller.kt

+7-8
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,24 @@ package no.nav.paw.scheduling.plugin
22

33
import io.ktor.events.EventDefinition
44
import io.ktor.server.application.Application
5+
import io.ktor.server.application.ApplicationStarted
6+
import io.ktor.server.application.ApplicationStopping
57
import io.ktor.server.application.install
6-
import kotlinx.coroutines.CoroutineDispatcher
78
import java.time.Duration
89

910
fun Application.installScheduledTaskPlugin(
1011
pluginInstance: Any,
1112
task: (() -> Unit),
12-
delay: Duration? = null,
13-
period: Duration? = null,
14-
startEvent: EventDefinition<Application>? = null,
15-
stopEvent: EventDefinition<Application>? = null,
16-
coroutineDispatcher: CoroutineDispatcher? = null
13+
interval: Duration,
14+
delay: Duration = Duration.ZERO,
15+
startEvent: EventDefinition<Application> = ApplicationStarted,
16+
stopEvent: EventDefinition<Application> = ApplicationStopping
1717
) {
1818
install(ScheduledTaskPlugin(pluginInstance)) {
1919
this.task = task
2020
this.delay = delay
21-
this.period = period
21+
this.interval = interval
2222
this.startEvent = startEvent
2323
this.stopEvent = stopEvent
24-
this.coroutineDispatcher = coroutineDispatcher
2524
}
2625
}

Diff for: settings.gradle.kts

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
plugins {
22
id("org.gradle.toolchains.foojay-resolver-convention") version "0.9.0"
3-
kotlin("jvm") version "2.1.10" apply false
4-
kotlin("plugin.serialization") version "2.1.10" apply false
5-
id("com.google.cloud.tools.jib") version "3.4.4" apply false
3+
kotlin("jvm") version "2.1.20" apply false
4+
kotlin("plugin.serialization") version "2.1.20" apply false
5+
id("com.google.cloud.tools.jib") version "3.4.5" apply false
66
id("org.openapi.generator") version "7.12.0" apply false
77
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1" apply false
8-
id("com.expediagroup.graphql") version "8.3.0" apply false
8+
id("com.expediagroup.graphql") version "8.4.0" apply false
99
}
1010

1111
rootProject.name = "paw-arbeidssoekerregisteret-monorepo-intern"
@@ -20,6 +20,7 @@ include(
2020
"lib:api-docs",
2121
"lib:security",
2222
"lib:database",
23+
"lib:async",
2324
"lib:scheduling",
2425
"lib:http-client-utils",
2526
"lib:kafka",

0 commit comments

Comments
 (0)