Recipe 42: Mirrored ETL Pipeline

Situation

You run an event pipeline that should keep accepting raw events and committing warehouse rows even when one availability zone, region, or cloud provider is unavailable. Traditional designs spread that responsibility across a cloud queue, a streaming service, a checkpoint table, a sink dedupe table, and provider-specific failover scripts.

What You Build

A mirrored ETL pipeline where:

  • raw events publish to a replicated topic;
  • transformers consume through a durable subscription;
  • normalized rows publish to a second replicated topic;
  • warehouse sinks consume through their own durable subscription;
  • ingress, transform, and sink domains are explicitly authorized by placement;
  • duplicate source events and duplicate sink effects are detected by idempotency keys and either replayed or rejected.

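The compiled recipe lives in cookbook/recipe-42-mirrored-etl-pipeline. The example below ingests one raw event from an AWS zone, transforms it from a GCP region, and commits the normalized row to the warehouse from the AWS zone: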

use cookbook_recipe_42_mirrored_etl_pipeline::{
    aws_zone, gcp_region, raw_event, MirroredEtlPipeline,
};

let mut pipeline = MirroredEtlPipeline::new()?;

// Ingest one raw event from the AWS zone.
let ingest = pipeline.ingest_from_domain(
    aws_zone(),
    raw_event("evt-1", "tenant-a", 1234),
)?;

// Transform the pending raw event from the GCP region.
let transformed = pipeline
    .transform_one_from_domain(gcp_region(), "transformer-gcp")?
    .expect("raw event");

// Commit the normalized event to the warehouse from the AWS zone.
let committed = pipeline
    .commit_one_to_warehouse_from_domain(aws_zone(), "warehouse-aws")?
    .expect("normalized event");

assert_eq!(transformed.raw_offset, ingest.offset);
assert_eq!(committed.warehouse_key, "warehouse://orders/tenant-a/evt-1");
# Ok::<(), cookbook_recipe_42_mirrored_etl_pipeline::EtlError>(())

Core grafOS API Path

MirroredEtlPipeline::new() creates two replicated topics, a durable subscription on each, and one shared idempotency store, all backed by the same two-replica set:

use fabricbios_core::lease::FenceEpoch;
use grafos_replicated::{
    LogicalResourceName, ReplicaHealth, ReplicaId, ReplicaLocator,
    ReplicaRole, ReplicaSetLocator, ReplicatedIdempotencyStore,
    ReplicatedTopic, ResourceGeneration, SchemaId, SubscriptionFilter,
    SubscriptionName,
};
use cookbook_recipe_42_mirrored_etl_pipeline::{
    aws_zone, cross_cloud_placement, etl_replica_policy, gcp_region,
    NormalizedOrder, RawOrderEvent,
};

let replicas = etl_replica_policy();
let generation = ResourceGeneration(1);

// Two healthy voting replicas, one per cloud, both at fence epoch 1.
let locator = ReplicaSetLocator::new(
    generation,
    vec![
        ReplicaLocator {
            replica_id: ReplicaId::new("etl-aws-a"),
            domain: aws_zone(),
            role: ReplicaRole::Voter,
            health: ReplicaHealth::Healthy,
            epoch: FenceEpoch(1),
            content_generation: generation.0,
        },
        ReplicaLocator {
            replica_id: ReplicaId::new("etl-gcp-central"),
            domain: gcp_region(),
            role: ReplicaRole::Voter,
            health: ReplicaHealth::Healthy,
            epoch: FenceEpoch(1),
            content_generation: generation.0,
        },
    ],
);

// Raw topic plus the durable subscription the transformers consume from.
let mut raw = ReplicatedTopic::<RawOrderEvent>::new(
    LogicalResourceName::new("raw-orders"),
    SchemaId::new("raw-order.v1"),
    FenceEpoch(1),
    replicas.clone(),
    locator.clone(),
)?;
raw.subscribe_durable(
    SubscriptionName::new("transformers"),
    SubscriptionFilter::All,
    FenceEpoch(1),
)?;

// Normalized topic plus the durable subscription the warehouse sinks consume from.
let mut normalized = ReplicatedTopic::<NormalizedOrder>::new(
    LogicalResourceName::new("normalized-orders"),
    SchemaId::new("normalized-order.v1"),
    FenceEpoch(1),
    replicas.clone(),
    locator.clone(),
)?;
normalized.subscribe_durable(
    SubscriptionName::new("warehouse"),
    SubscriptionFilter::All,
    FenceEpoch(1),
)?;

// One idempotency store shared by the pipeline stages.
let effects = ReplicatedIdempotencyStore::new(
    LogicalResourceName::new("etl-effects"),
    SchemaId::new("etl-effect.v1"),
    FenceEpoch(1),
    replicas,
    locator,
)?;

let placement = cross_cloud_placement();
# let _ = (raw, normalized, effects, placement);
# Ok::<(), grafos_replicated::ReplicatedError>(())

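The stage methods then call publish/poll/ack on the topics and reserve/complete on the idempotency store. The placement object is checked before any ingress, transform, or sink work starts. It is an authorization check, not a fallback list: a caller in a domain the policy does not allow is rejected, not rerouted to another domain.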
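To illustrate the difference between an authorization check and a fallback list, here is a minimal sketch of a per-stage allow-list. `Stage`, `Placement`, `allow`, and `authorize` are hypothetical names and the domain strings are placeholders; the recipe's `cross_cloud_placement()` may be structured quite differently. The error variant only borrows the `DomainUnavailable` name used under Failure Modes. `authorize` answers whether the caller's own domain may run a stage; it never selects a substitute domain.

```rust
use std::collections::{HashMap, HashSet};

/// Pipeline stages that placement can authorize (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Stage {
    Ingress,
    Transform,
    Sink,
}

#[derive(Debug, PartialEq)]
enum PlacementError {
    /// Borrowed name: the caller's domain is not authorized for this stage.
    DomainUnavailable { stage: Stage, domain: String },
}

/// Hypothetical placement policy: which failure domains may run each stage.
struct Placement {
    allowed: HashMap<Stage, HashSet<String>>,
}

impl Placement {
    fn new() -> Self {
        Placement { allowed: HashMap::new() }
    }

    /// Authorizes `domain` to run `stage` (builder style).
    fn allow(mut self, stage: Stage, domain: &str) -> Self {
        self.allowed.entry(stage).or_default().insert(domain.to_string());
        self
    }

    /// Checked before stage work starts. It only accepts or rejects the
    /// caller's domain; it never picks another domain as a fallback.
    fn authorize(&self, stage: Stage, domain: &str) -> Result<(), PlacementError> {
        match self.allowed.get(&stage) {
            Some(domains) if domains.contains(domain) => Ok(()),
            _ => Err(PlacementError::DomainUnavailable {
                stage,
                domain: domain.to_string(),
            }),
        }
    }
}
```

Building a policy with `Placement::new().allow(Stage::Ingress, "aws-zone").allow(Stage::Transform, "gcp-region")` and then calling `authorize(Stage::Ingress, "azure-east")` returns `DomainUnavailable` rather than quietly sending the work to `aws-zone`.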

Design

The pipeline uses two replicated topics, and each event moves through them in these steps:

  1. Ingress checks that the caller’s failure domain is allowed.
  2. The raw event id and payload produce a canonical fingerprint.
  3. The raw event publishes to the raw topic.
  4. The transform subscription advances through a replicated cursor.
  5. The transformer records an idempotent effect and publishes a normalized event.
  6. The warehouse subscription advances through its own replicated cursor.
  7. The sink records an idempotent warehouse commit.
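Steps 2 and 3 can be sketched with an in-memory model. This is hypothetical: `RawEvent`, `RawTopic`, `ingest`, and `FingerprintConflict` are illustrative names, the topic is a plain `Vec`, and `DefaultHasher` stands in for the canonical fingerprint. `DefaultHasher` output is not guaranteed stable across Rust releases, so a real pipeline would want a stable digest. An identical duplicate returns the original offset; the same id with a different payload fails closed.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical raw event shape, mirroring `raw_event("evt-1", "tenant-a", 1234)`.
#[derive(Debug, Clone, Hash)]
struct RawEvent {
    id: String,
    tenant: String,
    amount: u64,
}

#[derive(Debug, PartialEq)]
enum IngestError {
    /// The event id was already published with a different payload.
    FingerprintConflict { id: String, original_offset: u64 },
}

/// Stand-in for the raw topic: an append-only log plus a dedupe index.
#[derive(Default)]
struct RawTopic {
    log: Vec<RawEvent>,
    /// event id -> (fingerprint, offset of the first publish)
    seen: HashMap<String, (u64, u64)>,
}

/// Stand-in fingerprint over every payload field (not stable across Rust releases).
fn fingerprint(event: &RawEvent) -> u64 {
    let mut hasher = DefaultHasher::new();
    event.hash(&mut hasher);
    hasher.finish()
}

impl RawTopic {
    /// Publishes a new event, replays the offset of an identical duplicate,
    /// or fails closed when an id is reused with a different payload.
    fn ingest(&mut self, event: RawEvent) -> Result<u64, IngestError> {
        let fp = fingerprint(&event);
        if let Some(&(seen_fp, offset)) = self.seen.get(&event.id) {
            return if seen_fp == fp {
                Ok(offset)
            } else {
                Err(IngestError::FingerprintConflict {
                    id: event.id,
                    original_offset: offset,
                })
            };
        }
        let offset = self.log.len() as u64;
        self.seen.insert(event.id.clone(), (fp, offset));
        self.log.push(event);
        Ok(offset)
    }
}
```

Keying the index by event id, while comparing fingerprints, is what separates a harmless retry (same id, same payload) from a conflicting rewrite (same id, new payload).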

The durable subscription cursors are part of replicated state. A transformer or sink can restart in another allowed domain and continue from the stored cursor instead of rebuilding progress from a local process checkpoint.
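A minimal model of why the restart works: the cursor lives in shared state keyed by subscription name, so a worker started in another domain reads the same position. `SharedState`, `Consumer`, `poll`, and `ack` are hypothetical names, and a single `HashMap` stands in for state that the real durable subscriptions replicate across the replica set. In this sketch the cursor advances only on `ack`.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for replicated state: one topic log, per-subscription
/// cursors, and a record of which domain acknowledged each offset.
struct SharedState {
    log: Vec<String>,
    cursors: HashMap<String, usize>,
    history: Vec<(usize, String)>,
}

/// Hypothetical stage worker: no local checkpoint, only its failure domain
/// and the name of the durable subscription it reads.
struct Consumer<'a> {
    domain: &'a str,
    subscription: &'a str,
}

impl Consumer<'_> {
    /// Returns the next unacknowledged entry for this subscription, if any.
    fn poll(&self, state: &SharedState) -> Option<(usize, String)> {
        let offset = state.cursors.get(self.subscription).copied().unwrap_or(0);
        state.log.get(offset).map(|entry| (offset, entry.clone()))
    }

    /// Moves the shared cursor past `offset` and records which domain did the work.
    fn ack(&self, state: &mut SharedState, offset: usize) {
        state.cursors.insert(self.subscription.to_string(), offset + 1);
        state.history.push((offset, self.domain.to_string()));
    }
}
```

If a consumer in `"aws-zone"` acks offset 0 of the `"transformers"` subscription and then disappears, a fresh consumer in `"gcp-region"` with the same subscription name polls offset 1 next, while the separate `"warehouse"` subscription still starts at 0.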

Why It Is Useful

This gives the program one coherent availability model:

  • placement decides which domains may ingest, transform, and sink;
  • topic offsets give ordered replay;
  • durable subscriptions store stage progress;
  • idempotency records protect external side effects;
  • the same recipe works for zone, region, or provider boundaries as long as the placement policy authorizes those domains.

Failure Modes

  • Duplicate raw event: an event with the same id and payload returns the original topic offset. The same id with a different payload fails closed.
  • Disallowed ingress domain: the event is rejected with DomainUnavailable.
  • Transformer restart: a replacement transformer, possibly in another allowed domain, resumes from the durable subscription cursor, which has already advanced past delivered raw events.
  • Sink retry: warehouse effects use idempotency keys, so a repeated commit cannot produce a second row under the same event id.
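The sink-retry case can be modeled with the reserve/complete pattern the stage methods use. This is a hypothetical in-memory stand-in, not the `ReplicatedIdempotencyStore` API: `EffectStore`, `Reservation`, `commit_once`, and the `sink:{tenant}:{event_id}` key format are illustrative, and the warehouse key format simply mirrors the example at the top of this recipe. The first reservation claims the key and runs the effect; `complete` stores the resulting warehouse key, and a retry replays it instead of writing a second row.

```rust
use std::collections::HashMap;

/// Hypothetical state of one idempotency key.
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    /// Reserved by a worker; the side effect has not been recorded as done.
    InFlight,
    /// Side effect finished; the stored result is replayed on retries.
    Done(String),
}

/// Outcome of a reservation attempt.
#[derive(Debug, PartialEq)]
enum Reservation {
    /// Caller owns the key and must perform the effect, then call `complete`.
    Acquired,
    /// Effect already committed; replay this result instead of re-running it.
    Replay(String),
    /// Another worker holds the key and has not completed it yet.
    Busy,
}

#[derive(Default)]
struct EffectStore {
    entries: HashMap<String, Entry>,
}

impl EffectStore {
    fn reserve(&mut self, key: &str) -> Reservation {
        match self.entries.get(key) {
            Some(Entry::Done(result)) => Reservation::Replay(result.clone()),
            Some(Entry::InFlight) => Reservation::Busy,
            None => {
                self.entries.insert(key.to_string(), Entry::InFlight);
                Reservation::Acquired
            }
        }
    }

    fn complete(&mut self, key: &str, result: String) {
        self.entries.insert(key.to_string(), Entry::Done(result));
    }
}

/// Commits a warehouse row at most once per event id; `writes` counts real side effects.
fn commit_once(store: &mut EffectStore, tenant: &str, event_id: &str, writes: &mut u32) -> Option<String> {
    let key = format!("sink:{tenant}:{event_id}");
    match store.reserve(&key) {
        Reservation::Replay(existing) => Some(existing),
        Reservation::Busy => None,
        Reservation::Acquired => {
            // The external side effect runs only for the first reservation.
            *writes += 1;
            let warehouse_key = format!("warehouse://orders/{tenant}/{event_id}");
            store.complete(&key, warehouse_key.clone());
            Some(warehouse_key)
        }
    }
}
```

A crash between the side effect and `complete` would leave the key `InFlight`; this sketch just reports such keys as `Busy`, and recovering them is outside its scope.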

Tests

Run it with:

cargo test -p cookbook-recipe-42-mirrored-etl-pipeline

The tests cover:

  • ingest, transform, and sink across explicitly allowed clouds;
  • duplicate raw event replay;
  • changed duplicate payload fail-closed behavior;
  • disallowed ingress rejection;
  • durable subscription offset movement.

See also:

  • Recipe 8: A Stream Pipeline That Rewires Around Failures
  • Recipe 19: Self-Healing Pipeline That Moves Mid-Flight
  • crates/grafos-replicated