1 change: 1 addition & 0 deletions .gitignore
@@ -16,3 +16,4 @@ scratch
# JDT-specific (Eclipse Java Development Tools)
.classpath
lib
dataflow.*
125 changes: 125 additions & 0 deletions QUICKSTART.md
@@ -0,0 +1,125 @@
# OpenDataFlow

## Overview

OpenDataFlow is a lightweight orchestration utility that runs and coordinates batch jobs over partitioned or time-sliced data so teams can schedule, recover, and migrate large data-processing pipelines without changing their ETL code.

This quickstart uses H2 for simplicity.

It has been tested on Ubuntu and runs in a bash shell.

1. Requirements to run the demo:

- bash and the standard command line utilities
- jq (`sudo apt install -y jq`)
- The DataFlow jar: dataflow-1.0.0.jar
- The decryption passkey in the `PASSKEY` environment variable: `export PASSKEY=plugh`
- The two supplied scripts: `utility.sh` and `RunJob`

Have jq on your path, and put dataflow-1.0.0.jar in the same directory as the scripts.
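
A quick sanity check before you start (a minimal sketch that only confirms the items listed above are in place):

```
# Confirm the prerequisites described above
command -v jq >/dev/null || echo "jq is missing: sudo apt install -y jq"
ls dataflow-1.0.0.jar utility.sh RunJob    # all three should be in the current directory
export PASSKEY=plugh                       # demo passkey only; use your own in real setups
```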

2. Set up the h2 database, schema and tables:

```
./utility.sh createtables
```

The initial connection to H2 creates the database, schema, and user automatically.
The createtables utility creates the standard dataflow tables in the database.

3. Configure the 'loadbob' job and the datasets that it uses
```
./utility.sh dml "insert into dataset (datasetid) values ('bobin')"
./utility.sh dml "insert into dataset (datasetid) values ('bobout')"
./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')"
./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')"
```
These statements insert just enough test data into the schema to simulate a run.

The first two commands register two datasets named 'bobin' and 'bobout'.
The second two commands associate 'bobin' and 'bobout' with the job named 'loadbob' as its input and output datasets, respectively.
These inserts only need to happen once, to configure the job and its datasets.
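
You can verify the registration at any time with the utility's `sql` function (used again in step 7); these are read-only queries, so they are safe to repeat:

```
./utility.sh sql "select * from dataset"
./utility.sh sql "select * from job"
```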

4. Set a status for the input dataset

```
./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')"
```


We give it a fake dataid, and specify a fakejob that "produced" it. The status of READY for an OUT data chunk means that it is ready and safe to be consumed.
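
If you want to see the row you just inserted, the utility's `runs` function (used again in step 7) lists the datastatus records:

```
./utility.sh runs
```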

5. "Write" a `loadbob.sh` to run. In this example it is just a one-liner that outputs some of the automatic environment variables.

```
echo 'echo "running loadbob with dataid $dataid partition of input $bobin_DATASETID"' > loadbob.sh && chmod +x loadbob.sh
```

One important note: the jobid is **inferred** from the name of the script. That means that if our jobid is 'loadbob' then the script has to be named 'loadbob.sh'. This is mandatory; it is simply the way that the RunJob script is written. The intent is to keep things simple so that the only parameter to RunJob is the script name.
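
For reference, the inference works roughly like the sketch below, which mirrors the `jobid=${jobid%.*}` line in the RunJob script included in this PR:

```
# Roughly how RunJob derives the jobid from the script name
cmd="./loadbob.sh"
jobid=$(basename "$cmd")   # -> loadbob.sh
jobid=${jobid%.*}          # strip the extension -> loadbob
echo "$jobid"              # prints: loadbob
```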

6. Run the job with RunJob

```
RunJob ./loadbob.sh
```

Output should look like this:

```text
Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0
running loadbob with dataid 1.0 partition of input bobin
Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status
1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released
```
You should see two log-style messages confirming the start and end of the loadbob job, plus the single line printed by the `loadbob.sh` script itself.
The last line is an informational message indicating that DataFlow has set the final status.


7. Checks:
Run `RunJob ./loadbob.sh` a second time, and confirm that it refuses to do a duplicate run.
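Assuming the refusal takes the "no suitable data" branch of the RunJob script included in this PR, the message looks something like this (your timestamp will differ):

```text
Mon Dec 1 04:12:03 PM CST 2025: no suitable data available for job. Not running it
```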
Check the data with the utility:

```text
./utility.sh runs
DATAID DATASETID JOBID LOCKTYPE MODIFIED STATUS
------ --------- ----- -------- -------- ------
1.0 bobout loadbob OUT 2025-12-01 16:08:49.740813 READY
1.0 bobin fakejob OUT 2025-12-01 15:46:19.56124 READY
```

Check the data with direct SQL:

```text
utility.sh sql "select * from datastatus"
DATAID DATASETID JOBID LOCKTYPE MODIFIED STATUS
------ --------- ----- -------- -------- ------
1.0 bobin fakejob OUT 2025-12-01 15:46:19.56124 READY
1.0 bobout loadbob OUT 2025-12-01 16:08:49.740813 READY

```


## Remarks

* We started with just the jar file and had to create the schema and tables manually. But if you build the package with Maven, the tests will create the H2 database, schema, tables, user, and password for you, and dataflow-1.0.0.jar will be in utility/target/dataflow-1.0.0.jar.
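
For example (a minimal sketch; adjust the path if your build places the jar elsewhere):

```
mvn package
cp utility/target/dataflow-1.0.0.jar .   # put the jar next to utility.sh and RunJob, as in step 1
```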

* Access to the H2 database for testing is through the user ETL and a password that was encrypted using the default passkey 'plugh'. You should encrypt your own password with your own passkey and put it into dataflow.properties as soon as possible.
The encrypted password and other connection information are in core/src/main/resources/dataflow.properties. You can copy that file to your working directory and modify it; the utilities will use this local copy in preference to the one packaged under core/ if they find it. The encryption is easy to do because it is one of the functions published by the utility.sh tool.
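
If you prefer, the same encryption can be done directly with the Cryptor class (a hedged example: the class and the `-e` flag are the ones referenced in the README, and the jar name is the one used throughout this quickstart; the key and password shown are placeholders):

```
# Encrypt a database password with your own passkey; put the resulting
# string into dataflow.properties in place of the default encrypted password
java -cp dataflow-1.0.0.jar com.hamiltonlabs.dataflow.utility.Cryptor -e myOwnPasskey "myDbPassword"
```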


* In normal day-to-day operation you **never** need to update or insert into the datastatus table, neither in your code nor manually the way we did in this example. RunJob handles that for you. The inserts into the job and dataset tables are one-time actions to register and configure the datasets or to set up a test case.
The exception is error handling: you may encounter a job in FAILED state. If you want the job to run again, you can reset its status to RESUBMIT. You can do that with a dml command, like `utility.sh dml "update datastatus set status='RESUBMIT' where jobid='loadbob' and dataid='1.0'"`, though I would just use the endjob utility command:
```
utility.sh endjob loadbob 1.0 RESUBMIT
```
The big advantage is that you don't have to ask the scheduling team to make any changes, and you don't have to worry about command line parameters because there aren't any. If the job is scheduled to run multiple times a day, it will simply catch up the next time it runs, and there are no changes in production at all except for the RESUBMIT status. That means you avoid an enormous amount of red tape and committee meetings just to rerun a job.


* The actual dataset record has fields for things like hostname, database name, schema, table, username, and (***encrypted***) password. These all appear as automatic variables to your script. This removes the temptation to hardcode that metadata, the headaches of maintaining it, possible errors in connection strings, and the need for changes when moving from development to production.
It is possible, and in our opinion best practice, to ***hard-code nothing in your script***: get it all from the metadata that DataFlow provides. For one thing, if you have one job producing data and another job consuming it, you are now using named datasets, so both jobs are guaranteed to be using the same dataset. There is almost no chance of the second job picking up the wrong data because of a misconfiguration.
Not only that, the framework guarantees that the second job will not have any false starts while the first job is running or in an error state.
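A hedged sketch of what such a metadata-driven script might look like: `$dataid` and `$bobin_DATASETID` appear earlier in this quickstart, while the other variable names are hypothetical placeholders; see the examples/ directory for the exact names the framework exports.

```
#!/bin/bash
# loadbob.sh -- illustrative only. $dataid and $bobin_DATASETID are shown in this
# quickstart; $bobin_URL, $bobin_USER, and $bobin_PASSWORD are hypothetical placeholders.
set -euo pipefail
echo "processing partition $dataid of dataset $bobin_DATASETID"
# A real job would use the supplied connection metadata instead of hardcoding it, e.g.:
# psql "$bobin_URL" -U "$bobin_USER" -c "select count(*) from some_table where partition_id = '$dataid'"
```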

* The dataset metadata is not restricted to JDBC connections. It can be repurposed for file system paths, web page URLs, TCP endpoints, what have you. The semantics are entirely up to the consumer (the ETL script), which can do whatever it wants with the values; DataFlow itself does not use them at all.

* Some of the other examples in the examples directory illustrate these points.


65 changes: 56 additions & 9 deletions README.md
@@ -87,23 +87,70 @@ See the examples/ directory for exact variable names and sample scripts that rea
Security note: the decrypted credential is provided only at runtime in the job process’s environment. Be careful not to echo or persist it in logs. Keep the encryption key used to create the encrypted password secret.

### Quick start (5 minutes)
For the quick start we use H2 in file mode. It is fast and easy because H2 creates your database, schema, and user with password automatically on the first connect. We then have a utility that creates the H2 tables.
For featherweight implementations, H2 will work pretty well. If you need robustness, scalability, and good transaction semantics, then you will need Postgres.
Also see the file QUICKSTART.md for more details.

1. Prerequisites
- Linux (Ubuntu tested), bash, maven
- jq (sudo apt install -y jq)
- PostgreSQL (or run a local container)
- json-java (json-20250517.jar) and the Postgres JDBC driver (e.g., postgresql-42.7.3.jar)
- A Postgres database named `dataflow`, schema `dataflow`, and a user `etl`

2. Run a local Postgres (optional)
2. Build
mvn package

3. Create the DataFlow tables
export PASSKEY=plugh (this is the default but it should be changed)
./utility.sh createtables

Your DataFlow tool is now configured and ready to run.

### Test

The test has a job (loadbob), which takes one input dataset (bobin) and one output dataset (bobout).
We are not actually testing an ETL job; we want to test that the framework calls it correctly and supplies the necessary dataid and dataset metadata.

1. Register two datasets
```
./utility.sh dml "insert into dataset (datasetid) values ('bobin')"
./utility.sh dml "insert into dataset (datasetid) values ('bobout')"
```
2. Associate them with the loadbob job.
```
./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')"
./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')"
```
3. Mark the input dataset as READY (just for the test; in real runs, some other job would have completed and marked it)

```
./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')"
```
4. The job should now be ready to run, since it has an input dataset in READY status. Perform the test with RunJob:
```
RunJob ./loadbob.sh
```

Output should look like this:

```text
Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0
running loadbob with dataid 1.0 partition of input bobin
Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status
1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released
```
You should see two log-style messages confirming the start and end of the loadbob job, plus the single line printed by the `loadbob.sh` script itself.
The last line is an informational message indicating that DataFlow has set the final status.

### Postgres
For this you need a Postgres database up and running. The easiest way is to spin up a container:

1. Run a local Postgres (optional)
podman run -p 5432:5432 --name pg -e POSTGRES_PASSWORD=secretpass -d docker.io/postgres

3. Build
mvn package

4. Initialize database
2. Initialize the database. Create the user ETL and its password.
Connect with psql and run docs/create_tables.sql. See docs/datamodel.txt for schema notes.
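A hedged sketch of that initialization, assuming the database, schema, and user names from the prerequisites above; adjust hostnames and passwords to your environment, and skip the schema line if docs/create_tables.sql already creates it:
```
# Create the etl user and the dataflow database, then load the DataFlow tables
psql -h localhost -U postgres -c "CREATE USER etl WITH PASSWORD 'changeme';"
psql -h localhost -U postgres -c "CREATE DATABASE dataflow OWNER etl;"
psql -h localhost -U etl -d dataflow -c "CREATE SCHEMA IF NOT EXISTS dataflow;"
psql -h localhost -U etl -d dataflow -f docs/create_tables.sql
```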

5. Configure
3. Configure
Encrypt the DB password with the included Cryptor class:
java -cp app/target/app-1.0.0.jar com.hamiltonlabs.dataflow.utility.Cryptor -e <key> "<password>"
Create the file **dataflow.properties** and fill in the url, user, schema, and encrypted fields. This tells the utility how to access the dataflow database.
@@ -116,7 +163,7 @@ Security note: the decrypted credential is provided only at runtime in the job p
```
Keep the encryption key private.

7. Run your job
4. Run your job
Make your ETL script executable (e.g., myETL.sh) and invoke it via:
RunJob myETL.sh

31 changes: 20 additions & 11 deletions RunJob
@@ -7,7 +7,6 @@
# Added to make cronjobs easier to set CLASSPATH
thisdir=`dirname ${BASH_SOURCE[0]}`
cd $thisdir
echo "running from working directory `pwd`"

# support alternate way to pass the key
if [ -z "$PASSKEY" ];then
@@ -24,7 +23,18 @@ jobid=${jobid%.*} #strip off the extention
shift 1
export args="$@"
#
export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar
#export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar
#export CLASSPATH=bin/json-20250517.jar:bin/h2-2.2.224.jar:utility/target/utility-1.0.0.jar
# get the jar file either in current directory or in canonical build location
jarfile="`ls utility/target/dataflow*jar dataflow*jar 2>/dev/null|tail -1`"
if [ -f "$jarfile" ];then
export jarc="java -jar $jarfile $PASSKEY "
else
echo we need the dataflow-version.jar to run this utility. Cannot continue
exit
fi
export CLASSPATH=$jarfile


getEnvJSON(){
#java com.hamiltonlabs.dataflow.utility.GetJobData $jobid $passkey
@@ -41,7 +51,7 @@ java com.hamiltonlabs.dataflow.utility.Cryptor -d $passkey "$1"
declareEnv(){
#stale dataid will result in false executions so make sure we only set it here
unset dataid
getEnvJSON|jq -c '.[] | to_entries[]' | while read -r entry; do
getEnvJSON|tail -1|jq -c '.[] | to_entries[]' | while read -r entry; do


key=$(echo "$entry" | jq -r '.key')
@@ -65,23 +75,22 @@ declareEnv(){
done
}

# test
env="`declareEnv`"
# uncomment if you need to debug echo "$env"

source <(echo "$env")
if [ -z "$dataid" ];then
echo "`date`: no suitable data available for job. Not running it"
else

echo "`date`: Launching $cmd with dataid $dataid"
# lock our files
#java com.hamiltonlabs.dataflow.utility.SetJobStart $passkey $jobid $dataid
echo runs "`./utility.sh runs`"
eval "$cmd $args"
if [ $? -eq 0 ];then
echo "`date`: Launching $cmd with dataid $dataid"
eval "$cmd $args"
if [ $? -eq 0 ];then
echo "`date`: Job $cmd is complete. Updating status"
java -cp $CLASSPATH:app.jar com.hamiltonlabs.dataflow.utility.SetJobEndStatus $passkey $jobid $dataid READY
else
else
echo "`date`: Job $cmd has failed. Updating status"
java -cp $CLASSPATH:app.jar com.hamiltonlabs.dataflow.utility.SetJobEndStatus $passkey $jobid $dataid FAILED
fi
fi
fi
Binary file removed bin/json-20250517.jar
Binary file not shown.
Binary file removed bin/postgresql-42.7.3.jar
Binary file not shown.
11 changes: 8 additions & 3 deletions changes.log
@@ -53,7 +53,12 @@ NOTE: for the script we have added a new dependency. It uses jq.
The messy problems of 11-09 are fixed.
2025-11-17 Change RunJob to use simpler syntax. Now we just do RunJob myscript.sh and it infers the runid from the script name.
To use the old method, which allows arbitrary commands like "echo or set", use SRUN jobid cmd
Updated the help in utility jar, added deleterun function. Loading properties file now as a resource.
Updated the help in utility jar, added deleterun function. Loading dataflow.properties file now optionally as a resource in jar.

Remove passkey on command line utilities.sh and RunJob entirely and require it to be environment only.

2025-11-21 Added useful functions such as ForceRun and simplified RunJob, and added clear examples, added very useful automatic dataset called 'today' which can be used for daily runs to get an automatic dataid. Merged the service_cleaup mod into main.

2025-11-29 (dev) new "h2" dev branch. Added h2 support, for unit testing and as a light-weight implementation. Created platform module for platform-dependent SQL. Unit test for critical DataFlow.launchJob method.


TODO: I think we should remove passkey on command line entirely and force it to be environment only.

Binary file added core/dataflow.mv.db
Binary file not shown.
15 changes: 15 additions & 0 deletions core/pom.xml
@@ -19,6 +19,21 @@
<artifactId>json</artifactId>
<version>20250517</version>
</dependency>
<dependency>
<groupId>com.hamiltonlabs</groupId>
<artifactId>platform</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.3</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
</dependency>
</dependencies>
</project>

Original file line number Diff line number Diff line change
@@ -2,8 +2,8 @@

import java.util.Properties;
import java.io.InputStream;
//import java.io.FileInputStream;

import java.io.FileInputStream;
import java.io.File;
/** Provide credentials for all OpenDataFlow access
* Credentials are user/password. They are returned in a java.util.Properties object.
*
@@ -47,7 +47,12 @@ public static String getPass(String passphrase,String encryptedPassword) throws
*/
public static Properties getCredentials(String passphrase,String propertiesPath) throws java.security.GeneralSecurityException,java.io.IOException {
Properties properties=new Properties();
// properties.load(new FileInputStream(filepath));
//properties.load(new FileInputStream(filepath));
File file=new File(propertiesPath);
if (file.exists()){
properties.load(new FileInputStream(file));
return updateDecrypted(passphrase,properties);
}
try (InputStream input = CredentialProvider.class.getClassLoader().getResourceAsStream(propertiesPath)) {
if (input == null) {
System.out.printf("Unable to find resource %s\n",propertiesPath);