
Recipe 14: An Event-Driven Workflow Engine in 200 Lines

Situation

You need a workflow engine (DAG of tasks, retries, observability) but do not want:

  • a database
  • a queue system
  • a container orchestrator

In a lease-based model, much of what those three systems provide can instead be expressed as resource leases and shared-state primitives.

What You Build

A minimal workflow engine using grafos_batch:

  • task DAG defined via TaskGraph
  • automatic wave-based scheduling (independent tasks run in parallel)
  • data dependencies resolved by name matching
  • retry with downstream skip propagation on failure

Building Blocks

  • grafos_batch::{TaskGraph, TaskDef, Executor, ExecutionPlan}
  • grafos_batch::{TaskId, TaskStatus, TaskOutput, ResourceReq, DataRef, DataFormat}
  • grafos_observe for the event stream


Design

TaskGraph as the DAG Definition

Build a DAG by adding tasks and dependency edges. Each TaskDef specifies:

  • name: human-readable label
  • task_fn: closure that receives a TaskContext (upstream data arrives in ctx.inputs) and returns a Result wrapping a TaskOutput
  • resource_req: CPU, memory, block, and fuel requirements
  • inputs / outputs: named DataRef entries for data flow
  • retries: automatic retry count on failure
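Name matching between inputs and outputs can be pictured with the std-only sketch below. It assumes that each output name has a single producer. TaskSpec and resolve_data_refs are hypothetical stand-ins, not grafos_batch types. Every declared input becomes an implied producer → consumer edge. An input that no task produces is reported as unresolved, which is the condition the recipe says build() rejects.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified view of a task's declared data flow
/// (not grafos_batch's TaskDef).
struct TaskSpec {
    name: &'static str,
    inputs: Vec<&'static str>,
    outputs: Vec<&'static str>,
}

/// Matches every input name to the task that declares an output with the
/// same name. Returns (producer, consumer) index pairs, or an error naming
/// the first input that no task produces.
fn resolve_data_refs(tasks: &[TaskSpec]) -> Result<Vec<(usize, usize)>, String> {
    // Index producers by output name. In this sketch, if two tasks
    // declare the same output, the later one wins.
    let mut producers: HashMap<&str, usize> = HashMap::new();
    for (i, t) in tasks.iter().enumerate() {
        for &out in &t.outputs {
            producers.insert(out, i);
        }
    }
    // Turn each input into an implied dependency edge.
    let mut edges = Vec::new();
    for (consumer, t) in tasks.iter().enumerate() {
        for &input in &t.inputs {
            match producers.get(input) {
                Some(&producer) => edges.push((producer, consumer)),
                None => {
                    return Err(format!("task '{}': unresolved input '{}'", t.name, input))
                }
            }
        }
    }
    Ok(edges)
}
```

For the extract → transform → load pipeline in the walkthrough, this yields the same two edges that are later added explicitly with add_dependency.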

Wave-Based Execution

TaskGraph::build() validates the graph (cycle detection, unresolved data refs) and computes an ExecutionPlan with topologically sorted waves. Tasks within the same wave are independent and can run in parallel.
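One common way to compute such waves is Kahn's topological sort, processed level by level. The sketch below illustrates the idea and is not grafos_batch's build() code; compute_waves is a hypothetical helper that works on plain task indices.
- Wave 0 holds every task with no dependencies.
- Each later wave holds the tasks whose last dependency finished in the previous wave.
- Any tasks left unscheduled at the end must sit on a cycle.

```rust
/// Groups DAG nodes into waves; returns Err if a cycle is detected.
fn compute_waves(n: usize, edges: &[(usize, usize)]) -> Result<Vec<Vec<usize>>, String> {
    let mut indegree = vec![0usize; n];
    let mut children: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(from, to) in edges {
        children[from].push(to);
        indegree[to] += 1;
    }
    // The first wave: every node with no incoming edge.
    let mut current: Vec<usize> = (0..n).filter(|&v| indegree[v] == 0).collect();
    let mut waves = Vec::new();
    let mut scheduled = 0;
    while !current.is_empty() {
        let mut next = Vec::new();
        for &v in &current {
            for &c in &children[v] {
                // A child becomes ready once all of its parents are scheduled.
                indegree[c] -= 1;
                if indegree[c] == 0 {
                    next.push(c);
                }
            }
        }
        scheduled += current.len();
        next.sort_unstable(); // deterministic order within a wave
        waves.push(current);
        current = next;
    }
    if scheduled < n {
        return Err(format!("cycle detected: {} task(s) never became ready", n - scheduled));
    }
    Ok(waves)
}
```

For the three-task chain in the walkthrough, this produces three single-task waves, consistent with the `plan.waves().len() == 3` check in step 3. A diamond (one task fanning out to two, which join into a fourth) produces a middle wave with two tasks.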

Failure Propagation

If a task fails after exhausting retries, all transitive downstream tasks are marked Skipped. No panic, no partial execution of downstream work.
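The retry-then-skip semantics can be modeled in a few lines. This is a std-only sketch, not the grafos_batch Executor. Status mirrors the recipe's TaskStatus variants, but it and run_waves are hypothetical names, and tasks here run sequentially for clarity.
- Each task gets `retries + 1` attempts.
- When a task still fails after its last attempt, a depth-first walk marks all of its transitive descendants Skipped, so none of them is ever invoked.

```rust
use std::collections::HashSet;

/// Hypothetical mirror of TaskStatus.
#[derive(Debug, Clone, PartialEq)]
enum Status {
    Success,
    Failed(String),
    Skipped,
}

fn run_waves<F>(
    waves: &[Vec<usize>],
    edges: &[(usize, usize)],
    retries: &[u32],
    mut run_task: F,
) -> Vec<Status>
where
    // Called as run_task(task_index, attempt_number).
    F: FnMut(usize, u32) -> Result<(), String>,
{
    let n = retries.len();
    let mut status: Vec<Option<Status>> = vec![None; n];
    let mut children: Vec<Vec<usize>> = vec![Vec::new(); n];
    for &(from, to) in edges {
        children[from].push(to);
    }
    for wave in waves {
        for &task in wave {
            if status[task].is_some() {
                continue; // already Skipped because an upstream task failed
            }
            // Up to retries[task] + 1 attempts; stop at the first success.
            let mut outcome = Ok(());
            for attempt in 0..=retries[task] {
                outcome = run_task(task, attempt);
                if outcome.is_ok() {
                    break;
                }
            }
            match outcome {
                Ok(()) => status[task] = Some(Status::Success),
                Err(msg) => {
                    status[task] = Some(Status::Failed(msg));
                    // Mark every transitive descendant Skipped.
                    let mut stack = children[task].clone();
                    let mut seen = HashSet::new();
                    while let Some(d) = stack.pop() {
                        if seen.insert(d) {
                            status[d] = Some(Status::Skipped);
                            stack.extend(children[d].iter().copied());
                        }
                    }
                }
            }
        }
    }
    status
        .into_iter()
        .map(|s| s.expect("every task must appear in some wave"))
        .collect()
}
```

Descendants always sit in later waves, so marking them Skipped before they are reached is enough to guarantee they never run. Siblings in the same wave, such as the other branch of a diamond, are unaffected.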

Walkthrough (Implementation Sketch)

1. Define the DAG

```rust
use grafos_batch::*;
use std::collections::HashMap;

// fetch_raw_data, transform_data and write_to_destination are placeholders
// for your own I/O and business logic; they are not part of grafos_batch.
let mut graph = TaskGraph::new();

let extract = graph.add_task(TaskDef {
    name: "extract".into(),
    task_fn: Box::new(|_ctx| {
        // Source task: no inputs, publishes "raw_data".
        let mut data = HashMap::new();
        data.insert("raw_data".into(), fetch_raw_data());
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![],
    outputs: vec![DataRef { name: "raw_data".into(), format: DataFormat::RawBytes }],
    retries: 2,
});

let transform = graph.add_task(TaskDef {
    name: "transform".into(),
    task_fn: Box::new(|ctx| {
        // "raw_data" is declared in `inputs`, so it is expected to be present.
        let raw = ctx.inputs.get("raw_data").unwrap();
        let mut data = HashMap::new();
        data.insert("clean_data".into(), transform_data(raw));
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "raw_data".into(), format: DataFormat::RawBytes }],
    outputs: vec![DataRef { name: "clean_data".into(), format: DataFormat::RawBytes }],
    retries: 1,
});

let load = graph.add_task(TaskDef {
    name: "load".into(),
    task_fn: Box::new(|ctx| {
        // Sink task: consumes "clean_data" and publishes nothing.
        let clean = ctx.inputs.get("clean_data").unwrap();
        write_to_destination(clean);
        Ok(TaskOutput { data: HashMap::new() })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "clean_data".into(), format: DataFormat::RawBytes }],
    outputs: vec![],
    retries: 0,
});
```

2. Add Dependencies

```rust
graph.add_dependency(extract, transform)?;
graph.add_dependency(transform, load)?;
```

3. Validate and Build

```rust
let plan: ExecutionPlan = graph.build()?;

// Inspect the plan
assert_eq!(plan.waves().len(), 3); // extract -> transform -> load
assert_eq!(plan.task_count(), 3);
```

4. Execute

```rust
let result = Executor::run(plan)?;
assert_eq!(result.succeeded, 3);
assert_eq!(result.failed, 0);
assert_eq!(result.skipped, 0);

// Inspect individual task results
for (id, task_result) in &result.task_results {
    match &task_result.status {
        TaskStatus::Success => { /* ok */ }
        TaskStatus::Failed(msg) => eprintln!("{id}: failed: {msg}"),
        TaskStatus::Skipped => eprintln!("{id}: skipped (upstream failed)"),
    }
}
```

Variations

  • diamond DAGs: multiple tasks in the same wave (fork-join patterns)
  • flaky tasks: set retries > 0 for tasks that may encounter transient failures
  • barrier phases: natural wave boundaries enforce “all tasks in stage A before stage B”
  • multi-coordinator leader election (Recipe 4)
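The fork-join and barrier variations both rely on wave boundaries. The recipe does not say how Executor maps a wave onto threads, so the following is only a std-only illustration; run_waves_parallel is a hypothetical helper. It runs each wave's tasks on scoped threads. `std::thread::scope` returns only after every thread spawned inside it has finished, so each wave boundary acts as a barrier: no task in wave k+1 starts before all tasks in wave k have completed.

```rust
use std::sync::Mutex;
use std::thread;

/// Runs each wave's tasks in parallel and returns the order in which
/// tasks finished (useful for checking the barrier property).
fn run_waves_parallel<F>(waves: &[Vec<usize>], run_task: F) -> Vec<usize>
where
    F: Fn(usize) + Sync,
{
    let finished = Mutex::new(Vec::new());
    for wave in waves {
        // The scope joins all spawned threads before returning: a barrier.
        thread::scope(|s| {
            for &task in wave {
                let run_task = &run_task;
                let finished = &finished;
                s.spawn(move || {
                    run_task(task);
                    finished.lock().unwrap().push(task);
                });
            }
        });
    }
    finished.into_inner().unwrap()
}
```

Within a wave the completion order is nondeterministic, but the set of tasks that finishes between two barriers is always exactly that wave. A production engine would likely bound concurrency by each task's ResourceReq rather than spawn one thread per task; that refinement is left out here.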