1extern crate alloc;
4use alloc::boxed::Box;
5use alloc::vec::Vec;
6
7use grafos_std::error::FabricError;
8
9use crate::placement::StagePlacement;
10use crate::stages::DspStage;
11use crate::types::{Block, DspConfig, DspStats};
12
13pub trait DspSource {
15 fn next_block(&mut self) -> Result<Option<Block>, FabricError>;
16}
17
18pub trait DspSink {
20 fn accept_block(&mut self, block: Block) -> Result<(), FabricError>;
21}
22
23pub struct CollectSink {
25 blocks: Vec<Block>,
26}
27
28impl CollectSink {
29 pub fn new() -> Self {
30 Self { blocks: Vec::new() }
31 }
32
33 pub fn into_blocks(self) -> Vec<Block> {
34 self.blocks
35 }
36}
37
38impl Default for CollectSink {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl DspSink for CollectSink {
45 fn accept_block(&mut self, block: Block) -> Result<(), FabricError> {
46 self.blocks.push(block);
47 Ok(())
48 }
49}
50
51pub struct VecSource {
53 blocks: Vec<Block>,
54 index: usize,
55}
56
57impl VecSource {
58 pub fn new(blocks: Vec<Block>) -> Self {
59 Self { blocks, index: 0 }
60 }
61}
62
63impl DspSource for VecSource {
64 fn next_block(&mut self) -> Result<Option<Block>, FabricError> {
65 if self.index < self.blocks.len() {
66 let block = self.blocks[self.index].clone();
67 self.index += 1;
68 Ok(Some(block))
69 } else {
70 Ok(None)
71 }
72 }
73}
74
75struct StageEntry {
76 stage: Box<dyn DspStage>,
77 _placement: StagePlacement,
78}
79
80pub struct DspPipeline {
82 config: DspConfig,
83 source: Option<Box<dyn DspSource>>,
84 stages: Vec<StageEntry>,
85 sink: Option<Box<dyn DspSink>>,
86}
87
88impl DspPipeline {
89 pub fn new(config: DspConfig) -> Self {
91 Self {
92 config,
93 source: None,
94 stages: Vec::new(),
95 sink: None,
96 }
97 }
98
99 pub fn source(mut self, source: impl DspSource + 'static) -> Self {
101 self.source = Some(Box::new(source));
102 self
103 }
104
105 pub fn stage(mut self, stage: impl DspStage + 'static, placement: StagePlacement) -> Self {
107 self.stages.push(StageEntry {
108 stage: Box::new(stage),
109 _placement: placement,
110 });
111 self
112 }
113
114 pub fn sink(mut self, sink: impl DspSink + 'static) -> Self {
116 self.sink = Some(Box::new(sink));
117 self
118 }
119
120 pub fn build(self) -> Result<DspPipelineHandle, FabricError> {
122 let source = self.source.ok_or(FabricError::CapacityExceeded)?;
123 let sink = self.sink.ok_or(FabricError::CapacityExceeded)?;
124 Ok(DspPipelineHandle {
125 config: self.config,
126 source,
127 stages: self.stages,
128 sink,
129 })
130 }
131}
132
133pub struct DspPipelineHandle {
135 config: DspConfig,
136 source: Box<dyn DspSource>,
137 stages: Vec<StageEntry>,
138 sink: Box<dyn DspSink>,
139}
140
141impl DspPipelineHandle {
142 pub fn run(&mut self) -> Result<DspStats, FabricError> {
144 let mut stats = DspStats::default();
145 let mut min_latency_us: u64 = u64::MAX;
146 let mut total_latency_us: u64 = 0;
147
148 let block_duration_us = if self.config.sample_rate > 0 {
150 (self.config.block_size as u64 * 1_000_000) / self.config.sample_rate as u64
151 } else {
152 0
153 };
154
155 while let Some(block) = self.source.next_block()? {
156 let start = Self::timestamp_us();
157
158 let mut current = block;
159 for entry in self.stages.iter_mut() {
160 current = entry.stage.process(¤t)?;
161 }
162
163 self.sink.accept_block(current)?;
164
165 let end = Self::timestamp_us();
166 let elapsed = end.saturating_sub(start);
167
168 stats.blocks_processed += 1;
169 total_latency_us += elapsed;
170
171 if elapsed > stats.max_latency_us {
172 stats.max_latency_us = elapsed;
173 }
174 if elapsed < min_latency_us {
175 min_latency_us = elapsed;
176 }
177
178 if block_duration_us > 0 && elapsed > block_duration_us {
180 stats.overruns += 1;
181 }
182 }
183
184 if stats.blocks_processed > 0 {
185 stats.avg_latency_us = total_latency_us / stats.blocks_processed;
186 stats.jitter_us = stats.max_latency_us.saturating_sub(min_latency_us);
187 }
188
189 Ok(stats)
190 }
191
192 #[cfg(feature = "std")]
193 fn timestamp_us() -> u64 {
194 use std::time::{SystemTime, UNIX_EPOCH};
196 SystemTime::now()
197 .duration_since(UNIX_EPOCH)
198 .map(|d| d.as_micros() as u64)
199 .unwrap_or(0)
200 }
201
202 #[cfg(not(feature = "std"))]
203 fn timestamp_us() -> u64 {
204 0
206 }
207}
208
#[cfg(all(test, feature = "std"))]
mod tests {
    use super::*;
    use crate::stages::DspStage;
    use crate::types::{Block, Sample};
    use alloc::vec;

    /// Stage that sleeps for `delay` on every block and then returns a copy of
    /// its input, used to push the per-block latency past the time budget.
    struct SlowStage {
        delay: std::time::Duration,
    }

    impl DspStage for SlowStage {
        fn process(&mut self, block: &Block) -> Result<Block, FabricError> {
            std::thread::sleep(self.delay);
            Ok(block.clone())
        }
    }

    /// Builds a single-channel block of `block_size` zero samples.
    fn make_block(block_size: usize, sample_rate: u32) -> Block {
        Block {
            data: vec![0.0 as Sample; block_size],
            sample_rate,
            channels: 1,
        }
    }

    /// 256 samples at 48 kHz: a budget of 256 * 1_000_000 / 48_000 = 5333 µs
    /// per block.
    fn test_config() -> DspConfig {
        DspConfig {
            block_size: 256,
            sample_rate: 48_000,
            channels: 1,
        }
    }

    /// Runs `block_count` blocks through a one-stage pipeline that ends in a
    /// `CollectSink`, and returns the resulting statistics.
    fn run_pipeline(stage: impl DspStage + 'static, block_count: usize) -> DspStats {
        let blocks: Vec<Block> = (0..block_count)
            .map(|_| make_block(256, 48_000))
            .collect();

        let mut handle = DspPipeline::new(test_config())
            .source(VecSource::new(blocks))
            .stage(stage, StagePlacement::default())
            .sink(CollectSink::new())
            .build()
            .unwrap();

        handle.run().unwrap()
    }

    #[test]
    fn overrun_detected_for_slow_stage() {
        // A 20 ms sleep per block is well above the ~5.3 ms budget.
        let slow = SlowStage {
            delay: std::time::Duration::from_millis(20),
        };

        let stats = run_pipeline(slow, 3);

        assert_eq!(stats.blocks_processed, 3);
        assert_eq!(stats.overruns, 3, "all 3 blocks should be overruns");
    }

    #[test]
    fn no_overrun_for_fast_stage() {
        let gain = crate::stages::GainStage::new(1.0);

        let stats = run_pipeline(gain, 2);

        assert_eq!(stats.blocks_processed, 2);
        assert_eq!(stats.overruns, 0, "fast stage should not trigger overruns");
    }
}