Recipe 23: ETL Pipeline With Automatic Resource Teardown
Situation
ETL pipelines produce intermediate artifacts that must be cleaned up. In conventional systems, cleanup is a separate phase that can fail or be forgotten, leaving orphaned temporary tables and staging files.
The lease model eliminates the cleanup phase: intermediate data lives in leased memory with a TTL scoped to the wave that produces it. Once the next wave has consumed the data, the previous wave’s leases are no longer renewed and expire on their own. There are no cleanup nodes and no garbage collection pass.
What You Build
A diamond DAG (A produces; B and C transform in parallel; D merges) using grafos_batch::TaskGraph with
wave-based execution. Intermediate artifacts live in short-TTL leased memory.
Building Blocks
- grafos_batch::{TaskGraph, TaskDef, Executor, ExecutionPlan, TaskOutput, ResourceReq, DataRef, DataFormat}: DAG definition and execution
- grafos_jobs::{JobCoordinator, RetryPolicy, MemoryOutputStore}: retry and idempotent output
- grafos_leasekit::{RenewalManager, RenewalPolicy}: intermediate lease TTL
Design
Diamond DAG Structure
          A        (extract)
         / \
        B   C      (transform, parallel)
         \ /
          D        (merge/load)

TaskGraph validates the DAG (cycle detection, data reference resolution) and produces an ExecutionPlan
with topologically sorted waves:
- Wave 0: [A]
- Wave 1: [B, C] (independent, parallelizable)
- Wave 2: [D]
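The wave grouping above can be illustrated with a level-by-level topological sort (Kahn's algorithm). This is a minimal std-only sketch, not the grafos_batch implementation: `plan_waves`, the index-based task IDs, and the `(from, to)` edge list are hypothetical names chosen for illustration, and returning `None` for a cycle is an assumption standing in for the error that `TaskGraph::build()` reports.

```rust
/// Groups task indices into waves. A task lands in the first wave that
/// follows all of its dependencies. Returns `None` if the graph has a cycle.
/// Hypothetical helper for illustration; not part of grafos_batch.
fn plan_waves(task_count: usize, deps: &[(usize, usize)]) -> Option<Vec<Vec<usize>>> {
    let mut indegree = vec![0usize; task_count];
    let mut children = vec![Vec::new(); task_count];
    for &(from, to) in deps {
        indegree[to] += 1;
        children[from].push(to);
    }

    // Wave 0 holds every task with no dependencies.
    let mut current: Vec<usize> = (0..task_count).filter(|&t| indegree[t] == 0).collect();
    let mut waves = Vec::new();
    let mut placed = 0;

    while !current.is_empty() {
        let mut next = Vec::new();
        for &task in &current {
            placed += 1;
            // Releasing a task may unblock its children for the next wave.
            for &child in &children[task] {
                indegree[child] -= 1;
                if indegree[child] == 0 {
                    next.push(child);
                }
            }
        }
        waves.push(current);
        current = next;
    }

    // Tasks on a cycle never reach indegree 0, so they are never placed.
    if placed == task_count {
        Some(waves)
    } else {
        None
    }
}
```

For the diamond, with A, B, C, and D as indices 0 to 3, `plan_waves(4, &[(0, 1), (0, 2), (1, 3), (2, 3)])` yields the three waves `[0]`, `[1, 2]`, `[3]`.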
Wave Execution
Executor::run(plan) processes waves sequentially. Outputs from completed tasks pass to downstream tasks via
DataRef name matching. Failed tasks retry up to TaskDef.retries times; exhausted retries cause all
transitive downstream tasks to be marked Skipped.
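The retry and skip behavior described above can be sketched as follows. This is a std-only illustration under stated assumptions, not the `Executor` implementation: `run_waves`, `Status`, and the `attempt` callback are hypothetical, and the sketch assumes that `retries` counts extra attempts after the first one.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Succeeded,
    Failed,
    Skipped,
}

/// Runs waves in order. `attempt(task, n)` returns true when try number `n`
/// of `task` succeeds. Hypothetical stand-in for illustration only.
fn run_waves(
    waves: &[Vec<usize>],
    deps: &[(usize, usize)],
    retries: &[u32],
    mut attempt: impl FnMut(usize, u32) -> bool,
) -> Vec<Status> {
    let mut status: Vec<Option<Status>> = vec![None; retries.len()];
    for wave in waves {
        for &task in wave {
            // A task is blocked if any direct upstream task did not succeed.
            // Skipped upstream tasks also block, so skips spread transitively.
            let blocked = deps
                .iter()
                .any(|&(from, to)| to == task && status[from] != Some(Status::Succeeded));
            if blocked {
                status[task] = Some(Status::Skipped);
                continue;
            }
            // Assumption: one initial attempt plus up to `retries[task]` retries.
            let ok = (0..=retries[task]).any(|n| attempt(task, n));
            status[task] = Some(if ok { Status::Succeeded } else { Status::Failed });
        }
    }
    // Tasks that never appeared in a wave are reported as skipped.
    status
        .into_iter()
        .map(|s| s.unwrap_or(Status::Skipped))
        .collect()
}
```

In the diamond, if B fails on every attempt, C still succeeds because it depends only on A, while D is skipped because one of its inputs never arrives.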
Intermediate Leases with Scoped TTL
Each wave’s outputs live in leased memory. Register the lease with RenewalManager when the wave starts,
with a TTL that covers the next wave’s execution time plus a margin. When the downstream wave completes, stop renewing.
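The TTL sizing rule is simple arithmetic, sketched below. Both function names and the choice of seconds as the unit are assumptions made for illustration; nothing here is part of grafos_leasekit.

```rust
/// Absolute expiry (seconds since the epoch) for a wave's intermediate lease.
/// The lease must outlive the consuming wave, so the TTL is that wave's
/// execution budget plus a safety margin. Hypothetical helper.
fn intermediate_expiry(now_secs: u64, next_wave_budget_secs: u64, margin_secs: u64) -> u64 {
    now_secs
        .saturating_add(next_wave_budget_secs)
        .saturating_add(margin_secs)
}

/// True if a lease expiring at `expires_at` is still live at `finish_secs`,
/// the time the consuming wave is expected to finish.
fn covers(expires_at: u64, finish_secs: u64) -> bool {
    finish_secs < expires_at
}
```

For example, a 90-second budget with a 30-second margin gives an expiry of `now + 120`. A consuming wave that finishes within its budget is covered, and one that overruns both the budget and the margin is not.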
Retry Policy
JobCoordinator wraps execution with configurable retry and idempotent output capture. MemoryOutputStore
deduplicates outputs by chunk ID, so retries do not produce duplicates.
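Deduplication by chunk ID can be pictured as a keyed store where a repeated write for the same chunk is ignored. The sketch below is a std-only stand-in, not `MemoryOutputStore` itself: `DedupStore`, its `put` method, the `u64` chunk ID type, and the first-write-wins policy are all assumptions.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical in-memory output store keyed by chunk ID.
#[derive(Default)]
struct DedupStore {
    chunks: HashMap<u64, Vec<u8>>,
}

impl DedupStore {
    /// Stores the output for `chunk_id` unless one is already recorded.
    /// Returns true if the bytes were stored, false if this was a duplicate.
    fn put(&mut self, chunk_id: u64, bytes: Vec<u8>) -> bool {
        match self.chunks.entry(chunk_id) {
            Entry::Vacant(slot) => {
                slot.insert(bytes);
                true
            }
            // A retry that re-emits the same chunk leaves the store unchanged.
            Entry::Occupied(_) => false,
        }
    }

    /// Number of distinct chunks recorded so far.
    fn len(&self) -> usize {
        self.chunks.len()
    }
}
```

With this shape, a task that writes chunk 7 and is then retried still leaves exactly one entry for chunk 7 in the store.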
Walkthrough (Implementation Sketch)
Core grafOS API Path
The core path is TaskGraph to validated ExecutionPlan to
Executor::run, with intermediate leases registered only for the waves that
need them:
    use grafos_batch::{
        DataFormat, DataRef, Executor, ResourceReq, TaskDef, TaskGraph, TaskOutput,
    };
    use grafos_leasekit::{RenewalManager, RenewalPolicy};
    use std::collections::HashMap;

    // Two-task graph: "extract" produces "raw", "load" consumes it.
    let mut graph = TaskGraph::new();
    let extract = graph.add_task(TaskDef {
        name: "extract".into(),
        task_fn: Box::new(|_ctx| {
            let mut data = HashMap::new();
            data.insert("raw".into(), b"raw".to_vec());
            Ok(TaskOutput { data })
        }),
        resource_req: ResourceReq::default(),
        inputs: vec![],
        outputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
        retries: 1,
    });
    let load = graph.add_task(TaskDef {
        name: "load".into(),
        task_fn: Box::new(|ctx| {
            assert_eq!(ctx.inputs.get("raw").unwrap(), b"raw");
            Ok(TaskOutput { data: HashMap::new() })
        }),
        resource_req: ResourceReq::default(),
        inputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
        outputs: vec![],
        retries: 1,
    });
    graph.add_dependency(extract, load)?;

    let plan = graph.build()?;

    // intermediate_lease_id, intermediate_expires_at, and now come from the
    // surrounding code and are not shown in this sketch.
    let mut renewals = RenewalManager::new();
    renewals.register(intermediate_lease_id, intermediate_expires_at, RenewalPolicy::default());

    let result = Executor::run(plan)?;
    let summary = renewals.tick(now);
    # let _ = (result, summary);
    # Ok::<(), grafos_std::FabricError>(())

Stop registering or ticking the intermediate lease after the consuming wave finishes; lease expiry is the cleanup mechanism.
1. Define the DAG
    use grafos_batch::*;
    use std::collections::HashMap;

    let mut graph = TaskGraph::new();

    // A: extract. fetch_source_data() is an application-defined helper (not shown).
    let a = graph.add_task(TaskDef {
        name: "extract".into(),
        task_fn: Box::new(|_ctx| {
            let mut data = HashMap::new();
            data.insert("raw".into(), fetch_source_data());
            Ok(TaskOutput { data })
        }),
        resource_req: ResourceReq { min_memory: 1 << 20, ..Default::default() },
        inputs: vec![],
        outputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
        retries: 2,
    });

    // B and C: parallel transforms consuming "raw", producing "agg" and "enriched".
    // aggregate() and enrich() are application-defined helpers (not shown).
    let b = graph.add_task(TaskDef {
        name: "transform-aggregate".into(),
        task_fn: Box::new(|ctx| {
            let mut data = HashMap::new();
            data.insert("agg".into(), aggregate(ctx.inputs.get("raw").unwrap()));
            Ok(TaskOutput { data })
        }),
        resource_req: ResourceReq::default(),
        inputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
        outputs: vec![DataRef { name: "agg".into(), format: DataFormat::RawBytes }],
        retries: 1,
    });
    let c = graph.add_task(TaskDef {
        name: "transform-enrich".into(),
        task_fn: Box::new(|ctx| {
            let mut data = HashMap::new();
            data.insert("enriched".into(), enrich(ctx.inputs.get("raw").unwrap()));
            Ok(TaskOutput { data })
        }),
        resource_req: ResourceReq::default(),
        inputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
        outputs: vec![DataRef { name: "enriched".into(), format: DataFormat::RawBytes }],
        retries: 1,
    });

    // D: merge both transforms and write to the durable destination.
    // write_to_destination() is an application-defined helper (not shown).
    let d = graph.add_task(TaskDef {
        name: "merge-load".into(),
        task_fn: Box::new(|ctx| {
            write_to_destination(
                ctx.inputs.get("agg").unwrap(),
                ctx.inputs.get("enriched").unwrap(),
            );
            Ok(TaskOutput { data: HashMap::new() })
        }),
        resource_req: ResourceReq::default(),
        inputs: vec![
            DataRef { name: "agg".into(), format: DataFormat::RawBytes },
            DataRef { name: "enriched".into(), format: DataFormat::RawBytes },
        ],
        outputs: vec![],
        retries: 2,
    });

2. Add Dependencies
    graph.add_dependency(a, b)?;
    graph.add_dependency(a, c)?;
    graph.add_dependency(b, d)?;
    graph.add_dependency(c, d)?;

3. Build the Execution Plan
    let plan = graph.build()?;
    // Wave 0: [A], Wave 1: [B, C], Wave 2: [D]
    assert_eq!(plan.waves().len(), 3);
    assert_eq!(plan.task_count(), 4);

4. Register Intermediate Leases
    use grafos_leasekit::{RenewalManager, RenewalPolicy};

    let mut renewals = RenewalManager::new();
    let now = unix_time_secs();
    // Intermediate data from wave 0 needs to survive through wave 1.
    // Give it TTL = wave execution budget + margin.
    renewals.register(0, now + 120, RenewalPolicy::default());

5. Execute
    let result = Executor::run(plan)?;
    assert_eq!(result.succeeded, 4);
    assert_eq!(result.failed, 0);
    assert_eq!(result.skipped, 0);

6. Automatic Teardown
After wave 2 completes, stop calling renewals.tick(). Intermediate leases expire and memory returns to the
fabric. The final output (written by task D to a durable destination) persists.
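The teardown-by-expiry idea can be modeled with a tiny lease type: while renewals continue the expiry keeps moving forward, and once they stop the lease lapses with no further action. This is a hypothetical model for illustration only; `Lease`, `is_live`, and `renew` are not grafos_leasekit types, and the rule that an expired lease cannot be renewed is an assumption.

```rust
/// Hypothetical lease: live until `expires_at` (seconds since the epoch).
#[derive(Debug)]
struct Lease {
    expires_at: u64,
}

impl Lease {
    /// A lease is live strictly before its expiry time.
    fn is_live(&self, now: u64) -> bool {
        now < self.expires_at
    }

    /// Pushes the expiry out to `now + ttl` if the lease is still live.
    /// Assumption: an expired lease stays expired and cannot be revived.
    fn renew(&mut self, now: u64, ttl: u64) -> bool {
        if self.is_live(now) {
            self.expires_at = now.saturating_add(ttl);
            true
        } else {
            false
        }
    }
}
```

A lease that expires at 120 and is renewed at time 60 with a 120-second TTL now expires at 180. If no further renewal arrives, it is gone at 180 without any cleanup code having run.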
Failure Modes
- Task failure exhausts retries: TaskStatus::Failed. Transitive downstream tasks become Skipped.
- Intermediate lease expires before downstream consumes: downstream task fails. Size TTLs conservatively.
- Cycle in DAG: TaskGraph::build() returns FabricError::IoError(-101).
- Unresolved data reference: TaskGraph::build() returns FabricError::IoError(-102).
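Callers may want to turn the two build error codes listed above into readable messages. The sketch below uses a local stand-in enum so that it compiles on its own; the real `FabricError` presumably has more variants, and `describe_build_error` is a hypothetical helper. Only the codes -101 and -102 come from the list above.

```rust
/// Local stand-in for the error shape named above, for illustration only.
#[derive(Debug, PartialEq)]
enum FabricError {
    IoError(i32),
}

/// Maps the documented build error codes to short descriptions.
fn describe_build_error(err: &FabricError) -> &'static str {
    match err {
        FabricError::IoError(-101) => "cycle in DAG",
        FabricError::IoError(-102) => "unresolved data reference",
        // Any other code is outside what this recipe documents.
        FabricError::IoError(_) => "other I/O error",
    }
}
```

Matching on the code at the call site keeps the distinction between a structural problem (a cycle) and a wiring problem (a missing DataRef) visible in logs.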
Observability
- ExecutionResult.succeeded / failed / skipped: per-run health
- TaskResult.retries_used: flakiness indicator per task
- RenewalSummary from tick(): intermediate lease health
Variations
- Use JobCoordinator for chunk-parallel execution within a single task (e.g., shard source data)
- Chain multiple diamond DAGs for multi-stage ETL with cascading TTLs
- Persist final outputs with grafos_kv::FabricKvStore for downstream query access