grafos_store/
mem_store.rs

1//! In-memory object store backed by FabricHashMap.
2
3extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use grafos_collections::map::FabricHashMap;
8use grafos_std::error::{FabricError, Result};
9use grafos_std::host;
10
11use crate::crc::crc32;
12#[cfg(feature = "versioning")]
13use crate::meta::VersionInfo;
14use crate::meta::{ObjectInfo, ObjectMeta, PutOptions};
15use crate::store::{ObjectData, ObjectStore};
16use crate::uri::FabricUri;
17
/// Key stride handed to `FabricHashMap::with_capacity`: sized for a
/// serialized URI string (up to ~256 bytes). Derived chunk and version keys
/// (`<uri>#chunk/<n>`, `<uri>#v/<n>`) are stored as map keys too, so they
/// share this budget.
const KEY_STRIDE: usize = 256;
/// Value stride handed to `FabricHashMap::with_capacity`: ObjectMeta
/// (~100 bytes) + inline data (up to ~1 KiB).
/// Objects larger than `CHUNK_SIZE` (1 KiB) are split into chunks stored
/// under derived keys.
const VALUE_STRIDE: usize = 1536;
23
/// Entry stored in the hash map: metadata + inline data.
///
/// The same record shape is used for root entries (keyed by the object URI),
/// chunk entries (`<uri>#chunk/<n>`) and, with the `versioning` feature,
/// version snapshots (`<uri>#v/<n>`).
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
struct StoreEntry {
    /// Object metadata. Chunk entries carry a placeholder meta in which only
    /// `size` (the chunk length) and `created_at` are filled in.
    meta: ObjectMeta,
    /// Payload bytes. Empty on the root entry of a chunked object, whose
    /// bytes live in the chunk entries instead.
    data: Vec<u8>,
}
30
/// In-memory object store backed by [`FabricHashMap`].
///
/// Objects are stored as serialized entries in a hash map over leased DRAM.
/// Objects larger than `CHUNK_SIZE` (1 KiB) are split into chunks of at most
/// `CHUNK_SIZE` bytes, each stored under a derived key (`<uri>#chunk/<n>`).
/// The root entry of a chunked object holds the metadata with empty inline
/// data; the chunk count is derived from the recorded object size.
///
/// # Example
///
/// ```rust
/// use grafos_store::{MemObjectStore, ObjectStore, FabricUri};
///
/// # grafos_std::host::reset_mock();
/// # grafos_std::host::mock_set_fbmu_arena_size(131072);
/// let mut store = MemObjectStore::new(64)?;
/// let uri: FabricUri = "fabric://p/b/k".parse()?;
/// store.put(&uri, b"data", None)?;
/// let obj = store.get(&uri)?.unwrap();
/// assert_eq!(obj.data, b"data");
/// # Ok::<(), grafos_std::FabricError>(())
/// ```
pub struct MemObjectStore {
    /// Maps URI strings (and derived chunk/version keys) to stored entries.
    map: FabricHashMap<String, StoreEntry>,
}
55
/// Maximum inline object size before chunking; also the size of every chunk
/// except possibly the last.
/// Must fit within VALUE_STRIDE after postcard serialization of StoreEntry.
/// ObjectMeta overhead is ~100 bytes, so ~1400 bytes available for data.
/// `get` and `delete` recompute the chunk count of a stored object from its
/// size and this constant.
const CHUNK_SIZE: usize = 1024;
60
61impl MemObjectStore {
62    /// Create a new in-memory object store with the given number of hash
63    /// map buckets.
64    pub fn new(buckets: usize) -> Result<Self> {
65        let map = FabricHashMap::with_capacity(buckets, KEY_STRIDE, VALUE_STRIDE)?;
66        Ok(MemObjectStore { map })
67    }
68
69    fn uri_key(uri: &FabricUri) -> String {
70        uri.to_string()
71    }
72
73    fn chunk_key(uri: &FabricUri, chunk_idx: usize) -> String {
74        alloc::format!("{}#chunk/{}", uri, chunk_idx)
75    }
76
77    #[cfg(feature = "versioning")]
78    fn version_key(uri: &FabricUri, version: u64) -> String {
79        alloc::format!("{}#v/{}", uri, version)
80    }
81}
82
83impl ObjectStore for MemObjectStore {
84    fn put(&mut self, uri: &FabricUri, data: &[u8], opts: Option<PutOptions>) -> Result<()> {
85        let opts = opts.unwrap_or_default();
86        let checksum = crc32(data);
87        let now = host::unix_time_secs();
88
89        // Read existing version for versioning
90        #[cfg(feature = "versioning")]
91        let next_version = {
92            let key = Self::uri_key(uri);
93            match self.map.get(&key)? {
94                Some(existing) => existing.meta.version + 1,
95                None => 1,
96            }
97        };
98
99        let content_type = opts
100            .content_type
101            .unwrap_or_else(|| String::from("application/octet-stream"));
102
103        if data.len() <= CHUNK_SIZE {
104            let meta = ObjectMeta {
105                content_type,
106                crc32: checksum,
107                size: data.len() as u64,
108                created_at: now,
109                tags: opts.tags,
110                #[cfg(feature = "versioning")]
111                version: next_version,
112            };
113            let entry = StoreEntry {
114                meta,
115                data: data.to_vec(),
116            };
117            let key = Self::uri_key(uri);
118            self.map.insert(&key, &entry)?;
119
120            #[cfg(feature = "versioning")]
121            {
122                let vk = Self::version_key(uri, next_version);
123                let _ = self.map.insert(&vk, &entry);
124            }
125        } else {
126            // Chunked storage
127            let num_chunks = data.len().div_ceil(CHUNK_SIZE);
128            let meta = ObjectMeta {
129                content_type,
130                crc32: checksum,
131                size: data.len() as u64,
132                created_at: now,
133                tags: opts.tags,
134                #[cfg(feature = "versioning")]
135                version: next_version,
136            };
137            // Root entry stores metadata + chunk count (no inline data)
138            let root = StoreEntry {
139                meta,
140                data: Vec::new(),
141            };
142            let key = Self::uri_key(uri);
143            self.map.insert(&key, &root)?;
144
145            for i in 0..num_chunks {
146                let start = i * CHUNK_SIZE;
147                let end = core::cmp::min(start + CHUNK_SIZE, data.len());
148                let chunk_entry = StoreEntry {
149                    meta: ObjectMeta {
150                        content_type: String::new(),
151                        crc32: 0,
152                        size: (end - start) as u64,
153                        created_at: now,
154                        tags: Vec::new(),
155                        #[cfg(feature = "versioning")]
156                        version: 0,
157                    },
158                    data: data[start..end].to_vec(),
159                };
160                let ck = Self::chunk_key(uri, i);
161                self.map.insert(&ck, &chunk_entry)?;
162            }
163
164            #[cfg(feature = "versioning")]
165            {
166                let vk = Self::version_key(uri, next_version);
167                let ver_entry = StoreEntry {
168                    meta: root.meta.clone(),
169                    data: data.to_vec(),
170                };
171                let _ = self.map.insert(&vk, &ver_entry);
172            }
173        }
174
175        #[cfg(feature = "observe")]
176        crate::observe_hooks::on_object_put(uri.bucket(), uri.key(), data.len() as u64);
177
178        Ok(())
179    }
180
181    fn get(&self, uri: &FabricUri) -> Result<Option<ObjectData>> {
182        let key = Self::uri_key(uri);
183        let entry = match self.map.get(&key)? {
184            Some(e) => e,
185            None => return Ok(None),
186        };
187
188        let data = if entry.data.is_empty() && entry.meta.size > 0 {
189            // Chunked: reassemble
190            let num_chunks = (entry.meta.size as usize).div_ceil(CHUNK_SIZE);
191            let mut assembled = Vec::with_capacity(entry.meta.size as usize);
192            for i in 0..num_chunks {
193                let ck = Self::chunk_key(uri, i);
194                let chunk = self.map.get(&ck)?.ok_or(FabricError::IoError(-4))?;
195                assembled.extend_from_slice(&chunk.data);
196            }
197            assembled
198        } else {
199            entry.data.clone()
200        };
201
202        // Verify CRC32
203        let computed = crc32(&data);
204        if computed != entry.meta.crc32 {
205            return Err(FabricError::IoError(-5));
206        }
207
208        let info = ObjectInfo::from_meta(&entry.meta);
209
210        #[cfg(feature = "observe")]
211        crate::observe_hooks::on_object_get(uri.bucket(), uri.key(), data.len() as u64);
212
213        Ok(Some(ObjectData { data, info }))
214    }
215
216    fn head(&self, uri: &FabricUri) -> Result<Option<ObjectInfo>> {
217        let key = Self::uri_key(uri);
218        match self.map.get(&key)? {
219            Some(entry) => Ok(Some(ObjectInfo::from_meta(&entry.meta))),
220            None => Ok(None),
221        }
222    }
223
224    fn delete(&mut self, uri: &FabricUri) -> Result<bool> {
225        let key = Self::uri_key(uri);
226        let entry = self.map.remove(&key)?;
227        if let Some(e) = &entry {
228            // Clean up chunks if any
229            if e.data.is_empty() && e.meta.size > 0 {
230                let num_chunks = (e.meta.size as usize).div_ceil(CHUNK_SIZE);
231                for i in 0..num_chunks {
232                    let ck = Self::chunk_key(uri, i);
233                    let _ = self.map.remove(&ck);
234                }
235            }
236        }
237        Ok(entry.is_some())
238    }
239
240    fn list(&self, _pool: &str, _bucket: &str, prefix: &str) -> Result<Vec<String>> {
241        let match_prefix = if prefix.is_empty() {
242            String::new()
243        } else {
244            String::from(prefix)
245        };
246
247        let mut keys = Vec::new();
248        for item in self.map.iter() {
249            let (key_str, _): (String, StoreEntry) = item?;
250            // Skip chunk and version keys
251            if key_str.contains("#chunk/") || key_str.contains("#v/") {
252                continue;
253            }
254            // Parse the stored key as a FabricUri and match bucket/key
255            if let Ok(uri) = key_str.parse::<FabricUri>() {
256                if uri.pool() == _pool
257                    && uri.bucket() == _bucket
258                    && (match_prefix.is_empty() || uri.key().starts_with(&match_prefix))
259                {
260                    keys.push(String::from(uri.key()));
261                }
262            }
263        }
264        keys.sort();
265        Ok(keys)
266    }
267
268    #[cfg(feature = "versioning")]
269    fn get_version(&self, uri: &FabricUri, version: u64) -> Result<Option<ObjectData>> {
270        let vk = Self::version_key(uri, version);
271        let entry = match self.map.get(&vk)? {
272            Some(e) => e,
273            None => return Ok(None),
274        };
275
276        let info = ObjectInfo::from_meta(&entry.meta);
277        Ok(Some(ObjectData {
278            data: entry.data.clone(),
279            info,
280        }))
281    }
282
283    #[cfg(feature = "versioning")]
284    fn list_versions(&self, pool: &str, bucket: &str, key: &str) -> Result<Vec<VersionInfo>> {
285        let uri = FabricUri::new(pool, bucket, key)?;
286        let prefix = alloc::format!("{}#v/", uri);
287        let mut versions = Vec::new();
288        for item in self.map.iter() {
289            let (key_str, entry): (String, StoreEntry) = item?;
290            if key_str.starts_with(&prefix) {
291                versions.push(VersionInfo {
292                    version: entry.meta.version,
293                    size: entry.meta.size,
294                    created_at: entry.meta.created_at,
295                });
296            }
297        }
298        versions.sort_by_key(|v| v.version);
299        Ok(versions)
300    }
301
302    #[cfg(feature = "versioning")]
303    fn delete_version(&mut self, uri: &FabricUri, version: u64) -> Result<bool> {
304        let vk = Self::version_key(uri, version);
305        let removed = self.map.remove(&vk)?;
306        Ok(removed.is_some())
307    }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Reset the mock host and give it a 1 MiB arena for the backing map.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1_048_576); // 1 MiB
    }

    /// Deterministic, non-repeating-per-chunk payload of `len` bytes.
    /// 251 is prime, so the pattern does not line up with `CHUNK_SIZE`.
    fn payload(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 251) as u8).collect()
    }

    #[test]
    fn put_get_roundtrip() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/hello".parse().unwrap();

        store.put(&uri, b"world", None).unwrap();
        let obj = store.get(&uri).unwrap().unwrap();
        assert_eq!(obj.data, b"world");
        assert_eq!(obj.info.size, 5);
        assert_eq!(obj.info.content_type, "application/octet-stream");
    }

    #[test]
    fn head_returns_meta() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/k".parse().unwrap();

        assert!(store.head(&uri).unwrap().is_none());

        store.put(&uri, b"data", None).unwrap();
        let info = store.head(&uri).unwrap().unwrap();
        assert_eq!(info.size, 4);
        assert_eq!(info.crc32, crc32(b"data"));
    }

    #[test]
    fn delete_removes_object() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/del".parse().unwrap();

        store.put(&uri, b"gone", None).unwrap();
        assert!(store.delete(&uri).unwrap());
        assert!(store.get(&uri).unwrap().is_none());
        assert!(!store.delete(&uri).unwrap());
    }

    #[test]
    fn list_prefix() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();

        store
            .put(&"fabric://p/b/logs/a".parse().unwrap(), b"1", None)
            .unwrap();
        store
            .put(&"fabric://p/b/logs/b".parse().unwrap(), b"2", None)
            .unwrap();
        store
            .put(&"fabric://p/b/data/c".parse().unwrap(), b"3", None)
            .unwrap();

        let logs = store.list("p", "b", "logs/").unwrap();
        assert_eq!(logs, vec!["logs/a", "logs/b"]);

        let all = store.list("p", "b", "").unwrap();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn overwrite_updates_data() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/ow".parse().unwrap();

        store.put(&uri, b"first", None).unwrap();
        store.put(&uri, b"second", None).unwrap();

        let obj = store.get(&uri).unwrap().unwrap();
        assert_eq!(obj.data, b"second");
    }

    #[test]
    fn custom_content_type() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/ct".parse().unwrap();

        let opts = PutOptions {
            content_type: Some(String::from("text/plain")),
            ..Default::default()
        };
        store.put(&uri, b"hello", Some(opts)).unwrap();

        let info = store.head(&uri).unwrap().unwrap();
        assert_eq!(info.content_type, "text/plain");
    }

    #[test]
    fn crc32_integrity_verified() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/crc".parse().unwrap();

        store.put(&uri, b"check me", None).unwrap();
        let obj = store.get(&uri).unwrap().unwrap();
        assert_eq!(obj.info.crc32, crc32(b"check me"));
    }

    // --- Chunked path: payloads larger than CHUNK_SIZE ---

    #[test]
    fn chunked_roundtrip() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/big".parse().unwrap();
        // Two full chunks plus a short tail.
        let body = payload(CHUNK_SIZE * 2 + 17);

        store.put(&uri, &body, None).unwrap();

        let obj = store.get(&uri).unwrap().unwrap();
        assert_eq!(obj.data, body);
        assert_eq!(obj.info.size, body.len() as u64);
        assert_eq!(obj.info.crc32, crc32(&body));

        // Chunk entries must not show up as objects.
        assert_eq!(store.list("p", "b", "").unwrap(), vec!["big"]);
    }

    #[test]
    fn chunked_delete_removes_object() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/bigdel".parse().unwrap();

        store.put(&uri, &payload(CHUNK_SIZE + 1), None).unwrap();
        assert!(store.delete(&uri).unwrap());
        assert!(store.get(&uri).unwrap().is_none());
        assert!(store.list("p", "b", "").unwrap().is_empty());
    }

    #[test]
    fn overwrite_chunked_with_smaller_object() {
        setup();
        let mut store = MemObjectStore::new(64).unwrap();
        let uri: FabricUri = "fabric://p/b/shrink".parse().unwrap();

        // Chunked -> smaller chunked -> inline; each read must see only the
        // latest payload.
        store.put(&uri, &payload(CHUNK_SIZE * 3), None).unwrap();

        let smaller = payload(CHUNK_SIZE + 5);
        store.put(&uri, &smaller, None).unwrap();
        assert_eq!(store.get(&uri).unwrap().unwrap().data, smaller);

        store.put(&uri, b"tiny", None).unwrap();
        let obj = store.get(&uri).unwrap().unwrap();
        assert_eq!(obj.data, b"tiny");
        assert_eq!(obj.info.size, 4);

        assert!(store.delete(&uri).unwrap());
        assert!(store.list("p", "b", "").unwrap().is_empty());
    }
}
420}