1extern crate alloc;
27
28use alloc::vec::Vec;
29use core::sync::atomic::{AtomicU64, Ordering};
30
31use grafos_std::error::{FabricError, Result};
32use grafos_std::lease::LeaseStatus;
33use grafos_std::mem::MemLease;
34
35use crate::{EMPTY, ERROR, PROCESSING, REQUEST_READY, RESPONSE_READY};
36
37pub const SLOT_HEADER_SIZE: usize = 20;
44
45pub const DEFAULT_NUM_SLOTS: usize = 8;
47
48pub const DEFAULT_SLOT_PAYLOAD_SIZE: usize = 4096;
50
51const DEFAULT_MAX_POLL_ITERATIONS: u32 = 1_000_000;
53
54#[inline]
60fn slot_offset(index: usize, slot_size: usize) -> u64 {
61 (index * slot_size) as u64
62}
63
64fn encode_slot(request_id: u64, method_id: u32, status: u8, payload: &[u8]) -> Vec<u8> {
74 let mut buf = Vec::with_capacity(SLOT_HEADER_SIZE + payload.len());
75 buf.extend_from_slice(&request_id.to_le_bytes());
76 buf.extend_from_slice(&method_id.to_le_bytes());
77 buf.push(status);
78 buf.extend_from_slice(&[0u8; 3]); buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
80 buf.extend_from_slice(payload);
81 buf
82}
83
84struct SlotHeader {
85 request_id: u64,
86 method_id: u32,
87 status: u8,
88 payload_len: u32,
89}
90
91fn decode_slot_header(data: &[u8]) -> Option<SlotHeader> {
92 if data.len() < SLOT_HEADER_SIZE {
93 return None;
94 }
95 Some(SlotHeader {
96 request_id: u64::from_le_bytes(data[0..8].try_into().ok()?),
97 method_id: u32::from_le_bytes(data[8..12].try_into().ok()?),
98 status: data[12],
99 payload_len: u32::from_le_bytes(data[16..20].try_into().ok()?),
101 })
102}
103
104pub struct RpcMuxClient<'a> {
113 lease: &'a MemLease,
114 num_slots: usize,
115 slot_size: usize,
116 max_poll_iterations: u32,
117 next_request_id: AtomicU64,
118}
119
120impl<'a> RpcMuxClient<'a> {
121 pub fn new(lease: &'a MemLease, num_slots: usize, slot_payload_size: usize) -> Self {
128 RpcMuxClient {
129 lease,
130 num_slots,
131 slot_size: SLOT_HEADER_SIZE + slot_payload_size,
132 max_poll_iterations: DEFAULT_MAX_POLL_ITERATIONS,
133 next_request_id: AtomicU64::new(1),
134 }
135 }
136
137 pub fn with_max_poll_iterations(mut self, n: u32) -> Self {
139 self.max_poll_iterations = n;
140 self
141 }
142
143 pub fn call(&self, method_id: u32, request: &[u8]) -> Result<Vec<u8>> {
156 match self.lease.status() {
157 LeaseStatus::Active => {}
158 LeaseStatus::Revoked => return Err(FabricError::Revoked),
159 _ => return Err(FabricError::LeaseExpired),
160 }
161
162 let max_payload = self.slot_size - SLOT_HEADER_SIZE;
163 if request.len() > max_payload {
164 return Err(FabricError::CapacityExceeded);
165 }
166
167 let slot_idx = self.find_free_slot()?;
169 let off = slot_offset(slot_idx, self.slot_size);
170 let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
171
172 let buf = encode_slot(request_id, method_id, REQUEST_READY, request);
174 self.lease.mem().write(off, &buf)?;
175
176 for _ in 0..self.max_poll_iterations {
178 let slot_data = self.lease.mem().read(off, self.slot_size as u32)?;
179 let sh = match decode_slot_header(&slot_data) {
180 Some(h) => h,
181 None => continue,
182 };
183
184 if sh.request_id != request_id {
185 continue;
186 }
187
188 match sh.status {
189 RESPONSE_READY => {
190 let payload_len = sh.payload_len as usize;
191 let response =
192 if payload_len > 0 && slot_data.len() >= SLOT_HEADER_SIZE + payload_len {
193 slot_data[SLOT_HEADER_SIZE..SLOT_HEADER_SIZE + payload_len].to_vec()
194 } else {
195 Vec::new()
196 };
197 self.clear_slot(off)?;
199 return Ok(response);
200 }
201 ERROR => {
202 self.clear_slot(off)?;
203 return Err(FabricError::IoError(-102));
204 }
205 _ => {
206 }
208 }
209 }
210
211 self.clear_slot(off)?;
213 Err(FabricError::LeaseExpired)
214 }
215
216 fn find_free_slot(&self) -> Result<usize> {
218 for i in 0..self.num_slots {
219 let off = slot_offset(i, self.slot_size);
220 let hdr_bytes = self.lease.mem().read(off, SLOT_HEADER_SIZE as u32)?;
221 if let Some(sh) = decode_slot_header(&hdr_bytes) {
222 if sh.status == EMPTY {
223 return Ok(i);
224 }
225 } else {
226 return Ok(i);
228 }
229 }
230 Err(FabricError::CapacityExceeded)
231 }
232
233 fn clear_slot(&self, off: u64) -> Result<()> {
235 let buf = encode_slot(0, 0, EMPTY, &[]);
236 self.lease.mem().write(off, &buf)
237 }
238}
239
240pub struct RpcMuxServer<'a> {
249 lease: &'a MemLease,
250 num_slots: usize,
251 slot_size: usize,
252}
253
254impl<'a> RpcMuxServer<'a> {
255 pub fn new(lease: &'a MemLease, num_slots: usize, slot_payload_size: usize) -> Self {
257 RpcMuxServer {
258 lease,
259 num_slots,
260 slot_size: SLOT_HEADER_SIZE + slot_payload_size,
261 }
262 }
263
264 pub fn poll_once(&self, handler: &impl crate::RpcHandler) -> Result<u32> {
268 match self.lease.status() {
269 LeaseStatus::Active => {}
270 LeaseStatus::Revoked => return Err(FabricError::Revoked),
271 _ => return Err(FabricError::LeaseExpired),
272 }
273
274 let mut processed = 0u32;
275 for i in 0..self.num_slots {
276 let off = slot_offset(i, self.slot_size);
277 let slot_data = self.lease.mem().read(off, self.slot_size as u32)?;
278
279 let sh = match decode_slot_header(&slot_data) {
280 Some(h) => h,
281 None => continue,
282 };
283
284 if sh.status != REQUEST_READY {
285 continue;
286 }
287
288 let max_payload = self.slot_size - SLOT_HEADER_SIZE;
290 if sh.payload_len as usize > max_payload {
291 let buf = encode_slot(0, 0, EMPTY, &[]);
293 self.lease.mem().write(off, &buf)?;
294 continue;
295 }
296
297 let payload_len = sh.payload_len as usize;
299 let payload = if payload_len > 0 && slot_data.len() >= SLOT_HEADER_SIZE + payload_len {
300 &slot_data[SLOT_HEADER_SIZE..SLOT_HEADER_SIZE + payload_len]
301 } else {
302 &[]
303 };
304 let processing_buf = encode_slot(sh.request_id, sh.method_id, PROCESSING, payload);
305 self.lease.mem().write(off, &processing_buf)?;
306
307 match handler.handle(sh.method_id, payload) {
309 Ok(resp) => {
310 let buf = encode_slot(sh.request_id, 0, RESPONSE_READY, &resp);
311 self.lease.mem().write(off, &buf)?;
312 }
313 Err(_) => {
314 let buf = encode_slot(sh.request_id, 0, ERROR, &[]);
315 self.lease.mem().write(off, &buf)?;
316 }
317 }
318 processed += 1;
319 }
320
321 Ok(processed)
322 }
323
324 pub fn serve_n(&self, handler: &impl crate::RpcHandler, n: u32) -> Result<u32> {
328 let mut total = 0u32;
329 for _ in 0..n {
330 total += self.poll_once(handler)?;
331 }
332 Ok(total)
333 }
334}
335
336#[cfg(test)]
341mod tests {
342 use super::*;
343 use crate::RpcHandler;
344 use alloc::vec;
345 use grafos_std::error::FabricError;
346 use grafos_std::host;
347 use grafos_std::mem::MemBuilder;
348
349 const NUM_SLOTS: usize = 4;
350 const SLOT_PAYLOAD: usize = 256;
351 const ARENA_SIZE: u64 = (SLOT_HEADER_SIZE + SLOT_PAYLOAD) as u64 * NUM_SLOTS as u64;
352
353 struct Echo;
355 impl RpcHandler for Echo {
356 fn handle(&self, _method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
357 Ok(payload.to_vec())
358 }
359 }
360
361 struct Doubler;
363 impl RpcHandler for Doubler {
364 fn handle(&self, _method_id: u32, payload: &[u8]) -> Result<Vec<u8>> {
365 if payload.len() < 4 {
366 return Err(FabricError::IoError(-200));
367 }
368 let val = u32::from_le_bytes(payload[..4].try_into().unwrap());
369 Ok((val * 2).to_le_bytes().to_vec())
370 }
371 }
372
373 struct Failing;
375 impl RpcHandler for Failing {
376 fn handle(&self, _method_id: u32, _payload: &[u8]) -> Result<Vec<u8>> {
377 Err(FabricError::Unsupported)
378 }
379 }
380
381 fn setup() -> MemLease {
383 host::reset_mock();
384 host::mock_set_fbmu_arena_size(ARENA_SIZE.max(65536));
385 MemBuilder::new()
386 .min_bytes(ARENA_SIZE)
387 .acquire()
388 .expect("lease")
389 }
390
391 fn write_request(
393 lease: &MemLease,
394 slot_idx: usize,
395 request_id: u64,
396 method_id: u32,
397 payload: &[u8],
398 ) {
399 let off = slot_offset(slot_idx, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
400 let buf = encode_slot(request_id, method_id, REQUEST_READY, payload);
401 lease.mem().write(off, &buf).unwrap();
402 }
403
404 fn read_slot(lease: &MemLease, slot_idx: usize) -> (SlotHeader, Vec<u8>) {
406 let off = slot_offset(slot_idx, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
407 let data = lease
408 .mem()
409 .read(off, (SLOT_HEADER_SIZE + SLOT_PAYLOAD) as u32)
410 .unwrap();
411 let sh = decode_slot_header(&data).unwrap();
412 let payload_len = sh.payload_len as usize;
413 let payload = if payload_len > 0 && data.len() >= SLOT_HEADER_SIZE + payload_len {
414 data[SLOT_HEADER_SIZE..SLOT_HEADER_SIZE + payload_len].to_vec()
415 } else {
416 Vec::new()
417 };
418 (sh, payload)
419 }
420
421 #[test]
422 fn single_call_roundtrip() {
423 let lease = setup();
424 let _client =
425 RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD).with_max_poll_iterations(10);
426 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
427
428 let req = b"hello mux";
429 write_request(&lease, 0, 1, 42, req);
430
431 let n = server.poll_once(&Echo).unwrap();
433 assert_eq!(n, 1);
434
435 let (sh, resp) = read_slot(&lease, 0);
437 assert_eq!(sh.status, RESPONSE_READY);
438 assert_eq!(sh.request_id, 1);
439 assert_eq!(sh.payload_len, req.len() as u32);
440 assert_eq!(&resp, req);
441 }
442
443 #[test]
444 fn multi_client_different_slots() {
445 let lease = setup();
446 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
447
448 let req_a = b"request-a";
449 let req_b = b"request-b";
450 write_request(&lease, 0, 10, 1, req_a);
451 write_request(&lease, 1, 20, 2, req_b);
452
453 let n = server.poll_once(&Echo).unwrap();
455 assert_eq!(n, 2);
456
457 let (sh_a, resp_a) = read_slot(&lease, 0);
459 assert_eq!(sh_a.status, RESPONSE_READY);
460 assert_eq!(sh_a.request_id, 10);
461 assert_eq!(&resp_a, req_a);
462
463 let (sh_b, resp_b) = read_slot(&lease, 1);
464 assert_eq!(sh_b.status, RESPONSE_READY);
465 assert_eq!(sh_b.request_id, 20);
466 assert_eq!(&resp_b, req_b);
467 }
468
469 #[test]
470 fn all_slots_busy_returns_capacity_exceeded() {
471 let lease = setup();
472 let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
473
474 for i in 0..NUM_SLOTS {
476 let off = slot_offset(i, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
477 let buf = encode_slot(i as u64 + 1, 0, PROCESSING, &[]);
478 lease.mem().write(off, &buf).unwrap();
479 }
480
481 let result = client.call(0, b"should fail");
483 assert_eq!(result.unwrap_err(), FabricError::CapacityExceeded);
484 }
485
486 #[test]
487 fn timeout_when_server_never_processes() {
488 let lease = setup();
489 let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD).with_max_poll_iterations(5);
490
491 let result = client.call(0, b"orphan");
493 assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);
494
495 let (sh, _) = read_slot(&lease, 0);
497 assert_eq!(sh.status, EMPTY);
498 }
499
500 #[test]
501 fn malformed_slot_gets_reset() {
502 let lease = setup();
503 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
504
505 let off = slot_offset(0, SLOT_HEADER_SIZE + SLOT_PAYLOAD);
507 let mut buf = encode_slot(1, 0, REQUEST_READY, &[]);
508 let bad_len = (SLOT_PAYLOAD + 100) as u32;
510 buf[16..20].copy_from_slice(&bad_len.to_le_bytes());
511 lease.mem().write(off, &buf).unwrap();
512
513 let n = server.poll_once(&Echo).unwrap();
515 assert_eq!(n, 0); let (sh, _) = read_slot(&lease, 0);
519 assert_eq!(sh.status, EMPTY);
520 }
521
522 #[test]
523 fn server_error_propagates_to_slot() {
524 let lease = setup();
525 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
526
527 write_request(&lease, 0, 1, 0, &[0xAB, 0xCD]);
528
529 let n = server.poll_once(&Failing).unwrap();
531 assert_eq!(n, 1);
532
533 let (sh, _) = read_slot(&lease, 0);
535 assert_eq!(sh.status, ERROR);
536 assert_eq!(sh.request_id, 1);
537 }
538
539 #[test]
540 fn serve_n_processes_multiple_rounds() {
541 let lease = setup();
542 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
543
544 let payload = 7u32.to_le_bytes();
545 write_request(&lease, 0, 1, 0, &payload);
546
547 let total = server.serve_n(&Doubler, 3).unwrap();
549 assert_eq!(total, 1);
550
551 let (sh, resp) = read_slot(&lease, 0);
553 assert_eq!(sh.status, RESPONSE_READY);
554 let val = u32::from_le_bytes(resp[..4].try_into().unwrap());
555 assert_eq!(val, 14); }
557
558 #[test]
559 fn empty_payload_roundtrip() {
560 let lease = setup();
561 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
562
563 write_request(&lease, 0, 1, 0, &[]);
564
565 let n = server.poll_once(&Echo).unwrap();
566 assert_eq!(n, 1);
567
568 let (sh, _) = read_slot(&lease, 0);
569 assert_eq!(sh.status, RESPONSE_READY);
570 assert_eq!(sh.payload_len, 0);
571 }
572
573 #[test]
574 fn payload_too_large_rejected() {
575 let lease = setup();
576 let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
577
578 let big = vec![0xAA; SLOT_PAYLOAD + 1];
579 let result = client.call(0, &big);
580 assert_eq!(result.unwrap_err(), FabricError::CapacityExceeded);
581 }
582
583 #[test]
584 fn rpc_mux_client_call_on_revoked_lease() {
585 let lease = setup();
586 let client = RpcMuxClient::new(&lease, NUM_SLOTS, SLOT_PAYLOAD).with_max_poll_iterations(5);
587
588 lease.free();
590
591 let result = client.call(0, b"should fail");
592 assert_eq!(result.unwrap_err(), FabricError::Revoked);
593 }
594
595 #[test]
596 fn rpc_mux_server_poll_on_revoked_lease() {
597 let lease = setup();
598 let server = RpcMuxServer::new(&lease, NUM_SLOTS, SLOT_PAYLOAD);
599
600 lease.free();
602
603 let result = server.poll_once(&Echo);
604 assert_eq!(result.unwrap_err(), FabricError::Revoked);
605 }
606
607 #[test]
608 fn slot_offset_calculation() {
609 let slot_size = SLOT_HEADER_SIZE + SLOT_PAYLOAD;
610 assert_eq!(slot_offset(0, slot_size), 0);
611 assert_eq!(slot_offset(1, slot_size), slot_size as u64);
612 assert_eq!(slot_offset(3, slot_size), (3 * slot_size) as u64);
613 }
614}