grafos_mq/
group.rs

//! Consumer group: decentralized partition assignment with lease-based liveness.
//!
//! Each consumer in a group claims unclaimed or stale partitions from a shared
//! assignment map. Liveness is tracked via heartbeat timestamps: once a
//! consumer's last heartbeat is more than `stale_threshold_secs` old, its
//! partitions become eligible for another consumer to claim.
7
8extern crate alloc;
9use alloc::collections::BTreeMap;
10use alloc::string::String;
11use alloc::vec::Vec;
12
13use grafos_std::host;
14
/// Lease record for one partition: which consumer holds it and when that
/// consumer was last seen.
#[derive(Debug, Clone)]
struct Assignment {
    /// Identifier of the consumer currently holding the partition.
    consumer_id: String,
    /// Host clock reading (Unix seconds) taken when the lease was claimed or
    /// last refreshed by a heartbeat.
    last_heartbeat: u64,
}
21
22/// Shared state for a consumer group.
23pub struct ConsumerGroup {
24    group_name: String,
25    topic_name: String,
26    num_partitions: u32,
27    /// Partition index -> assignment.
28    assignments: BTreeMap<u32, Assignment>,
29    /// Seconds after which a consumer is considered stale.
30    stale_threshold_secs: u64,
31}
32
33impl ConsumerGroup {
34    /// Create a new consumer group.
35    pub fn new(
36        group_name: &str,
37        topic_name: &str,
38        num_partitions: u32,
39        stale_threshold_secs: u64,
40    ) -> Self {
41        ConsumerGroup {
42            group_name: String::from(group_name),
43            topic_name: String::from(topic_name),
44            num_partitions,
45            assignments: BTreeMap::new(),
46            stale_threshold_secs,
47        }
48    }
49
50    /// Claim unclaimed or stale partitions for the given consumer.
51    ///
52    /// Returns the list of partition indices now assigned to this consumer.
53    pub fn claim(&mut self, consumer_id: &str) -> Vec<u32> {
54        let now = host::unix_time_secs();
55        let mut claimed = Vec::new();
56
57        for part_idx in 0..self.num_partitions {
58            let should_claim = match self.assignments.get(&part_idx) {
59                None => true,
60                Some(a) => {
61                    if a.consumer_id == consumer_id {
62                        true
63                    } else {
64                        now.saturating_sub(a.last_heartbeat) > self.stale_threshold_secs
65                    }
66                }
67            };
68
69            if should_claim {
70                self.assignments.insert(
71                    part_idx,
72                    Assignment {
73                        consumer_id: String::from(consumer_id),
74                        last_heartbeat: now,
75                    },
76                );
77                claimed.push(part_idx);
78            }
79        }
80
81        claimed
82    }
83
84    /// Return the partitions currently assigned to the given consumer.
85    pub fn assigned_to(&self, consumer_id: &str) -> Vec<u32> {
86        self.assignments
87            .iter()
88            .filter(|(_, a)| a.consumer_id == consumer_id)
89            .map(|(&idx, _)| idx)
90            .collect()
91    }
92
93    /// Update the heartbeat for a consumer's assigned partitions.
94    pub fn heartbeat(&mut self, consumer_id: &str) {
95        let now = host::unix_time_secs();
96        for a in self.assignments.values_mut() {
97            if a.consumer_id == consumer_id {
98                a.last_heartbeat = now;
99            }
100        }
101    }
102
103    /// Release all partitions held by the given consumer.
104    pub fn release(&mut self, consumer_id: &str) {
105        self.assignments.retain(|_, a| a.consumer_id != consumer_id);
106    }
107
108    /// Returns the group name.
109    pub fn group_name(&self) -> &str {
110        &self.group_name
111    }
112
113    /// Returns the topic name.
114    pub fn topic_name(&self) -> &str {
115        &self.topic_name
116    }
117
118    /// Returns the number of unassigned partitions.
119    pub fn unassigned_count(&self) -> u32 {
120        let assigned = self.assignments.len() as u32;
121        self.num_partitions.saturating_sub(assigned)
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Every partition index of the four-partition group used in these tests.
    const ALL: [u32; 4] = [0, 1, 2, 3];

    /// Reset the mock host, set its clock to 1000 s, and return a fresh
    /// four-partition group with a 30 s stale threshold.
    fn fresh_group() -> ConsumerGroup {
        host::reset_mock();
        host::mock_set_unix_time_secs(1000);
        ConsumerGroup::new("g1", "orders", 4, 30)
    }

    #[test]
    fn single_consumer_claims_all() {
        let mut group = fresh_group();

        assert_eq!(group.claim("c1"), ALL);
        assert_eq!(group.assigned_to("c1"), ALL);
        assert_eq!(group.unassigned_count(), 0);
    }

    #[test]
    fn second_consumer_waits_for_stale() {
        let mut group = fresh_group();

        // c1 takes everything; while its lease is fresh c2 gets nothing.
        group.claim("c1");
        assert!(group.claim("c2").is_empty());

        // One second past the 30 s threshold the leases are up for grabs.
        host::mock_advance_time_secs(31);

        assert_eq!(group.claim("c2"), ALL);
        assert_eq!(group.assigned_to("c2"), ALL);
        assert!(group.assigned_to("c1").is_empty());
    }

    #[test]
    fn heartbeat_prevents_stale_takeover() {
        let mut group = fresh_group();
        group.claim("c1");

        // t+20 s: c1 refreshes its lease.
        host::mock_advance_time_secs(20);
        group.heartbeat("c1");

        // t+40 s overall, but only 20 s since the heartbeat.
        host::mock_advance_time_secs(20);

        // The lease is still fresh, so c2 must come away empty-handed.
        assert!(group.claim("c2").is_empty());
    }

    #[test]
    fn release_frees_partitions() {
        let mut group = fresh_group();
        group.claim("c1");

        group.release("c1");
        assert_eq!(group.unassigned_count(), 4);

        // No waiting required after an explicit release.
        assert_eq!(group.claim("c2"), ALL);
    }
}