1extern crate alloc;
5use alloc::format;
6
7use grafos_collections::queue::FabricQueue;
8use grafos_fence::FenceEpoch;
9use grafos_locator::handoff::{HandoffReader, HandoffState};
10use grafos_locator::locator::QueueLocator;
11use grafos_std::mem::MemBuilder;
12use serde::{de::DeserializeOwned, Serialize};
13
14use crate::error::EdgeError;
15
16pub struct RelocatableQueueEdge<T> {
34 queue: FabricQueue<T>,
35 locator: QueueLocator,
36 handoff_reader: HandoffReader<QueueLocator>,
37 generation: FenceEpoch,
38}
39
40impl<T: Serialize + DeserializeOwned> RelocatableQueueEdge<T> {
41 pub fn new(
48 queue: FabricQueue<T>,
49 locator: QueueLocator,
50 handoff_reader: HandoffReader<QueueLocator>,
51 ) -> Self {
52 let generation = FenceEpoch::new(locator.generation);
53 Self {
54 queue,
55 locator,
56 handoff_reader,
57 generation,
58 }
59 }
60
61 pub fn push(&mut self, item: &T) -> Result<(), EdgeError> {
67 match self.queue.push(item) {
68 Ok(true) => Ok(()),
69 Ok(false) => Err(EdgeError::QueueFull),
70 Err(e) => {
71 let edge_err = EdgeError::from(e);
72 if matches!(edge_err, EdgeError::LeaseExpired | EdgeError::Disconnected) {
73 self.relocate()?;
74 match self.queue.push(item) {
75 Ok(true) => Ok(()),
76 Ok(false) => Err(EdgeError::QueueFull),
77 Err(e2) => Err(EdgeError::from(e2)),
78 }
79 } else {
80 Err(edge_err)
81 }
82 }
83 }
84 }
85
86 pub fn pop(&mut self) -> Result<Option<T>, EdgeError> {
92 match self.queue.pop() {
93 Ok(item) => Ok(item),
94 Err(e) => {
95 let edge_err = EdgeError::from(e);
96 if matches!(edge_err, EdgeError::LeaseExpired | EdgeError::Disconnected) {
97 self.relocate()?;
98 self.queue.pop().map_err(EdgeError::from)
99 } else {
100 Err(edge_err)
101 }
102 }
103 }
104 }
105
106 pub fn relocate(&mut self) -> Result<(), EdgeError> {
112 let state: Option<HandoffState<QueueLocator>> = self
113 .handoff_reader
114 .poll()
115 .map_err(|e| EdgeError::RelocateFailed(format!("{e}")))?;
116
117 let Some(state) = state else {
118 return Err(EdgeError::RelocateFailed("no new locator available".into()));
119 };
120
121 if state.generation.is_stale(&self.generation) {
122 return Err(EdgeError::StaleGeneration {
123 expected: self.generation,
124 got: state.generation,
125 });
126 }
127
128 let new_locator = state.locator;
129 let arena_bytes = new_locator.len_or_default();
130 let lease = MemBuilder::new()
131 .min_bytes(arena_bytes)
132 .acquire()
133 .map_err(|e| EdgeError::RelocateFailed(format!("{e}")))?;
134
135 let capacity = self.queue.max_len() + 1;
136 let stride = self.stride_from_queue();
137
138 let new_queue = FabricQueue::new(lease, capacity, stride)
139 .map_err(|e| EdgeError::RelocateFailed(format!("{e}")))?;
140
141 self.queue = new_queue;
142 self.locator = new_locator;
143 self.generation = state.generation;
144
145 Ok(())
146 }
147
148 pub fn generation(&self) -> FenceEpoch {
150 self.generation
151 }
152
153 pub fn locator(&self) -> &QueueLocator {
155 &self.locator
156 }
157
158 fn stride_from_queue(&self) -> usize {
164 256
167 }
168}
169
/// Module-private extension trait that supplies the arena size to request
/// when rebuilding a queue for a locator.
trait QueueLocatorExt {
    /// Returns the number of bytes to request (passed to
    /// `MemBuilder::min_bytes`) for this locator's arena.
    fn len_or_default(&self) -> u64;
}
174
175impl QueueLocatorExt for QueueLocator {
176 fn len_or_default(&self) -> u64 {
177 8192
179 }
180}
181
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_fence::FenceEpoch;
    use grafos_locator::handoff::{HandoffState, HandoffWriter};
    use grafos_std::block::BlockBuilder;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Resets the mock host, then configures its arena size (65536) and its
    /// block count (64) so every test starts from the same state.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);
        host::mock_set_fbbu_num_blocks(64);
    }

    /// Builds a reader over a freshly acquired block on which no writer has
    /// published anything.
    fn unpublished_reader() -> HandoffReader<QueueLocator> {
        let block_lease = BlockBuilder::new().acquire().expect("block");
        HandoffReader::new(block_lease)
    }

    /// Seeds a writer with `seed` on a fresh block, publishes `next`, and
    /// returns a reader over that same block.
    ///
    /// `publish_msg` is the panic message used if publishing fails.
    fn reader_after_publish(
        seed: HandoffState<QueueLocator>,
        next: QueueLocator,
        publish_msg: &str,
    ) -> HandoffReader<QueueLocator> {
        let block_lease = BlockBuilder::new().acquire().expect("block");
        let mut writer = HandoffWriter::new(seed, block_lease);
        writer.publish(next).expect(publish_msg);
        HandoffReader::new(writer.into_block_lease())
    }

    /// Items pushed through the edge come back out in FIFO order, followed by
    /// `None` once the queue is drained.
    #[test]
    fn push_pop_through_edge() {
        setup();

        let mem = MemBuilder::new().min_bytes(4096).acquire().expect("acquire");
        let queue: FabricQueue<u32> = FabricQueue::new(mem, 16, 16).expect("queue");
        let locator = QueueLocator::new(1, 0, 0);

        let seed = HandoffState {
            stage_id: 0,
            generation: FenceEpoch::zero(),
            locator: locator.clone(),
        };
        let reader = reader_after_publish(seed, locator.clone(), "initial publish");

        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);

        for value in [42u32, 99u32] {
            edge.push(&value).expect("push");
        }

        for expected in [Some(42), Some(99), None] {
            assert_eq!(edge.pop().expect("pop"), expected);
        }
    }

    /// A new edge reports the generation carried by the locator it was given.
    #[test]
    fn generation_tracks_locator() {
        setup();

        let mem = MemBuilder::new().min_bytes(4096).acquire().expect("acquire");
        let queue: FabricQueue<u32> = FabricQueue::new(mem, 8, 16).expect("queue");
        let locator = QueueLocator::new(1, 0, 5);
        let reader = unpublished_reader();

        let edge = RelocatableQueueEdge::new(queue, locator, reader);
        assert_eq!(edge.generation(), FenceEpoch::new(5));
    }

    /// After a newer locator is published, `relocate` adopts both its
    /// generation and its lease id.
    #[test]
    fn relocate_detects_generation_bump() {
        setup();

        let mem = MemBuilder::new().min_bytes(4096).acquire().expect("acquire");
        let queue: FabricQueue<u64> = FabricQueue::new(mem, 8, 16).expect("queue");
        let locator = QueueLocator::new(1, 0, 0);

        let seed = HandoffState {
            stage_id: 0,
            generation: FenceEpoch::zero(),
            locator: locator.clone(),
        };
        let reader = reader_after_publish(seed, QueueLocator::new(2, 0, 1), "publish");

        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);
        assert_eq!(edge.generation(), FenceEpoch::new(0));

        edge.relocate().expect("relocate");
        assert_eq!(edge.generation(), FenceEpoch::new(1));
        assert_eq!(edge.locator().lease_id, 2);
    }

    /// An edge already at generation 5 refuses handoff state that was seeded
    /// and published at lower generations.
    #[test]
    fn stale_generation_rejected() {
        setup();

        let mem = MemBuilder::new().min_bytes(4096).acquire().expect("acquire");
        let queue: FabricQueue<u32> = FabricQueue::new(mem, 8, 16).expect("queue");
        let locator = QueueLocator::new(1, 0, 5);

        let seed = HandoffState {
            stage_id: 0,
            generation: FenceEpoch::new(2),
            locator: QueueLocator::new(1, 0, 2),
        };
        let reader = reader_after_publish(seed, QueueLocator::new(99, 0, 3), "publish");

        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);

        let failure = edge.relocate().unwrap_err();
        assert!(matches!(failure, EdgeError::StaleGeneration { .. }));
    }

    /// A capacity-4 queue accepts three items; the fourth push is reported as
    /// `QueueFull`.
    #[test]
    fn queue_full_returns_error() {
        setup();

        // Request exactly 32 + 4 * 16 bytes for a capacity-4, stride-16 queue.
        let needed: u64 = 32 + 4 * 16;
        let mem = MemBuilder::new().min_bytes(needed).acquire().expect("acquire");
        let queue: FabricQueue<u32> = FabricQueue::new(mem, 4, 16).expect("queue");
        let locator = QueueLocator::new(1, 0, 0);
        let reader = unpublished_reader();

        let mut edge = RelocatableQueueEdge::new(queue, locator, reader);

        for (value, label) in [(1, "push 1"), (2, "push 2"), (3, "push 3")] {
            edge.push(&value).expect(label);
        }

        let overflow = edge.push(&4).unwrap_err();
        assert_eq!(overflow, EdgeError::QueueFull);
    }
}