1extern crate alloc;
8use alloc::collections::BTreeMap;
9use alloc::string::String;
10
11use crate::producer::Producer;
12use crate::topic::TopicManager;
13use grafos_std::error::Result;
14
15pub struct DlqRouter {
17 dlq_topic: String,
19 source_topic: String,
21 max_retries: u32,
23 nack_counts: BTreeMap<(u32, u64), u32>,
25 producer: Producer,
27}
28
29impl DlqRouter {
30 pub fn new(source_topic: &str, dlq_topic: &str, max_retries: u32) -> Self {
35 DlqRouter {
36 dlq_topic: String::from(dlq_topic),
37 source_topic: String::from(source_topic),
38 max_retries,
39 nack_counts: BTreeMap::new(),
40 producer: Producer::new(dlq_topic),
41 }
42 }
43
44 pub fn nack(&mut self, partition: u32, offset: u64) -> bool {
50 let count = self.nack_counts.entry((partition, offset)).or_insert(0);
51 *count += 1;
52 *count > self.max_retries
53 }
54
55 pub fn route_to_dlq(
59 &mut self,
60 mgr: &mut TopicManager,
61 partition: u32,
62 offset: u64,
63 value: &[u8],
64 ) -> Result<u64> {
65 let result = self.producer.send(mgr, value)?;
66 self.nack_counts.remove(&(partition, offset));
67 Ok(result)
68 }
69
70 pub fn nack_count(&self, partition: u32, offset: u64) -> u32 {
72 self.nack_counts
73 .get(&(partition, offset))
74 .copied()
75 .unwrap_or(0)
76 }
77
78 pub fn dlq_topic(&self) -> &str {
80 &self.dlq_topic
81 }
82
83 pub fn source_topic(&self) -> &str {
85 &self.source_topic
86 }
87
88 pub fn max_retries(&self) -> u32 {
90 self.max_retries
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use crate::producer::Producer;
98 use crate::topic::{TopicConfig, TopicManager};
99 use grafos_std::host;
100
101 fn setup() -> TopicManager {
102 host::reset_mock();
103 host::mock_set_fbmu_arena_size(1 << 20);
104 let mut mgr = TopicManager::new();
105 let config = TopicConfig {
106 num_partitions: 2,
107 partition_capacity: 32,
108 partition_stride: 256,
109 };
110 mgr.create("orders", config.clone()).expect("create");
111 mgr.create("orders-dlq", config).expect("create dlq");
112 mgr
113 }
114
115 #[test]
116 fn nack_tracking() {
117 let mut router = DlqRouter::new("orders", "orders-dlq", 3);
118
119 assert!(!router.nack(0, 5));
121 assert!(!router.nack(0, 5));
122 assert!(!router.nack(0, 5));
123 assert_eq!(router.nack_count(0, 5), 3);
124
125 assert!(router.nack(0, 5));
127 assert_eq!(router.nack_count(0, 5), 4);
128 }
129
130 #[test]
131 fn route_to_dlq_clears_count() {
132 let mut mgr = setup();
133 let mut router = DlqRouter::new("orders", "orders-dlq", 1);
134
135 let mut prod = Producer::new("orders");
137 prod.send_to(&mut mgr, 0, None, b"bad-msg").expect("send");
138
139 router.nack(0, 0);
141 assert!(router.nack(0, 0));
142
143 router
145 .route_to_dlq(&mut mgr, 0, 0, b"bad-msg")
146 .expect("route");
147 assert_eq!(router.nack_count(0, 0), 0);
148
149 let dlq_topic = mgr.open("orders-dlq").expect("open dlq");
151 let msg = dlq_topic
152 .partition(0)
153 .unwrap()
154 .read_at(0)
155 .expect("read")
156 .expect("some");
157 assert_eq!(msg.value, b"bad-msg");
158 }
159}