1extern crate alloc;
38use alloc::vec::Vec;
39
40use grafos_std::block::{BlockLease, BLOCK_SIZE};
41use grafos_std::error::{FabricError, Result};
42
43use serde::{de::DeserializeOwned, Serialize};
44
45const MAGIC: [u8; 4] = *b"DCHK";
46const VERSION: u32 = 1;
47const HEADER_BLOCK: u64 = 0;
48const DATA_START_BLOCK: u64 = 1;
49
50pub struct Durable<T> {
89 inner: T,
90 block_lease: BlockLease,
91 auto_checkpoint_batch: Option<u64>,
92 mutation_count: u64,
93}
94
95impl<T: Serialize + DeserializeOwned> Durable<T> {
96 pub fn new(inner: T, block_lease: BlockLease) -> Self {
101 Durable {
102 inner,
103 block_lease,
104 auto_checkpoint_batch: None,
105 mutation_count: 0,
106 }
107 }
108
109 pub fn inner(&self) -> &T {
111 &self.inner
112 }
113
114 pub fn inner_mut(&mut self) -> &mut T {
120 &mut self.inner
121 }
122
123 pub fn set_auto_checkpoint(&mut self, batch_size: Option<u64>) {
129 self.auto_checkpoint_batch = batch_size.filter(|&n| n > 0);
130 self.mutation_count = 0;
131 }
132
133 pub fn mutate<F>(&mut self, f: F) -> Result<()>
146 where
147 F: FnOnce(&mut T),
148 {
149 f(&mut self.inner);
150 self.mutation_count += 1;
151 if let Some(batch) = self.auto_checkpoint_batch {
152 if self.mutation_count >= batch {
153 self.checkpoint()?;
154 self.mutation_count = 0;
155 }
156 }
157 Ok(())
158 }
159
160 pub fn mutation_count(&self) -> u64 {
163 self.mutation_count
164 }
165
166 pub fn lease_id(&self) -> u128 {
169 self.block_lease.lease_id()
170 }
171
172 pub fn expires_at_unix_secs(&self) -> u64 {
175 self.block_lease.expires_at_unix_secs()
176 }
177
178 pub fn into_block_lease(self) -> BlockLease {
184 self.block_lease
185 }
186
187 pub fn checkpoint(&self) -> Result<()> {
198 let data = postcard::to_allocvec(&self.inner).map_err(|_| FabricError::IoError(-1))?;
199 let data_len = data.len() as u64;
200
201 let mut header = [0u8; BLOCK_SIZE];
203 header[0..4].copy_from_slice(&MAGIC);
204 header[4..8].copy_from_slice(&VERSION.to_le_bytes());
205 header[8..16].copy_from_slice(&data_len.to_le_bytes());
206 self.block_lease
207 .block()
208 .write_block(HEADER_BLOCK, &header)?;
209
210 let num_blocks = data.len().div_ceil(BLOCK_SIZE);
212 let available = self.block_lease.block().num_blocks() as usize;
213 if num_blocks + 1 > available {
214 return Err(FabricError::CapacityExceeded);
215 }
216
217 for i in 0..num_blocks {
218 let start = i * BLOCK_SIZE;
219 let end = core::cmp::min(start + BLOCK_SIZE, data.len());
220 let mut block = [0u8; BLOCK_SIZE];
221 block[..end - start].copy_from_slice(&data[start..end]);
222 self.block_lease
223 .block()
224 .write_block(DATA_START_BLOCK + i as u64, &block)?;
225 }
226
227 Ok(())
228 }
229
230 pub fn restore(block_lease: BlockLease) -> Result<Durable<T>> {
244 let header = block_lease.block().read_block(HEADER_BLOCK)?;
245
246 if header[0..4] != MAGIC {
248 return Err(FabricError::IoError(-2));
249 }
250 let version = u32::from_le_bytes([header[4], header[5], header[6], header[7]]);
251 if version != VERSION {
252 return Err(FabricError::IoError(-3));
253 }
254 let data_len = u64::from_le_bytes([
255 header[8], header[9], header[10], header[11], header[12], header[13], header[14],
256 header[15],
257 ]) as usize;
258
259 let num_blocks = data_len.div_ceil(BLOCK_SIZE);
261 let mut data = Vec::with_capacity(data_len);
262 for i in 0..num_blocks {
263 let block = block_lease
264 .block()
265 .read_block(DATA_START_BLOCK + i as u64)?;
266 let start = data.len();
267 let remaining = data_len - start;
268 let take = core::cmp::min(BLOCK_SIZE, remaining);
269 data.extend_from_slice(&block[..take]);
270 }
271
272 let inner: T = postcard::from_bytes(&data).map_err(|_| FabricError::IoError(-1))?;
273 Ok(Durable {
274 inner,
275 block_lease,
276 auto_checkpoint_batch: None,
277 mutation_count: 0,
278 })
279 }
280}
281
282impl<T> core::ops::Deref for Durable<T> {
283 type Target = T;
284
285 fn deref(&self) -> &T {
286 &self.inner
287 }
288}
289
290impl<T> core::ops::DerefMut for Durable<T> {
291 fn deref_mut(&mut self) -> &mut T {
292 &mut self.inner
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use super::*;
299 use grafos_std::block::BlockBuilder;
300 use grafos_std::host;
301 use serde::{Deserialize, Serialize};
302
303 fn setup_block(num_blocks: u64) -> BlockLease {
304 host::reset_mock();
305 host::mock_set_fbbu_num_blocks(num_blocks);
306 BlockBuilder::new().acquire().expect("acquire")
307 }
308
309 #[derive(Debug, PartialEq, Serialize, Deserialize)]
310 struct Snapshot {
311 items: Vec<u32>,
312 count: u64,
313 }
314
315 #[test]
316 fn checkpoint_restore_roundtrip() {
317 let block_lease = setup_block(64);
318 let snap = Snapshot {
319 items: vec![1, 2, 3, 4, 5],
320 count: 5,
321 };
322
323 let durable = Durable::new(snap, block_lease);
324 durable.checkpoint().expect("checkpoint");
325
326 let block_lease = durable.into_block_lease();
328 let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
329
330 assert_eq!(restored.inner().items, vec![1, 2, 3, 4, 5]);
331 assert_eq!(restored.inner().count, 5);
332 }
333
334 #[test]
335 fn deref_provides_transparent_access() {
336 let block_lease = setup_block(64);
337 let snap = Snapshot {
338 items: vec![10, 20],
339 count: 2,
340 };
341 let durable = Durable::new(snap, block_lease);
342
343 assert_eq!(durable.count, 2);
345 assert_eq!(durable.items.len(), 2);
346 }
347
348 #[test]
349 fn checkpoint_large_data() {
350 let block_lease = setup_block(256);
351 let items: Vec<u32> = (0..1000).collect();
352 let snap = Snapshot { items, count: 1000 };
353
354 let durable = Durable::new(snap, block_lease);
355 durable.checkpoint().expect("checkpoint");
356
357 let block_lease = durable.into_block_lease();
358 let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
359 assert_eq!(restored.inner().count, 1000);
360 assert_eq!(restored.inner().items.len(), 1000);
361 assert_eq!(restored.inner().items[999], 999);
362 }
363
364 #[test]
365 fn restore_bad_magic_fails() {
366 let block_lease = setup_block(64);
367 let mut garbage = [0u8; BLOCK_SIZE];
369 garbage[0..4].copy_from_slice(b"BAAD");
370 block_lease.block().write_block(0, &garbage).expect("write");
371
372 let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
373 assert!(result.is_err());
374 }
375
376 #[test]
377 fn inner_mut_allows_modification() {
378 let block_lease = setup_block(64);
379 let snap = Snapshot {
380 items: vec![1],
381 count: 1,
382 };
383 let mut durable = Durable::new(snap, block_lease);
384
385 durable.inner_mut().items.push(2);
386 durable.inner_mut().count = 2;
387 durable.checkpoint().expect("checkpoint");
388
389 let block_lease = durable.into_block_lease();
390 let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
391 assert_eq!(restored.inner().items, vec![1, 2]);
392 assert_eq!(restored.inner().count, 2);
393 }
394
395 #[test]
396 fn auto_checkpoint_after_n_mutations() {
397 let block_lease = setup_block(64);
398 let snap = Snapshot {
399 items: vec![],
400 count: 0,
401 };
402 let mut durable = Durable::new(snap, block_lease);
403 durable.set_auto_checkpoint(Some(3));
404
405 durable
407 .mutate(|s| {
408 s.items.push(1);
409 s.count = 1;
410 })
411 .expect("mutate 1");
412 assert_eq!(durable.mutation_count(), 1);
413
414 durable
415 .mutate(|s| {
416 s.items.push(2);
417 s.count = 2;
418 })
419 .expect("mutate 2");
420 assert_eq!(durable.mutation_count(), 2);
421
422 durable
424 .mutate(|s| {
425 s.items.push(3);
426 s.count = 3;
427 })
428 .expect("mutate 3");
429 assert_eq!(durable.mutation_count(), 0);
430
431 let block_lease = durable.into_block_lease();
433 let restored: Durable<Snapshot> = Durable::restore(block_lease).expect("restore");
434 assert_eq!(restored.inner().items, vec![1, 2, 3]);
435 assert_eq!(restored.inner().count, 3);
436 }
437
438 #[test]
439 fn auto_checkpoint_disabled_by_default() {
440 let block_lease = setup_block(64);
441 let snap = Snapshot {
442 items: vec![],
443 count: 0,
444 };
445 let mut durable = Durable::new(snap, block_lease);
446
447 for i in 0..10u32 {
449 durable
450 .mutate(|s| {
451 s.items.push(i);
452 s.count += 1;
453 })
454 .expect("mutate");
455 }
456 assert_eq!(durable.mutation_count(), 10);
457
458 let block_lease = durable.into_block_lease();
460 let result: Result<Durable<Snapshot>> = Durable::restore(block_lease);
461 assert!(result.is_err());
462 }
463
464 #[test]
465 fn set_auto_checkpoint_resets_counter() {
466 let block_lease = setup_block(64);
467 let snap = Snapshot {
468 items: vec![],
469 count: 0,
470 };
471 let mut durable = Durable::new(snap, block_lease);
472
473 durable.set_auto_checkpoint(Some(5));
474 durable.mutate(|s| s.count += 1).expect("mutate");
475 durable.mutate(|s| s.count += 1).expect("mutate");
476 assert_eq!(durable.mutation_count(), 2);
477
478 durable.set_auto_checkpoint(Some(2));
480 assert_eq!(durable.mutation_count(), 0);
481 }
482}