diff --git a/.gitignore b/.gitignore index 67edbd5..0954fa6 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ scratch # JDT-specific (Eclipse Java Development Tools) .classpath lib +dataflow.*
diff --git a/QUICKSTART.md b/QUICKSTART.md new file mode 100644 index 0000000..16460a3 --- /dev/null +++ b/QUICKSTART.md @@ -0,0 +1,125 @@ +# OpenDataFlow + +## Overview + +OpenDataFlow is a lightweight orchestration utility that runs and coordinates batch jobs over partitioned or time-sliced data so teams can schedule, recover, and migrate large data-processing pipelines without changing their ETL code. + +This quickstart uses H2 for simplicity. + +It has been tested on Ubuntu and runs in a bash shell.
+ +1. Requirements to run the demo: + +- bash and the standard command line utilities +- jq (`sudo apt install -y jq`) +- The DataFlow jar: dataflow-1.0.0.jar +- the decryption passkey in the $PASSKEY environment variable + ` export PASSKEY=plugh ` +- The two supplied scripts: `utility.sh` and `RunJob` + +Have jq on your path, and put the dataflow-1.0.0.jar in the same directory as the scripts.
+ +2. Set up the H2 database, schema, and tables: + +``` ./utility.sh createtables``` + + The initial connection to H2 creates the database, schema, and user automatically. + The createtables utility creates the standard DataFlow tables in the database.
+ +3. Configure the 'loadbob' job and the datasets that it uses +``` + ./utility.sh dml "insert into dataset (datasetid) values ('bobin')" + ./utility.sh dml "insert into dataset (datasetid) values ('bobout')" + ./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')" + ./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')" +``` +These inserts add enough test data to the schema to simulate a run. + +The first two commands register two datasets named 'bobin' and 'bobout'. +The second two commands associate bobin and bobout with the job named 'loadbob' as its input and output datasets respectively. +These inserts only need to happen once, to configure the job and datasets.
+ +4. Set a status for the input dataset + ``` +./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')" +``` + +We give it a fake dataid, and specify a fakejob that "produced" it. The status of READY for an OUT data chunk means that it is ready and safe to be consumed.
+ +5. "Write" a `loadbob.sh` to run. In this example it is just a one-liner that outputs some of the automatic environment variables. +``` + echo 'echo "running loadbob with dataid $dataid partition of input $bobin_DATASETID"' > loadbob.sh && chmod +x loadbob.sh +``` + One important note: the jobid is **inferred** from the name of the script. That means that if our jobid is 'loadbob' then the script has to be named 'loadbob.sh'. This is mandatory, but it is just the way the RunJob script is written. The intent is to keep it simple so that the only parameter to RunJob is the script name.
+ +6. Run the job with RunJob + +``` + RunJob ./loadbob.sh +``` + +Output should look like this: + +```text + Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0 + running loadbob with dataid 1.0 partition of input bobin + Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status
+ 1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released +``` +You should see two log-style messages confirming the start and end of the loadbob job, plus the one line output by the `loadbob.sh` script itself. +The last line is an informational message indicating that DataFlow has set the final status. + + +7. Checks: + Do `RunJob ./loadbob.sh` a second time, and confirm that it refuses to do a duplicate run. + Check the data with the utility: + +```text + ./utility.sh runs + DATAID DATASETID JOBID LOCKTYPE MODIFIED STATUS + ------ --------- ----- -------- -------- ------ + 1.0 bobout loadbob OUT 2025-12-01 16:08:49.740813 READY + 1.0 bobin fakejob OUT 2025-12-01 15:46:19.56124 READY +``` + + Check the data with direct SQL: + +```text + utility.sh sql "select * from datastatus" +DATAID DATASETID JOBID LOCKTYPE MODIFIED STATUS +------ --------- ----- -------- -------- ------ +1.0 bobin fakejob OUT 2025-12-01 15:46:19.56124 READY +1.0 bobout loadbob OUT 2025-12-01 16:08:49.740813 READY + +``` + + +## Remarks + +* We started with just the jar file and had to manually create the schema and tables. But if you build the package with Maven, the tests will build the H2 database, schema, tables, user, and password for you, and the dataflow-1.0.0.jar will be in utility/target/dataflow-1.0.0.jar. + +* Access to the H2 database for testing is through the user ETL and a password that was encrypted using the default passkey 'plugh'. You should encrypt your own password using your own passkey and put it into dataflow.properties as soon as possible. +The encrypted password and other connection information is in core/src/main/resources/dataflow.properties. You can copy it to your working directory and modify it; the utilities will use this copy instead of the core/ properties file if they find it. The encryption is easily done because it is one of the functions published by the utility.sh tool. + + +* In normal day-to-day operation you **never** need to update or insert into the datastatus table, not in your code, and not manually the way we did in this example. RunJob handles that for you. The inserts into the job and dataset tables are one-time things to register and configure the datasets or to set up a test case. +In exceptional cases, such as handling errors, you may encounter a job in a FAILED state. If you want the job to run again, you can reset it to RESUBMIT. You can do that with a dml command, like `utility.sh dml "update datastatus set status='RESUBMIT' where jobid='loadbob' and dataid='1.0'"`, though I would just use the endjob utility command: +``` + utility.sh endjob loadbob 1.0 RESUBMIT +``` +The big advantage is that you don't have to ask the scheduling team to make any changes, and you don't have to worry about command line parameters because there are not any. If the job is scheduled to run multiple times a day, then it will just catch up the next time it runs, and there are no changes in production at all except for the RESUBMIT status. That means you avoid an enormous amount of red tape and committee meetings just to rerun the job. + + + * The actual dataset record has fields for things like hostname, database name, schema, table, username, and (***encrypted***) password. These all appear as automatic variables to your script. This avoids all the issues related to the temptation of hardcoding this metadata, the headaches involved with maintaining it, possible errors in connection strings, and having to make changes when moving from development to production. A minimal sketch follows.
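The sketch below is only illustrative: `$dataid` and `$bobin_DATASETID` are confirmed by the example output earlier in this quickstart, but the other variable names (`bobin_HOSTNAME`, `bobin_USERNAME`, `bobin_SCHEMANAME`, `bobin_TABLENAME`) are assumptions based on the dataset table columns, so check `env` inside a real run before relying on them.

```
#!/bin/bash
# Illustrative loadbob.sh that hard-codes nothing and reads its metadata
# from the environment RunJob provides.
# $dataid and $bobin_DATASETID are documented above; the remaining variable
# names are assumed to follow the same <datasetid>_<column> pattern.
echo "running loadbob for partition $dataid of input dataset $bobin_DATASETID"
echo "would connect to ${bobin_HOSTNAME:-<hostname>} as ${bobin_USERNAME:-<username>}"
echo "and read ${bobin_SCHEMANAME:-<schema>}.${bobin_TABLENAME:-<table>}"
```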
+It is possible, and in our opinion best practice, to ***hard-code nothing in your script***. Get it all from the metadata that DataFlow provides. For one thing, if you have one job producing data and another job consuming it, you are now using named datasets and so both jobs are guaranteed to be using the same dataset. There is almost no chance of the second job picking up the wrong data because of a misconfiguration. +Not only that, the framework guarantees that the second job will not have any false starts while the first job is running or in any error state. + +* The dataset metadata are not restricted to JDBC connections. They can be repurposed as file system paths, web page URLs, TCP endpoints, what have you. The semantics are entirely up to the consumer (the ETL script), which can do whatever it wants with them. DataFlow doesn't use them at all. + +* Some of the other examples in the examples directory illustrate these points. + +
diff --git a/README.md b/README.md index 0311733..adb86ef 100644 --- a/README.md +++ b/README.md @@ -87,23 +87,70 @@ See the examples/ directory for exact variable names and sample scripts that rea Security note: the decrypted credential is provided only at runtime in the job process’s environment. Be careful not to echo or persist it in logs. Keep the encryption key used to create the encrypted password secret. ### Quick start (5 minutes) +For the quick start we use H2 in file mode. It is fast and easy because H2 creates your database, schema, and user with password automatically on the first connect. We have a utility that creates the H2 tables next. +For featherweight implementations, H2 will work pretty well. If you need robustness, scalability, and good transaction semantics then you will need Postgres. +Also see the file QUICKSTART.md for more details. +
 1. Prerequisites - Linux (Ubuntu tested), bash, maven - jq (sudo apt install -y jq) - - PostgreSQL (or run a local container) - - json-java (json-20250517.jar) and the Postgres JDBC driver (e.g., postgresql-42.7.3.jar) - - A Postgres database named `dataflow`, schema `dataflow`, and a user `etl` -2. Run a local Postgres (optional) +2. Build + mvn package + +3. Create the DataFlow tables + export PASSKEY=plugh (this is the default but it should be changed) + ./utility.sh createtables + +Your DataFlow tool is now configured and ready to run. + +### Test + +The test has a job (loadbob), which takes one input dataset (bobin) and one output dataset (bobout). +We are not actually testing an ETL job. We want to test that the framework is calling it correctly and supplying the necessary dataid and dataset metadata. + +1. Register two datasets +``` + ./utility.sh dml "insert into dataset (datasetid) values ('bobin')" + ./utility.sh dml "insert into dataset (datasetid) values ('bobout')" +``` + 2. Associate them with the loadbob job. +``` + ./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')" + ./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')" +``` + 3. Mark the input set as READY (just for the test; in real jobs some other job has completed and marked it) + +``` +./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')" +``` + 4. The job should be ready to run, now that it has an input dataset in READY status. A one-liner for creating the test script is shown below.
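For the test you also need a `loadbob.sh` in the working directory. A one-liner like the one in QUICKSTART.md is enough; note that RunJob infers the jobid 'loadbob' from the script name, so the file must be named `loadbob.sh`:

```
 echo 'echo "running loadbob with dataid $dataid partition of input $bobin_DATASETID"' > loadbob.sh && chmod +x loadbob.sh
```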
Perform the test now with RunJob +``` + RunJob ./loadbob.sh +``` + +Output should look like this: + +```text + Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0 + running loadbob with dataid 1.0 partition of input bobin + Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status + 1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released +``` +You should see two log-style messages confirming the start and end of the loadbob job, plus the one line output by the `loadbob.sh` script itself. +The last line is an informational message indicating that DataFlow has set the final status. + +### Postgres +For this you need a Postgres database up and running. The absolute easiest way is to spin up a container. + +1. Run a local Postgres (optional) podman run -p 5432:5432 --name pg -e POSTGRES_PASSWORD=secretpass -d docker.io/postgres -3. Build - mvn package -4. Initialize database +2. Initialize the database. Create a user ETL and its password. Connect with psql and run docs/create_tables.sql. See docs/datamodel.txt for schema notes. -5. Configure +3. Configure Encrypt the DB password with the included Cryptor class: java -cp app/target/app-1.0.0.jar com.hamiltonlabs.dataflow.utility.Cryptor -e "" Create the file **dataflow.properties** and place the url,user,schema, and encrypted fields. This tells the utility how to access the dataflow database. @@ -116,7 +163,7 @@ Security note: the decrypted credential is provided only at runtime in the job p ``` Keep the encryption key private. -7. Run your job +4. Run your job Make your ETL script executable (e.g., myETL.sh) and invoke it via: RunJob myETL.sh
diff --git a/RunJob b/RunJob index 60062f0..f691200 100755 --- a/RunJob +++ b/RunJob @@ -7,7 +7,6 @@ # Added to make cronjobs easier to set CLASSPATH thisdir=`dirname ${BASH_SOURCE[0]}` cd $thisdir -echo "running from working directory `pwd`" # support alternate way to pass the key if [ -z "$PASSKEY" ];then @@ -24,7 +23,18 @@ jobid=${jobid%.*} #strip off the extention shift 1 export args="$@" # -export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar +#export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar +#export CLASSPATH=bin/json-20250517.jar:bin/h2-2.2.224.jar:utility/target/utility-1.0.0.jar +# get the jar file either in current directory or in canonical build location +jarfile="`ls utility/target/dataflow*jar dataflow*jar 2>/dev/null|tail -1`" +if [ -f "$jarfile" ];then + export jarc="java -jar $jarfile $PASSKEY " +else + echo we need the dataflow-version.jar to run this utility. Cannot continue + exit +fi +export CLASSPATH=$jarfile + getEnvJSON(){ #java com.hamiltonlabs.dataflow.utility.GetJobData $jobid $passkey @@ -41,7 +51,7 @@ java com.hamiltonlabs.dataflow.utility.Cryptor -d $passkey "$1" declareEnv(){ #stale dataid will result in false executions so make sure we only set it here unset dataid - getEnvJSON|jq -c '.[] | to_entries[]' | while read -r entry; do + getEnvJSON|tail -1|jq -c '.[] | to_entries[]' | while read -r entry; do key=$(echo "$entry" | jq -r '.key') @@ -65,23 +75,22 @@ declareEnv(){ done } +# test env="`declareEnv`" # uncomment if you need to debug echo "$env" + source <(echo "$env") if [ -z "$dataid" ];then echo "`date`: no suitable data available for job. 
Not running it" else -echo "`date`: Launching $cmd with dataid $dataid" -# lock our files -#java com.hamiltonlabs.dataflow.utility.SetJobStart $passkey $jobid $dataid -echo runs "`./utility.sh runs`" -eval "$cmd $args" - if [ $? -eq 0 ];then + echo "`date`: Launching $cmd with dataid $dataid" + eval "$cmd $args" + if [ $? -eq 0 ];then echo "`date`: Job $cmd is complete. Updating status" java -cp $CLASSPATH:app.jar com.hamiltonlabs.dataflow.utility.SetJobEndStatus $passkey $jobid $dataid READY -else + else echo "`date`: Job $cmd has failed. Updating status" java -cp $CLASSPATH:app.jar com.hamiltonlabs.dataflow.utility.SetJobEndStatus $passkey $jobid $dataid FAILED -fi + fi fi diff --git a/bin/json-20250517.jar b/bin/json-20250517.jar deleted file mode 100644 index 0dc6cfc..0000000 Binary files a/bin/json-20250517.jar and /dev/null differ diff --git a/bin/postgresql-42.7.3.jar b/bin/postgresql-42.7.3.jar deleted file mode 100644 index fa42b1d..0000000 Binary files a/bin/postgresql-42.7.3.jar and /dev/null differ diff --git a/changes.log b/changes.log index 5441904..1ea363c 100644 --- a/changes.log +++ b/changes.log @@ -53,7 +53,12 @@ NOTE: for the script we have added a new dependency. It uses jq. The messy problems of 11-09 are fixed. 2025-11-17 Change RunJob to use simpler syntax. Now we just to RunJob myscript.sh and it infers the runid from the script name. To use the old method, which allows arbitrary commands like "echo or set", use SRUN jobid cmd - Updated the help in utility jar, added deleterun function. Loading properties file now as a resource. + Updated the help in utility jar, added deleterun function. Loading dataflow.properties file now optionally as a resource in jar. + + Remove passkey on command line utilities.sh and RunJob entirely and require it to be environment only. + +2025-11-21 Added useful functions such as ForceRun and simplified RunJob, and added clear examples, added very useful automatic dataset called 'today' which can be used for daily runs to get an automatic dataid. Merged the service_cleaup mod into main. + +2025-11-29 (dev) new "h2" dev branch. Added h2 support, for unit testing and as a light-weight implementation. Created platform module for platform-dependent SQL. Unit test for critical DataFlow.launchJob method. + - TODO: I think we should remove passkey on command line entirely and force it to be environment only. - diff --git a/core/dataflow.mv.db b/core/dataflow.mv.db new file mode 100644 index 0000000..d97a40c Binary files /dev/null and b/core/dataflow.mv.db differ diff --git a/core/pom.xml b/core/pom.xml index 621ee49..3a3a938 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -19,6 +19,21 @@ json 20250517 + + com.hamiltonlabs + platform + 1.0.0 + + + org.postgresql + postgresql + 42.7.3 + + + com.h2database + h2 + 2.2.224 + diff --git a/core/src/main/java/com/hamiltonlabs/dataflow/core/CredentialProvider.java b/core/src/main/java/com/hamiltonlabs/dataflow/core/CredentialProvider.java index 998a3e1..40d77f0 100644 --- a/core/src/main/java/com/hamiltonlabs/dataflow/core/CredentialProvider.java +++ b/core/src/main/java/com/hamiltonlabs/dataflow/core/CredentialProvider.java @@ -2,8 +2,8 @@ import java.util.Properties; import java.io.InputStream; -//import java.io.FileInputStream; - +import java.io.FileInputStream; +import java.io.File; /** Provide credentials for all OpenDataFlow access * Credentials are user/password. They are returned in a java.util.Properties object. 
* @@ -47,7 +47,12 @@ public static String getPass(String passphrase,String encryptedPassword) throws */ public static Properties getCredentials(String passphrase,String propertiesPath) throws java.security.GeneralSecurityException,java.io.IOException { Properties properties=new Properties(); - // properties.load(new FileInputStream(filepath)); + //properties.load(new FileInputStream(filepath)); + File file=new File(propertiesPath); + if (file.exists()){ + properties.load(new FileInputStream(file)); + return updateDecrypted(passphrase,properties); + } try (InputStream input = CredentialProvider.class.getClassLoader().getResourceAsStream(propertiesPath)) { if (input == null) { System.out.printf("Unable to find resource %s\n",propertiesPath); diff --git a/core/src/main/java/com/hamiltonlabs/dataflow/core/DataProvider.java b/core/src/main/java/com/hamiltonlabs/dataflow/core/DataProvider.java index ecf1fd6..4933a37 100644 --- a/core/src/main/java/com/hamiltonlabs/dataflow/core/DataProvider.java +++ b/core/src/main/java/com/hamiltonlabs/dataflow/core/DataProvider.java @@ -12,7 +12,7 @@ import org.json.JSONArray; import org.json.JSONObject; - +import com.hamiltonlabs.dataflow.platform.SqlResourceLoader; /** provide multi platform functionality for jdbc connections @@ -30,16 +30,25 @@ public class DataProvider implements AutoCloseable{ /* used to open a connection to the database */ private static final Properties properties = new Properties(); + /* the platform as determined from dataflow.properties */ + String platform; /* the jdbc connection used by this instance. */ private Connection connection; + public SqlResourceLoader sqlResource; + /** get the connection used by this data provider. * @return the connection object */ public Connection getConnection(){ return connection; } - + public String getSQL(String key){ + return sqlResource.getSql(key); + } + public String getPlatform(){ + return platform; + } /** open the connection and set default search path. * @param passphrase String used as key to decrypt the dataflow password @@ -50,7 +59,12 @@ public DataProvider open(String passphrase,String props)throws SQLException,java Properties creds=CredentialProvider.getCredentials(passphrase,props); String url=creds.getProperty("url"); connection=DriverManager.getConnection(url,creds); - connection.setSchema(creds.getProperty("schema")); + String schema=creds.getProperty("schema"); + if (schema!=null){ + connection.setSchema(creds.getProperty("schema")); + } + platform=creds.getProperty("platform"); + sqlResource=new SqlResourceLoader(platform); return this; } @@ -62,6 +76,9 @@ public DataProvider open(String passphrase,String props)throws SQLException,java */ public int runUpdate(String ... 
vars) throws SQLException{ String sql=vars[0]; + if (sql.equals("")){ + return 0; + } PreparedStatement st=connection.prepareStatement(sql); for (int i=1;i 4.0.0 @@ -13,8 +13,5 @@ platform jar - - - diff --git a/platform/src/main/java/com/hamiltonlabs/dataflow/platform/SqlResourceLoader.java b/platform/src/main/java/com/hamiltonlabs/dataflow/platform/SqlResourceLoader.java new file mode 100644 index 0000000..514f5fe --- /dev/null +++ b/platform/src/main/java/com/hamiltonlabs/dataflow/platform/SqlResourceLoader.java @@ -0,0 +1,41 @@ +package com.hamiltonlabs.dataflow.platform; +import java.util.Properties; +import java.io.InputStream; +import java.io.FileInputStream; +import java.io.File; +import java.io.IOException; + +public class SqlResourceLoader { + private final Properties sqlProps = new Properties(); + + public SqlResourceLoader(String platform) throws IOException { + String fileName = platform.toLowerCase() + ".sql.properties"; + + // 1. Check working directory + File localFile = new File(fileName); + if (localFile.exists()) { + try (InputStream in = new FileInputStream(localFile)) { + sqlProps.load(in); + return; + } + } + + // 2. Fallback to classpath resource + String resourceName = fileName; + try (InputStream in = getClass().getClassLoader().getResourceAsStream(resourceName)) { + if (in == null) { + throw new IllegalArgumentException("No SQL resource found for platform: " + platform); + } + sqlProps.load(in); + } + } + + public String getSql(String key) { + String sql = sqlProps.getProperty(key); + if (sql == null) { + throw new IllegalArgumentException("No SQL found for key: " + key); + } + return sql; + } +} + diff --git a/platform/src/main/resources/h2.sql.properties b/platform/src/main/resources/h2.sql.properties new file mode 100644 index 0000000..d9020c5 --- /dev/null +++ b/platform/src/main/resources/h2.sql.properties @@ -0,0 +1,115 @@ +dataidLockedSQL= \ +select d.dataid, j.jobid \ +from job j \ +join datastatus d \ + on j.datasetid = d.datasetid \ + and d.locktype = 'OUT' \ + and d.status = 'READY' \ +where j.itemtype = 'IN' \ + and j.jobid = ? 
\ + /* none of the input rows already have IN status for this job */ \ + and not exists ( \ + select 1 \ + from job j1 \ + join datastatus d1 \ + on j1.datasetid = d1.datasetid \ + and d1.locktype = 'IN' \ + where j1.jobid = j.jobid \ + and j1.itemtype = 'IN' \ + and d1.dataid = d.dataid \ + ) \ + /* this job is not running or complete already */ \ + and not exists ( \ + select 1 \ + from job j2 \ + join datastatus d2 \ + on j2.datasetid = d2.datasetid \ + where j2.jobid = j.jobid \ + and j2.itemtype = 'OUT' \ + and d2.dataid = d.dataid \ + and d2.status <> 'RESUBMIT' \ + ) \ + /* all inputs to job exist, are OUT, and have READY status */ \ + and not exists ( \ + select 1 \ + from job j3 \ + where j3.jobid = j.jobid \ + and j3.itemtype = 'IN' \ + and not exists ( \ + select 1 \ + from datastatus d3 \ + where d3.datasetid = j3.datasetid \ + and d3.locktype = 'OUT' \ + and d3.status = 'READY' \ + and d3.dataid = d.dataid \ + ) \ + ) \ +order by d.dataid \ +limit 1; + +lockStatusSQL= +endTransactionSQL= +beginTransactionSQL= +dropJobSQL=drop table if exists dataflow.job +dropDatasetSQL=drop table if exists dataflow.dataset +dropDatastatusSQL=drop table if exists dataflow.datastatus +createJobSQL= create table if not exists job( \ + jobid varchar, \ + itemtype varchar, /* for data, IN or OUT, or name of env variable */ \ + datasetid varchar, /* null for environment item */ \ + itemvalue varchar, /* null if IN or OUT item */ \ + modified timestamp) +createDatasetSQL= \ + create table if not exists dataflow.dataset( \ + datasetid varchar, \ + hostname varchar, \ + database varchar, \ + schemaname varchar, \ + tablename varchar, \ + username varchar, \ + encryptedpass varchar) +createIndexDatasetSQL=create index if not exists x_dataset on dataflow.dataset(datasetid) +createDatastatusSQL= \ + create table if not exists dataflow.datastatus( \ + datasetid varchar, \ + jobid varchar, \ + dataid varchar, \ + locktype varchar, \ + status varchar, \ + modified timestamp, \ + primary key (datasetid,jobid,dataid) \ +) + +dataidTodaySQL= \ + select to_char(current_date,'YYYY-MM-DD') as dataid,j.jobid from job j where j.itemtype='IN' and j.datasetid='today' and j.jobid=? /* and non of the input rows are already have IN status for this job */ and not exists ( select d1.dataid from job j1 join datastatus d1 on j1.jobid=j.jobid and j1.itemtype='IN' and d1.locktype='IN' and j1.datasetid=d1.datasetid and d1.dataid=to_char(current_date,'YYYY-MM-DD')) \ + /* and this job is not running or complete already */ \ + and not exists ( \ + select \ + d2.dataid from job j2 join datastatus d2 \ + on \ + j2.jobid=d2.jobid and j2.itemtype='OUT' \ + and d2.datasetid=j2.datasetid and d2.dataid=to_char(current_date,'YYYY-MM-DD') \ + where j2.jobid=j.jobid \ + and d2.status != 'RESUBMIT') \ + /* all inputs to job exist, are out and have a ready status */ \ + and not exists ( select j.datasetid from \ + job j3 \ + left outer join \ + datastatus d3 \ + on \ + j3.datasetid=d3.datasetid \ + and d3.locktype='OUT' \ + and d3.dataid=to_char(current_date,'YYYY-MM-DD') \ + and d3.status='READY' \ + where \ + j3.jobid=? 
\ + and j3.itemtype='IN' \ + and j3.datasetid != 'today' \ + and d3.dataid is null ) /* any non match is a missing input */ + dataidSQL= select x.dataid,x.jobid from (select j.datasetid,j.jobid,d.dataid from job j left join datastatus d on j.datasetid=d.datasetid and d.locktype='OUT' and d.status='READY' where j.itemtype='IN' and j.jobid=?)x where not exists (select d.dataid from job j join datastatus d on j.jobid=x.jobid and j.itemtype='IN' and d.locktype='IN' and j.datasetid=d.datasetid and d.dataid=x.dataid) and not exists (select d.dataid from job j join datastatus d on j.jobid=x.jobid and j.itemtype='OUT' and d.datasetid=j.datasetid and d.dataid=x.dataid where d.status != 'RESUBMIT') order by x.dataid limit 1 +datasetSQL=select d.*,itemtype from dataset d join job on job.datasetid=d.datasetid where job.jobid=? +updateSQL=update datastatus set status=?,locktype=? where dataid=? and datasetid=? and jobid=? +insertSQL=insert into datastatus values (?,?,?,?,?,now()); +updateOutStatusSQL=update datastatus set status=? where jobid=? and dataid=? and locktype='OUT' +updateFileLocalStatusSQL=delete from datastatus where jobid=? and dataid=? and locktype='IN' + diff --git a/platform/src/main/resources/postgres.sql.properties b/platform/src/main/resources/postgres.sql.properties new file mode 100644 index 0000000..80132f0 --- /dev/null +++ b/platform/src/main/resources/postgres.sql.properties @@ -0,0 +1,104 @@ + dataidLockedSQL= \ + /* dataid of at least one row is input for the job and is ready */ \ + select \ + dataid,j.jobid \ + from job j join datastatus d \ + on j.datasetid=d.datasetid and d.locktype='OUT' and d.status='READY' \ + where j.itemtype='IN' and j.jobid=? \ + /* and non of the input rows are already have IN status for this job */ \ + and not exists ( \ + select d1.dataid \ + from job j1 join datastatus d1 \ + on j1.jobid=j.jobid \ + and j1.itemtype='IN' and d1.locktype='IN' \ + and j1.datasetid=d1.datasetid and d1.dataid=d.dataid) \ + /* and this job is not running or complete already */ \ + and not exists ( \ + select d2.dataid from job j2 join datastatus d2 \ + on j2.jobid=d2.jobid and j2.itemtype='OUT' \ + and d2.datasetid=j2.datasetid and d2.dataid=d.dataid \ + where j2.jobid=j.jobid and d2.status != 'RESUBMIT') \ + /* all inputs to job exist, are out and have a ready status */ \ + and not exists ( \ + select \ + j.datasetid \ + from \ + job j3 \ + left outer join \ + datastatus d3 \ + on \ + j3.datasetid=d3.datasetid \ + and d3.locktype='OUT' \ + and d3.dataid=d.dataid \ + and d3.status='READY' \ + where \ + j3.jobid=j.jobid \ + and j3.itemtype='IN' \ + and d3.dataid is null ) /* any non match is a missing input */ \ + order by d.dataid limit 1 for update of d skip locked +lockStatusSQL=lock datastatus in access exclusive mode +endTransactionSQL=end transaction +beginTransactionSQL=begin transaction +dropJobSQL=drop table if exists dataflow.job +dropDatasetSQL=drop table if exists dataflow.dataset +dropDatastatusSQL=drop table if exists dataflow.datastatus +createJobSQL= create table if not exists job( \ + jobid varchar, \ + itemtype varchar, /* for data, IN or OUT, or name of env variable */ \ + datasetid varchar, /* null for environment item */ \ + itemvalue varchar, /* null if IN or OUT item */ \ + modified timestamp) +createDatasetSQL= \ + create table if not exists dataflow.dataset( \ + datasetid varchar, \ + hostname varchar, \ + database varchar, \ + schemaname varchar, \ + tablename varchar, \ + username varchar, \ + encryptedpass varchar) 
+createIndexDatasetSQL=create index if not exists x_dataset on dataflow.dataset(datasetid) +createDatastatusSQL= \ + create table if not exists dataflow.datastatus( \ + datasetid varchar, \ + jobid varchar, \ + dataid varchar, \ + locktype varchar, \ + status varchar, \ + modified timestamp, \ + primary key (datasetid,jobid,dataid) \ +) + +dataidTodaySQL= \ + select to_char(current_date,'YYYY-MM-DD') as dataid,j.jobid from job j where j.itemtype='IN' and j.datasetid='today' and j.jobid=? /* and non of the input rows are already have IN status for this job */ and not exists ( select d1.dataid from job j1 join datastatus d1 on j1.jobid=j.jobid and j1.itemtype='IN' and d1.locktype='IN' and j1.datasetid=d1.datasetid and d1.dataid=to_char(current_date,'YYYY-MM-DD')) \ + /* and this job is not running or complete already */ \ + and not exists ( \ + select \ + d2.dataid from job j2 join datastatus d2 \ + on \ + j2.jobid=d2.jobid and j2.itemtype='OUT' \ + and d2.datasetid=j2.datasetid and d2.dataid=to_char(current_date,'YYYY-MM-DD') \ + where j2.jobid=j.jobid \ + and d2.status != 'RESUBMIT') \ + /* all inputs to job exist, are out and have a ready status */ \ + and not exists ( select j.datasetid from \ + job j3 \ + left outer join \ + datastatus d3 \ + on \ + j3.datasetid=d3.datasetid \ + and d3.locktype='OUT' \ + and d3.dataid=to_char(current_date,'YYYY-MM-DD') \ + and d3.status='READY' \ + where \ + j3.jobid=? \ + and j3.itemtype='IN' \ + and j3.datasetid != 'today' \ + and d3.dataid is null ) /* any non match is a missing input */ + dataidSQL= select x.dataid,x.jobid from (select j.datasetid,j.jobid,d.dataid from job j left join datastatus d on j.datasetid=d.datasetid and d.locktype='OUT' and d.status='READY' where j.itemtype='IN' and j.jobid=?)x where not exists (select d.dataid from job j join datastatus d on j.jobid=x.jobid and j.itemtype='IN' and d.locktype='IN' and j.datasetid=d.datasetid and d.dataid=x.dataid) and not exists (select d.dataid from job j join datastatus d on j.jobid=x.jobid and j.itemtype='OUT' and d.datasetid=j.datasetid and d.dataid=x.dataid where d.status != 'RESUBMIT') order by x.dataid limit 1 +datasetSQL=select d.*,itemtype from dataset d join job on job.datasetid=d.datasetid where job.jobid=? +updateSQL=update datastatus set status=?,locktype=? where dataid=? and datasetid=? and jobid=? +insertSQL=insert into datastatus values (?,?,?,?,?,now()); +updateOutStatusSQL=update datastatus set status=? where jobid=? and dataid=? and locktype='OUT' +updateFileLocalStatusSQL=delete from datastatus where jobid=? and dataid=? 
and locktype='IN' + diff --git a/pom.xml b/pom.xml index 068bea4..85a317f 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,6 @@ database service platform - resources @@ -35,11 +34,6 @@ test - - org.postgresql - postgresql - 42.7.3 - diff --git a/resources/pom.xml b/resources/pom.xml deleted file mode 100644 index 190dc0b..0000000 --- a/resources/pom.xml +++ /dev/null @@ -1,20 +0,0 @@ - - 4.0.0 - - - com.hamiltonlabs - dataflow - 1.0.0 - - - resources - jar - - - - - - diff --git a/service/dataflow.mv.db b/service/dataflow.mv.db new file mode 100644 index 0000000..5f0ee26 Binary files /dev/null and b/service/dataflow.mv.db differ diff --git a/service/src/main/java/com/hamiltonlabs/dataflow/service/DataFlow.java b/service/src/main/java/com/hamiltonlabs/dataflow/service/DataFlow.java index 645b0df..caa4f72 100644 --- a/service/src/main/java/com/hamiltonlabs/dataflow/service/DataFlow.java +++ b/service/src/main/java/com/hamiltonlabs/dataflow/service/DataFlow.java @@ -19,90 +19,20 @@ public class DataFlow { - /* the lock is the OUT READY datastatus row wwhich is an IN for the job. Any other invocation of the same job will want it, so skip locked skips it */ - /* must be called inside a transaction */ - /* AUTOMATIC dataset "today" is out type that is always ready and has dataid of today */ - static String dataidLockedSQL= """ - /* dataid of at least one row is input for the job and is ready */ - select - dataid,j.jobid - from job j join datastatus d - on j.datasetid=d.datasetid and d.locktype='OUT' and d.status='READY' - where j.itemtype='IN' and j.jobid=? - /* and non of the input rows are already have IN status for this job */ - and not exists ( - select d1.dataid - from job j1 join datastatus d1 - on j1.jobid=j.jobid - and j1.itemtype='IN' and d1.locktype='IN' - and j1.datasetid=d1.datasetid and d1.dataid=d.dataid) - /* and this job is not running or complete already */ - and not exists ( - select d2.dataid from job j2 join datastatus d2 - on j2.jobid=d2.jobid and j2.itemtype='OUT' - and d2.datasetid=j2.datasetid and d2.dataid=d.dataid - where j2.jobid=j.jobid and d2.status != 'RESUBMIT') - /* all inputs to job exist, are out and have a ready status */ - and not exists ( - select - j.datasetid - from - job j3 - left outer join - datastatus d3 - on - j3.datasetid=d3.datasetid - and d3.locktype='OUT' - and d3.dataid=d.dataid - and d3.status='READY' - where - j3.jobid=j.jobid - and j3.itemtype='IN' - and d3.dataid is null ) /* any non match is a missing input */ - order by d.dataid limit 1 for update of d skip locked - """ ; - -/* Automatic dataset 'today' always supplies current date as dataid and is always READY */ -/* The rest of the logic confirming ready must be identical */ - static String dataidTodaySQL= """ - select to_char(current_date,'YYYY-MM-DD') as dataid,j.jobid from job j where j.itemtype='IN' and j.datasetid='today' and j.jobid=? 
/* and non of the input rows are already have IN status for this job */ and not exists ( select d1.dataid from job j1 join datastatus d1 on j1.jobid=j.jobid and j1.itemtype='IN' and d1.locktype='IN' and j1.datasetid=d1.datasetid and d1.dataid=to_char(current_date,'YYYY-MM-DD')) - /* and this job is not running or complete already */ - and not exists ( - select - d2.dataid from job j2 join datastatus d2 - on - j2.jobid=d2.jobid and j2.itemtype='OUT' - and d2.datasetid=j2.datasetid and d2.dataid=to_char(current_date,'YYYY-MM-DD') - where j2.jobid=j.jobid - and d2.status != 'RESUBMIT') - /* all inputs to job exist, are out and have a ready status */ - and not exists ( select j.datasetid from - job j3 - left outer join - datastatus d3 - on - j3.datasetid=d3.datasetid - and d3.locktype='OUT' - and d3.dataid=to_char(current_date,'YYYY-MM-DD') - and d3.status='READY' - where - j3.jobid=? - and j3.itemtype='IN' - and j3.datasetid != 'today' - and d3.dataid is null ) /* any non match is a missing input */ - """; - - - /* readonly version of the same but cannot skip locked*/ - static String dataidSQL="select x.dataid,x.jobid from (select j.datasetid,j.jobid,d.dataid from job j left join datastatus d on j.datasetid=d.datasetid and d.locktype='OUT' and d.status='READY' where j.itemtype='IN' and j.jobid=?)x where not exists (select d.dataid from job j join datastatus d on j.jobid=x.jobid and j.itemtype='IN' and d.locktype='IN' and j.datasetid=d.datasetid and d.dataid=x.dataid) and not exists (select d.dataid from job j join datastatus d on j.jobid=x.jobid and j.itemtype='OUT' and d.datasetid=j.datasetid and d.dataid=x.dataid where d.status != 'RESUBMIT') order by x.dataid limit 1"; - - - static String datasetSQL="select d.*,itemtype from dataset d join job on job.datasetid=d.datasetid where job.jobid=?"; - static String updateSQL="update datastatus set status=?,locktype=? where dataid=? and datasetid=? and jobid=?"; - static String insertSQL="insert into datastatus values (?,?,?,?,?,now())"; - static String updateOutStatusSQL="update datastatus set status=? where jobid=? and dataid=? and locktype='OUT' "; - static String updateFileLocalStatusSQL="delete from datastatus where jobid=? and dataid=? 
and locktype='IN' "; + public DataFlow(){ + } + /** create tables for the dataflow schema */ + public static String createTables(String passkey)throws Exception{ + DataProvider dataprovider=new DataProvider().open(passkey,"dataflow.properties"); + + int updatecount=0; + updatecount+=dataprovider.runUpdate(dataprovider.getSQL("createJobSQL")); + updatecount+=dataprovider.runUpdate(dataprovider.getSQL("createDatasetSQL")); + updatecount+=dataprovider.runUpdate(dataprovider.getSQL("createDatastatusSQL")); + updatecount+=dataprovider.runUpdate(dataprovider.getSQL("createIndexDatasetSQL")); + return String.format("[{\"result\":\"tables/indexes created\"}]"); + } /** Set the status for a given dataset,job, and chunk * * @param datasetid string representing registered dataset @@ -115,9 +45,9 @@ and not exists ( select j.datasetid from public static void setStatus(String datasetid,String jobid,String dataid,DataProvider dataprovider,String locktype,String status) throws Exception { /* insert should have the following fields: datasetid | jobid | dataid | locktype | status */ - int updatecount=dataprovider.runUpdate(updateSQL,status,locktype,dataid,datasetid,jobid); + int updatecount=dataprovider.runUpdate(dataprovider.getSQL("updateSQL"),status,locktype,dataid,datasetid,jobid); if (updatecount==0){ - updatecount=dataprovider.runUpdate(insertSQL,datasetid,jobid,dataid,locktype,status); + updatecount=dataprovider.runUpdate(dataprovider.getSQL("insertSQL"),datasetid,jobid,dataid,locktype,status); } if (updatecount==0){ throw new SQLException(String.format("Upsert for job %s and dataset %s could not be performed",jobid,datasetid)); @@ -129,8 +59,8 @@ public static String setJobEndStatus(String passkey,String jobid,String dataid,S DataProvider dataprovider=new DataProvider().open(passkey,"dataflow.properties"); /* set the OUT status */ - int updatecount=dataprovider.runUpdate(updateOutStatusSQL,status,jobid,dataid); - int deletecount=dataprovider.runUpdate(updateFileLocalStatusSQL,jobid,dataid); + int updatecount=dataprovider.runUpdate(dataprovider.getSQL("updateOutStatusSQL"),status,jobid,dataid); + int deletecount=dataprovider.runUpdate(dataprovider.getSQL("updateFileLocalStatusSQL"),jobid,dataid); returnString=String.format("%d rows updated to %s for %s and %s",updatecount,status,jobid,dataid); returnString=returnString+String.format(" %d IN file-local locks released",deletecount); @@ -162,7 +92,7 @@ public static String forceJob(String passkey,String jobid,String dataid)throws E /* If we get here we have found a dataset, have a dataid value for it, and a lock on the row */ /* now get the data to return and set the locks */ - rs=dataprovider.runSQL(datasetSQL,jobid); + rs=dataprovider.runSQL(dataprovider.getSQL("datasetSQL"),jobid); ResultSetMetaData rsmd = rs.getMetaData(); while(rs.next()) { int numColumns = rsmd.getColumnCount(); @@ -187,19 +117,21 @@ public static String launchJob(String passkey,String jobid)throws Exception{ DataProvider dataprovider=new DataProvider().open(passkey,"dataflow.properties"); - int bt=dataprovider.runUpdate("begin transaction"); - int k=dataprovider.runUpdate("lock datastatus in access exclusive mode "); - + int bt=dataprovider.runUpdate(dataprovider.getSQL("beginTransactionSQL")); + String lockStatusSQL=dataprovider.getSQL("lockStatusSQL"); + if (!lockStatusSQL.equals("")){ + int k=dataprovider.runUpdate("lock datastatus in access exclusive mode "); + } String dataid; JSONArray result=new JSONArray(); JSONObject obj = new JSONObject(); - ResultSet 
rs=dataprovider.runSQL(dataidTodaySQL,jobid,jobid); + ResultSet rs=dataprovider.runSQL(dataprovider.getSQL("dataidTodaySQL"),jobid,jobid); /* if no automatic result then do the check for normal data set input */ if (rs.next()){ dataid=rs.getString("dataid"); }else { - rs=dataprovider.runSQL(dataidLockedSQL,jobid); + rs=dataprovider.runSQL(dataprovider.getSQL("dataidLockedSQL"),jobid); if (rs.next()){ dataid=rs.getString("dataid"); } else { @@ -211,7 +143,7 @@ public static String launchJob(String passkey,String jobid)throws Exception{ result.put(obj); /* If we get here we have found a dataset, have a dataid value for it, and a lock on the row */ /* now get the data to return and set the locks */ - rs=dataprovider.runSQL(datasetSQL,jobid); + rs=dataprovider.runSQL(dataprovider.getSQL("datasetSQL"),jobid); ResultSetMetaData rsmd = rs.getMetaData(); while(rs.next()) { int numColumns = rsmd.getColumnCount(); @@ -229,7 +161,7 @@ public static String launchJob(String passkey,String jobid)throws Exception{ result.put(obj); } - int et=dataprovider.runUpdate("end transaction"); + int et=dataprovider.runUpdate(dataprovider.getSQL("endTransactionSQL")); return result.toString(); } @@ -247,7 +179,7 @@ public static String launchJob(String passkey,String jobid)throws Exception{ public static void setStartStatus(String passkey,String jobid,String dataid)throws Exception{ DataProvider dataprovider=new DataProvider().open(passkey,"dataflow.properties"); - ResultSet rs=dataprovider.runSQL(datasetSQL,jobid); + ResultSet rs=dataprovider.runSQL(dataprovider.getSQL("datasetSQL"),jobid); ResultSetMetaData rsmd = rs.getMetaData(); while(rs.next()) { int numColumns = rsmd.getColumnCount(); @@ -281,32 +213,61 @@ public static String getJobData(String passkey,String jobid) throws Exception{ return String.format("[{\"result\":\"%s\"}]",e.getMessage().replaceAll("\"","'")); } } -/** Run an arbitrary DML SQL, returning rows affected */ + +/** Run an arbitrary DDL SQL, returning rows affected */ public static int runUpdate(String passkey,String ... vars)throws Exception{ DataProvider p=new DataProvider().open(passkey,"dataflow.properties"); return p.runUpdate(vars); } -/** Run an arbitrary DML SQL */ + +/** Run an arbitrary DDL SQL. Opens a new DataProvider. This is typically used by utitilies which do one-off commands */ public static String runUpdate(String passkey,String sqltext){ try{ - DataProvider p=new DataProvider().open(passkey,"dataflow.properties"); - return String.format("[{\"result\":\"%d rows affected\"}]",p.runUpdate(sqltext)); + DataProvider p=new DataProvider().open(passkey,"dataflow.properties"); + return runUpdate(p,sqltext); } catch (SQLException |GeneralSecurityException|java.io.IOException e){ + return String.format("[{\"result\":\"%s\"}]",e.getMessage().replaceAll("\"","'")); + } + + } + +/** Run an arbitrary DML SQL on the DataProvider. Typically used by tests and functions which require multiple steps */ + public static String runUpdate(DataProvider p,String sqltext){ + try{ + return String.format("[{\"result\":\"%d rows affected\"}]",p.runUpdate(sqltext)); + } catch (SQLException e){ return String.format("[{\"result\":\"%s\"}]",e.getMessage().replaceAll("\"","'")); } - } -/** Run an arbitrary SQL */ +/** Run an arbitrary SQL. 
Accepts an open DataProvider */ public static String runSql(String passkey,String sqltext){ try{ DataProvider p=new DataProvider().open(passkey,"dataflow.properties"); - return DataFlow.rs2String(p.runSQL(sqltext)); + return runSql(p,sqltext); } catch (SQLException |GeneralSecurityException|java.io.IOException e){ return String.format("[{\"result\":\"%s\"}]",e.getMessage().replaceAll("\"","'").replaceAll("\n","")); } } + public static String runUpdate(DataProvider p,String ... args){ + try{ + return String.format("[{\"result\":\"%d rows affected\"}]",p.runUpdate(args)); + } catch (SQLException e){ + return String.format("[{\"result\":\"%s\"}]",e.getMessage().replaceAll("\"","'")); + } + } + +/** Run an arbitrary SQL. Opens a data DataProvider */ + + public static String runSql(DataProvider p,String sqltext){ + try{ + return DataFlow.rs2String(p.runSQL(sqltext)); + } catch (SQLException e){ + return String.format("[{\"result\":\"%s\"}]",e.getMessage().replaceAll("\"","'").replaceAll("\n","")); + } + } + /** get next available data chunk for the jobs * this one would be called from this class where an open dataprover exists */ @@ -318,7 +279,8 @@ public static String getJobData(String jobid,DataProvider dataprovider) throws E * TODO: Migrate sql to a resource */ - + String dataidSQL=dataprovider.getSQL("dataidSQL"); + String datasetSQL=dataprovider.getSQL("datasetSQL"); ResultSet rs=dataprovider.runSQL(dataidSQL,jobid); String dataid; JSONArray result=new JSONArray(); diff --git a/service/src/test/java/com/hamiltonlabs/dataflow/DataFlowTest.java b/service/src/test/java/com/hamiltonlabs/dataflow/DataFlowTest.java deleted file mode 100644 index c8c1455..0000000 --- a/service/src/test/java/com/hamiltonlabs/dataflow/DataFlowTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.hamiltonlabs.dataflow.service; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.TestInstance.Lifecycle; - - -import org.junit.jupiter.api.Test; - -import java.security.GeneralSecurityException; -import java.sql.SQLException; -import com.hamiltonlabs.dataflow.core.*; -import java.sql.ResultSet; - -@TestInstance(Lifecycle.PER_CLASS) -public class DataFlowTest{ - - @BeforeEach - void init()throws Exception{ - DataProvider p=new DataProvider().open("plugh","dataflow.properties"); - assertEquals(p.getConnection().isClosed(),false); - int rows=p.runUpdate("delete from datastatus where dataid in ('1.0','1.1','1.2') and jobid=? ","loadbob"); - rows=p.runUpdate("delete from datastatus where dataid in ('1.0','1.1','1.2') and jobid=? 
","otherjob"); - - /* 1.0 should be skipped because it has no source already run and status is already READY */ - int inserts=p.runUpdate("insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'OUT',now(),'READY')","1.0","boout","loadbob"); - - /* 1.1 should be skipped because it has already RUNNING on the IN file */ - inserts=p.runUpdate("insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'OUT',now(),'READY')","1.1","bobin","otherjob"); - inserts=p.runUpdate("insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'IN',now(),'RUNNING')","1.1","bobin","loadbob"); - - /* this should be good to go because the in file is ready and no locks exist */ - inserts=p.runUpdate("insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'OUT',now(),'READY')","1.2","bobin","loadbob"); - } - - - /* assumes that dataflow has been configured with test data and creds were encrypted with the test key "plugh" */ - @Test - void getJobDataTest() throws Exception { - assertEquals( "[{\"dataid\":\"1.2\"},", DataFlow.getJobData("plugh","loadbob").substring(0,18)); - //assertEquals( "[{\"dataid\":\"1.2\"},", DataFlow.getJobData("loadbob","plugh")); - } - -} diff --git a/service/src/test/java/com/hamiltonlabs/dataflow/service/DataFlowTest.java b/service/src/test/java/com/hamiltonlabs/dataflow/service/DataFlowTest.java new file mode 100644 index 0000000..60b8d19 --- /dev/null +++ b/service/src/test/java/com/hamiltonlabs/dataflow/service/DataFlowTest.java @@ -0,0 +1,89 @@ +package com.hamiltonlabs.dataflow.service; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestInstance.Lifecycle; + + +import org.junit.jupiter.api.Test; + +import java.security.GeneralSecurityException; +import java.sql.SQLException; +import com.hamiltonlabs.dataflow.core.*; +import java.sql.ResultSet; + +@TestInstance(Lifecycle.PER_CLASS) +public class DataFlowTest{ + + DataProvider p; + + @BeforeEach + void init()throws Exception{ + p=new DataProvider().open("plugh","dataflow.properties"); + assertEquals(p.getConnection().isClosed(),false); + + String tables=DataFlow.createTables("plugh"); + assertEquals("[{\"result\":\"tables/indexes created\"}]",tables); + + int rows; + String r; + int inserts; + + System.out.printf("DataFlow Platform is %s\n",p.getPlatform()); + r=DataFlow.runUpdate(p,"delete from datastatus where dataid in ('1.0','1.1','1.2') and jobid=? ","otherjob"); + r=DataFlow.runUpdate(p,"delete from datastatus where dataid in ('1.0','1.1','1.2') and jobid=? 
","loadbob"); + + /* 1.0 should be skipped because it has no source already run and status is already READY */ + r=DataFlow.runUpdate(p,"insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobout','loadbob','OUT',now(),'READY')"); + assertEquals(r,"[{\"result\":\"1 rows affected\"}]"); + +System.out.println(DataFlow.runSql(p,"select * from dataflow.datastatus")); + + /* 1.1 should be skipped because it has already RUNNING on the IN file */ + r=DataFlow.runUpdate(p,"insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'OUT',now(),'READY')","1.1","bobin","otherjob"); + System.out.printf("inserts %s\n",r); + assertEquals("[{\"result\":\"1 rows affected\"}]",r); + + inserts=p.runUpdate("insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'IN',now(),'RUNNING')","1.1","bobin","loadbob"); + assertEquals(inserts,1); + System.out.printf("inserts %d\n",inserts); + + rows=p.runUpdate("delete from job where datasetid=? and itemtype='IN' and jobid=?","bobin","loadbob"); + inserts=p.runUpdate("insert into job (datasetid,itemtype,jobid) values (?,'IN',?)","bobin","loadbob"); + assertEquals(inserts,1); + + + /* this should be good to go because the in file is ready and no locks exist */ + inserts=p.runUpdate("insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values (?,?,?,'OUT',now(),'READY')","1.2","bobin","loadbob"); + assertEquals(inserts,1); + } + + @Test + void runSQL() throws SQLException { + String s=DataFlow.runSql(p,"select user as X"); + assertEquals(s,"[{\"X\":\"ETL\"}]"); + + } + + @Test + void launchJobTest() throws Exception { + String jobdata=DataFlow.launchJob("plugh","loadbob"); + System.out.printf("debug %s\n",jobdata); + assertEquals( "[{\"dataid\":\"1.2\"}", jobdata.substring(0,17)); + String r=DataFlow.runUpdate(p, "delete from datastatus where dataid in ('1.0','1.1','1.2') and jobid in ('loadbob','otherjob')"); + assertEquals("[{\"result\":\"4 rows affected\"}]",r); + } + + @Test + void getJobDataTest() throws Exception { + String jobdata=DataFlow.getJobData("loadbob",p); + System.out.printf("debug %s\n",jobdata); + assertEquals( "[{\"dataid\":\"1.2\"}", jobdata.substring(0,17)); + String r=DataFlow.runUpdate(p, "delete from datastatus where dataid in ('1.0','1.1','1.2') and jobid in ('loadbob','otherjob')"); + assertEquals("[{\"result\":\"4 rows affected\"}]",r); + } + +} diff --git a/tablemaker.sh b/tablemaker.sh deleted file mode 100755 index e37aebd..0000000 --- a/tablemaker.sh +++ /dev/null @@ -1,22 +0,0 @@ - -read -d '' json -if [ "$json" = "[]" ];then - echo no rows returned by the query - exit -fi - -# occasionally we need to flatten and in one case step on the first pair -#json="`echo "$json"|sed 's/},{/,/g' `" - -export header=`echo "$json" | jq '.[0] | keys'` -export fields=`echo "$header"|sed 's/",/,/g'|sed 's/"$//g'|sed 's/"/./g'` - -export tbl="`echo $json|jq -r "($header),(.[]|$fields)|@tsv"|column -t -s $'\t'`" - -# get a line of correct length -line1=`echo "$tbl"|head -1` -line=`echo -e "$line1"|sed 's/[^ ]/-/g'` - -# insert at line 2 -echo -e "$tbl"|sed "2i$line" - diff --git a/utility.sh b/utility.sh index dae2d6d..8e0cdfe 100755 --- a/utility.sh +++ b/utility.sh @@ -1,24 +1,26 @@ -export CLASSPATH=bin/postgresql-42.7.3.jar:bin/json-20250517.jar:utility/target/utility-1.0.0.jar +#!/bin/bash + +export CLASSPATH=$CLASSPATH:bin/postgresql-42.7.3.jar:bin/json-20250517.jar:bin/h2-2.2.224.jar:utility/target/dataflow-1.0.0.jar if [ $# -lt 1 
];then cat< -- run a select sql statement - utility.sh dml -- run a dml statement + utility.sh ddl -- run a ddl or update/delete statement utility.sh crypt -e key text -- -e to encrypt or -d to decrypt utility.sh runs -- list the 20 most recent job runs utility.sh getjob -- get input/output configuration for a job utility.sh jobs -- list all jobs registered utility.sh datasets -- list all datasets registered utility.sh forcejob -- get a timestamp dataid even if no dependencies are met. But see ** + utility.sh createtables -- create the job,dataset, and datastatus tables CLASSNAME args -- any executable class in utilities module. Example: - utility.sh SetJobEndStatus jobid dataid status + utility.sh endjob jobid dataid status utility.sh GetJobData jobid utility.sh SetDataStatus RUNNING 3000 today newjob OUT utility.sh dml "delete from datastatus where jobid='newjob'a - ** forcejob breaks every contract. Just use it to verify your configuration during dev. ** if the PASSKEY environment variable is set then you can omit passkey from the arguments above. @@ -33,6 +35,40 @@ DONE exit fi +# A helper function that converts JSON string into a table format like a typical SQL output +# if it cant be parsed then print the string and go away +function tablemaker(){ + read -d '' json + if [ "$json" = "[]" ];then + echo no rows returned by the query + exit + fi + if [ -z "$json" ];then + echo no rows returned by the query + exit + fi + + + # First a simple test to see if it is valid JSON + echo "$json"|jq >/dev/null 2>&1 + if [ $? -gt 0 ]; then echo "$json";exit;fi + + # else if there is a switch to pretty print the json also the do it + if [ ! -z "$1" ];then + echo "$json" | jq + fi + + export header=`echo "$json" | jq '.[0] | keys'` 2>/dev/null + export fields=`echo "$header"|sed 's/",/,/g'|sed 's/"$//g'|sed 's/"/./g'` + export tbl="`echo $json|jq -r "($header),(.[]|$fields)|@tsv"|column -t -s $'\t'`" + + # get a line of correct length + line1=`echo "$tbl"|head -1` + line=`echo -e "$line1"|sed 's/[^ ]/-/g'` + + # insert at line 2 + echo -e "$tbl"|sed "2i$line" +} if [ -z "$PASSKEY" ];then export PASSKEY=$1 @@ -40,48 +76,62 @@ if [ -z "$PASSKEY" ];then fi cmd=$1 shift -# useful for debug -#echo passkey $PASSKEY -#echo command $cmd -#echo args "$@" + export args="$@" export util="com.hamiltonlabs.dataflow.utility" -export jarc="java -jar utility/target/utility-1.0.0.jar $PASSKEY " +# get the jar file either in current directory or in canonical build location +jarfile="`ls utility/target/dataflow*jar dataflow*jar 2>/dev/null|tail -1`" +if [ -f "$jarfile" ];then + export jarc="java -jar $jarfile $PASSKEY " +else + echo we need the dataflow-version.jar to run this utility. Cannot continue + exit +fi export jar="$jarc $cmd " -# Special case the runsql and other SQLs because we can and should make the result readable +# Special case the runsql and other SQLs because we can and should make the result readable +# cmd now has arguments: $PASSKEY $cmd $args case "$cmd" in "sql" ) - $jar "$@" |./tablemaker.sh + $jar "$@" |tail -1|tablemaker ;; "dml" ) - $jar "$@" |./tablemaker.sh + $jar "$@" |tablemaker ;; "crypt" ) $jar "$@" ;; "runs" ) - $jar |./tablemaker.sh + $jar |tail -1|tablemaker ;; "getjob" ) $jar $@ |jq . 
;; - + "startjob" ) + $jar $@ |tablemaker jsonalso + ;; + "endjob" ) + $jar $@ |tablemaker + ;; "forcejob" ) $jar $@ ;; - + "createtables" ) + $jar $@ |tablemaker + ;; "jobs" ) - $jar |./tablemaker.sh + $jar |tail -1|tablemaker ;; "deleterun" ) - $jar $@ |./tablemaker.sh + $jar $@ |tablemaker ;; "datasets" ) - $jar $@ |./tablemaker.sh + $jar $@ |tablemaker ;; *) java $util.$cmd $PASSKEY $@ ;; esac + + diff --git a/utility/pom.xml b/utility/pom.xml index 9de26d1..a3f3d4f 100644 --- a/utility/pom.xml +++ b/utility/pom.xml @@ -1,3 +1,4 @@ + com.hamiltonlabs.dataflow.utility.util + - - ${project.artifactId.version} + + dataflow-${version} diff --git a/utility/src/main/java/com/hamiltonlabs/dataflow/utility/CreateTables.java b/utility/src/main/java/com/hamiltonlabs/dataflow/utility/CreateTables.java new file mode 100644 index 0000000..61fa854 --- /dev/null +++ b/utility/src/main/java/com/hamiltonlabs/dataflow/utility/CreateTables.java @@ -0,0 +1,22 @@ +package com.hamiltonlabs.dataflow.utility; + +import com.hamiltonlabs.dataflow.core.*; +import com.hamiltonlabs.dataflow.service.*; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +/** List all jobs registered in the DataFlow */ +public class CreateTables{ + + static String sqltext="select * from job"; + + public static String run(String passkey) throws Exception{ + return DataFlow.createTables(passkey); + } + public static void main(String[] args) throws Exception{ + System.out.println(run(args[0])); + } + +} + diff --git a/utility/src/main/java/com/hamiltonlabs/dataflow/utility/util.java b/utility/src/main/java/com/hamiltonlabs/dataflow/utility/util.java index 5813453..97ef210 100644 --- a/utility/src/main/java/com/hamiltonlabs/dataflow/utility/util.java +++ b/utility/src/main/java/com/hamiltonlabs/dataflow/utility/util.java @@ -42,6 +42,10 @@ public static void main(String[] args) throws Exception{ result=RunUpdate.run(args[0],args[2]); break; + case "createtables": + result=CreateTables.run(args[0]); + break; + case "getjob": result=GetJobData.run(args[0],args[2]); break;