grafos_collections/
queue.rs

1//! A bounded SPSC ring buffer stored in leased fabric memory.
2//!
3//! [`FabricQueue<T>`] implements a single-producer single-consumer FIFO queue
4//! backed by a leased memory region. Elements are serialized with postcard
5//! into fixed-size slots arranged in a circular buffer.
6//!
7//! # Memory layout
8//!
9//! ```text
10//! Offset 0:  [header]  head: u64 (8) | tail: u64 (8) | capacity: u64 (8) | stride: u64 (8)
11//! Offset 32: [slots]   slot 0 (stride bytes) | slot 1 | ... | slot (capacity-1)
12//! ```
13//!
14//! `head` is the next index to read from, `tail` is the next index to write
15//! to. The queue is empty when `head == tail` and full when
16//! `(tail + 1) % capacity == head`. One slot is always reserved for the
17//! empty/full distinction, so the usable capacity is `capacity - 1`.
18//!
19//! # Example
20//!
21//! ```rust
22//! use grafos_collections::queue::FabricQueue;
23//! use grafos_std::mem::MemBuilder;
24//!
25//! # grafos_std::host::reset_mock();
26//! # grafos_std::host::mock_set_fbmu_arena_size(65536);
27//! let lease = MemBuilder::new().min_bytes(4096).acquire()?;
28//! let mut q: FabricQueue<u32> = FabricQueue::new(lease, 16, 16)?;
29//!
30//! q.push(&1)?;
31//! q.push(&2)?;
32//! assert_eq!(q.pop()?, Some(1)); // FIFO order
33//! # Ok::<(), grafos_std::FabricError>(())
34//! ```
35
36extern crate alloc;
37use alloc::vec;
38
39use grafos_std::error::{FabricError, Result};
40use grafos_std::mem::{MemBuilder, MemLease};
41
42use serde::{de::DeserializeOwned, Serialize};
43
44const HEADER_SIZE: u64 = 32;
45
46/// A bounded SPSC ring buffer stored in leased fabric memory.
47///
48/// The queue owns its [`MemLease`] and releases it on drop. Elements are
49/// stored in FIFO order: `push()` appends to the tail, `pop()` reads from
50/// the head.
51///
52/// `FabricQueue` is lease-scoped. It does not provide placement, quorum,
53/// replica-set membership, or cross-domain recovery by itself; use
54/// `grafos_replicated` for a logical replicated queue.
55///
56/// # Non-blocking semantics
57///
58/// - `push()` returns `Ok(false)` when the queue is full.
59/// - `pop()` returns `Ok(None)` when the queue is empty.
60///
61/// Neither operation blocks or retries.
62///
63/// # Future direction: MPMC
64///
65/// This queue is currently single-producer single-consumer. A future MPMC
66/// variant would replace the plain `head`/`tail` fields with
67/// compare-and-swap (CAS) atomic operations on the remote memory, or
68/// alternatively use a lock-based protocol with a lease-scoped mutex in
69/// the header region. MPMC support is not yet implemented.
70///
71/// # Example
72///
73/// ```rust
74/// use grafos_collections::queue::FabricQueue;
75///
76/// # grafos_std::host::reset_mock();
77/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
78/// let mut q: FabricQueue<u32> = FabricQueue::with_capacity(8, 16)?;
79/// q.push(&10)?;
80/// q.push(&20)?;
81/// assert_eq!(q.pop()?, Some(10));
82/// assert_eq!(q.pop()?, Some(20));
83/// assert_eq!(q.pop()?, None);
84/// # Ok::<(), grafos_std::FabricError>(())
85/// ```
86pub struct FabricQueue<T> {
87    lease: MemLease,
88    head: u64,
89    tail: u64,
90    capacity: u64,
91    stride: u64,
92    _marker: core::marker::PhantomData<T>,
93}
94
95impl<T: Serialize + DeserializeOwned> FabricQueue<T> {
96    /// Create a new queue over the given lease.
97    ///
98    /// `capacity` is the total number of slots. `stride` is the slot width
99    /// in bytes (must accommodate 4-byte length prefix plus serialized
100    /// element). One slot is reserved for the empty/full distinction, so
101    /// the usable capacity is `capacity - 1`.
102    ///
103    /// # Errors
104    ///
105    /// Returns [`FabricError::CapacityExceeded`] if the arena is too small
106    /// to hold the header (32 bytes) plus `capacity * stride` bytes.
107    pub fn new(lease: MemLease, capacity: usize, stride: usize) -> Result<Self> {
108        let arena = lease.mem().arena_size()?;
109        let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
110        if arena < needed {
111            return Err(FabricError::CapacityExceeded);
112        }
113        let q = FabricQueue {
114            lease,
115            head: 0,
116            tail: 0,
117            capacity: capacity as u64,
118            stride: stride as u64,
119            _marker: core::marker::PhantomData,
120        };
121        q.write_header()?;
122        Ok(q)
123    }
124
125    /// Create a new queue by acquiring a lease sized for `capacity` elements
126    /// of `stride` bytes each.
127    ///
128    /// Convenience constructor that acquires a lease via `MemBuilder` and
129    /// then calls [`FabricQueue::new`].
130    ///
131    /// # Errors
132    ///
133    /// Returns [`FabricError::CapacityExceeded`] if the host cannot provide
134    /// a large enough arena.
135    pub fn with_capacity(capacity: usize, stride: usize) -> Result<Self> {
136        let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
137        let lease = MemBuilder::new().min_bytes(needed).acquire()?;
138        Self::new(lease, capacity, stride)
139    }
140
141    /// Push an element onto the back of the queue.
142    ///
143    /// Returns `Ok(true)` if the element was enqueued, `Ok(false)` if the
144    /// queue is full. Does not block.
145    ///
146    /// # Errors
147    ///
148    /// - [`FabricError::CapacityExceeded`] if the serialized element
149    ///   (plus 4-byte length prefix) exceeds `stride`.
150    /// - [`FabricError::IoError`] if serialization or remote write fails.
151    pub fn push(&mut self, item: &T) -> Result<bool> {
152        if self.is_full() {
153            return Ok(false);
154        }
155        let bytes = postcard::to_allocvec(item).map_err(|_| FabricError::IoError(-1))?;
156        if bytes.len() as u64 + 4 > self.stride {
157            return Err(FabricError::CapacityExceeded);
158        }
159        let offset = self.slot_offset(self.tail);
160        let mut slot = vec![0u8; self.stride as usize];
161        let len_bytes = (bytes.len() as u32).to_le_bytes();
162        slot[..4].copy_from_slice(&len_bytes);
163        slot[4..4 + bytes.len()].copy_from_slice(&bytes);
164        self.lease.mem().write(offset, &slot)?;
165        self.tail = (self.tail + 1) % self.capacity;
166        self.write_header()?;
167        Ok(true)
168    }
169
170    /// Pop an element from the front of the queue.
171    ///
172    /// Returns `Ok(Some(item))` if an element was dequeued, `Ok(None)` if
173    /// the queue is empty. Does not block.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`FabricError::IoError`] if the remote read or
178    /// deserialization fails.
179    pub fn pop(&mut self) -> Result<Option<T>> {
180        if self.is_empty() {
181            return Ok(None);
182        }
183        let offset = self.slot_offset(self.head);
184        let raw = self.lease.mem().read(offset, self.stride as u32)?;
185        let ser_len = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
186        let item: T =
187            postcard::from_bytes(&raw[4..4 + ser_len]).map_err(|_| FabricError::IoError(-1))?;
188        self.head = (self.head + 1) % self.capacity;
189        self.write_header()?;
190        Ok(Some(item))
191    }
192
193    /// Returns the number of elements currently in the queue.
194    pub fn len(&self) -> usize {
195        if self.tail >= self.head {
196            (self.tail - self.head) as usize
197        } else {
198            (self.capacity - self.head + self.tail) as usize
199        }
200    }
201
202    /// Returns `true` if the queue contains no elements.
203    pub fn is_empty(&self) -> bool {
204        self.head == self.tail
205    }
206
207    /// Returns `true` if the queue is full.
208    pub fn is_full(&self) -> bool {
209        (self.tail + 1) % self.capacity == self.head
210    }
211
212    /// Returns the lease ID of the memory lease for external renewal
213    /// management (e.g. via [`grafos_leasekit::RenewalManager`]).
214    pub fn lease_id(&self) -> u128 {
215        self.lease.lease_id()
216    }
217
218    /// Returns the expiry time (unix seconds) of the memory lease for
219    /// external renewal management.
220    pub fn expires_at_unix_secs(&self) -> u64 {
221        self.lease.expires_at_unix_secs()
222    }
223
224    /// Returns the maximum number of elements the queue can hold.
225    ///
226    /// This is `capacity - 1` since one slot is reserved for the
227    /// empty/full distinction.
228    pub fn max_len(&self) -> usize {
229        (self.capacity - 1) as usize
230    }
231
232    fn slot_offset(&self, index: u64) -> u64 {
233        HEADER_SIZE + index * self.stride
234    }
235
236    fn write_header(&self) -> Result<()> {
237        let mut hdr = [0u8; HEADER_SIZE as usize];
238        hdr[0..8].copy_from_slice(&self.head.to_le_bytes());
239        hdr[8..16].copy_from_slice(&self.tail.to_le_bytes());
240        hdr[16..24].copy_from_slice(&self.capacity.to_le_bytes());
241        hdr[24..32].copy_from_slice(&self.stride.to_le_bytes());
242        self.lease.mem().write(0, &hdr)
243    }
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use grafos_std::host;
250    use grafos_std::mem::MemBuilder;
251
252    fn setup(arena_size: u64) -> MemLease {
253        host::reset_mock();
254        host::mock_set_fbmu_arena_size(arena_size);
255        MemBuilder::new().acquire().expect("acquire")
256    }
257
258    #[test]
259    fn push_pop_fifo() {
260        let lease = setup(4096);
261        let mut q: FabricQueue<u32> = FabricQueue::new(lease, 16, 16).expect("new");
262
263        assert!(q.is_empty());
264        assert!(!q.is_full());
265        assert_eq!(q.len(), 0);
266
267        q.push(&1).expect("push");
268        q.push(&2).expect("push");
269        q.push(&3).expect("push");
270        assert_eq!(q.len(), 3);
271
272        assert_eq!(q.pop().expect("pop"), Some(1));
273        assert_eq!(q.pop().expect("pop"), Some(2));
274        assert_eq!(q.pop().expect("pop"), Some(3));
275        assert_eq!(q.pop().expect("pop"), None);
276    }
277
278    #[test]
279    fn full_returns_false() {
280        // capacity=4, stride=16, so 3 usable slots
281        let lease = setup(HEADER_SIZE + 4 * 16);
282        let mut q: FabricQueue<u32> = FabricQueue::new(lease, 4, 16).expect("new");
283
284        assert!(q.push(&1).expect("push 1"));
285        assert!(q.push(&2).expect("push 2"));
286        assert!(q.push(&3).expect("push 3"));
287        assert!(q.is_full());
288        assert!(!q.push(&4).expect("push 4 (full)"));
289    }
290
291    #[test]
292    fn wraparound() {
293        let lease = setup(HEADER_SIZE + 4 * 16);
294        let mut q: FabricQueue<u32> = FabricQueue::new(lease, 4, 16).expect("new");
295
296        // Fill and drain a few times to exercise wrap-around
297        for round in 0..3u32 {
298            let base = round * 100;
299            q.push(&(base + 1)).expect("push");
300            q.push(&(base + 2)).expect("push");
301            q.push(&(base + 3)).expect("push");
302            assert!(q.is_full());
303
304            assert_eq!(q.pop().expect("pop"), Some(base + 1));
305            assert_eq!(q.pop().expect("pop"), Some(base + 2));
306            assert_eq!(q.pop().expect("pop"), Some(base + 3));
307            assert!(q.is_empty());
308        }
309    }
310
311    #[test]
312    fn with_capacity_works() {
313        host::reset_mock();
314        host::mock_set_fbmu_arena_size(65536);
315        let q: FabricQueue<u64> = FabricQueue::with_capacity(32, 16).expect("with_capacity");
316        assert_eq!(q.max_len(), 31);
317    }
318
319    #[test]
320    fn interleaved_push_pop() {
321        let lease = setup(4096);
322        let mut q: FabricQueue<u32> = FabricQueue::new(lease, 8, 16).expect("new");
323
324        q.push(&10).expect("push");
325        q.push(&20).expect("push");
326        assert_eq!(q.pop().expect("pop"), Some(10));
327        q.push(&30).expect("push");
328        assert_eq!(q.pop().expect("pop"), Some(20));
329        assert_eq!(q.pop().expect("pop"), Some(30));
330        assert_eq!(q.pop().expect("pop"), None);
331    }
332}