Skip to content

Commit cae475b

Browse files
committed
[WIP] Refactor integration system, implement scheduling
1 parent 70deec7 commit cae475b

21 files changed

+401
-49
lines changed

build.gradle.kts

+8
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ dependencies {
8282
// logging
8383
implementation(log.logback)
8484

85+
// serialization
86+
implementation(libraries.jackson.core)
87+
implementation(libraries.jackson.databind)
88+
implementation(libraries.jackson.kotlin)
89+
90+
// task scheduling
91+
implementation(libraries.jobrunr)
92+
8593
// testing
8694
testImplementation(tests.ktor.server.tests)
8795
testImplementation(tests.kotlin.test.junit)

settings.gradle.kts

+8
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ dependencyResolutionManagement {
131131
.version("1.4.14")
132132
}
133133

134+
create("libraries") {
135+
library("jobrunr", "org.jobrunr", "jobrunr").version("7.1.0")
136+
val jackson = version("jackson", "2.17.0")
137+
library("jackson-core", "com.fasterxml.jackson.core", "jackson-core").versionRef(jackson)
138+
library("jackson-databind", "com.fasterxml.jackson.core", "jackson-databind").versionRef(jackson)
139+
library("jackson-kotlin", "com.fasterxml.jackson.module", "jackson-module-kotlin").versionRef(jackson)
140+
}
141+
134142
create("tests") {
135143
ktorServerPlugin("tests", prefix = "ktor")
136144
library("kotlin-test-junit", "org.jetbrains.kotlin", "kotlin-test-junit")

src/main/kotlin/me/snoty/backend/Application.kt

+7-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import io.micrometer.prometheus.PrometheusMeterRegistry
55
import me.snoty.backend.build.DevBuildInfo
66
import me.snoty.backend.config.ConfigLoaderImpl
77
import me.snoty.backend.integration.IntegrationManager
8+
import me.snoty.backend.scheduling.JobRunrConfigurer
89
import me.snoty.backend.server.KtorServer
910
import me.snoty.backend.spi.DevManager
1011
import org.jetbrains.exposed.sql.Database
@@ -28,12 +29,14 @@ fun main() {
2829
throw e
2930
}
3031
}
31-
32-
val database = Database.connect(config.database.value)
32+
val dataSource = config.database.value
33+
val database = Database.connect(dataSource)
3334
val meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
3435

35-
IntegrationManager(database, meterRegistry).startup()
36+
val integrationManager = IntegrationManager(database, meterRegistry)
37+
JobRunrConfigurer.configure(dataSource, integrationManager, meterRegistry)
38+
integrationManager.startup()
3639

37-
KtorServer(config, buildInfo, database, meterRegistry)
40+
KtorServer(config, buildInfo, database, meterRegistry, integrationManager)
3841
.start(wait = true)
3942
}
+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package me.snoty.backend
2+
3+
import kotlinx.serialization.Serializable
4+
import me.snoty.backend.utils.UUIDSerializer
5+
import java.util.*
6+
7+
@Serializable
8+
data class User(
9+
@Serializable(with = UUIDSerializer::class)
10+
val id: UUID,
11+
val name: String,
12+
val email: String,
13+
)
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,31 @@
11
package me.snoty.backend.integration
22

33
import io.micrometer.core.instrument.MeterRegistry
4-
import me.snoty.backend.integration.moodle.startMoodleIntegration
4+
import me.snoty.backend.integration.common.Integration
5+
import me.snoty.backend.integration.common.IntegrationContext
6+
import me.snoty.backend.integration.moodle.MoodleIntegration
57
import org.jetbrains.exposed.sql.Database
68
import java.util.concurrent.Executors
79

810

9-
class IntegrationManager(private val database: Database, private val metricsRegistry: MeterRegistry) {
11+
class IntegrationManager(database: Database, metricsRegistry: MeterRegistry) {
1012
private val metricsPool = Executors.newScheduledThreadPool(1)
13+
private val context = IntegrationContext(database, metricsRegistry, metricsPool)
14+
15+
// TODO: replace with SPI
16+
val integrations: List<Integration> = listOf(
17+
MoodleIntegration(context)
18+
)
1119

1220
fun startup() {
13-
startMoodleIntegration(database, metricsRegistry, metricsPool)
21+
integrations.forEach(Integration::start)
22+
}
23+
24+
fun <T> getScheduleHandler(type: Class<T>): T? {
25+
@Suppress("UNCHECKED_CAST")
26+
return integrations.find {
27+
it.scheduler.javaClass == type
28+
|| it.scheduler.javaClass == type.enclosingClass
29+
}?.scheduler as T
1430
}
1531
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package me.snoty.backend.integration.common
2+
3+
import io.micrometer.core.instrument.MeterRegistry
4+
import me.snoty.backend.User
5+
import me.snoty.backend.integration.common.diff.EntityDiffMetrics
6+
import me.snoty.backend.integration.common.diff.EntityStateTable
7+
import org.jetbrains.exposed.sql.Database
8+
import java.util.concurrent.ScheduledExecutorService
9+
import java.util.concurrent.TimeUnit
10+
import kotlin.reflect.KClass
11+
12+
interface Integration {
13+
val name: String
14+
val settingsType: KClass<out Any>
15+
val scheduler: IntegrationScheduler<Any>
16+
17+
fun start()
18+
fun schedule(user: User, settings: Any)
19+
}
20+
21+
abstract class AbstractIntegration<S : Any>(
22+
override val name: String,
23+
override val settingsType: KClass<S>,
24+
stateTable: EntityStateTable<*>,
25+
schedulerFactory: IntegrationSchedulerFactory<S>,
26+
database: Database,
27+
meterRegistry: MeterRegistry,
28+
metricsPool: ScheduledExecutorService
29+
) : Integration {
30+
constructor(
31+
name: String,
32+
settingsType: KClass<S>,
33+
stateTable: EntityStateTable<*>,
34+
schedulerFactory: IntegrationSchedulerFactory<S>,
35+
context: IntegrationContext
36+
) : this(
37+
name,
38+
settingsType,
39+
stateTable,
40+
schedulerFactory,
41+
context.database,
42+
context.meterRegistry,
43+
context.metricsPool
44+
)
45+
46+
private val entityDiffMetrics: EntityDiffMetrics
47+
= EntityDiffMetrics(meterRegistry, database, name, stateTable)
48+
@Suppress("UNCHECKED_CAST")
49+
override val scheduler: IntegrationScheduler<Any>
50+
= schedulerFactory.create(entityDiffMetrics) as IntegrationScheduler<Any>
51+
52+
init {
53+
metricsPool.scheduleAtFixedRate(entityDiffMetrics.Job(), 0, 30, TimeUnit.SECONDS)
54+
}
55+
56+
override fun start() = scheduleAll()
57+
58+
private fun scheduleAll() {
59+
val allSettings = IntegrationConfigTable.getAllIntegrationConfigs<S>(name)
60+
allSettings.forEach {
61+
@Suppress("UNCHECKED_CAST")
62+
scheduler.schedule(it as IntegrationConfig<Any>)
63+
}
64+
}
65+
66+
override fun schedule(user: User, settings: Any) {
67+
@Suppress("UNCHECKED_CAST")
68+
scheduler.schedule(IntegrationConfig(user.id, settings as S))
69+
}
70+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package me.snoty.backend.integration.common
2+
3+
import io.ktor.serialization.kotlinx.json.*
4+
import org.jetbrains.exposed.dao.id.IdTable
5+
import org.jetbrains.exposed.sql.SchemaUtils
6+
import org.jetbrains.exposed.sql.json.jsonb
7+
import org.jetbrains.exposed.sql.transactions.transaction
8+
import java.util.*
9+
10+
data class IntegrationConfig<S>(val user: UUID, val settings: S)
11+
12+
object IntegrationConfigTable : IdTable<Long>() {
13+
override val id = long("id").entityId()
14+
15+
val user = uuid("user")
16+
private val integrationType = varchar("integration_type", 255)
17+
private val settings = jsonb<IntegrationSettings>("config", DefaultJson)
18+
19+
init {
20+
transaction {
21+
SchemaUtils.createMissingTablesAndColumns(this@IntegrationConfigTable)
22+
}
23+
}
24+
25+
fun <S> getAllIntegrationConfigs(integrationType: String) = transaction {
26+
select(settings, user)
27+
.where { this@IntegrationConfigTable.integrationType eq integrationType }
28+
.map { row ->
29+
@Suppress("UNCHECKED_CAST")
30+
IntegrationConfig(row[user], row[settings] as S)
31+
}
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package me.snoty.backend.integration.common
2+
3+
import io.micrometer.core.instrument.MeterRegistry
4+
import org.jetbrains.exposed.sql.Database
5+
import java.util.concurrent.ScheduledExecutorService
6+
7+
data class IntegrationContext(
8+
val database: Database,
9+
val meterRegistry: MeterRegistry,
10+
val metricsPool: ScheduledExecutorService
11+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package me.snoty.backend.integration.common
2+
3+
import me.snoty.backend.integration.common.diff.EntityDiffMetrics
4+
5+
fun interface IntegrationScheduler<S> {
6+
fun schedule(config: IntegrationConfig<S>)
7+
}
8+
9+
fun interface IntegrationSchedulerFactory<S> {
10+
fun create(entityDiffMetrics: EntityDiffMetrics): IntegrationScheduler<S>
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package me.snoty.backend.integration.common
2+
3+
import kotlinx.serialization.Serializable
4+
5+
@Serializable
6+
open class IntegrationSettings
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,36 @@
11
package me.snoty.backend.integration.moodle
22

3-
import io.micrometer.core.instrument.MeterRegistry
43
import kotlinx.serialization.Serializable
5-
import me.snoty.backend.integration.common.diff.EntityDiffMetrics
4+
import me.snoty.backend.integration.common.AbstractIntegration
5+
import me.snoty.backend.integration.common.IntegrationContext
6+
import me.snoty.backend.integration.common.IntegrationSettings
67
import me.snoty.backend.integration.common.diff.EntityStateTable
78
import me.snoty.backend.integration.common.diff.ID
89
import org.jetbrains.exposed.sql.Column
9-
import org.jetbrains.exposed.sql.Database
10-
import org.jetbrains.exposed.sql.SchemaUtils
11-
import org.jetbrains.exposed.sql.transactions.transaction
12-
import java.util.concurrent.ScheduledExecutorService
13-
import java.util.concurrent.TimeUnit
1410

1511
@Serializable
1612
data class MoodleSettings(
1713
val baseUrl: String,
1814
val username: String,
1915
val appSecret: String
20-
)
16+
) : IntegrationSettings()
2117

2218
object MoodleEntityStateTable : EntityStateTable<Long>() {
2319
override val id: Column<Long> = long(ID)
2420
override val primaryKey = buildPrimaryKey()
25-
2621
}
2722

28-
lateinit var moodleDiffMetrics: EntityDiffMetrics
29-
fun startMoodleIntegration(database: Database, metricsRegistry: MeterRegistry, metricsPool: ScheduledExecutorService) {
30-
transaction(database) {
31-
SchemaUtils.create(MoodleEntityStateTable)
23+
class MoodleIntegration(
24+
context: IntegrationContext,
25+
moodleAPI: MoodleAPI = MoodleAPIImpl()
26+
) : AbstractIntegration<MoodleSettings>(
27+
INTEGRATION_NAME,
28+
MoodleSettings::class,
29+
MoodleEntityStateTable,
30+
MoodleScheduler.Factory(moodleAPI),
31+
context
32+
) {
33+
companion object {
34+
const val INTEGRATION_NAME = "moodle"
3235
}
33-
moodleDiffMetrics = EntityDiffMetrics(metricsRegistry, database, "moodle", MoodleEntityStateTable)
34-
metricsPool.scheduleAtFixedRate(moodleDiffMetrics.Job(), 0, 30, TimeUnit.SECONDS)
3536
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package me.snoty.backend.integration.moodle
2+
3+
import kotlinx.coroutines.runBlocking
4+
import me.snoty.backend.integration.common.InstanceId
5+
import me.snoty.backend.integration.common.IntegrationConfig
6+
import me.snoty.backend.integration.common.IntegrationScheduler
7+
import me.snoty.backend.integration.common.IntegrationSchedulerFactory
8+
import me.snoty.backend.integration.common.diff.EntityDiffMetrics
9+
import me.snoty.backend.integration.common.diff.IUpdatableEntity
10+
import me.snoty.backend.integration.moodle.request.getCalendarUpcoming
11+
import me.snoty.backend.scheduling.JobRunrUtils
12+
import org.jobrunr.jobs.lambdas.JobRequest
13+
import org.jobrunr.jobs.lambdas.JobRequestHandler
14+
import org.slf4j.LoggerFactory
15+
import java.util.*
16+
17+
class ScheduleMoodleRequest : JobRequest {
18+
override fun getJobRequestHandler(): Class<out JobRequestHandler<JobRequest>> {
19+
@Suppress("UNCHECKED_CAST")
20+
return MoodleRequestHandler::class.java as Class<out JobRequestHandler<JobRequest>>
21+
}
22+
23+
var user: UUID? = null
24+
var settings: MoodleSettings? = null
25+
}
26+
27+
class MoodleRequestHandler(val moodleScheduler: MoodleScheduler) : JobRequestHandler<ScheduleMoodleRequest> {
28+
override fun run(jobRequest: ScheduleMoodleRequest) {
29+
runBlocking {
30+
moodleScheduler.fetchAssignements(jobRequest.settings!!)
31+
}
32+
}
33+
}
34+
35+
open class MoodleScheduler(private val entityDiffMetrics: EntityDiffMetrics, private val moodleAPI: MoodleAPI = MoodleAPIImpl()) : IntegrationScheduler<MoodleSettings> {
36+
private val jobRunrUtils = JobRunrUtils("moodle")
37+
private val logger = LoggerFactory.getLogger(javaClass)
38+
39+
fun updateStates(instanceId: InstanceId, elements: List<IUpdatableEntity<Long>>) {
40+
elements.forEach {
41+
val result = MoodleEntityStateTable.compareAndUpdateState(instanceId, it)
42+
entityDiffMetrics.process(result)
43+
}
44+
}
45+
46+
suspend fun fetchAssignements(moodleSettings: MoodleSettings) {
47+
val instanceId = moodleSettings.baseUrl.hashCode()
48+
val assignments = moodleAPI.getCalendarUpcoming(moodleSettings)
49+
updateStates(instanceId, assignments)
50+
logger.info("Fetched ${assignments.size} assignments for ${moodleSettings.username}")
51+
// TODO: send update events
52+
}
53+
54+
override fun schedule(config: IntegrationConfig<MoodleSettings>) {
55+
val instanceId = config.settings.baseUrl.hashCode()
56+
jobRunrUtils.scheduleJob(listOf(instanceId, config.user), customizer = {
57+
// TODO: inline this, unfortunately, jobrunr is doing some weird stuff with the JobActivator
58+
this.withDetails {
59+
fetchAll(config)
60+
}
61+
})
62+
}
63+
64+
open fun fetchAll(config: IntegrationConfig<MoodleSettings>) {
65+
runBlocking {
66+
fetchAssignements(config.settings)
67+
}
68+
}
69+
70+
class Factory(private val moodleAPI: MoodleAPI) : IntegrationSchedulerFactory<MoodleSettings> {
71+
override fun create(entityDiffMetrics: EntityDiffMetrics): IntegrationScheduler<MoodleSettings> {
72+
return MoodleScheduler(entityDiffMetrics, moodleAPI)
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)