Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 3895254

Browse files
committedJul 10, 2024··
Lagt til store og punctuator i opplysninger aggregering
1 parent d37cdf6 commit 3895254

File tree

14 files changed

+291
-27
lines changed

14 files changed

+291
-27
lines changed
 

‎apps/opplysninger-aggregering/nais/nais-dev.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ spec:
1313
value: "opplysninger-stream-v1"
1414
- name: KAFKA_PAW_OPPLYSNINGER_OM_ARBEIDSSOEKER_TOPIC
1515
value: "paw.opplysninger-om-arbeidssoeker-v1"
16+
- name: OPPLYSNINGER_PUNCTUATOR_SCHEDULE
17+
value: "PT10M"
18+
- name: OPPLYSNINGER_LAGRET_TIDSPERIODE
19+
value: "PT1H"
1620
resources:
1721
limits:
1822
memory: 1024Mi

‎apps/opplysninger-aggregering/nais/nais-prod.yaml

+6-2
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@ spec:
1313
value: "opplysninger-stream-v1"
1414
- name: KAFKA_PAW_OPPLYSNINGER_OM_ARBEIDSSOEKER_TOPIC
1515
value: "paw.opplysninger-om-arbeidssoeker-v1"
16+
- name: OPPLYSNINGER_PUNCTUATOR_SCHEDULE
17+
value: "PT10M"
18+
- name: OPPLYSNINGER_LAGRET_TIDSPERIODE
19+
value: "PT1H"
1620
resources:
1721
limits:
18-
memory: 3072Mi
22+
memory: 2048Mi
1923
requests:
2024
cpu: 250m
21-
memory: 2048Mi
25+
memory: 512Mi
2226
replicas:
2327
min: 2
2428
max: 4

‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/Application.kt

+9-9
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@ import io.ktor.server.engine.embeddedServer
66
import io.ktor.server.netty.Netty
77
import io.micrometer.prometheusmetrics.PrometheusConfig
88
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry
9-
import no.nav.paw.arbeidssoekerregisteret.config.APPLICATION_CONFIG_FILE_NAME
10-
import no.nav.paw.arbeidssoekerregisteret.config.APPLICATION_LOGGER_NAME
11-
import no.nav.paw.arbeidssoekerregisteret.config.AppConfig
12-
import no.nav.paw.arbeidssoekerregisteret.config.SERVER_CONFIG_FILE_NAME
13-
import no.nav.paw.arbeidssoekerregisteret.config.SERVER_LOGGER_NAME
14-
import no.nav.paw.arbeidssoekerregisteret.config.ServerConfig
9+
import no.nav.paw.arbeidssoekerregisteret.properties.APPLICATION_CONFIG_FILE_NAME
10+
import no.nav.paw.arbeidssoekerregisteret.properties.APPLICATION_LOGGER_NAME
11+
import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties
12+
import no.nav.paw.arbeidssoekerregisteret.properties.SERVER_CONFIG_FILE_NAME
13+
import no.nav.paw.arbeidssoekerregisteret.properties.SERVER_LOGGER_NAME
14+
import no.nav.paw.arbeidssoekerregisteret.properties.ServerProperties
1515
import no.nav.paw.arbeidssoekerregisteret.context.ApplicationContext
1616
import no.nav.paw.arbeidssoekerregisteret.plugins.configureKafka
1717
import no.nav.paw.arbeidssoekerregisteret.plugins.configureMetrics
@@ -23,8 +23,8 @@ import org.slf4j.LoggerFactory
2323

2424
fun main() {
2525
val logger = LoggerFactory.getLogger(SERVER_LOGGER_NAME)
26-
val serverProperties = loadNaisOrLocalConfiguration<ServerConfig>(SERVER_CONFIG_FILE_NAME)
27-
val applicationProperties = loadNaisOrLocalConfiguration<AppConfig>(APPLICATION_CONFIG_FILE_NAME)
26+
val serverProperties = loadNaisOrLocalConfiguration<ServerProperties>(SERVER_CONFIG_FILE_NAME)
27+
val applicationProperties = loadNaisOrLocalConfiguration<ApplicationProperties>(APPLICATION_CONFIG_FILE_NAME)
2828

2929
logger.info("Starter ${applicationProperties.appId}")
3030

@@ -47,7 +47,7 @@ fun main() {
4747
server.start(wait = true)
4848
}
4949

50-
fun Application.module(properties: AppConfig) {
50+
fun Application.module(properties: ApplicationProperties) {
5151
val logger = LoggerFactory.getLogger(APPLICATION_LOGGER_NAME)
5252
val healthIndicatorService = HealthIndicatorService()
5353
val meterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)

‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/KafkaSerialization.kt

+12
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@ package no.nav.paw.arbeidssoekerregisteret.config
33
import com.fasterxml.jackson.core.JsonProcessingException
44
import com.fasterxml.jackson.databind.ObjectMapper
55
import com.fasterxml.jackson.module.kotlin.readValue
6+
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
7+
import no.nav.paw.arbeidssokerregisteret.api.v4.OpplysningerOmArbeidssoeker
68
import no.nav.paw.config.env.NaisEnv
79
import no.nav.paw.config.env.currentNaisEnv
10+
import org.apache.avro.specific.SpecificRecord
811
import org.apache.kafka.common.serialization.Deserializer
912
import org.apache.kafka.common.serialization.Serde
1013
import org.apache.kafka.common.serialization.Serializer
@@ -46,3 +49,12 @@ inline fun <reified T> buildJsonSerde(naisEnv: NaisEnv, objectMapper: ObjectMapp
4649
inline fun <reified T> buildJsonSerde(): Serde<T> {
4750
return buildJsonSerde<T>(currentNaisEnv, buildObjectMapper)
4851
}
52+
53+
inline fun <reified T : SpecificRecord> buildAvroSerde(): Serde<T> {
54+
return SpecificAvroSerde()
55+
}
56+
57+
fun buildOpplysningerOmArbeidssoekerAvroSerde(): Serde<OpplysningerOmArbeidssoeker> {
58+
return buildAvroSerde()
59+
}
60+
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,39 @@
11
package no.nav.paw.arbeidssoekerregisteret.config
22

33
import io.micrometer.core.instrument.MeterRegistry
4+
import io.micrometer.core.instrument.Tag
5+
import io.micrometer.core.instrument.Tags
6+
import java.time.Instant
7+
import java.time.ZoneId
8+
import java.util.concurrent.atomic.AtomicLong
49

510
private const val METRIC_PREFIX = "paw_arbeidssoeker_opplysninger_aggregering"
611

7-
fun MeterRegistry.tellAntallMottatteOpplysninger() {
12+
fun MeterRegistry.tellMottatteOpplysninger() {
813
counter(
914
"${METRIC_PREFIX}_antall_mottatte_opplysninger_total",
1015
).increment()
1116
}
17+
18+
fun MeterRegistry.antallLagredeOpplysningerTotal(antallReference: AtomicLong) {
19+
gauge(
20+
"${METRIC_PREFIX}_antall_lagrede_opplysninger_total",
21+
Tags.empty(),
22+
antallReference
23+
) {
24+
antallReference.get().toDouble()
25+
}
26+
}
27+
28+
fun MeterRegistry.antallLagredeOpplysningerSumPerPeriode(timestamp: Instant, antallReference: AtomicLong) {
29+
val zonedDateTime = timestamp.atZone(ZoneId.systemDefault())
30+
gauge(
31+
"${METRIC_PREFIX}_antall_lagrede_opplysninger_sum_per_periode",
32+
Tags.of(
33+
Tag.of("minute", "${zonedDateTime.minute}")
34+
),
35+
antallReference
36+
) {
37+
antallReference.get().toDouble()
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package no.nav.paw.arbeidssoekerregisteret.context
22

3-
import no.nav.paw.arbeidssoekerregisteret.config.AppConfig
3+
import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties
44
import org.slf4j.Logger
55

6-
data class ApplicationContext(val logger: Logger, val properties: AppConfig)
6+
data class ApplicationContext(val logger: Logger, val properties: ApplicationProperties)

‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/plugins/kafka/KafkaStreamsPlugin.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import io.ktor.server.application.createApplicationPlugin
77
import io.ktor.server.application.hooks.MonitoringEvent
88
import io.ktor.server.application.log
99
import io.ktor.util.KtorDsl
10-
import no.nav.paw.arbeidssoekerregisteret.config.KafkaStreamsConfig
10+
import no.nav.paw.arbeidssoekerregisteret.properties.KafkaStreamsProperties
1111
import org.apache.kafka.streams.KafkaStreams
1212

1313
@KtorDsl
1414
class KafkaStreamsPluginConfig {
15-
var kafkaStreamsConfig: KafkaStreamsConfig? = null
15+
var kafkaStreamsConfig: KafkaStreamsProperties? = null
1616
var kafkaStreams: List<KafkaStreams>? = null
1717
}
1818

‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfig.kt ‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ApplicationProperties.kt

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package no.nav.paw.arbeidssoekerregisteret.config
1+
package no.nav.paw.arbeidssoekerregisteret.properties
22

33
import no.nav.paw.config.env.NaisEnv
44
import no.nav.paw.config.env.currentAppId
@@ -11,16 +11,19 @@ const val SERVER_LOGGER_NAME = "no.nav.paw.server"
1111
const val APPLICATION_LOGGER_NAME = "no.nav.paw.application"
1212
const val APPLICATION_CONFIG_FILE_NAME = "application_configuration.toml"
1313

14-
data class AppConfig(
14+
data class ApplicationProperties(
1515
val kafka: KafkaConfig,
16-
val kafkaStreams: KafkaStreamsConfig,
16+
val kafkaStreams: KafkaStreamsProperties,
1717
val appName: String = currentAppName ?: "paw-arbeidssoeker-opplysninger-aggregering",
1818
val appId: String = currentAppId ?: "paw-arbeidssoeker-opplysninger-aggregering:LOCAL",
1919
val naisEnv: NaisEnv = currentNaisEnv
2020
)
2121

22-
data class KafkaStreamsConfig(
22+
data class KafkaStreamsProperties(
2323
val shutDownTimeout: Duration,
2424
val opplysingerStreamIdSuffix: String,
2525
val opplysningerTopic: String,
26+
val opplysningerStore: String,
27+
val opplysningerPunctuatorSchedule: Duration,
28+
val opplysningerLagretTidsperiode: Duration,
2629
)

‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/config/ServerConfig.kt ‎apps/opplysninger-aggregering/src/main/kotlin/no/nav/paw/arbeidssoekerregisteret/properties/ServerProperties.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
package no.nav.paw.arbeidssoekerregisteret.config
1+
package no.nav.paw.arbeidssoekerregisteret.properties
22

33
const val SERVER_CONFIG_FILE_NAME = "server_configuration.toml"
44

5-
data class ServerConfig(
5+
data class ServerProperties(
66
val port: Int,
77
val callGroupSize: Int,
88
val workerGroupSize: Int,
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,118 @@
11
package no.nav.paw.arbeidssoekerregisteret.topology
22

33
import io.micrometer.core.instrument.MeterRegistry
4-
import no.nav.paw.arbeidssoekerregisteret.config.tellAntallMottatteOpplysninger
4+
import no.nav.paw.arbeidssoekerregisteret.config.antallLagredeOpplysningerSumPerPeriode
5+
import no.nav.paw.arbeidssoekerregisteret.config.antallLagredeOpplysningerTotal
6+
import no.nav.paw.arbeidssoekerregisteret.config.buildOpplysningerOmArbeidssoekerAvroSerde
7+
import no.nav.paw.arbeidssoekerregisteret.config.tellMottatteOpplysninger
58
import no.nav.paw.arbeidssoekerregisteret.context.ApplicationContext
69
import no.nav.paw.arbeidssokerregisteret.api.v4.OpplysningerOmArbeidssoeker
10+
import no.nav.paw.config.kafka.streams.Punctuation
11+
import no.nav.paw.config.kafka.streams.genericProcess
12+
import org.apache.kafka.common.serialization.Serdes
713
import org.apache.kafka.streams.StreamsBuilder
814
import org.apache.kafka.streams.Topology
15+
import org.apache.kafka.streams.kstream.Consumed
16+
import org.apache.kafka.streams.processor.PunctuationType
17+
import org.apache.kafka.streams.state.Stores
18+
import org.apache.kafka.streams.state.TimestampedKeyValueStore
19+
import org.apache.kafka.streams.state.ValueAndTimestamp
20+
import java.time.Instant
21+
import java.util.*
22+
import java.util.concurrent.atomic.AtomicLong
923

1024
context(ApplicationContext)
1125
fun buildOpplysningerTopology(
1226
meterRegistry: MeterRegistry
1327
): Topology = StreamsBuilder().apply {
28+
addOpplysningerStateStore()
29+
addOpplysningerKStream(meterRegistry)
30+
}.build()
31+
32+
context(ApplicationContext)
33+
private fun StreamsBuilder.addOpplysningerStateStore() {
34+
logger.info("Oppretter state store for opplysninger om arbeidssøker")
35+
val kafkaStreamsProperties = properties.kafkaStreams
36+
37+
this.addStateStore(
38+
Stores.timestampedKeyValueStoreBuilder(
39+
Stores.persistentKeyValueStore(kafkaStreamsProperties.opplysningerStore),
40+
Serdes.Long(),
41+
buildOpplysningerOmArbeidssoekerAvroSerde()
42+
)
43+
)
44+
}
45+
46+
context(ApplicationContext)
47+
private fun StreamsBuilder.addOpplysningerKStream(meterRegistry: MeterRegistry) {
1448
logger.info("Oppretter KStream for opplysninger om arbeidssøker")
1549
val kafkaStreamsProperties = properties.kafkaStreams
1650

17-
this.stream<Long, OpplysningerOmArbeidssoeker>(kafkaStreamsProperties.opplysningerTopic)
51+
this
52+
.stream(
53+
kafkaStreamsProperties.opplysningerTopic,
54+
Consumed.with(Serdes.Long(), buildOpplysningerOmArbeidssoekerAvroSerde())
55+
)
1856
.peek { key, _ ->
1957
logger.debug("Mottok event på {} med key {}", kafkaStreamsProperties.opplysningerTopic, key)
20-
meterRegistry.tellAntallMottatteOpplysninger()
58+
meterRegistry.tellMottatteOpplysninger()
59+
}.genericProcess<Long, OpplysningerOmArbeidssoeker, Long, OpplysningerOmArbeidssoeker>(
60+
name = "processOpplysningerOmArbeidssoeker",
61+
stateStoreNames = arrayOf(kafkaStreamsProperties.opplysningerStore),
62+
punctuation = buildPunctuation(meterRegistry)
63+
) { record ->
64+
val stateStore: TimestampedKeyValueStore<Long, OpplysningerOmArbeidssoeker> =
65+
getStateStore(kafkaStreamsProperties.opplysningerStore)
66+
logger.debug("Lagrer opplysninger for periode {}", record.value().periodeId)
67+
stateStore.put(record.key(), ValueAndTimestamp.make(record.value(), Instant.now().toEpochMilli()))
2168
}
22-
}.build()
69+
}
70+
71+
context(ApplicationContext)
72+
private fun buildPunctuation(meterRegistry: MeterRegistry): Punctuation<Long, OpplysningerOmArbeidssoeker> {
73+
logger.info("Oppretter Punctuation for opplysninger om arbeidssøker")
74+
val kafkaStreamsProperties = properties.kafkaStreams
75+
76+
return Punctuation(
77+
kafkaStreamsProperties.opplysningerPunctuatorSchedule, PunctuationType.WALL_CLOCK_TIME
78+
) { timestamp, context ->
79+
logger.info("Punctuation kjører for tidspunkt {}", timestamp)
80+
81+
with(context) {
82+
val antallTotalt = AtomicLong(0)
83+
val histogram = mutableMapOf<UUID, AtomicLong>()
84+
85+
val stateStore: TimestampedKeyValueStore<Long, OpplysningerOmArbeidssoeker> =
86+
getStateStore(kafkaStreamsProperties.opplysningerStore)
87+
for (keyValue in stateStore.all()) {
88+
antallTotalt.incrementAndGet()
89+
val lagretTidspunkt = Instant.ofEpochMilli(keyValue.value.timestamp())
90+
val utloepTidspunkt = Instant.now().minus(kafkaStreamsProperties.opplysningerLagretTidsperiode)
91+
if (utloepTidspunkt.isAfter(lagretTidspunkt)
92+
) {
93+
logger.debug(
94+
"Sletter opplysninger for periode {} fordi de har vært lagret mer enn {}m (utløp {} > lagret {})",
95+
keyValue.value.value().periodeId,
96+
kafkaStreamsProperties.opplysningerLagretTidsperiode.toMinutes(),
97+
utloepTidspunkt,
98+
lagretTidspunkt
99+
)
100+
stateStore.delete(keyValue.key)
101+
continue
102+
}
103+
104+
val opplysninger = keyValue.value.value()
105+
val antall = histogram[opplysninger.periodeId]
106+
if (antall != null) {
107+
antall.incrementAndGet()
108+
histogram[opplysninger.periodeId] = antall
109+
} else {
110+
histogram[opplysninger.periodeId] = AtomicLong(1)
111+
}
112+
}
113+
114+
histogram.forEach { (_, antall) -> meterRegistry.antallLagredeOpplysningerSumPerPeriode(timestamp, antall) }
115+
meterRegistry.antallLagredeOpplysningerTotal(antallTotalt)
116+
}
117+
}
118+
}

‎apps/opplysninger-aggregering/src/main/resources/local/application_configuration.toml

+3
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,6 @@ url = "http://localhost:8082"
99
shutDownTimeout = "PT1S"
1010
opplysingerStreamIdSuffix = "opplysninger-stream-v1"
1111
opplysningerTopic = "paw.opplysninger-om-arbeidssoeker-v1"
12+
opplysningerStore = "opplysninger-store"
13+
opplysningerPunctuatorSchedule = "PT1M"
14+
opplysningerLagretTidsperiode = "PT10M"

‎apps/opplysninger-aggregering/src/main/resources/nais/application_configuration.toml

+3
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,6 @@ password = "${KAFKA_SCHEMA_REGISTRY_PASSWORD}"
1616
shutDownTimeout = "PT5S"
1717
opplysingerStreamIdSuffix = "${KAFKA_OPPLYSNINGER_OM_ARBEIDSSOEKER_STREAM_ID_SUFFIX}"
1818
opplysningerTopic = "${KAFKA_PAW_OPPLYSNINGER_OM_ARBEIDSSOEKER_TOPIC}"
19+
opplysningerStore = "opplysninger-store"
20+
opplysningerPunctuatorSchedule = "${OPPLYSNINGER_PUNCTUATOR_SCHEDULE}"
21+
opplysningerLagretTidsperiode = "${OPPLYSNINGER_LAGRET_TIDSPERIODE}"

‎apps/opplysninger-aggregering/src/test/kotlin/no/nav/paw/arbeidssoekerregisteret/config/AppConfigTest.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package no.nav.paw.arbeidssoekerregisteret.config
22

33
import io.kotest.core.spec.style.FreeSpec
44
import io.kotest.matchers.shouldNotBe
5+
import no.nav.paw.arbeidssoekerregisteret.properties.APPLICATION_CONFIG_FILE_NAME
6+
import no.nav.paw.arbeidssoekerregisteret.properties.ApplicationProperties
57
import no.nav.paw.config.hoplite.loadNaisOrLocalConfiguration
68

79
class AppConfigTest : FreeSpec({
810
"Skal laste config" {
9-
val appConfig = loadNaisOrLocalConfiguration<AppConfig>(APPLICATION_CONFIG_FILE_NAME)
11+
val appConfig = loadNaisOrLocalConfiguration<ApplicationProperties>(APPLICATION_CONFIG_FILE_NAME)
1012
appConfig.kafka shouldNotBe null
1113
appConfig.kafkaStreams shouldNotBe null
1214
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package no.nav.paw.config.kafka.streams
2+
3+
4+
import org.apache.kafka.streams.kstream.KStream
5+
import org.apache.kafka.streams.kstream.Named
6+
import org.apache.kafka.streams.processor.PunctuationType
7+
import org.apache.kafka.streams.processor.api.Processor
8+
import org.apache.kafka.streams.processor.api.ProcessorContext
9+
import org.apache.kafka.streams.processor.api.Record
10+
import java.time.Duration
11+
import java.time.Instant
12+
13+
fun <K, V> KStream<K, V>.filterWithContext(
14+
name: String,
15+
vararg stateStoreNames: String,
16+
function: ProcessorContext<K, V>.(V) -> Boolean
17+
): KStream<K, V> {
18+
val processor = {
19+
GenericProcessor<K, V, K, V> { record ->
20+
if (function(record.value())) forward(record)
21+
}
22+
}
23+
return process(processor, Named.`as`(name), *stateStoreNames)
24+
}
25+
26+
fun <K, V_IN, V_OUT> KStream<K, V_IN>.mapNonNull(
27+
name: String,
28+
vararg stateStoreNames: String,
29+
function: ProcessorContext<K, V_OUT>.(V_IN) -> V_OUT?
30+
): KStream<K, V_OUT> {
31+
val processor = {
32+
GenericProcessor<K, V_IN, K, V_OUT> { record ->
33+
val result = function(record.value())
34+
if (result != null) forward(record.withValue(result))
35+
}
36+
}
37+
return process(processor, Named.`as`(name), *stateStoreNames)
38+
}
39+
40+
fun <K, V_IN, V_OUT> KStream<K, V_IN>.mapWithContext(
41+
name: String,
42+
vararg stateStoreNames: String,
43+
function: ProcessorContext<K, V_OUT>.(V_IN) -> V_OUT
44+
): KStream<K, V_OUT> {
45+
val processor = {
46+
GenericProcessor<K, V_IN, K, V_OUT> { record ->
47+
val result = function(record.value())
48+
forward(record.withValue(result))
49+
}
50+
}
51+
return process(processor, Named.`as`(name), *stateStoreNames)
52+
}
53+
54+
fun <K_IN, V_IN, K_OUT, V_OUT> KStream<K_IN, V_IN>.mapKeyAndValue(
55+
name: String,
56+
vararg stateStoreNames: String,
57+
function: ProcessorContext<K_OUT, V_OUT>.(K_IN, V_IN) -> Pair<K_OUT, V_OUT>?
58+
): KStream<K_OUT, V_OUT> {
59+
val processor = {
60+
GenericProcessor<K_IN, V_IN, K_OUT, V_OUT> { record ->
61+
val result = function(record.key(), record.value())
62+
if (result != null) {
63+
forward(record.withKey(result.first).withValue(result.second))
64+
}
65+
}
66+
}
67+
return process(processor, Named.`as`(name), *stateStoreNames)
68+
}
69+
70+
fun <K_IN, V_IN, K_OUT, V_OUT> KStream<K_IN, V_IN>.genericProcess(
71+
name: String,
72+
vararg stateStoreNames: String,
73+
punctuation: Punctuation<K_OUT, V_OUT>? = null,
74+
function: ProcessorContext<K_OUT, V_OUT>.(Record<K_IN, V_IN>) -> Unit
75+
): KStream<K_OUT, V_OUT> {
76+
val processor = {
77+
GenericProcessor(function = function, punctuation = punctuation)
78+
}
79+
return process(processor, Named.`as`(name), *stateStoreNames)
80+
}
81+
82+
class GenericProcessor<K_IN, V_IN, K_OUT, V_OUT>(
83+
private val punctuation: Punctuation<K_OUT, V_OUT>? = null,
84+
private val function: ProcessorContext<K_OUT, V_OUT>.(Record<K_IN, V_IN>) -> Unit
85+
) : Processor<K_IN, V_IN, K_OUT, V_OUT> {
86+
private var context: ProcessorContext<K_OUT, V_OUT>? = null
87+
88+
override fun init(context: ProcessorContext<K_OUT, V_OUT>?) {
89+
super.init(context)
90+
this.context = context
91+
if (punctuation != null) {
92+
context?.schedule(punctuation.interval, punctuation.type) { time ->
93+
punctuation.function(Instant.ofEpochMilli(time), context)
94+
}
95+
}
96+
}
97+
98+
override fun process(record: Record<K_IN, V_IN>?) {
99+
if (record == null) return
100+
val ctx = requireNotNull(context) { "Context is not initialized" }
101+
with(ctx) { function(record) }
102+
}
103+
}
104+
105+
data class Punctuation<K, V>(
106+
val interval: Duration,
107+
val type: PunctuationType,
108+
val function: (Instant, ProcessorContext<K, V>) -> Unit
109+
)

0 commit comments

Comments
 (0)
Please sign in to comment.