1#![cfg_attr(not(feature = "std"), no_std)]
40
41extern crate alloc;
42
43mod endpoint;
44mod filter;
45mod reader;
46mod region;
47mod registration;
48mod watcher;
49mod writer;
50
51#[cfg(feature = "observe")]
52pub mod observe_hooks;
53
54#[cfg(feature = "rpc")]
55pub mod rpc_resolver;
56
57pub use endpoint::ServiceEndpoint;
58pub use filter::RegistryFilter;
59pub use reader::RegistryReader;
60pub use region::RegistryRegion;
61pub use registration::{HealthStatus, ServiceRegistration};
62pub use watcher::RegistryWatcher;
63pub use writer::RegistryWriter;
64
65use grafos_std::error::Result;
66
67const NAME_STRIDE: usize = 128;
69const SLOT_STRIDE: usize = 4096;
71
72pub struct RegistryBuilder {
88 capacity: usize,
89 default_ttl_secs: u32,
90 name_stride: usize,
91 slot_stride: usize,
92}
93
94impl RegistryBuilder {
95 pub fn new() -> Self {
97 RegistryBuilder {
98 capacity: 64,
99 default_ttl_secs: 300,
100 name_stride: NAME_STRIDE,
101 slot_stride: SLOT_STRIDE,
102 }
103 }
104
105 pub fn capacity(mut self, n: usize) -> Self {
107 self.capacity = n;
108 self
109 }
110
111 pub fn default_ttl_secs(mut self, secs: u32) -> Self {
113 self.default_ttl_secs = secs;
114 self
115 }
116
117 pub fn name_stride(mut self, stride: usize) -> Self {
119 self.name_stride = stride;
120 self
121 }
122
123 pub fn slot_stride(mut self, stride: usize) -> Self {
125 self.slot_stride = stride;
126 self
127 }
128
129 pub fn build(self) -> Result<(RegistryWriter, RegistryReader)> {
132 let region = RegistryRegion::new(
133 self.capacity,
134 self.default_ttl_secs,
135 self.name_stride,
136 self.slot_stride,
137 )?;
138 let writer = RegistryWriter::new(region);
139 let reader_region = RegistryRegion::new(
144 self.capacity,
145 self.default_ttl_secs,
146 self.name_stride,
147 self.slot_stride,
148 )?;
149 let reader = RegistryReader::new(reader_region);
150 Ok((writer, reader))
151 }
152}
153
154impl Default for RegistryBuilder {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163 use alloc::string::String;
164 use alloc::vec;
165 use grafos_std::host;
166
167 fn setup() {
168 host::reset_mock();
169 host::mock_set_fbmu_arena_size(1048576);
170 }
171
172 #[test]
173 fn register_and_discover_roundtrip() {
174 setup();
175 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
176
177 let reg = ServiceRegistration::new("api-gateway", "2.1.0", 1001)
178 .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080))
179 .with_tag("env", "prod");
180
181 region.register(reg.clone()).unwrap();
182
183 let results = region.lookup("api-gateway").unwrap();
184 assert_eq!(results.len(), 1);
185 assert_eq!(results[0].name, "api-gateway");
186 assert_eq!(results[0].version, "2.1.0");
187 assert_eq!(results[0].instance_id, 1001);
188 }
189
190 #[test]
191 fn multi_instance_registration() {
192 setup();
193 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
194
195 for i in 0..3 {
196 let reg = ServiceRegistration::new("worker", "1.0.0", 100 + i)
197 .with_endpoint(ServiceEndpoint::net([10, 0, 0, i as u8 + 1], 9090));
198 region.register(reg).unwrap();
199 }
200
201 let results = region.lookup("worker").unwrap();
202 assert_eq!(results.len(), 3);
203 }
204
205 #[test]
206 fn explicit_deregister() {
207 setup();
208 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
209
210 let reg = ServiceRegistration::new("temp-svc", "1.0.0", 42);
211 region.register(reg).unwrap();
212
213 assert_eq!(region.lookup("temp-svc").unwrap().len(), 1);
214
215 region.deregister("temp-svc", 42).unwrap();
216 assert_eq!(region.lookup("temp-svc").unwrap().len(), 0);
217 }
218
219 #[test]
220 fn auto_deregister_on_lease_expiry() {
221 setup();
222 let mut region = RegistryRegion::new(64, 10, NAME_STRIDE, SLOT_STRIDE).unwrap();
223
224 let reg = ServiceRegistration::new("ephemeral", "1.0.0", 1);
225 region.register(reg).unwrap();
226
227 assert_eq!(region.lookup("ephemeral").unwrap().len(), 1);
228
229 host::mock_advance_time_secs(11);
230
231 let pruned = region.tick().unwrap();
232 assert_eq!(pruned, 1);
233 assert_eq!(region.lookup("ephemeral").unwrap().len(), 0);
234 }
235
236 #[test]
237 fn health_status_changes() {
238 setup();
239 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
240
241 let reg = ServiceRegistration::new("db-proxy", "3.0.0", 7);
242 region.register(reg).unwrap();
243
244 region
245 .set_health(
246 "db-proxy",
247 7,
248 HealthStatus::Degraded {
249 reason: String::from("high latency"),
250 },
251 )
252 .unwrap();
253
254 let results = region.lookup("db-proxy").unwrap();
255 assert!(matches!(results[0].health, HealthStatus::Degraded { .. }));
256
257 region
258 .set_health("db-proxy", 7, HealthStatus::Draining)
259 .unwrap();
260
261 let results = region.lookup("db-proxy").unwrap();
262 assert!(matches!(results[0].health, HealthStatus::Draining));
263 }
264
265 #[test]
266 fn tag_and_version_filtering() {
267 setup();
268 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
269
270 let reg1 = ServiceRegistration::new("api", "1.0.0", 1)
271 .with_tag("env", "prod")
272 .with_tag("region", "us-east");
273 let reg2 = ServiceRegistration::new("api", "1.1.0", 2)
274 .with_tag("env", "prod")
275 .with_tag("region", "us-west");
276 let reg3 = ServiceRegistration::new("api", "2.0.0", 3).with_tag("env", "staging");
277
278 region.register(reg1).unwrap();
279 region.register(reg2).unwrap();
280 region.register(reg3).unwrap();
281
282 let filter = RegistryFilter::new().version_prefix("1.");
284 let results = region.lookup_filtered("api", &filter).unwrap();
285 assert_eq!(results.len(), 2);
286
287 let filter = RegistryFilter::new().tag("env", "prod");
289 let results = region.lookup_filtered("api", &filter).unwrap();
290 assert_eq!(results.len(), 2);
291
292 let filter = RegistryFilter::new()
294 .tag("env", "prod")
295 .tag("region", "us-east");
296 let results = region.lookup_filtered("api", &filter).unwrap();
297 assert_eq!(results.len(), 1);
298 assert_eq!(results[0].instance_id, 1);
299
300 let filter = RegistryFilter::new().health(HealthStatus::Healthy);
302 let results = region.lookup_filtered("api", &filter).unwrap();
303 assert_eq!(results.len(), 3);
304 }
305
306 #[test]
307 fn watch_notification() {
308 setup();
309 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
310
311 let initial_version = region.version();
312
313 let reg = ServiceRegistration::new("watched", "1.0.0", 1);
314 region.register(reg).unwrap();
315
316 assert!(region.version() > initial_version);
317
318 let mut watcher = RegistryWatcher::new(initial_version);
319 assert!(watcher.changed(region.version()));
320
321 watcher.acknowledge(region.version());
323 assert!(!watcher.changed(region.version()));
324
325 let reg2 = ServiceRegistration::new("watched-2", "1.0.0", 2);
327 region.register(reg2).unwrap();
328 assert!(watcher.changed(region.version()));
329 }
330
331 #[test]
332 fn list_services() {
333 setup();
334 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
335
336 region
337 .register(ServiceRegistration::new("svc-a", "1.0.0", 1))
338 .unwrap();
339 region
340 .register(ServiceRegistration::new("svc-b", "1.0.0", 2))
341 .unwrap();
342 region
343 .register(ServiceRegistration::new("svc-a", "1.0.0", 3))
344 .unwrap();
345
346 let mut names = region.list_services().unwrap();
347 names.sort();
348 assert_eq!(names, vec!["svc-a", "svc-b"]);
349 }
350
351 #[test]
352 fn lookup_one() {
353 setup();
354 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
355
356 assert!(region.lookup_one("missing").unwrap().is_none());
357
358 region
359 .register(ServiceRegistration::new("singleton", "1.0.0", 42))
360 .unwrap();
361
362 let result = region.lookup_one("singleton").unwrap();
363 assert!(result.is_some());
364 assert_eq!(result.unwrap().instance_id, 42);
365 }
366
367 #[test]
368 fn update_registration() {
369 setup();
370 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
371
372 let reg = ServiceRegistration::new("updatable", "1.0.0", 1)
373 .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080));
374 region.register(reg).unwrap();
375
376 let updated = ServiceRegistration::new("updatable", "1.1.0", 1)
378 .with_endpoint(ServiceEndpoint::net([10, 0, 0, 2], 9090));
379 region.register(updated).unwrap();
380
381 let results = region.lookup("updatable").unwrap();
382 assert_eq!(results.len(), 1);
383 assert_eq!(results[0].version, "1.1.0");
384 }
385
386 #[test]
387 fn writer_reader_pair() {
388 setup();
389 let (mut writer, _reader) = RegistryBuilder::new().capacity(32).build().unwrap();
390
391 let reg = ServiceRegistration::new("pair-test", "1.0.0", 1);
392 writer.register(reg).unwrap();
393
394 let from_writer = writer.lookup("pair-test").unwrap();
396 assert_eq!(from_writer.len(), 1);
397 }
398
399 #[test]
400 fn set_draining() {
401 setup();
402 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
403
404 region
405 .register(ServiceRegistration::new("drain-me", "1.0.0", 5))
406 .unwrap();
407
408 region.set_draining("drain-me", 5).unwrap();
409
410 let results = region.lookup("drain-me").unwrap();
411 assert!(matches!(results[0].health, HealthStatus::Draining));
412 }
413
414 #[test]
415 fn endpoint_variants() {
416 setup();
417 let mut region = RegistryRegion::new(64, 300, NAME_STRIDE, SLOT_STRIDE).unwrap();
418
419 let reg = ServiceRegistration::new("multi-endpoint", "1.0.0", 1)
420 .with_endpoint(ServiceEndpoint::net([10, 0, 0, 1], 8080))
421 .with_endpoint(ServiceEndpoint::queue(b"topic://events"))
422 .with_endpoint(ServiceEndpoint::store(b"bucket://data"))
423 .with_endpoint(ServiceEndpoint::custom("grpc", b"grpc://10.0.0.1:50051"));
424
425 region.register(reg).unwrap();
426
427 let results = region.lookup("multi-endpoint").unwrap();
428 assert_eq!(results[0].endpoints.len(), 4);
429 }
430}