grafos_observe/
event.rs

1//! Event system for fabric observability.
2//!
3//! The [`FabricEvent`] enum represents all observable events in the fabric.
4//! Events are pushed to an [`EventSink`] implementation — either stored in
5//! an [`EventRingBuffer`] for later inspection, or forwarded to sinks like
6//! [`StdoutSink`] for immediate output.
7
8use alloc::string::String;
9use core::fmt;
10
11// ---------------------------------------------------------------------------
12// Global event sink (std only)
13// ---------------------------------------------------------------------------
14
// Process-wide sink slot. Written at most once by `set_global_sink` and
// read by `emit_event`; it only exists when the `std` feature is enabled.
#[cfg(feature = "std")]
static GLOBAL_SINK: std::sync::OnceLock<Box<dyn EventSink + Send + Sync>> =
    std::sync::OnceLock::new();
18
/// Install `sink` as the process-wide [`EventSink`].
///
/// The sink is stored in a `OnceLock`, so registration is first-come,
/// first-served: the call that wins returns `true`, and every later call
/// returns `false` and discards the sink it was given. Once a sink is
/// installed, [`emit_event`] forwards events to it.
#[cfg(feature = "std")]
pub fn set_global_sink(sink: Box<dyn EventSink + Send + Sync>) -> bool {
    match GLOBAL_SINK.set(sink) {
        Ok(()) => true,
        // `OnceLock::set` hands the rejected value back; it is dropped
        // here because an earlier registration already owns the slot.
        Err(_rejected) => false,
    }
}
28
/// Forward `event` to the global sink registered via [`set_global_sink`].
///
/// Does nothing when no sink has been registered. When the `std` feature
/// is disabled a separate no-op definition of this function is used
/// instead.
#[cfg(feature = "std")]
pub fn emit_event(event: FabricEvent) {
    let Some(sink) = GLOBAL_SINK.get() else {
        // No sink installed: the event is dropped without being delivered.
        return;
    };
    sink.emit(&event);
}
38
/// `no_std` stand-in for `emit_event`: takes the event and discards it.
///
/// The global sink only exists with the `std` feature, so there is nothing
/// to forward to here; this definition keeps call sites compiling.
#[cfg(not(feature = "std"))]
pub fn emit_event(_event: FabricEvent) {}
42
/// `no_std` stand-in for `set_global_sink`: never registers anything.
///
/// The global sink slot only exists with the `std` feature, so the argument
/// is dropped and the function always returns `false` ("not registered").
///
/// The parameter is generic so that a call site written against the `std`
/// signature (passing a boxed sink) still compiles when `std` is disabled.
/// Passing `()` continues to work exactly as before.
#[cfg(not(feature = "std"))]
pub fn set_global_sink<S>(_sink: S) -> bool {
    false
}
48
/// An observable fabric event.
///
/// Each variant captures the relevant context for a single observable
/// occurrence in the grafOS system. Events are passed to an [`EventSink`]
/// or stored in an [`EventRingBuffer`] for later inspection.
///
/// The enum implements [`Display`](core::fmt::Display) for human-readable
/// output (for example `lease_dropped: type=mem id=7 node=10.10.0.11`) and
/// derives [`Clone`] so events can be buffered. Only `Debug` and `Clone`
/// are derived here; `PartialEq` is not, so tests and consumers inspect
/// events by matching on the variant and its fields.
#[derive(Debug, Clone)]
pub enum FabricEvent {
    /// A new lease was acquired.
    LeaseAcquired {
        /// Kind of resource leased.
        resource_type: ResourceType,
        /// Unique lease identifier.
        lease_id: u64,
        /// Node address (e.g. `"10.10.0.11"`).
        node: String,
        /// Size of the leased region in bytes.
        bytes: u64,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        /// When `Some`, `Display` appends ` trace_id=<id>` to the line.
        trace_id: Option<String>,
    },
    /// A lease was dropped (normal release).
    LeaseDropped {
        /// Kind of resource released.
        resource_type: ResourceType,
        /// Unique lease identifier.
        lease_id: u64,
        /// Node address.
        node: String,
    },
    /// A lease expired without being dropped.
    LeaseExpired {
        /// Kind of resource whose lease expired.
        resource_type: ResourceType,
        /// Unique lease identifier.
        lease_id: u64,
        /// Node address.
        node: String,
    },
    /// A data-plane operation completed successfully.
    OpCompleted {
        /// Operation type (read, write, etc.).
        op_type: OpType,
        /// Duration in microseconds.
        duration_us: u64,
        /// Bytes transferred.
        bytes: u64,
    },
    /// A data-plane operation failed.
    OpFailed {
        /// Operation type that failed.
        op_type: OpType,
        /// Error description.
        error: String,
    },
    /// A graph rewrite plan started execution.
    RewriteStarted {
        /// Unique rewrite plan identifier.
        plan_id: u64,
    },
    /// A graph rewrite plan completed a phase.
    ///
    /// Despite the name, this is emitted per phase: `phase` says which one
    /// finished.
    RewriteCompleted {
        /// Unique rewrite plan identifier.
        plan_id: u64,
        /// Phase that was completed.
        phase: RewritePhase,
    },
    /// A service was registered in the service registry.
    ServiceRegistered {
        /// Service name.
        name: String,
        /// Service version.
        version: String,
    },
    /// A service was deregistered from the service registry.
    ServiceDeregistered {
        /// Service name.
        name: String,
    },
    /// A message was published to a topic.
    MessagePublished {
        /// Topic name.
        topic: String,
        /// Message size in bytes.
        bytes: u64,
    },
    /// A message was consumed from a topic.
    MessageConsumed {
        /// Topic name.
        topic: String,
        /// Consumer group.
        group: String,
    },
    /// An object was stored in a bucket.
    ObjectStored {
        /// Bucket name.
        bucket: String,
        /// Object key.
        key: String,
        /// Object size in bytes.
        bytes: u64,
    },
    /// An object was retrieved from a bucket.
    ObjectRetrieved {
        /// Bucket name.
        bucket: String,
        /// Object key.
        key: String,
        /// Object size in bytes.
        bytes: u64,
    },
    /// A lease was explicitly revoked (not just expired).
    LeaseRevoked {
        /// Kind of resource revoked.
        resource_type: ResourceType,
        /// Unique lease identifier.
        lease_id: u64,
        /// Node address.
        node: String,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        /// When `Some`, `Display` appends ` trace_id=<id>` to the line.
        trace_id: Option<String>,
    },
    /// A resource entered fenced state after teardown failure.
    LeaseFenced {
        /// Kind of resource fenced.
        resource_type: ResourceType,
        /// Unique lease identifier.
        lease_id: u64,
        /// Node address.
        node: String,
        /// Reason for fencing.
        reason: String,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        /// When `Some`, `Display` appends ` trace_id=<id>` to the line.
        trace_id: Option<String>,
    },
    /// A teardown operation failed.
    TeardownFailed {
        /// Kind of resource whose teardown failed.
        resource_type: ResourceType,
        /// Unique lease identifier.
        lease_id: u64,
        /// Node address.
        node: String,
        /// Error description.
        error: String,
    },
    /// An authentication attempt failed.
    AuthFailed {
        /// Node address.
        node: String,
        /// Reason for failure.
        reason: String,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        /// When `Some`, `Display` appends ` trace_id=<id>` to the line.
        trace_id: Option<String>,
    },
    /// An anti-replay cache rejected a message.
    ReplayRejected {
        /// Node address.
        node: String,
        /// Rejected nonce.
        nonce: u64,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        /// When `Some`, `Display` appends ` trace_id=<id>` to the line.
        trace_id: Option<String>,
    },
    /// A capability token validation failed.
    TokenValidationFailed {
        /// Node address.
        node: String,
        /// Reason for failure.
        reason: String,
    },
    /// A service listener was acquired (bound to a port).
    ListenerAcquired {
        /// Port number.
        port: u16,
        /// Node address.
        node: String,
        /// Lease identifier.
        lease_id: u64,
    },
    /// A service listener was revoked.
    ListenerRevoked {
        /// Port number.
        port: u16,
        /// Node address.
        node: String,
        /// Lease identifier.
        lease_id: u64,
    },
    /// A service listener entered fenced state.
    ListenerFenced {
        /// Port number.
        port: u16,
        /// Node address.
        node: String,
        /// Lease identifier.
        lease_id: u64,
        /// Reason for fencing.
        reason: String,
    },
    /// A session was accepted on a service listener.
    SessionAccepted {
        /// Listener port.
        listener_port: u16,
        /// Session identifier.
        session_id: u64,
        /// Node address.
        node: String,
    },
    /// A session was closed on a service listener.
    SessionClosed {
        /// Listener port.
        listener_port: u16,
        /// Session identifier.
        session_id: u64,
        /// Node address.
        node: String,
    },
    /// Sessions were drained from a service listener.
    SessionDrained {
        /// Listener port.
        listener_port: u16,
        /// Number of sessions drained.
        sessions_drained: u32,
        /// Node address.
        node: String,
    },
    /// A service was deployed with replicated instances.
    ServiceDeployed {
        /// Service name.
        name: String,
        /// Number of instances.
        instance_count: u32,
    },
    /// A service instance changed state.
    ServiceInstanceStateChanged {
        /// Service name.
        name: String,
        /// Instance identifier.
        instance_id: u64,
        /// New state description (free-form text, not a typed state).
        state: String,
    },
    /// A planned service cutover started.
    ServiceCutoverStarted {
        /// Service name.
        name: String,
    },
    /// A planned service cutover completed.
    ServiceCutoverCompleted {
        /// Service name.
        name: String,
    },
    /// An unplanned service failover was triggered.
    ServiceFailoverTriggered {
        /// Service name.
        name: String,
        /// Reason for failover.
        reason: String,
    },
    /// An unplanned service failover completed.
    ServiceFailoverCompleted {
        /// Service name.
        name: String,
    },
    /// A service instance's ingress was fenced.
    ServiceIngressFenced {
        /// Service name.
        name: String,
        /// Instance identifier.
        instance_id: u64,
    },
    /// A service was undeployed.
    ServiceUndeployed {
        /// Service name.
        name: String,
    },
    /// A tasklet was submitted for execution.
    TaskletSubmitted {
        /// Unique tasklet identifier.
        tasklet_id: u64,
        /// Node address where the tasklet will execute.
        node: String,
        /// Runtime type that will execute this tasklet.
        runtime_type: String,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        trace_id: Option<String>,
    },
    /// A tasklet completed successfully.
    TaskletCompleted {
        /// Unique tasklet identifier.
        tasklet_id: u64,
        /// Status code (`STATUS_OK`).
        status: u8,
        /// Execution duration in microseconds.
        duration_us: u64,
        /// Output size in bytes.
        output_bytes: u64,
        /// Runtime type that executed this tasklet.
        runtime_type: String,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        trace_id: Option<String>,
    },
    /// A tasklet execution failed.
    TaskletFailed {
        /// Unique tasklet identifier.
        tasklet_id: u64,
        /// Status code (error code).
        status: u8,
        /// Execution duration in microseconds.
        duration_us: u64,
        /// Failure reason description.
        reason: String,
        /// Runtime type that executed this tasklet.
        runtime_type: String,
        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
        trace_id: Option<String>,
    },
    /// The active security profile at daemon startup.
    SecurityProfileActive {
        /// Profile mode (e.g. `"secure-default"` or `"trusted-fabric"`).
        mode: String,
    },
    /// A scheduler admission request was approved.
    AdmissionApproved {
        /// Tenant identifier.
        tenant_id: String,
        /// Node address where resources were admitted.
        node: String,
        /// Kind of resource admitted. Note this is a plain `String`, not the
        /// typed [`ResourceType`] used by the lease variants.
        resource_type: String,
        /// Size of the admitted resource in bytes.
        bytes: u64,
        /// W3C traceparent correlation ID for distributed tracing.
        trace_id: Option<String>,
    },
    /// A scheduler admission request was denied.
    AdmissionDenied {
        /// Tenant identifier.
        tenant_id: String,
        /// Kind of resource requested. Note this is a plain `String`, not
        /// the typed [`ResourceType`] used by the lease variants.
        resource_type: String,
        /// Reason for denial.
        reason: String,
        /// Structured quota violation when the denial came from quota
        /// enforcement. `None` for non-quota admission denials.
        quota_violation: Option<grafos_core::QuotaViolation>,
        /// W3C traceparent correlation ID for distributed tracing.
        trace_id: Option<String>,
    },
    /// A placement decision was made by the scheduler.
    PlacementDecision {
        /// Tenant identifier.
        tenant_id: String,
        /// Node address chosen for placement.
        node: String,
        /// Placement strategy used.
        strategy: String,
        /// Placement score (higher is better). Being an `f64`, this field
        /// is one reason the enum cannot derive `Eq`.
        score: f64,
        /// W3C traceparent correlation ID for distributed tracing.
        trace_id: Option<String>,
    },
    /// A lease preemption was triggered by the scheduler.
    PreemptionTriggered {
        /// Lease identifier of the victim.
        victim_lease_id: u64,
        /// Tenant that owns the preempted lease.
        victim_tenant: String,
        /// Tenant that triggered the preemption.
        preemptor_tenant: String,
        /// Node address where preemption occurred.
        node: String,
        /// Phase 218 typed preemption reason. Required — there is no
        /// `Other(String)` variant on purpose. See
        /// `grafos_core::PreemptionReason` for the full set and the
        /// rationale for the no-free-form-string rule.
        reason: grafos_core::PreemptionReason,
        /// W3C traceparent correlation ID for distributed tracing.
        trace_id: Option<String>,
    },
    /// Phase 220 / slice 15 — a cross-state disagreement was
    /// observed and resolved with a documented protocol (idempotent
    /// success, fail-closed, etc.). The discipline preamble for
    /// phases 218–222 says: "When Kubernetes and fabricBIOS,
    /// scheduler and daemon, or controller and node plugin
    /// disagree, the resolution is a typed observable fail-closed
    /// state — never a quiet 'best-effort' reconciliation that
    /// papers over the disagreement." This event is what makes
    /// the resolution observable: every site that historically
    /// would have logged "treating as OK" emits this instead, so
    /// SIEM / dashboard collectors can alert on disagreement rates
    /// even when the individual response is correct.
    ///
    /// `protocol` names the resolution rule (`"idempotent_success"`,
    /// `"fail_closed_stale_epoch"`, `"fail_closed_token_revoked"`,
    /// etc.) so a collector can group by resolution type without
    /// parsing free-form text. `kind` names the disagreement source
    /// (`"lease_free_not_found"`, `"lease_free_not_active"`,
    /// `"capability_replay_window"`, etc.).
    CrossStateDisagreementResolved {
        /// Stable tag for the disagreement source. MUST come from a
        /// fixed vocabulary at the call site — no free-form strings.
        /// The `&'static str` type keeps runtime-built `String`s out.
        kind: &'static str,
        /// Stable tag for the resolution rule applied. MUST come
        /// from a fixed vocabulary at the call site.
        protocol: &'static str,
        /// Lease identifier when applicable. `0` means N/A.
        lease_id: u64,
        /// Authoritative-side identifier (`"daemon"`, `"scheduler"`,
        /// `"node-<hex>"`).
        authority: String,
        /// W3C traceparent for distributed tracing correlation.
        trace_id: Option<String>,
    },
    /// A tenant exceeded their resource quota.
    QuotaExceeded {
        /// Tenant identifier.
        tenant_id: String,
        /// Kind of resource that exceeded quota (plain `String`).
        resource_type: String,
        /// Amount requested in bytes.
        requested: u64,
        /// Quota limit in bytes.
        limit: u64,
        /// W3C traceparent correlation ID for distributed tracing.
        trace_id: Option<String>,
    },
    /// A capability token was minted by the scheduler.
    TokenMinted {
        /// Tenant identifier.
        tenant_id: String,
        /// Node address the token is scoped to.
        node: String,
        /// Token time-to-live in seconds.
        ttl_secs: u32,
        /// W3C traceparent correlation ID for distributed tracing.
        trace_id: Option<String>,
    },
    /// A process (scenario) started execution.
    ProcessStarted {
        /// Process identifier (OS PID as string).
        pid: String,
        /// Scenario name.
        scenario: String,
        /// Comma-separated node addresses.
        nodes: String,
    },
    /// A heartbeat from a running process.
    ProcessHeartbeat {
        /// Process identifier (OS PID as string).
        pid: String,
    },
    /// A process (scenario) completed execution.
    ProcessCompleted {
        /// Process identifier (OS PID as string).
        pid: String,
        /// Exit code (0 = success).
        exit_code: i32,
        /// Duration in seconds.
        duration_secs: u64,
    },
    /// The scheduler lost its election (meta-lease renewal failure, etc.).
    SchedulerElectionLost {
        /// Human-readable reason for the loss.
        reason: String,
        /// Leader epoch at the time of loss.
        epoch: u64,
    },
    /// The scheduler failed to promote after acquiring the meta-lease.
    SchedulerPromotionFailed {
        /// Human-readable reason for the failure.
        reason: String,
        /// Leader epoch at the time of failure.
        epoch: u64,
    },
    /// A stale leader was detected during fencing (local epoch < node epoch).
    SchedulerStaleLeaderDetected {
        /// Local epoch the scheduler believed was current.
        local_epoch: u64,
        /// Epoch observed on the node (higher than local).
        node_epoch: u64,
    },
    /// The scheduler successfully promoted to Active.
    SchedulerPromoted {
        /// New leader epoch after promotion.
        epoch: u64,
        /// Number of nodes successfully fenced.
        nodes_fenced: usize,
        /// Time from meta-lease acquisition to Active state (milliseconds).
        latency_ms: u64,
    },
}
544
/// Resource types tracked by the observability system.
///
/// Maps to the grafOS resource kinds: memory, block storage, GPU compute,
/// GPU memory, CPU, and network. The `Display` impl in this file renders
/// each variant as a lowercase label (`mem`, `block`, `gpu`, `gpu_mem`,
/// `cpu`, `net`), which is the form that appears in rendered events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceType {
    /// Memory (DRAM, CXL-attached). Displayed as `mem`.
    Mem,
    /// Block storage (NVMe, SD). Displayed as `block`.
    Block,
    /// GPU compute. Displayed as `gpu`.
    Gpu,
    /// Accelerator-local memory / GPU VRAM. Displayed as `gpu_mem`.
    GpuMem,
    /// CPU compute. Displayed as `cpu`.
    Cpu,
    /// Network (service listeners, sessions). Displayed as `net`.
    Net,
}
563
564impl fmt::Display for ResourceType {
565    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
566        match self {
567            ResourceType::Mem => write!(f, "mem"),
568            ResourceType::Block => write!(f, "block"),
569            ResourceType::Gpu => write!(f, "gpu"),
570            ResourceType::GpuMem => write!(f, "gpu_mem"),
571            ResourceType::Cpu => write!(f, "cpu"),
572            ResourceType::Net => write!(f, "net"),
573        }
574    }
575}
576
577/// Canonical bridge from the substrate-level resource taxonomy
578/// (`grafos_core::ResourceKind`, 7 variants) to the event-layer
579/// resource taxonomy (`grafos_observe::ResourceType`, 6 variants).
580///
581/// Six of seven variants map 1:1 to a matching ResourceType.
582/// The seventh — `Tasklet` — is composite CPU+memory on the same
583/// node; this bridge reports it as `Cpu` (the typically-fenced
584/// half, mirroring the v1.1 §3 ResourceKind docstring that the
585/// CPU half is the binding-determining resource). Picking a
586/// single canonical bridge resolves a pre-existing divergence
587/// between `grafos-scheduler::observe_hooks` (which used
588/// Tasklet → Mem) and `grafos-scheduler-service::observe_resource
589/// _type_for` (which used Tasklet → Cpu) — same data path,
590/// different SIEM labels.
591///
592/// Future variant additions on either side force this impl to
593/// be updated.
594impl From<grafos_core::ResourceKind> for ResourceType {
595    fn from(k: grafos_core::ResourceKind) -> Self {
596        match k {
597            grafos_core::ResourceKind::Mem => ResourceType::Mem,
598            grafos_core::ResourceKind::Block => ResourceType::Block,
599            grafos_core::ResourceKind::Net => ResourceType::Net,
600            grafos_core::ResourceKind::Cpu => ResourceType::Cpu,
601            grafos_core::ResourceKind::Gpu => ResourceType::Gpu,
602            grafos_core::ResourceKind::GpuMem => ResourceType::GpuMem,
603            // Composite CPU+memory; observe as Cpu (the
604            // typically-fenced half). Distinct from the slice
605            // 218 EdgeOutcome bridge which kept the typed
606            // distinction — for observability we collapse onto
607            // a single label intentionally because the event
608            // schema has no Tasklet variant.
609            grafos_core::ResourceKind::Tasklet => ResourceType::Cpu,
610        }
611    }
612}
613
/// Data-plane operation types.
///
/// Covers both memory (FBMU) and block (FBBU) data-plane operations,
/// plus GPU and tasklet submission. The `Display` impl in this file renders
/// each variant as a lowercase snake_case label (`read`, `write`,
/// `read_block`, `write_block`, `gpu_submit`, `tasklet_submit`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OpType {
    /// Memory read (FBMU READ). Displayed as `read`.
    Read,
    /// Memory write (FBMU WRITE). Displayed as `write`.
    Write,
    /// Block read (FBBU READ_BLOCK). Displayed as `read_block`.
    ReadBlock,
    /// Block write (FBBU WRITE_BLOCK). Displayed as `write_block`.
    WriteBlock,
    /// GPU kernel submission. Displayed as `gpu_submit`.
    GpuSubmit,
    /// Tasklet (lightweight compute) submission. Displayed as
    /// `tasklet_submit`.
    TaskletSubmit,
}
633
634impl fmt::Display for OpType {
635    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
636        match self {
637            OpType::Read => write!(f, "read"),
638            OpType::Write => write!(f, "write"),
639            OpType::ReadBlock => write!(f, "read_block"),
640            OpType::WriteBlock => write!(f, "write_block"),
641            OpType::GpuSubmit => write!(f, "gpu_submit"),
642            OpType::TaskletSubmit => write!(f, "tasklet_submit"),
643        }
644    }
645}
646
/// Rewrite plan phases.
///
/// Follows the grafOS rewrite lifecycle
/// `Validate -> Stage -> [Canary] -> Cutover -> Cleanup -> Commit`, with
/// `Rollback` possible from any pre-`Cleanup` phase. The brackets mark
/// `Canary` as optional.
///
/// Derives `Hash` in addition to the comparison traits, matching the
/// sibling [`ResourceType`] and [`OpType`] enums, so a phase can be used as
/// a key in hash-based collections (e.g. per-phase counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RewritePhase {
    /// Schema and constraint validation.
    Validate,
    /// Resources staged for cutover.
    Stage,
    /// Canary evaluation (optional).
    Canary,
    /// Atomic cutover to new bindings.
    Cutover,
    /// Post-cutover cleanup of old bindings.
    Cleanup,
    /// Durable audit and generation bumps.
    Commit,
    /// Rollback to previous state (failure path).
    Rollback,
}
669
670impl fmt::Display for RewritePhase {
671    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672        match self {
673            RewritePhase::Validate => write!(f, "validate"),
674            RewritePhase::Stage => write!(f, "stage"),
675            RewritePhase::Canary => write!(f, "canary"),
676            RewritePhase::Cutover => write!(f, "cutover"),
677            RewritePhase::Cleanup => write!(f, "cleanup"),
678            RewritePhase::Commit => write!(f, "commit"),
679            RewritePhase::Rollback => write!(f, "rollback"),
680        }
681    }
682}
683
684/// Lossless bridge from the runtime `grafos_core::RewritePhase`
685/// state-machine variant to the event-layer `grafos_observe::
686/// RewritePhase` variant. The two enums coexist by design — the
687/// core one is the lifecycle state machine carried on
688/// `RewritePlan`, the observe one is the event-payload form
689/// consumed by the EventSink / dashboard. They share the same
690/// 7 variants with matching `Display` strings (slice 232 pinned
691/// the core side; this impl pins the cross-layer 1:1 mapping).
692///
693/// Exhaustive match; future variant additions on either side
694/// force this impl to be updated.
695impl From<grafos_core::RewritePhase> for RewritePhase {
696    fn from(p: grafos_core::RewritePhase) -> Self {
697        match p {
698            grafos_core::RewritePhase::Validate => RewritePhase::Validate,
699            grafos_core::RewritePhase::Stage => RewritePhase::Stage,
700            grafos_core::RewritePhase::Canary => RewritePhase::Canary,
701            grafos_core::RewritePhase::Cutover => RewritePhase::Cutover,
702            grafos_core::RewritePhase::Cleanup => RewritePhase::Cleanup,
703            grafos_core::RewritePhase::Commit => RewritePhase::Commit,
704            grafos_core::RewritePhase::Rollback => RewritePhase::Rollback,
705        }
706    }
707}
708
709/// Inverse of [`From<grafos_core::RewritePhase>`] — converts an
710/// observed phase back to the core state-machine variant.
711/// Lossless and exhaustive.
712impl From<RewritePhase> for grafos_core::RewritePhase {
713    fn from(p: RewritePhase) -> Self {
714        match p {
715            RewritePhase::Validate => grafos_core::RewritePhase::Validate,
716            RewritePhase::Stage => grafos_core::RewritePhase::Stage,
717            RewritePhase::Canary => grafos_core::RewritePhase::Canary,
718            RewritePhase::Cutover => grafos_core::RewritePhase::Cutover,
719            RewritePhase::Cleanup => grafos_core::RewritePhase::Cleanup,
720            RewritePhase::Commit => grafos_core::RewritePhase::Commit,
721            RewritePhase::Rollback => grafos_core::RewritePhase::Rollback,
722        }
723    }
724}
725
726impl fmt::Display for FabricEvent {
727    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
728        match self {
729            FabricEvent::LeaseAcquired {
730                resource_type,
731                lease_id,
732                node,
733                bytes,
734                trace_id,
735            } => {
736                write!(
737                    f,
738                    "lease_acquired: type={resource_type} id={lease_id} node={node} bytes={bytes}"
739                )?;
740                if let Some(tid) = trace_id {
741                    write!(f, " trace_id={tid}")?;
742                }
743                Ok(())
744            }
745            FabricEvent::LeaseDropped {
746                resource_type,
747                lease_id,
748                node,
749            } => write!(
750                f,
751                "lease_dropped: type={resource_type} id={lease_id} node={node}"
752            ),
753            FabricEvent::LeaseExpired {
754                resource_type,
755                lease_id,
756                node,
757            } => write!(
758                f,
759                "lease_expired: type={resource_type} id={lease_id} node={node}"
760            ),
761            FabricEvent::OpCompleted {
762                op_type,
763                duration_us,
764                bytes,
765            } => write!(
766                f,
767                "op_completed: op={op_type} duration_us={duration_us} bytes={bytes}"
768            ),
769            FabricEvent::OpFailed { op_type, error } => {
770                write!(f, "op_failed: op={op_type} error={error}")
771            }
772            FabricEvent::RewriteStarted { plan_id } => {
773                write!(f, "rewrite_started: plan_id={plan_id}")
774            }
775            FabricEvent::RewriteCompleted { plan_id, phase } => {
776                write!(f, "rewrite_completed: plan_id={plan_id} phase={phase}")
777            }
778            FabricEvent::ServiceRegistered { name, version } => {
779                write!(f, "service_registered: name={name} version={version}")
780            }
781            FabricEvent::ServiceDeregistered { name } => {
782                write!(f, "service_deregistered: name={name}")
783            }
784            FabricEvent::MessagePublished { topic, bytes } => {
785                write!(f, "message_published: topic={topic} bytes={bytes}")
786            }
787            FabricEvent::MessageConsumed { topic, group } => {
788                write!(f, "message_consumed: topic={topic} group={group}")
789            }
790            FabricEvent::ObjectStored { bucket, key, bytes } => {
791                write!(f, "object_stored: bucket={bucket} key={key} bytes={bytes}")
792            }
793            FabricEvent::ObjectRetrieved { bucket, key, bytes } => {
794                write!(
795                    f,
796                    "object_retrieved: bucket={bucket} key={key} bytes={bytes}"
797                )
798            }
799            FabricEvent::LeaseRevoked {
800                resource_type,
801                lease_id,
802                node,
803                trace_id,
804            } => {
805                write!(
806                    f,
807                    "lease_revoked: type={resource_type} id={lease_id} node={node}"
808                )?;
809                if let Some(tid) = trace_id {
810                    write!(f, " trace_id={tid}")?;
811                }
812                Ok(())
813            }
814            FabricEvent::LeaseFenced {
815                resource_type,
816                lease_id,
817                node,
818                reason,
819                trace_id,
820            } => {
821                write!(
822                    f,
823                    "lease_fenced: type={resource_type} id={lease_id} node={node} reason={reason}"
824                )?;
825                if let Some(tid) = trace_id {
826                    write!(f, " trace_id={tid}")?;
827                }
828                Ok(())
829            }
830            FabricEvent::TeardownFailed {
831                resource_type,
832                lease_id,
833                node,
834                error,
835            } => write!(
836                f,
837                "teardown_failed: type={resource_type} id={lease_id} node={node} error={error}"
838            ),
839            FabricEvent::AuthFailed {
840                node,
841                reason,
842                trace_id,
843            } => {
844                write!(f, "auth_failed: node={node} reason={reason}")?;
845                if let Some(tid) = trace_id {
846                    write!(f, " trace_id={tid}")?;
847                }
848                Ok(())
849            }
850            FabricEvent::ReplayRejected {
851                node,
852                nonce,
853                trace_id,
854            } => {
855                write!(f, "replay_rejected: node={node} nonce={nonce}")?;
856                if let Some(tid) = trace_id {
857                    write!(f, " trace_id={tid}")?;
858                }
859                Ok(())
860            }
861            FabricEvent::TokenValidationFailed { node, reason } => {
862                write!(f, "token_validation_failed: node={node} reason={reason}")
863            }
864            FabricEvent::ListenerAcquired {
865                port,
866                node,
867                lease_id,
868            } => write!(
869                f,
870                "listener_acquired: port={port} node={node} lease_id={lease_id}"
871            ),
872            FabricEvent::ListenerRevoked {
873                port,
874                node,
875                lease_id,
876            } => write!(
877                f,
878                "listener_revoked: port={port} node={node} lease_id={lease_id}"
879            ),
880            FabricEvent::ListenerFenced {
881                port,
882                node,
883                lease_id,
884                reason,
885            } => write!(
886                f,
887                "listener_fenced: port={port} node={node} lease_id={lease_id} reason={reason}"
888            ),
889            FabricEvent::SessionAccepted {
890                listener_port,
891                session_id,
892                node,
893            } => write!(
894                f,
895                "session_accepted: listener_port={listener_port} session_id={session_id} node={node}"
896            ),
897            FabricEvent::SessionClosed {
898                listener_port,
899                session_id,
900                node,
901            } => write!(
902                f,
903                "session_closed: listener_port={listener_port} session_id={session_id} node={node}"
904            ),
905            FabricEvent::SessionDrained {
906                listener_port,
907                sessions_drained,
908                node,
909            } => write!(
910                f,
911                "session_drained: listener_port={listener_port} sessions_drained={sessions_drained} node={node}"
912            ),
913            FabricEvent::ServiceDeployed {
914                name,
915                instance_count,
916            } => write!(
917                f,
918                "service_deployed: name={name} instance_count={instance_count}"
919            ),
920            FabricEvent::ServiceInstanceStateChanged {
921                name,
922                instance_id,
923                state,
924            } => write!(
925                f,
926                "service_instance_state_changed: name={name} instance_id={instance_id} state={state}"
927            ),
928            FabricEvent::ServiceCutoverStarted { name } => {
929                write!(f, "service_cutover_started: name={name}")
930            }
931            FabricEvent::ServiceCutoverCompleted { name } => {
932                write!(f, "service_cutover_completed: name={name}")
933            }
934            FabricEvent::ServiceFailoverTriggered { name, reason } => {
935                write!(
936                    f,
937                    "service_failover_triggered: name={name} reason={reason}"
938                )
939            }
940            FabricEvent::ServiceFailoverCompleted { name } => {
941                write!(f, "service_failover_completed: name={name}")
942            }
943            FabricEvent::ServiceIngressFenced { name, instance_id } => {
944                write!(
945                    f,
946                    "service_ingress_fenced: name={name} instance_id={instance_id}"
947                )
948            }
949            FabricEvent::ServiceUndeployed { name } => {
950                write!(f, "service_undeployed: name={name}")
951            }
952            FabricEvent::TaskletSubmitted {
953                tasklet_id,
954                node,
955                runtime_type,
956                trace_id,
957            } => {
958                write!(
959                    f,
960                    "tasklet_submitted: tasklet_id={tasklet_id} node={node} runtime_type={runtime_type}"
961                )?;
962                if let Some(tid) = trace_id {
963                    write!(f, " trace_id={tid}")?;
964                }
965                Ok(())
966            }
967            FabricEvent::TaskletCompleted {
968                tasklet_id,
969                status,
970                duration_us,
971                output_bytes,
972                runtime_type,
973                trace_id,
974            } => {
975                write!(
976                    f,
977                    "tasklet_completed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} output_bytes={output_bytes} runtime_type={runtime_type}"
978                )?;
979                if let Some(tid) = trace_id {
980                    write!(f, " trace_id={tid}")?;
981                }
982                Ok(())
983            }
984            FabricEvent::TaskletFailed {
985                tasklet_id,
986                status,
987                duration_us,
988                reason,
989                runtime_type,
990                trace_id,
991            } => {
992                write!(
993                    f,
994                    "tasklet_failed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} reason={reason} runtime_type={runtime_type}"
995                )?;
996                if let Some(tid) = trace_id {
997                    write!(f, " trace_id={tid}")?;
998                }
999                Ok(())
1000            }
1001            FabricEvent::SecurityProfileActive { mode } => {
1002                write!(f, "security_profile_active: mode={mode}")
1003            }
1004            FabricEvent::AdmissionApproved {
1005                tenant_id,
1006                node,
1007                resource_type,
1008                bytes,
1009                trace_id,
1010            } => {
1011                write!(
1012                    f,
1013                    "admission_approved: tenant={tenant_id} node={node} type={resource_type} bytes={bytes}"
1014                )?;
1015                if let Some(tid) = trace_id {
1016                    write!(f, " trace_id={tid}")?;
1017                }
1018                Ok(())
1019            }
1020            FabricEvent::AdmissionDenied {
1021                tenant_id,
1022                resource_type,
1023                reason,
1024                quota_violation,
1025                trace_id,
1026            } => {
1027                write!(
1028                    f,
1029                    "admission_denied: tenant={tenant_id} type={resource_type} reason={reason}"
1030                )?;
1031                if let Some(violation) = quota_violation {
1032                    write!(f, " quota_violation={}", violation.as_str())?;
1033                }
1034                if let Some(tid) = trace_id {
1035                    write!(f, " trace_id={tid}")?;
1036                }
1037                Ok(())
1038            }
1039            FabricEvent::PlacementDecision {
1040                tenant_id,
1041                node,
1042                strategy,
1043                score,
1044                trace_id,
1045            } => {
1046                write!(
1047                    f,
1048                    "placement_decision: tenant={tenant_id} node={node} strategy={strategy} score={score}"
1049                )?;
1050                if let Some(tid) = trace_id {
1051                    write!(f, " trace_id={tid}")?;
1052                }
1053                Ok(())
1054            }
1055            FabricEvent::PreemptionTriggered {
1056                victim_lease_id,
1057                victim_tenant,
1058                preemptor_tenant,
1059                node,
1060                reason,
1061                trace_id,
1062            } => {
1063                write!(
1064                    f,
1065                    "preemption_triggered: victim_lease_id={victim_lease_id} victim_tenant={victim_tenant} preemptor_tenant={preemptor_tenant} node={node} reason={reason}"
1066                )?;
1067                if let Some(tid) = trace_id {
1068                    write!(f, " trace_id={tid}")?;
1069                }
1070                Ok(())
1071            }
1072            FabricEvent::CrossStateDisagreementResolved {
1073                kind,
1074                protocol,
1075                lease_id,
1076                authority,
1077                trace_id,
1078            } => {
1079                write!(
1080                    f,
1081                    "cross_state_disagreement_resolved: kind={kind} protocol={protocol} lease_id={lease_id} authority={authority}"
1082                )?;
1083                if let Some(tid) = trace_id {
1084                    write!(f, " trace_id={tid}")?;
1085                }
1086                Ok(())
1087            }
1088            FabricEvent::QuotaExceeded {
1089                tenant_id,
1090                resource_type,
1091                requested,
1092                limit,
1093                trace_id,
1094            } => {
1095                write!(
1096                    f,
1097                    "quota_exceeded: tenant={tenant_id} type={resource_type} requested={requested} limit={limit}"
1098                )?;
1099                if let Some(tid) = trace_id {
1100                    write!(f, " trace_id={tid}")?;
1101                }
1102                Ok(())
1103            }
1104            FabricEvent::TokenMinted {
1105                tenant_id,
1106                node,
1107                ttl_secs,
1108                trace_id,
1109            } => {
1110                write!(
1111                    f,
1112                    "token_minted: tenant={tenant_id} node={node} ttl_secs={ttl_secs}"
1113                )?;
1114                if let Some(tid) = trace_id {
1115                    write!(f, " trace_id={tid}")?;
1116                }
1117                Ok(())
1118            }
1119            FabricEvent::ProcessStarted {
1120                pid,
1121                scenario,
1122                nodes,
1123            } => {
1124                write!(
1125                    f,
1126                    "process_started: pid={pid} scenario={scenario} nodes={nodes}"
1127                )
1128            }
1129            FabricEvent::ProcessHeartbeat { pid } => {
1130                write!(f, "process_heartbeat: pid={pid}")
1131            }
1132            FabricEvent::ProcessCompleted {
1133                pid,
1134                exit_code,
1135                duration_secs,
1136            } => {
1137                write!(
1138                    f,
1139                    "process_completed: pid={pid} exit_code={exit_code} duration_secs={duration_secs}"
1140                )
1141            }
1142            FabricEvent::SchedulerElectionLost { reason, epoch } => {
1143                write!(
1144                    f,
1145                    "scheduler_election_lost: reason={reason} epoch={epoch}"
1146                )
1147            }
1148            FabricEvent::SchedulerPromotionFailed { reason, epoch } => {
1149                write!(
1150                    f,
1151                    "scheduler_promotion_failed: reason={reason} epoch={epoch}"
1152                )
1153            }
1154            FabricEvent::SchedulerStaleLeaderDetected {
1155                local_epoch,
1156                node_epoch,
1157            } => {
1158                write!(
1159                    f,
1160                    "scheduler_stale_leader_detected: local_epoch={local_epoch} node_epoch={node_epoch}"
1161                )
1162            }
1163            FabricEvent::SchedulerPromoted {
1164                epoch,
1165                nodes_fenced,
1166                latency_ms,
1167            } => {
1168                write!(
1169                    f,
1170                    "scheduler_promoted: epoch={epoch} nodes_fenced={nodes_fenced} latency_ms={latency_ms}"
1171                )
1172            }
1173        }
1174    }
1175}
1176
1177/// Trait for consuming fabric events.
1178///
1179/// Implementations decide what to do with events — log them, store them,
1180/// forward them to an external system, or discard them.
1181///
1182/// Built-in implementations:
1183/// - [`NullSink`] — discards all events (default for `no_std`).
1184/// - [`StdoutSink`] — prints events to stdout (requires `std` feature).
1185/// - [`EventRingBuffer`] — no-op via this trait; use [`EventRingBuffer::push`] directly.
1186///
1187/// Optional feature-gated implementations:
1188/// - `JsonEventSink` — one-line JSON to stdout (requires `json-log` feature).
1189pub trait EventSink {
1190    /// Consume an event. Implementations should not block.
1191    fn emit(&self, event: &FabricEvent);
1192}
1193
1194/// A sink that discards all events. Default `no_std` sink.
1195pub struct NullSink;
1196
1197impl EventSink for NullSink {
1198    fn emit(&self, _event: &FabricEvent) {}
1199}
1200
/// A sink that prints events to stdout.
///
/// Each event is written as one `[fabric] <event>` line. A failed write
/// (for example because stdout is a closed pipe) is ignored, so emitting an
/// event never panics the caller.
#[cfg(feature = "std")]
#[derive(Debug, Clone, Copy, Default)]
pub struct StdoutSink;

#[cfg(feature = "std")]
impl EventSink for StdoutSink {
    /// Write `event` to stdout using its `Display` rendering.
    fn emit(&self, event: &FabricEvent) {
        use std::io::Write as _;

        // `println!` panics when stdout cannot be written to. An observability
        // sink must not take the process down, so write through the locked
        // handle and deliberately discard the result.
        let mut out = std::io::stdout().lock();
        let _ = writeln!(out, "[fabric] {event}");
    }
}
1211
1212/// Fixed-size ring buffer for storing recent events.
1213///
1214/// When the buffer is full, new events overwrite the oldest entries.
1215/// The default capacity is 1024 events.
1216///
1217/// # Examples
1218///
1219/// ```
1220/// use grafos_observe::{EventRingBuffer, FabricEvent, ResourceType};
1221///
1222/// let mut ring = EventRingBuffer::new(4);
1223/// ring.push(FabricEvent::LeaseAcquired {
1224///     resource_type: ResourceType::Mem,
1225///     lease_id: 1,
1226///     node: "node-a".into(),
1227///     bytes: 4096,
1228///     trace_id: None,
1229/// });
1230/// assert_eq!(ring.len(), 1);
1231///
1232/// // Iterate over stored events (oldest first)
1233/// for event in ring.iter() {
1234///     println!("{event}");
1235/// }
1236///
1237/// // Drain removes and returns all events
1238/// let events = ring.drain();
1239/// assert!(ring.is_empty());
1240/// ```
1241pub struct EventRingBuffer {
1242    buf: alloc::vec::Vec<Option<FabricEvent>>,
1243    /// Write position (wraps around).
1244    head: usize,
1245    /// Number of events currently stored (capped at capacity).
1246    len: usize,
1247}
1248
1249impl EventRingBuffer {
1250    /// Default ring buffer capacity.
1251    pub const DEFAULT_CAPACITY: usize = 1024;
1252
1253    /// Create a new ring buffer with the given capacity.
1254    pub fn new(capacity: usize) -> Self {
1255        let cap = if capacity == 0 { 1 } else { capacity };
1256        let mut buf = alloc::vec::Vec::with_capacity(cap);
1257        buf.resize_with(cap, || None);
1258        Self {
1259            buf,
1260            head: 0,
1261            len: 0,
1262        }
1263    }
1264
1265    /// Create a new ring buffer with the default capacity (1024).
1266    pub fn with_default_capacity() -> Self {
1267        Self::new(Self::DEFAULT_CAPACITY)
1268    }
1269
1270    /// Push an event into the ring buffer.
1271    ///
1272    /// If the buffer is full, the oldest event is overwritten.
1273    pub fn push(&mut self, event: FabricEvent) {
1274        self.buf[self.head] = Some(event);
1275        self.head = (self.head + 1) % self.buf.len();
1276        if self.len < self.buf.len() {
1277            self.len += 1;
1278        }
1279    }
1280
1281    /// Number of events currently stored.
1282    pub fn len(&self) -> usize {
1283        self.len
1284    }
1285
1286    /// Whether the buffer is empty.
1287    pub fn is_empty(&self) -> bool {
1288        self.len == 0
1289    }
1290
1291    /// Capacity of the ring buffer.
1292    pub fn capacity(&self) -> usize {
1293        self.buf.len()
1294    }
1295
1296    /// Iterate over stored events in order (oldest first).
1297    pub fn iter(&self) -> EventRingIter<'_> {
1298        let start = if self.len < self.buf.len() {
1299            0
1300        } else {
1301            self.head
1302        };
1303        EventRingIter {
1304            buf: &self.buf,
1305            pos: start,
1306            remaining: self.len,
1307        }
1308    }
1309
1310    /// Drain all events from the buffer, returning them in order (oldest first).
1311    ///
1312    /// The buffer is empty after this call.
1313    pub fn drain(&mut self) -> alloc::vec::Vec<FabricEvent> {
1314        let mut events = alloc::vec::Vec::with_capacity(self.len);
1315        let start = if self.len < self.buf.len() {
1316            0
1317        } else {
1318            self.head
1319        };
1320        for i in 0..self.len {
1321            let idx = (start + i) % self.buf.len();
1322            if let Some(ev) = self.buf[idx].take() {
1323                events.push(ev);
1324            }
1325        }
1326        self.head = 0;
1327        self.len = 0;
1328        events
1329    }
1330}
1331
1332impl EventSink for EventRingBuffer {
1333    fn emit(&self, event: &FabricEvent) {
1334        // EventSink takes &self but push needs &mut self.
1335        // For the ring buffer, callers should use push() directly.
1336        // This impl exists so the ring buffer can be used behind a
1337        // wrapper that provides interior mutability (e.g. Mutex).
1338        let _ = event;
1339    }
1340}
1341
1342/// Iterator over events in an [`EventRingBuffer`].
1343pub struct EventRingIter<'a> {
1344    buf: &'a [Option<FabricEvent>],
1345    pos: usize,
1346    remaining: usize,
1347}
1348
1349impl<'a> Iterator for EventRingIter<'a> {
1350    type Item = &'a FabricEvent;
1351
1352    fn next(&mut self) -> Option<Self::Item> {
1353        if self.remaining == 0 {
1354            return None;
1355        }
1356        let idx = self.pos % self.buf.len();
1357        self.pos += 1;
1358        self.remaining -= 1;
1359        self.buf[idx].as_ref()
1360    }
1361
1362    fn size_hint(&self) -> (usize, Option<usize>) {
1363        (self.remaining, Some(self.remaining))
1364    }
1365}
1366
1367impl<'a> ExactSizeIterator for EventRingIter<'a> {}
1368
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::string::ToString;

    /// Assert that the `Display` rendering of `event` contains every needle.
    fn assert_rendered_contains(event: &FabricEvent, needles: &[&str]) {
        let rendered = event.to_string();
        for needle in needles {
            assert!(rendered.contains(*needle));
        }
    }

    /// Fixture: a 4096-byte memory lease acquired on node `10.10.0.11`.
    fn sample_lease_acquired() -> FabricEvent {
        let node = "10.10.0.11".to_string();
        FabricEvent::LeaseAcquired {
            resource_type: ResourceType::Mem,
            lease_id: 1,
            node,
            bytes: 4096,
            trace_id: None,
        }
    }

    /// Fixture: a completed 1024-byte write that took 500 µs.
    fn sample_op_completed() -> FabricEvent {
        FabricEvent::OpCompleted {
            op_type: OpType::Write,
            duration_us: 500,
            bytes: 1024,
        }
    }

    #[test]
    fn ring_buffer_push_and_len() {
        let mut ring = EventRingBuffer::new(4);
        assert_eq!(ring.capacity(), 4);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());

        ring.push(sample_lease_acquired());
        assert!(!ring.is_empty());
        assert_eq!(ring.len(), 1);
    }

    #[test]
    fn ring_buffer_overflow_wraps() {
        let mut ring = EventRingBuffer::new(2);
        for plan_id in 1..=3 {
            ring.push(FabricEvent::RewriteStarted { plan_id });
        }

        // Three pushes into two slots: the first event has been overwritten.
        assert_eq!(ring.len(), 2);

        let kept: alloc::vec::Vec<&FabricEvent> = ring.iter().collect();
        assert_eq!(kept.len(), 2);
        // Oldest survivor is plan_id=2, followed by plan_id=3.
        for (event, expected) in kept.into_iter().zip([2, 3]) {
            match event {
                FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, expected),
                _ => panic!("unexpected event type"),
            }
        }
    }

    #[test]
    fn ring_buffer_drain() {
        let mut ring = EventRingBuffer::new(4);
        ring.push(sample_lease_acquired());
        ring.push(sample_op_completed());
        assert_eq!(ring.len(), 2);

        let taken = ring.drain();
        assert_eq!(taken.len(), 2);
        assert_eq!(ring.len(), 0);
        assert!(ring.is_empty());
    }

    #[test]
    fn ring_buffer_drain_after_overflow() {
        let mut ring = EventRingBuffer::new(2);
        for plan_id in 1..=3 {
            ring.push(FabricEvent::RewriteStarted { plan_id });
        }

        let taken = ring.drain();
        assert_eq!(taken.len(), 2);
        for (event, expected) in taken.iter().zip([2, 3]) {
            match event {
                FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, expected),
                _ => panic!("unexpected event type"),
            }
        }
    }

    #[test]
    fn ring_buffer_iter_exact_size() {
        let mut ring = EventRingBuffer::new(4);
        ring.push(sample_lease_acquired());
        ring.push(sample_op_completed());
        assert_eq!(ring.iter().len(), 2);
    }

    #[test]
    fn ring_buffer_default_capacity() {
        let ring = EventRingBuffer::with_default_capacity();
        assert!(ring.is_empty());
        assert_eq!(ring.capacity(), 1024);
    }

    #[test]
    fn ring_buffer_zero_capacity_becomes_one() {
        assert_eq!(EventRingBuffer::new(0).capacity(), 1);
    }

    #[test]
    fn null_sink_does_not_panic() {
        for event in [sample_lease_acquired(), sample_op_completed()] {
            NullSink.emit(&event);
        }
    }

    #[test]
    fn event_display() {
        assert_rendered_contains(
            &sample_lease_acquired(),
            &["lease_acquired", "10.10.0.11", "4096"],
        );

        let failed = FabricEvent::OpFailed {
            op_type: OpType::Read,
            error: "timeout".to_string(),
        };
        assert_rendered_contains(&failed, &["op_failed", "timeout"]);
    }

    #[test]
    fn resource_type_display() {
        let expected = [
            (ResourceType::Mem, "mem"),
            (ResourceType::Block, "block"),
            (ResourceType::Gpu, "gpu"),
            (ResourceType::Cpu, "cpu"),
        ];
        for (resource_type, text) in expected {
            assert_eq!(resource_type.to_string(), text);
        }
    }

    #[test]
    fn op_type_display() {
        let expected = [
            (OpType::Read, "read"),
            (OpType::Write, "write"),
            (OpType::ReadBlock, "read_block"),
            (OpType::WriteBlock, "write_block"),
            (OpType::GpuSubmit, "gpu_submit"),
            (OpType::TaskletSubmit, "tasklet_submit"),
        ];
        for (op_type, text) in expected {
            assert_eq!(op_type.to_string(), text);
        }
    }

    #[test]
    fn rewrite_phase_display() {
        let expected = [
            (RewritePhase::Validate, "validate"),
            (RewritePhase::Stage, "stage"),
            (RewritePhase::Canary, "canary"),
            (RewritePhase::Cutover, "cutover"),
            (RewritePhase::Cleanup, "cleanup"),
            (RewritePhase::Commit, "commit"),
            (RewritePhase::Rollback, "rollback"),
        ];
        for (phase, text) in expected {
            assert_eq!(phase.to_string(), text);
        }
    }

    #[test]
    fn rewrite_phase_bridge_from_core_is_exhaustive_and_lossless() {
        // All seven variants map 1:1. Adding a variant on either side
        // breaks this test until a typed mapping is added — the same
        // contract as slice 224's EdgeTraceContext bridge.
        let pairs: &[(grafos_core::RewritePhase, RewritePhase)] = &[
            (grafos_core::RewritePhase::Validate, RewritePhase::Validate),
            (grafos_core::RewritePhase::Stage, RewritePhase::Stage),
            (grafos_core::RewritePhase::Canary, RewritePhase::Canary),
            (grafos_core::RewritePhase::Cutover, RewritePhase::Cutover),
            (grafos_core::RewritePhase::Cleanup, RewritePhase::Cleanup),
            (grafos_core::RewritePhase::Commit, RewritePhase::Commit),
            (grafos_core::RewritePhase::Rollback, RewritePhase::Rollback),
        ];
        for (core_phase, observe_phase) in pairs.iter().copied() {
            // core -> observe
            let forward: RewritePhase = core_phase.into();
            assert_eq!(forward, observe_phase);
            // observe -> core, via the inverse impl
            let backward: grafos_core::RewritePhase = observe_phase.into();
            assert_eq!(backward, core_phase);
        }
    }

    #[test]
    fn resource_kind_to_type_bridge_pins_canonical_mapping() {
        // Six of the seven variants map 1:1. Tasklet is the only
        // composite and is pinned to Cpu (the typically-fenced half)
        // per the v1.1 §3 ResourceKind docstring. The pin catches an
        // accidental rotation of the Tasklet bucket (e.g. back to Mem,
        // which would re-introduce the prior divergence between
        // scheduler and scheduler-service).
        let cases: &[(grafos_core::ResourceKind, ResourceType)] = &[
            (grafos_core::ResourceKind::Mem, ResourceType::Mem),
            (grafos_core::ResourceKind::Block, ResourceType::Block),
            (grafos_core::ResourceKind::Net, ResourceType::Net),
            (grafos_core::ResourceKind::Cpu, ResourceType::Cpu),
            (grafos_core::ResourceKind::Gpu, ResourceType::Gpu),
            (grafos_core::ResourceKind::GpuMem, ResourceType::GpuMem),
            // Canonical Tasklet mapping. When changing it, also update
            // both wrappers (scheduler::observe_hooks and
            // scheduler-service::observe_resource_type_for): they
            // delegate via `.into()` so the strings flow through, but
            // downstream alert rules may need a heads-up.
            (grafos_core::ResourceKind::Tasklet, ResourceType::Cpu),
        ];
        for (kind, expected) in cases.iter() {
            let bridged: ResourceType = (*kind).into();
            assert_eq!(&bridged, expected, "bridge for {kind:?}");
        }
    }

    #[test]
    fn resource_kind_to_type_bridge_preserves_display_for_non_tasklet() {
        // Slice 238 pinned ResourceKind::as_str (lowercase). For the six
        // 1:1 variants the bridge keeps that exact string. Tasklet is
        // the deliberate exception: it collapses to "cpu" because the
        // observe event schema has no Tasklet variant.
        let one_to_one = [
            grafos_core::ResourceKind::Mem,
            grafos_core::ResourceKind::Block,
            grafos_core::ResourceKind::Net,
            grafos_core::ResourceKind::Cpu,
            grafos_core::ResourceKind::Gpu,
            grafos_core::ResourceKind::GpuMem,
        ];
        for kind in one_to_one {
            let bridged: ResourceType = kind.into();
            assert_eq!(
                kind.as_str(),
                bridged.to_string(),
                "1:1 variant {kind:?} must preserve string across bridge"
            );
        }

        // Tasklet collapses by design.
        assert_eq!(grafos_core::ResourceKind::Tasklet.as_str(), "tasklet");
        let collapsed: ResourceType = grafos_core::ResourceKind::Tasklet.into();
        assert_eq!(collapsed.to_string(), "cpu");
    }

    #[test]
    fn rewrite_phase_bridge_preserves_display_string() {
        // Cross-layer string-equivalence pin: both sides must emit the
        // same SIEM/event string. Slice 232 pinned the core as_str
        // strings; this test makes sure the bridge adds no string-level
        // drift.
        let core_phases = [
            grafos_core::RewritePhase::Validate,
            grafos_core::RewritePhase::Stage,
            grafos_core::RewritePhase::Canary,
            grafos_core::RewritePhase::Cutover,
            grafos_core::RewritePhase::Cleanup,
            grafos_core::RewritePhase::Commit,
            grafos_core::RewritePhase::Rollback,
        ];
        for core_phase in core_phases {
            let observe_phase: RewritePhase = core_phase.into();
            assert_eq!(
                core_phase.as_str(),
                observe_phase.to_string(),
                "bridge must preserve the SIEM string across layers"
            );
        }
    }

    #[test]
    fn new_event_variants_display() {
        let revoked = FabricEvent::LeaseRevoked {
            resource_type: ResourceType::Mem,
            lease_id: 10,
            node: "node-a".to_string(),
            trace_id: None,
        };
        assert_rendered_contains(&revoked, &["lease_revoked", "node-a"]);

        let fenced = FabricEvent::LeaseFenced {
            resource_type: ResourceType::Block,
            lease_id: 11,
            node: "node-b".to_string(),
            reason: "teardown timeout".to_string(),
            trace_id: None,
        };
        assert_rendered_contains(&fenced, &["lease_fenced", "teardown timeout"]);

        let teardown = FabricEvent::TeardownFailed {
            resource_type: ResourceType::Gpu,
            lease_id: 12,
            node: "node-c".to_string(),
            error: "connection lost".to_string(),
        };
        assert_rendered_contains(&teardown, &["teardown_failed", "connection lost"]);

        let auth = FabricEvent::AuthFailed {
            node: "node-d".to_string(),
            reason: "bad cert".to_string(),
            trace_id: None,
        };
        assert_rendered_contains(&auth, &["auth_failed", "bad cert"]);

        let replay = FabricEvent::ReplayRejected {
            node: "node-e".to_string(),
            nonce: 12345,
            trace_id: None,
        };
        assert_rendered_contains(&replay, &["replay_rejected", "12345"]);

        let token = FabricEvent::TokenValidationFailed {
            node: "node-f".to_string(),
            reason: "expired".to_string(),
        };
        assert_rendered_contains(&token, &["token_validation_failed", "expired"]);
    }

    #[cfg(feature = "std")]
    #[test]
    fn stdout_sink_does_not_panic() {
        StdoutSink.emit(&sample_lease_acquired());
    }

    #[test]
    fn tasklet_event_variants_display() {
        let submitted = FabricEvent::TaskletSubmitted {
            tasklet_id: 42,
            node: "node-a".to_string(),
            runtime_type: "linux-native-tasklet".to_string(),
            trace_id: None,
        };
        assert_rendered_contains(&submitted, &["tasklet_submitted", "42"]);

        let completed = FabricEvent::TaskletCompleted {
            tasklet_id: 42,
            status: 0,
            duration_us: 1500,
            output_bytes: 256,
            runtime_type: "linux-native-tasklet".to_string(),
            trace_id: None,
        };
        assert_rendered_contains(&completed, &["tasklet_completed", "1500"]);

        let failed = FabricEvent::TaskletFailed {
            tasklet_id: 42,
            status: 3,
            duration_us: 500,
            reason: "fuel_exhausted".to_string(),
            runtime_type: "linux-native-tasklet".to_string(),
            trace_id: None,
        };
        assert_rendered_contains(&failed, &["tasklet_failed", "fuel_exhausted"]);
    }

    #[cfg(feature = "std")]
    #[test]
    fn emit_event_without_sink_is_noop() {
        // With no sink registered, emitting must simply do nothing.
        emit_event(sample_lease_acquired());
    }
}