Recipe 8: A Stream Pipeline That Rewires Around Failures

Situation

You have a multi-stage pipeline (ingest -> parse -> enrich -> store). Traditionally, the failure of a single stage requires an orchestrator restart or manual reconfiguration.

In a lease-based world, you want failure recovery to look like normal error handling:

  • stage detects output transport failure
  • stage reacquires resources elsewhere
  • downstream reconnects to the new endpoint

The trick is that downstream needs a stable rendezvous point where it can always find the “current” output queue.

What You Build

A pipeline where:

  • boundaries are FabricQueue instances backed by leased memory
  • progress is checkpointed with Durable
  • stage endpoints are published through a durable “handoff record” with a generation counter

This complements Recipe 19 (which is a larger “moves mid-flight” story) by focusing on the minimal rewiring primitive.

Cross-Domain Boundary

This recipe is intentionally lease-local. FabricQueue, Durable, and RelocatableQueueEdge let a stage recover from a failed local queue segment, but they do not create a cross-zone, cross-region, or cross-provider replicated stream by themselves.

Use this recipe when the problem is local handoff/rebind. Use replicated queue, topic, or log resources when the problem is logical stream availability across failure domains. In that design, local FabricQueue segments may still exist underneath the replicated resource, but the user-facing contract comes from PlacementPolicy, ReplicaPolicy, replicated cursors, idempotency, and resource observations.

Building Blocks

  • grafos_collections::queue::FabricQueue
  • grafos_collections::durable::Durable
  • grafos_std::mem::MemBuilder
  • grafos_std::block::BlockBuilder
  • grafos_stream::Pipeline for declarative pipeline construction
  • grafos_pipeline::RelocatableQueueEdge for queue edges that relocate on failure
  • grafos_fence::FenceEpoch for generation tracking

Design

Pipeline Construction

Use grafos_stream::Pipeline for declarative pipeline assembly:

use grafos_stream::Pipeline;

Pipeline::from_source(ingest_source)
    .map(|raw| parse(raw))
    .filter(|record| record.is_valid())
    .sink(store_sink)
    .run()?;

Under the hood, inter-stage boundaries are FabricQueue instances backed by leased memory.

Relocatable Queue Edges

Instead of ad-hoc handoff records, use RelocatableQueueEdge from grafos_pipeline:

  • wraps a FabricQueue with a locator and a FenceEpoch generation counter
  • on failure, call .relocate() to acquire a new lease, create a fresh queue, bump the generation, and publish a new locator
  • downstream detects the generation change via FenceEpoch::is_stale() and rebinds
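
use grafos_fence::FenceEpoch;
use grafos_pipeline::RelocatableQueueEdge;

// Declared `mut`, as in the walkthrough, since push and relocate mutate the edge.
let mut edge = RelocatableQueueEdge::new(queue, locator, handoff_reader);

// Normal path:
edge.push(item)?;

// On FabricError::Disconnected:
edge.relocate()?; // acquires new lease, bumps generation

// A FenceEpoch; downstream compares it against its cached value to detect staleness.
// Named `generation` because `gen` is a reserved keyword in the Rust 2024 edition.
let generation = edge.generation();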
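
The three steps that relocate() is described as performing can be modeled without any grafOS crates. The following is a minimal std-only sketch, not the library's implementation: `ToyEdge`, `Handoff`, and `Locator` are hypothetical stand-ins, a "lease" is just a fresh integer id, and the handoff record lives behind a mutex rather than in leased block storage. It also assumes that items in the failed queue are not carried over, which is why the recipe leans on checkpoints and idempotent sinks.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical locator: names the leased segment that holds the queue.
#[derive(Clone, Debug, PartialEq)]
pub struct Locator {
    pub lease_id: u64,
}

/// Hypothetical handoff record: generation and locator are published together.
#[derive(Clone, Debug, PartialEq)]
pub struct Handoff {
    pub generation: u64,
    pub locator: Locator,
}

/// Toy edge: an in-process queue plus the shared record it publishes to.
pub struct ToyEdge {
    queue: VecDeque<Vec<u8>>,
    handoff: Arc<Mutex<Handoff>>,
    next_lease_id: u64,
}

impl ToyEdge {
    pub fn new(handoff: Arc<Mutex<Handoff>>) -> Self {
        let next_lease_id = handoff.lock().unwrap().locator.lease_id + 1;
        ToyEdge { queue: VecDeque::new(), handoff, next_lease_id }
    }

    pub fn push(&mut self, item: Vec<u8>) {
        self.queue.push_back(item);
    }

    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Replace the queue, then publish the new locator under a bumped generation.
    /// Returns the new generation.
    pub fn relocate(&mut self) -> u64 {
        // 1. "Acquire a new lease": in this toy, just take the next id.
        let lease_id = self.next_lease_id;
        self.next_lease_id += 1;
        // 2. Create a fresh queue. Assumption: in-flight items are dropped and
        //    must be replayed from the upstream checkpoint.
        self.queue = VecDeque::new();
        // 3. Publish generation and locator in one critical section, so a
        //    reader never sees a new generation paired with the old locator.
        let mut record = self.handoff.lock().unwrap();
        record.generation += 1;
        record.locator = Locator { lease_id };
        record.generation
    }
}
```

Publishing the generation and the locator as one unit is the design point worth keeping in any real version: a reader that observes the bumped generation must also observe the locator that goes with it.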

Offsets and Idempotency

To avoid reprocessing:

  • each stage checkpoints input offset / last committed id
  • sinks should be idempotent or detect duplicates

This recipe is about the rewiring mechanism; end-to-end exactly-once is an application-layer topic.
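
To make the two bullets above concrete, here is a small std-only sketch of a stage that skips input at or below its checkpoint and a sink keyed by record id. Everything in it is hypothetical (`Record`, `Checkpoint`, `IdempotentSink`, `process`), and it assumes record ids are unique and arrive in increasing order. In the recipe the checkpoint would be held in a Durable; here it is a plain struct.

```rust
use std::collections::BTreeMap;

/// Hypothetical record; `id` is assumed unique and increasing.
#[derive(Clone, Debug, PartialEq)]
pub struct Record {
    pub id: u64,
    pub payload: String,
}

/// Toy checkpoint: the last input id whose effects were committed.
#[derive(Default)]
pub struct Checkpoint {
    pub last_committed: Option<u64>,
}

/// Toy idempotent sink: rows are keyed by id, so a replayed record
/// overwrites its earlier copy instead of creating a duplicate.
#[derive(Default)]
pub struct IdempotentSink {
    pub rows: BTreeMap<u64, String>,
    pub duplicates_seen: u64,
}

impl IdempotentSink {
    pub fn write(&mut self, record: &Record) {
        if self.rows.insert(record.id, record.payload.clone()).is_some() {
            self.duplicates_seen += 1;
        }
    }
}

/// Process a batch, skipping anything at or below the checkpoint.
/// Returns how many records were actually applied.
pub fn process(
    batch: &[Record],
    checkpoint: &mut Checkpoint,
    sink: &mut IdempotentSink,
) -> usize {
    let mut applied = 0;
    for record in batch {
        let already_done = checkpoint
            .last_committed
            .map_or(false, |last| record.id <= last);
        if already_done {
            continue; // committed before the failure; do not reprocess
        }
        sink.write(record);
        checkpoint.last_committed = Some(record.id);
        applied += 1;
    }
    applied
}
```

The checkpoint filters replays cheaply on the hot path, while the keyed sink covers the window in which a write landed but the checkpoint update did not.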

Walkthrough (Implementation Sketch)

Core grafOS API Path

The edge is a FabricQueue plus a generation-stamped locator handoff:

use grafos_collections::queue::FabricQueue;
use grafos_fence::FenceEpoch;
use grafos_locator::handoff::{HandoffReader, HandoffState, HandoffWriter};
use grafos_locator::locator::QueueLocator;
use grafos_pipeline::RelocatableQueueEdge;
use grafos_std::block::BlockBuilder;
use grafos_std::mem::MemBuilder;

// Lease memory for the queue and keep the lease id for the locator.
let queue_lease = MemBuilder::new().min_bytes(8192).lease_secs(300).acquire()?;
let queue_lease_id = queue_lease.lease_id();
let queue: FabricQueue<Vec<u8>> = FabricQueue::new(queue_lease, 16, 256)?;
let locator = QueueLocator::new(queue_lease_id, 0, 1);

// Lease block storage for the handoff record and publish the initial state.
let handoff_lease = BlockBuilder::new().min_blocks(64).lease_secs(600).acquire()?;
let initial = HandoffState {
    stage_id: 1,
    generation: FenceEpoch::zero(),
    locator: locator.clone(),
};
let mut writer = HandoffWriter::new(initial, handoff_lease);
writer.publish(locator.clone())?;

// Reuse the block lease for a reader so the edge can poll for newer locators.
let reader_lease = writer.into_block_lease();
let reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);

// Wire up the edge and move one item through it.
let mut edge = RelocatableQueueEdge::new(queue, locator, reader);
edge.push(&b"event-1".to_vec())?;
let item = edge.pop()?;
// The `?` operators assume an enclosing function that returns
// Result<_, grafos_pipeline::EdgeError>.

When push or pop sees a recoverable lease error, RelocatableQueueEdge polls the handoff reader and rebases the queue handle to the newer locator generation.

1. Build Stage Queues

Each stage has:

  • input: a RelocatableQueueEdge pointing at the upstream stage’s output
  • output: a RelocatableQueueEdge wrapping a FabricQueue in a leased arena

On startup, each stage reads the upstream edge’s locator to find its input queue.

2. On Disconnected: Relocate the Edge

When push returns FabricError::Disconnected:

edge.relocate()?;
// Internally: acquires new MemLease, creates new FabricQueue,
// bumps FenceEpoch generation, publishes new locator.

No manual lease management or generation bookkeeping is needed; RelocatableQueueEdge handles both.
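
A stage usually wants a bound on how often it relocates before giving up. The sketch below shows one way to wrap the push-then-relocate step in a bounded retry loop. It is written against a hypothetical `Edge` trait and `PushError` enum rather than the real RelocatableQueueEdge, whose exact method signatures and error variants are not reproduced here; `FlakyEdge` is a test double that stays disconnected until it has been relocated a set number of times.

```rust
/// Hypothetical error type; `Disconnected` models the recoverable case.
#[derive(Debug, PartialEq)]
pub enum PushError {
    Disconnected,
    Fatal,
}

/// Hypothetical minimal edge interface for this sketch.
pub trait Edge {
    fn push(&mut self, item: &[u8]) -> Result<(), PushError>;
    fn relocate(&mut self) -> Result<(), PushError>;
}

/// Push one item, relocating on `Disconnected` at most `max_relocations` times.
/// Returns how many relocations were needed.
pub fn push_with_relocate<E: Edge>(
    edge: &mut E,
    item: &[u8],
    max_relocations: usize,
) -> Result<usize, PushError> {
    let mut relocations = 0;
    loop {
        match edge.push(item) {
            Ok(()) => return Ok(relocations),
            Err(PushError::Disconnected) if relocations < max_relocations => {
                edge.relocate()?; // a failed relocation is not retried
                relocations += 1;
            }
            Err(other) => return Err(other),
        }
    }
}

/// Test double: pushes fail until the edge has been relocated
/// `failures_left` times.
pub struct FlakyEdge {
    pub failures_left: usize,
    pub generation: u64,
    pub delivered: Vec<Vec<u8>>,
}

impl Edge for FlakyEdge {
    fn push(&mut self, item: &[u8]) -> Result<(), PushError> {
        if self.failures_left > 0 {
            return Err(PushError::Disconnected);
        }
        self.delivered.push(item.to_vec());
        Ok(())
    }

    fn relocate(&mut self) -> Result<(), PushError> {
        self.failures_left = self.failures_left.saturating_sub(1);
        self.generation += 1;
        Ok(())
    }
}
```

Capping relocations keeps a persistently unhealthy placement from turning into an unbounded lease-acquisition loop; once the cap is hit, the error surfaces to the caller like any other failure.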

3. Downstream Rebinds on Generation Change

Downstream checks the edge’s generation (a FenceEpoch):

if edge.generation().is_stale(&my_cached_generation) {
    // Rebind: re-read locator and reconstruct input queue handle.
    my_cached_generation = edge.generation();
}
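
The same check can be modeled end to end with standard-library types only. In this sketch the generation is a plain u64 rather than a FenceEpoch, the handoff record sits behind a mutex, and `Handoff` and `Downstream` are hypothetical names; it illustrates the compare-then-rebind pattern, not the behavior of FenceEpoch::is_stale itself.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical handoff record: current generation and queue lease id.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Handoff {
    pub generation: u64,
    pub lease_id: u64,
}

/// Toy downstream stage that remembers the record it last bound to.
pub struct Downstream {
    handoff: Arc<Mutex<Handoff>>,
    cached: Handoff,
    pub rebinds: u64,
}

impl Downstream {
    /// Initial bind: read the record once and cache it.
    pub fn bind(handoff: Arc<Mutex<Handoff>>) -> Self {
        let cached = *handoff.lock().unwrap();
        Downstream { handoff, cached, rebinds: 0 }
    }

    /// Compare the published generation with the cached one and rebind
    /// if the published one is newer. Returns true when a rebind happened.
    pub fn poll(&mut self) -> bool {
        let current = *self.handoff.lock().unwrap();
        if current.generation <= self.cached.generation {
            return false; // cached view is still current
        }
        // Rebind: adopt the new locator and generation together.
        self.cached = current;
        self.rebinds += 1;
        true
    }

    pub fn bound_lease_id(&self) -> u64 {
        self.cached.lease_id
    }
}
```

Only a strictly newer generation triggers a rebind, so a reader that polls repeatedly between relocations does no extra work.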

Failure Modes

  • Disconnected: triggers rebuild and rebinding.
  • LeaseExpired: treat as “queue gone”; rebuild.
  • Partial writes: ensure queue implementation is robust to partial updates (or treat as corruption and rebuild).
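
For the partial-write case, one common way to detect a torn entry is to frame each item with a length and a checksum and to treat any mismatch as corruption. The sketch below does this with a 32-bit FNV-1a hash. The frame layout and function names are assumptions made for illustration; the source does not describe how FabricQueue lays out its entries or whether it already performs such a check.

```rust
/// 32-bit FNV-1a hash, used here as a cheap integrity check.
fn fnv1a(bytes: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c_9dc5;
    for &byte in bytes {
        hash ^= u32::from(byte);
        hash = hash.wrapping_mul(0x0100_0193);
    }
    hash
}

/// Hypothetical frame layout: [len: u32 LE][checksum: u32 LE][payload].
pub fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    frame.extend_from_slice(&fnv1a(payload).to_le_bytes());
    frame.extend_from_slice(payload);
    frame
}

#[derive(Debug, PartialEq)]
pub enum FrameError {
    /// Header or payload is shorter than the length field claims.
    Truncated,
    /// Payload bytes do not match the stored checksum.
    ChecksumMismatch,
}

/// Validate a frame and return its payload, or report why it is unusable.
pub fn decode_frame(frame: &[u8]) -> Result<&[u8], FrameError> {
    if frame.len() < 8 {
        return Err(FrameError::Truncated);
    }
    let len = u32::from_le_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
    let expected = u32::from_le_bytes([frame[4], frame[5], frame[6], frame[7]]);
    let end = 8usize.checked_add(len).ok_or(FrameError::Truncated)?;
    let payload = frame.get(8..end).ok_or(FrameError::Truncated)?;
    if fnv1a(payload) != expected {
        return Err(FrameError::ChecksumMismatch);
    }
    Ok(payload)
}
```

A stage that sees either error on its input can follow the guidance above: stop trusting the segment and rebuild, rather than trying to salvage a partially written entry.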

Observability

Track:

  • stage restarts / rebinds
  • generation changes
  • throughput dip during failover
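
These signals can be kept as a handful of atomic counters per edge. The following is a minimal sketch with hypothetical names (`EdgeMetrics` and its fields); it is not part of any grafOS crate, and exporting the values to a metrics backend is left out. The throughput dip is approximated by accumulating how long the stage was stalled during each failover, alongside a running item count.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

/// Hypothetical per-edge counters; all fields start at zero.
#[derive(Default)]
pub struct EdgeMetrics {
    pub items: AtomicU64,
    pub rebinds: AtomicU64,
    pub generation: AtomicU64,
    pub generation_changes: AtomicU64,
    pub failover_micros: AtomicU64,
}

impl EdgeMetrics {
    /// Count one item moved through the edge (throughput numerator).
    pub fn record_item(&self) {
        self.items.fetch_add(1, Ordering::Relaxed);
    }

    /// Record a rebind, the generation it landed on, and how long the
    /// stage was stalled while failing over.
    pub fn record_rebind(&self, new_generation: u64, stalled_for: Duration) {
        self.rebinds.fetch_add(1, Ordering::Relaxed);
        let previous = self.generation.swap(new_generation, Ordering::Relaxed);
        if previous != new_generation {
            self.generation_changes.fetch_add(1, Ordering::Relaxed);
        }
        self.failover_micros
            .fetch_add(stalled_for.as_micros() as u64, Ordering::Relaxed);
    }
}
```

Counting rebinds separately from generation changes makes it visible when a stage reconnects repeatedly without the upstream having relocated, which usually points at a problem other than the queue segment itself.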

Variations

  • Use watch() for faster notification (requires shared memory visibility).
  • Use a replicated queue, topic, or log for higher availability across explicit failure domains.