grafos_sync/
watch.rs

1//! Single-producer multi-consumer broadcast via shared leased memory.
2//!
3//! The [`watch()`] function creates a ([`WatchSender`], [`WatchReceiver`]) pair.
4//! The sender writes new values and bumps a version counter; receivers detect
5//! changes by comparing versions. Multiple receivers can independently track
6//! their own last-seen version.
7//!
8//! # Memory layout at `base_offset`
9//!
10//! ```text
11//! offset +0:  version    (u64, 8 bytes)  — incremented on each send
12//! offset +8:  data_len   (u32, 4 bytes)  — byte length of current value
13//! offset +12: data       ([u8])          — serialized current value
14//! ```
15//!
16//! Total header: 12 bytes before the user data.
17
18extern crate alloc;
19use alloc::vec::Vec;
20
21use grafos_std::error::Result;
22use grafos_std::mem::{FabricMem, MemLease};
23
24const VERSION_OFFSET: u64 = 0;
25const DATA_LEN_OFFSET: u64 = 8;
26const DATA_OFFSET: u64 = 12;
27
28fn read_u64(mem: &FabricMem, offset: u64) -> Result<u64> {
29    let data = mem.read(offset, 8)?;
30    if data.len() < 8 {
31        return Ok(0);
32    }
33    let mut buf = [0u8; 8];
34    buf.copy_from_slice(&data[..8]);
35    Ok(u64::from_le_bytes(buf))
36}
37
38fn write_u64(mem: &FabricMem, offset: u64, val: u64) -> Result<()> {
39    mem.write(offset, &val.to_le_bytes())
40}
41
42fn read_u32(mem: &FabricMem, offset: u64) -> Result<u32> {
43    let data = mem.read(offset, 4)?;
44    if data.len() < 4 {
45        return Ok(0);
46    }
47    let mut buf = [0u8; 4];
48    buf.copy_from_slice(&data[..4]);
49    Ok(u32::from_le_bytes(buf))
50}
51
52fn write_u32(mem: &FabricMem, offset: u64, val: u32) -> Result<()> {
53    mem.write(offset, &val.to_le_bytes())
54}
55
56/// Sending half of a fabric watch channel.
57///
58/// Writes new values to shared leased memory and increments the version
59/// counter so receivers can detect changes via [`WatchReceiver::changed`].
60pub struct WatchSender {
61    lease: MemLease,
62    base_offset: u64,
63}
64
65/// Receiving half of a fabric watch channel.
66///
67/// Reads the current value from shared memory and tracks the last-seen
68/// version to detect changes. Multiple receivers can be created from
69/// the same base offset (each tracking its own version independently).
70pub struct WatchReceiver {
71    lease: MemLease,
72    base_offset: u64,
73    last_version: u64,
74}
75
76/// Create a watch channel pair ([`WatchSender`], [`WatchReceiver`]) backed
77/// by leased memory.
78///
79/// Initializes version to 0 and stores `initial` as the current value.
80/// The sender can update the value with [`WatchSender::send`]; receivers
81/// detect changes via [`WatchReceiver::changed`] and read with
82/// [`WatchReceiver::recv`].
83///
84/// `sender_lease` and `receiver_lease` must point to the same underlying
85/// arena (or the same shared memory region) so that writes from the sender
86/// are visible to the receiver.
87///
88/// # Example
89///
90/// ```rust,no_run
91/// # grafos_std::host::reset_mock();
92/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
93/// use grafos_sync::watch;
94/// use grafos_std::mem::MemBuilder;
95///
96/// let sl = MemBuilder::new().acquire().unwrap();
97/// let rl = MemBuilder::new().acquire().unwrap();
98/// let (sender, mut receiver) = watch(sl, rl, 0, b"hello").unwrap();
99///
100/// let val = receiver.recv().unwrap();
101/// assert_eq!(&val, b"hello");
102///
103/// sender.send(b"world").unwrap();
104/// assert!(receiver.changed().unwrap());
105/// let val = receiver.recv().unwrap();
106/// assert_eq!(&val, b"world");
107/// ```
108pub fn watch(
109    sender_lease: MemLease,
110    receiver_lease: MemLease,
111    base_offset: u64,
112    initial: &[u8],
113) -> Result<(WatchSender, WatchReceiver)> {
114    let mem = sender_lease.mem();
115
116    // Initialize: version = 0, write initial value
117    write_u64(mem, base_offset + VERSION_OFFSET, 0)?;
118    write_u32(mem, base_offset + DATA_LEN_OFFSET, initial.len() as u32)?;
119    if !initial.is_empty() {
120        mem.write(base_offset + DATA_OFFSET, initial)?;
121    }
122
123    let sender = WatchSender {
124        lease: sender_lease,
125        base_offset,
126    };
127    let receiver = WatchReceiver {
128        lease: receiver_lease,
129        base_offset,
130        last_version: 0,
131    };
132    Ok((sender, receiver))
133}
134
135impl WatchSender {
136    /// Send a new value to all receivers.
137    ///
138    /// Writes the data to shared memory and increments the version counter.
139    /// Receivers will see [`changed()`](WatchReceiver::changed) return `true`
140    /// after this call.
141    pub fn send(&self, data: &[u8]) -> Result<()> {
142        let mem = self.lease.mem();
143        let base = self.base_offset;
144
145        // Write data, then length, then bump version (ordered for readers)
146        if !data.is_empty() {
147            mem.write(base + DATA_OFFSET, data)?;
148        }
149        write_u32(mem, base + DATA_LEN_OFFSET, data.len() as u32)?;
150
151        let ver = read_u64(mem, base + VERSION_OFFSET)?;
152        write_u64(mem, base + VERSION_OFFSET, ver + 1)?;
153
154        Ok(())
155    }
156
157    /// Read the current version counter.
158    ///
159    /// Starts at 0 and increments by one on each [`send`](Self::send) call.
160    pub fn version(&self) -> Result<u64> {
161        read_u64(self.lease.mem(), self.base_offset + VERSION_OFFSET)
162    }
163}
164
165impl WatchReceiver {
166    /// Read the current value from shared memory.
167    ///
168    /// Updates the receiver's tracked version to the current version, so
169    /// subsequent calls to [`changed`](Self::changed) will return `false`
170    /// until the sender publishes a new value.
171    pub fn recv(&mut self) -> Result<Vec<u8>> {
172        let mem = self.lease.mem();
173        let base = self.base_offset;
174
175        let ver = read_u64(mem, base + VERSION_OFFSET)?;
176        self.last_version = ver;
177
178        let data_len = read_u32(mem, base + DATA_LEN_OFFSET)?;
179        if data_len == 0 {
180            return Ok(Vec::new());
181        }
182        mem.read(base + DATA_OFFSET, data_len)
183    }
184
185    /// Check whether the value has changed since the last [`recv`](WatchReceiver::recv) call.
186    ///
187    /// Compares the current version in shared memory against the version
188    /// recorded during the last `recv()`. Does **not** update the tracked
189    /// version; call `recv()` to consume the change.
190    pub fn changed(&self) -> Result<bool> {
191        let ver = read_u64(self.lease.mem(), self.base_offset + VERSION_OFFSET)?;
192        Ok(ver != self.last_version)
193    }
194
195    /// Poll until the version changes, up to `max_polls` iterations.
196    ///
197    /// Combines [`changed`](Self::changed) and [`recv`](Self::recv) in a tight
198    /// loop. Returns the new value once a change is detected, or
199    /// [`grafos_std::error::FabricError::LeaseExpired`] if `max_polls` is exhausted without
200    /// observing a version change.
201    pub fn wait_for_change(&mut self, max_polls: u32) -> Result<Vec<u8>> {
202        for _ in 0..max_polls {
203            if self.changed()? {
204                return self.recv();
205            }
206        }
207        Err(grafos_std::error::FabricError::LeaseExpired)
208    }
209
210    /// Read the current version counter from shared memory.
211    ///
212    /// This returns the version stored in the arena, not the receiver's
213    /// last-seen version. Compare with the result of a previous `recv()` to
214    /// detect changes, or use [`changed`](Self::changed) for convenience.
215    pub fn version(&self) -> Result<u64> {
216        read_u64(self.lease.mem(), self.base_offset + VERSION_OFFSET)
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223    use grafos_std::host;
224    use grafos_std::mem::MemBuilder;
225
226    fn setup_pair() -> (MemLease, MemLease) {
227        host::reset_mock();
228        host::mock_set_fbmu_arena_size(4096);
229        let l1 = MemBuilder::new().acquire().expect("lease");
230        let l2 = l1.dup();
231        (l1, l2)
232    }
233
234    #[test]
235    fn watch_send_recv_roundtrip() {
236        let (sl, rl) = setup_pair();
237        let (sender, mut receiver) = watch(sl, rl, 0, b"init").expect("watch");
238
239        let val = receiver.recv().expect("recv initial");
240        assert_eq!(&val, b"init");
241
242        sender.send(b"updated").expect("send");
243        let val = receiver.recv().expect("recv updated");
244        assert_eq!(&val, b"updated");
245    }
246
247    #[test]
248    fn watch_changed_tracks_version() {
249        let (sl, rl) = setup_pair();
250        let (sender, mut receiver) = watch(sl, rl, 0, b"v0").expect("watch");
251
252        // Initial recv to sync version
253        let _ = receiver.recv().expect("initial recv");
254        assert!(!receiver.changed().unwrap());
255
256        // Send new value
257        sender.send(b"v1").expect("send");
258        assert!(receiver.changed().unwrap());
259
260        // After recv, changed should be false again
261        let _ = receiver.recv().expect("recv v1");
262        assert!(!receiver.changed().unwrap());
263    }
264
265    #[test]
266    fn watch_version_increments() {
267        let (sl, rl) = setup_pair();
268        let (sender, receiver) = watch(sl, rl, 0, b"start").expect("watch");
269
270        assert_eq!(sender.version().unwrap(), 0);
271        assert_eq!(receiver.version().unwrap(), 0);
272
273        sender.send(b"one").expect("send 1");
274        assert_eq!(sender.version().unwrap(), 1);
275
276        sender.send(b"two").expect("send 2");
277        assert_eq!(sender.version().unwrap(), 2);
278    }
279
280    #[test]
281    fn watch_empty_values() {
282        let (sl, rl) = setup_pair();
283        let (sender, mut receiver) = watch(sl, rl, 0, &[]).expect("watch");
284
285        let val = receiver.recv().expect("recv empty");
286        assert!(val.is_empty());
287
288        sender.send(b"data").expect("send data");
289        let val = receiver.recv().expect("recv data");
290        assert_eq!(&val, b"data");
291
292        sender.send(&[]).expect("send empty");
293        let val = receiver.recv().expect("recv empty again");
294        assert!(val.is_empty());
295    }
296}