grafos_rpc/
mux.rs

1//! Multi-slot / multi-client concurrency for lease-backed RPC.
2//!
3//! The [`RpcMuxClient`] and [`RpcMuxServer`] types multiplex several
4//! independent RPC slots into a single [`MemLease`].  Each slot has its own
5//! status byte so that multiple clients can have requests in flight
6//! simultaneously.
7//!
8//! # Slot wire layout
9//!
10//! All slots are contiguous in the leased arena.  Each slot is
11//! `SLOT_HEADER_SIZE + slot_payload_size` bytes:
12//!
13//! ```text
14//! [request_id: u64 LE (8B)]
15//! [method_id:  u32 LE (4B)]
16//! [status:     u8      (1B)]
17//! [reserved:   [u8; 3] (3B)]   // alignment padding
18//! [payload_len:u32 LE (4B)]
19//! [payload:    [u8; slot_payload_size]]
20//! ```
21//!
22//! `SLOT_HEADER_SIZE` = 20 bytes (8 + 4 + 1 + 3 + 4).
23//!
24//! Slot *i* begins at byte offset `i * (SLOT_HEADER_SIZE + slot_payload_size)`.
25
26extern crate alloc;
27
28use alloc::vec::Vec;
29use core::sync::atomic::{AtomicU64, Ordering};
30
31use grafos_std::error::{FabricError, Result};
32use grafos_std::lease::LeaseStatus;
33use grafos_std::mem::MemLease;
34
35use crate::{EMPTY, ERROR, PROCESSING, REQUEST_READY, RESPONSE_READY};
36
37// ---------------------------------------------------------------------------
38// Layout constants
39// ---------------------------------------------------------------------------
40
/// Size in bytes of every slot header: `request_id` (8), `method_id` (4),
/// `status` (1), reserved padding (3) and `payload_len` (4).
pub const SLOT_HEADER_SIZE: usize = 8 + 4 + 1 + 3 + 4;

/// Suggested number of slots for a mux arena.
pub const DEFAULT_NUM_SLOTS: usize = 8;

/// Suggested payload capacity of a single slot, in bytes.
pub const DEFAULT_SLOT_PAYLOAD_SIZE: usize = 4 * 1024;

/// Number of response polls a client performs before a call times out.
const DEFAULT_MAX_POLL_ITERATIONS: u32 = 1_000_000;
53
54// ---------------------------------------------------------------------------
55// Slot offset helpers
56// ---------------------------------------------------------------------------
57
/// Byte offset of slot `index` within the arena, where every slot occupies
/// `slot_size` bytes.
///
/// The multiplication is done in `u64`, after widening both operands, so a
/// large arena cannot overflow a 32-bit `usize` (e.g. on `wasm32`) before
/// the result is converted to the 64-bit offset the lease API takes.
#[inline]
fn slot_offset(index: usize, slot_size: usize) -> u64 {
    index as u64 * slot_size as u64
}
63
64// ---------------------------------------------------------------------------
65// Slot encode / decode
66// ---------------------------------------------------------------------------
67
68/// Encode header + payload into a single contiguous buffer.
69///
70/// The mock FBMU backend stores data keyed by write-offset, so header and
71/// payload must be written in one call to be readable as a contiguous
72/// region.
73fn encode_slot(request_id: u64, method_id: u32, status: u8, payload: &[u8]) -> Vec<u8> {
74    let mut buf = Vec::with_capacity(SLOT_HEADER_SIZE + payload.len());
75    buf.extend_from_slice(&request_id.to_le_bytes());
76    buf.extend_from_slice(&method_id.to_le_bytes());
77    buf.push(status);
78    buf.extend_from_slice(&[0u8; 3]); // reserved
79    buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
80    buf.extend_from_slice(payload);
81    buf
82}
83
/// Decoded fixed-size header of one slot.
///
/// The three reserved padding bytes are not represented.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct SlotHeader {
    /// Identifier stamped on the request by the client; the server writes
    /// the same value back with its response.
    request_id: u64,
    /// Method selector passed to the handler.
    method_id: u32,
    /// Slot state byte (`EMPTY`, `REQUEST_READY`, `PROCESSING`,
    /// `RESPONSE_READY` or `ERROR`).
    status: u8,
    /// Number of payload bytes that follow the header.
    payload_len: u32,
}
90
91fn decode_slot_header(data: &[u8]) -> Option<SlotHeader> {
92    if data.len() < SLOT_HEADER_SIZE {
93        return None;
94    }
95    Some(SlotHeader {
96        request_id: u64::from_le_bytes(data[0..8].try_into().ok()?),
97        method_id: u32::from_le_bytes(data[8..12].try_into().ok()?),
98        status: data[12],
99        // reserved: data[13..16]
100        payload_len: u32::from_le_bytes(data[16..20].try_into().ok()?),
101    })
102}
103
104// ---------------------------------------------------------------------------
105// RpcMuxClient
106// ---------------------------------------------------------------------------
107
108/// Multi-slot RPC client.
109///
110/// Scans the slot array for an empty slot, writes a request, and polls
111/// that same slot for the server's response.
112pub struct RpcMuxClient<'a> {
113    lease: &'a MemLease,
114    num_slots: usize,
115    slot_size: usize,
116    max_poll_iterations: u32,
117    next_request_id: AtomicU64,
118}
119
120impl<'a> RpcMuxClient<'a> {
121    /// Create a new mux client.
122    ///
123    /// - `lease`: shared memory lease backing the slot array.
124    /// - `num_slots`: number of slots (default: [`DEFAULT_NUM_SLOTS`]).
125    /// - `slot_payload_size`: max payload per slot (default:
126    ///   [`DEFAULT_SLOT_PAYLOAD_SIZE`]).
127    pub fn new(lease: &'a MemLease, num_slots: usize, slot_payload_size: usize) -> Self {
128        RpcMuxClient {
129            lease,
130            num_slots,
131            slot_size: SLOT_HEADER_SIZE + slot_payload_size,
132            max_poll_iterations: DEFAULT_MAX_POLL_ITERATIONS,
133            next_request_id: AtomicU64::new(1),
134        }
135    }
136
137    /// Set the maximum poll iterations before a call times out.
138    pub fn with_max_poll_iterations(mut self, n: u32) -> Self {
139        self.max_poll_iterations = n;
140        self
141    }
142
143    /// Perform an RPC call through the mux.
144    ///
145    /// Finds a free slot, writes the request, and polls for the response.
146    ///
147    /// # Errors
148    ///
149    /// - [`FabricError::CapacityExceeded`] -- no free slot available or
150    ///   payload exceeds the per-slot capacity.
151    /// - [`FabricError::LeaseExpired`] -- poll limit exceeded (timeout) or
152    ///   lease has expired.
153    /// - [`FabricError::Revoked`] -- lease was explicitly revoked.
154    /// - `FabricError::IoError(-102)` -- server returned ERROR status.
155    pub fn call(&self, method_id: u32, request: &[u8]) -> Result<Vec<u8>> {
156        match self.lease.status() {
157            LeaseStatus::Active => {}
158            LeaseStatus::Revoked => return Err(FabricError::Revoked),
159            _ => return Err(FabricError::LeaseExpired),
160        }
161
162        let max_payload = self.slot_size - SLOT_HEADER_SIZE;
163        if request.len() > max_payload {
164            return Err(FabricError::CapacityExceeded);
165        }
166
167        // Find a free slot (status == EMPTY).
168        let slot_idx = self.find_free_slot()?;
169        let off = slot_offset(slot_idx, self.slot_size);
170        let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
171
172        // Write header + payload as a single contiguous buffer.
173        let buf = encode_slot(request_id, method_id, REQUEST_READY, request);
174        self.lease.mem().write(off, &buf)?;
175
176        // Poll for response.
177        for _ in 0..self.max_poll_iterations {
178            let slot_data = self.lease.mem().read(off, self.slot_size as u32)?;
179            let sh = match decode_slot_header(&slot_data) {
180                Some(h) => h,
181                None => continue,
182            };
183
184            if sh.request_id != request_id {
185                continue;
186            }
187
188            match sh.status {
189                RESPONSE_READY => {
190                    let payload_len = sh.payload_len as usize;
191                    let response =
192                        if payload_len > 0 && slot_data.len() >= SLOT_HEADER_SIZE + payload_len {
193                            slot_data[SLOT_HEADER_SIZE..SLOT_HEADER_SIZE + payload_len].to_vec()
194                        } else {
195                            Vec::new()
196                        };
197                    // Reset slot to EMPTY.
198                    self.clear_slot(off)?;
199                    return Ok(response);
200                }
201                ERROR => {
202                    self.clear_slot(off)?;
203                    return Err(FabricError::IoError(-102));
204                }
205                _ => {
206                    // Still PROCESSING or REQUEST_READY -- keep polling.
207                }
208            }
209        }
210
211        // Timed out -- reset the slot so it is not stuck.
212        self.clear_slot(off)?;
213        Err(FabricError::LeaseExpired)
214    }
215
216    /// Scan from slot 0 for the first empty slot.
217    fn find_free_slot(&self) -> Result<usize> {
218        for i in 0..self.num_slots {
219            let off = slot_offset(i, self.slot_size);
220            let hdr_bytes = self.lease.mem().read(off, SLOT_HEADER_SIZE as u32)?;
221            if let Some(sh) = decode_slot_header(&hdr_bytes) {
222                if sh.status == EMPTY {
223                    return Ok(i);
224                }
225            } else {
226                // Unreadable / zeroed header -- treat as empty.
227                return Ok(i);
228            }
229        }
230        Err(FabricError::CapacityExceeded)
231    }
232
233    /// Zero out a slot header (sets status to EMPTY).
234    fn clear_slot(&self, off: u64) -> Result<()> {
235        let buf = encode_slot(0, 0, EMPTY, &[]);
236        self.lease.mem().write(off, &buf)
237    }
238}
239
240// ---------------------------------------------------------------------------
241// RpcMuxServer
242// ---------------------------------------------------------------------------
243
244/// Multi-slot RPC server.
245///
246/// Scans all slots for pending requests, dispatches them through the
247/// provided [`crate::RpcHandler`], and writes responses back into the same slot.
248pub struct RpcMuxServer<'a> {
249    lease: &'a MemLease,
250    num_slots: usize,
251    slot_size: usize,
252}
253
254impl<'a> RpcMuxServer<'a> {
255    /// Create a new mux server.
256    pub fn new(lease: &'a MemLease, num_slots: usize, slot_payload_size: usize) -> Self {
257        RpcMuxServer {
258            lease,
259            num_slots,
260            slot_size: SLOT_HEADER_SIZE + slot_payload_size,
261        }
262    }
263
264    /// Poll all slots once, dispatching any pending requests.
265    ///
266    /// Returns the number of requests processed in this pass.
267    pub fn poll_once(&self, handler: &impl crate::RpcHandler) -> Result<u32> {
268        match self.lease.status() {
269            LeaseStatus::Active => {}
270            LeaseStatus::Revoked => return Err(FabricError::Revoked),
271            _ => return Err(FabricError::LeaseExpired),
272        }
273
274        let mut processed = 0u32;
275        for i in 0..self.num_slots {
276            let off = slot_offset(i, self.slot_size);
277            let slot_data = self.lease.mem().read(off, self.slot_size as u32)?;
278
279            let sh = match decode_slot_header(&slot_data) {
280                Some(h) => h,
281                None => continue,
282            };
283
284            if sh.status != REQUEST_READY {
285                continue;
286            }
287
288            // Validate payload_len fits within the slot.
289            let max_payload = self.slot_size - SLOT_HEADER_SIZE;
290            if sh.payload_len as usize > max_payload {
291                // Malformed slot -- reset to EMPTY.
292                let buf = encode_slot(0, 0, EMPTY, &[]);
293                self.lease.mem().write(off, &buf)?;
294                continue;
295            }
296
297            // Mark as PROCESSING (single contiguous write preserving payload).
298            let payload_len = sh.payload_len as usize;
299            let payload = if payload_len > 0 && slot_data.len() >= SLOT_HEADER_SIZE + payload_len {
300                &slot_data[SLOT_HEADER_SIZE..SLOT_HEADER_SIZE + payload_len]
301            } else {
302                &[]
303            };
304            let processing_buf = encode_slot(sh.request_id, sh.method_id, PROCESSING, payload);
305            self.lease.mem().write(off, &processing_buf)?;
306
307            // Dispatch.
308            match handler.handle(sh.method_id, payload) {
309                Ok(resp) => {
310                    let buf = encode_slot(sh.request_id, 0, RESPONSE_READY, &resp);
311                    self.lease.mem().write(off, &buf)?;
312                }
313                Err(_) => {
314                    let buf = encode_slot(sh.request_id, 0, ERROR, &[]);
315                    self.lease.mem().write(off, &buf)?;
316                }
317            }
318            processed += 1;
319        }
320
321        Ok(processed)
322    }
323
324    /// Call [`poll_once`](Self::poll_once) repeatedly, up to `n` iterations.
325    ///
326    /// Returns the total number of requests processed across all iterations.
327    pub fn serve_n(&self, handler: &impl crate::RpcHandler, n: u32) -> Result<u32> {
328        let mut total = 0u32;
329        for _ in 0..n {
330            total += self.poll_once(handler)?;
331        }
332        Ok(total)
333    }
334}
335
336// ---------------------------------------------------------------------------
337// Tests
338// ---------------------------------------------------------------------------
339
#[cfg(test)]
mod tests {
    use super::*;
    use crate::RpcHandler;
    use alloc::vec;
    use grafos_std::error::FabricError;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    const NUM_SLOTS: usize = 4;
    const SLOT_PAYLOAD: usize = 256;
    const ARENA_SIZE: u64 = (SLOT_HEADER_SIZE + SLOT_PAYLOAD) as u64 * NUM_SLOTS as u64;

    /// Echo handler -- returns the payload unchanged.
    struct Echo;
    impl RpcHandler for Echo {
        fn handle(&self, _method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
            Ok(payload.to_vec())
        }
    }

    /// Doubler -- interprets payload as LE u32, returns doubled value.
    struct Doubler;
    impl RpcHandler for Doubler {
        fn handle(&self, _method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
            if payload.len() < 4 {
                return Err(FabricError::IoError(-200));
            }
            let val = u32::from_le_bytes(payload[..4].try_into().unwrap());
            Ok((val * 2).to_le_bytes().to_vec())
        }
    }

    /// Always-error handler.
    struct Failing;
    impl RpcHandler for Failing {
        fn handle(&self, _method_id: u32, _payload: &[u8]) -> Result<Vec<u8>> {
            Err(FabricError::Unsupported)
        }
    }

    /// Helper: acquire a single lease shared between client and server.
    fn setup() -> MemLease {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(ARENA_SIZE.max(65536));
        MemBuilder::new()
            .min_bytes(ARENA_SIZE)
            .acquire()
            .expect("lease")
    }

    /// Write a request into a slot using a single contiguous write.
    fn write_request(
        lease: &MemLease,
        slot_idx: usize,
        request_id: u64,
        method_id: u32,
        payload: &[u8],
    ) {
        let off = slot_offset(slot_idx, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
        let buf = encode_slot(request_id, method_id, REQUEST_READY, payload);
        lease.mem().write(off, &buf).unwrap();
    }

    /// Read slot header + payload back.
    fn read_slot(lease: &MemLease, slot_idx: usize) -> (SlotHeader, Vec<u8>) {
        let off = slot_offset(slot_idx, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
        let data = lease
            .mem()
            .read(off, (SLOT_HEADER_SIZE + SLOT_PAYLOAD) as u32)
            .unwrap();
        let sh = decode_slot_header(&data).unwrap();
        let payload_len = sh.payload_len as usize;
        let payload = if payload_len > 0 && data.len() >= SLOT_HEADER_SIZE + payload_len {
            data[SLOT_HEADER_SIZE..SLOT_HEADER_SIZE + payload_len].to_vec()
        } else {
            Vec::new()
        };
        (sh, payload)
    }

    #[test]
    fn single_call_roundtrip() {
        let lease = setup();
        let _client =
            RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD).with_max_poll_iterations(10);
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        let req = b"hello mux";
        write_request(&lease, 0, 1, 42, req);

        // Server processes.
        let n = server.poll_once(&Echo).unwrap();
        assert_eq!(n, 1);

        // Verify the response.
        let (sh, resp) = read_slot(&lease, 0);
        assert_eq!(sh.status, RESPONSE_READY);
        assert_eq!(sh.request_id, 1);
        assert_eq!(sh.payload_len, req.len() as u32);
        assert_eq!(&resp, req);
    }

    #[test]
    fn multi_client_different_slots() {
        let lease = setup();
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        let req_a = b"request-a";
        let req_b = b"request-b";
        write_request(&lease, 0, 10, 1, req_a);
        write_request(&lease, 1, 20, 2, req_b);

        // Server processes both in one poll_once pass.
        let n = server.poll_once(&Echo).unwrap();
        assert_eq!(n, 2);

        // Verify both responses.
        let (sh_a, resp_a) = read_slot(&lease, 0);
        assert_eq!(sh_a.status, RESPONSE_READY);
        assert_eq!(sh_a.request_id, 10);
        assert_eq!(&resp_a, req_a);

        let (sh_b, resp_b) = read_slot(&lease, 1);
        assert_eq!(sh_b.status, RESPONSE_READY);
        assert_eq!(sh_b.request_id, 20);
        assert_eq!(&resp_b, req_b);
    }

    #[test]
    fn all_slots_busy_returns_capacity_exceeded() {
        let lease = setup();
        let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        // Fill every slot with a PROCESSING status.
        for i in 0..NUM_SLOTS {
            let off = slot_offset(i, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
            let buf = encode_slot(i as u64 + 1, 0, PROCESSING, &[]);
            lease.mem().write(off, &buf).unwrap();
        }

        // Client should fail to find a free slot.
        let result = client.call(0, b"should fail");
        assert_eq!(result.unwrap_err(), FabricError::CapacityExceeded);
    }

    #[test]
    fn timeout_when_server_never_processes() {
        let lease = setup();
        let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD).with_max_poll_iterations(5);

        // Call without any server running -- should time out.
        let result = client.call(0, b"orphan");
        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);

        // Verify the slot was cleaned up after timeout.
        let (sh, _) = read_slot(&lease, 0);
        assert_eq!(sh.status, EMPTY);
    }

    #[test]
    fn malformed_slot_gets_reset() {
        let lease = setup();
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        // Write a slot with payload_len larger than the slot capacity.
        let off = slot_offset(0, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
        let mut buf = encode_slot(1, 0, REQUEST_READY, &[]);
        // Overwrite payload_len to a bad value.
        let bad_len = (SLOT_PAYLOAD + 100) as u32;
        buf[16..20].copy_from_slice(&bad_len.to_le_bytes());
        lease.mem().write(off, &buf).unwrap();

        // Server should detect and reset the malformed slot.
        let n = server.poll_once(&Echo).unwrap();
        assert_eq!(n, 0); // malformed slot is not counted as processed

        // Verify the slot was reset to EMPTY.
        let (sh, _) = read_slot(&lease, 0);
        assert_eq!(sh.status, EMPTY);
    }

    #[test]
    fn server_error_propagates_to_slot() {
        let lease = setup();
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        write_request(&lease, 0, 1, 0, &[0xAB, 0xCD]);

        // Server returns an error.
        let n = server.poll_once(&Failing).unwrap();
        assert_eq!(n, 1);

        // Verify ERROR status was written.
        let (sh, _) = read_slot(&lease, 0);
        assert_eq!(sh.status, ERROR);
        assert_eq!(sh.request_id, 1);
    }

    #[test]
    fn serve_n_processes_multiple_rounds() {
        let lease = setup();
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        let payload = 7u32.to_le_bytes();
        write_request(&lease, 0, 1, 0, &payload);

        // serve_n with 3 iterations -- should handle 1 request on the first pass.
        let total = server.serve_n(&Doubler, 3).unwrap();
        assert_eq!(total, 1);

        // Verify response.
        let (sh, resp) = read_slot(&lease, 0);
        assert_eq!(sh.status, RESPONSE_READY);
        let val = u32::from_le_bytes(resp[..4].try_into().unwrap());
        assert_eq!(val, 14); // 7 * 2
    }

    #[test]
    fn empty_payload_roundtrip() {
        let lease = setup();
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        write_request(&lease, 0, 1, 0, &[]);

        let n = server.poll_once(&Echo).unwrap();
        assert_eq!(n, 1);

        let (sh, _) = read_slot(&lease, 0);
        assert_eq!(sh.status, RESPONSE_READY);
        assert_eq!(sh.payload_len, 0);
    }

    #[test]
    fn payload_too_large_rejected() {
        let lease = setup();
        let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        let big = vec![0xAA; SLOT_PAYLOAD + 1];
        let result = client.call(0, &big);
        assert_eq!(result.unwrap_err(), FabricError::CapacityExceeded);
    }

    #[test]
    fn rpc_mux_client_call_on_revoked_lease() {
        let lease = setup();
        let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD).with_max_poll_iterations(5);

        // Free the lease to revoke it.
        lease.free();

        let result = client.call(0, b"should fail");
        assert_eq!(result.unwrap_err(), FabricError::Revoked);
    }

    #[test]
    fn rpc_mux_server_poll_on_revoked_lease() {
        let lease = setup();
        let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);

        // Free the lease to revoke it.
        lease.free();

        let result = server.poll_once(&Echo);
        assert_eq!(result.unwrap_err(), FabricError::Revoked);
    }

    #[test]
    fn slot_offset_calculation() {
        let slot_size = SLOT_HEADER_SIZE + SLOT_PAYLOAD;
        assert_eq!(slot_offset(0, slot_size), 0);
        assert_eq!(slot_offset(1, slot_size), slot_size as u64);
        assert_eq!(slot_offset(3, slot_size), (3 * slot_size) as u64);
    }

    /// Wire format: every header field survives an encode/decode round trip,
    /// the reserved bytes are zero, and the payload follows the header.
    #[test]
    fn encode_decode_roundtrip() {
        let buf = encode_slot(0x0102_0304_0506_0708, 0xAABB_CCDD, REQUEST_READY, b"xyz");
        assert_eq!(buf.len(), SLOT_HEADER_SIZE + 3);
        assert_eq!(&buf[13..16], &[0u8; 3]);

        let sh = decode_slot_header(&buf).expect("header");
        assert_eq!(sh.request_id, 0x0102_0304_0506_0708);
        assert_eq!(sh.method_id, 0xAABB_CCDD);
        assert_eq!(sh.status, REQUEST_READY);
        assert_eq!(sh.payload_len, 3);
        assert_eq!(&buf[SLOT_HEADER_SIZE..], b"xyz");
    }

    /// Buffers shorter than a full header cannot be decoded.
    #[test]
    fn decode_rejects_short_header() {
        assert!(decode_slot_header(&[0u8; SLOT_HEADER_SIZE - 1]).is_none());
        assert!(decode_slot_header(&[]).is_none());
        assert!(decode_slot_header(&[0u8; SLOT_HEADER_SIZE]).is_some());
    }
}