// grafos_observe/span.rs

//! Resource-attributed spans for grafOS distributed tracing.
//!
//! A [`ResourceSpan`] extends standard OTel span data with grafOS-specific
//! resource attribution: lease IDs, operation counts, byte throughput, and
//! lease cost. This metadata makes grafOS traces strictly richer than
//! standard application traces.
7
8use alloc::string::String;
9use alloc::vec::Vec;
10
11use crate::event::{OpType, ResourceType};
12use crate::trace::{SpanId, TraceContext, TraceId};
13
/// Outcome recorded on a span once the traced operation has finished.
///
/// Two statuses are equal when they are the same variant and, for
/// [`SpanStatus::Error`], carry the same message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpanStatus {
    /// The operation completed without error.
    Ok,
    /// The operation failed; the payload is the error message.
    Error(String),
}
22
23/// Key for operation count tracking: (resource type, operation type).
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
25pub struct OpKey {
26    pub resource_type: ResourceType,
27    pub op_type: OpType,
28}
29
30/// A resource-attributed span carrying grafOS-specific metadata.
31///
32/// Populated automatically by lease API instrumentation hooks and the
33/// `#[grafos::instrument]` proc macro. Exported to OTel collectors via
34/// the `otlp` exporter (requires `otlp` feature).
35#[derive(Debug, Clone)]
36pub struct ResourceSpan {
37    /// Human-readable name for this span (usually the function name).
38    pub name: String,
39
40    /// Trace context (trace_id + span_id + flags).
41    pub trace_context: TraceContext,
42
43    /// Parent span ID (if this span has a parent in the same service).
44    pub parent_span_id: Option<SpanId>,
45
46    /// Start time as Unix microseconds.
47    pub start_time_unix_us: u64,
48
49    /// End time as Unix microseconds.
50    pub end_time_unix_us: u64,
51
52    /// Completion status.
53    pub status: SpanStatus,
54
55    // -- Resource attribution --
56    /// Lease IDs touched during this span.
57    pub lease_ids: Vec<u128>,
58
59    /// Operation counts by (resource_type, op_type).
60    pub ops: Vec<(OpKey, u64)>,
61
62    /// Total bytes read across all lease operations in this span.
63    pub bytes_read: u64,
64
65    /// Total bytes written across all lease operations in this span.
66    pub bytes_written: u64,
67
68    /// Sum of (leased_bytes * seconds_held) for all leases in this span.
69    pub lease_cost_byte_secs: u64,
70
71    /// Time spent waiting for lease acquisition, in microseconds.
72    pub lease_acquire_wait_us: u64,
73
74    /// User-defined key-value attributes.
75    pub attributes: Vec<(String, String)>,
76}
77
78impl ResourceSpan {
79    /// Create a new span with the given name and trace context.
80    pub fn new(name: &str, trace_context: TraceContext) -> Self {
81        ResourceSpan {
82            name: String::from(name),
83            trace_context,
84            parent_span_id: None,
85            start_time_unix_us: 0,
86            end_time_unix_us: 0,
87            status: SpanStatus::Ok,
88            lease_ids: Vec::new(),
89            ops: Vec::new(),
90            bytes_read: 0,
91            bytes_written: 0,
92            lease_cost_byte_secs: 0,
93            lease_acquire_wait_us: 0,
94            attributes: Vec::new(),
95        }
96    }
97
98    /// Duration in microseconds.
99    pub fn duration_us(&self) -> u64 {
100        self.end_time_unix_us
101            .saturating_sub(self.start_time_unix_us)
102    }
103
104    /// Record an operation.
105    pub fn record_op(&mut self, resource_type: ResourceType, op_type: OpType, count: u64) {
106        let key = OpKey {
107            resource_type,
108            op_type,
109        };
110        if let Some(entry) = self.ops.iter_mut().find(|(k, _)| *k == key) {
111            entry.1 += count;
112        } else {
113            self.ops.push((key, count));
114        }
115    }
116
117    /// Get the operation count for a specific (resource_type, op_type) pair.
118    pub fn op_count(&self, resource_type: ResourceType, op_type: OpType) -> u64 {
119        let key = OpKey {
120            resource_type,
121            op_type,
122        };
123        self.ops
124            .iter()
125            .find(|(k, _)| *k == key)
126            .map(|(_, c)| *c)
127            .unwrap_or(0)
128    }
129
130    /// Add a lease ID to the span's lease list.
131    pub fn add_lease_id(&mut self, lease_id: u128) {
132        if !self.lease_ids.contains(&lease_id) {
133            self.lease_ids.push(lease_id);
134        }
135    }
136
137    /// Add a user-defined attribute.
138    pub fn set_attribute(&mut self, key: &str, value: &str) {
139        if let Some(entry) = self.attributes.iter_mut().find(|(k, _)| k == key) {
140            entry.1 = String::from(value);
141        } else {
142            self.attributes
143                .push((String::from(key), String::from(value)));
144        }
145    }
146
147    /// Convenience: trace ID from the context.
148    pub fn trace_id(&self) -> TraceId {
149        self.trace_context.trace_id
150    }
151
152    /// Convenience: span ID from the context.
153    pub fn span_id(&self) -> SpanId {
154        self.trace_context.span_id
155    }
156}
157
// Unit tests for `ResourceSpan`: constructor defaults, duration arithmetic,
// per-key operation tallies, and the de-duplicating helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::trace::TraceContext;

    /// Build a deterministic root context from a fixed 24-byte pattern
    /// (0x42, 0x43, ...), so every test works with the same IDs.
    fn test_ctx() -> TraceContext {
        let mut bytes = [0u8; 24];
        for (i, b) in bytes.iter_mut().enumerate() {
            *b = (i as u8).wrapping_add(0x42);
        }
        TraceContext::new_root(&bytes)
    }

    #[test]
    fn new_span_has_correct_defaults() {
        let ctx = test_ctx();
        let span = ResourceSpan::new("test_op", ctx);
        assert_eq!(span.name, "test_op");
        assert_eq!(span.trace_id(), ctx.trace_id);
        assert_eq!(span.status, SpanStatus::Ok);
        assert!(span.parent_span_id.is_none());
        assert_eq!(span.start_time_unix_us, 0);
        assert_eq!(span.end_time_unix_us, 0);
        assert!(span.lease_ids.is_empty());
        assert!(span.ops.is_empty());
        assert_eq!(span.bytes_read, 0);
        assert_eq!(span.bytes_written, 0);
        assert_eq!(span.lease_cost_byte_secs, 0);
        assert_eq!(span.lease_acquire_wait_us, 0);
        assert!(span.attributes.is_empty());
    }

    #[test]
    fn duration_calculation() {
        let mut span = ResourceSpan::new("d", test_ctx());
        span.start_time_unix_us = 1000;
        span.end_time_unix_us = 2500;
        assert_eq!(span.duration_us(), 1500);
    }

    #[test]
    fn duration_saturates_when_end_precedes_start() {
        // An unfinished span still has end == 0; the duration must clamp to
        // zero instead of underflowing.
        let mut span = ResourceSpan::new("d", test_ctx());
        span.start_time_unix_us = 2500;
        assert_eq!(span.duration_us(), 0);
    }

    #[test]
    fn record_op_accumulates() {
        let mut span = ResourceSpan::new("ops", test_ctx());
        span.record_op(ResourceType::Mem, OpType::Write, 5);
        span.record_op(ResourceType::Mem, OpType::Write, 3);
        span.record_op(ResourceType::Block, OpType::ReadBlock, 1);

        assert_eq!(span.op_count(ResourceType::Mem, OpType::Write), 8);
        assert_eq!(span.op_count(ResourceType::Block, OpType::ReadBlock), 1);
        assert_eq!(span.op_count(ResourceType::Gpu, OpType::GpuSubmit), 0);
        // Two distinct keys were recorded, so exactly two entries exist.
        assert_eq!(span.ops.len(), 2);
    }

    #[test]
    fn add_lease_id_deduplicates() {
        let mut span = ResourceSpan::new("leases", test_ctx());
        span.add_lease_id(1);
        span.add_lease_id(2);
        span.add_lease_id(1); // duplicate
        assert_eq!(span.lease_ids.len(), 2);
        // First-seen order is preserved.
        assert_eq!(span.lease_ids, [1u128, 2u128]);
    }

    #[test]
    fn set_attribute_updates_existing() {
        let mut span = ResourceSpan::new("attrs", test_ctx());
        span.set_attribute("env", "dev");
        span.set_attribute("env", "prod");
        assert_eq!(span.attributes.len(), 1);
        assert_eq!(span.attributes[0].1, "prod");

        // A new key is appended rather than replacing the existing one.
        span.set_attribute("zone", "a");
        assert_eq!(span.attributes.len(), 2);
        assert_eq!(span.attributes[1].0, "zone");
    }

    #[test]
    fn span_status_error() {
        let mut span = ResourceSpan::new("fail", test_ctx());
        span.status = SpanStatus::Error(String::from("connection refused"));
        // Exhaustive match: adding a variant to `SpanStatus` forces this
        // test to be revisited.
        match &span.status {
            SpanStatus::Error(msg) => assert_eq!(msg, "connection refused"),
            SpanStatus::Ok => panic!("expected error status"),
        }
    }
}
231}