grafos_collections/
queue.rs

1//! A bounded SPSC ring buffer stored in leased fabric memory.
2//!
3//! [`FabricQueue<T>`] implements a single-producer single-consumer FIFO queue
4//! backed by a leased memory region. Elements are serialized with postcard
5//! into fixed-size slots arranged in a circular buffer.
6//!
7//! # Memory layout
8//!
9//! ```text
10//! Offset 0:  [header]  head: u64 (8) | tail: u64 (8) | capacity: u64 (8) | stride: u64 (8)
11//! Offset 32: [slots]   slot 0 (stride bytes) | slot 1 | ... | slot (capacity-1)
12//! ```
13//!
14//! `head` is the next index to read from, `tail` is the next index to write
15//! to. The queue is empty when `head == tail` and full when
16//! `(tail + 1) % capacity == head`. One slot is always reserved for the
17//! empty/full distinction, so the usable capacity is `capacity - 1`.
18//!
19//! # Example
20//!
21//! ```rust
22//! use grafos_collections::queue::FabricQueue;
23//! use grafos_std::mem::MemBuilder;
24//!
25//! # grafos_std::host::reset_mock();
26//! # grafos_std::host::mock_set_fbmu_arena_size(65536);
27//! let lease = MemBuilder::new().min_bytes(4096).acquire()?;
28//! let mut q: FabricQueue<u32> = FabricQueue::new(lease, 16, 16)?;
29//!
30//! q.push(&1)?;
31//! q.push(&2)?;
32//! assert_eq!(q.pop()?, Some(1)); // FIFO order
33//! # Ok::<(), grafos_std::FabricError>(())
34//! ```
35
36extern crate alloc;
37use alloc::vec;
38
39use grafos_std::error::{FabricError, Result};
40use grafos_std::mem::{MemBuilder, MemLease};
41
42use serde::{de::DeserializeOwned, Serialize};
43
44const HEADER_SIZE: u64 = 32;
45
/// A bounded SPSC ring buffer stored in leased fabric memory.
///
/// The queue owns its [`MemLease`] and releases it on drop. Elements are
/// stored in FIFO order: `push()` appends to the tail, `pop()` reads from
/// the head.
///
/// # Non-blocking semantics
///
/// - `push()` returns `Ok(false)` when the queue is full.
/// - `pop()` returns `Ok(None)` when the queue is empty.
///
/// Neither operation blocks or retries.
///
/// # Future direction: MPMC
///
/// This queue is currently single-producer single-consumer. A future MPMC
/// variant would replace the plain `head`/`tail` fields with
/// compare-and-swap (CAS) atomic operations on the remote memory, or
/// alternatively use a lock-based protocol with a lease-scoped mutex in
/// the header region. MPMC support is not yet implemented.
///
/// # Example
///
/// ```rust
/// use grafos_collections::queue::FabricQueue;
///
/// # grafos_std::host::reset_mock();
/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
/// let mut q: FabricQueue<u32> = FabricQueue::with_capacity(8, 16)?;
/// q.push(&10)?;
/// q.push(&20)?;
/// assert_eq!(q.pop()?, Some(10));
/// assert_eq!(q.pop()?, Some(20));
/// assert_eq!(q.pop()?, None);
/// # Ok::<(), grafos_std::FabricError>(())
/// ```
pub struct FabricQueue<T> {
    /// Leased memory region that holds the 32-byte header followed by the
    /// slot array.
    lease: MemLease,
    /// Index of the next slot to read from; mirrored into header bytes 0..8.
    head: u64,
    /// Index of the next slot to write to; mirrored into header bytes 8..16.
    tail: u64,
    /// Total number of slots, including the one reserved to tell "full"
    /// apart from "empty"; mirrored into header bytes 16..24.
    capacity: u64,
    /// Width of each slot in bytes (4-byte length prefix plus payload);
    /// mirrored into header bytes 24..32.
    stride: u64,
    /// Ties the queue to its element type without storing a `T` locally.
    _marker: core::marker::PhantomData<T>,
}
90
91impl<T: Serialize + DeserializeOwned> FabricQueue<T> {
92    /// Create a new queue over the given lease.
93    ///
94    /// `capacity` is the total number of slots. `stride` is the slot width
95    /// in bytes (must accommodate 4-byte length prefix plus serialized
96    /// element). One slot is reserved for the empty/full distinction, so
97    /// the usable capacity is `capacity - 1`.
98    ///
99    /// # Errors
100    ///
101    /// Returns [`FabricError::CapacityExceeded`] if the arena is too small
102    /// to hold the header (32 bytes) plus `capacity * stride` bytes.
103    pub fn new(lease: MemLease, capacity: usize, stride: usize) -> Result<Self> {
104        let arena = lease.mem().arena_size()?;
105        let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
106        if arena < needed {
107            return Err(FabricError::CapacityExceeded);
108        }
109        let q = FabricQueue {
110            lease,
111            head: 0,
112            tail: 0,
113            capacity: capacity as u64,
114            stride: stride as u64,
115            _marker: core::marker::PhantomData,
116        };
117        q.write_header()?;
118        Ok(q)
119    }
120
121    /// Create a new queue by acquiring a lease sized for `capacity` elements
122    /// of `stride` bytes each.
123    ///
124    /// Convenience constructor that acquires a lease via `MemBuilder` and
125    /// then calls [`FabricQueue::new`].
126    ///
127    /// # Errors
128    ///
129    /// Returns [`FabricError::CapacityExceeded`] if the host cannot provide
130    /// a large enough arena.
131    pub fn with_capacity(capacity: usize, stride: usize) -> Result<Self> {
132        let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
133        let lease = MemBuilder::new().min_bytes(needed).acquire()?;
134        Self::new(lease, capacity, stride)
135    }
136
137    /// Push an element onto the back of the queue.
138    ///
139    /// Returns `Ok(true)` if the element was enqueued, `Ok(false)` if the
140    /// queue is full. Does not block.
141    ///
142    /// # Errors
143    ///
144    /// - [`FabricError::CapacityExceeded`] if the serialized element
145    ///   (plus 4-byte length prefix) exceeds `stride`.
146    /// - [`FabricError::IoError`] if serialization or remote write fails.
147    pub fn push(&mut self, item: &T) -> Result<bool> {
148        if self.is_full() {
149            return Ok(false);
150        }
151        let bytes = postcard::to_allocvec(item).map_err(|_| FabricError::IoError(-1))?;
152        if bytes.len() as u64 + 4 > self.stride {
153            return Err(FabricError::CapacityExceeded);
154        }
155        let offset = self.slot_offset(self.tail);
156        let mut slot = vec![0u8; self.stride as usize];
157        let len_bytes = (bytes.len() as u32).to_le_bytes();
158        slot[..4].copy_from_slice(&len_bytes);
159        slot[4..4 + bytes.len()].copy_from_slice(&bytes);
160        self.lease.mem().write(offset, &slot)?;
161        self.tail = (self.tail + 1) % self.capacity;
162        self.write_header()?;
163        Ok(true)
164    }
165
166    /// Pop an element from the front of the queue.
167    ///
168    /// Returns `Ok(Some(item))` if an element was dequeued, `Ok(None)` if
169    /// the queue is empty. Does not block.
170    ///
171    /// # Errors
172    ///
173    /// Returns [`FabricError::IoError`] if the remote read or
174    /// deserialization fails.
175    pub fn pop(&mut self) -> Result<Option<T>> {
176        if self.is_empty() {
177            return Ok(None);
178        }
179        let offset = self.slot_offset(self.head);
180        let raw = self.lease.mem().read(offset, self.stride as u32)?;
181        let ser_len = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
182        let item: T =
183            postcard::from_bytes(&raw[4..4 + ser_len]).map_err(|_| FabricError::IoError(-1))?;
184        self.head = (self.head + 1) % self.capacity;
185        self.write_header()?;
186        Ok(Some(item))
187    }
188
189    /// Returns the number of elements currently in the queue.
190    pub fn len(&self) -> usize {
191        if self.tail >= self.head {
192            (self.tail - self.head) as usize
193        } else {
194            (self.capacity - self.head + self.tail) as usize
195        }
196    }
197
198    /// Returns `true` if the queue contains no elements.
199    pub fn is_empty(&self) -> bool {
200        self.head == self.tail
201    }
202
203    /// Returns `true` if the queue is full.
204    pub fn is_full(&self) -> bool {
205        (self.tail + 1) % self.capacity == self.head
206    }
207
208    /// Returns the lease ID of the memory lease for external renewal
209    /// management (e.g. via [`grafos_leasekit::RenewalManager`]).
210    pub fn lease_id(&self) -> u128 {
211        self.lease.lease_id()
212    }
213
214    /// Returns the expiry time (unix seconds) of the memory lease for
215    /// external renewal management.
216    pub fn expires_at_unix_secs(&self) -> u64 {
217        self.lease.expires_at_unix_secs()
218    }
219
220    /// Returns the maximum number of elements the queue can hold.
221    ///
222    /// This is `capacity - 1` since one slot is reserved for the
223    /// empty/full distinction.
224    pub fn max_len(&self) -> usize {
225        (self.capacity - 1) as usize
226    }
227
228    fn slot_offset(&self, index: u64) -> u64 {
229        HEADER_SIZE + index * self.stride
230    }
231
232    fn write_header(&self) -> Result<()> {
233        let mut hdr = [0u8; HEADER_SIZE as usize];
234        hdr[0..8].copy_from_slice(&self.head.to_le_bytes());
235        hdr[8..16].copy_from_slice(&self.tail.to_le_bytes());
236        hdr[16..24].copy_from_slice(&self.capacity.to_le_bytes());
237        hdr[24..32].copy_from_slice(&self.stride.to_le_bytes());
238        self.lease.mem().write(0, &hdr)
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Resets the mock host, sizes its arena to `arena_size` bytes and
    /// acquires a lease from it.
    fn setup(arena_size: u64) -> MemLease {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(arena_size);
        let builder = MemBuilder::new();
        builder.acquire().expect("acquire")
    }

    /// Elements come out in the order they went in; an exhausted queue
    /// yields `None`.
    #[test]
    fn push_pop_fifo() {
        let mut queue: FabricQueue<u32> = FabricQueue::new(setup(4096), 16, 16).expect("new");

        assert!(queue.is_empty());
        assert!(!queue.is_full());
        assert_eq!(queue.len(), 0);

        for value in 1..=3u32 {
            queue.push(&value).expect("push");
        }
        assert_eq!(queue.len(), 3);

        for expected in [Some(1), Some(2), Some(3), None] {
            assert_eq!(queue.pop().expect("pop"), expected);
        }
    }

    /// With 4 slots only 3 are usable; the fourth push reports "full".
    #[test]
    fn full_returns_false() {
        // capacity=4, stride=16, so 3 usable slots
        let mut queue: FabricQueue<u32> =
            FabricQueue::new(setup(HEADER_SIZE + 4 * 16), 4, 16).expect("new");

        for (value, msg) in [(1u32, "push 1"), (2, "push 2"), (3, "push 3")] {
            assert!(queue.push(&value).expect(msg));
        }
        assert!(queue.is_full());

        let accepted = queue.push(&4).expect("push 4 (full)");
        assert!(!accepted);
    }

    /// Repeated fill/drain cycles move head and tail past the end of the
    /// slot array.
    #[test]
    fn wraparound() {
        let mut queue: FabricQueue<u32> =
            FabricQueue::new(setup(HEADER_SIZE + 4 * 16), 4, 16).expect("new");

        // Fill and drain a few times to exercise wrap-around
        for round in 0..3u32 {
            let base = round * 100;

            for step in 1..=3u32 {
                queue.push(&(base + step)).expect("push");
            }
            assert!(queue.is_full());

            for step in 1..=3u32 {
                assert_eq!(queue.pop().expect("pop"), Some(base + step));
            }
            assert!(queue.is_empty());
        }
    }

    /// `with_capacity` acquires its own lease and reserves one slot.
    #[test]
    fn with_capacity_works() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);
        let queue: FabricQueue<u64> = FabricQueue::with_capacity(32, 16).expect("with_capacity");
        assert_eq!(queue.max_len(), 31);
    }

    /// FIFO order holds when pushes and pops are interleaved.
    #[test]
    fn interleaved_push_pop() {
        let mut queue: FabricQueue<u32> = FabricQueue::new(setup(4096), 8, 16).expect("new");

        queue.push(&10).expect("push");
        queue.push(&20).expect("push");
        assert_eq!(queue.pop().expect("pop"), Some(10));

        queue.push(&30).expect("push");
        for expected in [Some(20), Some(30), None] {
            assert_eq!(queue.pop().expect("pop"), expected);
        }
    }
}