grafos_collections/
durable.rs

1//! Checkpoint wrapper that persists a collection to block storage.
2//!
3//! [`Durable<T>`] wraps any `Serialize + DeserializeOwned` value (typically
4//! a collection's state snapshot) and provides [`checkpoint()`](Durable::checkpoint)
5//! and [`restore()`](Durable::restore) to write and read the inner value
6//! from a [`BlockLease`].
7//!
8//! # Checkpoint format
9//!
10//! ```text
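//! Block 0:     [header]  magic: "DCHK" (4) | version: u32 LE (4) | data_len: u64 LE (8) | zero padding (496)
//! Block 1..N:  [data]    postcard-serialized value (data_len bytes), zero-padded to 512-byte boundaries
//! ```
//!
//! Each checkpoint overwrites the previous one in place; there is no
//! double-buffering, so a checkpoint that fails or is interrupted part-way
//! can leave the previous checkpoint unreadable.
//!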
15//! # Example
16//!
17//! ```rust,no_run
18//! use grafos_collections::durable::Durable;
19//! use grafos_std::block::BlockBuilder;
20//! use serde::{Serialize, Deserialize};
21//!
22//! # grafos_std::host::reset_mock();
23//! # grafos_std::host::mock_set_fbbu_num_blocks(128);
24//! #[derive(Serialize, Deserialize, Debug, PartialEq)]
25//! struct State { counter: u64 }
26//!
27//! let lease = BlockBuilder::new().acquire()?;
28//! let durable = Durable::new(State { counter: 42 }, lease);
29//! durable.checkpoint()?;
30//!
31//! let lease = durable.into_block_lease();
32//! let restored: Durable<State> = Durable::restore(lease)?;
33//! assert_eq!(restored.inner().counter, 42);
34//! # Ok::<(), grafos_std::FabricError>(())
35//! ```
36
37extern crate alloc;
38use alloc::vec::Vec;
39
40use grafos_std::block::{BlockLease, BLOCK_SIZE};
41use grafos_std::error::{FabricError, Result};
42
43use serde::{de::DeserializeOwned, Serialize};
44
/// Four-byte tag at the start of block 0 identifying a `Durable` checkpoint.
const MAGIC: [u8; 4] = [b'D', b'C', b'H', b'K'];
/// Checkpoint layout version written to, and required by, the header.
const VERSION: u32 = 1;
/// Block index holding the checkpoint header.
const HEADER_BLOCK: u64 = 0;
/// First block index holding serialized payload bytes (right after the header).
const DATA_START_BLOCK: u64 = HEADER_BLOCK + 1;
49
50/// A wrapper that checkpoints its inner value to block storage.
51///
52/// `Durable<T>` owns both the inner value and a [`BlockLease`] used for
53/// persistence. Calling [`checkpoint()`](Durable::checkpoint) serializes
54/// the inner value and writes it to the block lease.
55/// [`restore()`](Durable::restore) reads a checkpoint and reconstructs
56/// the value.
57///
58/// `Durable<T>` is lease-scoped. It does not provide replicated object
59/// placement, quorum, or cross-domain recovery by itself; use
60/// `grafos_replicated` for replicated object/checkpoint resources.
61///
62/// Implements `Deref<Target = T>` and `DerefMut`, so you can access the
63/// inner value's fields and methods directly through the wrapper.
64///
65/// # Example
66///
67/// ```rust
68/// use grafos_collections::durable::Durable;
69/// use grafos_std::block::BlockBuilder;
70/// use serde::{Serialize, Deserialize};
71///
72/// # grafos_std::host::reset_mock();
73/// # grafos_std::host::mock_set_fbbu_num_blocks(128);
74/// #[derive(Serialize, Deserialize)]
75/// struct Data { values: Vec<u32> }
76///
77/// let lease = BlockBuilder::new().acquire()?;
78/// let mut d = Durable::new(Data { values: vec![1, 2, 3] }, lease);
79///
80/// // Deref access
81/// assert_eq!(d.values.len(), 3);
82///
83/// // Mutate and checkpoint
84/// d.inner_mut().values.push(4);
85/// d.checkpoint()?;
86/// # Ok::<(), grafos_std::FabricError>(())
87/// ```
88pub struct Durable<T> {
89    inner: T,
90    block_lease: BlockLease,
91    auto_checkpoint_batch: Option<u64>,
92    mutation_count: u64,
93}
94
95impl<T: Serialize + DeserializeOwned> Durable<T> {
96    /// Wrap a value with a block lease for checkpointing.
97    ///
98    /// Does not write anything to the block lease. Call
99    /// [`checkpoint()`](Durable::checkpoint) to persist the value.
100    pub fn new(inner: T, block_lease: BlockLease) -> Self {
101        Durable {
102            inner,
103            block_lease,
104            auto_checkpoint_batch: None,
105            mutation_count: 0,
106        }
107    }
108
109    /// Returns a reference to the inner value.
110    pub fn inner(&self) -> &T {
111        &self.inner
112    }
113
114    /// Returns a mutable reference to the inner value.
115    ///
116    /// This does **not** increment the mutation counter. Use
117    /// [`mutate()`](Durable::mutate) if you want auto-checkpoint
118    /// tracking.
119    pub fn inner_mut(&mut self) -> &mut T {
120        &mut self.inner
121    }
122
123    /// Enable auto-checkpoint: after every `batch_size` calls to
124    /// [`mutate()`](Durable::mutate), the state is automatically
125    /// checkpointed to block storage.
126    ///
127    /// Pass `0` or call with `None` to disable auto-checkpoint.
128    pub fn set_auto_checkpoint(&mut self, batch_size: Option<u64>) {
129        self.auto_checkpoint_batch = batch_size.filter(|&n| n > 0);
130        self.mutation_count = 0;
131    }
132
133    /// Apply a mutation to the inner value and optionally auto-checkpoint.
134    ///
135    /// The closure `f` receives `&mut T` and can modify it. After the
136    /// closure returns, the mutation counter is incremented. If
137    /// auto-checkpoint is enabled and the counter reaches the batch
138    /// size, [`checkpoint()`](Durable::checkpoint) is called
139    /// automatically and the counter resets to zero.
140    ///
141    /// # Errors
142    ///
143    /// Returns an error only if auto-checkpoint triggers and the
144    /// checkpoint itself fails.
145    pub fn mutate<F>(&mut self, f: F) -> Result<()>
146    where
147        F: FnOnce(&mut T),
148    {
149        f(&mut self.inner);
150        self.mutation_count += 1;
151        if let Some(batch) = self.auto_checkpoint_batch {
152            if self.mutation_count >= batch {
153                self.checkpoint()?;
154                self.mutation_count = 0;
155            }
156        }
157        Ok(())
158    }
159
160    /// Returns the number of mutations since the last checkpoint (or
161    /// since auto-checkpoint was enabled).
162    pub fn mutation_count(&self) -> u64 {
163        self.mutation_count
164    }
165
166    /// Returns the lease ID of the block lease for external renewal
167    /// management (e.g. via [`grafos_leasekit::RenewalManager`]).
168    pub fn lease_id(&self) -> u128 {
169        self.block_lease.lease_id()
170    }
171
172    /// Returns the expiry time (unix seconds) of the block lease for
173    /// external renewal management.
174    pub fn expires_at_unix_secs(&self) -> u64 {
175        self.block_lease.expires_at_unix_secs()
176    }
177
178    /// Consume the `Durable` and return the block lease.
179    ///
180    /// Useful for passing the same lease to [`restore()`](Durable::restore)
181    /// after a checkpoint, simulating a process restart that re-opens the
182    /// same block device.
183    pub fn into_block_lease(self) -> BlockLease {
184        self.block_lease
185    }
186
187    /// Serialize the inner value and write it to block storage.
188    ///
189    /// Writes a header to block 0 (magic, version, data length) followed
190    /// by the postcard-serialized data across blocks 1..N.
191    ///
192    /// # Errors
193    ///
194    /// - [`FabricError::CapacityExceeded`] if the serialized data requires
195    ///   more blocks than available on the device.
196    /// - [`FabricError::IoError`] if serialization or block write fails.
197    pub fn checkpoint(&self) -> Result<()> {
198        let data = postcard::to_allocvec(&self.inner).map_err(|_| FabricError::IoError(-1))?;
199        let data_len = data.len() as u64;
200
201        // Write header block
202        let mut header = [0u8; BLOCK_SIZE];
203        header[0..4].copy_from_slice(&MAGIC);
204        header[4..8].copy_from_slice(&VERSION.to_le_bytes());
205        header[8..16].copy_from_slice(&data_len.to_le_bytes());
206        self.block_lease
207            .block()
208            .write_block(HEADER_BLOCK, &header)?;
209
210        // Write data blocks
211        let num_blocks = data.len().div_ceil(BLOCK_SIZE);
212        let available = self.block_lease.block().num_blocks() as usize;
213        if num_blocks + 1 > available {
214            return Err(FabricError::CapacityExceeded);
215        }
216
217        for i in 0..num_blocks {
218            let start = i * BLOCK_SIZE;
219            let end = core::cmp::min(start + BLOCK_SIZE, data.len());
220            let mut block = [0u8; BLOCK_SIZE];
221            block[..end - start].copy_from_slice(&data[start..end]);
222            self.block_lease
223                .block()
224                .write_block(DATA_START_BLOCK + i as u64, &block)?;
225        }
226
227        Ok(())
228    }
229
230    /// Read a checkpoint from block storage and reconstruct the value.
231    ///
232    /// Reads block 0, validates the magic (`"DCHK"`) and version, then
233    /// reads the data blocks and deserializes the value.
234    ///
235    /// # Errors
236    ///
237    /// - [`FabricError::IoError(-2)`](FabricError::IoError) if the magic
238    ///   bytes do not match.
239    /// - [`FabricError::IoError(-3)`](FabricError::IoError) if the version
240    ///   is unsupported.
241    /// - [`FabricError::IoError(-1)`](FabricError::IoError) if
242    ///   deserialization fails.
243    pub fn restore(block_lease: BlockLease) -> Result<Durable<T>> {
244        let header = block_lease.block().read_block(HEADER_BLOCK)?;
245
246        // Verify magic
247        if header[0..4] != MAGIC {
248            return Err(FabricError::IoError(-2));
249        }
250        let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
251        if version != VERSION {
252            return Err(FabricError::IoError(-3));
253        }
254        let data_len = u64::from_le_bytes([
255            header[8], header[9], header[10], header[11], header[12], header[13], header[14],
256            header[15],
257        ]) as usize;
258
259        // Read data blocks
260        let num_blocks = data_len.div_ceil(BLOCK_SIZE);
261        let mut data = Vec::with_capacity(data_len);
262        for i in 0..num_blocks {
263            let block = block_lease
264                .block()
265                .read_block(DATA_START_BLOCK + i as u64)?;
266            let start = data.len();
267            let remaining = data_len - start;
268            let take = core::cmp::min(BLOCK_SIZE, remaining);
269            data.extend_from_slice(&block[..take]);
270        }
271
272        let inner: T = postcard::from_bytes(&data).map_err(|_| FabricError::IoError(-1))?;
273        Ok(Durable {
274            inner,
275            block_lease,
276            auto_checkpoint_batch: None,
277            mutation_count: 0,
278        })
279    }
280}
281
282impl<T> core::ops::Deref for Durable<T> {
283    type Target = T;
284
285    fn deref(&self) -> &T {
286        &self.inner
287    }
288}
289
290impl<T> core::ops::DerefMut for Durable<T> {
291    fn deref_mut(&mut self) -> &mut T {
292        &mut self.inner
293    }
294}
295
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::block::BlockBuilder;
    use grafos_std::host;
    use serde::{Deserialize, Serialize};

    /// Reset the host mock and acquire a block lease with `num_blocks` blocks.
    fn setup_block(num_blocks: u64) -> BlockLease {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(num_blocks);
        BlockBuilder::new().acquire().expect("acquire")
    }

    #[derive(Debug, PartialEq, Serialize, Deserialize)]
    struct Snapshot {
        items: Vec<u32>,
        count: u64,
    }

    fn empty_snapshot() -> Snapshot {
        Snapshot {
            items: vec![],
            count: 0,
        }
    }

    #[test]
    fn checkpoint_restore_roundtrip() {
        let block_lease = setup_block(64);
        let snap = Snapshot {
            items: vec![1, 2, 3, 4, 5],
            count: 5,
        };

        let durable = Durable::new(snap, block_lease);
        durable.checkpoint().expect("checkpoint");

        // Restore from same block lease (simulating restart with same device)
        let block_lease = durable.into_block_lease();
        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");

        assert_eq!(restored.inner().items, vec![1, 2, 3, 4, 5]);
        assert_eq!(restored.inner().count, 5);
    }

    #[test]
    fn deref_provides_transparent_access() {
        let block_lease = setup_block(64);
        let snap = Snapshot {
            items: vec![10, 20],
            count: 2,
        };
        let durable = Durable::new(snap, block_lease);

        // Deref: access inner fields directly
        assert_eq!(durable.count, 2);
        assert_eq!(durable.items.len(), 2);
    }

    #[test]
    fn checkpoint_large_data() {
        let block_lease = setup_block(256);
        let items: Vec<u32> = (0..1000).collect();
        let snap = Snapshot { items, count: 1000 };

        let durable = Durable::new(snap, block_lease);
        durable.checkpoint().expect("checkpoint");

        let block_lease = durable.into_block_lease();
        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
        assert_eq!(restored.inner().count, 1000);
        assert_eq!(restored.inner().items.len(), 1000);
        assert_eq!(restored.inner().items[999], 999);
    }

    #[test]
    fn checkpoint_too_large_for_device_fails() {
        // Header + one data block cannot hold ~1.9 KB of serialized data.
        let block_lease = setup_block(2);
        let items: Vec<u32> = (0..1000).collect();
        let durable = Durable::new(Snapshot { items, count: 1000 }, block_lease);

        assert!(matches!(
            durable.checkpoint(),
            Err(FabricError::CapacityExceeded)
        ));
    }

    #[test]
    fn restore_bad_magic_fails() {
        let block_lease = setup_block(64);
        // Write garbage to block 0
        let mut garbage = [0u8; BLOCK_SIZE];
        garbage[0..4].copy_from_slice(b"BAAD");
        block_lease.block().write_block(0, &garbage).expect("write");

        let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
        // IoError(-2) is the documented bad-magic error.
        assert!(matches!(result, Err(FabricError::IoError(-2))));
    }

    #[test]
    fn restore_unsupported_version_fails() {
        let block_lease = setup_block(64);
        let mut header = [0u8; BLOCK_SIZE];
        header[0..4].copy_from_slice(&MAGIC);
        header[4..8].copy_from_slice(&(VERSION + 1).to_le_bytes());
        block_lease
            .block()
            .write_block(HEADER_BLOCK, &header)
            .expect("write");

        let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
        // IoError(-3) is the documented unsupported-version error.
        assert!(matches!(result, Err(FabricError::IoError(-3))));
    }

    #[test]
    fn inner_mut_allows_modification() {
        let block_lease = setup_block(64);
        let snap = Snapshot {
            items: vec![1],
            count: 1,
        };
        let mut durable = Durable::new(snap, block_lease);

        durable.inner_mut().items.push(2);
        durable.inner_mut().count = 2;
        durable.checkpoint().expect("checkpoint");

        let block_lease = durable.into_block_lease();
        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
        assert_eq!(restored.inner().items, vec![1, 2]);
        assert_eq!(restored.inner().count, 2);
    }

    #[test]
    fn auto_checkpoint_after_n_mutations() {
        let block_lease = setup_block(64);
        let mut durable = Durable::new(empty_snapshot(), block_lease);
        durable.set_auto_checkpoint(Some(3));

        // First two mutations: no checkpoint yet
        durable
            .mutate(|s| {
                s.items.push(1);
                s.count = 1;
            })
            .expect("mutate 1");
        assert_eq!(durable.mutation_count(), 1);

        durable
            .mutate(|s| {
                s.items.push(2);
                s.count = 2;
            })
            .expect("mutate 2");
        assert_eq!(durable.mutation_count(), 2);

        // Third mutation triggers auto-checkpoint, counter resets
        durable
            .mutate(|s| {
                s.items.push(3);
                s.count = 3;
            })
            .expect("mutate 3");
        assert_eq!(durable.mutation_count(), 0);

        // Verify the checkpoint was written by restoring
        let block_lease = durable.into_block_lease();
        let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
        assert_eq!(restored.inner().items, vec![1, 2, 3]);
        assert_eq!(restored.inner().count, 3);
    }

    #[test]
    fn auto_checkpoint_disabled_by_default() {
        let block_lease = setup_block(64);
        let mut durable = Durable::new(empty_snapshot(), block_lease);

        // No auto-checkpoint set — mutations just count
        for i in 0..10u32 {
            durable
                .mutate(|s| {
                    s.items.push(i);
                    s.count += 1;
                })
                .expect("mutate");
        }
        assert_eq!(durable.mutation_count(), 10);

        // No checkpoint was written, so restore should fail (no magic)
        let block_lease = durable.into_block_lease();
        let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
        assert!(result.is_err());
    }

    #[test]
    fn auto_checkpoint_zero_batch_disables() {
        let block_lease = setup_block(64);
        let mut durable = Durable::new(empty_snapshot(), block_lease);
        durable.set_auto_checkpoint(Some(0));

        for _ in 0..3 {
            durable.mutate(|s| s.count += 1).expect("mutate");
        }
        // Never reset by an automatic checkpoint.
        assert_eq!(durable.mutation_count(), 3);
    }

    #[test]
    fn manual_checkpoint_does_not_reset_counter() {
        let block_lease = setup_block(64);
        let mut durable = Durable::new(empty_snapshot(), block_lease);

        durable.mutate(|s| s.count += 1).expect("mutate");
        durable.checkpoint().expect("checkpoint");
        assert_eq!(durable.mutation_count(), 1);
    }

    #[test]
    fn set_auto_checkpoint_resets_counter() {
        let block_lease = setup_block(64);
        let mut durable = Durable::new(empty_snapshot(), block_lease);

        durable.set_auto_checkpoint(Some(5));
        durable.mutate(|s| s.count += 1).expect("mutate");
        durable.mutate(|s| s.count += 1).expect("mutate");
        assert_eq!(durable.mutation_count(), 2);

        // Changing batch size resets counter
        durable.set_auto_checkpoint(Some(2));
        assert_eq!(durable.mutation_count(), 0);
    }
}