1extern crate alloc;
4use alloc::collections::VecDeque;
5use alloc::string::String;
6use alloc::vec::Vec;
7
8use grafos_std::error::Result;
9
10use crate::block_store::BlockObjectStore;
11use crate::mem_store::MemObjectStore;
12#[cfg(feature = "versioning")]
13use crate::meta::VersionInfo;
14use crate::meta::{ObjectInfo, PutOptions};
15use crate::store::{ObjectData, ObjectStore};
16use crate::uri::FabricUri;
17
18pub struct TieredObjectStore {
41 hot: MemObjectStore,
42 cold: BlockObjectStore,
43 lru: VecDeque<String>,
45 max_hot: usize,
47}
48
49impl TieredObjectStore {
50 pub fn new(hot_buckets: usize, cold_blocks: u64, max_hot: usize) -> Result<Self> {
56 let hot = MemObjectStore::new(hot_buckets)?;
57 let cold = BlockObjectStore::new(cold_blocks)?;
58 Ok(TieredObjectStore {
59 hot,
60 cold,
61 lru: VecDeque::new(),
62 max_hot,
63 })
64 }
65
66 fn touch_lru(&mut self, uri_str: &str) {
68 if let Some(pos) = self.lru.iter().position(|s| s == uri_str) {
69 self.lru.remove(pos);
70 }
71 self.lru.push_back(String::from(uri_str));
72 }
73
74 fn remove_lru(&mut self, uri_str: &str) {
76 if let Some(pos) = self.lru.iter().position(|s| s == uri_str) {
77 self.lru.remove(pos);
78 }
79 }
80
81 fn evict_lru(&mut self) -> Result<()> {
83 if let Some(victim_str) = self.lru.pop_front() {
84 if let Ok(victim_uri) = victim_str.parse::<FabricUri>() {
85 if let Some(obj) = self.hot.get(&victim_uri)? {
87 let opts = PutOptions {
89 content_type: Some(obj.info.content_type.clone()),
90 ..Default::default()
91 };
92 self.cold.put(&victim_uri, &obj.data, Some(opts))?;
93 }
94 let _ = self.hot.delete(&victim_uri);
96 }
97 }
98 Ok(())
99 }
100
101 pub fn checkpoint(&self) -> Result<()> {
103 self.cold.checkpoint_index()
104 }
105}
106
107impl ObjectStore for TieredObjectStore {
108 fn put(&mut self, uri: &FabricUri, data: &[u8], opts: Option<PutOptions>) -> Result<()> {
109 let uri_str = uri.to_string();
110
111 if self.hot.head(uri)?.is_none() && self.lru.len() >= self.max_hot {
113 self.evict_lru()?;
114 }
115
116 self.hot.put(uri, data, opts)?;
118 self.touch_lru(&uri_str);
119
120 let _ = self.cold.delete(uri);
122
123 Ok(())
124 }
125
126 fn get(&self, uri: &FabricUri) -> Result<Option<ObjectData>> {
127 if let Some(obj) = self.hot.get(uri)? {
129 return Ok(Some(obj));
131 }
132
133 self.cold.get(uri)
135 }
136
137 fn head(&self, uri: &FabricUri) -> Result<Option<ObjectInfo>> {
138 if let Some(info) = self.hot.head(uri)? {
139 return Ok(Some(info));
140 }
141 self.cold.head(uri)
142 }
143
144 fn delete(&mut self, uri: &FabricUri) -> Result<bool> {
145 let uri_str = uri.to_string();
146 self.remove_lru(&uri_str);
147 let hot_deleted = self.hot.delete(uri)?;
148 let cold_deleted = self.cold.delete(uri)?;
149 Ok(hot_deleted || cold_deleted)
150 }
151
152 fn list(&self, pool: &str, bucket: &str, prefix: &str) -> Result<Vec<String>> {
153 let mut hot_keys = self.hot.list(pool, bucket, prefix)?;
154 let cold_keys = self.cold.list(pool, bucket, prefix)?;
155
156 for k in cold_keys {
158 if !hot_keys.contains(&k) {
159 hot_keys.push(k);
160 }
161 }
162 hot_keys.sort();
163 Ok(hot_keys)
164 }
165
166 #[cfg(feature = "versioning")]
167 fn get_version(&self, uri: &FabricUri, version: u64) -> Result<Option<ObjectData>> {
168 if let Some(obj) = self.hot.get_version(uri, version)? {
170 return Ok(Some(obj));
171 }
172 self.cold.get_version(uri, version)
173 }
174
175 #[cfg(feature = "versioning")]
176 fn list_versions(&self, pool: &str, bucket: &str, key: &str) -> Result<Vec<VersionInfo>> {
177 let mut hot = self.hot.list_versions(pool, bucket, key)?;
178 let cold = self.cold.list_versions(pool, bucket, key)?;
179 for v in cold {
181 if !hot.iter().any(|h| h.version == v.version) {
182 hot.push(v);
183 }
184 }
185 hot.sort_by_key(|v| v.version);
186 Ok(hot)
187 }
188
189 #[cfg(feature = "versioning")]
190 fn delete_version(&mut self, uri: &FabricUri, version: u64) -> Result<bool> {
191 let hot_deleted = self.hot.delete_version(uri, version)?;
192 let cold_deleted = self.cold.delete_version(uri, version)?;
193 Ok(hot_deleted || cold_deleted)
194 }
195}
196
197impl TieredObjectStore {
199 pub fn get_mut(&mut self, uri: &FabricUri) -> Result<Option<ObjectData>> {
204 let uri_str = uri.to_string();
205
206 if let Some(obj) = self.hot.get(uri)? {
208 self.touch_lru(&uri_str);
209 return Ok(Some(obj));
210 }
211
212 if let Some(obj) = self.cold.get(uri)? {
214 if self.lru.len() >= self.max_hot {
216 self.evict_lru()?;
217 }
218 let opts = PutOptions {
219 content_type: Some(obj.info.content_type.clone()),
220 ..Default::default()
221 };
222 self.hot.put(uri, &obj.data, Some(opts))?;
223 self.touch_lru(&uri_str);
224 return Ok(Some(obj));
225 }
226
227 Ok(None)
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use grafos_std::host;
235
236 fn setup() {
237 host::reset_mock();
238 host::mock_set_fbmu_arena_size(1_048_576);
239 host::mock_set_fbbu_num_blocks(1024);
240 }
241
242 #[test]
243 fn hot_tier_put_get() {
244 setup();
245 let mut store = TieredObjectStore::new(64, 256, 16).unwrap();
246 let uri: FabricUri = "fabric://p/b/hot".parse().unwrap();
247
248 store.put(&uri, b"fast", None).unwrap();
249 let obj = store.get_mut(&uri).unwrap().unwrap();
250 assert_eq!(obj.data, b"fast");
251 }
252
253 #[test]
254 fn lru_eviction_to_cold() {
255 setup();
256 let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
258
259 let uri1: FabricUri = "fabric://p/b/obj1".parse().unwrap();
260 let uri2: FabricUri = "fabric://p/b/obj2".parse().unwrap();
261 let uri3: FabricUri = "fabric://p/b/obj3".parse().unwrap();
262
263 store.put(&uri1, b"one", None).unwrap();
264 store.put(&uri2, b"two", None).unwrap();
265 store.put(&uri3, b"three", None).unwrap();
267
268 let obj = store.get_mut(&uri1).unwrap().unwrap();
270 assert_eq!(obj.data, b"one");
271
272 let obj3 = store.get_mut(&uri3).unwrap().unwrap();
274 assert_eq!(obj3.data, b"three");
275 }
276
277 #[test]
278 fn cold_promotion_on_read() {
279 setup();
280 let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
281
282 let uri1: FabricUri = "fabric://p/b/a".parse().unwrap();
283 let uri2: FabricUri = "fabric://p/b/b".parse().unwrap();
284 let uri3: FabricUri = "fabric://p/b/c".parse().unwrap();
285
286 store.put(&uri1, b"1", None).unwrap();
287 store.put(&uri2, b"2", None).unwrap();
288 store.put(&uri3, b"3", None).unwrap(); let obj = store.get_mut(&uri1).unwrap().unwrap();
292 assert_eq!(obj.data, b"1");
293 }
294
295 #[test]
296 fn delete_from_both_tiers() {
297 setup();
298 let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
299
300 let uri1: FabricUri = "fabric://p/b/d1".parse().unwrap();
301 let uri2: FabricUri = "fabric://p/b/d2".parse().unwrap();
302 let uri3: FabricUri = "fabric://p/b/d3".parse().unwrap();
303
304 store.put(&uri1, b"1", None).unwrap();
305 store.put(&uri2, b"2", None).unwrap();
306 store.put(&uri3, b"3", None).unwrap(); assert!(store.delete(&uri1).unwrap());
310 assert!(store.delete(&uri3).unwrap());
311
312 assert!(store.get_mut(&uri1).unwrap().is_none());
313 assert!(store.get_mut(&uri3).unwrap().is_none());
314 }
315
316 #[test]
317 fn list_merges_tiers() {
318 setup();
319 let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
320
321 let uri1: FabricUri = "fabric://p/b/x1".parse().unwrap();
322 let uri2: FabricUri = "fabric://p/b/x2".parse().unwrap();
323 let uri3: FabricUri = "fabric://p/b/x3".parse().unwrap();
324
325 store.put(&uri1, b"1", None).unwrap();
326 store.put(&uri2, b"2", None).unwrap();
327 store.put(&uri3, b"3", None).unwrap(); let keys = store.list("p", "b", "x").unwrap();
330 assert_eq!(keys.len(), 3);
331 }
332
333 #[test]
334 fn head_checks_both_tiers() {
335 setup();
336 let mut store = TieredObjectStore::new(64, 256, 2).unwrap();
337
338 let uri1: FabricUri = "fabric://p/b/h1".parse().unwrap();
339 let uri2: FabricUri = "fabric://p/b/h2".parse().unwrap();
340 let uri3: FabricUri = "fabric://p/b/h3".parse().unwrap();
341
342 store.put(&uri1, b"1", None).unwrap();
343 store.put(&uri2, b"2", None).unwrap();
344 store.put(&uri3, b"3", None).unwrap();
345
346 let info = store.head(&uri1).unwrap().unwrap();
348 assert_eq!(info.size, 1);
349 }
350}