1extern crate alloc;
6
7use crate::topic::TopicManager;
8use grafos_std::error::{FabricError, Result};
9
/// Strategy a producer uses to choose the destination partition of a keyed message.
///
/// The enum is fieldless, so it is `Copy` and `Hash` in addition to the usual
/// comparison traits; callers can pass it by value and use it as a map key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum Partitioner {
    /// Cycle through the topic's partitions in order, ignoring the message key.
    RoundRobin,
    /// Route by the hash of the message key modulo the partition count, so
    /// equal keys always land in the same partition.
    KeyHash,
}
18
19pub struct Producer {
21 topic_name: alloc::string::String,
22 partitioner: Partitioner,
23 round_robin_counter: u32,
24}
25
26impl Producer {
27 pub fn new(topic_name: &str) -> Self {
29 Producer {
30 topic_name: alloc::string::String::from(topic_name),
31 partitioner: Partitioner::RoundRobin,
32 round_robin_counter: 0,
33 }
34 }
35
36 pub fn with_partitioner(mut self, p: Partitioner) -> Self {
38 self.partitioner = p;
39 self
40 }
41
42 pub fn send(&mut self, mgr: &mut TopicManager, value: &[u8]) -> Result<u64> {
44 let topic = mgr
45 .open_mut(&self.topic_name)
46 .ok_or(FabricError::IoError(-11))?;
47 let np = topic.num_partitions();
48 let idx = self.round_robin_counter % np;
49 self.round_robin_counter = self.round_robin_counter.wrapping_add(1);
50 let part = topic.partition_mut(idx).ok_or(FabricError::IoError(-12))?;
51 part.append(None, value)
52 }
53
54 pub fn send_keyed(&mut self, mgr: &mut TopicManager, key: &[u8], value: &[u8]) -> Result<u64> {
60 let topic = mgr
61 .open_mut(&self.topic_name)
62 .ok_or(FabricError::IoError(-11))?;
63 let np = topic.num_partitions();
64 let idx = match self.partitioner {
65 Partitioner::RoundRobin => {
66 let i = self.round_robin_counter % np;
67 self.round_robin_counter = self.round_robin_counter.wrapping_add(1);
68 i
69 }
70 Partitioner::KeyHash => fnv1a_hash(key) % np,
71 };
72 let part = topic.partition_mut(idx).ok_or(FabricError::IoError(-12))?;
73 part.append(Some(key), value)
74 }
75
76 pub fn send_to(
78 &mut self,
79 mgr: &mut TopicManager,
80 partition: u32,
81 key: Option<&[u8]>,
82 value: &[u8],
83 ) -> Result<u64> {
84 let topic = mgr
85 .open_mut(&self.topic_name)
86 .ok_or(FabricError::IoError(-11))?;
87 let part = topic
88 .partition_mut(partition)
89 .ok_or(FabricError::IoError(-12))?;
90 let offset = part.append(key, value)?;
91 #[cfg(feature = "observe")]
92 crate::observe_hooks::on_message_published(&self.topic_name, value.len() as u64);
93 Ok(offset)
94 }
95}
96
/// Computes the 32-bit FNV-1a hash of `data`.
///
/// Starting from the offset basis, each input byte is XOR-ed into the running
/// hash, which is then multiplied by the FNV prime with wrapping arithmetic.
/// Empty input returns the offset basis unchanged.
fn fnv1a_hash(data: &[u8]) -> u32 {
    const OFFSET_BASIS: u32 = 0x811c_9dc5;
    const PRIME: u32 = 0x0100_0193;

    data.iter().fold(OFFSET_BASIS, |acc, &octet| {
        (acc ^ u32::from(octet)).wrapping_mul(PRIME)
    })
}
108
#[cfg(test)]
mod tests {
    use super::*;
    use crate::topic::{TopicConfig, TopicManager};
    use grafos_std::host;

    /// Resets the host mock and returns a manager holding a single topic,
    /// `"test"`, configured with four partitions.
    fn setup() -> TopicManager {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1 << 20);
        let mut mgr = TopicManager::new();
        mgr.create(
            "test",
            TopicConfig {
                num_partitions: 4,
                partition_capacity: 32,
                partition_stride: 256,
            },
        )
        .expect("create");
        mgr
    }

    /// Eight unkeyed sends over four partitions must leave two messages in each.
    #[test]
    fn round_robin_distributes() {
        let mut mgr = setup();
        let mut prod = Producer::new("test");

        for _ in 0..8 {
            prod.send(&mut mgr, b"msg").expect("send");
        }

        let topic = mgr.open("test").expect("open");
        for i in 0..4 {
            assert_eq!(topic.partition(i).unwrap().next_offset(), 2);
        }
    }

    /// With key hashing, every message carrying the same key must land in the
    /// same single partition.
    #[test]
    fn key_hash_same_key_same_partition() {
        let mut mgr = setup();
        let mut prod = Producer::new("test").with_partitioner(Partitioner::KeyHash);

        for _ in 0..5 {
            prod.send_keyed(&mut mgr, b"same-key", b"val")
                .expect("send");
        }

        let topic = mgr.open("test").expect("open");
        let mut counts = [0u64; 4];
        for i in 0..4 {
            counts[i as usize] = topic.partition(i).unwrap().next_offset();
        }
        // Exactly one partition received messages, and it received all five.
        let mut non_zero = counts.iter().copied().filter(|&c| c > 0);
        assert_eq!(non_zero.next(), Some(5));
        assert_eq!(non_zero.next(), None);
    }

    /// `send_to` writes only to the partition it was told to use.
    #[test]
    fn send_to_explicit_partition() {
        let mut mgr = setup();
        let mut prod = Producer::new("test");

        prod.send_to(&mut mgr, 2, Some(b"k"), b"v")
            .expect("send_to");
        let topic = mgr.open("test").expect("open");
        assert_eq!(topic.partition(2).unwrap().next_offset(), 1);
        assert_eq!(topic.partition(0).unwrap().next_offset(), 0);
    }

    /// Every send path must report an error when the topic does not exist.
    #[test]
    fn send_to_nonexistent_topic() {
        let mut mgr = setup();
        let mut prod = Producer::new("no-such-topic");

        assert!(prod.send(&mut mgr, b"msg").is_err());
        assert!(prod.send_keyed(&mut mgr, b"key", b"msg").is_err());
        assert!(prod.send_to(&mut mgr, 0, None, b"msg").is_err());
    }

    /// Equal inputs hash equally and two distinct sample inputs hash differently.
    #[test]
    fn fnv1a_deterministic() {
        assert_eq!(fnv1a_hash(b"hello"), fnv1a_hash(b"hello"));
        assert_ne!(fnv1a_hash(b"hello"), fnv1a_hash(b"world"));
    }

    /// Pins the hash to published 32-bit FNV-1a reference vectors, so a change
    /// to the constants or the XOR/multiply order is caught. Such a change
    /// would silently re-route existing keys to different partitions.
    #[test]
    fn fnv1a_matches_reference_vectors() {
        assert_eq!(fnv1a_hash(b""), 0x811c_9dc5);
        assert_eq!(fnv1a_hash(b"a"), 0xe40c_292c);
        assert_eq!(fnv1a_hash(b"foobar"), 0xbf9c_f968);
    }
}