grafos_mq/
message.rs

1//! Message envelope for MQ records.
2
3extern crate alloc;
4use alloc::string::String;
5use alloc::vec::Vec;
6
7use serde::{Deserialize, Serialize};
8
9/// A message stored in a partition.
10#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
11pub struct Message {
12    /// Monotonic offset within the partition.
13    pub offset: u64,
14    /// Unix timestamp in seconds when the message was produced.
15    pub timestamp: u64,
16    /// Optional key for keyed partitioning.
17    pub key: Option<Vec<u8>>,
18    /// Message payload.
19    pub value: Vec<u8>,
20    /// User-defined headers.
21    pub headers: Vec<(String, Vec<u8>)>,
22}
23
24impl Message {
25    /// Create a new message with the given offset, timestamp, and value.
26    pub fn new(offset: u64, timestamp: u64, value: Vec<u8>) -> Self {
27        Message {
28            offset,
29            timestamp,
30            key: None,
31            value,
32            headers: Vec::new(),
33        }
34    }
35
36    /// Create a keyed message.
37    pub fn keyed(offset: u64, timestamp: u64, key: Vec<u8>, value: Vec<u8>) -> Self {
38        Message {
39            offset,
40            timestamp,
41            key: Some(key),
42            value,
43            headers: Vec::new(),
44        }
45    }
46
47    /// Add a header to the message.
48    pub fn with_header(mut self, name: &str, value: Vec<u8>) -> Self {
49        self.headers.push((String::from(name), value));
50        self
51    }
52}
53
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `original` with postcard and decodes the resulting bytes
    /// back into a `Message`, panicking if either step fails.
    fn roundtrip(original: &Message) -> Message {
        let encoded = postcard::to_allocvec(original).expect("serialize");
        postcard::from_bytes(&encoded).expect("deserialize")
    }

    /// A keyed message with one header survives an encode/decode cycle.
    #[test]
    fn message_roundtrip() {
        let original = Message::keyed(42, 1000, b"key1".to_vec(), b"hello".to_vec())
            .with_header("trace-id", b"abc123".to_vec());

        let restored = roundtrip(&original);

        assert_eq!(original, restored);
        assert_eq!(restored.offset, 42);
        assert_eq!(restored.key, Some(b"key1".to_vec()));
        assert_eq!(restored.headers.len(), 1);
        assert_eq!(restored.headers[0].0, "trace-id");
    }

    /// An unkeyed, header-less message starts empty and survives an
    /// encode/decode cycle.
    #[test]
    fn message_no_key_no_headers() {
        let original = Message::new(0, 500, b"payload".to_vec());
        assert_eq!(original.key, None);
        assert!(original.headers.is_empty());

        let restored = roundtrip(&original);
        assert_eq!(original, restored);
    }
}