This commit is contained in:
Geens 2025-06-04 21:06:27 +02:00
parent a450e81338
commit ee5807e9a9
7 changed files with 385 additions and 164 deletions

View File

@ -1,26 +1,43 @@
use std::sync::Arc;
use crate::AudioChunk;
use crate::*;
pub struct Allocator {
buffer_size: usize,
allocated_buffer_sender: tokio::sync::mpsc::Sender<Arc<AudioChunk>>,
channel: tokio::sync::mpsc::Receiver<Arc<AudioChunk>>,
}
impl Allocator {
pub fn new(buffer_size: usize, allocated_buffer_sender: tokio::sync::mpsc::Sender<Arc<AudioChunk>>) -> Self {
while allocated_buffer_sender.try_send(AudioChunk::allocate(buffer_size)).is_ok() {}
Self {
buffer_size,
allocated_buffer_sender,
}
}
pub fn spawn(buffer_size: usize, pool_size: usize) -> (Self, tokio::task::JoinHandle<()>) {
let (allocated_buffer_sender, allocated_buffer_receiver) =
tokio::sync::mpsc::channel(pool_size);
pub async fn run(self) {
loop {
let chunk = AudioChunk::allocate(self.buffer_size);
if self.allocated_buffer_sender.send(chunk).await.is_err() {
break;
// Pre-fill the channel with initial buffers
while allocated_buffer_sender
.try_send(AudioChunk::allocate(buffer_size))
.is_ok()
{}
let allocator = Allocator {
channel: allocated_buffer_receiver,
};
// Spawn the background task that continuously allocates buffers
let join_handle = tokio::runtime::Handle::current().spawn(async move {
loop {
let chunk = AudioChunk::allocate(buffer_size);
if allocated_buffer_sender.send(chunk).await.is_err() {
break;
}
}
});
(allocator, join_handle)
}
}
impl ChunkFactory for Allocator {
fn create_chunk(&mut self) -> Result<std::sync::Arc<AudioChunk>> {
match self.channel.try_recv() {
Ok(chunk) => Ok(chunk),
Err(_) => Err(LooperError::ChunkAllocation(std::panic::Location::caller())),
}
}
}

View File

@ -51,42 +51,61 @@ impl AudioChunk {
})
}
/// Write samples into this chunk and additional chunks as needed.
/// Add samples to this chunk and allocate additional chunks as needed.
/// Uses the factory to get pre-allocated chunks when current chunk fills up.
pub fn write_samples<F>(this: &mut Arc<Self>, samples: &[f32], mut chunk_factory: F) -> Result<(), LooperError>
where
F: FnMut() -> Result<Arc<AudioChunk>, LooperError>
{
let sample_count = samples.len();
if sample_count > 0 {
let available_space = this.samples.len();
let samples_to_write = sample_count.min(available_space);
// Get mutable access to write samples
let chunk_mut = Arc::get_mut(this).ok_or(LooperError::ChunkOwnership)?;
// Write what fits
if let Some(dest) = chunk_mut.samples.get_mut(..samples_to_write) {
dest.copy_from_slice(&samples[..samples_to_write]);
chunk_mut.sample_count = samples_to_write;
}
// Handle remaining samples if any
if samples_to_write < sample_count {
// Need a new chunk for remaining samples
let mut new_chunk = chunk_factory()?;
let remaining_samples = &samples[samples_to_write..];
// Recurse on the new chunk
Self::write_samples(&mut new_chunk, remaining_samples, chunk_factory)?;
pub fn append_samples<F: ChunkFactory>(
self: &mut Arc<Self>,
samples: &[f32],
chunk_factory: &mut F,
) -> Result<()> {
let mut_self = Arc::get_mut(self)
.ok_or(LooperError::ChunkOwnership(std::panic::Location::caller()))?;
if let Some(next) = &mut mut_self.next {
// Not the last chunk, recurse
next.append_samples(samples, chunk_factory)
} else {
// Add to this chunk first
let available_space = mut_self.samples.len() - mut_self.sample_count;
let samples_to_append_to_this_chunk = samples.len().min(available_space);
let dest = &mut mut_self.samples
[mut_self.sample_count..mut_self.sample_count + samples_to_append_to_this_chunk];
dest.copy_from_slice(&samples[..samples_to_append_to_this_chunk]);
mut_self.sample_count += samples_to_append_to_this_chunk;
// Handle remaining samples by recursion
if samples_to_append_to_this_chunk < samples.len() {
let mut new_chunk = chunk_factory.create_chunk()?;
let remaining_samples = &samples[samples_to_append_to_this_chunk..];
new_chunk.append_samples(remaining_samples, chunk_factory)?;
// Link the new chunk
chunk_mut.next = Some(new_chunk);
mut_self.next = Some(new_chunk);
Ok(())
} else {
Ok(())
}
}
}
Ok(())
pub fn copy_samples(self: &Arc<Self>, dest: &mut [f32], start: usize) -> Result<()> {
if start < self.sample_count {
// Copy from this chunk
let end = start + dest.len();
if end <= self.sample_count {
dest.copy_from_slice(&self.samples[start..end]);
} else if let Some(next) = self.next.as_ref() {
let sample_count_from_this_chunk = self.sample_count - start;
dest[..sample_count_from_this_chunk]
.copy_from_slice(&self.samples[start..self.sample_count]);
next.copy_samples(&mut dest[sample_count_from_this_chunk..], 0)?;
} else {
return Err(LooperError::OutOfBounds(std::panic::Location::caller()));
}
Ok(())
} else if let Some(next) = &self.next {
// Copy from next chunk
next.copy_samples(dest, start - self.sample_count)
} else {
Err(LooperError::OutOfBounds(std::panic::Location::caller()))
}
}
}
@ -112,8 +131,8 @@ mod tests {
// Write some sample data
let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0];
let factory = || panic!("Factory should not be called");
let result = AudioChunk::write_samples(&mut chunk, &samples, factory);
let mut factory = || panic!("Factory should not be called");
let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory);
assert!(result.is_ok());
let consolidated = AudioChunk::consolidate(&chunk);
@ -200,11 +219,11 @@ mod tests {
}
#[test]
fn test_write_samples_empty() {
fn test_append_samples_empty() {
let mut chunk = AudioChunk::allocate(5);
let factory = || panic!("Factory should not be called for empty samples");
let result = AudioChunk::write_samples(&mut chunk, &[], factory);
let mut factory = || panic!("Factory should not be called for empty samples");
let result = AudioChunk::append_samples(&mut chunk, &[], &mut factory);
assert!(result.is_ok());
assert_eq!(chunk.sample_count, 0);
@ -212,12 +231,12 @@ mod tests {
}
#[test]
fn test_write_samples_fits_in_one_chunk() {
fn test_append_samples_fits_in_one_chunk() {
let mut chunk = AudioChunk::allocate(5);
let factory = || panic!("Factory should not be called when samples fit");
let mut factory = || panic!("Factory should not be called when samples fit");
let samples = vec![1.0, 2.0, 3.0];
let result = AudioChunk::write_samples(&mut chunk, &samples, factory);
let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory);
assert!(result.is_ok());
assert_eq!(chunk.sample_count, 3);
@ -228,12 +247,12 @@ mod tests {
}
#[test]
fn test_write_samples_exactly_fills_chunk() {
fn test_append_samples_exactly_fills_chunk() {
let mut chunk = AudioChunk::allocate(3);
let factory = || panic!("Factory should not be called when samples exactly fit");
let mut factory = || panic!("Factory should not be called when samples exactly fit");
let samples = vec![1.0, 2.0, 3.0];
let result = AudioChunk::write_samples(&mut chunk, &samples, factory);
let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory);
assert!(result.is_ok());
assert_eq!(chunk.sample_count, 3);
@ -244,15 +263,13 @@ mod tests {
}
#[test]
fn test_write_samples_requires_new_chunk() {
fn test_append_samples_requires_new_chunk() {
let mut chunk = AudioChunk::allocate(2);
use std::cell::RefCell;
let available_chunks = RefCell::new(vec![AudioChunk::allocate(3)]);
let factory = || available_chunks.borrow_mut().pop().ok_or(LooperError::ChunkAllocation);
let mut factory = chunk_factory::mock::MockFactory::new(vec![AudioChunk::allocate(3)]);
let samples = vec![1.0, 2.0, 3.0, 4.0]; // 4 samples, first chunk holds 2
let result = AudioChunk::write_samples(&mut chunk, &samples, factory);
let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory);
assert!(result.is_ok());
@ -270,18 +287,16 @@ mod tests {
}
#[test]
fn test_write_samples_multiple_chunks_cascade() {
fn test_append_samples_multiple_chunks_cascade() {
let mut chunk = AudioChunk::allocate(2);
use std::cell::RefCell;
let available_chunks = RefCell::new(vec![
let mut factory = chunk_factory::mock::MockFactory::new(vec![
AudioChunk::allocate(2),
AudioChunk::allocate(2),
]);
let factory = || available_chunks.borrow_mut().pop().ok_or(LooperError::ChunkAllocation);
let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; // 6 samples, each chunk holds 2
let result = AudioChunk::write_samples(&mut chunk, &samples, factory);
let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory);
assert!(result.is_ok());
@ -305,15 +320,18 @@ mod tests {
}
#[test]
fn test_write_samples_factory_failure() {
fn test_append_samples_factory_failure() {
let mut chunk = AudioChunk::allocate(2);
let factory = || Err(LooperError::ChunkAllocation);
let mut factory = || Err(LooperError::ChunkAllocation(std::panic::Location::caller()));
let samples = vec![1.0, 2.0, 3.0]; // 3 samples, chunk only holds 2
let result = AudioChunk::write_samples(&mut chunk, &samples, factory);
let result = AudioChunk::append_samples(&mut chunk, &samples, &mut factory);
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), LooperError::ChunkAllocation));
assert!(matches!(
result.unwrap_err(),
LooperError::ChunkAllocation(_)
));
// First chunk should have been written before factory failure
assert_eq!(chunk.sample_count, 2);
@ -323,17 +341,20 @@ mod tests {
}
#[test]
fn test_write_samples_ownership_failure() {
fn test_append_samples_ownership_failure() {
let chunk = AudioChunk::allocate(3);
let chunk_clone = chunk.clone(); // Create another reference
let mut chunk_original = chunk;
let factory = || panic!("Factory should not be called");
let mut factory = || panic!("Factory should not be called");
let samples = vec![1.0, 2.0];
let result = AudioChunk::write_samples(&mut chunk_original, &samples, factory);
let result = AudioChunk::append_samples(&mut chunk_original, &samples, &mut factory);
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), LooperError::ChunkOwnership));
assert!(matches!(
result.unwrap_err(),
LooperError::ChunkOwnership(_)
));
// Chunk should be unchanged
assert_eq!(chunk_original.sample_count, 0);

40
src/chunk_factory.rs Normal file
View File

@ -0,0 +1,40 @@
use crate::*;
/// Creates a new chunk of audio data.
pub trait ChunkFactory: Send {
fn create_chunk(&mut self) -> Result<std::sync::Arc<AudioChunk>>;
}
impl<F> ChunkFactory for F
where
F: FnMut() -> Result<std::sync::Arc<AudioChunk>> + Send,
{
fn create_chunk(&mut self) -> Result<std::sync::Arc<AudioChunk>> {
self()
}
}
#[cfg(test)]
pub mod mock {
use super::*;
pub struct MockFactory {
chunks: Vec<std::sync::Arc<AudioChunk>>,
}
impl MockFactory {
pub fn new(chunks: Vec<std::sync::Arc<AudioChunk>>) -> Self {
Self { chunks }
}
}
impl ChunkFactory for MockFactory {
fn create_chunk(&mut self) -> Result<std::sync::Arc<AudioChunk>> {
if let Some(chunk) = self.chunks.pop() {
Ok(chunk)
} else {
Err(LooperError::ChunkAllocation(std::panic::Location::caller()))
}
}
}
}

View File

@ -1,8 +1,13 @@
#[derive(Debug, thiserror::Error)]
pub enum LooperError {
#[error("Failed to allocate new audio chunk")]
ChunkAllocation,
ChunkAllocation(&'static std::panic::Location<'static>),
#[error("Cannot modify chunk with multiple references")]
ChunkOwnership,
ChunkOwnership(&'static std::panic::Location<'static>),
#[error("Index out of bounds")]
OutOfBounds(&'static std::panic::Location<'static>),
}
pub type Result<T> = std::result::Result<T, LooperError>;

View File

@ -1,30 +1,49 @@
mod audio_chunk;
mod looper_error;
mod allocator;
mod audio_chunk;
mod chunk_factory;
mod looper_error;
mod process_handler;
mod track;
use std::sync::Arc;
use audio_chunk::AudioChunk;
use looper_error::LooperError;
use allocator::Allocator;
use audio_chunk::AudioChunk;
use chunk_factory::ChunkFactory;
use looper_error::LooperError;
use looper_error::Result;
use process_handler::ProcessHandler;
use track::Track;
fn main() {
let (allocated_buffer_sender, mut allocated_buffer_receiver) = tokio::sync::mpsc::channel(1);
let allocator = Allocator::new(3, allocated_buffer_sender);
let mut factory = move || {
match allocated_buffer_receiver.try_recv() {
Ok(chunk) => Ok(chunk),
Err(_) => Err(LooperError::ChunkAllocation),
}
};
#[tokio::main]
async fn main() {
let (allocator, factory_handle) = Allocator::spawn(5000, 100);
let handle = std::thread::spawn(move || {
let mut chunk = factory().unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
let samples = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]; // 6 samples, each chunk holds 2
AudioChunk::write_samples(&mut chunk, &samples, factory).unwrap();
AudioChunk::consolidate(&chunk);
});
let (jack_client, jack_status) =
jack::Client::new("looper", jack::ClientOptions::NO_START_SERVER)
.expect("Could not create Jack client");
if !jack_status.is_empty() {
println!("Could not start jack client: {jack_status:?}");
return;
}
let tokio_runtime = tokio::runtime::Builder::new_current_thread().build().unwrap();
tokio_runtime.block_on(allocator.run());
handle.join().unwrap();
let audio_in = jack_client
.register_port("audio_in", jack::AudioIn::default())
.expect("Could not create audio_in port");
let audio_out = jack_client
.register_port("audio_out", jack::AudioOut::default())
.expect("Could not create audio_out port");
let process_handler = ProcessHandler::new(audio_in, audio_out, allocator)
.expect("Could not create process handler");
let async_client = jack_client
.activate_async((), process_handler)
.expect("Could not activate Jack");
factory_handle.await.unwrap();
async_client
.deactivate()
.expect("Could not deactivate Jack");
}

80
src/process_handler.rs Normal file
View File

@ -0,0 +1,80 @@
use crate::*;
pub struct ProcessHandler<F: ChunkFactory> {
track: Track,
playback_position: usize,
input_port: jack::Port<jack::AudioIn>,
output_port: jack::Port<jack::AudioOut>,
chunk_factory: F,
}
impl<F: ChunkFactory> ProcessHandler<F> {
pub fn new(
input_port: jack::Port<jack::AudioIn>,
output_port: jack::Port<jack::AudioOut>,
mut chunk_factory: F,
) -> Result<Self> {
Ok(Self {
track: Track::new(&mut chunk_factory)?,
playback_position: 0,
input_port,
output_port,
chunk_factory,
})
}
}
impl<F: ChunkFactory> jack::ProcessHandler for ProcessHandler<F> {
fn process(&mut self, client: &jack::Client, ps: &jack::ProcessScope) -> jack::Control {
let input_buffer = self.input_port.as_slice(ps);
let output_buffer = self.output_port.as_mut_slice(ps);
let jack_buffer_size = client.buffer_size() as usize;
let recording_samples = client.sample_rate() * 5; // 5 seconds
let mut index = 0;
while index < jack_buffer_size {
if self.track.len() < recording_samples {
// recording
let sample_count_to_append = jack_buffer_size - index;
let sample_count_to_append =
sample_count_to_append.min(recording_samples - self.track.len());
let samples_to_append = &input_buffer[index..index + sample_count_to_append];
if self
.track
.append_samples(samples_to_append, &mut self.chunk_factory)
.is_err()
{
return jack::Control::Quit;
};
output_buffer[index..(index + sample_count_to_append)].fill(0.0);
index += sample_count_to_append;
} else {
// playback
let sample_count_to_play = jack_buffer_size - index;
let sample_count_to_play =
sample_count_to_play.min(self.track.len() - self.playback_position);
if self
.track
.copy_samples(
&mut output_buffer[index..(index + sample_count_to_play)],
self.playback_position,
)
.is_err()
{
return jack::Control::Quit;
}
index += sample_count_to_play;
self.playback_position += sample_count_to_play;
if self.playback_position >= self.track.len() {
self.playback_position = 0;
if self.track.clear(&mut self.chunk_factory).is_err() {
return jack::Control::Quit;
}
}
}
}
jack::Control::Continue
}
}

39
src/track.rs Normal file
View File

@ -0,0 +1,39 @@
use crate::*;
pub struct Track {
audio_chunks: Arc<AudioChunk>,
length: usize,
}
impl Track {
pub fn new<F: ChunkFactory>(chunk_factory: &mut F) -> Result<Self> {
Ok(Self {
audio_chunks: chunk_factory.create_chunk()?,
length: 0,
})
}
pub fn append_samples<F: ChunkFactory>(
&mut self,
samples: &[f32],
chunk_factory: &mut F,
) -> Result<()> {
self.audio_chunks.append_samples(samples, chunk_factory)?;
self.length += samples.len();
Ok(())
}
pub fn len(&self) -> usize {
self.length
}
pub fn clear<F: ChunkFactory>(&mut self, chunk_factory: &mut F) -> Result<()> {
self.audio_chunks = chunk_factory.create_chunk()?;
self.length = 0;
Ok(())
}
pub fn copy_samples(&self, dest: &mut [f32], start: usize) -> Result<()> {
self.audio_chunks.copy_samples(dest, start)
}
}