grafos_stream/sink.rs
1//! Built-in sink implementations.
2//!
3//! | Type | Description |
4//! |------|-------------|
5//! | [`CollectSink<T>`] | Collects all items into a `Vec<T>` |
6//! | [`CountSink<T>`] | Counts items received |
7//! | [`FnSink<T, F>`] | Wraps a closure as a sink |
8
9extern crate alloc;
10use alloc::vec::Vec;
11
12use grafos_std::error::FabricError;
13
14use crate::stage::Sink;
15
/// Sink that collects all items into a `Vec<T>`.
///
/// After the pipeline completes, call [`into_vec()`](CollectSink::into_vec)
/// to retrieve the collected items.
///
/// `Debug` and `Clone` are derived, so each is available whenever `T`
/// implements the corresponding trait.
///
/// # Examples
///
/// ```rust
/// use grafos_stream::sink::CollectSink;
/// use grafos_stream::stage::Sink;
///
/// let mut sink = CollectSink::new();
/// sink.accept(1).unwrap();
/// sink.accept(2).unwrap();
/// assert_eq!(sink.into_vec(), vec![1, 2]);
/// ```
#[derive(Debug, Clone)]
pub struct CollectSink<T> {
    /// Backing storage for the collected items.
    items: Vec<T>,
}
35
36impl<T> CollectSink<T> {
37 /// Create an empty collecting sink.
38 pub fn new() -> Self {
39 CollectSink { items: Vec::new() }
40 }
41
42 /// Consume the sink and return all accepted items in arrival order.
43 pub fn into_vec(self) -> Vec<T> {
44 self.items
45 }
46}
47
48impl<T> Default for CollectSink<T> {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl<T> Sink<T> for CollectSink<T> {
55 fn accept(&mut self, item: T) -> Result<(), FabricError> {
56 self.items.push(item);
57 Ok(())
58 }
59}
60
/// Sink that counts items received.
///
/// The item type `T` is tracked only through a zero-sized `PhantomData`
/// marker; the sink stores just the running count.
///
/// `Debug` and `Clone` are derived, so each is available whenever `T`
/// implements the corresponding trait.
///
/// # Examples
///
/// ```rust
/// use grafos_stream::sink::CountSink;
/// use grafos_stream::stage::Sink;
///
/// let mut sink: CountSink<u32> = CountSink::new();
/// sink.accept(10).unwrap();
/// sink.accept(20).unwrap();
/// assert_eq!(sink.count(), 2);
/// ```
#[derive(Debug, Clone)]
pub struct CountSink<T> {
    /// Running count of items.
    count: u64,
    /// Ties the otherwise unused type parameter `T` to the struct.
    _marker: core::marker::PhantomData<T>,
}
78
79impl<T> CountSink<T> {
80 /// Create a count sink initialized to zero.
81 pub fn new() -> Self {
82 CountSink {
83 count: 0,
84 _marker: core::marker::PhantomData,
85 }
86 }
87
88 /// Number of items accepted so far.
89 pub fn count(&self) -> u64 {
90 self.count
91 }
92}
93
94impl<T> Default for CountSink<T> {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100impl<T> Sink<T> for CountSink<T> {
101 fn accept(&mut self, _item: T) -> Result<(), FabricError> {
102 self.count += 1;
103 Ok(())
104 }
105}
106
/// Sink that wraps a closure.
///
/// The `F: FnMut(T)` requirement is stated on the `impl` blocks that need it
/// rather than on the type itself, so the type can be named in signatures and
/// other generic code without repeating the bound.
///
/// # Examples
///
/// ```rust
/// use grafos_stream::sink::FnSink;
/// use grafos_stream::stage::Sink;
///
/// let mut total = 0u64;
/// {
///     let mut sink = FnSink::new(|x: u32| { total += x as u64; });
///     sink.accept(10).unwrap();
///     sink.accept(20).unwrap();
/// }
/// assert_eq!(total, 30);
/// ```
pub struct FnSink<T, F> {
    /// The wrapped callable.
    f: F,
    /// Ties the otherwise unused type parameter `T` to the struct.
    _marker: core::marker::PhantomData<T>,
}

impl<T, F> core::fmt::Debug for FnSink<T, F> {
    /// Formats as `FnSink { .. }`.
    ///
    /// The fields are deliberately omitted: closures generally do not
    /// implement `Debug`, so requiring `F: Debug` would make this impl
    /// unusable for the common case.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.debug_struct("FnSink").finish_non_exhaustive()
    }
}
127
128impl<T, F: FnMut(T)> FnSink<T, F> {
129 /// Wrap a closure as a sink implementation.
130 pub fn new(f: F) -> Self {
131 FnSink {
132 f,
133 _marker: core::marker::PhantomData,
134 }
135 }
136}
137
138impl<T, F: FnMut(T)> Sink<T> for FnSink<T, F> {
139 fn accept(&mut self, item: T) -> Result<(), FabricError> {
140 (self.f)(item);
141 Ok(())
142 }
143}