grafos_mq/
topic.rs

1//! Topic management: create, open, delete topics with configurable partitions.
2
3extern crate alloc;
4use alloc::collections::BTreeMap;
5use alloc::string::String;
6use alloc::vec::Vec;
7
8use crate::partition::Partition;
9use grafos_std::error::{FabricError, Result};
10
/// Configuration for a topic.
///
/// The three fields are plain integers, so two configurations can be
/// compared with `==` to check whether they describe the same layout.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TopicConfig {
    /// Number of partitions.
    pub num_partitions: u32,
    /// Slot capacity per partition (ring buffer size).
    pub partition_capacity: usize,
    /// Slot stride in bytes per partition.
    pub partition_stride: usize,
}
21
22impl Default for TopicConfig {
23    fn default() -> Self {
24        TopicConfig {
25            num_partitions: 4,
26            partition_capacity: 64,
27            partition_stride: 512,
28        }
29    }
30}
31
32/// A topic consisting of one or more partitions.
33pub struct Topic {
34    name: String,
35    partitions: Vec<Partition>,
36}
37
38impl Topic {
39    /// Create a new topic with the given configuration.
40    pub(crate) fn new(name: &str, config: &TopicConfig) -> Result<Self> {
41        let mut partitions = Vec::with_capacity(config.num_partitions as usize);
42        for _ in 0..config.num_partitions {
43            partitions.push(Partition::new(
44                config.partition_capacity,
45                config.partition_stride,
46            )?);
47        }
48        Ok(Topic {
49            name: String::from(name),
50            partitions,
51        })
52    }
53
54    /// Returns the topic name.
55    pub fn name(&self) -> &str {
56        &self.name
57    }
58
59    /// Returns the number of partitions.
60    pub fn num_partitions(&self) -> u32 {
61        self.partitions.len() as u32
62    }
63
64    /// Returns a reference to the partition at the given index.
65    pub fn partition(&self, idx: u32) -> Option<&Partition> {
66        self.partitions.get(idx as usize)
67    }
68
69    /// Returns a mutable reference to the partition at the given index.
70    pub fn partition_mut(&mut self, idx: u32) -> Option<&mut Partition> {
71        self.partitions.get_mut(idx as usize)
72    }
73}
74
75/// Manages topic lifecycle: create, open, delete.
76pub struct TopicManager {
77    topics: BTreeMap<String, Topic>,
78}
79
80impl TopicManager {
81    /// Create a new empty topic manager.
82    pub fn new() -> Self {
83        TopicManager {
84            topics: BTreeMap::new(),
85        }
86    }
87
88    /// Create a new topic with the given name and configuration.
89    ///
90    /// Returns an error if the topic already exists.
91    pub fn create(&mut self, name: &str, config: TopicConfig) -> Result<()> {
92        if self.topics.contains_key(name) {
93            return Err(FabricError::IoError(-10));
94        }
95        let topic = Topic::new(name, &config)?;
96        self.topics.insert(String::from(name), topic);
97        #[cfg(feature = "observe")]
98        crate::observe_hooks::on_topic_created(name);
99        Ok(())
100    }
101
102    /// Open an existing topic by name.
103    pub fn open(&self, name: &str) -> Option<&Topic> {
104        self.topics.get(name)
105    }
106
107    /// Open an existing topic mutably by name.
108    pub fn open_mut(&mut self, name: &str) -> Option<&mut Topic> {
109        self.topics.get_mut(name)
110    }
111
112    /// Delete a topic by name. Returns `true` if the topic existed.
113    pub fn delete(&mut self, name: &str) -> bool {
114        self.topics.remove(name).is_some()
115    }
116
117    /// Returns all topic names.
118    pub fn list(&self) -> Vec<String> {
119        self.topics.keys().cloned().collect()
120    }
121
122    /// Returns the number of topics.
123    pub fn len(&self) -> usize {
124        self.topics.len()
125    }
126
127    /// Returns `true` if there are no topics.
128    pub fn is_empty(&self) -> bool {
129        self.topics.is_empty()
130    }
131}
132
133impl Default for TopicManager {
134    fn default() -> Self {
135        Self::new()
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use grafos_std::host;

    /// Shared test preamble: calls the host mock's reset and sets its FBMU
    /// arena size to `1 << 20` bytes.
    fn setup() {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1 << 20); // 1 MiB
    }

    /// A topic can be created, listed, opened, and then deleted exactly once.
    #[test]
    fn create_open_delete() {
        setup();
        let mut manager = TopicManager::new();
        assert!(manager.is_empty());

        let created = manager.create("orders", TopicConfig::default());
        created.expect("create");
        assert_eq!(manager.len(), 1);
        assert_eq!(manager.list(), vec!["orders"]);

        {
            // Scope the shared borrow so the manager can be mutated below.
            let orders = manager.open("orders").expect("open");
            assert_eq!(orders.name(), "orders");
            assert_eq!(orders.num_partitions(), 4);
        }

        assert!(manager.delete("orders"));
        assert!(manager.open("orders").is_none());
        assert!(!manager.delete("orders"));
    }

    /// Creating the same topic name twice must fail the second time.
    #[test]
    fn duplicate_create_fails() {
        setup();
        let mut manager = TopicManager::new();
        let first = manager.create("t1", TopicConfig::default());
        first.expect("create");
        let second = manager.create("t1", TopicConfig::default());
        assert!(second.is_err());
    }

    /// A non-default partition count is reflected by the opened topic.
    #[test]
    fn custom_partition_count() {
        setup();
        let mut manager = TopicManager::new();
        let eight_way = TopicConfig {
            num_partitions: 8,
            partition_capacity: 32,
            partition_stride: 256,
        };
        manager.create("events", eight_way).expect("create");
        let events = manager.open("events").expect("open");
        assert_eq!(events.num_partitions(), 8);
    }
}