Recipe 14: An Event-Driven Workflow Engine in 200 Lines
Situation
You need a workflow engine (DAG of tasks, retries, observability) but do not want:
- a database
- a queue system
- a container orchestrator
In a lease-based model, much of what those systems provide can be expressed as resource leases and shared-state primitives.
What You Build
A minimal workflow engine using grafos_batch:
- task DAG defined via TaskGraph
- automatic wave-based scheduling (independent tasks run in parallel)
- data dependencies resolved by name matching
- retry with downstream skip propagation on failure
Building Blocks
- grafos_batch::{TaskGraph, TaskDef, Executor, ExecutionPlan}
- grafos_batch::{TaskId, TaskStatus, TaskOutput, ResourceReq, DataRef, DataFormat}
- grafos_observe for the event stream
Design
TaskGraph as the DAG Definition
Build a DAG by adding tasks and dependency edges. Each TaskDef specifies:
- name: human-readable label
- task_fn: closure that receives a TaskContext (its inputs) and returns a Result carrying a TaskOutput on success
- resource_req: CPU, memory, block, and fuel requirements
- inputs/outputs: named DataRef entries for data flow
- retries: number of automatic retries on failure
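The name-matching rule behind inputs and outputs can be modeled in plain Rust. This is a hypothetical, simplified sketch: TaskSpec and resolve_by_name are illustrative names, not part of grafos_batch. Each declared input is matched to the task whose output has the same name, and an input with no producer is reported as unresolved, which is the kind of error build() is described as catching. Rejecting an output name produced by two tasks is this sketch's own choice; the recipe does not say how grafos_batch handles that case.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a task's data declarations
/// (not the grafos_batch TaskDef type).
struct TaskSpec {
    name: &'static str,
    inputs: Vec<&'static str>,
    outputs: Vec<&'static str>,
}

/// Matches every declared input to the task producing an output of the same name.
/// Returns the derived (producer, consumer) edges as task indices, or an error
/// describing the first unresolved or ambiguous name.
fn resolve_by_name(tasks: &[TaskSpec]) -> Result<Vec<(usize, usize)>, String> {
    // Map each output name to the index of the task that produces it.
    let mut producers: HashMap<&str, usize> = HashMap::new();
    for (i, t) in tasks.iter().enumerate() {
        for out in &t.outputs {
            if producers.insert(*out, i).is_some() {
                return Err(format!("output '{out}' is produced by more than one task"));
            }
        }
    }

    // Each input becomes an edge from its producer to the consuming task.
    let mut edges = Vec::new();
    for (consumer, t) in tasks.iter().enumerate() {
        for input in &t.inputs {
            match producers.get(input) {
                Some(&producer) => edges.push((producer, consumer)),
                None => return Err(format!("task '{}' has unresolved input '{input}'", t.name)),
            }
        }
    }
    Ok(edges)
}

fn main() {
    let tasks = vec![
        TaskSpec { name: "extract", inputs: vec![], outputs: vec!["raw_data"] },
        TaskSpec { name: "transform", inputs: vec!["raw_data"], outputs: vec!["clean_data"] },
        TaskSpec { name: "load", inputs: vec!["clean_data"], outputs: vec![] },
    ];
    let edges = resolve_by_name(&tasks).unwrap();
    println!("{edges:?}"); // [(0, 1), (1, 2)]
}
```

Deriving edges from names means a typo in an input name surfaces at build time rather than as a missing value at run time.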
Wave-Based Execution
TaskGraph::build() validates the graph (detecting cycles and unresolved data refs) and computes an
ExecutionPlan whose waves are in topological order: every task sits in a later wave than each of its
dependencies. Tasks within the same wave are independent and can run in parallel.
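One standard way to compute such waves is a layered variant of Kahn's algorithm. The sketch below is hypothetical (compute_waves is not the grafos_batch implementation) and works over tasks numbered 0..n with (upstream, downstream) edges. Each wave holds the tasks whose remaining dependency count drops to zero once the previous wave is scheduled; any task left unscheduled at the end lies on or behind a cycle, which is the condition a build step would reject.

```rust
/// Groups tasks 0..n into waves so that every task lands in a later wave than all of
/// its upstream dependencies. Returns Err if the edges contain a cycle.
fn compute_waves(n: usize, edges: &[(usize, usize)]) -> Result<Vec<Vec<usize>>, String> {
    // indegree[t] = number of upstream tasks of t that are not yet scheduled.
    let mut indegree = vec![0usize; n];
    let mut downstream: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(from, to) in edges {
        downstream[from].push(to);
        indegree[to] += 1;
    }

    // The first wave holds every task with no upstream dependencies.
    let mut current: Vec<usize> = (0..n).filter(|&t| indegree[t] == 0).collect();
    let mut waves = Vec::new();
    let mut scheduled = 0;
    while !current.is_empty() {
        scheduled += current.len();
        let mut next = Vec::new();
        for &t in &current {
            for &d in &downstream[t] {
                indegree[d] -= 1;
                // d becomes ready once its last upstream task has been scheduled.
                if indegree[d] == 0 {
                    next.push(d);
                }
            }
        }
        next.sort_unstable(); // deterministic order within a wave
        waves.push(current);
        current = next;
    }

    // Tasks whose indegree never reached zero lie on, or downstream of, a cycle.
    if scheduled < n {
        return Err(format!("cycle detected: {} task(s) cannot be scheduled", n - scheduled));
    }
    Ok(waves)
}

fn main() {
    // extract -> transform -> load: a linear chain gives three single-task waves.
    println!("{:?}", compute_waves(3, &[(0, 1), (1, 2)])); // Ok([[0], [1], [2]])
    // Diamond 0 -> {1, 2} -> 3: tasks 1 and 2 share the middle wave.
    println!("{:?}", compute_waves(4, &[(0, 1), (0, 2), (1, 3), (2, 3)])); // Ok([[0], [1, 2], [3]])
    // A two-task cycle cannot be scheduled.
    println!("{}", compute_waves(2, &[(0, 1), (1, 0)]).is_err()); // true
}
```

The linear case matches the three-wave plan the walkthrough asserts for extract, transform, and load.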
Failure Propagation
If a task fails after exhausting retries, all transitive downstream tasks are marked Skipped.
No panic, no partial execution of downstream work.
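The retry-then-skip rule fits in a few lines. In this sketch, run_with_retries, propagate_skip, and the Status enum are hypothetical names, and retries is assumed to count extra attempts after the first (so retries: 1 allows two tries); the recipe does not spell out that convention. When the last attempt fails, a breadth-first walk over downstream edges marks every transitive dependent as Skipped without running it.

```rust
use std::collections::VecDeque;

// Hypothetical status type for this sketch (not grafos_batch::TaskStatus).
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Pending,
    Success,
    Failed(String),
    Skipped,
}

/// Calls `attempt` up to `retries + 1` times (assumed convention) and
/// returns the last error if every attempt fails.
fn run_with_retries<F>(retries: u32, mut attempt: F) -> Result<(), String>
where
    F: FnMut(u32) -> Result<(), String>,
{
    let mut last_err = String::new();
    for n in 0..=retries {
        match attempt(n) {
            Ok(()) => return Ok(()),
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}

/// Marks every task reachable from `failed` via `downstream` edges as Skipped.
fn propagate_skip(failed: usize, downstream: &[Vec<usize>], status: &mut [Status]) {
    let mut queue: VecDeque<usize> = downstream[failed].iter().copied().collect();
    while let Some(t) = queue.pop_front() {
        // Only pending tasks change; an already-skipped task is not walked twice.
        if status[t] == Status::Pending {
            status[t] = Status::Skipped;
            queue.extend(downstream[t].iter().copied());
        }
    }
}

fn main() {
    // extract(0) -> transform(1) -> load(2)
    let downstream = vec![vec![1], vec![2], vec![]];
    let mut status = vec![Status::Pending; 3];
    status[0] = Status::Success;

    // transform fails on every attempt; with retries = 1 it is tried twice.
    let mut calls = 0;
    let outcome = run_with_retries(1, |n| {
        calls += 1;
        Err(format!("attempt {n} failed"))
    });
    if let Err(msg) = outcome {
        status[1] = Status::Failed(msg);
        propagate_skip(1, &downstream, &mut status);
    }
    println!("calls = {calls}"); // calls = 2
    println!("{status:?}"); // [Success, Failed("attempt 1 failed"), Skipped]
}
```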
Walkthrough (Implementation Sketch)
1. Define the DAG
```rust
use grafos_batch::*;
use std::collections::HashMap;

// fetch_raw_data, transform_data and write_to_destination are
// application-specific helpers (not shown).
let mut graph = TaskGraph::new();

// extract: no inputs; publishes "raw_data".
let extract = graph.add_task(TaskDef {
    name: "extract".into(),
    task_fn: Box::new(|_ctx| {
        let mut data = HashMap::new();
        data.insert("raw_data".into(), fetch_raw_data());
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![],
    outputs: vec![DataRef { name: "raw_data".into(), format: DataFormat::RawBytes }],
    retries: 2,
});

// transform: consumes "raw_data", publishes "clean_data".
let transform = graph.add_task(TaskDef {
    name: "transform".into(),
    task_fn: Box::new(|ctx| {
        let raw = ctx.inputs.get("raw_data").expect("raw_data is a declared input");
        let mut data = HashMap::new();
        data.insert("clean_data".into(), transform_data(raw));
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "raw_data".into(), format: DataFormat::RawBytes }],
    outputs: vec![DataRef { name: "clean_data".into(), format: DataFormat::RawBytes }],
    retries: 1,
});

// load: consumes "clean_data"; a sink with no outputs.
let load = graph.add_task(TaskDef {
    name: "load".into(),
    task_fn: Box::new(|ctx| {
        let clean = ctx.inputs.get("clean_data").expect("clean_data is a declared input");
        write_to_destination(clean);
        Ok(TaskOutput { data: HashMap::new() })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "clean_data".into(), format: DataFormat::RawBytes }],
    outputs: vec![],
    retries: 0,
});
```

2. Add Dependencies
```rust
graph.add_dependency(extract, transform)?;
graph.add_dependency(transform, load)?;
```

3. Validate and Build
```rust
let plan: ExecutionPlan = graph.build()?;

// Inspect the plan
assert_eq!(plan.waves().len(), 3); // extract -> transform -> load
assert_eq!(plan.task_count(), 3);
```

4. Execute
```rust
let result = Executor::run(plan)?;

assert_eq!(result.succeeded, 3);
assert_eq!(result.failed, 0);
assert_eq!(result.skipped, 0);

// Inspect individual task results
for (id, task_result) in &result.task_results {
    match &task_result.status {
        TaskStatus::Success => { /* ok */ }
        TaskStatus::Failed(msg) => eprintln!("{id}: failed: {msg}"),
        TaskStatus::Skipped => eprintln!("{id}: skipped (upstream failed)"),
    }
}
```

Variations
- diamond DAGs: multiple tasks in the same wave (fork-join patterns)
- flaky tasks: set retries > 0 for tasks that may encounter transient failures
- barrier phases: natural wave boundaries enforce “all tasks in stage A before stage B”
- multi-coordinator leader election (Recipe 4)
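The diamond and barrier-phase variations both follow from running one wave at a time with that wave's tasks in parallel. The sketch below models that execution style with only the standard library; Task, Data, Status, task, and run_waves are hypothetical stand-ins, not Executor::run. Each wave's tasks run on scoped threads (std::thread::scope), and leaving the scope joins them all, which acts as the barrier before the next wave. A task runs only if its direct dependencies succeeded; because waves are in topological order, one failure skips everything downstream of it.

```rust
use std::collections::HashMap;
use std::thread;

// Hypothetical types for this sketch; not the grafos_batch API.
type Data = HashMap<String, Vec<u8>>;

#[derive(Debug, Clone, PartialEq)]
enum Status {
    Success,
    Failed(String),
    Skipped,
}

struct Task {
    deps: Vec<usize>,
    run: Box<dyn Fn(&Data) -> Result<Data, String> + Send + Sync>,
}

fn task(deps: Vec<usize>, run: impl Fn(&Data) -> Result<Data, String> + Send + Sync + 'static) -> Task {
    Task { deps, run: Box::new(run) }
}

/// Runs waves in order and the runnable tasks of each wave on parallel scoped threads.
fn run_waves(waves: &[Vec<usize>], tasks: &[Task]) -> (Data, Vec<Status>) {
    let mut data = Data::new();
    // Tasks that never run keep this initial Skipped status.
    let mut status = vec![Status::Skipped; tasks.len()];
    for wave in waves {
        // Run a task only if every direct dependency succeeded in an earlier wave.
        let runnable: Vec<usize> = wave
            .iter()
            .copied()
            .filter(|&id| tasks[id].deps.iter().all(|&d| status[d] == Status::Success))
            .collect();
        let snapshot: &Data = &data;
        // Leaving the scope joins every thread: the barrier between waves.
        let outcomes: Vec<(usize, Result<Data, String>)> = thread::scope(|s| {
            let handles: Vec<_> = runnable
                .iter()
                .map(|&id| (id, s.spawn(move || (tasks[id].run)(snapshot))))
                .collect();
            handles
                .into_iter()
                .map(|(id, h)| (id, h.join().expect("task thread panicked")))
                .collect()
        });
        for (id, outcome) in outcomes {
            status[id] = match outcome {
                Ok(produced) => {
                    data.extend(produced);
                    Status::Success
                }
                Err(msg) => Status::Failed(msg),
            };
        }
    }
    (data, status)
}

fn main() {
    // Diamond 0 -> {1, 2} -> 3, where branch 2 fails.
    let tasks = vec![
        task(vec![], |_| Ok(Data::from([("raw".to_string(), vec![1, 2, 3])]))),
        task(vec![0], |d| {
            let raw = d.get("raw").ok_or("missing raw")?;
            let total: u8 = raw.iter().sum();
            Ok(Data::from([("sum".to_string(), vec![total])]))
        }),
        task(vec![0], |_| Err("branch b failed".to_string())),
        task(vec![1, 2], |_| Ok(Data::new())),
    ];
    let waves = [vec![0], vec![1, 2], vec![3]];
    let (data, status) = run_waves(&waves, &tasks);
    println!("{status:?}"); // [Success, Success, Failed("branch b failed"), Skipped]
    println!("{:?}", data.get("sum")); // Some([6])
}
```

Each wave reads a shared snapshot of earlier outputs and only writes after its barrier, so tasks in the same wave never observe each other's results.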