grafos_registry/
region.rs

1//! Registry storage region backed by a FabricHashMap.
2
3extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use grafos_collections::map::FabricHashMap;
8use grafos_std::error::Result;
9use grafos_std::host;
10use serde::{Deserialize, Serialize};
11
12use crate::filter::RegistryFilter;
13use crate::registration::{HealthStatus, ServiceRegistration};
14
15/// A slot in the registry map value — wraps a single registration.
16#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
17pub struct RegistrationSlot {
18    pub registration: ServiceRegistration,
19}
20
21/// Fabric-backed registry storage.
22///
23/// Internally stores a `FabricHashMap<String, Vec<RegistrationSlot>>` where
24/// the key is the service name and the value is a list of instance
25/// registrations. A monotonic `version` counter is bumped on every mutation
26/// to support [`RegistryWatcher`](crate::RegistryWatcher) change detection.
27pub struct RegistryRegion {
28    map: FabricHashMap<String, Vec<RegistrationSlot>>,
29    version: u64,
30    default_ttl_secs: u32,
31}
32
33impl RegistryRegion {
34    /// Create a new registry region with the given capacity and strides.
35    pub fn new(
36        capacity: usize,
37        default_ttl_secs: u32,
38        name_stride: usize,
39        slot_stride: usize,
40    ) -> Result<Self> {
41        let map = FabricHashMap::with_capacity(capacity, name_stride, slot_stride)?;
42        Ok(RegistryRegion {
43            map,
44            version: 0,
45            default_ttl_secs,
46        })
47    }
48
49    /// Return the current version counter.
50    pub fn version(&self) -> u64 {
51        self.version
52    }
53
54    /// Register a service instance. If a registration with the same
55    /// `instance_id` already exists under the same name, it is replaced.
56    pub fn register(&mut self, mut reg: ServiceRegistration) -> Result<()> {
57        let now = host::unix_time_secs();
58        reg.lease_expires_at = now + self.default_ttl_secs as u64;
59
60        let name = reg.name.clone();
61        let mut slots = self.map.get(&name)?.unwrap_or_default();
62
63        // Replace existing registration with same instance_id
64        slots.retain(|s| s.registration.instance_id != reg.instance_id);
65        slots.push(RegistrationSlot { registration: reg });
66
67        self.map.insert(&name, &slots)?;
68        self.version += 1;
69
70        #[cfg(feature = "observe")]
71        {
72            let ver = &slots.last().unwrap().registration.version;
73            crate::observe_hooks::on_service_registered(&name, ver);
74        }
75
76        Ok(())
77    }
78
79    /// Remove a specific instance from the registry.
80    pub fn deregister(&mut self, name: &str, instance_id: u128) -> Result<bool> {
81        let key = String::from(name);
82        let mut slots = self.map.get(&key)?.unwrap_or_default();
83        let before = slots.len();
84        slots.retain(|s| s.registration.instance_id != instance_id);
85        let removed = slots.len() < before;
86
87        if slots.is_empty() {
88            self.map.remove(&key)?;
89        } else {
90            self.map.insert(&key, &slots)?;
91        }
92
93        if removed {
94            self.version += 1;
95            #[cfg(feature = "observe")]
96            crate::observe_hooks::on_service_deregistered(name);
97        }
98        Ok(removed)
99    }
100
101    /// Update the health status of a specific instance.
102    pub fn set_health(
103        &mut self,
104        name: &str,
105        instance_id: u128,
106        health: HealthStatus,
107    ) -> Result<bool> {
108        let key = String::from(name);
109        let mut slots = match self.map.get(&key)? {
110            Some(s) => s,
111            None => return Ok(false),
112        };
113
114        let mut found = false;
115        for slot in &mut slots {
116            if slot.registration.instance_id == instance_id {
117                slot.registration.health = health.clone();
118                found = true;
119                break;
120            }
121        }
122
123        if found {
124            self.map.insert(&key, &slots)?;
125            self.version += 1;
126            #[cfg(feature = "observe")]
127            {
128                let old_str = "unknown";
129                let new_str = match &health {
130                    HealthStatus::Healthy => "healthy",
131                    HealthStatus::Degraded { .. } => "degraded",
132                    HealthStatus::Draining => "draining",
133                };
134                crate::observe_hooks::on_health_change(name, old_str, new_str);
135            }
136        }
137        Ok(found)
138    }
139
140    /// Set a specific instance to [`HealthStatus::Draining`].
141    pub fn set_draining(&mut self, name: &str, instance_id: u128) -> Result<bool> {
142        self.set_health(name, instance_id, HealthStatus::Draining)
143    }
144
145    /// Look up all registrations for a service name.
146    pub fn lookup(&self, name: &str) -> Result<Vec<ServiceRegistration>> {
147        let key = String::from(name);
148        match self.map.get(&key)? {
149            Some(slots) => Ok(slots.into_iter().map(|s| s.registration).collect()),
150            None => Ok(Vec::new()),
151        }
152    }
153
154    /// Look up a single registration for a service name (returns the first match).
155    pub fn lookup_one(&self, name: &str) -> Result<Option<ServiceRegistration>> {
156        let results = self.lookup(name)?;
157        Ok(results.into_iter().next())
158    }
159
160    /// Look up registrations matching a filter.
161    pub fn lookup_filtered(
162        &self,
163        name: &str,
164        filter: &RegistryFilter,
165    ) -> Result<Vec<ServiceRegistration>> {
166        let all = self.lookup(name)?;
167        Ok(all.into_iter().filter(|r| filter.matches(r)).collect())
168    }
169
170    /// List all distinct service names in the registry.
171    pub fn list_services(&self) -> Result<Vec<String>> {
172        let mut names = Vec::new();
173        for item in self.map.iter() {
174            let (name, slots) = item?;
175            if !slots.is_empty() {
176                names.push(name);
177            }
178        }
179        Ok(names)
180    }
181
182    /// Prune expired registrations. Returns the number of instances removed.
183    pub fn tick(&mut self) -> Result<usize> {
184        let now = host::unix_time_secs();
185        let mut total_pruned = 0;
186
187        // Collect all keys first to avoid borrow conflicts
188        let mut keys = Vec::new();
189        for item in self.map.iter() {
190            let (name, _) = item?;
191            keys.push(name);
192        }
193
194        for key in keys {
195            let mut slots = match self.map.get(&key)? {
196                Some(s) => s,
197                None => continue,
198            };
199            let before = slots.len();
200            slots.retain(|s| s.registration.lease_expires_at > now);
201            let pruned = before - slots.len();
202
203            if pruned > 0 {
204                if slots.is_empty() {
205                    self.map.remove(&key)?;
206                } else {
207                    self.map.insert(&key, &slots)?;
208                }
209                total_pruned += pruned;
210            }
211        }
212
213        if total_pruned > 0 {
214            self.version += 1;
215        }
216        Ok(total_pruned)
217    }
218}