dvh-airflow-kafka kan benyttes for å konsumere data fra kafka-topics i Aiven og skrive data til tabeller i Oracle on-prem database (DWH). Dette er typisk aktuelt for "dvh"-team som bygger dataprodukter basert på data i "datavarehuset".
Følg denne guiden for å opprette en NAIS applikasjon som har tilgang til ønsket topic.
Krever at Docker desktop (Krever lisen) og Poetry er installert.
Podman er et godt alternativ til Docker Desktop.
poetry run test
poetry run pytest src/development/test_integration.py
poetry run pytest src/development/test_integration.py::test_test_run_subscribe
DVH-AIRFLOW-KAFKA bruker google secret manager
for å laste inn hemligheter for å koble til et kafka topic og oracle. Du angir navnet på hemlighetene slik at konsumenten kan laste inn hemlighetene som miljøvariabler. Som bruker har du disse mulighetene:
SOURCE_SECRET_PATH
&TARGET_SECRET_PATH
PROJECT_SECRET_PATH
Vi anbefaler å legge inn kafka hemligheten i SOURCE_SECRET_PATH
og oracle hemlighetene i TARGET_SECRET_PATH
. Alternativt kan de kombineres i en hemliget som angis av PROJECT_SECRET_PATH
Dette er miljøvariablene som forventes i SOURCE:
{
"KAFKA_BROKERS": "",
"KAFKA_CA": "",
"KAFKA_CERTIFICATE": "",
"KAFKA_CREDSTORE_PASSWORD": "",
"KAFKA_PRIVATE_KEY": "",
"KAFKA_SCHEMA_REGISTRY": "",
"KAFKA_SCHEMA_REGISTRY_PASSWORD": "",
"KAFKA_SCHEMA_REGISTRY_USER": "",
"KAFKA_SECRET_UPDATED": "",
}
Dette er miljøvariablene som forventes i TARGET:
{
"DB_USER": "user",
"DB_PASSWORD": "password",
"DB_DSN": "dsn",
}
DVH-AIRFLOW-KAFKA forventer en miljøvariabler CONSUMER_CONFIG
der verdien er en streng på yaml
format. Det er denne som bestemmer hvor dataen hentes fra, hvordan den transformeres, og hvor den lagres.
Kodeeksempel dv-a-team-dags
Eksempel config:
# Kildekonfigurasjon.
source:
# Kun kafka er støttet for øyeblikket
type: kafka
# Hvor mange meldinger skal leses før du skriver til target
batch-size: 5000
# Minimum tid konsumenten skal polle etter nyemeldinger før den returnerer
batch-interval: 5
# Kafka topic som skal kobles til som kilde
topic: topic-navn
# Et unikt group id som skal identfirseres som en subscriber på topicet. Hvis du å lese fra start anbefales det å lage en ny group-id, f.eks gruppe-id-v*
group-id: gruppe-id
# Type skjema. String er rå streng. Alt aksepteres. JSON skjema har struktur. Avro er streng på datatyper og nullverdier.
schema: json | avro | string
# Hvor mange sekunder konsumenten poller før timeout
poll-timeout: 10 # default 10
# Velg om du ønsker en konsument som bruker tidsstempel fra bruker til å bestemme offset som skal konsumeres, eller om offset per partisjon comittes til kafka.
# assign: konsumenten bruker DATA_INTERVAL_START/DATA_INTERVAL_END
# subscribe: konsumenten comitter offset etter hver batch og fortsetter fra offset som er lagret på topic
strategy: assign | subscribe # default assign
# Mål
target:
# Kun Oracle er støttet for øyeblikket
type: oracle
custom-config:
- method: oracledb.Cursor.setinputsizes
name: kafka_timestamp
value: oracledb.TIMESTAMP
- method: oracledb.Cursor.setinputsizes
name: kafka_message
value: oracledb.DB_TYPE_CLOB | oracledb.DB_TYPE_BLOB
# Hvis denne er med vil maksverdi i måltabellen bestemme hvor lesing av kafkatopicet skal starte. Ellers må data_interval_start spesifiseres eksplisitt i DAG.
delta:
# Kolonne det skal beregnes maksverdi fra.
column: kafka_timestamp
# Stort sett samme som under
table: <target-table-name>
# Måltabell
table: <target-table-name>
# Hvis du ønsker å filtrere duplikater fra kilde. Tar en liste med en eller flere kolonner.
skip-duplicates-with:
# Hvilke kolonner som til sammen skal være unike.
- kafka_partition
- kafka_offset
- kafka_topic
# Fjerner feltene key1 og key3 fra json-objektet før det sendes til oracle
keypath-seperator: /
message-fields-filter:
- key1
- key2/key3
# Mapping mellom kildekolonne og målkolonne
transform:
- src: kafka_key
dst: kafka_key
- src: kafka_offset
dst: kafka_offset
- src: kafka_partition
dst: kafka_partition
- src: kafka_timestamp
dst: kafka_timestamp
# Eksempel på konverteringsfunksjon
fun: int-unix-ms -> datetime-no
- src: kafka_topic
dst: kafka_topic
- src: kafka_hash
dst: kafka_hash
- src: kafka_message
dst: kafka_message
- src: <kildenavn> # eks $PERMITTERING
dst: KILDESYSTEM
- src: $$$BATCH_TIME
dst: lastet_dato