grafos_profile/
data_flow.rs

1//! Data-flow diagram — auto-generated from observed read/write patterns across leases.
2//!
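//! Infers a directed graph of `(span-name) --[writes N bytes]--> (lease) --[reads M bytes]--> (span-name)`
//! from the recording's per-span lease IDs and operation types. The graph is not guaranteed to be
//! acyclic: a span that both writes to and reads from the same lease yields edges in both directions.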
5
6use alloc::collections::BTreeMap;
7use alloc::string::String;
8use alloc::vec::Vec;
9
10use grafos_observe::event::{OpType, ResourceType};
11
12use crate::recording::ProfileRecording;
13
/// Kind of node in the data-flow graph.
///
/// The enum carries no data, so it is `Copy` and can be used directly as a
/// hash-map / hash-set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DataFlowNodeKind {
    /// A span (operation) node.
    Span,
    /// A lease (resource) node.
    Lease,
}
22
/// A node in the data-flow diagram.
///
/// A node stands for either a span (all spans sharing a name are merged into
/// one node) or a lease; the `kind` field tells which.
#[derive(Debug, Clone)]
pub struct DataFlowNode {
    /// Unique identifier for this node.
    ///
    /// `DataFlowDiagram::from_recording` uses `span:<name>` for span nodes and
    /// `lease:<lease id in lowercase hex>` for lease nodes; edges refer to
    /// nodes by this string.
    pub id: String,
    /// Display label (the span name, or `lease <hex id>` for lease nodes).
    pub label: String,
    /// Kind of node (span or lease).
    pub kind: DataFlowNodeKind,
    /// Resource type (for lease nodes); span nodes are built with `None`.
    pub resource_type: Option<ResourceType>,
    /// Size metric: the summed op count for span nodes; intended to be a byte
    /// count for lease nodes.
    pub size: u64,
}
37
/// A directed edge in the data-flow diagram.
///
/// Edges built by `DataFlowDiagram::from_recording` run either from a span
/// node to a lease node (a write) or from a lease node to a span node (a read).
#[derive(Debug, Clone)]
pub struct DataFlowEdge {
    /// Source node ID (matches a `DataFlowNode::id`).
    pub from: String,
    /// Target node ID (matches a `DataFlowNode::id`).
    pub to: String,
    /// Bytes transferred along this edge.
    pub bytes: u64,
    /// Edge label: `from_recording` fills this with a human-readable rendering
    /// of `bytes` (e.g. "4.0 KB"); the direction is conveyed by `from` / `to`,
    /// not by the label text.
    pub label: String,
}
50
/// Data-flow diagram inferred from a recording.
///
/// Built with `DataFlowDiagram::from_recording`. Nodes and edges are stored as
/// flat lists; edges reference nodes through their string IDs.
#[derive(Debug, Clone)]
pub struct DataFlowDiagram {
    /// All nodes in the graph.
    pub nodes: Vec<DataFlowNode>,
    /// All edges in the graph.
    pub edges: Vec<DataFlowEdge>,
}
59
60impl DataFlowDiagram {
61    /// Build a data-flow diagram from a profile recording.
62    ///
63    /// For each lease, finds all spans that wrote to it and all spans that read
64    /// from it. Creates directed edges: writer-span -> lease -> reader-span.
65    /// Spans with the same name are collapsed into single nodes.
66    pub fn from_recording(rec: &ProfileRecording) -> Self {
67        // Track per-lease writers and readers
68        // Key: lease_id, Value: { writers: [(span_name, bytes)], readers: [(span_name, bytes)] }
69        let mut lease_writers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
70        let mut lease_readers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
71        let mut lease_resource_type: BTreeMap<u128, ResourceType> = BTreeMap::new();
72
73        // Track span total ops for sizing
74        let mut span_ops: BTreeMap<String, u64> = BTreeMap::new();
75
76        for span in &rec.spans {
77            let is_writer = span
78                .ops
79                .iter()
80                .any(|(k, _)| matches!(k.op_type, OpType::Write | OpType::WriteBlock));
81            let is_reader = span
82                .ops
83                .iter()
84                .any(|(k, _)| matches!(k.op_type, OpType::Read | OpType::ReadBlock));
85
86            let total_ops: u64 = span.ops.iter().map(|(_, c)| c).sum();
87            *span_ops.entry(span.name.clone()).or_insert(0) += total_ops;
88
89            // Infer resource type from ops
90            let rt = span
91                .ops
92                .iter()
93                .max_by_key(|(_, c)| c)
94                .map(|(k, _)| k.resource_type);
95
96            for &lease_id in &span.lease_ids {
97                if let Some(r) = rt {
98                    lease_resource_type.entry(lease_id).or_insert(r);
99                }
100
101                if is_writer {
102                    lease_writers
103                        .entry(lease_id)
104                        .or_default()
105                        .push((span.name.clone(), span.bytes_written));
106                }
107                if is_reader {
108                    lease_readers
109                        .entry(lease_id)
110                        .or_default()
111                        .push((span.name.clone(), span.bytes_read));
112                }
113            }
114        }
115
116        // Build nodes
117        let mut nodes: Vec<DataFlowNode> = Vec::new();
118        let mut node_ids: BTreeMap<String, usize> = BTreeMap::new();
119
120        // Add span nodes (collapsed by name)
121        for (name, &ops) in &span_ops {
122            let id = alloc::format!("span:{}", name);
123            if !node_ids.contains_key(&id) {
124                node_ids.insert(id.clone(), nodes.len());
125                nodes.push(DataFlowNode {
126                    id,
127                    label: name.clone(),
128                    kind: DataFlowNodeKind::Span,
129                    resource_type: None,
130                    size: ops,
131                });
132            }
133        }
134
135        // Add lease nodes
136        let all_lease_ids: Vec<u128> = {
137            let mut ids: Vec<u128> = lease_writers
138                .keys()
139                .chain(lease_readers.keys())
140                .copied()
141                .collect();
142            ids.sort();
143            ids.dedup();
144            ids
145        };
146
147        for &lease_id in &all_lease_ids {
148            let id = alloc::format!("lease:{:x}", lease_id);
149            if !node_ids.contains_key(&id) {
150                let rt = lease_resource_type.get(&lease_id).copied();
151                node_ids.insert(id.clone(), nodes.len());
152                nodes.push(DataFlowNode {
153                    id,
154                    label: alloc::format!("lease {:x}", lease_id),
155                    kind: DataFlowNodeKind::Lease,
156                    resource_type: rt,
157                    size: 0,
158                });
159            }
160        }
161
162        // Build edges
163        let mut edges: Vec<DataFlowEdge> = Vec::new();
164        // Track aggregated edges: (from, to) -> total_bytes
165        let mut edge_map: BTreeMap<(String, String), u64> = BTreeMap::new();
166
167        for (&lease_id, writers) in &lease_writers {
168            let lease_node_id = alloc::format!("lease:{:x}", lease_id);
169            for (span_name, bytes) in writers {
170                let span_node_id = alloc::format!("span:{}", span_name);
171                let key = (span_node_id, lease_node_id.clone());
172                *edge_map.entry(key).or_insert(0) += bytes;
173            }
174        }
175
176        for (&lease_id, readers) in &lease_readers {
177            let lease_node_id = alloc::format!("lease:{:x}", lease_id);
178            for (span_name, bytes) in readers {
179                let span_node_id = alloc::format!("span:{}", span_name);
180                let key = (lease_node_id.clone(), span_node_id);
181                *edge_map.entry(key).or_insert(0) += bytes;
182            }
183        }
184
185        for ((from, to), bytes) in &edge_map {
186            edges.push(DataFlowEdge {
187                from: from.clone(),
188                to: to.clone(),
189                bytes: *bytes,
190                label: format_bytes(*bytes),
191            });
192        }
193
194        DataFlowDiagram { nodes, edges }
195    }
196
197    /// Render as self-contained HTML with force-directed layout.
198    #[cfg(feature = "html")]
199    pub fn render_html(&self) -> String {
200        let mut html = String::new();
201        html.push_str("<!DOCTYPE html>\n<html>\n<head>\n");
202        html.push_str("<meta charset=\"utf-8\">\n");
203        html.push_str("<title>grafOS Data Flow Diagram</title>\n");
204        html.push_str("<style>\n");
205        html.push_str("body { font-family: monospace; margin: 0; padding: 20px; background: #1a1a2e; color: #eee; }\n");
206        html.push_str("h1 { font-size: 18px; }\n");
207        html.push_str("svg { border: 1px solid #333; background: #0d0d1a; }\n");
208        html.push_str(".node-span rect { stroke: #eee; stroke-width: 1; }\n");
209        html.push_str(".node-lease circle { stroke: #eee; stroke-width: 1; }\n");
210        html.push_str(".edge line { stroke: #666; }\n");
211        html.push_str(".edge-label { fill: #aaa; font-size: 10px; }\n");
212        html.push_str(".node-label { fill: #eee; font-size: 11px; text-anchor: middle; }\n");
213        html.push_str("</style>\n</head>\n<body>\n");
214        html.push_str("<h1>grafOS Data Flow Diagram</h1>\n");
215        html.push_str(&alloc::format!(
216            "<p>{} nodes, {} edges</p>\n",
217            self.nodes.len(),
218            self.edges.len()
219        ));
220
221        // Simple grid layout (force-directed would need JS library; keep it minimal)
222        let svg_width = 900;
223        let svg_height = 600;
224
225        html.push_str(&alloc::format!(
226            "<svg width=\"{}\" height=\"{}\">\n",
227            svg_width,
228            svg_height
229        ));
230
231        // Position nodes in a simple grid
232        let n = self.nodes.len().max(1);
233        let cols = ((n as f64).sqrt().ceil() as usize).max(1);
234
235        let mut node_positions: BTreeMap<String, (f64, f64)> = BTreeMap::new();
236        for (i, node) in self.nodes.iter().enumerate() {
237            let col = i % cols;
238            let row = i / cols;
239            let x = 80.0 + (col as f64) * 160.0;
240            let y = 80.0 + (row as f64) * 120.0;
241            node_positions.insert(node.id.clone(), (x, y));
242        }
243
244        // Draw edges
245        for edge in &self.edges {
246            if let (Some(&(x1, y1)), Some(&(x2, y2))) =
247                (node_positions.get(&edge.from), node_positions.get(&edge.to))
248            {
249                let mx = (x1 + x2) / 2.0;
250                let my = (y1 + y2) / 2.0;
251                html.push_str(&alloc::format!(
252                    "<g class=\"edge\"><line x1=\"{x1:.0}\" y1=\"{y1:.0}\" x2=\"{x2:.0}\" y2=\"{y2:.0}\"/>\
253                     <text class=\"edge-label\" x=\"{mx:.0}\" y=\"{my:.0}\">{}</text></g>\n",
254                    edge.label
255                ));
256            }
257        }
258
259        // Draw nodes
260        for node in &self.nodes {
261            if let Some(&(x, y)) = node_positions.get(&node.id) {
262                match node.kind {
263                    DataFlowNodeKind::Span => {
264                        html.push_str(&alloc::format!(
265                            "<g class=\"node-span\"><rect x=\"{:.0}\" y=\"{:.0}\" width=\"120\" height=\"30\" rx=\"3\" fill=\"#2a2a4a\"/>\
266                             <text class=\"node-label\" x=\"{:.0}\" y=\"{:.0}\">{}</text></g>\n",
267                            x - 60.0, y - 15.0, x, y + 4.0, node.label
268                        ));
269                    }
270                    DataFlowNodeKind::Lease => {
271                        let color = match node.resource_type {
272                            Some(ResourceType::Mem) => "#4a90d9",
273                            Some(ResourceType::Block) => "#50c878",
274                            Some(ResourceType::Gpu) => "#ff8c42",
275                            Some(ResourceType::Cpu) => "#9b59b6",
276                            Some(ResourceType::Net) => "#e74c3c",
277                            None => "#999",
278                        };
279                        html.push_str(&alloc::format!(
280                            "<g class=\"node-lease\"><circle cx=\"{x:.0}\" cy=\"{y:.0}\" r=\"20\" fill=\"{color}\"/>\
281                             <text class=\"node-label\" x=\"{x:.0}\" y=\"{:.0}\">{}</text></g>\n",
282                            y + 35.0, node.label
283                        ));
284                    }
285                }
286            }
287        }
288
289        html.push_str("</svg>\n</body>\n</html>\n");
290        html
291    }
292}
293
/// Formats a byte count for display.
///
/// Values below 1024 are printed exactly with a ` B` suffix. Larger values are
/// scaled by the largest power-of-1024 unit that does not exceed them (`KB`,
/// `MB`, `GB`) and printed with one decimal place. `GB` is the largest unit.
fn format_bytes(bytes: u64) -> String {
    const KILO: u64 = 1 << 10;
    const MEGA: u64 = 1 << 20;
    const GIGA: u64 = 1 << 30;

    let (divisor, unit) = match bytes {
        b if b >= GIGA => (GIGA, "GB"),
        b if b >= MEGA => (MEGA, "MB"),
        b if b >= KILO => (KILO, "KB"),
        _ => return alloc::format!("{} B", bytes),
    };

    alloc::format!("{:.1} {}", bytes as f64 / divisor as f64, unit)
}
305
// Unit tests for diagram construction and (behind the `html` feature) rendering.
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_observe::event::{OpType, ResourceType};
    use grafos_observe::span::ResourceSpan;
    use grafos_observe::trace::TraceContext;

    /// Builds a root trace context from a fixed 24-byte pattern
    /// (0x42, 0x43, ...) so that test inputs are reproducible.
    fn test_ctx() -> TraceContext {
        let mut bytes = [0u8; 24];
        for (i, b) in bytes.iter_mut().enumerate() {
            *b = (i as u8).wrapping_add(0x42);
        }
        TraceContext::new_root(&bytes)
    }

    /// One writer span and one reader span sharing a lease must produce
    /// `writer -> lease -> reader` with the spans' byte counts on the edges.
    #[test]
    fn writer_to_reader_edge() {
        let ctx = test_ctx();
        let c1 = ctx.child(&[0xAA; 8]);

        // Span A writes to lease X
        let mut span_a = ResourceSpan::new("writer_a", ctx);
        span_a.start_time_unix_us = 1000;
        span_a.end_time_unix_us = 2000;
        span_a.add_lease_id(0x42);
        span_a.bytes_written = 4096;
        span_a.record_op(ResourceType::Mem, OpType::Write, 1);

        // Span B reads from lease X
        let mut span_b = ResourceSpan::new("reader_b", c1);
        span_b.start_time_unix_us = 2000;
        span_b.end_time_unix_us = 3000;
        span_b.add_lease_id(0x42);
        span_b.bytes_read = 4096;
        span_b.record_op(ResourceType::Mem, OpType::Read, 1);

        let rec = ProfileRecording::from_spans(vec![span_a, span_b]);
        let diagram = DataFlowDiagram::from_recording(&rec);

        // Should have 3 nodes: writer_a (span), lease 42, reader_b (span)
        assert_eq!(diagram.nodes.len(), 3);

        let span_a_node = diagram
            .nodes
            .iter()
            .find(|n| n.label == "writer_a")
            .unwrap();
        assert_eq!(span_a_node.kind, DataFlowNodeKind::Span);

        let span_b_node = diagram
            .nodes
            .iter()
            .find(|n| n.label == "reader_b")
            .unwrap();
        assert_eq!(span_b_node.kind, DataFlowNodeKind::Span);

        // The lease's resource type is inferred from the ops recorded on the spans.
        let lease_node = diagram
            .nodes
            .iter()
            .find(|n| n.kind == DataFlowNodeKind::Lease)
            .unwrap();
        assert_eq!(lease_node.resource_type, Some(ResourceType::Mem));

        // Should have 2 edges: writer_a -> lease, lease -> reader_b
        assert_eq!(diagram.edges.len(), 2);

        let write_edge = diagram
            .edges
            .iter()
            .find(|e| e.from.contains("writer_a"))
            .unwrap();
        assert!(write_edge.to.contains("lease:"));
        assert_eq!(write_edge.bytes, 4096);

        let read_edge = diagram
            .edges
            .iter()
            .find(|e| e.to.contains("reader_b"))
            .unwrap();
        assert!(read_edge.from.contains("lease:"));
        assert_eq!(read_edge.bytes, 4096);
    }

    /// Spans that share a name must merge into one node, and their edges to
    /// the same lease must merge into one edge with summed bytes.
    #[test]
    fn collapsed_same_name_spans() {
        let ctx = test_ctx();
        let c1 = ctx.child(&[0xAA; 8]);

        // Two spans with the same name
        let mut s1 = ResourceSpan::new("worker", ctx);
        s1.add_lease_id(1);
        s1.bytes_written = 100;
        s1.record_op(ResourceType::Mem, OpType::Write, 1);

        let mut s2 = ResourceSpan::new("worker", c1);
        s2.add_lease_id(1);
        s2.bytes_written = 200;
        s2.record_op(ResourceType::Mem, OpType::Write, 1);

        let rec = ProfileRecording::from_spans(vec![s1, s2]);
        let diagram = DataFlowDiagram::from_recording(&rec);

        // Only one "worker" span node (collapsed)
        let span_nodes: Vec<_> = diagram
            .nodes
            .iter()
            .filter(|n| n.kind == DataFlowNodeKind::Span)
            .collect();
        assert_eq!(span_nodes.len(), 1);
        assert_eq!(span_nodes[0].label, "worker");

        // Edge should have aggregated bytes: 100 + 200 = 300
        let write_edge = diagram
            .edges
            .iter()
            .find(|e| e.from.contains("worker"))
            .unwrap();
        assert_eq!(write_edge.bytes, 300);
    }

    /// Smoke test: the rendered page has the doctype, the title text and a
    /// closing `</html>` tag. Only compiled when the `html` feature is enabled.
    #[cfg(feature = "html")]
    #[test]
    fn data_flow_html_structure() {
        let ctx = test_ctx();
        let mut span = ResourceSpan::new("test", ctx);
        span.add_lease_id(1);
        span.bytes_written = 100;
        span.record_op(ResourceType::Mem, OpType::Write, 1);

        let rec = ProfileRecording::from_spans(vec![span]);
        let diagram = DataFlowDiagram::from_recording(&rec);
        let html = diagram.render_html();

        assert!(html.contains("<!DOCTYPE html>"));
        assert!(html.contains("Data Flow Diagram"));
        assert!(html.contains("</html>"));
    }
}