1extern crate alloc;
19use alloc::vec::Vec;
20
21use grafos_std::error::Result;
22use grafos_std::mem::{FabricMem, MemLease};
23
/// Offset (relative to the slot base) of the version counter, which the
/// helpers in this file read and write as a little-endian `u64`.
const VERSION_OFFSET: u64 = 0;
/// Offset (relative to the slot base) of the payload length, which the
/// helpers in this file read and write as a little-endian `u32`.
const DATA_LEN_OFFSET: u64 = 8;
/// Offset (relative to the slot base) of the first payload byte; it sits
/// directly after the 4-byte length field.
const DATA_OFFSET: u64 = 12;
27
28fn read_u64(mem: &FabricMem, offset: u64) -> Result<u64> {
29 let data = mem.read(offset, 8)?;
30 if data.len() < 8 {
31 return Ok(0);
32 }
33 let mut buf = [0u8; 8];
34 buf.copy_from_slice(&data[..8]);
35 Ok(u64::from_le_bytes(buf))
36}
37
38fn write_u64(mem: &FabricMem, offset: u64, val: u64) -> Result<()> {
39 mem.write(offset, &val.to_le_bytes())
40}
41
42fn read_u32(mem: &FabricMem, offset: u64) -> Result<u32> {
43 let data = mem.read(offset, 4)?;
44 if data.len() < 4 {
45 return Ok(0);
46 }
47 let mut buf = [0u8; 4];
48 buf.copy_from_slice(&data[..4]);
49 Ok(u32::from_le_bytes(buf))
50}
51
52fn write_u32(mem: &FabricMem, offset: u64, val: u32) -> Result<()> {
53 mem.write(offset, &val.to_le_bytes())
54}
55
/// Writing half of a watch slot.
///
/// `send` stores a new payload in the slot and bumps the slot's version
/// counter; `version` reads that counter back.
pub struct WatchSender {
    /// Lease whose memory the sender reads and writes.
    lease: MemLease,
    /// Byte offset of the slot (version, length, payload) within that memory.
    base_offset: u64,
}
64
/// Reading half of a watch slot.
///
/// Remembers the version it last observed so that `changed` can tell whether
/// the slot's version counter has moved since the previous `recv`.
pub struct WatchReceiver {
    /// Lease whose memory the receiver reads.
    lease: MemLease,
    /// Byte offset of the slot (version, length, payload) within that memory.
    base_offset: u64,
    /// Version counter value captured by the most recent `recv`; starts at 0.
    last_version: u64,
}
75
76pub fn watch(
109 sender_lease: MemLease,
110 receiver_lease: MemLease,
111 base_offset: u64,
112 initial: &[u8],
113) -> Result<(WatchSender, WatchReceiver)> {
114 let mem = sender_lease.mem();
115
116 write_u64(mem, base_offset + VERSION_OFFSET, 0)?;
118 write_u32(mem, base_offset + DATA_LEN_OFFSET, initial.len() as u32)?;
119 if !initial.is_empty() {
120 mem.write(base_offset + DATA_OFFSET, initial)?;
121 }
122
123 let sender = WatchSender {
124 lease: sender_lease,
125 base_offset,
126 };
127 let receiver = WatchReceiver {
128 lease: receiver_lease,
129 base_offset,
130 last_version: 0,
131 };
132 Ok((sender, receiver))
133}
134
135impl WatchSender {
136 pub fn send(&self, data: &[u8]) -> Result<()> {
142 let mem = self.lease.mem();
143 let base = self.base_offset;
144
145 if !data.is_empty() {
147 mem.write(base + DATA_OFFSET, data)?;
148 }
149 write_u32(mem, base + DATA_LEN_OFFSET, data.len() as u32)?;
150
151 let ver = read_u64(mem, base + VERSION_OFFSET)?;
152 write_u64(mem, base + VERSION_OFFSET, ver + 1)?;
153
154 Ok(())
155 }
156
157 pub fn version(&self) -> Result<u64> {
161 read_u64(self.lease.mem(), self.base_offset + VERSION_OFFSET)
162 }
163}
164
165impl WatchReceiver {
166 pub fn recv(&mut self) -> Result<Vec<u8>> {
172 let mem = self.lease.mem();
173 let base = self.base_offset;
174
175 let ver = read_u64(mem, base + VERSION_OFFSET)?;
176 self.last_version = ver;
177
178 let data_len = read_u32(mem, base + DATA_LEN_OFFSET)?;
179 if data_len == 0 {
180 return Ok(Vec::new());
181 }
182 mem.read(base + DATA_OFFSET, data_len)
183 }
184
185 pub fn changed(&self) -> Result<bool> {
191 let ver = read_u64(self.lease.mem(), self.base_offset + VERSION_OFFSET)?;
192 Ok(ver != self.last_version)
193 }
194
195 pub fn wait_for_change(&mut self, max_polls: u32) -> Result<Vec<u8>> {
202 for _ in 0..max_polls {
203 if self.changed()? {
204 return self.recv();
205 }
206 }
207 Err(grafos_std::error::FabricError::LeaseExpired)
208 }
209
210 pub fn version(&self) -> Result<u64> {
216 read_u64(self.lease.mem(), self.base_offset + VERSION_OFFSET)
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Resets the mock host, sets its arena size to 4096, acquires one lease
    /// and returns it together with a duplicate of itself.
    fn setup_pair() -> (MemLease, MemLease) {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(4096);
        let primary = MemBuilder::new().acquire().expect("lease");
        let duplicate = primary.dup();
        (primary, duplicate)
    }

    /// The initial value and a subsequently sent value are both readable.
    #[test]
    fn watch_send_recv_roundtrip() {
        let (tx_lease, rx_lease) = setup_pair();
        let (tx, mut rx) = watch(tx_lease, rx_lease, 0, b"init").expect("watch");

        let first = rx.recv().expect("recv initial");
        assert_eq!(&first, b"init");

        tx.send(b"updated").expect("send");
        let second = rx.recv().expect("recv updated");
        assert_eq!(&second, b"updated");
    }

    /// `changed` is false right after a `recv` and true after a `send`.
    #[test]
    fn watch_changed_tracks_version() {
        let (tx_lease, rx_lease) = setup_pair();
        let (tx, mut rx) = watch(tx_lease, rx_lease, 0, b"v0").expect("watch");

        // Nothing has been sent since the initial value was consumed.
        let _ = rx.recv().expect("initial recv");
        assert!(!rx.changed().unwrap());

        // A send must be visible as a change...
        tx.send(b"v1").expect("send");
        assert!(rx.changed().unwrap());

        // ...and consuming it clears the change again.
        let _ = rx.recv().expect("recv v1");
        assert!(!rx.changed().unwrap());
    }

    /// The version starts at 0 and grows by one per `send`.
    #[test]
    fn watch_version_increments() {
        let (tx_lease, rx_lease) = setup_pair();
        let (tx, rx) = watch(tx_lease, rx_lease, 0, b"start").expect("watch");

        assert_eq!(tx.version().unwrap(), 0);
        assert_eq!(rx.version().unwrap(), 0);

        tx.send(b"one").expect("send 1");
        assert_eq!(tx.version().unwrap(), 1);

        tx.send(b"two").expect("send 2");
        assert_eq!(tx.version().unwrap(), 2);
    }

    /// Empty payloads work both as the initial value and as a later update.
    #[test]
    fn watch_empty_values() {
        let (tx_lease, rx_lease) = setup_pair();
        let (tx, mut rx) = watch(tx_lease, rx_lease, 0, &[]).expect("watch");

        let nothing_yet = rx.recv().expect("recv empty");
        assert!(nothing_yet.is_empty());

        tx.send(b"data").expect("send data");
        let filled = rx.recv().expect("recv data");
        assert_eq!(&filled, b"data");

        tx.send(&[]).expect("send empty");
        let cleared = rx.recv().expect("recv empty again");
        assert!(cleared.is_empty());
    }
}