Skip to content

Recipe 55: Consuming the Audit Chain

Situation

A compliance team needs a tamper-evident trail of every committed graph rewrite, lease lifecycle transition, and admission decision the scheduler made. The fabric already emits a SHA-256-linked audit chain in JSONL; the team needs to ingest it, verify the linkage end-to-end, and pull typed fields out of the records that matter to their reports.

In grafOS, the rewrite engine seals one EdgeRewritten record per affected edge on every successful commit, alongside the existing admission / lease / preemption / drain records. Each record’s current_event_hash chains to the next’s prev_event_hash. A consumer that walks the chain, recomputes each hash, and decodes the typed event_data can prove no record was added, removed, reordered, or modified after sealing.

What You Build

A compliance / analytics pipeline that:

  • Reads a grafos-audit JSONL stream (one AuditRecord per line);
  • Verifies linkage against a persisted anchor (the chain head from the prior batch, durable across process restarts);
  • Refuses to advance the anchor or emit observations when any record fails verification;
  • Filters records carrying event_data = EdgeRewritten { edge_id, edge_record_bytes } and decodes the embedded bytes back to a typed EdgeRecord;
  • Surfaces a Vec<EdgeRewriteObservation> in source order.

The compiled recipe lives in cookbook/recipe-55-consuming-audit-chain.

Core grafOS API Path

The pipeline is a thin layer on top of the reference collector at crates/grafos-audit-collector. The collector handles parse + linkage verify + anchor advance; this recipe adds the per-record filter + typed decode the compliance workload actually cares about.

use grafos_audit::{AnchorStore, AuditEventData, FileAnchorStore};
use grafos_audit_collector::{Collector, IngestError};
use grafos_core::edge_record::EdgeRecord;
use grafos_core::EdgeId;
let anchor = FileAnchorStore::load_or_unanchored("/var/lib/audit/anchor.dat")?;
let mut collector = Collector::new(anchor);
// Parse the batch once; the collector consumes it for verify, the
// pipeline reuses the parsed list to filter / decode after verify
// succeeds.
let records = grafos_audit::jsonl::read_chain(reader)?;
collector.ingest_records(records.clone())?; // verify + advance anchor
for record in &records {
if let Some(AuditEventData::EdgeRewritten { edge_id, edge_record_bytes }) =
record.event_data.as_ref()
{
let (decoded, _consumed) = EdgeRecord::decode(edge_record_bytes)?;
// decoded.edge_id, decoded.src_port, decoded.dst_port,
// decoded.protocol, decoded.features, decoded.rights,
// decoded.state, decoded.lease_ref, decoded.binding_ref,
// decoded.constraints — all typed.
}
}
# Ok::<(), Box<dyn std::error::Error>>(())

Program

use cookbook_recipe_55_consuming_audit_chain::Pipeline;
use grafos_audit::FileAnchorStore;
use std::fs::File;
use std::io::BufReader;
let anchor = FileAnchorStore::load_or_unanchored("/var/lib/audit/anchor.dat")?;
let mut pipeline = Pipeline::new(anchor);
let file = File::open("/var/log/audit/2026-05-12.jsonl")?;
let reader = BufReader::new(file);
let n = pipeline.ingest_jsonl(reader)?;
println!(
"ingested {n} records; observations={}; verification_failures={}",
pipeline.edge_rewrite_observations().len(),
pipeline.metrics().chain_verification_failures,
);
for obs in pipeline.edge_rewrite_observations() {
println!(
"edge_rewritten seq={} edge={} features={:#010x}",
obs.record_sequence,
obs.edge_id.to_hex_0x(),
obs.decoded.features,
);
}
# Ok::<(), Box<dyn std::error::Error>>(())

Design

The pipeline is constructed in two distinct phases by design:

  1. VerifyCollector::ingest_records walks the chain head to tail, recomputing each current_event_hash from canonical bytes and checking it against the declared field. If any record fails, the persisted anchor does NOT advance and no observations are emitted. The collector counter chain_verification_failures increments — operators wire this to an alert because it is the chain-integrity signal.

  2. Decode — only reached after verify succeeded, so the bytes the recipe decodes are the same bytes the chain hash already signed off on. A decode failure here means the embedded edge_record_bytes are a valid SHA-256 input but not a valid EdgeRecord shape — that’s a producer bug, not a tamper. The pipeline still fails closed and returns PipelineError::EdgeRecordDecode { record_sequence, edge_id, .. } so operators can localize the producer.

The anchor is the chain head pointer, not the records themselves. A file-backed anchor (FileAnchorStore::load_or_unanchored) is written atomically (tempfile-then-rename) on every successful batch, so a crash between batches resumes from the last successful head.

Failure Modes

  • Anchor mismatch: the batch’s first record’s prev_event_hash did not match the persisted anchor. Anchor unchanged. Returns PipelineError::Ingest(IngestError::AnchorMismatch { expected, actual }). Usually means: re-ingesting an already-collected window, or a producer-side restart that skipped records.
  • Hash linkage / tamper: a record’s current_event_hash does not equal SHA-256(canonical_bytes), or its prev_event_hash does not equal the prior record’s current_event_hash. Anchor unchanged. Returns PipelineError::Ingest(IngestError::LinkageFailure { failing_index }) naming the 0-indexed offset.
  • Malformed edge bytes: EdgeRecord::decode rejected the embedded edge_record_bytes. The chain hash still verified — the bytes-as-blob were sealed correctly — but they aren’t a valid EdgeRecord shape. Returns PipelineError::EdgeRecordDecode { record_sequence, edge_id, message }. Anchor unchanged, no observations emitted.
  • JSONL parse error: malformed JSON, missing required fields, or I/O. Returns PipelineError::Ingest(IngestError::Parse(_)). Distinct from chain_verification_failures on the metrics surface so transport / serialization failures don’t alert on the chain-integrity channel.

Tests

Run it with:

Terminal window
cargo test -p cookbook-recipe-55-consuming-audit-chain

The tests cover the happy path (3 records ingested, 2 EdgeRewritten observations decoded), tamper detection (no partial emit), malformed edge bytes (fail-closed despite valid chain hash), and a file-backed anchor surviving a “process restart” (re-ingesting the same batch fails with an anchor mismatch).

Adaptation Notes

  • Anchor storage: swap FileAnchorStore for any AnchorStore impl. A production collector typically writes the anchor to a small durable record in its own state store.
  • Transport: the collector exposes ingest_records for callers consuming a non-JSONL transport (Kafka, gRPC, S3 object). The verify + anchor-advance discipline is the same.
  • Filtering: this recipe filters on AuditEventData::EdgeRewritten. A SIEM consumer interested in lease churn would filter on record.kind == AuditEventKind::LeaseRevoked (or any of the 27 typed kinds — see grafos admin audit-query --kind for the full list).
  • Signature verification: each AuditRecord carries an Option<signature> over current_event_hash. The reference collector ignores the signature today (matching the development- mode NullSigner producer); production callers add a signer-side check before accepting chain_verification_failures == 0 as proof of integrity.

See also:

  • crates/grafos-auditAuditRecord, AuditEventKind, AuditEventData, verify_chain, jsonl::read_chain.
  • crates/grafos-audit-collector — the reference collector.
  • docs/spec/audit-chain-canonical-bytes.md — authoritative wire- shape spec for the 8 event-data tags.
  • docs/operations/siem-vocabulary-cookbook.md — SIEM queries keyed on AuditEventKind.