As systems grow, so does the complexity of their processes. Simple a() then b() scripts quickly evolve into brittle, unmanageable tangles of if statements, try/except blocks, and implicit dependencies. We're building a new workflow platform to tackle this, and the first and most critical piece is the "definition" layer: a clear, robust, and expressive way to define what should run.
Today, I'm introducing highway_dsl, the library that forms this foundation. It's a Python-native, fluent API designed to build, validate, and serialize complex workflow graphs.
It's not an execution engine—that's a separate component. This library is focused entirely on providing a "spec" for your workflows, using a fluent builder that feels like writing Python but produces a structured, serializable, and Pydantic-validated model.
Core Features
The design philosophy is to keep the definition layer in Python but make it declarative. You get the power of a real programming language (like using lambdas for conditional branches) without writing imperative orchestration code.
1. Fluent Builder API
The main entry point is the WorkflowBuilder. It lets you chain tasks together, automatically managing simple dependencies.
# From highway_dsl.workflow_dsl.py
from datetime import timedelta
from highway_dsl import WorkflowBuilder, RetryPolicy, TimeoutPolicy
# A simple ETL chain
workflow = (
WorkflowBuilder("simple_etl")
.task("extract", "etl.extract_data", result_key="raw_data")
.task(
"transform",
"etl.transform_data",
args=["{{raw_data}}"],
result_key="transformed_data",
)
.retry(max_retries=3, delay=timedelta(seconds=10)) # Applies to 'transform'
.task("load", "etl.load_data", args=["{{transformed_data}}"])
.timeout(timeout=timedelta(minutes=30)) # Applies to 'load'
.build()
)
# The result is a serializable object
print(workflow.to_yaml())
2. Rich, Pydantic-Based Operators
This isn't just for simple A->B->C chains. The DSL supports a full set of control-flow operators, all built on Pydantic models for validation.
TaskOperator: The basic unit of work.ConditionOperator: Trueif/elsebranching.ParallelOperator: Fan-out/fan-in concurrency.ForEachOperator: Iterate over a list (e.g., process 100 files).WhileOperator: Run a sub-workflow until a condition is met (e.g., a QA rework loop).WaitOperator: Pause execution for a duration or until a specific time.
3. Built-in Resilience Policies
You can attach RetryPolicy and TimeoutPolicy objects directly to tasks. This is a first-class part of the definition, not an application-layer afterthought.
4. YAML/JSON Serialization
Because all operators are Pydantic models, a fully defined workflow can be serialized to YAML or JSON. This allows you to:
- Define workflows in Python and store them as version-controlled YAML.
- Load YAML into the DSL for validation or modification.
- Pass the YAML/JSON spec to a separate execution engine.
Use Cases: From ETL to AI Agents
The goal was to create a DSL expressive enough for any process. Here are a few examples of what it's designed to handle.
Use Case 1: Complex ETL & Data Reconciliation
Standard data pipelines are a common use case. The bank_end_of_the_day_etl_workflow.py example defines a massive banking workflow.
It starts with a ParallelOperator to ingest data from five different source systems (mainframe, SFTP, SQL, MQ) simultaneously.
Crucially, it then uses a WhileOperator to handle data reconciliation:
# From examples/bank_end_of_the_day_etl_workflow.py
builder.while_loop(
"reconciliation_loop",
condition="{{recon_status.is_balanced}} == false",
loop_body=lambda b: b.task(
"find_discrepancies",
"etl.reconciliation.find_discrepancies",
args=["{{recon_status.report_id}}"],
result_key="discrepancy_report",
)
.task(
"apply_auto_adjustments",
"etl.reconciliation.apply_adjustments",
args=["{{discrepancy_report.adjustments_file}}"],
)
.task(
"run_reconciliation_check",
"etl.validation.run_initial_checksums",
result_key="recon_status", # This variable is updated, controlling the loop
),
dependencies=["run_pre_reconciliation"],
)
This loop will run find_discrepancies, apply_auto_adjustments, and run_reconciliation_check repeatedly until the recon_status.is_balanced variable becomes true.
Use Case 2: Manufacturing & Human-in-the-Loop
The car_factory_workflow example shows a physical-world process. A ForEachOperator iterates over a build manifest ({{manifest.vehicles}}).
Inside this loop, a sub-workflow runs for each vehicle:
Condition: Is it an 'EV' or 'ICE'?Parallel: Weld frame, build electronics, and prep drivetrain all at once.Task(Fan-in): A "final assembly" task waits for all parallel branches to complete.While: A human-in-the-loop QA-rework cycle.
# From tests/test_car_factory_workflow_with_fluent_builder.py
builder.while_loop(
"qa_rework_loop",
condition="{{qa_report.passed}} == false",
loop_body=lambda b: b.task(
"perform_rework",
"human.rework.fix_issues",
args=["{{item.vin}}", "{{qa_report.issues}}"],
result_key="rework_report",
).task(
"re_inspect_vehicle",
"human.qa.inspect_rework",
args=["{{item.vin}}", "{{rework_report}}"],
result_key="qa_report", # Updates the loop variable
),
)
This is far more powerful than a simple DAG, as the number of rework cycles isn't known at design time.
Use Case 3: Autonomous AI Agents
This is the most complex use case and the true test of the DSL. How do you define a workflow that is agentic—that is, a system that plans, acts, and corrects itself?
The test_agentic_ai_software_workflow.py example defines an AI agent that builds a software platform using TDD.
The entire TDD process is wrapped in a WhileOperator: main_build_loop.
- Condition:
{{test_run_results.all_passed}} == false - Loop Body:
plan_next_coding_step: An LLM task analyzes failing tests.check_plan_action: AConditionchecks if the plan is to "implement" or "refactor."implement_code/refactor_code: LLM tasks that write code.run_full_test_suite: AParalleloperator runs unit, integration, and e2e tests.aggregate_test_results: This task updates thetest_run_resultsvariable, which determines if the loop runs again.
This is a dynamic, self-correcting workflow. The Python code to define this is complex, but it's pure definition.
A Note on Nested Dependency Hell
One of the trickiest problems in workflow engines is "dependency scope." The README.md alludes to this. If you have a Parallel operator that contains a ForEach loop, should the tasks inside the loop inherit the dependencies of the parent Parallel operator?
In version 1.0.2, we fixed this. Loop body tasks (ForEach, While) are now properly encapsulated. They only depend on their parent loop operator and the internal chain. This prevents "grandparent" dependencies from leaking in and creating unwanted execution constraints, which is a common, subtle bug in graph-based systems.
What's Next
highway_dsl is the first component of our larger workflow platform. By solidifying the definition layer, we now have a stable, expressive, and type-safe "spec" to feed into an execution engine. This DSL provides the vocabulary for defining any process, from a simple ETL to a self-correcting AI agent. Read more on the website.