From ee5807e9a9c94ec4e7dfa1c89bfc9843b09a9e98 Mon Sep 17 00:00:00 2001 From: Geens Date: Wed, 4 Jun 2025 21:06:27 +0200 Subject: [PATCH] Repeater --- src/allocator.rs | 53 ++++++--- src/audio_chunk.rs | 257 ++++++++++++++++++++++------------------- src/chunk_factory.rs | 40 +++++++ src/looper_error.rs | 13 ++- src/main.rs | 67 +++++++---- src/process_handler.rs | 80 +++++++++++++ src/track.rs | 39 +++++++ 7 files changed, 385 insertions(+), 164 deletions(-) create mode 100644 src/chunk_factory.rs create mode 100644 src/process_handler.rs create mode 100644 src/track.rs diff --git a/src/allocator.rs b/src/allocator.rs index bd10b3e..76e1f57 100644 --- a/src/allocator.rs +++ b/src/allocator.rs @@ -1,26 +1,43 @@ -use std::sync::Arc; -use crate::AudioChunk; +use crate::*; pub struct Allocator { - buffer_size: usize, - allocated_buffer_sender: tokio::sync::mpsc::Sender>, + channel: tokio::sync::mpsc::Receiver>, } impl Allocator { - pub fn new(buffer_size: usize, allocated_buffer_sender: tokio::sync::mpsc::Sender>) -> Self { - while allocated_buffer_sender.try_send(AudioChunk::allocate(buffer_size)).is_ok() {} - Self { - buffer_size, - allocated_buffer_sender, - } - } - - pub async fn run(self) { - loop { - let chunk = AudioChunk::allocate(self.buffer_size); - if self.allocated_buffer_sender.send(chunk).await.is_err() { - break; + pub fn spawn(buffer_size: usize, pool_size: usize) -> (Self, tokio::task::JoinHandle<()>) { + let (allocated_buffer_sender, allocated_buffer_receiver) = + tokio::sync::mpsc::channel(pool_size); + + // Pre-fill the channel with initial buffers + while allocated_buffer_sender + .try_send(AudioChunk::allocate(buffer_size)) + .is_ok() + {} + + let allocator = Allocator { + channel: allocated_buffer_receiver, + }; + + // Spawn the background task that continuously allocates buffers + let join_handle = tokio::runtime::Handle::current().spawn(async move { + loop { + let chunk = AudioChunk::allocate(buffer_size); + if allocated_buffer_sender.send(chunk).await.is_err() { + break; + } } + }); + + (allocator, join_handle) + } +} + +impl ChunkFactory for Allocator { + fn create_chunk(&mut self) -> Result> { + match self.channel.try_recv() { + Ok(chunk) => Ok(chunk), + Err(_) => Err(LooperError::ChunkAllocation(std::panic::Location::caller())), } } -} \ No newline at end of file +} diff --git a/src/audio_chunk.rs b/src/audio_chunk.rs index 66cec1e..901288e 100644 --- a/src/audio_chunk.rs +++ b/src/audio_chunk.rs @@ -8,10 +8,10 @@ use crate::*; pub struct AudioChunk { /// Fixed-size audio buffer pub samples: Box<[f32]>, - + /// Number of valid samples (≤ samples.len()) pub sample_count: usize, - + /// Next chunk in linked list pub next: Option>, } @@ -25,7 +25,7 @@ impl AudioChunk { next: None, }) } - + /// Consolidate a linked list of chunks into a single optimized chunk pub fn consolidate(this: &Arc) -> Arc { // Calculate total sample count @@ -35,7 +35,7 @@ impl AudioChunk { total_samples += chunk.sample_count; current = chunk.next.clone(); } - + // Create consolidated buffer let mut consolidated_samples = Vec::with_capacity(total_samples); let mut current = Some(this.clone()); @@ -43,50 +43,69 @@ impl AudioChunk { consolidated_samples.extend_from_slice(&chunk.samples[..chunk.sample_count]); current = chunk.next.clone(); } - + Arc::new(Self { samples: consolidated_samples.into_boxed_slice(), sample_count: total_samples, next: None, }) } - - /// Write samples into this chunk and additional chunks as needed. + + /// Add samples to this chunk and allocate additional chunks as needed. /// Uses the factory to get pre-allocated chunks when current chunk fills up. - pub fn write_samples(this: &mut Arc, samples: &[f32], mut chunk_factory: F) -> Result<(), LooperError> - where - F: FnMut() -> Result, LooperError> - { - let sample_count = samples.len(); - - if sample_count > 0 { - let available_space = this.samples.len(); - let samples_to_write = sample_count.min(available_space); - - // Get mutable access to write samples - let chunk_mut = Arc::get_mut(this).ok_or(LooperError::ChunkOwnership)?; - - // Write what fits - if let Some(dest) = chunk_mut.samples.get_mut(..samples_to_write) { - dest.copy_from_slice(&samples[..samples_to_write]); - chunk_mut.sample_count = samples_to_write; - } - - // Handle remaining samples if any - if samples_to_write < sample_count { - // Need a new chunk for remaining samples - let mut new_chunk = chunk_factory()?; - let remaining_samples = &samples[samples_to_write..]; - - // Recurse on the new chunk - Self::write_samples(&mut new_chunk, remaining_samples, chunk_factory)?; - + pub fn append_samples( + self: &mut Arc, + samples: &[f32], + chunk_factory: &mut F, + ) -> Result<()> { + let mut_self = Arc::get_mut(self) + .ok_or(LooperError::ChunkOwnership(std::panic::Location::caller()))?; + if let Some(next) = &mut mut_self.next { + // Not the last chunk, recurse + next.append_samples(samples, chunk_factory) + } else { + // Add to this chunk first + let available_space = mut_self.samples.len() - mut_self.sample_count; + let samples_to_append_to_this_chunk = samples.len().min(available_space); + let dest = &mut mut_self.samples + [mut_self.sample_count..mut_self.sample_count + samples_to_append_to_this_chunk]; + dest.copy_from_slice(&samples[..samples_to_append_to_this_chunk]); + mut_self.sample_count += samples_to_append_to_this_chunk; + // Handle remaining samples by recursion + if samples_to_append_to_this_chunk < samples.len() { + let mut new_chunk = chunk_factory.create_chunk()?; + let remaining_samples = &samples[samples_to_append_to_this_chunk..]; + new_chunk.append_samples(remaining_samples, chunk_factory)?; // Link the new chunk - chunk_mut.next = Some(new_chunk); + mut_self.next = Some(new_chunk); + Ok(()) + } else { + Ok(()) } } - - Ok(()) + } + + pub fn copy_samples(self: &Arc, dest: &mut [f32], start: usize) -> Result<()> { + if start < self.sample_count { + // Copy from this chunk + let end = start + dest.len(); + if end <= self.sample_count { + dest.copy_from_slice(&self.samples[start..end]); + } else if let Some(next) = self.next.as_ref() { + let sample_count_from_this_chunk = self.sample_count - start; + dest[..sample_count_from_this_chunk] + .copy_from_slice(&self.samples[start..self.sample_count]); + next.copy_samples(&mut dest[sample_count_from_this_chunk..], 0)?; + } else { + return Err(LooperError::OutOfBounds(std::panic::Location::caller())); + } + Ok(()) + } else if let Some(next) = &self.next { + // Copy from next chunk + next.copy_samples(dest, start - self.sample_count) + } else { + Err(LooperError::OutOfBounds(std::panic::Location::caller())) + } } } @@ -97,34 +116,34 @@ mod tests { #[test] fn test_allocate_creates_empty_chunk() { let chunk = AudioChunk::allocate(1024); - + assert_eq!(chunk.samples.len(), 1024); assert_eq!(chunk.sample_count, 0); assert!(chunk.next.is_none()); - + // All samples should be initialized to 0.0 assert!(chunk.samples.iter().all(|&x| x == 0.0)); } - + #[test] fn test_consolidate_single_chunk() { let mut chunk = AudioChunk::allocate(10); - + // Write some sample data let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0]; - let factory = || panic!("Factory should not be called"); - let result = AudioChunk::write_samples(&mut chunk, &samples, factory); + let mut factory = || panic!("Factory should not be called"); + let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory); assert!(result.is_ok()); - + let consolidated = AudioChunk::consolidate(&chunk); - + assert_eq!(consolidated.samples.len(), 5); assert_eq!(consolidated.sample_count, 5); assert!(consolidated.next.is_none()); assert_eq!(consolidated.samples[0], 1.0); assert_eq!(consolidated.samples[4], 5.0); } - + #[test] fn test_consolidate_linked_chunks() { // Create first chunk @@ -133,21 +152,21 @@ mod tests { sample_count: 3, next: None, }); - + // Create second chunk let chunk2 = Arc::new(AudioChunk { samples: vec![4.0, 5.0].into_boxed_slice(), sample_count: 2, next: None, }); - + // Create third chunk let chunk3 = Arc::new(AudioChunk { samples: vec![6.0, 7.0, 8.0, 9.0].into_boxed_slice(), sample_count: 4, next: None, }); - + // Link them together let chunk1 = Arc::new(AudioChunk { samples: chunk1.samples.clone(), @@ -158,17 +177,17 @@ mod tests { next: Some(chunk3), })), }); - + let consolidated = AudioChunk::consolidate(&chunk1); - + assert_eq!(consolidated.samples.len(), 9); assert_eq!(consolidated.sample_count, 9); assert!(consolidated.next.is_none()); - + let expected = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]; assert_eq!(consolidated.samples.as_ref(), expected.as_slice()); } - + #[test] fn test_consolidate_partial_chunks() { // Test chunks where sample_count < samples.len() @@ -177,48 +196,48 @@ mod tests { sample_count: 3, // Only first 3 samples are valid next: None, }); - + let chunk2 = Arc::new(AudioChunk { samples: vec![4.0, 5.0, 0.0].into_boxed_slice(), sample_count: 2, // Only first 2 samples are valid next: None, }); - + let chunk1 = Arc::new(AudioChunk { samples: chunk1.samples.clone(), sample_count: chunk1.sample_count, next: Some(chunk2), }); - + let consolidated = AudioChunk::consolidate(&chunk1); - + assert_eq!(consolidated.samples.len(), 5); assert_eq!(consolidated.sample_count, 5); - + let expected = vec![1.0, 2.0, 3.0, 4.0, 5.0]; assert_eq!(consolidated.samples.as_ref(), expected.as_slice()); } - + #[test] - fn test_write_samples_empty() { + fn test_append_samples_empty() { let mut chunk = AudioChunk::allocate(5); - - let factory = || panic!("Factory should not be called for empty samples"); - let result = AudioChunk::write_samples(&mut chunk, &[], factory); - + + let mut factory = || panic!("Factory should not be called for empty samples"); + let result = AudioChunk::append_samples(&mut chunk, &[], &mut factory); + assert!(result.is_ok()); assert_eq!(chunk.sample_count, 0); assert!(chunk.next.is_none()); } - + #[test] - fn test_write_samples_fits_in_one_chunk() { + fn test_append_samples_fits_in_one_chunk() { let mut chunk = AudioChunk::allocate(5); - - let factory = || panic!("Factory should not be called when samples fit"); + + let mut factory = || panic!("Factory should not be called when samples fit"); let samples = vec![1.0, 2.0, 3.0]; - let result = AudioChunk::write_samples(&mut chunk, &samples, factory); - + let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory); + assert!(result.is_ok()); assert_eq!(chunk.sample_count, 3); assert_eq!(chunk.samples[0], 1.0); @@ -226,15 +245,15 @@ mod tests { assert_eq!(chunk.samples[2], 3.0); assert!(chunk.next.is_none()); } - + #[test] - fn test_write_samples_exactly_fills_chunk() { + fn test_append_samples_exactly_fills_chunk() { let mut chunk = AudioChunk::allocate(3); - - let factory = || panic!("Factory should not be called when samples exactly fit"); + + let mut factory = || panic!("Factory should not be called when samples exactly fit"); let samples = vec![1.0, 2.0, 3.0]; - let result = AudioChunk::write_samples(&mut chunk, &samples, factory); - + let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory); + assert!(result.is_ok()); assert_eq!(chunk.sample_count, 3); assert_eq!(chunk.samples[0], 1.0); @@ -242,25 +261,23 @@ mod tests { assert_eq!(chunk.samples[2], 3.0); assert!(chunk.next.is_none()); } - + #[test] - fn test_write_samples_requires_new_chunk() { + fn test_append_samples_requires_new_chunk() { let mut chunk = AudioChunk::allocate(2); - - use std::cell::RefCell; - let available_chunks = RefCell::new(vec![AudioChunk::allocate(3)]); - let factory = || available_chunks.borrow_mut().pop().ok_or(LooperError::ChunkAllocation); - + + let mut factory = chunk_factory::mock::MockFactory::new(vec![AudioChunk::allocate(3)]); + let samples = vec![1.0, 2.0, 3.0, 4.0]; // 4 samples, first chunk holds 2 - let result = AudioChunk::write_samples(&mut chunk, &samples, factory); - + let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory); + assert!(result.is_ok()); - + // First chunk: 2 samples assert_eq!(chunk.sample_count, 2); assert_eq!(chunk.samples[0], 1.0); assert_eq!(chunk.samples[1], 2.0); - + // Second chunk: remaining 2 samples let chunk2 = chunk.next.as_ref().unwrap(); assert_eq!(chunk2.sample_count, 2); @@ -268,34 +285,32 @@ mod tests { assert_eq!(chunk2.samples[1], 4.0); assert!(chunk2.next.is_none()); } - + #[test] - fn test_write_samples_multiple_chunks_cascade() { + fn test_append_samples_multiple_chunks_cascade() { let mut chunk = AudioChunk::allocate(2); - - use std::cell::RefCell; - let available_chunks = RefCell::new(vec![ + + let mut factory = chunk_factory::mock::MockFactory::new(vec![ AudioChunk::allocate(2), AudioChunk::allocate(2), ]); - let factory = || available_chunks.borrow_mut().pop().ok_or(LooperError::ChunkAllocation); - + let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; // 6 samples, each chunk holds 2 - let result = AudioChunk::write_samples(&mut chunk, &samples, factory); - + let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory); + assert!(result.is_ok()); - + // First chunk assert_eq!(chunk.sample_count, 2); assert_eq!(chunk.samples[0], 1.0); assert_eq!(chunk.samples[1], 2.0); - + // Second chunk let chunk2 = chunk.next.as_ref().unwrap(); assert_eq!(chunk2.sample_count, 2); assert_eq!(chunk2.samples[0], 3.0); assert_eq!(chunk2.samples[1], 4.0); - + // Third chunk let chunk3 = chunk2.next.as_ref().unwrap(); assert_eq!(chunk3.sample_count, 2); @@ -303,42 +318,48 @@ mod tests { assert_eq!(chunk3.samples[1], 6.0); assert!(chunk3.next.is_none()); } - + #[test] - fn test_write_samples_factory_failure() { + fn test_append_samples_factory_failure() { let mut chunk = AudioChunk::allocate(2); - - let factory = || Err(LooperError::ChunkAllocation); + + let mut factory = || Err(LooperError::ChunkAllocation(std::panic::Location::caller())); let samples = vec![1.0, 2.0, 3.0]; // 3 samples, chunk only holds 2 - let result = AudioChunk::write_samples(&mut chunk, &samples, factory); - + let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory); + assert!(result.is_err()); - assert!(matches!(result.unwrap_err(), LooperError::ChunkAllocation)); - + assert!(matches!( + result.unwrap_err(), + LooperError::ChunkAllocation(_) + )); + // First chunk should have been written before factory failure assert_eq!(chunk.sample_count, 2); assert_eq!(chunk.samples[0], 1.0); assert_eq!(chunk.samples[1], 2.0); assert!(chunk.next.is_none()); } - + #[test] - fn test_write_samples_ownership_failure() { + fn test_append_samples_ownership_failure() { let chunk = AudioChunk::allocate(3); let chunk_clone = chunk.clone(); // Create another reference let mut chunk_original = chunk; - - let factory = || panic!("Factory should not be called"); + + let mut factory = || panic!("Factory should not be called"); let samples = vec![1.0, 2.0]; - let result = AudioChunk::write_samples(&mut chunk_original, &samples, factory); - + let result = AudioChunk::append_samples(&mut chunk_original, &samples, &mut factory); + assert!(result.is_err()); - assert!(matches!(result.unwrap_err(), LooperError::ChunkOwnership)); - + assert!(matches!( + result.unwrap_err(), + LooperError::ChunkOwnership(_) + )); + // Chunk should be unchanged assert_eq!(chunk_original.sample_count, 0); - + // Clean up the clone reference to avoid unused variable warning drop(chunk_clone); } -} \ No newline at end of file +} diff --git a/src/chunk_factory.rs b/src/chunk_factory.rs new file mode 100644 index 0000000..f4fd4f4 --- /dev/null +++ b/src/chunk_factory.rs @@ -0,0 +1,40 @@ +use crate::*; + +/// Creates a new chunk of audio data. +pub trait ChunkFactory: Send { + fn create_chunk(&mut self) -> Result>; +} + +impl ChunkFactory for F +where + F: FnMut() -> Result> + Send, +{ + fn create_chunk(&mut self) -> Result> { + self() + } +} + +#[cfg(test)] +pub mod mock { + use super::*; + + pub struct MockFactory { + chunks: Vec>, + } + + impl MockFactory { + pub fn new(chunks: Vec>) -> Self { + Self { chunks } + } + } + + impl ChunkFactory for MockFactory { + fn create_chunk(&mut self) -> Result> { + if let Some(chunk) = self.chunks.pop() { + Ok(chunk) + } else { + Err(LooperError::ChunkAllocation(std::panic::Location::caller())) + } + } + } +} diff --git a/src/looper_error.rs b/src/looper_error.rs index 161932b..b86291a 100644 --- a/src/looper_error.rs +++ b/src/looper_error.rs @@ -1,8 +1,13 @@ #[derive(Debug, thiserror::Error)] pub enum LooperError { #[error("Failed to allocate new audio chunk")] - ChunkAllocation, - + ChunkAllocation(&'static std::panic::Location<'static>), + #[error("Cannot modify chunk with multiple references")] - ChunkOwnership, -} \ No newline at end of file + ChunkOwnership(&'static std::panic::Location<'static>), + + #[error("Index out of bounds")] + OutOfBounds(&'static std::panic::Location<'static>), +} + +pub type Result = std::result::Result; diff --git a/src/main.rs b/src/main.rs index 66816ef..7220e29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,30 +1,49 @@ -mod audio_chunk; -mod looper_error; mod allocator; +mod audio_chunk; +mod chunk_factory; +mod looper_error; +mod process_handler; +mod track; + +use std::sync::Arc; -use audio_chunk::AudioChunk; -use looper_error::LooperError; use allocator::Allocator; +use audio_chunk::AudioChunk; +use chunk_factory::ChunkFactory; +use looper_error::LooperError; +use looper_error::Result; +use process_handler::ProcessHandler; +use track::Track; -fn main() { - let (allocated_buffer_sender, mut allocated_buffer_receiver) = tokio::sync::mpsc::channel(1); - let allocator = Allocator::new(3, allocated_buffer_sender); - let mut factory = move || { - match allocated_buffer_receiver.try_recv() { - Ok(chunk) => Ok(chunk), - Err(_) => Err(LooperError::ChunkAllocation), - } - }; +#[tokio::main] +async fn main() { + let (allocator, factory_handle) = Allocator::spawn(5000, 100); - let handle = std::thread::spawn(move || { - let mut chunk = factory().unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; // 6 samples, each chunk holds 2 - AudioChunk::write_samples(&mut chunk, &samples, factory).unwrap(); - AudioChunk::consolidate(&chunk); - }); + let (jack_client, jack_status) = + jack::Client::new("looper", jack::ClientOptions::NO_START_SERVER) + .expect("Could not create Jack client"); + if !jack_status.is_empty() { + println!("Could not start jack client: {jack_status:?}"); + return; + } - let tokio_runtime = tokio::runtime::Builder::new_current_thread().build().unwrap(); - tokio_runtime.block_on(allocator.run()); - handle.join().unwrap(); -} \ No newline at end of file + let audio_in = jack_client + .register_port("audio_in", jack::AudioIn::default()) + .expect("Could not create audio_in port"); + let audio_out = jack_client + .register_port("audio_out", jack::AudioOut::default()) + .expect("Could not create audio_out port"); + + let process_handler = ProcessHandler::new(audio_in, audio_out, allocator) + .expect("Could not create process handler"); + + let async_client = jack_client + .activate_async((), process_handler) + .expect("Could not activate Jack"); + + factory_handle.await.unwrap(); + + async_client + .deactivate() + .expect("Could not deactivate Jack"); +} diff --git a/src/process_handler.rs b/src/process_handler.rs new file mode 100644 index 0000000..abda7c0 --- /dev/null +++ b/src/process_handler.rs @@ -0,0 +1,80 @@ +use crate::*; + +pub struct ProcessHandler { + track: Track, + playback_position: usize, + input_port: jack::Port, + output_port: jack::Port, + chunk_factory: F, +} + +impl ProcessHandler { + pub fn new( + input_port: jack::Port, + output_port: jack::Port, + mut chunk_factory: F, + ) -> Result { + Ok(Self { + track: Track::new(&mut chunk_factory)?, + playback_position: 0, + input_port, + output_port, + chunk_factory, + }) + } +} + +impl jack::ProcessHandler for ProcessHandler { + fn process(&mut self, client: &jack::Client, ps: &jack::ProcessScope) -> jack::Control { + let input_buffer = self.input_port.as_slice(ps); + let output_buffer = self.output_port.as_mut_slice(ps); + + let jack_buffer_size = client.buffer_size() as usize; + let recording_samples = client.sample_rate() * 5; // 5 seconds + + let mut index = 0; + + while index < jack_buffer_size { + if self.track.len() < recording_samples { + // recording + let sample_count_to_append = jack_buffer_size - index; + let sample_count_to_append = + sample_count_to_append.min(recording_samples - self.track.len()); + let samples_to_append = &input_buffer[index..index + sample_count_to_append]; + if self + .track + .append_samples(samples_to_append, &mut self.chunk_factory) + .is_err() + { + return jack::Control::Quit; + }; + output_buffer[index..(index + sample_count_to_append)].fill(0.0); + index += sample_count_to_append; + } else { + // playback + let sample_count_to_play = jack_buffer_size - index; + let sample_count_to_play = + sample_count_to_play.min(self.track.len() - self.playback_position); + if self + .track + .copy_samples( + &mut output_buffer[index..(index + sample_count_to_play)], + self.playback_position, + ) + .is_err() + { + return jack::Control::Quit; + } + index += sample_count_to_play; + self.playback_position += sample_count_to_play; + if self.playback_position >= self.track.len() { + self.playback_position = 0; + if self.track.clear(&mut self.chunk_factory).is_err() { + return jack::Control::Quit; + } + } + } + } + jack::Control::Continue + } +} diff --git a/src/track.rs b/src/track.rs new file mode 100644 index 0000000..52d37e9 --- /dev/null +++ b/src/track.rs @@ -0,0 +1,39 @@ +use crate::*; + +pub struct Track { + audio_chunks: Arc, + length: usize, +} + +impl Track { + pub fn new(chunk_factory: &mut F) -> Result { + Ok(Self { + audio_chunks: chunk_factory.create_chunk()?, + length: 0, + }) + } + + pub fn append_samples( + &mut self, + samples: &[f32], + chunk_factory: &mut F, + ) -> Result<()> { + self.audio_chunks.append_samples(samples, chunk_factory)?; + self.length += samples.len(); + Ok(()) + } + + pub fn len(&self) -> usize { + self.length + } + + pub fn clear(&mut self, chunk_factory: &mut F) -> Result<()> { + self.audio_chunks = chunk_factory.create_chunk()?; + self.length = 0; + Ok(()) + } + + pub fn copy_samples(&self, dest: &mut [f32], start: usize) -> Result<()> { + self.audio_chunks.copy_samples(dest, start) + } +}