grafos_mq/
partition.rs

1//! Partition: ring buffer in leased memory with slot-based indexing.
2//!
3//! Each partition is a bounded ring buffer backed by a [`MemLease`]. Messages
4//! are serialized into fixed-size slots. When the buffer wraps, old messages
5//! are overwritten (log compaction by offset).
6//!
7//! # Memory layout
8//!
9//! ```text
10//! Offset 0:  [header]  write_offset: u64 (8) | capacity: u64 (8) | stride: u64 (8) | msg_count: u64 (8)
11//! Offset 32: [slots]   slot 0 (stride bytes) | slot 1 | ... | slot (capacity-1)
12//! ```
13
14extern crate alloc;
15use alloc::vec;
16
17use crate::message::Message;
18use grafos_std::error::{FabricError, Result};
19use grafos_std::host;
20use grafos_std::mem::{MemBuilder, MemLease};
21
22const HEADER_SIZE: u64 = 32;
23
24/// A single partition backed by a memory lease.
25pub struct Partition {
26    lease: MemLease,
27    /// Next offset to write at (monotonically increasing, mod capacity for slot index).
28    write_offset: u64,
29    /// Number of slots in the ring buffer.
30    capacity: u64,
31    /// Size of each slot in bytes.
32    stride: u64,
33    /// Total number of messages ever written (for offset tracking).
34    msg_count: u64,
35}
36
37impl Partition {
38    /// Create a new partition with the given capacity and slot stride.
39    ///
40    /// Acquires a memory lease large enough for the header plus all slots.
41    pub fn new(capacity: usize, stride: usize) -> Result<Self> {
42        let needed = HEADER_SIZE + (capacity as u64) * (stride as u64);
43        let lease = MemBuilder::new().min_bytes(needed).acquire()?;
44        let p = Partition {
45            lease,
46            write_offset: 0,
47            capacity: capacity as u64,
48            stride: stride as u64,
49            msg_count: 0,
50        };
51        p.write_header()?;
52        Ok(p)
53    }
54
55    /// Append a message to the partition.
56    ///
57    /// The message's `offset` and `timestamp` fields are set automatically.
58    /// Returns the assigned offset, or `CapacityExceeded` if the serialized
59    /// message exceeds the slot stride.
60    pub fn append(&mut self, key: Option<&[u8]>, value: &[u8]) -> Result<u64> {
61        let now = host::unix_time_secs();
62        let offset = self.msg_count;
63        let msg = Message {
64            offset,
65            timestamp: now,
66            key: key.map(|k| k.to_vec()),
67            value: value.to_vec(),
68            headers: alloc::vec::Vec::new(),
69        };
70
71        let bytes = postcard::to_allocvec(&msg).map_err(|_| FabricError::IoError(-1))?;
72        if bytes.len() as u64 + 4 > self.stride {
73            return Err(FabricError::CapacityExceeded);
74        }
75
76        let slot_idx = self.write_offset % self.capacity;
77        let slot_offset = HEADER_SIZE + slot_idx * self.stride;
78
79        let mut slot = vec![0u8; self.stride as usize];
80        let len_bytes = (bytes.len() as u32).to_le_bytes();
81        slot[..4].copy_from_slice(&len_bytes);
82        slot[4..4 + bytes.len()].copy_from_slice(&bytes);
83
84        self.lease.mem().write(slot_offset, &slot)?;
85        self.write_offset += 1;
86        self.msg_count += 1;
87        self.write_header()?;
88
89        Ok(offset)
90    }
91
92    /// Read the message at the given absolute offset.
93    ///
94    /// Returns `None` if the offset has been overwritten (wrapped) or is
95    /// beyond the current write position.
96    pub fn read_at(&self, offset: u64) -> Result<Option<Message>> {
97        if offset >= self.msg_count {
98            return Ok(None);
99        }
100        // Check if offset has been overwritten
101        let oldest_available = self.msg_count.saturating_sub(self.capacity);
102        if offset < oldest_available {
103            return Ok(None);
104        }
105
106        let slot_idx = offset % self.capacity;
107        let slot_offset = HEADER_SIZE + slot_idx * self.stride;
108
109        let raw = self.lease.mem().read(slot_offset, self.stride as u32)?;
110        let ser_len = u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize;
111        if ser_len == 0 || ser_len + 4 > raw.len() {
112            return Ok(None);
113        }
114        let msg: Message =
115            postcard::from_bytes(&raw[4..4 + ser_len]).map_err(|_| FabricError::IoError(-1))?;
116        Ok(Some(msg))
117    }
118
119    /// Returns the next offset that will be assigned.
120    pub fn next_offset(&self) -> u64 {
121        self.msg_count
122    }
123
124    /// Returns the oldest available offset (messages before this have been overwritten).
125    pub fn oldest_offset(&self) -> u64 {
126        self.msg_count.saturating_sub(self.capacity)
127    }
128
129    /// Returns the number of messages currently retained in the ring buffer.
130    pub fn len(&self) -> usize {
131        let available = self.msg_count - self.oldest_offset();
132        available as usize
133    }
134
135    /// Returns `true` if no messages are in the partition.
136    pub fn is_empty(&self) -> bool {
137        self.msg_count == 0
138    }
139
140    /// Returns the slot capacity of this partition.
141    pub fn capacity(&self) -> usize {
142        self.capacity as usize
143    }
144
145    fn write_header(&self) -> Result<()> {
146        let mut hdr = [0u8; HEADER_SIZE as usize];
147        hdr[0..8].copy_from_slice(&self.write_offset.to_le_bytes());
148        hdr[8..16].copy_from_slice(&self.capacity.to_le_bytes());
149        hdr[16..24].copy_from_slice(&self.stride.to_le_bytes());
150        hdr[24..32].copy_from_slice(&self.msg_count.to_le_bytes());
151        self.lease.mem().write(0, &hdr)
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158    use grafos_std::host;
159
160    fn setup() {
161        host::reset_mock();
162        host::mock_set_fbmu_arena_size(65536);
163    }
164
165    #[test]
166    fn append_and_read() {
167        setup();
168        let mut part = Partition::new(16, 256).expect("new");
169        assert!(part.is_empty());
170
171        let off = part.append(None, b"hello").expect("append");
172        assert_eq!(off, 0);
173        assert_eq!(part.len(), 1);
174
175        let msg = part.read_at(0).expect("read").expect("some");
176        assert_eq!(msg.offset, 0);
177        assert_eq!(msg.value, b"hello");
178        assert_eq!(msg.key, None);
179    }
180
181    #[test]
182    fn keyed_append() {
183        setup();
184        let mut part = Partition::new(16, 256).expect("new");
185        part.append(Some(b"k1"), b"v1").expect("append");
186
187        let msg = part.read_at(0).expect("read").expect("some");
188        assert_eq!(msg.key, Some(b"k1".to_vec()));
189        assert_eq!(msg.value, b"v1");
190    }
191
192    #[test]
193    fn wraparound() {
194        setup();
195        let mut part = Partition::new(4, 256).expect("new");
196
197        for i in 0..6u64 {
198            part.append(None, &[i as u8; 4]).expect("append");
199        }
200
201        // Offsets 0 and 1 should be overwritten
202        assert_eq!(part.oldest_offset(), 2);
203        assert!(part.read_at(0).expect("read").is_none());
204        assert!(part.read_at(1).expect("read").is_none());
205
206        // Offsets 2-5 should be available
207        for i in 2..6u64 {
208            let msg = part.read_at(i).expect("read").expect("some");
209            assert_eq!(msg.offset, i);
210        }
211    }
212
213    #[test]
214    fn read_beyond_write() {
215        setup();
216        let mut part = Partition::new(8, 256).expect("new");
217        part.append(None, b"one").expect("append");
218        assert!(part.read_at(1).expect("read").is_none());
219        assert!(part.read_at(100).expect("read").is_none());
220    }
221
222    #[test]
223    fn capacity_exceeded_for_large_message() {
224        setup();
225        let mut part = Partition::new(4, 32).expect("new");
226        // A message with 30+ bytes of payload won't fit in 32 - 4 = 28 bytes of data
227        let big = vec![0u8; 200];
228        let result = part.append(None, &big);
229        assert_eq!(result.unwrap_err(), FabricError::CapacityExceeded);
230    }
231}