Skip to content

Recipe 31: Event-Driven Pipeline Without Network Hops

Situation

You need a classic event-driven backend: objects land in storage, notifications fire, workers process them. In a conventional setup this means wiring together three separate services — S3 for storage, SQS for queuing, Lambda for compute — with network round-trips, API gateways, and IAM policies between each hop. Moving a 1 KB object from storage to queue to compute crosses three network boundaries before any work happens.

In the fabricBIOS model, storage, queuing, and compute are all leased from the same fabric memory pool. An “S3 event → SQS message → Lambda function” becomes writes to different data structures in the same address space. No network hops between services. No serialization at service boundaries. The object, the notification, and the compute result all live in leased DRAM — and all three vanish when their leases expire.

What You Build

An event pipeline where:

  • Object writes go to a MemObjectStore (leased DRAM).
  • Each write produces a notification on a grafos-mq topic (same leased DRAM pool).
  • Workers consume notifications, read objects from the store, process them, and write results back.
  • All three layers share the same fabric memory pool — zero network hops between stages.
  • Lease expiry handles cleanup: abandoned pipelines simply cease to exist.

Building Blocks

  • grafos_store::{MemObjectStore, ObjectStore, FabricUri} — object storage — source
  • grafos_mq::topic::{TopicManager, TopicConfig} — topic lifecycle — source
  • grafos_mq::producer::Producer — message production — source
  • grafos_mq::consumer::{Consumer, SeekPolicy} — partition-assigned polling — source
  • grafos_mq::group::ConsumerGroup — decentralized partition assignment — source
  • grafos_jobs::{JobCoordinator, RetryPolicy, MemoryOutputStore} — idempotent retry scaffolding — source
  • grafos_leasekit::{RenewalManager, RenewalPolicy} — TTL-driven lease renewal — source

Design

Why One Fabric Changes the Game

In a conventional cloud pipeline, each service boundary is a network call: the storage service serializes a notification, sends it over HTTP to the queue service, the queue service serializes the message, the compute service fetches it over HTTP, then makes another HTTP call back to storage to read the object. Three services, three serialization round-trips, three sets of credentials.

When all three live in the same leased memory pool, the “notification” is a pointer-sized write to a ring buffer that already exists in the same address space. The “fetch” is a hash map lookup. The only real work is the computation itself.

Architecture

Producer (your code)
├── store.put(uri, data) → MemObjectStore (FabricHashMap in leased DRAM)
└── producer.send(uri_bytes) → TopicManager partition (ring buffer in leased DRAM)
Consumer.poll()
store.get(uri) → same MemObjectStore
process(data)
store.put(result_uri, output)

No network between any of these steps. The MemObjectStore and TopicManager partitions are both backed by FabricHashMap / FabricQueue over the same leased DRAM.

Notification as a Lightweight Pointer

The notification message is just the FabricUri string (e.g. fabric://default/uploads/img-042.png). The consumer uses it to look up the object in the store. Since the store and queue share the same memory pool, this “look up” is a hash map get — not a network fetch.

Walkthrough (Implementation Sketch)

1. Create the Store and Notification Topic

use grafos_store::{MemObjectStore, ObjectStore, FabricUri};
use grafos_mq::topic::{TopicManager, TopicConfig};
use grafos_mq::producer::Producer;
// Object store — 256 hash buckets in leased DRAM
let mut store = MemObjectStore::new(256)?;
// Notification topic — 4 partitions, each a ring buffer
let mut topics = TopicManager::new();
topics.create("object-events", TopicConfig {
num_partitions: 4,
partition_capacity: 128,
partition_stride: 512,
})?;
let mut notifier = Producer::new("object-events");

2. Write an Object and Notify

fn ingest(
store: &mut MemObjectStore,
topics: &mut TopicManager,
notifier: &mut Producer,
key: &str,
data: &[u8],
) -> grafos_std::Result<()> {
let uri: FabricUri = format!("fabric://default/uploads/{}", key).parse()?;
// Write to store (hash map insert — no network)
store.put(&uri, data, None)?;
// Notify (ring buffer append — no network)
notifier.send(topics, uri.to_string().as_bytes())?;
Ok(())
}
ingest(&mut store, &mut topics, &mut notifier, "img-042.png", &image_bytes)?;
ingest(&mut store, &mut topics, &mut notifier, "img-043.png", &image_bytes)?;

Both operations are writes to leased DRAM. No serialization boundary, no HTTP call.

3. Consume Notifications and Process

use grafos_mq::consumer::{Consumer, SeekPolicy};
use grafos_mq::group::ConsumerGroup;
use grafos_mq::offset::MemOffsetStore;
let mut group = ConsumerGroup::new("processors", "object-events", 4, 30);
let claimed = group.claim("worker-1");
let mut offsets = MemOffsetStore::new();
let mut consumer = Consumer::new("object-events", "processors");
consumer.assign(&claimed);
consumer.seek(&topics, &offsets, SeekPolicy::Earliest)?;
let messages = consumer.poll(&topics, 100)?;
for (_part_idx, msg) in &messages {
// Parse the URI from the notification
let uri_str = core::str::from_utf8(&msg.value).unwrap();
let uri: FabricUri = uri_str.parse()?;
// Read the object — hash map lookup, not a network fetch
let obj = store.get(&uri)?.expect("object exists");
// Process
let result = process_image(&obj.data);
// Write result back to store
let result_uri: FabricUri = format!("fabric://default/results/{}", uri.key()).parse()?;
store.put(&result_uri, &result, None)?;
}
consumer.commit(&mut offsets);
group.heartbeat("worker-1");

4. Orchestrate with JobCoordinator (Optional)

For idempotent processing with retry:

use grafos_jobs::{JobCoordinator, RetryPolicy, MemoryOutputStore};
let mut output_store = MemoryOutputStore::new();
let mut coord = JobCoordinator::new(RetryPolicy::default());
let result = coord.run(
&chunks,
&mut output_store,
|chunk_bytes| {
let uri: FabricUri = core::str::from_utf8(chunk_bytes)
.map_err(|_| grafos_std::FabricError::IoError(-1))?
.parse()?;
let obj = store.get(&uri)?.ok_or(grafos_std::FabricError::IoError(-2))?;
Ok(process_image(&obj.data))
},
|outputs| {
// Aggregate: count processed
postcard::to_allocvec(&(outputs.len() as u64)).unwrap()
},
)?;

5. Lease Expiry Handles Cleanup

Register all backing leases with RenewalManager. When you stop calling tick(), leases expire. The store, the topic partitions, and all results vanish. No cleanup scripts, no cron jobs.

use grafos_leasekit::{RenewalManager, RenewalPolicy};
let mut renewals = RenewalManager::new();
let retention_secs: u64 = 3600;
let now = grafos_std::host::unix_time_secs();
// Register store and topic partition leases
for lease_id in all_backing_lease_ids {
renewals.register(lease_id, now + retention_secs, RenewalPolicy::default());
}
// In your main loop:
let _summary = renewals.tick(now);

Failure Modes

  • Worker crash: ConsumerGroup heartbeat goes stale, partitions rebalance to surviving workers. Messages are reprocessed from the last committed offset.
  • Object deleted before processing: store.get() returns None. Handle as a permanent error (don’t retry — the object is gone).
  • Store lease expires: All objects and results disappear. Workers see empty reads. This is the intended behavior for bounded-retention pipelines. For durable pipelines, use RenewalManager to keep leases alive.
  • Topic partition full: Ring buffer wraps — oldest unprocessed messages are overwritten. Size partition_capacity for your ingest rate.

Observability

  • store_put_bytes / store_get_bytes — object I/O volume (enable observe feature on grafos-store)
  • mq_messages_published / mq_messages_consumed — pipeline throughput
  • Consumer group assignment map — which worker owns which partitions
  • Lease TTL remaining via RenewalManager — time until automatic cleanup
  • End-to-end latency: timestamp at store.put() vs timestamp at result write

Variations

  • Fan-out: One object write produces multiple notifications on different topics (e.g. thumbnails, metadata-extraction, search-indexing). Each topic has its own consumer group.
  • Tiered storage: Use TieredObjectStore instead of MemObjectStore — hot objects in DRAM, cold objects spill to block storage automatically.
  • Dead-letter routing: Add a DlqRouter for messages that fail processing after N retries. The DLQ topic shares the same lease-scoped lifetime.
  • Multi-stage pipeline: Chain multiple topic/consumer pairs — stage 1 writes to stage-2-events, stage 2 writes to stage-3-events. Each stage is a separate consumer group.
  • Durable offsets: Replace MemOffsetStore with grafos_kv::FabricKvStore for offset persistence across worker restarts.

Testing

Terminal window
cargo test -p grafos-store -- mem_store # object store roundtrips
cargo test -p grafos-mq -- topic # topic lifecycle
cargo test -p grafos-mq -- producer # message production
cargo test -p grafos-mq -- consumer # polling and offset tracking
cargo test -p grafos-jobs -- coordinator # retry and aggregation