1use alloc::collections::BTreeMap;
7use alloc::string::String;
8use alloc::vec::Vec;
9
10use grafos_observe::event::{OpType, ResourceType};
11
12use crate::recording::ProfileRecording;
13
/// The kind of entity a [`DataFlowNode`] stands for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataFlowNodeKind {
    /// A node representing every recorded span that shares one name.
    Span,
    /// A node representing a single lease, identified by its lease id.
    Lease,
}
22
23#[derive(Debug, Clone)]
25pub struct DataFlowNode {
26 pub id: String,
28 pub label: String,
30 pub kind: DataFlowNodeKind,
32 pub resource_type: Option<ResourceType>,
34 pub size: u64,
36}
37
/// A directed connection between two nodes of a data-flow diagram.
#[derive(Debug, Clone)]
pub struct DataFlowEdge {
    /// Id of the node the data flows out of.
    pub from: String,
    /// Id of the node the data flows into.
    pub to: String,
    /// Number of bytes attributed to this edge.
    pub bytes: u64,
    /// Display text for the edge (a formatted byte count when built from a
    /// recording).
    pub label: String,
}
50
51#[derive(Debug, Clone)]
53pub struct DataFlowDiagram {
54 pub nodes: Vec<DataFlowNode>,
56 pub edges: Vec<DataFlowEdge>,
58}
59
60impl DataFlowDiagram {
61 pub fn from_recording(rec: &ProfileRecording) -> Self {
67 let mut lease_writers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
70 let mut lease_readers: BTreeMap<u128, Vec<(String, u64)>> = BTreeMap::new();
71 let mut lease_resource_type: BTreeMap<u128, ResourceType> = BTreeMap::new();
72
73 let mut span_ops: BTreeMap<String, u64> = BTreeMap::new();
75
76 for span in &rec.spans {
77 let is_writer = span
78 .ops
79 .iter()
80 .any(|(k, _)| matches!(k.op_type, OpType::Write | OpType::WriteBlock));
81 let is_reader = span
82 .ops
83 .iter()
84 .any(|(k, _)| matches!(k.op_type, OpType::Read | OpType::ReadBlock));
85
86 let total_ops: u64 = span.ops.iter().map(|(_, c)| c).sum();
87 *span_ops.entry(span.name.clone()).or_insert(0) += total_ops;
88
89 let rt = span
91 .ops
92 .iter()
93 .max_by_key(|(_, c)| c)
94 .map(|(k, _)| k.resource_type);
95
96 for &lease_id in &span.lease_ids {
97 if let Some(r) = rt {
98 lease_resource_type.entry(lease_id).or_insert(r);
99 }
100
101 if is_writer {
102 lease_writers
103 .entry(lease_id)
104 .or_default()
105 .push((span.name.clone(), span.bytes_written));
106 }
107 if is_reader {
108 lease_readers
109 .entry(lease_id)
110 .or_default()
111 .push((span.name.clone(), span.bytes_read));
112 }
113 }
114 }
115
116 let mut nodes: Vec<DataFlowNode> = Vec::new();
118 let mut node_ids: BTreeMap<String, usize> = BTreeMap::new();
119
120 for (name, &ops) in &span_ops {
122 let id = alloc::format!("span:{}", name);
123 if !node_ids.contains_key(&id) {
124 node_ids.insert(id.clone(), nodes.len());
125 nodes.push(DataFlowNode {
126 id,
127 label: name.clone(),
128 kind: DataFlowNodeKind::Span,
129 resource_type: None,
130 size: ops,
131 });
132 }
133 }
134
135 let all_lease_ids: Vec<u128> = {
137 let mut ids: Vec<u128> = lease_writers
138 .keys()
139 .chain(lease_readers.keys())
140 .copied()
141 .collect();
142 ids.sort();
143 ids.dedup();
144 ids
145 };
146
147 for &lease_id in &all_lease_ids {
148 let id = alloc::format!("lease:{:x}", lease_id);
149 if !node_ids.contains_key(&id) {
150 let rt = lease_resource_type.get(&lease_id).copied();
151 node_ids.insert(id.clone(), nodes.len());
152 nodes.push(DataFlowNode {
153 id,
154 label: alloc::format!("lease {:x}", lease_id),
155 kind: DataFlowNodeKind::Lease,
156 resource_type: rt,
157 size: 0,
158 });
159 }
160 }
161
162 let mut edges: Vec<DataFlowEdge> = Vec::new();
164 let mut edge_map: BTreeMap<(String, String), u64> = BTreeMap::new();
166
167 for (&lease_id, writers) in &lease_writers {
168 let lease_node_id = alloc::format!("lease:{:x}", lease_id);
169 for (span_name, bytes) in writers {
170 let span_node_id = alloc::format!("span:{}", span_name);
171 let key = (span_node_id, lease_node_id.clone());
172 *edge_map.entry(key).or_insert(0) += bytes;
173 }
174 }
175
176 for (&lease_id, readers) in &lease_readers {
177 let lease_node_id = alloc::format!("lease:{:x}", lease_id);
178 for (span_name, bytes) in readers {
179 let span_node_id = alloc::format!("span:{}", span_name);
180 let key = (lease_node_id.clone(), span_node_id);
181 *edge_map.entry(key).or_insert(0) += bytes;
182 }
183 }
184
185 for ((from, to), bytes) in &edge_map {
186 edges.push(DataFlowEdge {
187 from: from.clone(),
188 to: to.clone(),
189 bytes: *bytes,
190 label: format_bytes(*bytes),
191 });
192 }
193
194 DataFlowDiagram { nodes, edges }
195 }
196
197 #[cfg(feature = "html")]
199 pub fn render_html(&self) -> String {
200 let mut html = String::new();
201 html.push_str("<!DOCTYPE html>\n<html>\n<head>\n");
202 html.push_str("<meta charset=\"utf-8\">\n");
203 html.push_str("<title>grafOS Data Flow Diagram</title>\n");
204 html.push_str("<style>\n");
205 html.push_str("body { font-family: monospace; margin: 0; padding: 20px; background: #1a1a2e; color: #eee; }\n");
206 html.push_str("h1 { font-size: 18px; }\n");
207 html.push_str("svg { border: 1px solid #333; background: #0d0d1a; }\n");
208 html.push_str(".node-span rect { stroke: #eee; stroke-width: 1; }\n");
209 html.push_str(".node-lease circle { stroke: #eee; stroke-width: 1; }\n");
210 html.push_str(".edge line { stroke: #666; }\n");
211 html.push_str(".edge-label { fill: #aaa; font-size: 10px; }\n");
212 html.push_str(".node-label { fill: #eee; font-size: 11px; text-anchor: middle; }\n");
213 html.push_str("</style>\n</head>\n<body>\n");
214 html.push_str("<h1>grafOS Data Flow Diagram</h1>\n");
215 html.push_str(&alloc::format!(
216 "<p>{} nodes, {} edges</p>\n",
217 self.nodes.len(),
218 self.edges.len()
219 ));
220
221 let svg_width = 900;
223 let svg_height = 600;
224
225 html.push_str(&alloc::format!(
226 "<svg width=\"{}\" height=\"{}\">\n",
227 svg_width,
228 svg_height
229 ));
230
231 let n = self.nodes.len().max(1);
233 let cols = ((n as f64).sqrt().ceil() as usize).max(1);
234
235 let mut node_positions: BTreeMap<String, (f64, f64)> = BTreeMap::new();
236 for (i, node) in self.nodes.iter().enumerate() {
237 let col = i % cols;
238 let row = i / cols;
239 let x = 80.0 + (col as f64) * 160.0;
240 let y = 80.0 + (row as f64) * 120.0;
241 node_positions.insert(node.id.clone(), (x, y));
242 }
243
244 for edge in &self.edges {
246 if let (Some(&(x1, y1)), Some(&(x2, y2))) =
247 (node_positions.get(&edge.from), node_positions.get(&edge.to))
248 {
249 let mx = (x1 + x2) / 2.0;
250 let my = (y1 + y2) / 2.0;
251 html.push_str(&alloc::format!(
252 "<g class=\"edge\"><line x1=\"{x1:.0}\" y1=\"{y1:.0}\" x2=\"{x2:.0}\" y2=\"{y2:.0}\"/>\
253 <text class=\"edge-label\" x=\"{mx:.0}\" y=\"{my:.0}\">{}</text></g>\n",
254 edge.label
255 ));
256 }
257 }
258
259 for node in &self.nodes {
261 if let Some(&(x, y)) = node_positions.get(&node.id) {
262 match node.kind {
263 DataFlowNodeKind::Span => {
264 html.push_str(&alloc::format!(
265 "<g class=\"node-span\"><rect x=\"{:.0}\" y=\"{:.0}\" width=\"120\" height=\"30\" rx=\"3\" fill=\"#2a2a4a\"/>\
266 <text class=\"node-label\" x=\"{:.0}\" y=\"{:.0}\">{}</text></g>\n",
267 x - 60.0, y - 15.0, x, y + 4.0, node.label
268 ));
269 }
270 DataFlowNodeKind::Lease => {
271 let color = match node.resource_type {
272 Some(ResourceType::Mem) => "#4a90d9",
273 Some(ResourceType::Block) => "#50c878",
274 Some(ResourceType::Gpu) => "#ff8c42",
275 Some(ResourceType::GpuMem) => "#d97706",
276 Some(ResourceType::Cpu) => "#9b59b6",
277 Some(ResourceType::Net) => "#e74c3c",
278 None => "#999",
279 };
280 html.push_str(&alloc::format!(
281 "<g class=\"node-lease\"><circle cx=\"{x:.0}\" cy=\"{y:.0}\" r=\"20\" fill=\"{color}\"/>\
282 <text class=\"node-label\" x=\"{x:.0}\" y=\"{:.0}\">{}</text></g>\n",
283 y + 35.0, node.label
284 ));
285 }
286 }
287 }
288 }
289
290 html.push_str("</svg>\n</body>\n</html>\n");
291 html
292 }
293}
294
/// Formats a byte count for display using 1024-based units.
///
/// Values below 1024 are printed as a whole number followed by ` B`; larger
/// values are scaled to `KB`, `MB` or `GB` and printed with one decimal place.
fn format_bytes(bytes: u64) -> String {
    const KB: u64 = 1 << 10;
    const MB: u64 = 1 << 20;
    const GB: u64 = 1 << 30;

    match bytes {
        b if b >= GB => alloc::format!("{:.1} GB", b as f64 / GB as f64),
        b if b >= MB => alloc::format!("{:.1} MB", b as f64 / MB as f64),
        b if b >= KB => alloc::format!("{:.1} KB", b as f64 / KB as f64),
        b => alloc::format!("{} B", b),
    }
}
306
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_observe::event::{OpType, ResourceType};
    use grafos_observe::span::ResourceSpan;
    use grafos_observe::trace::TraceContext;

    /// Root trace context built from a fixed byte pattern so every test run
    /// uses the same input.
    fn test_ctx() -> TraceContext {
        let mut seed = [0u8; 24];
        for (offset, byte) in seed.iter_mut().enumerate() {
            *byte = (offset as u8).wrapping_add(0x42);
        }
        TraceContext::new_root(&seed)
    }

    /// A writer and a reader sharing one lease produce
    /// `writer -> lease -> reader`.
    #[test]
    fn writer_to_reader_edge() {
        let root = test_ctx();
        let child = root.child(&[0xAA; 8]);

        let mut writer = ResourceSpan::new("writer_a", root);
        writer.start_time_unix_us = 1000;
        writer.end_time_unix_us = 2000;
        writer.add_lease_id(0x42);
        writer.bytes_written = 4096;
        writer.record_op(ResourceType::Mem, OpType::Write, 1);

        let mut reader = ResourceSpan::new("reader_b", child);
        reader.start_time_unix_us = 2000;
        reader.end_time_unix_us = 3000;
        reader.add_lease_id(0x42);
        reader.bytes_read = 4096;
        reader.record_op(ResourceType::Mem, OpType::Read, 1);

        let recording = ProfileRecording::from_spans(vec![writer, reader]);
        let diagram = DataFlowDiagram::from_recording(&recording);

        // Two span nodes plus the shared lease node.
        assert_eq!(diagram.nodes.len(), 3);

        for &name in &["writer_a", "reader_b"] {
            let span_node = diagram.nodes.iter().find(|n| n.label == name).unwrap();
            assert_eq!(span_node.kind, DataFlowNodeKind::Span);
        }

        let lease = diagram
            .nodes
            .iter()
            .find(|n| n.kind == DataFlowNodeKind::Lease)
            .unwrap();
        assert_eq!(lease.resource_type, Some(ResourceType::Mem));

        // One edge into the lease, one edge out of it.
        assert_eq!(diagram.edges.len(), 2);

        let outgoing = diagram
            .edges
            .iter()
            .find(|e| e.from.contains("writer_a"))
            .unwrap();
        assert!(outgoing.to.contains("lease:"));
        assert_eq!(outgoing.bytes, 4096);

        let incoming = diagram
            .edges
            .iter()
            .find(|e| e.to.contains("reader_b"))
            .unwrap();
        assert!(incoming.from.contains("lease:"));
        assert_eq!(incoming.bytes, 4096);
    }

    /// Spans that share a name collapse into one node and their bytes add up.
    #[test]
    fn collapsed_same_name_spans() {
        let root = test_ctx();
        let child = root.child(&[0xAA; 8]);

        let mut first = ResourceSpan::new("worker", root);
        first.add_lease_id(1);
        first.bytes_written = 100;
        first.record_op(ResourceType::Mem, OpType::Write, 1);

        let mut second = ResourceSpan::new("worker", child);
        second.add_lease_id(1);
        second.bytes_written = 200;
        second.record_op(ResourceType::Mem, OpType::Write, 1);

        let recording = ProfileRecording::from_spans(vec![first, second]);
        let diagram = DataFlowDiagram::from_recording(&recording);

        let spans: Vec<_> = diagram
            .nodes
            .iter()
            .filter(|n| n.kind == DataFlowNodeKind::Span)
            .collect();
        assert_eq!(spans.len(), 1);
        assert_eq!(spans[0].label, "worker");

        let combined = diagram
            .edges
            .iter()
            .find(|e| e.from.contains("worker"))
            .unwrap();
        assert_eq!(combined.bytes, 300);
    }

    /// The rendered page contains the expected document skeleton.
    #[cfg(feature = "html")]
    #[test]
    fn data_flow_html_structure() {
        let mut only_span = ResourceSpan::new("test", test_ctx());
        only_span.add_lease_id(1);
        only_span.bytes_written = 100;
        only_span.record_op(ResourceType::Mem, OpType::Write, 1);

        let recording = ProfileRecording::from_spans(vec![only_span]);
        let page = DataFlowDiagram::from_recording(&recording).render_html();

        for &needle in &["<!DOCTYPE html>", "Data Flow Diagram", "</html>"] {
            assert!(page.contains(needle));
        }
    }
}