grafos_pipeline/
edge.rs

1//! Relocatable queue edge — the key primitive for pipeline inter-stage
2//! connections that survive lease expiry and node failures.
3
4extern crate alloc;
5use alloc::format;
6
7use grafos_collections::queue::FabricQueue;
8use grafos_fence::FenceEpoch;
9use grafos_locator::handoff::{HandoffReader, HandoffState};
10use grafos_locator::locator::QueueLocator;
11use grafos_std::mem::MemBuilder;
12use serde::{de::DeserializeOwned, Serialize};
13
14use crate::error::EdgeError;
15
/// A queue edge that can relocate to a new backing queue on failure.
///
/// Wraps a [`FabricQueue`] with a [`HandoffReader`] so that when the
/// current queue becomes unavailable (lease expired, disconnected),
/// the edge can poll for a new [`QueueLocator`] published by the
/// producer and rebind transparently.
///
/// # Relocation protocol
///
/// When `push` or `pop` fails with a recoverable error (the edge treats
/// `EdgeError::LeaseExpired` and `EdgeError::Disconnected` as recoverable):
///
/// 1. The **producer** allocates a new queue (via [`MemBuilder`]), publishes
///    the new [`QueueLocator`] through a [`HandoffWriter`](grafos_locator::handoff::HandoffWriter),
///    and resumes pushing.
/// 2. The **consumer** calls [`relocate`](Self::relocate), which polls the
///    [`HandoffReader`] for a new locator. If the generation has bumped,
///    it creates a new [`FabricQueue`] from the new locator and resumes.
pub struct RelocatableQueueEdge<T> {
    /// Queue currently backing the edge; `relocate` swaps it for a new one.
    queue: FabricQueue<T>,
    /// Locator the edge is currently bound to; updated together with `queue`.
    locator: QueueLocator,
    /// Polled by `relocate` to discover the latest published handoff record.
    handoff_reader: HandoffReader<QueueLocator>,
    /// Generation the edge is at: seeded from `locator.generation` in `new`
    /// and advanced to the handoff record's generation on relocation.
    generation: FenceEpoch,
}
39
40impl<T: Serialize + DeserializeOwned> RelocatableQueueEdge<T> {
41    /// Create a new relocatable edge.
42    ///
43    /// - `queue`: the initial [`FabricQueue`] to use.
44    /// - `locator`: the [`QueueLocator`] describing the queue's location.
45    /// - `handoff_reader`: a reader for detecting when the producer has
46    ///   relocated the queue to a new lease.
47    pub fn new(
48        queue: FabricQueue<T>,
49        locator: QueueLocator,
50        handoff_reader: HandoffReader<QueueLocator>,
51    ) -> Self {
52        let generation = FenceEpoch::new(locator.generation);
53        Self {
54            queue,
55            locator,
56            handoff_reader,
57            generation,
58        }
59    }
60
61    /// Push an item onto the queue.
62    ///
63    /// Returns `Ok(())` on success. If the queue is full, returns
64    /// [`EdgeError::QueueFull`]. On lease expiry or disconnection,
65    /// attempts relocation before returning the error.
66    pub fn push(&mut self, item: &T) -> Result<(), EdgeError> {
67        match self.queue.push(item) {
68            Ok(true) => Ok(()),
69            Ok(false) => Err(EdgeError::QueueFull),
70            Err(e) => {
71                let edge_err = EdgeError::from(e);
72                if matches!(edge_err, EdgeError::LeaseExpired | EdgeError::Disconnected) {
73                    self.relocate()?;
74                    match self.queue.push(item) {
75                        Ok(true) => Ok(()),
76                        Ok(false) => Err(EdgeError::QueueFull),
77                        Err(e2) => Err(EdgeError::from(e2)),
78                    }
79                } else {
80                    Err(edge_err)
81                }
82            }
83        }
84    }
85
86    /// Pop an item from the queue.
87    ///
88    /// Returns `Ok(Some(item))` if an item was available, `Ok(None)` if
89    /// the queue is empty. On lease expiry or disconnection, attempts
90    /// relocation before returning the error.
91    pub fn pop(&mut self) -> Result<Option<T>, EdgeError> {
92        match self.queue.pop() {
93            Ok(item) => Ok(item),
94            Err(e) => {
95                let edge_err = EdgeError::from(e);
96                if matches!(edge_err, EdgeError::LeaseExpired | EdgeError::Disconnected) {
97                    self.relocate()?;
98                    self.queue.pop().map_err(EdgeError::from)
99                } else {
100                    Err(edge_err)
101                }
102            }
103        }
104    }
105
106    /// Poll the handoff reader for a new queue locator.
107    ///
108    /// If a new generation is found, creates a new [`FabricQueue`] from
109    /// the published locator and switches to it. Returns `Ok(())` if
110    /// relocation succeeded or no relocation was needed.
111    pub fn relocate(&mut self) -> Result<(), EdgeError> {
112        let state: Option<HandoffState<QueueLocator>> = self
113            .handoff_reader
114            .poll()
115            .map_err(|e| EdgeError::RelocateFailed(format!("{e}")))?;
116
117        let Some(state) = state else {
118            return Err(EdgeError::RelocateFailed("no new locator available".into()));
119        };
120
121        if state.generation.is_stale(&self.generation) {
122            return Err(EdgeError::StaleGeneration {
123                expected: self.generation,
124                got: state.generation,
125            });
126        }
127
128        let new_locator = state.locator;
129        let arena_bytes = new_locator.len_or_default();
130        let lease = MemBuilder::new()
131            .min_bytes(arena_bytes)
132            .acquire()
133            .map_err(|e| EdgeError::RelocateFailed(format!("{e}")))?;
134
135        let capacity = self.queue.max_len() + 1;
136        let stride = self.stride_from_queue();
137
138        let new_queue = FabricQueue::new(lease, capacity, stride)
139            .map_err(|e| EdgeError::RelocateFailed(format!("{e}")))?;
140
141        self.queue = new_queue;
142        self.locator = new_locator;
143        self.generation = state.generation;
144
145        Ok(())
146    }
147
148    /// The current edge generation (epoch).
149    pub fn generation(&self) -> FenceEpoch {
150        self.generation
151    }
152
153    /// The current queue locator.
154    pub fn locator(&self) -> &QueueLocator {
155        &self.locator
156    }
157
158    /// Compute stride from the current queue layout.
159    ///
160    /// FabricQueue doesn't expose stride directly, but we can derive it
161    /// from the locator's arena size and capacity. For simplicity, we use
162    /// a default stride of 256 bytes (enough for most pipeline messages).
163    fn stride_from_queue(&self) -> usize {
164        // Use a conservative default stride. Real pipelines should
165        // configure this based on their message type's serialized size.
166        256
167    }
168}
169
170/// Extension trait for QueueLocator to provide a length hint for relocation.
171trait QueueLocatorExt {
172    fn len_or_default(&self) -> u64;
173}
174
175impl QueueLocatorExt for QueueLocator {
176    fn len_or_default(&self) -> u64 {
177        // Default arena size for relocated queues: 8 KiB
178        8192
179    }
180}
181
182#[cfg(test)]
183mod tests {
184    use super::*;
185    use grafos_fence::FenceEpoch;
186    use grafos_locator::handoff::{HandoffState, HandoffWriter};
187    use grafos_std::block::BlockBuilder;
188    use grafos_std::host;
189    use grafos_std::mem::MemBuilder;
190
    /// Put the mock host into a known state; called at the top of every test.
    fn setup() {
        // Reset first so the settings below are not affected by earlier tests.
        host::reset_mock();
        // Presumably the size in bytes of the mock memory arena — confirm
        // against `grafos_std::host`.
        host::mock_set_fbmu_arena_size(65536);
        // Presumably the number of blocks the mock block unit offers — confirm
        // against `grafos_std::host`.
        host::mock_set_fbbu_num_blocks(64);
    }
196
197    #[test]
198    fn push_pop_through_edge() {
199        setup();
200
201        let lease = MemBuilder::new()
202            .min_bytes(4096)
203            .acquire()
204            .expect("acquire");
205        let queue: FabricQueue<u32> = FabricQueue::new(lease, 16, 16).expect("queue");
206        let locator = QueueLocator::new(1, 0, 0);
207
208        let block_lease = BlockBuilder::new().acquire().expect("block");
209        let initial_state = HandoffState {
210            stage_id: 0,
211            generation: FenceEpoch::zero(),
212            locator: locator.clone(),
213        };
214        let mut writer = HandoffWriter::new(initial_state, block_lease);
215        writer.publish(locator.clone()).expect("initial publish");
216        let reader_lease = writer.into_block_lease();
217        let reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
218
219        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);
220
221        edge.push(&42u32).expect("push");
222        edge.push(&99u32).expect("push");
223
224        assert_eq!(edge.pop().expect("pop"), Some(42));
225        assert_eq!(edge.pop().expect("pop"), Some(99));
226        assert_eq!(edge.pop().expect("pop"), None);
227    }
228
229    #[test]
230    fn generation_tracks_locator() {
231        setup();
232
233        let lease = MemBuilder::new()
234            .min_bytes(4096)
235            .acquire()
236            .expect("acquire");
237        let queue: FabricQueue<u32> = FabricQueue::new(lease, 8, 16).expect("queue");
238        let locator = QueueLocator::new(1, 0, 5);
239
240        let block_lease = BlockBuilder::new().acquire().expect("block");
241        let reader: HandoffReader<QueueLocator> = HandoffReader::new(block_lease);
242
243        let edge = RelocatableQueueEdge::new(queue, locator, reader);
244        assert_eq!(edge.generation(), FenceEpoch::new(5));
245    }
246
247    #[test]
248    fn relocate_detects_generation_bump() {
249        setup();
250
251        let lease = MemBuilder::new()
252            .min_bytes(4096)
253            .acquire()
254            .expect("acquire");
255        let queue: FabricQueue<u64> = FabricQueue::new(lease, 8, 16).expect("queue");
256        let locator = QueueLocator::new(1, 0, 0);
257
258        let block_lease = BlockBuilder::new().acquire().expect("block");
259        let initial_state = HandoffState {
260            stage_id: 0,
261            generation: FenceEpoch::zero(),
262            locator: locator.clone(),
263        };
264        let mut writer = HandoffWriter::new(initial_state, block_lease);
265
266        // Publish generation 1 with new locator
267        let new_locator = QueueLocator::new(2, 0, 1);
268        writer.publish(new_locator).expect("publish");
269
270        let reader_lease = writer.into_block_lease();
271        let reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
272
273        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);
274        assert_eq!(edge.generation(), FenceEpoch::new(0));
275
276        // Relocate should pick up generation 1
277        edge.relocate().expect("relocate");
278        assert_eq!(edge.generation(), FenceEpoch::new(1));
279        assert_eq!(edge.locator().lease_id, 2);
280    }
281
282    #[test]
283    fn stale_generation_rejected() {
284        setup();
285
286        // Start at generation 5
287        let lease = MemBuilder::new()
288            .min_bytes(4096)
289            .acquire()
290            .expect("acquire");
291        let queue: FabricQueue<u32> = FabricQueue::new(lease, 8, 16).expect("queue");
292        let locator = QueueLocator::new(1, 0, 5);
293
294        // Write a handoff record at generation 3 (stale)
295        let block_lease = BlockBuilder::new().acquire().expect("block");
296        let initial_state = HandoffState {
297            stage_id: 0,
298            generation: FenceEpoch::new(2),
299            locator: QueueLocator::new(1, 0, 2),
300        };
301        let mut writer = HandoffWriter::new(initial_state, block_lease);
302        writer
303            .publish(QueueLocator::new(99, 0, 3))
304            .expect("publish");
305
306        let reader_lease = writer.into_block_lease();
307        let reader: HandoffReader<QueueLocator> = HandoffReader::new(reader_lease);
308
309        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);
310
311        let err = edge.relocate().unwrap_err();
312        assert!(matches!(err, EdgeError::StaleGeneration { .. }));
313    }
314
315    #[test]
316    fn queue_full_returns_error() {
317        setup();
318
319        // capacity=4, 3 usable slots
320        let needed = 32 + 4 * 16;
321        let lease = MemBuilder::new()
322            .min_bytes(needed as u64)
323            .acquire()
324            .expect("acquire");
325        let queue: FabricQueue<u32> = FabricQueue::new(lease, 4, 16).expect("queue");
326        let locator = QueueLocator::new(1, 0, 0);
327
328        let block_lease = BlockBuilder::new().acquire().expect("block");
329        let reader: HandoffReader<QueueLocator> = HandoffReader::new(block_lease);
330
331        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);
332
333        edge.push(&1).expect("push 1");
334        edge.push(&2).expect("push 2");
335        edge.push(&3).expect("push 3");
336
337        let err = edge.push(&4).unwrap_err();
338        assert_eq!(err, EdgeError::QueueFull);
339    }
340}