grafos_observe/
event.rs

1//! Event system for fabric observability.
2//!
3//! The [`FabricEvent`] enum represents all observable events in the fabric.
4//! Events are either forwarded to an [`EventSink`] implementation such as
5//! [`StdoutSink`] for immediate output, or pushed into an [`EventRingBuffer`]
6//! for later inspection.
7
8use alloc::string::String;
9use core::fmt;
10
11// ---------------------------------------------------------------------------
12// Global event sink (std only)
13// ---------------------------------------------------------------------------
14
15#[cfg(feature = "std")]
16static GLOBAL_SINK: std::sync::OnceLock<Box<dyn EventSink + Send + Sync>> =
17    std::sync::OnceLock::new();
18
19/// Register a process-wide [`EventSink`].
20///
21/// Only the first call takes effect (uses `OnceLock`). Returns `true` if the
22/// sink was registered, `false` if one was already set. After registration,
23/// [`emit_event`] forwards events to this sink.
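///
/// # Examples
///
/// A minimal sketch of the registration flow. It assumes the `std` feature is
/// enabled and that these items are re-exported at the crate root alongside
/// [`FabricEvent`]:
///
/// ```
/// use grafos_observe::{emit_event, set_global_sink, FabricEvent, StdoutSink};
///
/// // The first registration wins and returns `true`...
/// assert!(set_global_sink(Box::new(StdoutSink)));
/// // ...subsequent attempts are ignored and return `false`.
/// assert!(!set_global_sink(Box::new(StdoutSink)));
///
/// // Events emitted afterwards are forwarded to the registered sink.
/// emit_event(FabricEvent::RewriteStarted { plan_id: 1 });
/// ```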
24#[cfg(feature = "std")]
25pub fn set_global_sink(sink: Box<dyn EventSink + Send + Sync>) -> bool {
26    GLOBAL_SINK.set(sink).is_ok()
27}
28
29/// Emit an event to the global sink (if one has been registered via
30/// [`set_global_sink`]). No-op if no sink is registered or if the `std`
31/// feature is disabled.
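///
/// # Examples
///
/// A small sketch showing that emitting without a registered sink is safe
/// (it assumes `emit_event` and `FabricEvent` are re-exported at the crate root):
///
/// ```
/// use grafos_observe::{emit_event, FabricEvent};
///
/// // No sink registered in this process: the event is simply discarded.
/// emit_event(FabricEvent::RewriteStarted { plan_id: 7 });
/// ```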
32#[cfg(feature = "std")]
33pub fn emit_event(event: FabricEvent) {
34    if let Some(sink) = GLOBAL_SINK.get() {
35        sink.emit(&event);
36    }
37}
38
39/// No-op on `no_std` — events are discarded.
40#[cfg(not(feature = "std"))]
41pub fn emit_event(_event: FabricEvent) {}
42
43/// No-op on `no_std`.
44#[cfg(not(feature = "std"))]
45pub fn set_global_sink(_sink: ()) -> bool {
46    false
47}
48
49/// An observable fabric event.
50///
51/// Each variant captures the relevant context for a single observable
52/// occurrence in the grafOS system. Events are passed to an [`EventSink`]
53/// or stored in an [`EventRingBuffer`] for later inspection.
54///
55/// All variants implement [`Display`](core::fmt::Display) for human-readable
56/// output and [`Clone`] for buffering.
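///
/// # Examples
///
/// A minimal sketch of constructing an event and rendering it as a log line
/// (the field values are illustrative):
///
/// ```
/// use grafos_observe::{FabricEvent, ResourceType};
///
/// let ev = FabricEvent::LeaseAcquired {
///     resource_type: ResourceType::Mem,
///     lease_id: 7,
///     node: "10.10.0.11".into(),
///     bytes: 4096,
///     trace_id: None,
/// };
/// // Display renders a single `key=value` line.
/// assert!(ev.to_string().contains("lease_acquired"));
/// assert!(ev.to_string().contains("bytes=4096"));
/// ```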
57#[derive(Debug, Clone)]
58pub enum FabricEvent {
59    /// A new lease was acquired.
60    LeaseAcquired {
61        /// Kind of resource leased.
62        resource_type: ResourceType,
63        /// Unique lease identifier.
64        lease_id: u64,
65        /// Node address (e.g. `"10.10.0.11"`).
66        node: String,
67        /// Size of the leased region in bytes.
68        bytes: u64,
69        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
70        trace_id: Option<String>,
71    },
72    /// A lease was dropped (normal release).
73    LeaseDropped {
74        /// Kind of resource released.
75        resource_type: ResourceType,
76        /// Unique lease identifier.
77        lease_id: u64,
78        /// Node address.
79        node: String,
80    },
81    /// A lease expired without being dropped.
82    LeaseExpired {
83        /// Kind of resource whose lease expired.
84        resource_type: ResourceType,
85        /// Unique lease identifier.
86        lease_id: u64,
87        /// Node address.
88        node: String,
89    },
90    /// A data-plane operation completed successfully.
91    OpCompleted {
92        /// Operation type (read, write, etc.).
93        op_type: OpType,
94        /// Duration in microseconds.
95        duration_us: u64,
96        /// Bytes transferred.
97        bytes: u64,
98    },
99    /// A data-plane operation failed.
100    OpFailed {
101        /// Operation type that failed.
102        op_type: OpType,
103        /// Error description.
104        error: String,
105    },
106    /// A graph rewrite plan started execution.
107    RewriteStarted {
108        /// Unique rewrite plan identifier.
109        plan_id: u64,
110    },
111    /// A graph rewrite plan completed a phase.
112    RewriteCompleted {
113        /// Unique rewrite plan identifier.
114        plan_id: u64,
115        /// Phase that was completed.
116        phase: RewritePhase,
117    },
118    /// A service was registered in the service registry.
119    ServiceRegistered {
120        /// Service name.
121        name: String,
122        /// Service version.
123        version: String,
124    },
125    /// A service was deregistered from the service registry.
126    ServiceDeregistered {
127        /// Service name.
128        name: String,
129    },
130    /// A message was published to a topic.
131    MessagePublished {
132        /// Topic name.
133        topic: String,
134        /// Message size in bytes.
135        bytes: u64,
136    },
137    /// A message was consumed from a topic.
138    MessageConsumed {
139        /// Topic name.
140        topic: String,
141        /// Consumer group.
142        group: String,
143    },
144    /// An object was stored in a bucket.
145    ObjectStored {
146        /// Bucket name.
147        bucket: String,
148        /// Object key.
149        key: String,
150        /// Object size in bytes.
151        bytes: u64,
152    },
153    /// An object was retrieved from a bucket.
154    ObjectRetrieved {
155        /// Bucket name.
156        bucket: String,
157        /// Object key.
158        key: String,
159        /// Object size in bytes.
160        bytes: u64,
161    },
162    /// A lease was explicitly revoked (not just expired).
163    LeaseRevoked {
164        /// Kind of resource revoked.
165        resource_type: ResourceType,
166        /// Unique lease identifier.
167        lease_id: u64,
168        /// Node address.
169        node: String,
170        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
171        trace_id: Option<String>,
172    },
173    /// A resource entered fenced state after teardown failure.
174    LeaseFenced {
175        /// Kind of resource fenced.
176        resource_type: ResourceType,
177        /// Unique lease identifier.
178        lease_id: u64,
179        /// Node address.
180        node: String,
181        /// Reason for fencing.
182        reason: String,
183        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
184        trace_id: Option<String>,
185    },
186    /// A teardown operation failed.
187    TeardownFailed {
188        /// Kind of resource whose teardown failed.
189        resource_type: ResourceType,
190        /// Unique lease identifier.
191        lease_id: u64,
192        /// Node address.
193        node: String,
194        /// Error description.
195        error: String,
196    },
197    /// An authentication attempt failed.
198    AuthFailed {
199        /// Node address.
200        node: String,
201        /// Reason for failure.
202        reason: String,
203        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
204        trace_id: Option<String>,
205    },
206    /// An anti-replay cache rejected a message.
207    ReplayRejected {
208        /// Node address.
209        node: String,
210        /// Rejected nonce.
211        nonce: u64,
212        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
213        trace_id: Option<String>,
214    },
215    /// A capability token validation failed.
216    TokenValidationFailed {
217        /// Node address.
218        node: String,
219        /// Reason for failure.
220        reason: String,
221    },
222    /// A service listener was acquired (bound to a port).
223    ListenerAcquired {
224        /// Port number.
225        port: u16,
226        /// Node address.
227        node: String,
228        /// Lease identifier.
229        lease_id: u64,
230    },
231    /// A service listener was revoked.
232    ListenerRevoked {
233        /// Port number.
234        port: u16,
235        /// Node address.
236        node: String,
237        /// Lease identifier.
238        lease_id: u64,
239    },
240    /// A service listener entered fenced state.
241    ListenerFenced {
242        /// Port number.
243        port: u16,
244        /// Node address.
245        node: String,
246        /// Lease identifier.
247        lease_id: u64,
248        /// Reason for fencing.
249        reason: String,
250    },
251    /// A session was accepted on a service listener.
252    SessionAccepted {
253        /// Listener port.
254        listener_port: u16,
255        /// Session identifier.
256        session_id: u64,
257        /// Node address.
258        node: String,
259    },
260    /// A session was closed on a service listener.
261    SessionClosed {
262        /// Listener port.
263        listener_port: u16,
264        /// Session identifier.
265        session_id: u64,
266        /// Node address.
267        node: String,
268    },
269    /// Sessions were drained from a service listener.
270    SessionDrained {
271        /// Listener port.
272        listener_port: u16,
273        /// Number of sessions drained.
274        sessions_drained: u32,
275        /// Node address.
276        node: String,
277    },
278    /// A service was deployed with replicated instances.
279    ServiceDeployed {
280        /// Service name.
281        name: String,
282        /// Number of instances.
283        instance_count: u32,
284    },
285    /// A service instance changed state.
286    ServiceInstanceStateChanged {
287        /// Service name.
288        name: String,
289        /// Instance identifier.
290        instance_id: u64,
291        /// New state description.
292        state: String,
293    },
294    /// A planned service cutover started.
295    ServiceCutoverStarted {
296        /// Service name.
297        name: String,
298    },
299    /// A planned service cutover completed.
300    ServiceCutoverCompleted {
301        /// Service name.
302        name: String,
303    },
304    /// An unplanned service failover was triggered.
305    ServiceFailoverTriggered {
306        /// Service name.
307        name: String,
308        /// Reason for failover.
309        reason: String,
310    },
311    /// An unplanned service failover completed.
312    ServiceFailoverCompleted {
313        /// Service name.
314        name: String,
315    },
316    /// A service instance's ingress was fenced.
317    ServiceIngressFenced {
318        /// Service name.
319        name: String,
320        /// Instance identifier.
321        instance_id: u64,
322    },
323    /// A service was undeployed.
324    ServiceUndeployed {
325        /// Service name.
326        name: String,
327    },
328    /// A tasklet was submitted for execution.
329    TaskletSubmitted {
330        /// Unique tasklet identifier.
331        tasklet_id: u64,
332        /// Node address where the tasklet will execute.
333        node: String,
334        /// Runtime type that will execute this tasklet.
335        runtime_type: String,
336        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
337        trace_id: Option<String>,
338    },
339    /// A tasklet completed successfully.
340    TaskletCompleted {
341        /// Unique tasklet identifier.
342        tasklet_id: u64,
343        /// Status code (STATUS_OK on success).
344        status: u8,
345        /// Execution duration in microseconds.
346        duration_us: u64,
347        /// Output size in bytes.
348        output_bytes: u64,
349        /// Runtime type that executed this tasklet.
350        runtime_type: String,
351        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
352        trace_id: Option<String>,
353    },
354    /// A tasklet execution failed.
355    TaskletFailed {
356        /// Unique tasklet identifier.
357        tasklet_id: u64,
358        /// Status code identifying the error.
359        status: u8,
360        /// Execution duration in microseconds.
361        duration_us: u64,
362        /// Failure reason description.
363        reason: String,
364        /// Runtime type that executed this tasklet.
365        runtime_type: String,
366        /// W3C traceparent correlation ID (55 chars) for distributed tracing.
367        trace_id: Option<String>,
368    },
369    /// The active security profile at daemon startup.
370    SecurityProfileActive {
371        /// Profile mode (e.g. `"secure-default"` or `"trusted-fabric"`).
372        mode: String,
373    },
374    /// A scheduler admission request was approved.
375    AdmissionApproved {
376        /// Tenant identifier.
377        tenant_id: String,
378        /// Node address where resources were admitted.
379        node: String,
380        /// Kind of resource admitted.
381        resource_type: String,
382        /// Size of the admitted resource in bytes.
383        bytes: u64,
384        /// W3C traceparent correlation ID for distributed tracing.
385        trace_id: Option<String>,
386    },
387    /// A scheduler admission request was denied.
388    AdmissionDenied {
389        /// Tenant identifier.
390        tenant_id: String,
391        /// Kind of resource requested.
392        resource_type: String,
393        /// Reason for denial.
394        reason: String,
395        /// W3C traceparent correlation ID for distributed tracing.
396        trace_id: Option<String>,
397    },
398    /// A placement decision was made by the scheduler.
399    PlacementDecision {
400        /// Tenant identifier.
401        tenant_id: String,
402        /// Node address chosen for placement.
403        node: String,
404        /// Placement strategy used.
405        strategy: String,
406        /// Placement score (higher is better).
407        score: f64,
408        /// W3C traceparent correlation ID for distributed tracing.
409        trace_id: Option<String>,
410    },
411    /// A lease preemption was triggered by the scheduler.
412    PreemptionTriggered {
413        /// Lease identifier of the victim.
414        victim_lease_id: u64,
415        /// Tenant that owns the preempted lease.
416        victim_tenant: String,
417        /// Tenant that triggered the preemption.
418        preemptor_tenant: String,
419        /// Node address where preemption occurred.
420        node: String,
421        /// W3C traceparent correlation ID for distributed tracing.
422        trace_id: Option<String>,
423    },
424    /// A tenant exceeded their resource quota.
425    QuotaExceeded {
426        /// Tenant identifier.
427        tenant_id: String,
428        /// Kind of resource that exceeded quota.
429        resource_type: String,
430        /// Amount requested in bytes.
431        requested: u64,
432        /// Quota limit in bytes.
433        limit: u64,
434        /// W3C traceparent correlation ID for distributed tracing.
435        trace_id: Option<String>,
436    },
437    /// A capability token was minted by the scheduler.
438    TokenMinted {
439        /// Tenant identifier.
440        tenant_id: String,
441        /// Node address the token is scoped to.
442        node: String,
443        /// Token time-to-live in seconds.
444        ttl_secs: u32,
445        /// W3C traceparent correlation ID for distributed tracing.
446        trace_id: Option<String>,
447    },
448    /// A process (scenario) started execution.
449    ProcessStarted {
450        /// Process identifier (OS PID as string).
451        pid: String,
452        /// Scenario name.
453        scenario: String,
454        /// Comma-separated node addresses.
455        nodes: String,
456    },
457    /// A heartbeat from a running process.
458    ProcessHeartbeat {
459        /// Process identifier (OS PID as string).
460        pid: String,
461    },
462    /// A process (scenario) completed execution.
463    ProcessCompleted {
464        /// Process identifier (OS PID as string).
465        pid: String,
466        /// Exit code (0 = success).
467        exit_code: i32,
468        /// Duration in seconds.
469        duration_secs: u64,
470    },
471    /// The scheduler lost its election (meta-lease renewal failure, etc.).
472    SchedulerElectionLost {
473        /// Human-readable reason for the loss.
474        reason: String,
475        /// Leader epoch at the time of loss.
476        epoch: u64,
477    },
478    /// The scheduler failed to promote after acquiring the meta-lease.
479    SchedulerPromotionFailed {
480        /// Human-readable reason for the failure.
481        reason: String,
482        /// Leader epoch at the time of failure.
483        epoch: u64,
484    },
485    /// A stale leader was detected during fencing (local epoch < node epoch).
486    SchedulerStaleLeaderDetected {
487        /// Local epoch the scheduler believed was current.
488        local_epoch: u64,
489        /// Epoch observed on the node (higher than local).
490        node_epoch: u64,
491    },
492    /// The scheduler successfully promoted to Active.
493    SchedulerPromoted {
494        /// New leader epoch after promotion.
495        epoch: u64,
496        /// Number of nodes successfully fenced.
497        nodes_fenced: usize,
498        /// Time from meta-lease acquisition to Active state (milliseconds).
499        latency_ms: u64,
500    },
501}
502
503/// Resource types tracked by the observability system.
504///
505/// Maps to the grafOS resource kinds (memory, block storage, GPU, CPU, network).
506#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
507pub enum ResourceType {
508    /// Memory (DRAM, CXL-attached).
509    Mem,
510    /// Block storage (NVMe, SD).
511    Block,
512    /// GPU compute.
513    Gpu,
514    /// CPU compute.
515    Cpu,
516    /// Network (service listeners, sessions).
517    Net,
518}
519
520impl fmt::Display for ResourceType {
521    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
522        match self {
523            ResourceType::Mem => write!(f, "mem"),
524            ResourceType::Block => write!(f, "block"),
525            ResourceType::Gpu => write!(f, "gpu"),
526            ResourceType::Cpu => write!(f, "cpu"),
527            ResourceType::Net => write!(f, "net"),
528        }
529    }
530}
531
532/// Data-plane operation types.
533///
534/// Covers both memory (FBMU) and block (FBBU) data-plane operations,
535/// plus GPU and tasklet submission.
536#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
537pub enum OpType {
538    /// Memory read (FBMU READ).
539    Read,
540    /// Memory write (FBMU WRITE).
541    Write,
542    /// Block read (FBBU READ_BLOCK).
543    ReadBlock,
544    /// Block write (FBBU WRITE_BLOCK).
545    WriteBlock,
546    /// GPU kernel submission.
547    GpuSubmit,
548    /// Tasklet (lightweight compute) submission.
549    TaskletSubmit,
550}
551
552impl fmt::Display for OpType {
553    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
554        match self {
555            OpType::Read => write!(f, "read"),
556            OpType::Write => write!(f, "write"),
557            OpType::ReadBlock => write!(f, "read_block"),
558            OpType::WriteBlock => write!(f, "write_block"),
559            OpType::GpuSubmit => write!(f, "gpu_submit"),
560            OpType::TaskletSubmit => write!(f, "tasklet_submit"),
561        }
562    }
563}
564
565/// Rewrite plan phases.
566///
567/// Follows the grafOS rewrite lifecycle:
568/// Validate -> Stage -> [Canary] -> Cutover -> Cleanup -> Commit, with Rollback
569/// possible from any pre-Cleanup phase.
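///
/// # Examples
///
/// A small sketch of how a phase shows up in a [`FabricEvent`] (it assumes
/// `RewritePhase` is re-exported at the crate root alongside [`FabricEvent`]):
///
/// ```
/// use grafos_observe::{FabricEvent, RewritePhase};
///
/// let done = FabricEvent::RewriteCompleted {
///     plan_id: 42,
///     phase: RewritePhase::Cutover,
/// };
/// assert_eq!(done.to_string(), "rewrite_completed: plan_id=42 phase=cutover");
/// ```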
570#[derive(Debug, Clone, Copy, PartialEq, Eq)]
571pub enum RewritePhase {
572    /// Schema and constraint validation.
573    Validate,
574    /// Resources staged for cutover.
575    Stage,
576    /// Canary evaluation (optional).
577    Canary,
578    /// Atomic cutover to new bindings.
579    Cutover,
580    /// Post-cutover cleanup of old bindings.
581    Cleanup,
582    /// Durable audit and generation bumps.
583    Commit,
584    /// Rollback to previous state (failure path).
585    Rollback,
586}
587
588impl fmt::Display for RewritePhase {
589    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
590        match self {
591            RewritePhase::Validate => write!(f, "validate"),
592            RewritePhase::Stage => write!(f, "stage"),
593            RewritePhase::Canary => write!(f, "canary"),
594            RewritePhase::Cutover => write!(f, "cutover"),
595            RewritePhase::Cleanup => write!(f, "cleanup"),
596            RewritePhase::Commit => write!(f, "commit"),
597            RewritePhase::Rollback => write!(f, "rollback"),
598        }
599    }
600}
601
602impl fmt::Display for FabricEvent {
603    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
604        match self {
605            FabricEvent::LeaseAcquired {
606                resource_type,
607                lease_id,
608                node,
609                bytes,
610                trace_id,
611            } => {
612                write!(
613                    f,
614                    "lease_acquired: type={resource_type} id={lease_id} node={node} bytes={bytes}"
615                )?;
616                if let Some(tid) = trace_id {
617                    write!(f, " trace_id={tid}")?;
618                }
619                Ok(())
620            }
621            FabricEvent::LeaseDropped {
622                resource_type,
623                lease_id,
624                node,
625            } => write!(
626                f,
627                "lease_dropped: type={resource_type} id={lease_id} node={node}"
628            ),
629            FabricEvent::LeaseExpired {
630                resource_type,
631                lease_id,
632                node,
633            } => write!(
634                f,
635                "lease_expired: type={resource_type} id={lease_id} node={node}"
636            ),
637            FabricEvent::OpCompleted {
638                op_type,
639                duration_us,
640                bytes,
641            } => write!(
642                f,
643                "op_completed: op={op_type} duration_us={duration_us} bytes={bytes}"
644            ),
645            FabricEvent::OpFailed { op_type, error } => {
646                write!(f, "op_failed: op={op_type} error={error}")
647            }
648            FabricEvent::RewriteStarted { plan_id } => {
649                write!(f, "rewrite_started: plan_id={plan_id}")
650            }
651            FabricEvent::RewriteCompleted { plan_id, phase } => {
652                write!(f, "rewrite_completed: plan_id={plan_id} phase={phase}")
653            }
654            FabricEvent::ServiceRegistered { name, version } => {
655                write!(f, "service_registered: name={name} version={version}")
656            }
657            FabricEvent::ServiceDeregistered { name } => {
658                write!(f, "service_deregistered: name={name}")
659            }
660            FabricEvent::MessagePublished { topic, bytes } => {
661                write!(f, "message_published: topic={topic} bytes={bytes}")
662            }
663            FabricEvent::MessageConsumed { topic, group } => {
664                write!(f, "message_consumed: topic={topic} group={group}")
665            }
666            FabricEvent::ObjectStored { bucket, key, bytes } => {
667                write!(f, "object_stored: bucket={bucket} key={key} bytes={bytes}")
668            }
669            FabricEvent::ObjectRetrieved { bucket, key, bytes } => {
670                write!(
671                    f,
672                    "object_retrieved: bucket={bucket} key={key} bytes={bytes}"
673                )
674            }
675            FabricEvent::LeaseRevoked {
676                resource_type,
677                lease_id,
678                node,
679                trace_id,
680            } => {
681                write!(
682                    f,
683                    "lease_revoked: type={resource_type} id={lease_id} node={node}"
684                )?;
685                if let Some(tid) = trace_id {
686                    write!(f, " trace_id={tid}")?;
687                }
688                Ok(())
689            }
690            FabricEvent::LeaseFenced {
691                resource_type,
692                lease_id,
693                node,
694                reason,
695                trace_id,
696            } => {
697                write!(
698                    f,
699                    "lease_fenced: type={resource_type} id={lease_id} node={node} reason={reason}"
700                )?;
701                if let Some(tid) = trace_id {
702                    write!(f, " trace_id={tid}")?;
703                }
704                Ok(())
705            }
706            FabricEvent::TeardownFailed {
707                resource_type,
708                lease_id,
709                node,
710                error,
711            } => write!(
712                f,
713                "teardown_failed: type={resource_type} id={lease_id} node={node} error={error}"
714            ),
715            FabricEvent::AuthFailed {
716                node,
717                reason,
718                trace_id,
719            } => {
720                write!(f, "auth_failed: node={node} reason={reason}")?;
721                if let Some(tid) = trace_id {
722                    write!(f, " trace_id={tid}")?;
723                }
724                Ok(())
725            }
726            FabricEvent::ReplayRejected {
727                node,
728                nonce,
729                trace_id,
730            } => {
731                write!(f, "replay_rejected: node={node} nonce={nonce}")?;
732                if let Some(tid) = trace_id {
733                    write!(f, " trace_id={tid}")?;
734                }
735                Ok(())
736            }
737            FabricEvent::TokenValidationFailed { node, reason } => {
738                write!(f, "token_validation_failed: node={node} reason={reason}")
739            }
740            FabricEvent::ListenerAcquired {
741                port,
742                node,
743                lease_id,
744            } => write!(
745                f,
746                "listener_acquired: port={port} node={node} lease_id={lease_id}"
747            ),
748            FabricEvent::ListenerRevoked {
749                port,
750                node,
751                lease_id,
752            } => write!(
753                f,
754                "listener_revoked: port={port} node={node} lease_id={lease_id}"
755            ),
756            FabricEvent::ListenerFenced {
757                port,
758                node,
759                lease_id,
760                reason,
761            } => write!(
762                f,
763                "listener_fenced: port={port} node={node} lease_id={lease_id} reason={reason}"
764            ),
765            FabricEvent::SessionAccepted {
766                listener_port,
767                session_id,
768                node,
769            } => write!(
770                f,
771                "session_accepted: listener_port={listener_port} session_id={session_id} node={node}"
772            ),
773            FabricEvent::SessionClosed {
774                listener_port,
775                session_id,
776                node,
777            } => write!(
778                f,
779                "session_closed: listener_port={listener_port} session_id={session_id} node={node}"
780            ),
781            FabricEvent::SessionDrained {
782                listener_port,
783                sessions_drained,
784                node,
785            } => write!(
786                f,
787                "session_drained: listener_port={listener_port} sessions_drained={sessions_drained} node={node}"
788            ),
789            FabricEvent::ServiceDeployed {
790                name,
791                instance_count,
792            } => write!(
793                f,
794                "service_deployed: name={name} instance_count={instance_count}"
795            ),
796            FabricEvent::ServiceInstanceStateChanged {
797                name,
798                instance_id,
799                state,
800            } => write!(
801                f,
802                "service_instance_state_changed: name={name} instance_id={instance_id} state={state}"
803            ),
804            FabricEvent::ServiceCutoverStarted { name } => {
805                write!(f, "service_cutover_started: name={name}")
806            }
807            FabricEvent::ServiceCutoverCompleted { name } => {
808                write!(f, "service_cutover_completed: name={name}")
809            }
810            FabricEvent::ServiceFailoverTriggered { name, reason } => {
811                write!(
812                    f,
813                    "service_failover_triggered: name={name} reason={reason}"
814                )
815            }
816            FabricEvent::ServiceFailoverCompleted { name } => {
817                write!(f, "service_failover_completed: name={name}")
818            }
819            FabricEvent::ServiceIngressFenced { name, instance_id } => {
820                write!(
821                    f,
822                    "service_ingress_fenced: name={name} instance_id={instance_id}"
823                )
824            }
825            FabricEvent::ServiceUndeployed { name } => {
826                write!(f, "service_undeployed: name={name}")
827            }
828            FabricEvent::TaskletSubmitted {
829                tasklet_id,
830                node,
831                runtime_type,
832                trace_id,
833            } => {
834                write!(
835                    f,
836                    "tasklet_submitted: tasklet_id={tasklet_id} node={node} runtime_type={runtime_type}"
837                )?;
838                if let Some(tid) = trace_id {
839                    write!(f, " trace_id={tid}")?;
840                }
841                Ok(())
842            }
843            FabricEvent::TaskletCompleted {
844                tasklet_id,
845                status,
846                duration_us,
847                output_bytes,
848                runtime_type,
849                trace_id,
850            } => {
851                write!(
852                    f,
853                    "tasklet_completed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} output_bytes={output_bytes} runtime_type={runtime_type}"
854                )?;
855                if let Some(tid) = trace_id {
856                    write!(f, " trace_id={tid}")?;
857                }
858                Ok(())
859            }
860            FabricEvent::TaskletFailed {
861                tasklet_id,
862                status,
863                duration_us,
864                reason,
865                runtime_type,
866                trace_id,
867            } => {
868                write!(
869                    f,
870                    "tasklet_failed: tasklet_id={tasklet_id} status={status} duration_us={duration_us} reason={reason} runtime_type={runtime_type}"
871                )?;
872                if let Some(tid) = trace_id {
873                    write!(f, " trace_id={tid}")?;
874                }
875                Ok(())
876            }
877            FabricEvent::SecurityProfileActive { mode } => {
878                write!(f, "security_profile_active: mode={mode}")
879            }
880            FabricEvent::AdmissionApproved {
881                tenant_id,
882                node,
883                resource_type,
884                bytes,
885                trace_id,
886            } => {
887                write!(
888                    f,
889                    "admission_approved: tenant={tenant_id} node={node} type={resource_type} bytes={bytes}"
890                )?;
891                if let Some(tid) = trace_id {
892                    write!(f, " trace_id={tid}")?;
893                }
894                Ok(())
895            }
896            FabricEvent::AdmissionDenied {
897                tenant_id,
898                resource_type,
899                reason,
900                trace_id,
901            } => {
902                write!(
903                    f,
904                    "admission_denied: tenant={tenant_id} type={resource_type} reason={reason}"
905                )?;
906                if let Some(tid) = trace_id {
907                    write!(f, " trace_id={tid}")?;
908                }
909                Ok(())
910            }
911            FabricEvent::PlacementDecision {
912                tenant_id,
913                node,
914                strategy,
915                score,
916                trace_id,
917            } => {
918                write!(
919                    f,
920                    "placement_decision: tenant={tenant_id} node={node} strategy={strategy} score={score}"
921                )?;
922                if let Some(tid) = trace_id {
923                    write!(f, " trace_id={tid}")?;
924                }
925                Ok(())
926            }
927            FabricEvent::PreemptionTriggered {
928                victim_lease_id,
929                victim_tenant,
930                preemptor_tenant,
931                node,
932                trace_id,
933            } => {
934                write!(
935                    f,
936                    "preemption_triggered: victim_lease_id={victim_lease_id} victim_tenant={victim_tenant} preemptor_tenant={preemptor_tenant} node={node}"
937                )?;
938                if let Some(tid) = trace_id {
939                    write!(f, " trace_id={tid}")?;
940                }
941                Ok(())
942            }
943            FabricEvent::QuotaExceeded {
944                tenant_id,
945                resource_type,
946                requested,
947                limit,
948                trace_id,
949            } => {
950                write!(
951                    f,
952                    "quota_exceeded: tenant={tenant_id} type={resource_type} requested={requested} limit={limit}"
953                )?;
954                if let Some(tid) = trace_id {
955                    write!(f, " trace_id={tid}")?;
956                }
957                Ok(())
958            }
959            FabricEvent::TokenMinted {
960                tenant_id,
961                node,
962                ttl_secs,
963                trace_id,
964            } => {
965                write!(
966                    f,
967                    "token_minted: tenant={tenant_id} node={node} ttl_secs={ttl_secs}"
968                )?;
969                if let Some(tid) = trace_id {
970                    write!(f, " trace_id={tid}")?;
971                }
972                Ok(())
973            }
974            FabricEvent::ProcessStarted {
975                pid,
976                scenario,
977                nodes,
978            } => {
979                write!(
980                    f,
981                    "process_started: pid={pid} scenario={scenario} nodes={nodes}"
982                )
983            }
984            FabricEvent::ProcessHeartbeat { pid } => {
985                write!(f, "process_heartbeat: pid={pid}")
986            }
987            FabricEvent::ProcessCompleted {
988                pid,
989                exit_code,
990                duration_secs,
991            } => {
992                write!(
993                    f,
994                    "process_completed: pid={pid} exit_code={exit_code} duration_secs={duration_secs}"
995                )
996            }
997            FabricEvent::SchedulerElectionLost { reason, epoch } => {
998                write!(
999                    f,
1000                    "scheduler_election_lost: reason={reason} epoch={epoch}"
1001                )
1002            }
1003            FabricEvent::SchedulerPromotionFailed { reason, epoch } => {
1004                write!(
1005                    f,
1006                    "scheduler_promotion_failed: reason={reason} epoch={epoch}"
1007                )
1008            }
1009            FabricEvent::SchedulerStaleLeaderDetected {
1010                local_epoch,
1011                node_epoch,
1012            } => {
1013                write!(
1014                    f,
1015                    "scheduler_stale_leader_detected: local_epoch={local_epoch} node_epoch={node_epoch}"
1016                )
1017            }
1018            FabricEvent::SchedulerPromoted {
1019                epoch,
1020                nodes_fenced,
1021                latency_ms,
1022            } => {
1023                write!(
1024                    f,
1025                    "scheduler_promoted: epoch={epoch} nodes_fenced={nodes_fenced} latency_ms={latency_ms}"
1026                )
1027            }
1028        }
1029    }
1030}
1031
1032/// Trait for consuming fabric events.
1033///
1034/// Implementations decide what to do with events — log them, store them,
1035/// forward them to an external system, or discard them.
1036///
1037/// Built-in implementations:
1038/// - [`NullSink`] — discards all events (default for `no_std`).
1039/// - [`StdoutSink`] — prints events to stdout (requires `std` feature).
1040/// - [`EventRingBuffer`] — no-op via this trait; use [`EventRingBuffer::push`] directly.
1041///
1042/// Optional feature-gated implementations:
1043/// - `JsonEventSink` — one-line JSON to stdout (requires `json-log` feature).
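///
/// # Examples
///
/// A minimal sketch of a custom sink that records events through interior
/// mutability. The `BufferedSink` wrapper is illustrative and not part of this
/// crate, and the example assumes `EventSink` is re-exported at the crate root
/// alongside [`EventRingBuffer`]:
///
/// ```
/// use std::sync::Mutex;
/// use grafos_observe::{EventRingBuffer, EventSink, FabricEvent};
///
/// struct BufferedSink(Mutex<EventRingBuffer>);
///
/// impl EventSink for BufferedSink {
///     fn emit(&self, event: &FabricEvent) {
///         // Clone into the ring buffer; `emit` should not block for long.
///         if let Ok(mut buf) = self.0.lock() {
///             buf.push(event.clone());
///         }
///     }
/// }
///
/// let sink = BufferedSink(Mutex::new(EventRingBuffer::new(64)));
/// sink.emit(&FabricEvent::RewriteStarted { plan_id: 1 });
/// assert_eq!(sink.0.lock().unwrap().len(), 1);
/// ```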
1044pub trait EventSink {
1045    /// Consume an event. Implementations should not block.
1046    fn emit(&self, event: &FabricEvent);
1047}
1048
1049/// A sink that discards all events. Default `no_std` sink.
1050pub struct NullSink;
1051
1052impl EventSink for NullSink {
1053    fn emit(&self, _event: &FabricEvent) {}
1054}
1055
1056/// A sink that prints events to stdout.
1057#[cfg(feature = "std")]
1058pub struct StdoutSink;
1059
1060#[cfg(feature = "std")]
1061impl EventSink for StdoutSink {
1062    fn emit(&self, event: &FabricEvent) {
1063        println!("[fabric] {event}");
1064    }
1065}
1066
1067/// Fixed-size ring buffer for storing recent events.
1068///
1069/// When the buffer is full, new events overwrite the oldest entries.
1070/// The default capacity is 1024 events.
1071///
1072/// # Examples
1073///
1074/// ```
1075/// use grafos_observe::{EventRingBuffer, FabricEvent, ResourceType};
1076///
1077/// let mut ring = EventRingBuffer::new(4);
1078/// ring.push(FabricEvent::LeaseAcquired {
1079///     resource_type: ResourceType::Mem,
1080///     lease_id: 1,
1081///     node: "node-a".into(),
1082///     bytes: 4096,
1083///     trace_id: None,
1084/// });
1085/// assert_eq!(ring.len(), 1);
1086///
1087/// // Iterate over stored events (oldest first)
1088/// for event in ring.iter() {
1089///     println!("{event}");
1090/// }
1091///
1092/// // Drain removes and returns all events
1093/// let events = ring.drain();
1094/// assert!(ring.is_empty());
1095/// ```
1096pub struct EventRingBuffer {
1097    buf: alloc::vec::Vec<Option<FabricEvent>>,
1098    /// Write position (wraps around).
1099    head: usize,
1100    /// Number of events currently stored (capped at capacity).
1101    len: usize,
1102}
1103
1104impl EventRingBuffer {
1105    /// Default ring buffer capacity.
1106    pub const DEFAULT_CAPACITY: usize = 1024;
1107
1108    /// Create a new ring buffer with the given capacity.
1109    pub fn new(capacity: usize) -> Self {
1110        let cap = if capacity == 0 { 1 } else { capacity };
1111        let mut buf = alloc::vec::Vec::with_capacity(cap);
1112        buf.resize_with(cap, || None);
1113        Self {
1114            buf,
1115            head: 0,
1116            len: 0,
1117        }
1118    }
1119
1120    /// Create a new ring buffer with the default capacity (1024).
1121    pub fn with_default_capacity() -> Self {
1122        Self::new(Self::DEFAULT_CAPACITY)
1123    }
1124
1125    /// Push an event into the ring buffer.
1126    ///
1127    /// If the buffer is full, the oldest event is overwritten.
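    ///
    /// # Examples
    ///
    /// A small sketch of the overwrite behavior with a tiny buffer:
    ///
    /// ```
    /// use grafos_observe::{EventRingBuffer, FabricEvent};
    ///
    /// let mut ring = EventRingBuffer::new(2);
    /// ring.push(FabricEvent::RewriteStarted { plan_id: 1 });
    /// ring.push(FabricEvent::RewriteStarted { plan_id: 2 });
    /// ring.push(FabricEvent::RewriteStarted { plan_id: 3 }); // evicts plan 1
    ///
    /// assert_eq!(ring.len(), 2);
    /// // The oldest surviving event is plan 2.
    /// match ring.iter().next().unwrap() {
    ///     FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 2),
    ///     _ => unreachable!(),
    /// }
    /// ```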
1128    pub fn push(&mut self, event: FabricEvent) {
1129        self.buf[self.head] = Some(event);
1130        self.head = (self.head + 1) % self.buf.len();
1131        if self.len < self.buf.len() {
1132            self.len += 1;
1133        }
1134    }
1135
1136    /// Number of events currently stored.
1137    pub fn len(&self) -> usize {
1138        self.len
1139    }
1140
1141    /// Whether the buffer is empty.
1142    pub fn is_empty(&self) -> bool {
1143        self.len == 0
1144    }
1145
1146    /// Capacity of the ring buffer.
1147    pub fn capacity(&self) -> usize {
1148        self.buf.len()
1149    }
1150
1151    /// Iterate over stored events in order (oldest first).
1152    pub fn iter(&self) -> EventRingIter<'_> {
1153        let start = if self.len < self.buf.len() {
1154            0
1155        } else {
1156            self.head
1157        };
1158        EventRingIter {
1159            buf: &self.buf,
1160            pos: start,
1161            remaining: self.len,
1162        }
1163    }
1164
1165    /// Drain all events from the buffer, returning them in order (oldest first).
1166    ///
1167    /// The buffer is empty after this call.
1168    pub fn drain(&mut self) -> alloc::vec::Vec<FabricEvent> {
1169        let mut events = alloc::vec::Vec::with_capacity(self.len);
1170        let start = if self.len < self.buf.len() {
1171            0
1172        } else {
1173            self.head
1174        };
1175        for i in 0..self.len {
1176            let idx = (start + i) % self.buf.len();
1177            if let Some(ev) = self.buf[idx].take() {
1178                events.push(ev);
1179            }
1180        }
1181        self.head = 0;
1182        self.len = 0;
1183        events
1184    }
1185}
1186
1187impl EventSink for EventRingBuffer {
1188    fn emit(&self, event: &FabricEvent) {
1189        // `EventSink::emit` takes `&self` but `push` needs `&mut self`, so this
1190        // impl silently drops the event. Call push() directly, or wrap the buffer
1191        // in a type with interior mutability (e.g. `Mutex<EventRingBuffer>`) that
1192        // implements `EventSink` itself and forwards to push().
1193        let _ = event;
1194    }
1195}
1196
1197/// Iterator over events in an [`EventRingBuffer`].
1198pub struct EventRingIter<'a> {
1199    buf: &'a [Option<FabricEvent>],
1200    pos: usize,
1201    remaining: usize,
1202}
1203
1204impl<'a> Iterator for EventRingIter<'a> {
1205    type Item = &'a FabricEvent;
1206
1207    fn next(&mut self) -> Option<Self::Item> {
1208        if self.remaining == 0 {
1209            return None;
1210        }
1211        let idx = self.pos % self.buf.len();
1212        self.pos += 1;
1213        self.remaining -= 1;
1214        self.buf[idx].as_ref()
1215    }
1216
1217    fn size_hint(&self) -> (usize, Option<usize>) {
1218        (self.remaining, Some(self.remaining))
1219    }
1220}
1221
1222impl<'a> ExactSizeIterator for EventRingIter<'a> {}
1223
1224#[cfg(test)]
1225mod tests {
1226    use super::*;
1227    use alloc::string::ToString;
1228
1229    fn sample_lease_acquired() -> FabricEvent {
1230        FabricEvent::LeaseAcquired {
1231            resource_type: ResourceType::Mem,
1232            lease_id: 1,
1233            node: "10.10.0.11".to_string(),
1234            bytes: 4096,
1235            trace_id: None,
1236        }
1237    }
1238
1239    fn sample_op_completed() -> FabricEvent {
1240        FabricEvent::OpCompleted {
1241            op_type: OpType::Write,
1242            duration_us: 500,
1243            bytes: 1024,
1244        }
1245    }
1246
1247    #[test]
1248    fn ring_buffer_push_and_len() {
1249        let mut rb = EventRingBuffer::new(4);
1250        assert!(rb.is_empty());
1251        assert_eq!(rb.len(), 0);
1252        assert_eq!(rb.capacity(), 4);
1253
1254        rb.push(sample_lease_acquired());
1255        assert_eq!(rb.len(), 1);
1256        assert!(!rb.is_empty());
1257    }
1258
1259    #[test]
1260    fn ring_buffer_overflow_wraps() {
1261        let mut rb = EventRingBuffer::new(2);
1262        rb.push(FabricEvent::RewriteStarted { plan_id: 1 });
1263        rb.push(FabricEvent::RewriteStarted { plan_id: 2 });
1264        rb.push(FabricEvent::RewriteStarted { plan_id: 3 });
1265
1266        // Capacity is 2, so len should be 2 (oldest was overwritten).
1267        assert_eq!(rb.len(), 2);
1268
1269        let events: alloc::vec::Vec<_> = rb.iter().collect();
1270        assert_eq!(events.len(), 2);
1271        // Oldest remaining should be plan_id=2.
1272        match &events[0] {
1273            FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 2),
1274            _ => panic!("unexpected event type"),
1275        }
1276        match &events[1] {
1277            FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 3),
1278            _ => panic!("unexpected event type"),
1279        }
1280    }
1281
1282    #[test]
1283    fn ring_buffer_drain() {
1284        let mut rb = EventRingBuffer::new(4);
1285        rb.push(sample_lease_acquired());
1286        rb.push(sample_op_completed());
1287        assert_eq!(rb.len(), 2);
1288
1289        let drained = rb.drain();
1290        assert_eq!(drained.len(), 2);
1291        assert!(rb.is_empty());
1292        assert_eq!(rb.len(), 0);
1293    }
1294
1295    #[test]
1296    fn ring_buffer_drain_after_overflow() {
1297        let mut rb = EventRingBuffer::new(2);
1298        rb.push(FabricEvent::RewriteStarted { plan_id: 1 });
1299        rb.push(FabricEvent::RewriteStarted { plan_id: 2 });
1300        rb.push(FabricEvent::RewriteStarted { plan_id: 3 });
1301
1302        let drained = rb.drain();
1303        assert_eq!(drained.len(), 2);
1304        match &drained[0] {
1305            FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 2),
1306            _ => panic!("unexpected event type"),
1307        }
1308        match &drained[1] {
1309            FabricEvent::RewriteStarted { plan_id } => assert_eq!(*plan_id, 3),
1310            _ => panic!("unexpected event type"),
1311        }
1312    }
1313
1314    #[test]
1315    fn ring_buffer_iter_exact_size() {
1316        let mut rb = EventRingBuffer::new(4);
1317        rb.push(sample_lease_acquired());
1318        rb.push(sample_op_completed());
1319        let iter = rb.iter();
1320        assert_eq!(iter.len(), 2);
1321    }
1322
1323    #[test]
1324    fn ring_buffer_default_capacity() {
1325        let rb = EventRingBuffer::with_default_capacity();
1326        assert_eq!(rb.capacity(), 1024);
1327        assert!(rb.is_empty());
1328    }
1329
1330    #[test]
1331    fn ring_buffer_zero_capacity_becomes_one() {
1332        let rb = EventRingBuffer::new(0);
1333        assert_eq!(rb.capacity(), 1);
1334    }
1335
1336    #[test]
1337    fn null_sink_does_not_panic() {
1338        let sink = NullSink;
1339        sink.emit(&sample_lease_acquired());
1340        sink.emit(&sample_op_completed());
1341    }
1342
1343    #[test]
1344    fn event_display() {
1345        let ev = sample_lease_acquired();
1346        let s = alloc::format!("{ev}");
1347        assert!(s.contains("lease_acquired"));
1348        assert!(s.contains("10.10.0.11"));
1349        assert!(s.contains("4096"));
1350
1351        let ev2 = FabricEvent::OpFailed {
1352            op_type: OpType::Read,
1353            error: "timeout".to_string(),
1354        };
1355        let s2 = alloc::format!("{ev2}");
1356        assert!(s2.contains("op_failed"));
1357        assert!(s2.contains("timeout"));
1358    }
1359
1360    #[test]
1361    fn resource_type_display() {
1362        assert_eq!(alloc::format!("{}", ResourceType::Mem), "mem");
1363        assert_eq!(alloc::format!("{}", ResourceType::Block), "block");
1364        assert_eq!(alloc::format!("{}", ResourceType::Gpu), "gpu");
1365        assert_eq!(alloc::format!("{}", ResourceType::Cpu), "cpu");
        assert_eq!(alloc::format!("{}", ResourceType::Net), "net");
1366    }
1367
1368    #[test]
1369    fn op_type_display() {
1370        assert_eq!(alloc::format!("{}", OpType::Read), "read");
1371        assert_eq!(alloc::format!("{}", OpType::Write), "write");
1372        assert_eq!(alloc::format!("{}", OpType::ReadBlock), "read_block");
1373        assert_eq!(alloc::format!("{}", OpType::WriteBlock), "write_block");
1374        assert_eq!(alloc::format!("{}", OpType::GpuSubmit), "gpu_submit");
1375        assert_eq!(
1376            alloc::format!("{}", OpType::TaskletSubmit),
1377            "tasklet_submit"
1378        );
1379    }
1380
1381    #[test]
1382    fn rewrite_phase_display() {
1383        assert_eq!(alloc::format!("{}", RewritePhase::Validate), "validate");
1384        assert_eq!(alloc::format!("{}", RewritePhase::Stage), "stage");
1385        assert_eq!(alloc::format!("{}", RewritePhase::Canary), "canary");
1386        assert_eq!(alloc::format!("{}", RewritePhase::Cutover), "cutover");
1387        assert_eq!(alloc::format!("{}", RewritePhase::Cleanup), "cleanup");
1388        assert_eq!(alloc::format!("{}", RewritePhase::Commit), "commit");
1389        assert_eq!(alloc::format!("{}", RewritePhase::Rollback), "rollback");
1390    }
1391
1392    #[test]
1393    fn new_event_variants_display() {
1394        let ev = FabricEvent::LeaseRevoked {
1395            resource_type: ResourceType::Mem,
1396            lease_id: 10,
1397            node: "node-a".to_string(),
1398            trace_id: None,
1399        };
1400        let s = alloc::format!("{ev}");
1401        assert!(s.contains("lease_revoked"));
1402        assert!(s.contains("node-a"));
1403
1404        let ev = FabricEvent::LeaseFenced {
1405            resource_type: ResourceType::Block,
1406            lease_id: 11,
1407            node: "node-b".to_string(),
1408            reason: "teardown timeout".to_string(),
1409            trace_id: None,
1410        };
1411        let s = alloc::format!("{ev}");
1412        assert!(s.contains("lease_fenced"));
1413        assert!(s.contains("teardown timeout"));
1414
1415        let ev = FabricEvent::TeardownFailed {
1416            resource_type: ResourceType::Gpu,
1417            lease_id: 12,
1418            node: "node-c".to_string(),
1419            error: "connection lost".to_string(),
1420        };
1421        let s = alloc::format!("{ev}");
1422        assert!(s.contains("teardown_failed"));
1423        assert!(s.contains("connection lost"));
1424
1425        let ev = FabricEvent::AuthFailed {
1426            node: "node-d".to_string(),
1427            reason: "bad cert".to_string(),
1428            trace_id: None,
1429        };
1430        let s = alloc::format!("{ev}");
1431        assert!(s.contains("auth_failed"));
1432        assert!(s.contains("bad cert"));
1433
1434        let ev = FabricEvent::ReplayRejected {
1435            node: "node-e".to_string(),
1436            nonce: 12345,
1437            trace_id: None,
1438        };
1439        let s = alloc::format!("{ev}");
1440        assert!(s.contains("replay_rejected"));
1441        assert!(s.contains("12345"));
1442
1443        let ev = FabricEvent::TokenValidationFailed {
1444            node: "node-f".to_string(),
1445            reason: "expired".to_string(),
1446        };
1447        let s = alloc::format!("{ev}");
1448        assert!(s.contains("token_validation_failed"));
1449        assert!(s.contains("expired"));
1450    }
1451
1452    #[cfg(feature = "std")]
1453    #[test]
1454    fn stdout_sink_does_not_panic() {
1455        let sink = StdoutSink;
1456        sink.emit(&sample_lease_acquired());
1457    }
1458
1459    #[test]
1460    fn tasklet_event_variants_display() {
1461        let ev = FabricEvent::TaskletSubmitted {
1462            tasklet_id: 42,
1463            node: "node-a".to_string(),
1464            runtime_type: "linux-native-tasklet".to_string(),
1465            trace_id: None,
1466        };
1467        let s = alloc::format!("{ev}");
1468        assert!(s.contains("tasklet_submitted"));
1469        assert!(s.contains("42"));
1470
1471        let ev = FabricEvent::TaskletCompleted {
1472            tasklet_id: 42,
1473            status: 0,
1474            duration_us: 1500,
1475            output_bytes: 256,
1476            runtime_type: "linux-native-tasklet".to_string(),
1477            trace_id: None,
1478        };
1479        let s = alloc::format!("{ev}");
1480        assert!(s.contains("tasklet_completed"));
1481        assert!(s.contains("1500"));
1482
1483        let ev = FabricEvent::TaskletFailed {
1484            tasklet_id: 42,
1485            status: 3,
1486            duration_us: 500,
1487            reason: "fuel_exhausted".to_string(),
1488            runtime_type: "linux-native-tasklet".to_string(),
1489            trace_id: None,
1490        };
1491        let s = alloc::format!("{ev}");
1492        assert!(s.contains("tasklet_failed"));
1493        assert!(s.contains("fuel_exhausted"));
1494    }
1495
1496    #[cfg(feature = "std")]
1497    #[test]
1498    fn emit_event_without_sink_is_noop() {
1499        // Before any sink is registered, emit_event should not panic.
1500        emit_event(sample_lease_acquired());
1501    }
1502}