1extern crate alloc;
10use alloc::string::String;
11use alloc::vec;
12use alloc::vec::Vec;
13
14use grafos_std::block::{BlockBuilder, BlockLease, BLOCK_SIZE};
15use grafos_std::error::{FabricError, Result};
16use grafos_std::host;
17
18use serde::{Deserialize, Serialize};
19
20use crate::crc::crc32;
21#[cfg(feature = "versioning")]
22use crate::meta::VersionInfo;
23use crate::meta::{ObjectInfo, ObjectMeta, PutOptions};
24use crate::store::{ObjectData, ObjectStore};
25use crate::uri::FabricUri;
26
/// Magic bytes stored in bytes `0..4` of the superblock (block 0).
const SUPER_MAGIC: [u8; 4] = *b"GOST";

/// On-disk format version, stored little-endian in bytes `4..8` of the superblock.
const SUPER_VERSION: u32 = 1;
29
30#[derive(Clone, Debug, Serialize, Deserialize)]
36struct IndexEntry {
37 uri: String,
38 lba: u64,
39 total_bytes: u64,
40 meta: ObjectMeta,
41}
42
43pub struct BlockObjectStore {
52 lease: BlockLease,
53 write_cursor: u64,
55 index: Vec<IndexEntry>,
57 index_start_lba: u64,
59}
60
61impl BlockObjectStore {
62 pub fn new(num_blocks: u64) -> Result<Self> {
66 let lease = BlockBuilder::new().min_blocks(num_blocks).acquire()?;
67 let total_blocks = lease.block().num_blocks();
68
69 let index_blocks = core::cmp::max(4, total_blocks / 10);
71 let index_start_lba = total_blocks - index_blocks;
72
73 let mut sb = [0u8; BLOCK_SIZE];
75 sb[0..4].copy_from_slice(&SUPER_MAGIC);
76 sb[4..8].copy_from_slice(&SUPER_VERSION.to_le_bytes());
77 sb[8..16].copy_from_slice(&0u64.to_le_bytes()); let data_start = BLOCK_SIZE as u64;
80 sb[16..24].copy_from_slice(&data_start.to_le_bytes()); lease.block().write_block(0, &sb)?;
82
83 Ok(BlockObjectStore {
84 lease,
85 write_cursor: data_start,
86 index: Vec::new(),
87 index_start_lba,
88 })
89 }
90
91 fn max_data_offset(&self) -> u64 {
93 self.index_start_lba * BLOCK_SIZE as u64
94 }
95
96 fn write_raw(&mut self, data: &[u8]) -> Result<u64> {
98 let start = self.write_cursor;
99 if start + data.len() as u64 > self.max_data_offset() {
100 return Err(FabricError::CapacityExceeded);
101 }
102
103 let mut offset = start;
104 let mut remaining = data;
105 while !remaining.is_empty() {
106 let block_idx = offset / BLOCK_SIZE as u64;
107 let block_offset = (offset % BLOCK_SIZE as u64) as usize;
108 let space = BLOCK_SIZE - block_offset;
109 let chunk_len = remaining.len().min(space);
110
111 let mut block = if block_offset == 0 && chunk_len == BLOCK_SIZE {
112 [0u8; BLOCK_SIZE]
113 } else {
114 self.lease.block().read_block(block_idx)?
115 };
116 block[block_offset..block_offset + chunk_len].copy_from_slice(&remaining[..chunk_len]);
117 self.lease.block().write_block(block_idx, &block)?;
118
119 remaining = &remaining[chunk_len..];
120 offset += chunk_len as u64;
121 }
122
123 self.write_cursor = offset;
124 Ok(start)
125 }
126
127 fn read_raw(&self, offset: u64, len: usize) -> Result<Vec<u8>> {
129 let mut buf = vec![0u8; len];
130 let mut buf_offset = 0;
131 let mut disk_offset = offset;
132
133 while buf_offset < len {
134 let block_idx = disk_offset / BLOCK_SIZE as u64;
135 let block_off = (disk_offset % BLOCK_SIZE as u64) as usize;
136 let space = BLOCK_SIZE - block_off;
137 let chunk_len = (len - buf_offset).min(space);
138
139 let block = self.lease.block().read_block(block_idx)?;
140 buf[buf_offset..buf_offset + chunk_len]
141 .copy_from_slice(&block[block_off..block_off + chunk_len]);
142
143 buf_offset += chunk_len;
144 disk_offset += chunk_len as u64;
145 }
146
147 Ok(buf)
148 }
149
150 fn update_superblock(&self) -> Result<()> {
152 let mut sb = [0u8; BLOCK_SIZE];
153 sb[0..4].copy_from_slice(&SUPER_MAGIC);
154 sb[4..8].copy_from_slice(&SUPER_VERSION.to_le_bytes());
155 sb[8..16].copy_from_slice(&(self.index.len() as u64).to_le_bytes());
156 sb[16..24].copy_from_slice(&self.write_cursor.to_le_bytes());
157 self.lease.block().write_block(0, &sb)
158 }
159
160 pub fn checkpoint_index(&self) -> Result<()> {
162 let data = postcard::to_allocvec(&self.index).map_err(|_| FabricError::IoError(-1))?;
163 let data_len = data.len() as u64;
164
165 let mut header = [0u8; BLOCK_SIZE];
167 header[0..4].copy_from_slice(b"GIDX"); header[4..8].copy_from_slice(&1u32.to_le_bytes()); header[8..16].copy_from_slice(&data_len.to_le_bytes());
170 self.lease
171 .block()
172 .write_block(self.index_start_lba, &header)?;
173
174 let num_data_blocks = data.len().div_ceil(BLOCK_SIZE);
176 let available = self.lease.block().num_blocks() - self.index_start_lba - 1;
177 if num_data_blocks as u64 > available {
178 return Err(FabricError::CapacityExceeded);
179 }
180
181 for i in 0..num_data_blocks {
182 let start = i * BLOCK_SIZE;
183 let end = core::cmp::min(start + BLOCK_SIZE, data.len());
184 let mut block = [0u8; BLOCK_SIZE];
185 block[..end - start].copy_from_slice(&data[start..end]);
186 self.lease
187 .block()
188 .write_block(self.index_start_lba + 1 + i as u64, &block)?;
189 }
190
191 self.update_superblock()
192 }
193
194 fn find_index(&self, uri: &str) -> Option<usize> {
195 self.index.iter().position(|e| e.uri == uri)
196 }
197}
198
199impl ObjectStore for BlockObjectStore {
200 fn put(&mut self, uri: &FabricUri, data: &[u8], opts: Option<PutOptions>) -> Result<()> {
201 let opts = opts.unwrap_or_default();
202 let checksum = crc32(data);
203 let now = host::unix_time_secs();
204
205 #[cfg(feature = "versioning")]
206 let next_version = {
207 let uri_str = uri.to_string();
208 match self.find_index(&uri_str) {
209 Some(idx) => self.index[idx].meta.version + 1,
210 None => 1,
211 }
212 };
213
214 let content_type = opts
215 .content_type
216 .unwrap_or_else(|| String::from("application/octet-stream"));
217
218 let lba = self.write_raw(data)?;
220
221 let meta = ObjectMeta {
222 content_type,
223 crc32: checksum,
224 size: data.len() as u64,
225 created_at: now,
226 tags: opts.tags,
227 #[cfg(feature = "versioning")]
228 version: next_version,
229 };
230
231 let entry = IndexEntry {
232 uri: uri.to_string(),
233 lba,
234 total_bytes: data.len() as u64,
235 meta,
236 };
237
238 let uri_str = uri.to_string();
240 if let Some(idx) = self.find_index(&uri_str) {
241 self.index[idx] = entry.clone();
242 } else {
243 self.index.push(entry.clone());
244 }
245
246 #[cfg(feature = "versioning")]
248 {
249 let ver_uri = alloc::format!("{}#v/{}", uri, next_version);
250 let ver_entry = IndexEntry {
251 uri: ver_uri,
252 lba: entry.lba,
253 total_bytes: entry.total_bytes,
254 meta: entry.meta,
255 };
256 self.index.push(ver_entry);
257 }
258
259 Ok(())
260 }
261
262 fn get(&self, uri: &FabricUri) -> Result<Option<ObjectData>> {
263 let uri_str = uri.to_string();
264 let idx = match self.find_index(&uri_str) {
265 Some(i) => i,
266 None => return Ok(None),
267 };
268
269 let entry = &self.index[idx];
270 let data = self.read_raw(entry.lba, entry.total_bytes as usize)?;
271
272 let computed = crc32(&data);
274 if computed != entry.meta.crc32 {
275 return Err(FabricError::IoError(-5));
276 }
277
278 let info = ObjectInfo::from_meta(&entry.meta);
279 Ok(Some(ObjectData { data, info }))
280 }
281
282 fn head(&self, uri: &FabricUri) -> Result<Option<ObjectInfo>> {
283 let uri_str = uri.to_string();
284 match self.find_index(&uri_str) {
285 Some(idx) => Ok(Some(ObjectInfo::from_meta(&self.index[idx].meta))),
286 None => Ok(None),
287 }
288 }
289
290 fn delete(&mut self, uri: &FabricUri) -> Result<bool> {
291 let uri_str = uri.to_string();
292 match self.find_index(&uri_str) {
293 Some(idx) => {
294 self.index.remove(idx);
295 Ok(true)
296 }
297 None => Ok(false),
298 }
299 }
300
301 fn list(&self, pool: &str, bucket: &str, prefix: &str) -> Result<Vec<String>> {
302 let mut keys = Vec::new();
303 for entry in &self.index {
304 if entry.uri.contains("#v/") {
306 continue;
307 }
308 if let Ok(uri) = entry.uri.parse::<FabricUri>() {
309 if uri.pool() == pool
310 && uri.bucket() == bucket
311 && (prefix.is_empty() || uri.key().starts_with(prefix))
312 {
313 keys.push(String::from(uri.key()));
314 }
315 }
316 }
317 keys.sort();
318 Ok(keys)
319 }
320
321 #[cfg(feature = "versioning")]
322 fn get_version(&self, uri: &FabricUri, version: u64) -> Result<Option<ObjectData>> {
323 let ver_uri = alloc::format!("{}#v/{}", uri, version);
324 let idx = match self.find_index(&ver_uri) {
325 Some(i) => i,
326 None => return Ok(None),
327 };
328
329 let entry = &self.index[idx];
330 let data = self.read_raw(entry.lba, entry.total_bytes as usize)?;
331
332 let computed = crc32(&data);
333 if computed != entry.meta.crc32 {
334 return Err(FabricError::IoError(-5));
335 }
336
337 let info = ObjectInfo::from_meta(&entry.meta);
338 Ok(Some(ObjectData { data, info }))
339 }
340
341 #[cfg(feature = "versioning")]
342 fn list_versions(&self, pool: &str, bucket: &str, key: &str) -> Result<Vec<VersionInfo>> {
343 let uri = FabricUri::new(pool, bucket, key)?;
344 let prefix = alloc::format!("{}#v/", uri);
345 let mut versions = Vec::new();
346 for entry in &self.index {
347 if entry.uri.starts_with(&prefix) {
348 versions.push(VersionInfo {
349 version: entry.meta.version,
350 size: entry.meta.size,
351 created_at: entry.meta.created_at,
352 });
353 }
354 }
355 versions.sort_by_key(|v| v.version);
356 Ok(versions)
357 }
358
359 #[cfg(feature = "versioning")]
360 fn delete_version(&mut self, uri: &FabricUri, version: u64) -> Result<bool> {
361 let ver_uri = alloc::format!("{}#v/{}", uri, version);
362 match self.find_index(&ver_uri) {
363 Some(idx) => {
364 self.index.remove(idx);
365 Ok(true)
366 }
367 None => Ok(false),
368 }
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Resets the mock host to a 1024-block device and opens a store on a
    /// lease of at least 256 blocks.
    fn fresh_store() -> BlockObjectStore {
        host::reset_mock();
        host::mock_set_fbbu_num_blocks(1024);
        BlockObjectStore::new(256).unwrap()
    }

    /// Parses a literal URI, panicking on malformed input.
    fn parse_uri(raw: &str) -> FabricUri {
        raw.parse().unwrap()
    }

    #[test]
    fn put_get_roundtrip() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/hello");

        store.put(&uri, b"world", None).unwrap();

        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data, b"world");
        assert_eq!(fetched.info.size, 5);
    }

    #[test]
    fn delete_removes_from_index() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/del");

        store.put(&uri, b"gone", None).unwrap();

        assert!(store.delete(&uri).unwrap());
        assert!(store.get(&uri).unwrap().is_none());
    }

    #[test]
    fn list_with_prefix() {
        let mut store = fresh_store();

        let fixtures: [(&str, &[u8]); 3] = [
            ("fabric://p/b/logs/a", b"1"),
            ("fabric://p/b/logs/b", b"2"),
            ("fabric://p/b/data/c", b"3"),
        ];
        for (raw, payload) in fixtures {
            store.put(&parse_uri(raw), payload, None).unwrap();
        }

        let logs = store.list("p", "b", "logs/").unwrap();
        assert_eq!(logs, vec!["logs/a", "logs/b"]);
    }

    #[test]
    fn checkpoint_preserves_state() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/ck");

        store.put(&uri, b"persistent", None).unwrap();
        store.checkpoint_index().unwrap();

        // The object must still be readable after the index was checkpointed.
        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data, b"persistent");
    }

    #[test]
    fn overwrite_updates() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/ow");

        for payload in [&b"first"[..], &b"second"[..]] {
            store.put(&uri, payload, None).unwrap();
        }

        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data, b"second");
    }

    #[test]
    fn large_object_across_blocks() {
        let mut store = fresh_store();
        let uri = parse_uri("fabric://p/b/big");

        let payload = vec![0xABu8; 2048];
        store.put(&uri, &payload, None).unwrap();

        let fetched = store.get(&uri).unwrap().unwrap();
        assert_eq!(fetched.data.len(), 2048);
        assert_eq!(fetched.data, payload);
    }
}