NOTE: This application is in BETA. It still needs some work to get to a first release. Contributors are welcome.
DataState is a lightweight orchestration utility that runs and coordinates batch jobs over partitioned or time-sliced data so teams can schedule, recover, and migrate large data-processing pipelines without changing their ETL code.
This is not Airflow. Airflow schedules tasks; DataState ensures data readiness. To Airflow, the batch cycle is a giant DAG: a fixed structure that dictates the order in which jobs run. To DataState, every job is an independent task that only cares about the data it consumes and produces. Provided the data requirements are met (see the five questions below), any job can run at any time, with any number of concurrent jobs. The DataState tool makes this possible.
Data warehousing is about data, not tasks. The data-centric approach is the right model, and DataState works with that model, not against it.
It does this by tracking the status of every data partition, reporting that status on request, and using it to determine which partitions of which datasets are ready to be consumed by a job. Its one purpose is to associate that data with a particular job run and provide that information at runtime.
```mermaid
stateDiagram-v2
    [*] --> READY
    READY --> PREPARE : Data Ready?
    PREPARE --> RUNNING : All Checks Pass
    RUNNING --> COMPLETE : Job Succeeds
    RUNNING --> FAILED : Failure
```
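Because every partition's state is persisted in the datastatus table, the current state of any dataset can be inspected with plain SQL. A minimal sketch using psql, assuming the Postgres setup described later in this README (column names follow the quick-start insert below):

```bash
# Sketch: list the status of every tracked partition of the example dataset 'bobin'.
# Connection flags are an assumption; adjust host, user, and database to your setup.
psql -h localhost -U etl -d dataflow -c \
  "SELECT dataid, jobid, status, modified
     FROM datastatus
    WHERE datasetid = 'bobin'
    ORDER BY modified DESC"
```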
A side benefit of DataState is that it keeps connection info for every dataset, so that it does not have to be hardcoded into job scripts or maintained separately by each job. This enables easy integration with data quality tools, saves support time and cost when diagnosing failures, and prevents accidents when job code is promoted to production (no code change, no configuration change needed).
Many data teams spend disproportionate time and engineering effort on the same operational problems: recovering after platform outages, catching up missed cycles, and migrating huge datasets in stages. OpenDataFlow was born out of repeated large migrations and outages. It encodes the orchestration and state-tracking so recovery, catch-up, and phased migration are first-class, routine operations — using the same job scripts you already have.
Years ago, while diagnosing root causes of ETL failures, the same five questions surfaced again and again. Answering them required:
- Deep code dives
- Runtime environment spelunking
- Direct inspection of datasets
- Log archaeology to infer concurrency state
Each investigation took 20–30 minutes.
After a platform outage affecting 100+ jobs?
Hours lost. Risky shortcuts taken. Burnout.
So we asked two natural questions:
- Why not capture this information at runtime so it’s instantly available during forensics?
- If it’s critical for root cause, isn’t it even more critical before the job runs?
The answer to both was obvious.
That insight birthed the DataFlow utilities, which eventually became what we now call DataState.
| # | Question | Transactional Analogy | What Breaks Without It | OpenDataFlow’s Answer |
|---|---|---|---|---|
| 1 | Is the data ready? | Commit prerequisite | Jobs start on partial/incomplete inputs | RunJob waits on datastatus = READY |
| 2 | Is it the right data? | Isolation + Correctness | Loading yesterday’s file → silent corruption | Partition key + timestamp validation |
| 3 | Is it in the right location? | Environment isolation | Dev paths in prod → data loss or breach | Config-driven paths, no hardcoding |
| 4 | Has it been validated? | Integrity check before commit | Garbage in → garbage forever | Schema/row-count/sanity checks before lock |
| 5 | Is it safe to access? | Concurrency control (locking) | Race conditions, cleanup collisions, double runs | dataid + datastatus with transactional semantics |
We use these questions to enforce a protocol identical in spirit to two-phase commit (2PC):
| 2PC Phase | DataState Equivalent | Implementation |
|---|---|---|
| Phase 1: Prepare | RunJob checks all 5 Questions | Scans data_manifest, datastatus, locks, paths, validation |
| Yes Vote | All inputs = READY, no lock conflicts | Every input confirms: “I’m complete, valid, and exclusively available” |
| No Vote / Abort | Any question fails | Job exits early with clear code: DATA_NOT_READY, LOCKED, INVALID_PATH |
| Phase 2: Commit | Execute job.sh → write output | Only runs if all participants voted yes |
| Post-Commit | Mark output COMPLETE, release lock | Enables safe downstream consumption |
This is 2PC without the ceremony, and it works on an RDBMS, NFS, S3, or a USB stick, because the transactional guarantees are enforced by the ETL jobs themselves, not the storage layer.
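As a rough illustration only (the helper functions here are hypothetical, not the actual RunJob internals), the prepare/commit flow looks like this in shell terms:

```bash
#!/usr/bin/env bash
# Conceptual sketch of the prepare/commit protocol described above.
# check_inputs_ready, acquire_locks, mark_output_complete, mark_failed and
# release_locks are hypothetical helpers, not part of the real tool.

jobid="loadbob"

# Phase 1: Prepare - answer the five questions before touching any data.
if ! check_inputs_ready "$jobid"; then   # every input dataset in READY status?
  echo "DATA_NOT_READY"; exit 1
fi
if ! acquire_locks "$jobid"; then        # exclusive access, no concurrent writer?
  echo "LOCKED"; exit 1
fi

# Phase 2: Commit - run the job only because every participant voted yes.
if ./job.sh; then
  mark_output_complete "$jobid"          # output becomes safe for downstream consumption
else
  mark_failed "$jobid"
fi
release_locks "$jobid"
```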
After a platform outage:
- You mark failed jobs:

  ```sql
  UPDATE job_status SET status = 'RESUBMIT' WHERE dataid = '2025-11-12';
  ```

- The platform comes back online.
- Your regular scheduler runs RunJob dailyETL.sh as always.
- All RESUBMIT jobs automatically resume — safely, correctly, in order.
No manual reruns. No fire drills. No heroics. The system heals itself.
This is self-correcting data infrastructure.
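For example, catching up several missed cycles after an outage is just a loop over the affected dataids. A sketch, assuming the same table and column names as the SQL above and that utility.sh dml accepts arbitrary DML as in the quick start:

```bash
# Sketch: mark every cycle from the outage window for resubmission,
# then let the normal schedule pick the work back up.
for day in 2025-11-10 2025-11-11 2025-11-12; do
  ./utility.sh dml "UPDATE job_status SET status = 'RESUBMIT' WHERE dataid = '${day}'"
done

RunJob dailyETL.sh
```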
- Your existing scripts can run unchanged through RunJob.
- Per-partition state is persisted so retries and dependencies are handled correctly.
- Operational tasks (recovery, catch-up, phased migration) become identical to normal runs — reducing human error and support time.
Environment: how your job receives context

Before invoking your script, RunJob provisions the process environment with automatic variables that describe the partition and the dataset to process (for example: a partition id, connection/metadata descriptors, and credentials decrypted at runtime). These variables are the intended integration surface for your existing ETL scripts, so you generally do not need to change your job code.
See the examples/ directory for exact variable names and sample scripts that read them.
Security note: the decrypted credential is provided only at runtime in the job process’s environment. Be careful not to echo or persist it in logs. Keep the encryption key used to create the encrypted password secret.
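A minimal sketch of a job script consuming that environment. Apart from $dataid, the variable and command names below are placeholders rather than the real ones; check examples/ for the actual names:

```bash
#!/usr/bin/env bash
# Sketch only: $dataid is the partition id RunJob exports; INPUT_CONN and
# run_my_etl are placeholders for illustration.
set -euo pipefail

echo "Processing partition ${dataid}"

# Use the connection descriptor and decrypted credential directly from the
# environment; never echo the credential or write it to a log file.
run_my_etl --source "${INPUT_CONN}" --partition "${dataid}"
```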
For the quick start we use H2 in file mode. It is fast and easy because H2 creates your database, schema, and user with password automatically on the first connect. A utility that creates the H2 tables is run next. For featherweight implementations, H2 works well. If you need robustness, scalability, and strong transaction semantics, you will need Postgres. Also see the file QUICKSTART.md for more details.
- Prerequisites
  - Linux (Ubuntu tested), bash, maven
  - jq (`sudo apt install -y jq`)

- Build

  ```bash
  mvn package
  ```

- Create the DataFlow tables

  ```bash
  export PASSKEY=plugh   # this is the default but it should be changed
  ./utility.sh createtables
  ```
Your DataFlow tool is now configured and ready to run.
The test has a job (loadbob), which takes one input dataset (bobin) and one output dataset (bobout). We are not actually testing an ETL job; we want to test that the framework calls it correctly and supplies the necessary dataid and dataset metadata.
- Register two datasets:

  ```bash
  ./utility.sh dml "insert into dataset (datasetid) values ('bobin')"
  ./utility.sh dml "insert into dataset (datasetid) values ('bobout')"
  ```
- Associate them with the loadbob job:

  ```bash
  ./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobout','OUT','loadbob')"
  ./utility.sh dml "insert into job (datasetid,itemtype,jobid) values ('bobin' ,'IN', 'loadbob')"
  ```
- Mark the input dataset as READY (just for this test; in real use, some other job would have completed and marked it):

  ```bash
  ./utility.sh dml "insert into datastatus (dataid,datasetid,jobid,locktype,modified,status) values ('1.0','bobin','fakejob', 'OUT',now(),'READY')"
  ```
- The job should now be ready to run, since it has an input dataset in READY status. Run the test with RunJob:

  ```bash
  RunJob ./loadbob.sh
  ```
Output should look like this:

```
Mon Dec 1 04:08:50 PM CST 2025: Launching ./loadbob.sh with dataid 1.0
running loadbob with dataid 1.0 partition of input bobin
Mon Dec 1 04:08:50 PM CST 2025: Job ./loadbob.sh is complete. Updating status
1 rows updated to READY for loadbob and 1.0 1 IN file-local locks released
```
The two log-style messages confirm the start and end of the loadbob job, and between them is the one line output by the loadbob.sh script itself. The last line is an informational message indicating that DataFlow has set the final status.
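For reference, a minimal loadbob.sh that would produce the middle line of the output above could be as small as this sketch (the real test script ships in examples/):

```bash
#!/usr/bin/env bash
# Sketch of the test script: it only proves that RunJob supplied $dataid.
echo "running loadbob with dataid ${dataid} partition of input bobin"
```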
For this you need a Postgres database up and running. The absolute easiest way is to spin up a container.
- Run a local Postgres (using docker or podman):

  ```bash
  podman run -p 5432:5432 --name pg -e POSTGRES_PASSWORD=secretpass -d docker.io/postgres
  ```

  or, with a persistent volume:

  ```bash
  podman run -v pg:/var/lib/postgresql/18/docker -p 5432:5432 --name pg -e POSTGRES_PASSWORD=mysecret -d docker.io/postgres
  ```
- Initialize the database. Create a user, ETL, with a password. Connect with psql and run docs/create_tables.sql. See docs/datamodel.txt for schema notes.
- Configure. Encrypt the DB password with the included Cryptor class:

  ```bash
  java -cp app/target/app-1.0.0.jar com.hamiltonlabs.dataflow.utility.Cryptor -e ""
  ```

  Create the file dataflow.properties with the url, user, schema, and encrypted fields. This tells the utility how to access the dataflow database.

  ```properties
  url=jdbc:postgresql://localhost:5432/dataflow
  user=etl
  schema=dataflow
  encrypted=<encrypted-password-here>
  ```

  Keep the encryption key private.
- Run your job. Make your ETL script executable (e.g., myETL.sh) and invoke it via:

  ```bash
  RunJob myETL.sh
  ```

  RunJob exports the environment variables described above, runs your script, captures the exit status, and records the result to the dataflow DB. You can use any or none of the variables, though at a minimum you need the partition id (that is $dataid). The other automatic variables contain data descriptors for your data source and are very convenient to have.
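Scheduling does not change either. For example, a conventional cron entry could wrap the same command (a sketch; /opt/dataflow and the log path are placeholders for your install location):

```bash
# Sketch: append a nightly 02:00 entry to the current user's crontab.
( crontab -l 2>/dev/null; \
  echo '0 2 * * * cd /opt/dataflow && RunJob dailyETL.sh >> logs/dailyETL.log 2>&1' ) | crontab -
```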
- Partition — a tracked unit of data (date, shard, etc.)
- RunJob — the wrapper that populates environment variables, invokes your script, and records success/failure
- State machine — persisted partition states drive readiness, retries, and dependency resolution
- dataflow.properties - contains connection info to your dataflow database.
- docs/create_tables.sql — SQL to create the required tables
- docs/datamodel.txt — explanation of the data model
- examples/ — sample ETL scripts and fixtures that show how RunJob provisions environment variables (see variable names and usage)
- RunJob, utility.sh — the bash wrappers invoked in production
- app/target/...jar — built artifacts after mvn package
- Focused on orchestration for partitioned ETL; tested on Ubuntu.
- Planned: examples/ with a minimal demo ETL, support for H2, and a containerized dev environment.
- Open an issue or PR. Small examples and updated docs are very welcome.
- MIT
- Robert B Hamilton — RobertBHamilton/OpenDataFlow