grafos_leasekit/
manager.rs

1//! The [`RenewalManager`] — poll-driven lease renewal engine.
2
3extern crate alloc;
4use alloc::boxed::Box;
5use alloc::vec::Vec;
6
7use crate::backoff::Backoff;
8use crate::policy::RenewalPolicy;
9
10/// Tracks the renewal state for a single registered lease.
11struct Entry {
12    lease_id: u128,
13    created_at: u64,
14    expires_at: u64,
15    policy: RenewalPolicy,
16    backoff: Backoff,
17    /// Absolute time before which we should not retry after a failure.
18    next_retry_at: u64,
19    /// Whether the lease has been successfully renewed at least once by
20    /// this manager (used to compute renewal deadline from latest expiry).
21    last_renewed_at: Option<u64>,
22}
23
24impl Entry {
25    fn renewal_deadline(&self) -> u64 {
26        let created = self.last_renewed_at.unwrap_or(self.created_at);
27        let jitter_seed = self.lease_id as u64;
28        self.policy
29            .renewal_deadline(created, self.expires_at, jitter_seed)
30    }
31
32    fn is_near_expiry(&self, now: u64) -> bool {
33        if now >= self.expires_at {
34            return true;
35        }
36        let remaining = self.expires_at - now;
37        let total_ttl = self
38            .expires_at
39            .saturating_sub(self.last_renewed_at.unwrap_or(self.created_at));
40        if total_ttl == 0 {
41            return true;
42        }
43        // Near expiry when less than 10% TTL remains
44        remaining < total_ttl / 10
45    }
46}
47
/// Summary returned by [`RenewalManager::tick`] and
/// [`RenewalManager::tick_with`].
///
/// Every managed lease is counted in exactly one of `renewed`, `skipped`
/// or `failed` per tick; `near_expiry` is reported independently of that.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RenewalSummary {
    /// Number of leases successfully renewed this tick.
    pub renewed: u32,
    /// Number of leases on which no renewal was attempted this tick:
    /// already expired, not yet due, or still waiting out a retry delay.
    pub skipped: u32,
    /// Number of leases where renewal was attempted but failed.
    pub failed: u32,
    /// Lease IDs that are near expiry (< 10% TTL remaining), evaluated
    /// before any renewal performed in the same tick.
    pub near_expiry: Vec<u128>,
}
60
61/// Callback invoked by the manager to actually perform a renewal.
62///
63/// The manager itself does not hold lease objects; callers provide a
64/// `renew_fn` closure to `tick_with` that performs the renewal and
65/// returns the new `expires_at` timestamp on success.
66///
67/// Alternatively, use [`tick`](RenewalManager::tick) for a simpler API
68/// that auto-renews by extending `expires_at` by `policy.min_renew_secs`.
69pub struct RenewalManager {
70    entries: Vec<Entry>,
71    revoke_callbacks: Vec<Box<dyn Fn(u128, u8)>>,
72}
73
74impl RenewalManager {
75    /// Create an empty manager with no registered leases.
76    pub fn new() -> Self {
77        RenewalManager {
78            entries: Vec::new(),
79            revoke_callbacks: Vec::new(),
80        }
81    }
82
83    /// Register a lease for managed renewal.
84    ///
85    /// `created_at` is inferred as `expires_at - policy.min_renew_secs`
86    /// (clamped to 0). For precise control, use `register_with_created_at`.
87    pub fn register(&mut self, lease_id: u128, expires_at: u64, policy: RenewalPolicy) {
88        let created_at = expires_at.saturating_sub(policy.min_renew_secs);
89        self.register_with_created_at(lease_id, created_at, expires_at, policy);
90    }
91
92    /// Register a lease with an explicit creation timestamp.
93    pub fn register_with_created_at(
94        &mut self,
95        lease_id: u128,
96        created_at: u64,
97        expires_at: u64,
98        policy: RenewalPolicy,
99    ) {
100        // Remove any existing entry with the same lease_id
101        self.entries.retain(|e| e.lease_id != lease_id);
102        let backoff = Backoff::new(1, policy.max_backoff_secs);
103        self.entries.push(Entry {
104            lease_id,
105            created_at,
106            expires_at,
107            policy,
108            backoff,
109            next_retry_at: 0,
110            last_renewed_at: None,
111        });
112    }
113
114    /// Unregister a lease.
115    pub fn unregister(&mut self, lease_id: u128) {
116        self.entries.retain(|e| e.lease_id != lease_id);
117    }
118
119    /// Number of leases currently managed.
120    pub fn len(&self) -> usize {
121        self.entries.len()
122    }
123
124    /// Returns `true` if no leases are managed.
125    pub fn is_empty(&self) -> bool {
126        self.entries.is_empty()
127    }
128
129    /// Drive renewals using a simple model: if a lease is due, extend
130    /// `expires_at` by `policy.min_renew_secs`. This is suitable when the
131    /// manager owns the lease state (e.g. testing or self-contained use).
132    pub fn tick(&mut self, now_unix_secs: u64) -> RenewalSummary {
133        self.tick_with(now_unix_secs, |_lease_id, duration| {
134            // Simple model: always succeed, return new expiry
135            Ok(duration)
136        })
137    }
138
139    /// Drive renewals with a caller-provided renewal function.
140    ///
141    /// `renew_fn(lease_id, duration_secs)` should attempt the actual
142    /// renewal and return the new `expires_at` on success, or an error.
143    pub fn tick_with<F>(&mut self, now_unix_secs: u64, mut renew_fn: F) -> RenewalSummary
144    where
145        F: FnMut(u128, u64) -> Result<u64, ()>,
146    {
147        let mut summary = RenewalSummary::default();
148
149        for entry in self.entries.iter_mut() {
150            // Check near-expiry
151            if entry.is_near_expiry(now_unix_secs) {
152                summary.near_expiry.push(entry.lease_id);
153            }
154
155            // Already expired — skip
156            if now_unix_secs >= entry.expires_at {
157                summary.skipped += 1;
158                continue;
159            }
160
161            // Not yet at renewal deadline — skip
162            let deadline = entry.renewal_deadline();
163            if now_unix_secs < deadline {
164                summary.skipped += 1;
165                continue;
166            }
167
168            // In backoff — skip
169            if now_unix_secs < entry.next_retry_at {
170                summary.skipped += 1;
171                continue;
172            }
173
174            // Attempt renewal
175            let duration = entry.policy.min_renew_secs;
176            match renew_fn(entry.lease_id, duration) {
177                Ok(new_expires_at) => {
178                    let new_exp = now_unix_secs.saturating_add(new_expires_at);
179                    entry.last_renewed_at = Some(now_unix_secs);
180                    entry.expires_at = new_exp;
181                    entry.backoff.reset();
182                    entry.next_retry_at = 0;
183                    summary.renewed += 1;
184
185                    #[cfg(feature = "observe")]
186                    emit_renew_success(entry.lease_id);
187                }
188                Err(()) => {
189                    let delay = entry.backoff.next_delay();
190                    entry.next_retry_at = now_unix_secs.saturating_add(delay);
191                    summary.failed += 1;
192
193                    #[cfg(feature = "observe")]
194                    emit_renew_failure(entry.lease_id);
195                }
196            }
197        }
198
199        #[cfg(feature = "observe")]
200        emit_tick_metrics(&summary, self.entries.len());
201
202        summary
203    }
204
205    /// Returns `true` if the given lease is near expiry (< 10% TTL remaining).
206    pub fn is_near_expiry(&self, lease_id: u128, now: u64) -> bool {
207        self.entries
208            .iter()
209            .find(|e| e.lease_id == lease_id)
210            .map(|e| e.is_near_expiry(now))
211            .unwrap_or(false)
212    }
213
214    /// Register a callback invoked when a lease is revoked (e.g. via
215    /// WITHDRAW cleanup or REVOKE_BROADCAST).
216    ///
217    /// The callback receives the lease ID and a reason code (0 = unknown/timeout).
218    pub fn on_revoked<F: Fn(u128, u8) + 'static>(&mut self, callback: F) {
219        self.revoke_callbacks.push(Box::new(callback));
220    }
221
222    /// Notify all registered revocation callbacks for the given lease.
223    ///
224    /// `reason` is the WITHDRAW reason code (0 = timeout/unknown).
225    pub fn notify_revoked(&self, lease_id: u128, reason: u8) {
226        for cb in &self.revoke_callbacks {
227            cb(lease_id, reason);
228        }
229    }
230
231    /// Returns the current `expires_at` for a managed lease, if registered.
232    pub fn expires_at(&self, lease_id: u128) -> Option<u64> {
233        self.entries
234            .iter()
235            .find(|e| e.lease_id == lease_id)
236            .map(|e| e.expires_at)
237    }
238}
239
240impl Default for RenewalManager {
241    fn default() -> Self {
242        Self::new()
243    }
244}
245
/// Observability hook for a successful renewal (only built with the
/// `observe` feature): increments the global `ops_total` counter.
///
/// The lease id is accepted for call-site symmetry but is not recorded.
#[cfg(feature = "observe")]
fn emit_renew_success(_lease_id: u128) {
    grafos_observe::FabricMetrics::global().ops_total.inc();
}
251
/// Observability hook for a failed renewal attempt (only built with the
/// `observe` feature): increments the global `ops_total` counter.
///
/// The lease id is accepted for call-site symmetry but is not recorded.
// NOTE(review): this bumps the same `ops_total` counter as the success
// path — confirm whether a dedicated failure counter is intended.
#[cfg(feature = "observe")]
fn emit_renew_failure(_lease_id: u128) {
    grafos_observe::FabricMetrics::global().ops_total.inc();
}
257
/// Observability hook called once at the end of every tick (only built with
/// the `observe` feature).
///
/// Currently a placeholder: both arguments are accepted and ignored.
#[cfg(feature = "observe")]
fn emit_tick_metrics(_summary: &RenewalSummary, _managed_count: usize) {}
262
#[cfg(test)]
mod tests {
    use super::*;

    /// Policy shared by most tests: renew at 75% of the TTL, no jitter,
    /// 100 s renewal duration, backoff capped at 5 s.
    fn test_policy() -> RenewalPolicy {
        RenewalPolicy {
            renew_at_fraction: 0.75,
            jitter_fraction: 0.0,
            min_renew_secs: 100,
            max_backoff_secs: 5,
        }
    }

    /// Manager holding a single lease: id 1, created_at=1000,
    /// expires_at=1100 (TTL=100), using [`test_policy`].
    fn manager_with_lease_one() -> RenewalManager {
        let mut mgr = RenewalManager::new();
        mgr.register_with_created_at(1, 1000, 1100, test_policy());
        mgr
    }

    #[test]
    fn register_and_tick_before_deadline() {
        let mut mgr = manager_with_lease_one();

        // t=1050 is only 50% into the TTL, so nothing is due yet.
        let summary = mgr.tick(1050);
        assert_eq!(summary.renewed, 0);
        assert_eq!(summary.skipped, 1);
        assert_eq!(summary.failed, 0);
    }

    #[test]
    fn tick_at_deadline_triggers_renewal() {
        let mut mgr = manager_with_lease_one();

        // t=1075 is exactly the 75% mark, which counts as due.
        let summary = mgr.tick(1075);
        assert_eq!(summary.renewed, 1);
        assert_eq!(summary.skipped, 0);
    }

    #[test]
    fn tick_after_expiry_skips() {
        let mut mgr = manager_with_lease_one();

        // The lease lapsed at t=1100; a later tick must not renew it.
        let summary = mgr.tick(1200);
        assert_eq!(summary.renewed, 0);
        assert_eq!(summary.skipped, 1);
    }

    #[test]
    fn tick_with_failure_triggers_backoff() {
        let mut mgr = manager_with_lease_one();

        // The attempt at the deadline fails.
        let failed = mgr.tick_with(1075, |_, _| Err(()));
        assert_eq!(failed.failed, 1);
        assert_eq!(failed.renewed, 0);

        // Same instant again: still inside the retry delay
        // (next_retry_at = 1075 + 1 = 1076).
        let in_backoff = mgr.tick_with(1075, |_, _| Ok(100));
        assert_eq!(in_backoff.skipped, 1);
        assert_eq!(in_backoff.renewed, 0);

        // Once the delay has elapsed (t=1076) the retry goes through.
        let retried = mgr.tick_with(1076, |_, _| Ok(100));
        assert_eq!(retried.renewed, 1);
    }

    #[test]
    fn multiple_leases() {
        let mut mgr = RenewalManager::new();
        let policy = RenewalPolicy {
            renew_at_fraction: 0.50,
            ..test_policy()
        };

        for (lease_id, expires_at) in [(1, 1100), (2, 1200), (3, 1300)] {
            mgr.register_with_created_at(lease_id, 1000, expires_at, policy);
        }

        // t=1050: lease 1 (TTL 100) is at 50% and due; lease 2 (TTL 200) is
        // only at 25%, and lease 3 is further away still.
        let first = mgr.tick(1050);
        assert_eq!(first.renewed, 1);
        assert_eq!(first.skipped, 2);

        // t=1100: lease 2 reaches 50% of its TTL; lease 3 has not.
        // Lease 1 already carries its renewed expiry.
        let second = mgr.tick(1100);
        assert!(second.renewed >= 1);
    }

    #[test]
    fn unregister_removes_lease() {
        let mut mgr = RenewalManager::new();
        let policy = RenewalPolicy::default();
        mgr.register(1, 1100, policy);
        mgr.register(2, 1200, policy);
        assert_eq!(mgr.len(), 2);

        mgr.unregister(1);
        assert_eq!(mgr.len(), 1);
        assert!(mgr.expires_at(1).is_none());
        assert!(mgr.expires_at(2).is_some());
    }

    #[test]
    fn is_near_expiry_predicate() {
        let mgr = manager_with_lease_one();

        // t=1050: half the TTL is left.
        assert!(!mgr.is_near_expiry(1, 1050));

        // t=1091: 9% left, which is below the 10% threshold.
        assert!(mgr.is_near_expiry(1, 1091));

        // t=1100: expired leases also count as near expiry.
        assert!(mgr.is_near_expiry(1, 1100));
    }

    #[test]
    fn near_expiry_in_summary() {
        let mut mgr = manager_with_lease_one();

        let summary = mgr.tick(1091);
        assert!(summary.near_expiry.contains(&1));
    }

    #[test]
    fn re_register_replaces_entry() {
        let mut mgr = RenewalManager::new();
        let policy = RenewalPolicy::default();

        mgr.register(1, 1100, policy);
        assert_eq!(mgr.expires_at(1), Some(1100));

        // Registering the same id again must overwrite, not duplicate.
        mgr.register(1, 2200, policy);
        assert_eq!(mgr.len(), 1);
        assert_eq!(mgr.expires_at(1), Some(2200));
    }

    #[test]
    fn unknown_lease_is_not_near_expiry() {
        let mgr = RenewalManager::new();
        assert!(!mgr.is_near_expiry(999, 5000));
    }

    #[test]
    fn renewal_extends_expiry() {
        let mut mgr = manager_with_lease_one();

        let before = mgr.expires_at(1).unwrap();
        assert_eq!(before, 1100);

        // tick at t=1080 grants min_renew_secs=100 measured from now.
        mgr.tick(1080);
        let after = mgr.expires_at(1).unwrap();
        // new expiry = now + duration = 1080 + 100 = 1180
        assert_eq!(after, 1180);
    }

    #[test]
    fn on_revoked_callback_fires() {
        use alloc::sync::Arc;
        use core::sync::atomic::{AtomicBool, Ordering};

        let fired = Arc::new(AtomicBool::new(false));
        let mut mgr = RenewalManager::new();
        {
            let flag = Arc::clone(&fired);
            mgr.on_revoked(move |_lease_id, _reason| {
                flag.store(true, Ordering::SeqCst);
            });
        }

        mgr.notify_revoked(42, 1);
        assert!(fired.load(Ordering::SeqCst));
    }

    #[test]
    fn multiple_revocation_callbacks_fire() {
        use alloc::sync::Arc;
        use core::sync::atomic::{AtomicBool, Ordering};

        let mut mgr = RenewalManager::new();

        let fired_a = Arc::new(AtomicBool::new(false));
        let fired_b = Arc::new(AtomicBool::new(false));

        for flag in [Arc::clone(&fired_a), Arc::clone(&fired_b)] {
            mgr.on_revoked(move |_, _| {
                flag.store(true, Ordering::SeqCst);
            });
        }

        mgr.notify_revoked(99, 2);
        assert!(fired_a.load(Ordering::SeqCst));
        assert!(fired_b.load(Ordering::SeqCst));
    }

    #[test]
    fn notify_revoked_with_no_callbacks_is_noop() {
        // With nothing registered this must simply return without panicking.
        RenewalManager::new().notify_revoked(7, 0);
    }

    #[test]
    fn revocation_callback_receives_correct_args() {
        use alloc::sync::Arc;
        use std::sync::Mutex;

        let log: Arc<Mutex<Vec<(u128, u8)>>> = Arc::new(Mutex::new(Vec::new()));
        let mut mgr = RenewalManager::new();
        {
            let sink = Arc::clone(&log);
            mgr.on_revoked(move |lease_id, reason| {
                sink.lock().unwrap().push((lease_id, reason));
            });
        }

        mgr.notify_revoked(42, 1);
        mgr.notify_revoked(1000, 255);

        let seen = log.lock().unwrap();
        assert_eq!(seen.len(), 2);
        assert_eq!(seen[0], (42, 1));
        assert_eq!(seen[1], (1000, 255));
    }

    #[test]
    fn backoff_escalation_on_repeated_failure() {
        let mut mgr = manager_with_lease_one();

        // t=1075 fails => 1 s delay, next_retry_at=1076
        let first = mgr.tick_with(1075, |_, _| Err(()));
        assert_eq!(first.failed, 1);

        // t=1076 fails again => 2 s delay, next_retry_at=1078
        let second = mgr.tick_with(1076, |_, _| Err(()));
        assert_eq!(second.failed, 1);

        // t=1077 is still inside the delay, so no attempt is made
        let waiting = mgr.tick_with(1077, |_, _| Ok(100));
        assert_eq!(waiting.skipped, 1);

        // t=1078 is past the delay and the renewal succeeds
        let done = mgr.tick_with(1078, |_, _| Ok(100));
        assert_eq!(done.renewed, 1);
    }
}