grafos_sync/
mutex.rs

1//! Distributed mutual exclusion backed by a fabric memory lease.
2//!
3//! A [`FabricMutex<T>`] stores lock metadata and a `Copy`-typed value `T`
4//! in leased fabric memory. The holder is identified by a `u128` id; a
5//! zero holder means "unlocked". A generation counter increments on each
6//! acquire, letting observers detect ownership transitions even when the
7//! holder id wraps or is reused.
8//!
9//! # Memory layout at `base_offset`
10//!
11//! ```text
12//! offset +0:  holder_id   (u128, 16 bytes) — 0 = unlocked
13//! offset +16: generation  (u64, 8 bytes)   — incremented on each acquire
14//! offset +24: data_len    (u32, 4 bytes)   — byte length of serialized T
15//! offset +28: data        ([u8])           — serialized T value
16//! ```
17//!
18//! Total header overhead: 28 bytes before the user data.
19
20extern crate alloc;
21use core::marker::PhantomData;
22
23use grafos_std::error::{FabricError, Result};
24use grafos_std::mem::{FabricMem, MemLease};
25
/// Offset (relative to `base_offset`) of the `u128` holder id; 0 = unlocked.
const HOLDER_ID_OFFSET: u64 = 0;
/// Offset of the `u64` generation counter, immediately after the holder id.
const GENERATION_OFFSET: u64 = HOLDER_ID_OFFSET + core::mem::size_of::<u128>() as u64;
/// Offset of the `u32` value-length field, immediately after the generation.
const DATA_LEN_OFFSET: u64 = GENERATION_OFFSET + core::mem::size_of::<u64>() as u64;
/// Offset of the raw value bytes, immediately after the length field.
const DATA_OFFSET: u64 = DATA_LEN_OFFSET + core::mem::size_of::<u32>() as u64;
30
31/// Distributed mutex backed by a fabric memory lease.
32///
33/// The lock state is stored in leased fabric memory. When a holder crashes
34/// without releasing the lock, the lease expires and the lock auto-releases.
35///
36/// `T` must be `Copy` — the value is stored via raw byte copy in the leased
37/// arena. For complex types, use the `serde` feature on `grafos-std`.
38///
39/// # Example
40///
41/// ```rust
42/// # grafos_std::host::reset_mock();
43/// # grafos_std::host::mock_set_fbmu_arena_size(65536);
44/// use grafos_sync::FabricMutex;
45/// use grafos_std::mem::MemBuilder;
46///
47/// let lease = MemBuilder::new().acquire().unwrap();
48/// let mtx = FabricMutex::new(lease, 0, 42u64).unwrap();
49///
50/// // Acquire with holder_id=1, up to 100 spin attempts
51/// let guard = mtx.lock(1, 100).unwrap();
52/// assert_eq!(*guard, 42);
53/// guard.unlock().unwrap();
54///
55/// assert_eq!(mtx.holder().unwrap(), 0); // unlocked
56/// assert_eq!(mtx.generation().unwrap(), 1); // one acquire happened
57/// ```
58pub struct FabricMutex<T> {
59    lease: MemLease,
60    base_offset: u64,
61    _marker: PhantomData<T>,
62}
63
64/// RAII guard returned by [`FabricMutex::lock`] and [`FabricMutex::try_lock`].
65///
66/// Provides [`Deref`](core::ops::Deref) and [`DerefMut`](core::ops::DerefMut)
67/// access to the locked value. On drop, writes the (potentially modified) value
68/// back to leased memory and clears the holder_id, releasing the lock.
69///
70/// You can also call [`unlock`](FabricMutexGuard::unlock) explicitly to check
71/// for write errors instead of silently ignoring them in `Drop`.
72pub struct FabricMutexGuard<'a, T: Copy> {
73    mutex: &'a FabricMutex<T>,
74    value: T,
75    #[allow(dead_code)]
76    holder_id: u128,
77}
78
79impl<'a, T: Copy + core::fmt::Debug> core::fmt::Debug for FabricMutexGuard<'a, T> {
80    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
81        f.debug_struct("FabricMutexGuard")
82            .field("value", &self.value)
83            .field("holder_id", &self.holder_id)
84            .finish()
85    }
86}
87
88fn read_u128(mem: &FabricMem, offset: u64) -> Result<u128> {
89    let data = mem.read(offset, 16)?;
90    if data.len() < 16 {
91        return Ok(0);
92    }
93    let mut buf = [0u8; 16];
94    buf.copy_from_slice(&data[..16]);
95    Ok(u128::from_le_bytes(buf))
96}
97
98fn write_u128(mem: &FabricMem, offset: u64, val: u128) -> Result<()> {
99    mem.write(offset, &val.to_le_bytes())
100}
101
102fn read_u64(mem: &FabricMem, offset: u64) -> Result<u64> {
103    let data = mem.read(offset, 8)?;
104    if data.len() < 8 {
105        return Ok(0);
106    }
107    let mut buf = [0u8; 8];
108    buf.copy_from_slice(&data[..8]);
109    Ok(u64::from_le_bytes(buf))
110}
111
112fn write_u64(mem: &FabricMem, offset: u64, val: u64) -> Result<()> {
113    mem.write(offset, &val.to_le_bytes())
114}
115
116fn write_u32(mem: &FabricMem, offset: u64, val: u32) -> Result<()> {
117    mem.write(offset, &val.to_le_bytes())
118}
119
120impl<T: Copy> FabricMutex<T> {
121    /// Create a new mutex with the given initial value, stored at `base_offset`
122    /// in the provided lease's memory arena.
123    ///
124    /// Initializes holder_id to 0 (unlocked) and generation to 0. The caller
125    /// is responsible for ensuring that distinct mutexes use non-overlapping
126    /// offset ranges. The total space consumed is 28 + `size_of::<T>()` bytes.
127    pub fn new(lease: MemLease, base_offset: u64, initial: T) -> Result<Self> {
128        let mem = lease.mem();
129        // Initialize: holder_id = 0 (unlocked), generation = 0
130        write_u128(mem, base_offset + HOLDER_ID_OFFSET, 0)?;
131        write_u64(mem, base_offset + GENERATION_OFFSET, 0)?;
132
133        // Write initial value
134        let size = core::mem::size_of::<T>();
135        let bytes = unsafe { core::slice::from_raw_parts(&initial as *const T as *const u8, size) };
136        write_u32(mem, base_offset + DATA_LEN_OFFSET, size as u32)?;
137        mem.write(base_offset + DATA_OFFSET, bytes)?;
138
139        Ok(FabricMutex {
140            lease,
141            base_offset,
142            _marker: PhantomData,
143        })
144    }
145
146    /// Acquire the lock, spinning up to `max_attempts` times.
147    ///
148    /// On each iteration, reads the holder_id from leased memory. If it is
149    /// zero (unlocked), writes `holder_id`, increments the generation counter,
150    /// and returns an RAII [`FabricMutexGuard`] with `Deref`/`DerefMut` access
151    /// to the protected value.
152    ///
153    /// Returns [`FabricError::LeaseExpired`] if `max_attempts` is exhausted
154    /// without acquiring the lock.
155    pub fn lock(&self, holder_id: u128, max_attempts: u32) -> Result<FabricMutexGuard<'_, T>> {
156        let mem = self.lease.mem();
157        let base = self.base_offset;
158
159        for _ in 0..max_attempts {
160            let current = read_u128(mem, base + HOLDER_ID_OFFSET)?;
161            if current == 0 {
162                // Lock is free — acquire it
163                write_u128(mem, base + HOLDER_ID_OFFSET, holder_id)?;
164
165                // Increment generation
166                let gen = read_u64(mem, base + GENERATION_OFFSET)?;
167                write_u64(mem, base + GENERATION_OFFSET, gen + 1)?;
168
169                // Read the current value
170                let value = self.read_value()?;
171
172                return Ok(FabricMutexGuard {
173                    mutex: self,
174                    value,
175                    holder_id,
176                });
177            }
178            // Lock is held — in a real system we'd backoff, but in tests
179            // this is effectively a spin.
180        }
181        Err(FabricError::LeaseExpired)
182    }
183
184    /// Non-blocking lock attempt.
185    ///
186    /// Reads the holder_id once. Returns `Ok(Some(guard))` if the lock was
187    /// free and successfully acquired, `Ok(None)` if it was already held, or
188    /// `Err` on an underlying I/O failure.
189    pub fn try_lock(&self, holder_id: u128) -> Result<Option<FabricMutexGuard<'_, T>>> {
190        let mem = self.lease.mem();
191        let base = self.base_offset;
192
193        let current = read_u128(mem, base + HOLDER_ID_OFFSET)?;
194        if current != 0 {
195            return Ok(None);
196        }
197
198        write_u128(mem, base + HOLDER_ID_OFFSET, holder_id)?;
199
200        let gen = read_u64(mem, base + GENERATION_OFFSET)?;
201        write_u64(mem, base + GENERATION_OFFSET, gen + 1)?;
202
203        let value = self.read_value()?;
204
205        Ok(Some(FabricMutexGuard {
206            mutex: self,
207            value,
208            holder_id,
209        }))
210    }
211
212    /// Read the current generation counter.
213    ///
214    /// The generation is incremented by one each time the lock is successfully
215    /// acquired (via [`lock`](Self::lock) or [`try_lock`](Self::try_lock)).
216    /// Starts at 0 when the mutex is created.
217    pub fn generation(&self) -> Result<u64> {
218        read_u64(self.lease.mem(), self.base_offset + GENERATION_OFFSET)
219    }
220
221    /// Read the current holder_id (0 = unlocked).
222    ///
223    /// Returns the `u128` identifier of the party currently holding the lock,
224    /// or 0 if the lock is not held. This is a snapshot; the value may change
225    /// between the read and any action taken on it.
226    pub fn holder(&self) -> Result<u128> {
227        read_u128(self.lease.mem(), self.base_offset + HOLDER_ID_OFFSET)
228    }
229
230    /// Returns the lease ID of the underlying memory lease for external
231    /// renewal management (e.g. via [`grafos_leasekit::RenewalManager`]).
232    pub fn lease_id(&self) -> u128 {
233        self.lease.lease_id()
234    }
235
236    /// Returns the expiry time (unix seconds) of the underlying memory
237    /// lease for external renewal management.
238    pub fn expires_at_unix_secs(&self) -> u64 {
239        self.lease.expires_at_unix_secs()
240    }
241
242    fn read_value(&self) -> Result<T> {
243        let mem = self.lease.mem();
244        let size = core::mem::size_of::<T>();
245        let data = mem.read(self.base_offset + DATA_OFFSET, size as u32)?;
246        if data.len() < size {
247            return Err(FabricError::IoError(-99));
248        }
249        let value = unsafe { core::ptr::read_unaligned(data.as_ptr() as *const T) };
250        Ok(value)
251    }
252
253    fn write_value(&self, value: &T) -> Result<()> {
254        let mem = self.lease.mem();
255        let size = core::mem::size_of::<T>();
256        let bytes = unsafe { core::slice::from_raw_parts(value as *const T as *const u8, size) };
257        mem.write(self.base_offset + DATA_OFFSET, bytes)
258    }
259}
260
261impl<'a, T: Copy> FabricMutexGuard<'a, T> {
262    /// Explicitly unlock, writing the (potentially modified) value back
263    /// to leased memory and clearing the holder_id.
264    ///
265    /// Prefer this over relying on `Drop` when you need to handle write errors.
266    pub fn unlock(self) -> Result<()> {
267        self.release_inner()
268    }
269
270    fn release_inner(&self) -> Result<()> {
271        self.mutex.write_value(&self.value)?;
272        write_u128(
273            self.mutex.lease.mem(),
274            self.mutex.base_offset + HOLDER_ID_OFFSET,
275            0,
276        )
277    }
278}
279
280impl<'a, T: Copy> core::ops::Deref for FabricMutexGuard<'a, T> {
281    type Target = T;
282    fn deref(&self) -> &T {
283        &self.value
284    }
285}
286
287impl<'a, T: Copy> core::ops::DerefMut for FabricMutexGuard<'a, T> {
288    fn deref_mut(&mut self) -> &mut T {
289        &mut self.value
290    }
291}
292
293impl<'a, T: Copy> Drop for FabricMutexGuard<'a, T> {
294    fn drop(&mut self) {
295        let _ = self.release_inner();
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;
    use grafos_std::mem::MemBuilder;

    /// Reset the mock host and acquire a fresh lease over a 4 KiB arena.
    fn setup() -> MemLease {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(4096);
        MemBuilder::new().acquire().expect("acquire lease")
    }

    #[test]
    fn mutex_lock_unlock_roundtrip() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 42u64).expect("new mutex");

        let guard = mtx.lock(1, 10).expect("lock");
        assert_eq!(*guard, 42);
        guard.unlock().expect("unlock");

        // Lock is released — holder should be 0
        assert_eq!(mtx.holder().unwrap(), 0);
    }

    #[test]
    fn mutex_guard_drops_on_scope_exit() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 100u32).expect("new mutex");

        {
            let _guard = mtx.lock(42, 10).expect("lock");
            assert_eq!(mtx.holder().unwrap(), 42);
            // guard drops here
        }

        // After drop, lock should be released
        assert_eq!(mtx.holder().unwrap(), 0);
    }

    #[test]
    fn mutex_try_lock_returns_none_when_held() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 7u8).expect("new mutex");

        let _guard = mtx.lock(1, 10).expect("lock");

        // try_lock should see holder_id != 0 and return None
        let result = mtx.try_lock(2).expect("try_lock io");
        assert!(result.is_none());
    }

    #[test]
    fn mutex_try_lock_acquires_free_mutex() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 5u16).expect("new mutex");

        let guard = mtx
            .try_lock(9)
            .expect("try_lock io")
            .expect("lock should be free");
        assert_eq!(*guard, 5);
        assert_eq!(mtx.holder().unwrap(), 9);
        assert_eq!(mtx.generation().unwrap(), 1);

        drop(guard);
        assert_eq!(mtx.holder().unwrap(), 0);
    }

    #[test]
    fn mutex_generation_increments() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 0u64).expect("new mutex");

        assert_eq!(mtx.generation().unwrap(), 0);

        {
            let _g = mtx.lock(1, 10).expect("lock 1");
        }
        assert_eq!(mtx.generation().unwrap(), 1);

        {
            let _g = mtx.lock(2, 10).expect("lock 2");
        }
        assert_eq!(mtx.generation().unwrap(), 2);

        {
            let _g = mtx.lock(3, 10).expect("lock 3");
        }
        assert_eq!(mtx.generation().unwrap(), 3);
    }

    #[test]
    fn mutex_deref_mut_modifies_value() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 10u64).expect("new mutex");

        {
            let mut guard = mtx.lock(1, 10).expect("lock");
            *guard = 99;
            // guard drops, writing 99 back
        }

        // Re-lock and verify updated value
        let guard = mtx.lock(2, 10).expect("lock 2");
        assert_eq!(*guard, 99);
    }

    #[test]
    fn mutex_explicit_unlock_writes_value_back() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 1u32).expect("new mutex");

        let mut guard = mtx.lock(1, 10).expect("lock");
        *guard += 1;
        guard.unlock().expect("unlock");

        let guard = mtx
            .try_lock(2)
            .expect("try_lock io")
            .expect("lock should be free after unlock");
        assert_eq!(*guard, 2);
    }

    #[test]
    fn mutex_lock_timeout() {
        let lease = setup();
        let mtx = FabricMutex::new(lease, 0, 0u32).expect("new mutex");

        // Hold the lock
        let _guard = mtx.lock(1, 10).expect("lock");

        // Another lock attempt should timeout
        let result = mtx.lock(2, 5);
        assert_eq!(result.unwrap_err(), FabricError::LeaseExpired);
    }
}