// grafos_rpc/lib.rs
1//! grafos-rpc -- Lease-backed RPC framework for grafOS.
2//!
3//! The hot path is a leased shared memory write + read instead of a network
4//! round trip. Client writes a request to a leased memory region; server
5//! reads from the same region and writes back a response. The underlying
6//! transport is FBMU (Fabric Bootstrap Memory Unit), so co-located services
7//! exchange calls without touching the network stack.
8//!
9//! # Quick start
10//!
11//! ```rust
12//! use grafos_rpc::{RpcHandler, RpcClient, RpcServer};
13//! use grafos_std::error::{FabricError, Result};
14//!
15//! // 1. Define your service handler.
16//! struct Adder;
17//! impl RpcHandler for Adder {
18//! fn handle(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
19//! let args: (f64, f64) = postcard::from_bytes(payload)
20//! .map_err(|_| FabricError::IoError(-200))?;
21//! postcard::to_allocvec(&(args.0 + args.1))
22//! .map_err(|_| FabricError::IoError(-201))
23//! }
24//! }
25//!
26//! // 2. Acquire a lease (mock backend for native testing).
27//! grafos_std::host::reset_mock();
28//! grafos_std::host::mock_set_fbmu_arena_size(65536);
29//! let lease = grafos_std::mem::MemBuilder::new().min_bytes(65536).acquire()?;
30//!
31//! // 3. Create client and server on the same lease.
32//! let mut client = RpcClient::new(&lease);
33//! let server = RpcServer::new(&lease);
34//! # Ok::<(), FabricError>(())
35//! ```
36//!
37//! # Architecture
38//!
39//! The shared memory arena is divided into two regions at fixed offsets:
40//!
41//! - **Request region** (offset 0): Written by the client, read by the server.
42//! - **Response region** (offset 32768): Written by the server, read by the
43//! client.
44//!
45//! Each region has a status byte that drives the protocol state machine:
46//!
47//! | Value | Constant | Meaning |
48//! |-------|----------|---------|
49//! | 0 | [`EMPTY`] | Slot is idle |
50//! | 1 | [`REQUEST_READY`] | Client has written a request |
51//! | 2 | [`PROCESSING`] | Server is handling the request |
52//! | 3 | [`RESPONSE_READY`] | Server has written the response |
53//! | 4 | [`ERROR`] | Server encountered an error |
54//!
55//! # Wire layout
56//!
57//! Request region (written at offset 0):
58//! ```text
59//! [request_id: u64 LE (8B)] [method_id: u32 LE (4B)] [status: u8 (1B)]
60//! [payload_len: u32 LE (4B)] [payload: [u8; payload_len]]
61//! ```
62//!
63//! Response region (written at offset 32768):
64//! ```text
65//! [request_id: u64 LE (8B)] [status: u8 (1B)]
66//! [payload_len: u32 LE (4B)] [payload: [u8; payload_len]]
67//! ```
68//!
69//! All integers are little-endian. Payloads are serialized with
70//! [postcard](https://docs.rs/postcard) (compact, `no_std`-friendly).
71//! Maximum payload size is 30 KiB.
72
73#![cfg_attr(not(feature = "std"), no_std)]
74
75extern crate alloc;
76#[cfg(test)]
77extern crate self as grafos_rpc;
78
79use alloc::vec::Vec;
80
81use grafos_std::error::{FabricError, Result};
82use grafos_std::lease::LeaseStatus;
83use grafos_std::mem::MemLease;
84
85pub mod mux;
86pub mod transport;
87pub use grafos_rpc_macros::grafos_rpc_service;
88pub use mux::{RpcMuxClient, RpcMuxServer};
89pub use transport::{
90 AutoTransport, QuicTransport, RpcTransport, ServiceHandlerAdapter, SharedMemoryTransport,
91};
92
// ---------------------------------------------------------------------------
// Status constants
// ---------------------------------------------------------------------------

/// Slot is idle — no pending request or response.
pub const EMPTY: u8 = 0;
/// Client has written a request; server should process it.
pub const REQUEST_READY: u8 = 1;
/// Server is currently processing the request.
pub const PROCESSING: u8 = 2;
/// Server has written the response; client can read it.
pub const RESPONSE_READY: u8 = 3;
/// Server encountered an error processing the request.
pub const ERROR: u8 = 4;

// ---------------------------------------------------------------------------
// Layout constants
// ---------------------------------------------------------------------------

/// Byte offset within the arena where the request region starts.
const REQUEST_OFFSET: u64 = 0;

/// Byte offset within the arena where the response region starts.
/// Placed at 32 KiB boundary to separate from request region.
const RESPONSE_OFFSET: u64 = 32768;

/// Header size for request: request_id(8) + method_id(4) + status(1) + payload_len(4) = 17
const REQUEST_HEADER_SIZE: usize = 17;

/// Header size for response: request_id(8) + status(1) + payload_len(4) = 13
const RESPONSE_HEADER_SIZE: usize = 13;

/// Maximum payload size (limited by arena region size).
///
/// Request region needs 17 + 30720 = 30737 bytes (< RESPONSE_OFFSET);
/// response region needs 32768 + 13 + 30720 = 63501 bytes (< 64 KiB arena).
const MAX_PAYLOAD_SIZE: usize = 30 * 1024; // 30 KiB

/// Default poll iteration limit (used as timeout proxy in tests).
const DEFAULT_MAX_POLL_ITERATIONS: u64 = 1_000_000;

// ---------------------------------------------------------------------------
// Request/response encoding
// ---------------------------------------------------------------------------

/// Serialize a request record:
/// `request_id(8) | method_id(4) | status(1) | payload_len(4) | payload`.
/// All integers are little-endian.
fn encode_request(request_id: u64, method_id: u32, status: u8, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(REQUEST_HEADER_SIZE + payload.len());
    buf.extend_from_slice(&request_id.to_le_bytes());
    buf.extend_from_slice(&method_id.to_le_bytes());
    buf.push(status);
    buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Parse a request record produced by [`encode_request`].
///
/// Returns `(request_id, method_id, status, payload)`, or `None` when the
/// buffer is shorter than the fixed header or the declared `payload_len`
/// extends past the end of `data`.
fn decode_request(data: &[u8]) -> Option<(u64, u32, u8, &[u8])> {
    if data.len() < REQUEST_HEADER_SIZE {
        return None;
    }
    let request_id = u64::from_le_bytes(data[0..8].try_into().ok()?);
    let method_id = u32::from_le_bytes(data[8..12].try_into().ok()?);
    let status = data[12];
    let payload_len = u32::from_le_bytes(data[13..17].try_into().ok()?) as usize;
    // checked_add guards against wrap-around on 32-bit targets (this crate is
    // no_std), where a near-u32::MAX payload_len could overflow the bounds
    // arithmetic; `get` then rejects any buffer too short for the claim.
    let end = REQUEST_HEADER_SIZE.checked_add(payload_len)?;
    let payload = data.get(REQUEST_HEADER_SIZE..end)?;
    Some((request_id, method_id, status, payload))
}

/// Serialize a response record:
/// `request_id(8) | status(1) | payload_len(4) | payload`.
/// All integers are little-endian.
fn encode_response(request_id: u64, status: u8, payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(RESPONSE_HEADER_SIZE + payload.len());
    buf.extend_from_slice(&request_id.to_le_bytes());
    buf.push(status);
    buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Parse a response record produced by [`encode_response`].
///
/// Returns `(request_id, status, payload)`, or `None` when the buffer is
/// shorter than the fixed header or the declared `payload_len` extends past
/// the end of `data`.
fn decode_response(data: &[u8]) -> Option<(u64, u8, &[u8])> {
    if data.len() < RESPONSE_HEADER_SIZE {
        return None;
    }
    let request_id = u64::from_le_bytes(data[0..8].try_into().ok()?);
    let status = data[8];
    let payload_len = u32::from_le_bytes(data[9..13].try_into().ok()?) as usize;
    // Same overflow-safe bounds handling as `decode_request`.
    let end = RESPONSE_HEADER_SIZE.checked_add(payload_len)?;
    let payload = data.get(RESPONSE_HEADER_SIZE..end)?;
    Some((request_id, status, payload))
}
180
181// ---------------------------------------------------------------------------
182// RpcHandler trait
183// ---------------------------------------------------------------------------
184
/// Trait for handling RPC method dispatches on the server side.
///
/// Implement this for your service type to map `method_id` values to
/// handler logic. The `payload` is the postcard-serialized request arguments;
/// the return value should be the postcard-serialized response.
///
/// Handlers receive `&self`, so any per-call mutable state must use
/// interior mutability.
///
/// # Example
///
/// ```rust
/// use grafos_rpc::RpcHandler;
/// use grafos_std::error::{FabricError, Result};
/// use serde::{Serialize, Deserialize};
///
/// const METHOD_GREET: u32 = 0;
///
/// #[derive(Serialize, Deserialize)]
/// struct GreetRequest { name: String }
///
/// struct Greeter;
///
/// impl RpcHandler for Greeter {
///     fn handle(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
///         match method_id {
///             METHOD_GREET => {
///                 let req: GreetRequest = postcard::from_bytes(payload)
///                     .map_err(|_| FabricError::IoError(-200))?;
///                 let greeting = format!("hello, {}!", req.name);
///                 postcard::to_allocvec(&greeting)
///                     .map_err(|_| FabricError::IoError(-201))
///             }
///             _ => Err(FabricError::Unsupported),
///         }
///     }
/// }
/// ```
pub trait RpcHandler {
    /// Dispatch a single RPC call.
    ///
    /// - `method_id`: Identifies which method is being called (by convention,
    ///   `u32` constants starting at 0).
    /// - `payload`: Postcard-serialized request arguments.
    ///
    /// Returns postcard-serialized response bytes on success. Return
    /// `Err(FabricError::Unsupported)` for unknown method IDs.
    fn handle(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>>;
}
231
232// ---------------------------------------------------------------------------
233// RpcClient
234// ---------------------------------------------------------------------------
235
236/// Client side of the shared-memory RPC protocol.
237///
238/// Writes serialized requests into the lease's request region (offset 0),
239/// then polls the response region (offset 32768) until the server has
240/// written back a result.
241///
242/// Each call assigns a monotonically increasing `request_id` so that
243/// stale responses from a previous call are ignored.
244///
245/// # Example
246///
247/// ```rust
248/// use grafos_rpc::RpcClient;
249/// use grafos_std::mem::MemBuilder;
250///
251/// # grafos_std::host::reset_mock();
252/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
253/// let lease = MemBuilder::new().min_bytes(65536).acquire()?;
254/// let mut client = RpcClient::new(&lease)
255/// .with_max_poll_iterations(100); // short timeout for tests
256/// # Ok::<(), grafos_std::error::FabricError>(())
257/// ```
pub struct RpcClient<'a> {
    /// Memory lease whose arena backs both request and response regions.
    lease: &'a MemLease,
    /// Id assigned to the next call; starts at 1 and increments per call so
    /// stale responses from earlier calls can be recognized and skipped.
    next_request_id: u64,
    /// Poll budget before `call` gives up with `LeaseExpired`.
    max_poll_iterations: u64,
}
263
264impl<'a> RpcClient<'a> {
265 /// Create a new RPC client backed by the given memory lease.
266 ///
267 /// Initializes with `request_id = 1` and a default poll limit of
268 /// 1,000,000 iterations. Use [`with_max_poll_iterations`](Self::with_max_poll_iterations)
269 /// to adjust the timeout behavior.
270 pub fn new(lease: &'a MemLease) -> Self {
271 RpcClient {
272 lease,
273 next_request_id: 1,
274 max_poll_iterations: DEFAULT_MAX_POLL_ITERATIONS,
275 }
276 }
277
278 /// Set the maximum number of poll iterations before [`call`](Self::call)
279 /// returns `Err(FabricError::LeaseExpired)`.
280 ///
281 /// A low value (e.g. 10) is useful in tests to fail fast when no server
282 /// is running. The default is 1,000,000.
283 pub fn with_max_poll_iterations(mut self, n: u64) -> Self {
284 self.max_poll_iterations = n;
285 self
286 }
287
288 /// Perform an RPC call.
289 ///
290 /// Serializes `req` with postcard, writes it to the request region with
291 /// `method_id`, then polls the response region until the server responds.
292 ///
293 /// # Errors
294 ///
295 /// - `FabricError::IoError(-100)` -- serialization of `req` failed.
296 /// - [`FabricError::CapacityExceeded`] -- serialized payload exceeds 30 KiB.
297 /// - [`FabricError::LeaseExpired`] -- poll limit exceeded (timeout).
298 /// - `FabricError::IoError(-101)` -- deserialization of response failed.
299 /// - `FabricError::IoError(-102)` -- server returned [`ERROR`] status.
300 /// - Other [`FabricError`] variants from the underlying memory operations.
301 pub fn call<Req, Resp>(&mut self, method_id: u32, req: &Req) -> Result<Resp>
302 where
303 Req: serde::Serialize,
304 Resp: serde::de::DeserializeOwned,
305 {
306 match self.lease.status() {
307 LeaseStatus::Active => {}
308 LeaseStatus::Revoked => return Err(FabricError::Revoked),
309 _ => return Err(FabricError::LeaseExpired),
310 }
311 let request_id = self.next_request_id;
312 self.next_request_id += 1;
313
314 // Serialize the request payload.
315 let payload = postcard::to_allocvec(req).map_err(|_| FabricError::IoError(-100))?;
316 if payload.len() > MAX_PAYLOAD_SIZE {
317 return Err(FabricError::CapacityExceeded);
318 }
319
320 // Write request to shared memory.
321 let req_buf = encode_request(request_id, method_id, REQUEST_READY, &payload);
322 self.lease.mem().write(REQUEST_OFFSET, &req_buf)?;
323
324 // Poll response region until server writes back.
325 for _ in 0..self.max_poll_iterations {
326 let resp_data = self.lease.mem().read(
327 RESPONSE_OFFSET,
328 (RESPONSE_HEADER_SIZE + MAX_PAYLOAD_SIZE) as u32,
329 )?;
330
331 if let Some((resp_id, status, resp_payload)) = decode_response(&resp_data) {
332 if resp_id == request_id {
333 match status {
334 RESPONSE_READY => {
335 // Clear the response region.
336 let clear = encode_response(0, EMPTY, &[]);
337 self.lease.mem().write(RESPONSE_OFFSET, &clear)?;
338
339 let result: Resp = postcard::from_bytes(resp_payload)
340 .map_err(|_| FabricError::IoError(-101))?;
341 return Ok(result);
342 }
343 ERROR => {
344 // Read the error message from payload, then clear.
345 let clear = encode_response(0, EMPTY, &[]);
346 self.lease.mem().write(RESPONSE_OFFSET, &clear)?;
347
348 return Err(FabricError::IoError(-102));
349 }
350 _ => {
351 // Still processing or not ready yet — continue polling.
352 }
353 }
354 }
355 }
356 }
357
358 Err(FabricError::LeaseExpired)
359 }
360}
361
362// ---------------------------------------------------------------------------
363// RpcServer
364// ---------------------------------------------------------------------------
365
366/// Server side of the shared-memory RPC protocol.
367///
368/// Polls the request region (offset 0) for incoming calls, dispatches them
369/// via the provided [`RpcHandler`], and writes responses to the response
370/// region (offset 32768).
371///
372/// # Example
373///
374/// ```rust
375/// use grafos_rpc::{RpcHandler, RpcServer};
376/// use grafos_std::error::{FabricError, Result};
377/// use grafos_std::mem::MemBuilder;
378///
379/// struct Echo;
380/// impl RpcHandler for Echo {
381/// fn handle(&self, _method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
382/// Ok(payload.to_vec())
383/// }
384/// }
385///
386/// # grafos_std::host::reset_mock();
387/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
388/// let lease = MemBuilder::new().min_bytes(65536).acquire()?;
389/// let server = RpcServer::new(&lease);
390///
391/// // In your event loop:
392/// // let handled = server.poll_once(&Echo)?;
393/// # Ok::<(), FabricError>(())
394/// ```
pub struct RpcServer<'a> {
    /// Memory lease whose arena backs both request and response regions.
    lease: &'a MemLease,
}
398
399impl<'a> RpcServer<'a> {
400 /// Create a new RPC server backed by the given memory lease.
401 pub fn new(lease: &'a MemLease) -> Self {
402 RpcServer { lease }
403 }
404
405 /// Poll for and handle a single request.
406 ///
407 /// Reads the request region. If a request with status [`REQUEST_READY`]
408 /// is present:
409 ///
410 /// 1. Sets status to [`PROCESSING`].
411 /// 2. Calls `handler.handle(method_id, payload)`.
412 /// 3. Writes [`RESPONSE_READY`] (or [`ERROR`]) to the response region.
413 /// 4. Clears the request region to [`EMPTY`].
414 /// 5. Returns `Ok(true)`.
415 ///
416 /// If no request is pending, returns `Ok(false)` without side effects.
417 pub fn poll_once<H: RpcHandler>(&self, handler: &H) -> Result<bool> {
418 match self.lease.status() {
419 LeaseStatus::Active => {}
420 LeaseStatus::Revoked => return Err(FabricError::Revoked),
421 _ => return Err(FabricError::LeaseExpired),
422 }
423
424 let req_data = self.lease.mem().read(
425 REQUEST_OFFSET,
426 (REQUEST_HEADER_SIZE + MAX_PAYLOAD_SIZE) as u32,
427 )?;
428
429 let (request_id, method_id, status, payload) = match decode_request(&req_data) {
430 Some(decoded) => decoded,
431 None => return Ok(false),
432 };
433
434 if status != REQUEST_READY {
435 return Ok(false);
436 }
437
438 // Mark as PROCESSING.
439 let processing_buf = encode_request(request_id, method_id, PROCESSING, payload);
440 self.lease.mem().write(REQUEST_OFFSET, &processing_buf)?;
441
442 // Dispatch to handler.
443 match handler.handle(method_id, payload) {
444 Ok(resp_payload) => {
445 let resp_buf = encode_response(request_id, RESPONSE_READY, &resp_payload);
446 self.lease.mem().write(RESPONSE_OFFSET, &resp_buf)?;
447 }
448 Err(_) => {
449 let resp_buf = encode_response(request_id, ERROR, &[]);
450 self.lease.mem().write(RESPONSE_OFFSET, &resp_buf)?;
451 }
452 }
453
454 // Clear request region.
455 let clear = encode_request(0, 0, EMPTY, &[]);
456 self.lease.mem().write(REQUEST_OFFSET, &clear)?;
457
458 Ok(true)
459 }
460
461 /// Serve requests in a loop for up to `max_iterations` polls.
462 ///
463 /// Returns the number of requests actually handled (i.e. the number of
464 /// times [`poll_once`](Self::poll_once) returned `Ok(true)`).
465 ///
466 /// This is a convenience for testing where you know a fixed number of
467 /// calls will arrive. In production, call [`poll_once`](Self::poll_once)
468 /// from your own event loop.
469 pub fn serve_n<H: RpcHandler>(&self, handler: &H, max_iterations: u64) -> Result<u64> {
470 let mut handled = 0u64;
471 for _ in 0..max_iterations {
472 if self.poll_once(handler)? {
473 handled += 1;
474 }
475 }
476 Ok(handled)
477 }
478}
479
480// ---------------------------------------------------------------------------
481// Tests
482// ---------------------------------------------------------------------------
483
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;
    use serde::{Deserialize, Serialize};

    // -- Calculator service (manual, no proc macro) --

    /// Request arguments for Calculator methods.
    #[derive(Serialize, Deserialize)]
    struct CalcArgs {
        a: f64,
        b: f64,
    }

    const METHOD_ADD: u32 = 0;
    const METHOD_MULTIPLY: u32 = 1;

    /// Calculator service implementation.
    ///
    /// Maps the two method ids above onto `+` and `*`; any other id yields
    /// `FabricError::Unsupported`.
    struct Calculator;

    impl RpcHandler for Calculator {
        fn handle(&self, method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
            match method_id {
                METHOD_ADD => {
                    let args: CalcArgs =
                        postcard::from_bytes(payload).map_err(|_| FabricError::IoError(-200))?;
                    let result = args.a + args.b;
                    postcard::to_allocvec(&result).map_err(|_| FabricError::IoError(-201))
                }
                METHOD_MULTIPLY => {
                    let args: CalcArgs =
                        postcard::from_bytes(payload).map_err(|_| FabricError::IoError(-200))?;
                    let result = args.a * args.b;
                    postcard::to_allocvec(&result).map_err(|_| FabricError::IoError(-201))
                }
                _ => Err(FabricError::Unsupported),
            }
        }
    }

    /// Helper: run a single call through client → server → client in one thread.
    ///
    /// This simulates the protocol by interleaving client write, server poll,
    /// and client read, since both share the same mock memory. It bypasses
    /// `RpcClient` and performs the client-side wire steps manually.
    fn call_roundtrip<Req, Resp>(
        lease: &MemLease,
        handler: &impl RpcHandler,
        method_id: u32,
        req: &Req,
    ) -> Result<Resp>
    where
        Req: serde::Serialize,
        Resp: serde::de::DeserializeOwned,
    {
        // Step 1: Client writes request.
        // Use a fixed request_id (1) since call_roundtrip is stateless.
        let request_id: u64 = 1;

        let payload = postcard::to_allocvec(req).map_err(|_| FabricError::IoError(-100))?;
        let req_buf = encode_request(request_id, method_id, REQUEST_READY, &payload);
        lease.mem().write(REQUEST_OFFSET, &req_buf)?;

        // Step 2: Server processes the pending request in a single poll.
        let server = RpcServer::new(lease);
        let handled = server.poll_once(handler)?;
        assert!(handled);

        // Step 3: Client reads response and checks the id/status header.
        let resp_data = lease.mem().read(
            RESPONSE_OFFSET,
            (RESPONSE_HEADER_SIZE + MAX_PAYLOAD_SIZE) as u32,
        )?;
        let (resp_id, status, resp_payload) = decode_response(&resp_data).unwrap();
        assert_eq!(resp_id, request_id);
        assert_eq!(status, RESPONSE_READY);

        let result: Resp =
            postcard::from_bytes(resp_payload).map_err(|_| FabricError::IoError(-101))?;

        // Clear response so the next roundtrip starts from an idle slot.
        let clear = encode_response(0, EMPTY, &[]);
        lease.mem().write(RESPONSE_OFFSET, &clear)?;

        Ok(result)
    }

    /// End-to-end add via the shared-memory protocol.
    #[test]
    fn calculator_add_roundtrip() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let calc = Calculator;

        let result: f64 =
            call_roundtrip(&lease, &calc, METHOD_ADD, &CalcArgs { a: 1.5, b: 2.5 }).unwrap();
        assert!((result - 4.0).abs() < f64::EPSILON);
    }

    /// End-to-end multiply via the shared-memory protocol.
    #[test]
    fn calculator_multiply_roundtrip() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let calc = Calculator;

        let result: f64 =
            call_roundtrip(&lease, &calc, METHOD_MULTIPLY, &CalcArgs { a: 3.0, b: 7.0 }).unwrap();
        assert!((result - 21.0).abs() < f64::EPSILON);
    }

    /// An unrecognized method id must surface as an ERROR response record.
    #[test]
    fn unknown_method_returns_error() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let calc = Calculator;

        // Write request with unknown method.
        let payload = postcard::to_allocvec(&CalcArgs { a: 1.0, b: 1.0 }).unwrap();
        let req_buf = encode_request(1, 99, REQUEST_READY, &payload);
        lease.mem().write(REQUEST_OFFSET, &req_buf).unwrap();

        let server = RpcServer::new(&lease);
        server.poll_once(&calc).unwrap();

        // Read response — should be ERROR.
        let resp_data = lease
            .mem()
            .read(
                RESPONSE_OFFSET,
                (RESPONSE_HEADER_SIZE + MAX_PAYLOAD_SIZE) as u32,
            )
            .unwrap();
        let (resp_id, status, _) = decode_response(&resp_data).unwrap();
        assert_eq!(resp_id, 1);
        assert_eq!(status, ERROR);
    }

    /// With no server polling, the client's poll budget must expire and map
    /// to `LeaseExpired` (the timeout error of this protocol).
    #[test]
    fn timeout_when_no_server() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();

        // Client sends request but no server ever processes it.
        let mut client = RpcClient::new(&lease).with_max_poll_iterations(10);
        let result: Result<f64> = client.call(METHOD_ADD, &CalcArgs { a: 1.0, b: 2.0 });

        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);
    }

    /// Polling an idle (EMPTY) request slot is a no-op returning false.
    #[test]
    fn server_poll_returns_false_when_empty() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let calc = Calculator;

        // Clear memory — write EMPTY status to request region.
        let clear = encode_request(0, 0, EMPTY, &[]);
        lease.mem().write(REQUEST_OFFSET, &clear).unwrap();

        let server = RpcServer::new(&lease);
        assert!(!server.poll_once(&calc).unwrap());
    }

    /// Back-to-back calls over the same lease must not interfere with each
    /// other (regions are cleared between roundtrips).
    #[test]
    fn multiple_sequential_calls() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let calc = Calculator;

        // Call 1: add
        let r1: f64 =
            call_roundtrip(&lease, &calc, METHOD_ADD, &CalcArgs { a: 10.0, b: 20.0 }).unwrap();
        assert!((r1 - 30.0).abs() < f64::EPSILON);

        // Call 2: multiply
        let r2: f64 =
            call_roundtrip(&lease, &calc, METHOD_MULTIPLY, &CalcArgs { a: 5.0, b: 6.0 }).unwrap();
        assert!((r2 - 30.0).abs() < f64::EPSILON);

        // Call 3: add again
        let r3: f64 =
            call_roundtrip(&lease, &calc, METHOD_ADD, &CalcArgs { a: -1.0, b: 1.0 }).unwrap();
        assert!(r3.abs() < f64::EPSILON);
    }

    /// Request codec: encode then decode preserves every header field and the
    /// payload bytes.
    #[test]
    fn encode_decode_request_roundtrip() {
        let payload = b"hello";
        let encoded = encode_request(42, 7, REQUEST_READY, payload);
        let (rid, mid, status, p) = decode_request(&encoded).unwrap();
        assert_eq!(rid, 42);
        assert_eq!(mid, 7);
        assert_eq!(status, REQUEST_READY);
        assert_eq!(p, payload);
    }

    /// Response codec: encode then decode preserves every header field and
    /// the payload bytes.
    #[test]
    fn encode_decode_response_roundtrip() {
        let payload = b"world";
        let encoded = encode_response(99, RESPONSE_READY, payload);
        let (rid, status, p) = decode_response(&encoded).unwrap();
        assert_eq!(rid, 99);
        assert_eq!(status, RESPONSE_READY);
        assert_eq!(p, payload);
    }

    /// Buffers shorter than the 17-byte request header must be rejected.
    #[test]
    fn decode_request_rejects_truncated() {
        assert!(decode_request(&[0; 10]).is_none());
        assert!(decode_request(&[]).is_none());
    }

    /// Buffers shorter than the 13-byte response header must be rejected.
    #[test]
    fn decode_response_rejects_truncated() {
        assert!(decode_response(&[0; 5]).is_none());
        assert!(decode_response(&[]).is_none());
    }

    // -- Proc macro generated service test --

    // The attribute expands this trait into (at least) a `GreeterServer`
    // trait with a `dispatch` method and a `GreeterClient` wrapper; method
    // ids follow declaration order (0 = greet, 1 = add).
    #[grafos_rpc_service]
    pub trait Greeter {
        fn greet(&self, name: String) -> String;
        fn add(&self, a: f64, b: f64) -> f64;
    }

    struct GreeterImpl;

    impl GreeterServer for GreeterImpl {
        fn greet(&self, name: String) -> String {
            alloc::format!("hello, {}!", name)
        }

        fn add(&self, a: f64, b: f64) -> f64 {
            a + b
        }
    }

    /// The generated `dispatch` routes method ids to the trait impl and
    /// speaks postcard tuples on the wire.
    #[test]
    fn proc_macro_generates_server_dispatch() {
        let svc = GreeterImpl;

        // method_id 0 = greet
        let payload = postcard::to_allocvec(&("world".to_string(),)).unwrap();
        let resp = svc.dispatch(0, &payload).unwrap();
        let greeting: String = postcard::from_bytes(&resp).unwrap();
        assert_eq!(greeting, "hello, world!");

        // method_id 1 = add
        let payload = postcard::to_allocvec(&(1.5f64, 2.5f64)).unwrap();
        let resp = svc.dispatch(1, &payload).unwrap();
        let sum: f64 = postcard::from_bytes(&resp).unwrap();
        assert!((sum - 4.0).abs() < f64::EPSILON);
    }

    /// Generated dispatch mirrors the manual-handler convention: unknown
    /// method ids yield `FabricError::Unsupported`.
    #[test]
    fn proc_macro_unknown_method_returns_unsupported() {
        let svc = GreeterImpl;
        let result = svc.dispatch(99, &[]);
        assert_eq!(result.unwrap_err(), FabricError::Unsupported);
    }

    /// Calls on a freed (revoked) lease fail fast with `Revoked`, before any
    /// shared-memory traffic.
    #[test]
    fn rpc_client_call_on_revoked_lease() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let mut client = RpcClient::new(&lease).with_max_poll_iterations(10);

        // Free the lease to revoke it.
        lease.free();

        let result: Result<f64> = client.call(METHOD_ADD, &CalcArgs { a: 1.0, b: 2.0 });
        assert_eq!(result.unwrap_err(), FabricError::Revoked);
    }

    /// Server polls on a freed (revoked) lease fail fast with `Revoked`.
    #[test]
    fn rpc_server_poll_on_revoked_lease() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(65536);

        let lease = MemBuilder::new().min_bytes(65536).acquire().unwrap();
        let server = RpcServer::new(&lease);
        let calc = Calculator;

        // Free the lease to revoke it.
        lease.free();

        let result = server.poll_once(&calc);
        assert_eq!(result.unwrap_err(), FabricError::Revoked);
    }

    /// Drives the generated `GreeterClient` through a `QuicTransport` whose
    /// server side is the same `GreeterImpl` dispatch table.
    #[test]
    fn proc_macro_client_roundtrip_via_quic_transport() {
        // Use a QuicTransport that dispatches to the GreeterImpl server.
        let svc = GreeterImpl;
        let transport = crate::QuicTransport::new(alloc::boxed::Box::new(
            move |method_id, _req_id, payload| svc.dispatch(method_id, payload),
        ));

        let client = GreeterClient::new(alloc::boxed::Box::new(transport));

        let greeting = client.greet("fabric".to_string()).unwrap();
        assert_eq!(greeting, "hello, fabric!");

        let sum = client.add(10.0, 20.0).unwrap();
        assert!((sum - 30.0).abs() < f64::EPSILON);
    }
}
805}