Recipe 23: ETL Pipeline With Automatic Resource Teardown

Situation

ETL pipelines produce intermediate artifacts that must be cleaned up. In conventional systems, cleanup is a separate phase that can fail or be forgotten, leaving orphaned temporary tables and staging files.

The lease model eliminates the cleanup phase: intermediate data lives in leased memory with a TTL scoped to the wave that produces it. Once the next wave has consumed the data, the pipeline stops renewing the previous wave’s leases and they expire on their own. No cleanup nodes. No garbage collection.

What You Build

A diamond DAG (A produces; B and C transform in parallel; D merges) using grafos_batch::TaskGraph with wave-based execution. Intermediate artifacts live in short-TTL leased memory.

Building Blocks

  • grafos_batch::{TaskGraph, TaskDef, Executor, ExecutionPlan, TaskOutput, ResourceReq, DataRef, DataFormat}: DAG definition and execution
  • grafos_jobs::{JobCoordinator, RetryPolicy, MemoryOutputStore}: retry and idempotent output
  • grafos_leasekit::{RenewalManager, RenewalPolicy}: intermediate lease TTL

Design

Diamond DAG Structure

    A        (extract)
   / \
  B   C      (transform, parallel)
   \ /
    D        (merge/load)

TaskGraph validates the DAG (cycle detection, data reference resolution) and produces an ExecutionPlan with topologically sorted waves:

  • Wave 0: [A]
  • Wave 1: [B, C] (independent, parallelizable)
  • Wave 2: [D]
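
The wave grouping above can be illustrated with a level-by-level variant of Kahn's algorithm. This is a minimal, standard-library-only sketch and not the grafos_batch implementation; the function name plan_waves and the use of plain usize task indices are assumptions made for illustration.

```rust
/// Groups tasks into waves: each task lands in the first wave after all of
/// its dependencies have been placed. Returns None if the edges contain a cycle.
/// `plan_waves` is a hypothetical helper, not part of grafos_batch.
/// Edges are (upstream, downstream) pairs; indices must be < task_count.
pub fn plan_waves(task_count: usize, edges: &[(usize, usize)]) -> Option<Vec<Vec<usize>>> {
    let mut indegree = vec![0usize; task_count];
    let mut downstream = vec![Vec::new(); task_count];
    for &(from, to) in edges {
        downstream[from].push(to);
        indegree[to] += 1;
    }

    // Wave 0 holds every task that has no dependencies.
    let mut current: Vec<usize> = (0..task_count).filter(|&t| indegree[t] == 0).collect();
    let mut waves = Vec::new();
    let mut placed = 0;

    while !current.is_empty() {
        let mut next = Vec::new();
        for &task in &current {
            for &child in &downstream[task] {
                indegree[child] -= 1;
                // A child is ready once its last dependency has been placed.
                if indegree[child] == 0 {
                    next.push(child);
                }
            }
        }
        placed += current.len();
        next.sort_unstable();
        waves.push(std::mem::replace(&mut current, next));
    }

    // Any task left unplaced still has an unmet dependency, which means a cycle.
    if placed == task_count {
        Some(waves)
    } else {
        None
    }
}
```

With A, B, C, D numbered 0 to 3, the diamond's four edges produce three waves, and a two-task loop is rejected.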

Wave Execution

Executor::run(plan) processes waves sequentially. Outputs from completed tasks pass to downstream tasks via DataRef name matching. Failed tasks retry up to TaskDef.retries times; exhausted retries cause all transitive downstream tasks to be marked Skipped.
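
The skip rule can be sketched as a breadth-first walk from each failed task. The Status enum and propagate_skips function below are hypothetical stand-ins written against the standard library only; they model the behavior described above rather than reproduce the Executor's internals.

```rust
use std::collections::VecDeque;

/// Hypothetical task state for this sketch (not the grafos_batch TaskStatus type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
    Pending,
    Succeeded,
    Failed,
    Skipped,
}

/// Marks every transitive downstream task of a Failed task as Skipped.
/// Edges are (upstream, downstream) index pairs into `statuses`.
pub fn propagate_skips(statuses: &mut [Status], edges: &[(usize, usize)]) {
    // Start from every task that has already failed.
    let mut queue: VecDeque<usize> = statuses
        .iter()
        .enumerate()
        .filter(|(_, s)| **s == Status::Failed)
        .map(|(i, _)| i)
        .collect();

    while let Some(task) = queue.pop_front() {
        for &(from, to) in edges {
            // Only tasks that have not run yet can be skipped.
            if from == task && statuses[to] == Status::Pending {
                statuses[to] = Status::Skipped;
                queue.push_back(to);
            }
        }
    }
}
```

In the diamond, a failure in B leaves C untouched but skips D, because D cannot obtain its "agg" input.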

Intermediate Leases with Scoped TTL

Each wave’s outputs live in leased memory. Register each lease with RenewalManager when the wave starts, with a TTL that covers the next wave’s expected execution time plus a safety margin. When the downstream wave completes, stop renewing.
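
The TTL sizing is simple arithmetic, shown here as a small sketch. The helper names lease_expiry and covers are hypothetical and exist only for this example; times are assumed to be seconds since the Unix epoch.

```rust
/// Expiry for an intermediate lease, in seconds since the Unix epoch:
/// registration time + the consuming wave's execution budget + a safety margin.
/// `lease_expiry` is a hypothetical helper, not part of grafos_leasekit.
pub fn lease_expiry(now: u64, next_wave_budget_secs: u64, margin_secs: u64) -> u64 {
    // Saturating arithmetic avoids overflow panics on extreme inputs.
    now.saturating_add(next_wave_budget_secs)
        .saturating_add(margin_secs)
}

/// True if a lease expiring at `expires_at` is still live at the moment the
/// consuming wave is expected to finish.
pub fn covers(expires_at: u64, consumer_finishes_at: u64) -> bool {
    consumer_finishes_at < expires_at
}
```

For example, a 90-second budget with a 30-second margin, registered at t = 1000, expires at t = 1120. A consumer that finishes at t = 1090 is covered; one that would finish at t = 1120 is not, which is the case the margin exists to prevent.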

Retry Policy

JobCoordinator wraps execution with configurable retry and idempotent output capture. MemoryOutputStore deduplicates outputs by chunk ID, so retries do not produce duplicates.
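
Deduplication by chunk ID can be pictured as a map in which a second write for the same ID is ignored. The DedupStore type below is a hypothetical illustration, not MemoryOutputStore itself, and the first-write-wins policy is an assumption of this sketch.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Hypothetical in-memory output store keyed by chunk ID.
/// Assumption: the first write for a chunk ID wins; later writes are dropped.
#[derive(Default)]
pub struct DedupStore {
    chunks: HashMap<u64, Vec<u8>>,
}

impl DedupStore {
    /// Stores `bytes` under `chunk_id`. Returns true if the chunk was stored,
    /// false if that chunk ID already had an output (for example from an
    /// earlier attempt of the same chunk).
    pub fn put(&mut self, chunk_id: u64, bytes: Vec<u8>) -> bool {
        match self.chunks.entry(chunk_id) {
            Entry::Occupied(_) => false,
            Entry::Vacant(slot) => {
                slot.insert(bytes);
                true
            }
        }
    }

    /// Returns the stored output for `chunk_id`, if any.
    pub fn get(&self, chunk_id: u64) -> Option<&[u8]> {
        self.chunks.get(&chunk_id).map(Vec::as_slice)
    }

    /// Number of distinct chunks stored.
    pub fn len(&self) -> usize {
        self.chunks.len()
    }
}
```

A retried chunk that writes again under the same ID leaves the store unchanged, so downstream readers see each chunk exactly once.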

Walkthrough (Implementation Sketch)

Core grafOS API Path

The core path is TaskGraph to validated ExecutionPlan to Executor::run, with intermediate leases registered only for the waves that need them:

use grafos_batch::{
    DataFormat, DataRef, Executor, ResourceReq, TaskDef, TaskGraph, TaskOutput,
};
use grafos_leasekit::{RenewalManager, RenewalPolicy};
use std::collections::HashMap;

let mut graph = TaskGraph::new();

// Producer: emits a single "raw" artifact.
let extract = graph.add_task(TaskDef {
    name: "extract".into(),
    task_fn: Box::new(|_ctx| {
        let mut data = HashMap::new();
        data.insert("raw".into(), b"raw".to_vec());
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![],
    outputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
    retries: 1,
});

// Consumer: receives "raw" through DataRef name matching.
let load = graph.add_task(TaskDef {
    name: "load".into(),
    task_fn: Box::new(|ctx| {
        assert_eq!(ctx.inputs.get("raw").unwrap(), b"raw");
        Ok(TaskOutput { data: HashMap::new() })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
    outputs: vec![],
    retries: 1,
});

graph.add_dependency(extract, load)?;
let plan = graph.build()?;

// intermediate_lease_id, intermediate_expires_at, and now are supplied by the caller.
let mut renewals = RenewalManager::new();
renewals.register(intermediate_lease_id, intermediate_expires_at, RenewalPolicy::default());

let result = Executor::run(plan)?;
let summary = renewals.tick(now);
let _ = (result, summary);
Ok::<(), grafos_std::FabricError>(())

Stop registering or ticking the intermediate lease after the consuming wave finishes; lease expiry is the cleanup mechanism.

1. Define the DAG

use grafos_batch::*;
use std::collections::HashMap;

let mut graph = TaskGraph::new();

let a = graph.add_task(TaskDef {
    name: "extract".into(),
    task_fn: Box::new(|_ctx| {
        let mut data = HashMap::new();
        data.insert("raw".into(), fetch_source_data());
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq { min_memory: 1 << 20, ..Default::default() },
    inputs: vec![],
    outputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
    retries: 2,
});

// B and C: parallel transforms consuming "raw", producing "agg" and "enriched"
let b = graph.add_task(TaskDef {
    name: "transform-aggregate".into(),
    task_fn: Box::new(|ctx| {
        let mut data = HashMap::new();
        data.insert("agg".into(), aggregate(ctx.inputs.get("raw").unwrap()));
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
    outputs: vec![DataRef { name: "agg".into(), format: DataFormat::RawBytes }],
    retries: 1,
});

let c = graph.add_task(TaskDef {
    name: "transform-enrich".into(),
    task_fn: Box::new(|ctx| {
        let mut data = HashMap::new();
        data.insert("enriched".into(), enrich(ctx.inputs.get("raw").unwrap()));
        Ok(TaskOutput { data })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![DataRef { name: "raw".into(), format: DataFormat::RawBytes }],
    outputs: vec![DataRef { name: "enriched".into(), format: DataFormat::RawBytes }],
    retries: 1,
});

let d = graph.add_task(TaskDef {
    name: "merge-load".into(),
    task_fn: Box::new(|ctx| {
        write_to_destination(
            ctx.inputs.get("agg").unwrap(),
            ctx.inputs.get("enriched").unwrap(),
        );
        Ok(TaskOutput { data: HashMap::new() })
    }),
    resource_req: ResourceReq::default(),
    inputs: vec![
        DataRef { name: "agg".into(), format: DataFormat::RawBytes },
        DataRef { name: "enriched".into(), format: DataFormat::RawBytes },
    ],
    outputs: vec![],
    retries: 2,
});

2. Add Dependencies

graph.add_dependency(a, b)?;
graph.add_dependency(a, c)?;
graph.add_dependency(b, d)?;
graph.add_dependency(c, d)?;

3. Build the Execution Plan

let plan = graph.build()?;
assert_eq!(plan.waves().len(), 3); // Wave 0: [A], Wave 1: [B, C], Wave 2: [D]
assert_eq!(plan.task_count(), 4);

4. Register Intermediate Leases

use grafos_leasekit::{RenewalManager, RenewalPolicy};
let mut renewals = RenewalManager::new();
let now = unix_time_secs();
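// Intermediate data from wave 0 must survive until wave 1 has consumed it.
// Expiry = now + wave execution budget + margin (120 seconds in total here).
// The first argument is the lease ID of the intermediate allocation.
renewals.register(0, now + 120, RenewalPolicy::default());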

5. Execute

let result = Executor::run(plan)?;
assert_eq!(result.succeeded, 4);
assert_eq!(result.failed, 0);
assert_eq!(result.skipped, 0);

6. Automatic Teardown

After wave 2 completes, stop calling renewals.tick(). Intermediate leases expire and memory returns to the fabric. The final output (written by task D to a durable destination) persists.
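
The teardown behavior can be modeled with a small table that renews a lease only while some consumer still needs it. Everything in this sketch (the IntermediateLeases type, its methods, and the per-lease consumer count) is hypothetical bookkeeping built on the standard library; it is not the RenewalManager API.

```rust
use std::collections::HashMap;

/// Hypothetical bookkeeping for intermediate leases.
/// Each entry maps an artifact name to (expires_at, consumers still pending).
#[derive(Default)]
pub struct IntermediateLeases {
    entries: HashMap<String, (u64, usize)>,
}

impl IntermediateLeases {
    /// Registers a lease with its initial expiry and number of consuming tasks.
    pub fn register(&mut self, name: &str, expires_at: u64, consumers: usize) {
        self.entries.insert(name.to_string(), (expires_at, consumers));
    }

    /// Records that one downstream task has finished reading `name`.
    pub fn consumer_done(&mut self, name: &str) {
        if let Some((_, pending)) = self.entries.get_mut(name) {
            *pending = pending.saturating_sub(1);
        }
    }

    /// Extends every live lease that still has pending consumers to
    /// `now + ttl_secs`. Returns how many leases were renewed.
    pub fn tick(&mut self, now: u64, ttl_secs: u64) -> usize {
        let mut renewed = 0;
        for (expires_at, pending) in self.entries.values_mut() {
            if *pending > 0 && now < *expires_at {
                *expires_at = now.saturating_add(ttl_secs);
                renewed += 1;
            }
        }
        renewed
    }

    /// True if the lease exists and has not yet expired at `now`.
    pub fn is_live(&self, name: &str, now: u64) -> bool {
        self.entries
            .get(name)
            .map_or(false, |(expires_at, _)| now < *expires_at)
    }
}
```

In the diamond, "raw" has two consumers (B and C). Once both report completion, tick no longer extends the lease, and it lapses at its last renewed expiry without any explicit delete call.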

Failure Modes

  • Task exhausts its retries: the task ends as TaskStatus::Failed, and all transitive downstream tasks become Skipped.
  • Intermediate lease expires before the downstream task consumes it: the downstream task fails. Size TTLs conservatively.
  • Cycle in DAG: TaskGraph::build() returns FabricError::IoError(-101).
  • Unresolved data reference: TaskGraph::build() returns FabricError::IoError(-102).
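
An unresolved data reference is an input name that no task produces. The check can be sketched as follows; TaskIo and unresolved_inputs are hypothetical names, and this simplified version only asks whether any task in the graph produces the name, not whether the producer is actually upstream.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified view of a task: the data names it reads and writes.
pub struct TaskIo<'a> {
    pub name: &'a str,
    pub inputs: &'a [&'a str],
    pub outputs: &'a [&'a str],
}

/// Returns (task name, input name) pairs for every input that no task produces.
/// An empty result means all data references resolve.
pub fn unresolved_inputs<'a>(tasks: &[TaskIo<'a>]) -> Vec<(&'a str, &'a str)> {
    // Collect every output name declared anywhere in the graph.
    let produced: HashSet<&str> = tasks
        .iter()
        .flat_map(|t| t.outputs.iter().copied())
        .collect();

    let mut missing = Vec::new();
    for task in tasks {
        for &input in task.inputs {
            if !produced.contains(input) {
                missing.push((task.name, input));
            }
        }
    }
    missing
}
```

Running a check like this before submitting a graph surfaces a misspelled DataRef name with the offending task attached, which is easier to act on than a bare error code.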

Observability

  • ExecutionResult.succeeded / failed / skipped — per-run health
  • TaskResult.retries_used — flakiness indicator per task
  • RenewalSummary from tick() — intermediate lease health
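
These signals can be folded into a per-run report. The types below (Outcome, TaskReport, RunSummary) are hypothetical stand-ins defined for this sketch; they mirror the counters listed above but are not the grafos_batch result types.

```rust
/// Hypothetical per-task outcome for this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Succeeded,
    Failed,
    Skipped,
}

/// Hypothetical per-task record: final outcome plus retries consumed.
pub struct TaskReport<'a> {
    pub name: &'a str,
    pub outcome: Outcome,
    pub retries_used: u32,
}

/// Per-run counters.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct RunSummary {
    pub succeeded: usize,
    pub failed: usize,
    pub skipped: usize,
}

/// Counts outcomes across all task reports.
pub fn summarize(reports: &[TaskReport<'_>]) -> RunSummary {
    let mut summary = RunSummary::default();
    for report in reports {
        match report.outcome {
            Outcome::Succeeded => summary.succeeded += 1,
            Outcome::Failed => summary.failed += 1,
            Outcome::Skipped => summary.skipped += 1,
        }
    }
    summary
}

/// Names of tasks that succeeded only after at least one retry.
pub fn flaky<'a>(reports: &[TaskReport<'a>]) -> Vec<&'a str> {
    reports
        .iter()
        .filter(|r| r.outcome == Outcome::Succeeded && r.retries_used > 0)
        .map(|r| r.name)
        .collect()
}
```

A task that keeps appearing in the flaky list is a candidate for a larger retry budget or a closer look at its source system, even while the run-level counters stay green.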

Variations

  • Use JobCoordinator for chunk-parallel execution within a single task (e.g., shard source data)
  • Chain multiple diamond DAGs for multi-stage ETL with cascading TTLs
  • Persist final outputs with grafos_kv::FabricKvStore for downstream query access