|
| 1 | +package no.nav.paw.meldeplikttjeneste |
| 2 | + |
| 3 | +import no.nav.paw.arbeidssokerregisteret.api.v1.Periode |
| 4 | +import no.nav.paw.config.kafka.streams.genericProcess |
| 5 | +import no.nav.paw.config.kafka.streams.mapWithContext |
| 6 | +import no.nav.paw.kafkakeygenerator.client.KafkaKeysResponse |
| 7 | +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstand |
| 8 | +import no.nav.paw.meldeplikttjeneste.tilstand.InternTilstandSerde |
| 9 | +import no.nav.paw.meldeplikttjeneste.tilstand.initTilstand |
| 10 | +import no.nav.paw.rapportering.internehendelser.PeriodeAvsluttet |
| 11 | +import no.nav.paw.rapportering.internehendelser.RapporteringsHendelse |
| 12 | +import org.apache.kafka.common.serialization.Serdes |
| 13 | +import org.apache.kafka.streams.StreamsBuilder |
| 14 | +import org.apache.kafka.streams.kstream.Produced |
| 15 | +import org.apache.kafka.streams.state.KeyValueStore |
| 16 | +import java.util.* |
| 17 | + |
| 18 | + |
| 19 | +context(ApplicationConfiguration, ApplicationContext) |
| 20 | +fun StreamsBuilder.processPeriodeTopic(kafkaKeyFunction: (String) -> KafkaKeysResponse) { |
| 21 | + stream<Long, Periode>(periodeTopic) |
| 22 | + .mapWithContext("lagreEllerSlettPeriode", statStoreName) { periode -> |
| 23 | + val keyValueStore: KeyValueStore<UUID, InternTilstand> = getStateStore(statStoreName) |
| 24 | + val currentState = keyValueStore[periode.id] |
| 25 | + val (id, key) = currentState?.let { it.periode.kafkaKeysId to it.periode.recordKey } ?: |
| 26 | + kafkaKeyFunction(periode.identitetsnummer).let { it.id to it.key} |
| 27 | + when { |
| 28 | + currentState == null && periode.avsluttet() -> Action.DoNothing |
| 29 | + periode.avsluttet() -> Action.DeleteStateAndEmit(id, periode) |
| 30 | + currentState == null -> Action.UpdateState(initTilstand(id = id, key = key, periode = periode)) |
| 31 | + else -> Action.DoNothing |
| 32 | + } |
| 33 | + } |
| 34 | + .genericProcess<Long, Action, Long, RapporteringsHendelse>( |
| 35 | + name = "executeAction", |
| 36 | + punctuation = null, |
| 37 | + stateStoreNames = arrayOf(statStoreName) |
| 38 | + ) { record -> |
| 39 | + val keyValueStore: KeyValueStore<UUID, InternTilstand> = getStateStore(statStoreName) |
| 40 | + when (val action = record.value()) { |
| 41 | + is Action.DeleteStateAndEmit -> { |
| 42 | + keyValueStore.delete(action.periode.id) |
| 43 | + forward( |
| 44 | + record.withValue( |
| 45 | + PeriodeAvsluttet( |
| 46 | + UUID.randomUUID(), |
| 47 | + action.periode.id, |
| 48 | + action.periode.identitetsnummer, |
| 49 | + action.arbeidsoekerId |
| 50 | + ) as RapporteringsHendelse |
| 51 | + ) |
| 52 | + ) |
| 53 | + } |
| 54 | + |
| 55 | + Action.DoNothing -> {} |
| 56 | + is Action.UpdateState -> keyValueStore.put(action.state.periode.periodeId, action.state) |
| 57 | + } |
| 58 | + }.to(rapporteringsHendelsesloggTopic, Produced.with(Serdes.Long(), rapporteringsHendelseSerde)) |
| 59 | +} |
| 60 | + |
| 61 | +fun Periode.avsluttet(): Boolean = avsluttet != null |
| 62 | + |
| 63 | +sealed interface Action { |
| 64 | + data object DoNothing : Action |
| 65 | + data class DeleteStateAndEmit(val arbeidsoekerId: Long, val periode: Periode) : Action |
| 66 | + data class UpdateState(val state: InternTilstand) : Action |
| 67 | +} |
0 commit comments