RDF-Connect pipelines for publishing an LDES stream built from the following FAVV open data sources:
The operators list provides all companies and establishments registered with FAVV that currently hold a registration, an approval, or an authorization. Operators are listed with their activity code (PAP code) and their approval/authorization number.
| Language | File |
|---|---|
| German (DE) | inter_actieve_actoren_DE |
| English (EN) | inter_actieve_actoren_EN |
| French (FR) | inter_actieve_actoren_FR |
| Dutch (NL) | inter_actieve_actoren_NL |
This repository publishes a Linked Data Event Stream (LDES) for recognized FAVV operators and activities.
The main goals are:
- Transform source CSV exports into RDF using reusable YARRRML mappings.
- Detect record-level changes between data snapshots.
- Publish those changes as a time-based, bucketized LDES feed.
- Persist feed state so subsequent runs only emit meaningful updates.
- Automate refresh and publication with GitHub Actions.
The RDF model used by this pipeline is defined in:
- pipeline/resources/vocabularies/favv-ontology.ttl
- pipeline/resources/vocabularies/controlled-vocabularies.ttl

The model is built around three classes:
- favv:Recognition: a recognition/authorization/registration record.
- favv:FoodBusinessOperator: an operator (company/establishment).
- favv:Activity: a PAP activity linked to an operator recognition.

| Subject | Predicate | Object |
|---|---|---|
| favv:Recognition | favv:forOperator | favv:FoodBusinessOperator |
| favv:Recognition | favv:forActivity | favv:Activity |
| favv:Recognition | favv:recognitionFormDescription | skos:Concept |

favv:recognitionFormDescription points to the recognition-form concept scheme in pipeline/resources/vocabularies/controlled-vocabularies.ttl, with three concepts:
- https://data.favv.be/id/concept/recognition-form/1 (Recognition)
- https://data.favv.be/id/concept/recognition-form/2 (Authorization)
- https://data.favv.be/id/concept/recognition-form/3 (Registration)
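
As an illustration of how a recognition form code could resolve to one of the concept IRIs above, here is a minimal Python sketch. It assumes the code found in the source data is the same digit as the last path segment of the IRI, which the mapping file would need to confirm. The helper name `recognition_form_iri` is hypothetical and not part of the repository.

```python
# Base IRI and labels are copied from the concept list above.
RECOGNITION_FORM_BASE = "https://data.favv.be/id/concept/recognition-form/"

RECOGNITION_FORM_LABELS = {
    "1": "Recognition",
    "2": "Authorization",
    "3": "Registration",
}


def recognition_form_iri(form_code: str) -> str:
    """Return the concept IRI for a recognition form code (hypothetical helper).

    Assumption: the code equals the final path segment of the concept IRI.
    """
    code = form_code.strip()
    if code not in RECOGNITION_FORM_LABELS:
        raise ValueError(f"unknown recognition form code: {form_code!r}")
    return RECOGNITION_FORM_BASE + code
```

Rejecting unknown codes instead of building an IRI for them keeps a malformed row from silently pointing at a concept that does not exist in the scheme.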
classDiagram
direction LR
class Recognition {
+recognitionNumber string
+recognitionStartDate string
+recognitionTypeCode string
+recognitionTypeDescription string
+recognitionFormCode string
}
class FoodBusinessOperator {
+operatorId string
+lnoId string
+municipalityName langString
+provinceName langString
+postCode string
}
class Activity {
+papId string
+papDescription langString
+activityCode string
+activityDescription langString
+placeCode string
+placeDescription langString
+productCode string
+productDescription langString
}
class SKOSConcept {
+notation string
+prefLabel langString
}
class RecognitionFormScheme {
+title langString
+prefLabel langString
}
Recognition --> FoodBusinessOperator : forOperator
Recognition --> Activity : forActivity
Recognition --> SKOSConcept : recognitionFormDescription
RecognitionFormScheme --> SKOSConcept : hasTopConcept
The architecture is centered on one RDF-Connect pipeline definition in pipeline/github-pipeline.ttl, supported by:
- Mapping rules in pipeline/resources/mappings/favv.yml.
- Focus-node query and shape for change detection in pipeline/resources/change-detection/focus-node-query.rq and pipeline/resources/change-detection/focus-node-shape.ttl.
- Feed output in docs.
- Stateful processing artifacts in pipeline/feed-state.
The pipeline uses two RDF-Connect runners:
- Node runner for file processors, YARRRML parsing, change detection, SDS conversion, bucketization, and disk writing.
- JVM runner for the RML mapper processor with dedicated memory settings.
The workflow builds and copies the custom JVM plugin defined in pipeline/build.gradle, installs the Node dependencies listed in pipeline/package.json, downloads the source CSV files, converts the FR data to UTF-8, and executes the pipeline.
flowchart LR
NL[inter_actieve_actoren_NL.csv] -.-> A
FR[inter_actieve_actoren_FR.csv] -.-> A
A[GlobRead mappingReader\nresources/mappings/favv.yml] --> B[Yarrrml2RML yarrrmlParser]
B --> C[RmlMapper rmlMapper\nJVM runner]
C --> D[DumpsToFeed changeDetector]
E[GlobRead queryReader\nfocus-node-query.rq] --> D
F[GlobRead shapeReader\nfocus-node-shape.ttl] --> D
D --> G[Sdsify sdsAnnotator]
G --> H[Bucketize treeBucketizer]
H --> I[LdesDiskWriter ldesDiskWriter\n../docs]
D -. state .-> S1[(pipeline/feed-state)]
H -. state .-> S1
classDef nodeRunner fill:#dff3ff,stroke:#1f6fa8,stroke-width:2px,color:#0f2f45;
classDef jvm fill:#ffe5b4,stroke:#c97a00,stroke-width:2px,color:#2f1b00;
class A,B,D,E,F,G,H,I nodeRunner;
class C jvm;
- mappingReader (rdfc:GlobRead): Reads the YARRRML mapping file from resources/mappings/favv.yml into a channel.
- yarrrmlParser (rdfc:Yarrrml2RML): Converts YARRRML definitions into executable RML.
- rmlMapper (rdfc:RmlMapper, JVM): Executes RML mappings against the CSV sources and emits RDF Turtle to an internal dump channel.
- queryReader (rdfc:GlobRead): Loads the SPARQL focus-node query used by the change detector.
- shapeReader (rdfc:GlobRead): Loads the SHACL node shape describing relevant entities for change detection.
- changeDetector (rdfc:DumpsToFeed): Compares generated RDF dumps with persisted state and emits create/update/delete change events for focus nodes.
- sdsAnnotator (rdfc:Sdsify): Adds SDS metadata and stream semantics, including stream identifier and publication timestamp path.
- treeBucketizer (rdfc:Bucketize): Applies a time-based TREE fragmentation strategy (timestamp path as:published) and stores bucketization state.
- ldesDiskWriter (rdfc:LdesDiskWriter): Materializes the bucketized LDES (members + metadata) into the docs directory used for publication.
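
To make the change-detection step more concrete, the following Python sketch shows the general idea of comparing a new snapshot against persisted state and emitting create/update/delete events. It is not the DumpsToFeed implementation: that processor works on RDF focus nodes selected by the SPARQL query and SHACL shape, whereas this sketch assumes plain dictionaries keyed by a record identifier. The names `fingerprint` and `detect_changes` are hypothetical.

```python
import hashlib
import json


def fingerprint(record: dict) -> str:
    """Hash a record in a key-order-independent way."""
    canonical = json.dumps(record, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def detect_changes(previous_state: dict, snapshot: dict):
    """Compare a snapshot (id -> record) with persisted state (id -> hash).

    Returns the list of (event_type, record_id) tuples and the new state
    to persist for the next run.
    """
    events = []
    new_state = {}
    for record_id, record in snapshot.items():
        digest = fingerprint(record)
        new_state[record_id] = digest
        if record_id not in previous_state:
            events.append(("create", record_id))
        elif previous_state[record_id] != digest:
            events.append(("update", record_id))
        # Unchanged records emit nothing, so reruns stay quiet.
    for record_id in previous_state:
        if record_id not in snapshot:
            events.append(("delete", record_id))
    return events, new_state
```

Storing only a hash per record keeps the persisted state small while still letting the next run distinguish unchanged, updated, and removed records.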

The processors exchange data over the following channels:
- yarrrml_mappings: YARRRML input stream.
- rml_mappings: generated RML stream.
- rdf_data_dump: RDF produced by RML mapping.
- entity_query and entity_shape: inputs for focus-node change detection.
- change_events: change stream generated by DumpsToFeed.
- sds_members and sds_metadata: SDS member and metadata channels.
- bucketized_members and sds_metadata2: bucketized outputs written as LDES files.
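
The time-based fragmentation that produces the bucketized members can be pictured with a small Python sketch. It groups members by the year and month of their publication timestamp. The year/month granularity, the `id` and `published` field names, and the function names are all assumptions made for illustration; the actual bucket layout depends on how the Bucketize processor is configured in pipeline/github-pipeline.ttl.

```python
from collections import defaultdict
from datetime import datetime


def bucket_key(published: str) -> str:
    """Derive a year/month bucket from an ISO 8601 timestamp (assumed granularity)."""
    timestamp = datetime.fromisoformat(published)
    return f"{timestamp.year:04d}/{timestamp.month:02d}"


def bucketize(members):
    """Group member ids by the bucket of their publication timestamp."""
    buckets = defaultdict(list)
    for member in members:
        buckets[bucket_key(member["published"])].append(member["id"])
    return dict(buckets)


# Hypothetical members; real members carry the as:published timestamp.
example_members = [
    {"id": "m1", "published": "2024-05-01T10:00:00+00:00"},
    {"id": "m2", "published": "2024-05-20T08:30:00+00:00"},
    {"id": "m3", "published": "2024-06-02T12:00:00+00:00"},
]
example_buckets = bucketize(example_members)
```

Because the key is derived only from the timestamp, a member always lands in the same bucket, which lets later runs append to existing fragments without rewriting older ones.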
The automation is defined in .github/workflows/create-feed.yml. The workflow is triggered by:
- Push to main.
- Scheduled run every 5 days (cron).
- Manual execution through workflow_dispatch.

Each run performs the following steps:
- Checkout repository source.
- Install Java 21 (Temurin).
- Configure Gradle and build/copy JVM RML processor plugins.
- Install Node.js 24.
- Install RDF-Connect and processor dependencies with npm.
- Download NL and FR CSV files into pipeline/resources/data.
- Convert the FR CSV from Windows-1252 to UTF-8.
- Run the pipeline with debug logging via npx rdfc github-pipeline.ttl.
- Commit and push updated output artifacts in docs (published LDES files) and pipeline/feed-state (change/bucketization state).
This means each successful run both regenerates the published feed and persists state needed to produce incremental change events on future executions.
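
The encoding normalization step listed above can be sketched in Python as follows. This is only an illustration of the Windows-1252 to UTF-8 conversion; the workflow itself may use a different tool, and the function name `convert_to_utf8` is hypothetical.

```python
from pathlib import Path


def convert_to_utf8(source: Path, target: Path, source_encoding: str = "cp1252") -> None:
    """Re-encode a text file to UTF-8 (hypothetical helper).

    Bytes are read and written directly so line endings are left untouched.
    """
    text = source.read_bytes().decode(source_encoding)
    target.write_bytes(text.encode("utf-8"))
```

Passing the same path as `source` and `target` converts the file in place, because the whole file is read into memory before anything is written.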
The repository includes a containerized runtime for the pipeline:
- Dockerfile: docker/Dockerfile
- Entrypoint script: docker/docker-entrypoint.sh
At container startup, the entrypoint performs the operational preparation steps automatically:
- Creates the input folder at pipeline/resources/data.
- Downloads the NL and FR source CSV files from FAVV.
- Converts the FR CSV from Windows-1252 to UTF-8.
- Starts the pipeline command (default: npx rdfc github-pipeline.ttl).
From the repository root:
docker build -f docker/Dockerfile -t favv-pipeline:latest .
Run with mounted output and persistent feed-state:
docker run --rm \
-e LOG_LEVEL=debug \
-v "$(pwd)/docs:/app/docs" \
-v "$(pwd)/pipeline/feed-state:/app/pipeline/feed-state" \
favv-pipeline:latest
This writes generated LDES output to docs and persists change-detection/bucketization state in pipeline/feed-state.
The image uses an entrypoint and a default CMD. You can override the CMD to run another RDF-Connect command:
docker run --rm \
-v "$(pwd)/docs:/app/docs" \
-v "$(pwd)/pipeline/feed-state:/app/pipeline/feed-state" \
favv-pipeline:latest \
npx rdfc github-pipeline.ttl