grafos_mq/
dlq.rs

1//! Dead-letter queue: nack tracking and routing to a DLQ topic after max retries.
2//!
3//! DLQ is implemented as a separate topic — no external dependencies needed.
4//! A [`DlqRouter`] tracks nack counts per message offset and routes messages
5//! to the DLQ topic when `max_retries` is exceeded.
6
7extern crate alloc;
8use alloc::collections::BTreeMap;
9use alloc::string::String;
10
11use crate::producer::Producer;
12use crate::topic::TopicManager;
13use grafos_std::error::Result;
14
15/// Tracks nack counts and routes dead-lettered messages.
16pub struct DlqRouter {
17    /// Topic name of the DLQ sink.
18    dlq_topic: String,
19    /// Source topic name for tracking.
20    source_topic: String,
21    /// Maximum delivery attempts before routing to DLQ.
22    max_retries: u32,
23    /// Nack counts keyed by (partition, offset).
24    nack_counts: BTreeMap<(u32, u64), u32>,
25    /// Producer for writing to the DLQ topic.
26    producer: Producer,
27}
28
29impl DlqRouter {
30    /// Create a new DLQ router.
31    ///
32    /// `dlq_topic` is the name of the topic where dead-lettered messages are
33    /// sent. It must already exist in the [`TopicManager`].
34    pub fn new(source_topic: &str, dlq_topic: &str, max_retries: u32) -> Self {
35        DlqRouter {
36            dlq_topic: String::from(dlq_topic),
37            source_topic: String::from(source_topic),
38            max_retries,
39            nack_counts: BTreeMap::new(),
40            producer: Producer::new(dlq_topic),
41        }
42    }
43
44    /// Record a negative acknowledgement for a message.
45    ///
46    /// Returns `true` if the message should be routed to the DLQ (max retries
47    /// exceeded). The caller is responsible for calling [`DlqRouter::route_to_dlq`] with
48    /// the actual message bytes.
49    pub fn nack(&mut self, partition: u32, offset: u64) -> bool {
50        let count = self.nack_counts.entry((partition, offset)).or_insert(0);
51        *count += 1;
52        *count > self.max_retries
53    }
54
55    /// Route a message to the DLQ topic.
56    ///
57    /// Clears the nack counter for the given offset after routing.
58    pub fn route_to_dlq(
59        &mut self,
60        mgr: &mut TopicManager,
61        partition: u32,
62        offset: u64,
63        value: &[u8],
64    ) -> Result<u64> {
65        let result = self.producer.send(mgr, value)?;
66        self.nack_counts.remove(&(partition, offset));
67        Ok(result)
68    }
69
70    /// Returns the current nack count for a message.
71    pub fn nack_count(&self, partition: u32, offset: u64) -> u32 {
72        self.nack_counts
73            .get(&(partition, offset))
74            .copied()
75            .unwrap_or(0)
76    }
77
78    /// Returns the DLQ topic name.
79    pub fn dlq_topic(&self) -> &str {
80        &self.dlq_topic
81    }
82
83    /// Returns the source topic name.
84    pub fn source_topic(&self) -> &str {
85        &self.source_topic
86    }
87
88    /// Returns the maximum retries setting.
89    pub fn max_retries(&self) -> u32 {
90        self.max_retries
91    }
92}
93
#[cfg(test)]
mod tests {
    use super::*;
    use crate::producer::Producer;
    use crate::topic::{TopicConfig, TopicManager};
    use grafos_std::host;

    /// Resets the host mock, sets the mock FBMU arena size to `1 << 20`, and
    /// returns a manager holding the `orders` topic and its `orders-dlq`
    /// sink, both created from the same configuration.
    fn setup() -> TopicManager {
        host::reset_mock();
        host::mock_set_fbmu_arena_size(1 << 20);

        let config = TopicConfig {
            num_partitions: 2,
            partition_capacity: 32,
            partition_stride: 256,
        };

        let mut manager = TopicManager::new();
        manager.create("orders", config.clone()).expect("create");
        manager.create("orders-dlq", config).expect("create dlq");
        manager
    }

    #[test]
    fn nack_tracking() {
        let mut router = DlqRouter::new("orders", "orders-dlq", 3);

        // With max_retries = 3, the first three nacks stay under the limit.
        for _ in 0..3 {
            assert!(!router.nack(0, 5));
        }
        assert_eq!(router.nack_count(0, 5), 3);

        // The fourth nack pushes the count past max_retries.
        let exceeded = router.nack(0, 5);
        assert!(exceeded);
        assert_eq!(router.nack_count(0, 5), 4);
    }

    #[test]
    fn route_to_dlq_clears_count() {
        let mut mgr = setup();
        let mut router = DlqRouter::new("orders", "orders-dlq", 1);

        // Put one message on partition 0 of the source topic.
        let mut source = Producer::new("orders");
        source
            .send_to(&mut mgr, 0, None, b"bad-msg")
            .expect("send");

        // Two nacks against max_retries = 1: only the second trips the limit.
        router.nack(0, 0);
        assert!(router.nack(0, 0));

        // Dead-letter the payload; the counter must be gone afterwards.
        router
            .route_to_dlq(&mut mgr, 0, 0, b"bad-msg")
            .expect("route");
        assert_eq!(router.nack_count(0, 0), 0);

        // The payload must now be readable at offset 0 of DLQ partition 0.
        let dlq = mgr.open("orders-dlq").expect("open dlq");
        let stored = dlq
            .partition(0)
            .unwrap()
            .read_at(0)
            .expect("read")
            .expect("some");
        assert_eq!(stored.value, b"bad-msg");
    }
}
159}