grafos_observe/
context.rs1use alloc::collections::BTreeSet;
9use alloc::string::String;
10use alloc::vec::Vec;
11
12use crate::event::ResourceType;
13
14#[derive(Debug, Clone)]
16pub struct LeaseInfo {
17 pub resource_type: ResourceType,
19 pub lease_id: u64,
21 pub node_addr: String,
23 pub bytes_held: u64,
25}
26
/// Running totals of recorded operations, one counter per operation kind.
///
/// `Default` yields all counters at zero.
#[derive(Debug, Clone, Default)]
pub struct OpCounters {
    /// Count of FBMU writes (bumped by `ResourceContext::record_fbmu_write`).
    pub fbmu_writes: u64,
    /// Count of FBMU reads (bumped by `ResourceContext::record_fbmu_read`).
    pub fbmu_reads: u64,
    /// Count of FBBU writes (bumped by `ResourceContext::record_fbbu_write`).
    pub fbbu_writes: u64,
    /// Count of FBBU reads (bumped by `ResourceContext::record_fbbu_read`).
    pub fbbu_reads: u64,
    /// Count of GPU submits (bumped by `ResourceContext::record_gpu_submit`).
    pub gpu_submits: u64,
    /// Count of tasklet submits (bumped by
    /// `ResourceContext::record_tasklet_submit`).
    pub tasklet_submits: u64,
}
37
38#[derive(Debug, Clone)]
40pub struct ContextSnapshot {
41 pub lease_count: usize,
42 pub total_bytes_held: u64,
43 pub nodes_involved: usize,
44 pub ops: OpCounters,
45}
46
47pub struct ResourceContext {
83 active_leases: Vec<LeaseInfo>,
84 ops: OpCounters,
85}
86
87impl ResourceContext {
88 fn new() -> Self {
89 Self {
90 active_leases: Vec::new(),
91 ops: OpCounters::default(),
92 }
93 }
94
95 pub fn push_lease(&mut self, info: LeaseInfo) {
97 self.active_leases.push(info);
98 }
99
100 pub fn pop_lease(&mut self, lease_id: u64) {
102 if let Some(pos) = self
103 .active_leases
104 .iter()
105 .position(|l| l.lease_id == lease_id)
106 {
107 self.active_leases.remove(pos);
108 }
109 }
110
111 pub fn lease_count(&self) -> usize {
113 self.active_leases.len()
114 }
115
116 pub fn total_bytes_held(&self) -> u64 {
118 self.active_leases.iter().map(|l| l.bytes_held).sum()
119 }
120
121 pub fn nodes_involved(&self) -> usize {
123 let nodes: BTreeSet<&str> = self
124 .active_leases
125 .iter()
126 .map(|l| l.node_addr.as_str())
127 .collect();
128 nodes.len()
129 }
130
131 pub fn active_leases(&self) -> &[LeaseInfo] {
133 &self.active_leases
134 }
135
136 pub fn ops_mut(&mut self) -> &mut OpCounters {
138 &mut self.ops
139 }
140
141 pub fn ops(&self) -> &OpCounters {
143 &self.ops
144 }
145
146 pub fn snapshot(&self) -> ContextSnapshot {
148 ContextSnapshot {
149 lease_count: self.lease_count(),
150 total_bytes_held: self.total_bytes_held(),
151 nodes_involved: self.nodes_involved(),
152 ops: self.ops.clone(),
153 }
154 }
155
156 pub fn record_fbmu_write(&mut self) {
158 self.ops.fbmu_writes += 1;
159 }
160
161 pub fn record_fbmu_read(&mut self) {
163 self.ops.fbmu_reads += 1;
164 }
165
166 pub fn record_fbbu_write(&mut self) {
168 self.ops.fbbu_writes += 1;
169 }
170
171 pub fn record_fbbu_read(&mut self) {
173 self.ops.fbbu_reads += 1;
174 }
175
176 pub fn record_gpu_submit(&mut self) {
178 self.ops.gpu_submits += 1;
179 }
180
181 pub fn record_tasklet_submit(&mut self) {
183 self.ops.tasklet_submits += 1;
184 }
185}
186
/// Per-thread access to a [`ResourceContext`]. Compiled only with the `std`
/// feature because it relies on `std::thread_local!`.
#[cfg(feature = "std")]
mod thread_local_ctx {
    use super::{ContextSnapshot, ResourceContext};
    use std::cell::RefCell;

    thread_local! {
        // Each thread gets its own context, wrapped in a `RefCell` so it can
        // be borrowed mutably through the shared thread-local handle.
        static CONTEXT: RefCell<ResourceContext> = RefCell::new(ResourceContext::new());
    }

    impl ResourceContext {
        /// Runs `f` with shared access to the calling thread's context and
        /// returns whatever `f` returns.
        ///
        /// # Panics
        ///
        /// Panics if the context is already mutably borrowed, e.g. when called
        /// from inside a `with_mut` closure on the same thread.
        pub fn with<F, R>(f: F) -> R
        where
            F: FnOnce(&ResourceContext) -> R,
        {
            CONTEXT.with(|cell| {
                let guard = cell.borrow();
                f(&guard)
            })
        }

        /// Runs `f` with exclusive access to the calling thread's context and
        /// returns whatever `f` returns.
        ///
        /// # Panics
        ///
        /// Panics if the context is already borrowed, e.g. when called from
        /// inside a `with` or `with_mut` closure on the same thread.
        pub fn with_mut<F, R>(f: F) -> R
        where
            F: FnOnce(&mut ResourceContext) -> R,
        {
            CONTEXT.with(|cell| {
                let mut guard = cell.borrow_mut();
                f(&mut guard)
            })
        }

        /// Returns a snapshot of the calling thread's context.
        pub fn current_snapshot() -> ContextSnapshot {
            Self::with(ResourceContext::snapshot)
        }
    }
}
219
// These tests drive `ResourceContext::with_mut` / `current_snapshot`, which
// only exist when the `std` feature is enabled. Gating on both conditions
// keeps `cargo test --no-default-features` compiling.
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;

    /// Pops every active lease so a test starts from a known lease state,
    /// regardless of what ran earlier on the same thread.
    fn clear_leases(ctx: &mut ResourceContext) {
        while let Some(id) = ctx.active_leases().first().map(|lease| lease.lease_id) {
            ctx.pop_lease(id);
        }
    }

    #[test]
    fn push_pop_lease() {
        ResourceContext::with_mut(|ctx| {
            clear_leases(ctx);

            ctx.push_lease(LeaseInfo {
                resource_type: ResourceType::Mem,
                lease_id: 1,
                node_addr: "10.10.0.11".into(),
                bytes_held: 4096,
            });
            ctx.push_lease(LeaseInfo {
                resource_type: ResourceType::Block,
                lease_id: 2,
                node_addr: "10.10.0.12".into(),
                bytes_held: 8192,
            });
            ctx.push_lease(LeaseInfo {
                resource_type: ResourceType::Mem,
                lease_id: 3,
                node_addr: "10.10.0.11".into(),
                bytes_held: 2048,
            });

            assert_eq!(ctx.lease_count(), 3);
            assert_eq!(ctx.total_bytes_held(), 4096 + 8192 + 2048);
            assert_eq!(ctx.nodes_involved(), 2);

            // Leases 1 and 3 share a node, so removing 2 leaves one node.
            ctx.pop_lease(2);
            assert_eq!(ctx.lease_count(), 2);
            assert_eq!(ctx.total_bytes_held(), 4096 + 2048);
            assert_eq!(ctx.nodes_involved(), 1);

            ctx.pop_lease(1);
            ctx.pop_lease(3);
            assert_eq!(ctx.lease_count(), 0);
            assert_eq!(ctx.total_bytes_held(), 0);
            assert_eq!(ctx.nodes_involved(), 0);
        });
    }

    #[test]
    fn snapshot_captures_state() {
        ResourceContext::with_mut(|ctx| {
            clear_leases(ctx);

            ctx.push_lease(LeaseInfo {
                resource_type: ResourceType::Mem,
                lease_id: 10,
                node_addr: "node-a".into(),
                bytes_held: 1024,
            });
            ctx.record_fbmu_write();
            ctx.record_fbmu_write();
            ctx.record_fbmu_read();
        });

        // Counters are not reset above, so only lower bounds are asserted.
        let snap = ResourceContext::current_snapshot();
        assert!(snap.lease_count >= 1);
        assert!(snap.total_bytes_held >= 1024);
        assert!(snap.ops.fbmu_writes >= 2);
        assert!(snap.ops.fbmu_reads >= 1);

        // Leave the thread-local context without this test's lease.
        ResourceContext::with_mut(|ctx| {
            ctx.pop_lease(10);
        });
    }

    #[test]
    fn op_counters() {
        ResourceContext::with_mut(|ctx| {
            // Zero every counter so the exact-value assertions below hold.
            let ops = ctx.ops_mut();
            ops.fbmu_writes = 0;
            ops.fbmu_reads = 0;
            ops.fbbu_writes = 0;
            ops.fbbu_reads = 0;
            ops.gpu_submits = 0;
            ops.tasklet_submits = 0;

            ctx.record_fbmu_write();
            ctx.record_fbmu_read();
            ctx.record_fbbu_write();
            ctx.record_fbbu_read();
            ctx.record_gpu_submit();
            ctx.record_tasklet_submit();

            assert_eq!(ctx.ops().fbmu_writes, 1);
            assert_eq!(ctx.ops().fbmu_reads, 1);
            assert_eq!(ctx.ops().fbbu_writes, 1);
            assert_eq!(ctx.ops().fbbu_reads, 1);
            assert_eq!(ctx.ops().gpu_submits, 1);
            assert_eq!(ctx.ops().tasklet_submits, 1);
        });
    }

    #[test]
    fn pop_nonexistent_lease_is_noop() {
        ResourceContext::with_mut(|ctx| {
            let before = ctx.lease_count();
            ctx.pop_lease(999999);
            assert_eq!(ctx.lease_count(), before);
        });
    }
}