grafos_mq/
producer.rs

1//! Producer: send messages to topic partitions.
2//!
3//! Supports round-robin (default) and key-hash (FNV-1a) partitioning strategies.
4
5extern crate alloc;
6
7use crate::topic::TopicManager;
8use grafos_std::error::{FabricError, Result};
9
10/// Partitioning strategy for the producer.
11#[derive(Clone, Debug, PartialEq, Eq)]
12pub enum Partitioner {
13    /// Round-robin across partitions.
14    RoundRobin,
15    /// Hash the message key using FNV-1a and select partition by modulus.
16    KeyHash,
17}
18
19/// A producer that sends messages to a named topic.
20pub struct Producer {
21    topic_name: alloc::string::String,
22    partitioner: Partitioner,
23    round_robin_counter: u32,
24}
25
26impl Producer {
27    /// Create a new producer for the named topic.
28    pub fn new(topic_name: &str) -> Self {
29        Producer {
30            topic_name: alloc::string::String::from(topic_name),
31            partitioner: Partitioner::RoundRobin,
32            round_robin_counter: 0,
33        }
34    }
35
36    /// Set the partitioning strategy.
37    pub fn with_partitioner(mut self, p: Partitioner) -> Self {
38        self.partitioner = p;
39        self
40    }
41
42    /// Send a message without a key (uses round-robin partitioning).
43    pub fn send(&mut self, mgr: &mut TopicManager, value: &[u8]) -> Result<u64> {
44        let topic = mgr
45            .open_mut(&self.topic_name)
46            .ok_or(FabricError::IoError(-11))?;
47        let np = topic.num_partitions();
48        let idx = self.round_robin_counter % np;
49        self.round_robin_counter = self.round_robin_counter.wrapping_add(1);
50        let part = topic.partition_mut(idx).ok_or(FabricError::IoError(-12))?;
51        part.append(None, value)
52    }
53
54    /// Send a keyed message.
55    ///
56    /// With [`Partitioner::KeyHash`], the key determines the partition.
57    /// With [`Partitioner::RoundRobin`], the key is stored but partition
58    /// selection still uses round-robin.
59    pub fn send_keyed(&mut self, mgr: &mut TopicManager, key: &[u8], value: &[u8]) -> Result<u64> {
60        let topic = mgr
61            .open_mut(&self.topic_name)
62            .ok_or(FabricError::IoError(-11))?;
63        let np = topic.num_partitions();
64        let idx = match self.partitioner {
65            Partitioner::RoundRobin => {
66                let i = self.round_robin_counter % np;
67                self.round_robin_counter = self.round_robin_counter.wrapping_add(1);
68                i
69            }
70            Partitioner::KeyHash => fnv1a_hash(key) % np,
71        };
72        let part = topic.partition_mut(idx).ok_or(FabricError::IoError(-12))?;
73        part.append(Some(key), value)
74    }
75
76    /// Send a message to a specific partition.
77    pub fn send_to(
78        &mut self,
79        mgr: &mut TopicManager,
80        partition: u32,
81        key: Option<&[u8]>,
82        value: &[u8],
83    ) -> Result<u64> {
84        let topic = mgr
85            .open_mut(&self.topic_name)
86            .ok_or(FabricError::IoError(-11))?;
87        let part = topic
88            .partition_mut(partition)
89            .ok_or(FabricError::IoError(-12))?;
90        let offset = part.append(key, value)?;
91        #[cfg(feature = "observe")]
92        crate::observe_hooks::on_message_published(&self.topic_name, value.len() as u64);
93        Ok(offset)
94    }
95}
96
97/// FNV-1a hash over a byte slice, returning a u32-range partition index.
98fn fnv1a_hash(data: &[u8]) -> u32 {
99    const FNV_OFFSET: u32 = 2166136261;
100    const FNV_PRIME: u32 = 16777619;
101    let mut hash = FNV_OFFSET;
102    for &byte in data {
103        hash ^= byte as u32;
104        hash = hash.wrapping_mul(FNV_PRIME);
105    }
106    hash
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112    use crate::topic::{TopicConfig, TopicManager};
113    use grafos_std::host;
114
115    fn setup() -> TopicManager {
116        host::reset_mock();
117        host::mock_set_fbmu_arena_size(1 << 20);
118        let mut mgr = TopicManager::new();
119        mgr.create(
120            "test",
121            TopicConfig {
122                num_partitions: 4,
123                partition_capacity: 32,
124                partition_stride: 256,
125            },
126        )
127        .expect("create");
128        mgr
129    }
130
131    #[test]
132    fn round_robin_distributes() {
133        let mut mgr = setup();
134        let mut prod = Producer::new("test");
135
136        for _ in 0..8 {
137            prod.send(&mut mgr, b"msg").expect("send");
138        }
139
140        let topic = mgr.open("test").expect("open");
141        // Each partition should have 2 messages
142        for i in 0..4 {
143            assert_eq!(topic.partition(i).unwrap().next_offset(), 2);
144        }
145    }
146
147    #[test]
148    fn key_hash_same_key_same_partition() {
149        let mut mgr = setup();
150        let mut prod = Producer::new("test").with_partitioner(Partitioner::KeyHash);
151
152        for _ in 0..5 {
153            prod.send_keyed(&mut mgr, b"same-key", b"val")
154                .expect("send");
155        }
156
157        // All messages should land in the same partition
158        let topic = mgr.open("test").expect("open");
159        let mut counts = [0u64; 4];
160        for i in 0..4 {
161            counts[i as usize] = topic.partition(i).unwrap().next_offset();
162        }
163        let non_zero: Vec<_> = counts.iter().filter(|&&c| c > 0).collect();
164        assert_eq!(non_zero.len(), 1);
165        assert_eq!(*non_zero[0], 5);
166    }
167
168    #[test]
169    fn send_to_explicit_partition() {
170        let mut mgr = setup();
171        let mut prod = Producer::new("test");
172
173        prod.send_to(&mut mgr, 2, Some(b"k"), b"v")
174            .expect("send_to");
175        let topic = mgr.open("test").expect("open");
176        assert_eq!(topic.partition(2).unwrap().next_offset(), 1);
177        assert_eq!(topic.partition(0).unwrap().next_offset(), 0);
178    }
179
180    #[test]
181    fn send_to_nonexistent_topic() {
182        let mut mgr = setup();
183        let mut prod = Producer::new("no-such-topic");
184        let result = prod.send(&mut mgr, b"msg");
185        assert!(result.is_err());
186    }
187
188    #[test]
189    fn fnv1a_deterministic() {
190        assert_eq!(fnv1a_hash(b"hello"), fnv1a_hash(b"hello"));
191        assert_ne!(fnv1a_hash(b"hello"), fnv1a_hash(b"world"));
192    }
193}