consolidate and save to disk

This commit is contained in:
Geens 2025-06-08 20:02:11 +02:00
parent 675216eb71
commit 6ab1884553
10 changed files with 448 additions and 87 deletions

7
Cargo.lock generated
View File

@ -125,6 +125,12 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "hound"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62adaabb884c94955b19907d60019f4e145d091c75345379e70d1ee696f7854f"
[[package]]
name = "itoa"
version = "1.0.15"
@ -221,6 +227,7 @@ name = "looper"
version = "0.1.0"
dependencies = [
"dirs",
"hound",
"jack",
"kanal",
"log",

View File

@ -15,6 +15,7 @@ path = "src/sendmidi.rs"
[dependencies]
dirs = "5"
hound = "3.5"
jack = "0.13"
kanal = "0.1"
log = "0.4"

View File

@ -85,6 +85,19 @@ impl AudioChunk {
}
}
/// Get total sample count across entire chunk chain
pub fn len(&self) -> usize {
let mut total = self.sample_count;
let mut current = &self.next;
while let Some(chunk) = current {
total += chunk.sample_count;
current = &chunk.next;
}
total
}
pub fn copy_samples(self: &Arc<Self>, dest: &mut [f32], start: usize) -> Result<()> {
if start < self.sample_count {
// Copy from this chunk
@ -113,6 +126,123 @@ impl AudioChunk {
mod tests {
use super::*;
#[test]
fn test_len_single_chunk_with_samples() {
let chunk = Arc::new(AudioChunk {
samples: vec![1.0, 2.0, 3.0, 4.0, 5.0].into_boxed_slice(),
sample_count: 5,
next: None,
});
assert_eq!(chunk.len(), 5);
}
#[test]
fn test_len_single_chunk_empty() {
let chunk = Arc::new(AudioChunk {
samples: vec![0.0, 0.0, 0.0].into_boxed_slice(),
sample_count: 0,
next: None,
});
assert_eq!(chunk.len(), 0);
}
#[test]
fn test_len_single_chunk_partial() {
let chunk = Arc::new(AudioChunk {
samples: vec![1.0, 2.0, 3.0, 0.0, 0.0].into_boxed_slice(),
sample_count: 3, // Only first 3 samples are valid
next: None,
});
assert_eq!(chunk.len(), 3);
}
#[test]
fn test_len_linked_chunks() {
let chunk3 = Arc::new(AudioChunk {
samples: vec![7.0, 8.0, 9.0].into_boxed_slice(),
sample_count: 3,
next: None,
});
let chunk2 = Arc::new(AudioChunk {
samples: vec![4.0, 5.0].into_boxed_slice(),
sample_count: 2,
next: Some(chunk3),
});
let chunk1 = Arc::new(AudioChunk {
samples: vec![1.0, 2.0, 3.0].into_boxed_slice(),
sample_count: 3,
next: Some(chunk2),
});
assert_eq!(chunk1.len(), 8); // 3 + 2 + 3
}
#[test]
fn test_len_chain_with_empty_chunks() {
let chunk3 = Arc::new(AudioChunk {
samples: vec![4.0, 5.0].into_boxed_slice(),
sample_count: 2,
next: None,
});
let chunk2 = Arc::new(AudioChunk {
samples: vec![0.0, 0.0].into_boxed_slice(),
sample_count: 0, // Empty chunk in middle
next: Some(chunk3),
});
let chunk1 = Arc::new(AudioChunk {
samples: vec![1.0, 2.0, 3.0].into_boxed_slice(),
sample_count: 3,
next: Some(chunk2),
});
assert_eq!(chunk1.len(), 5); // 3 + 0 + 2
}
#[test]
fn test_len_all_empty_chunks() {
let chunk2 = Arc::new(AudioChunk {
samples: vec![0.0, 0.0].into_boxed_slice(),
sample_count: 0,
next: None,
});
let chunk1 = Arc::new(AudioChunk {
samples: vec![0.0, 0.0, 0.0].into_boxed_slice(),
sample_count: 0,
next: Some(chunk2),
});
assert_eq!(chunk1.len(), 0);
}
#[test]
fn test_len_long_chain() {
// Create a longer chain to test iteration
let mut current_chunk = Arc::new(AudioChunk {
samples: vec![10.0].into_boxed_slice(),
sample_count: 1,
next: None,
});
// Build chain backwards: 9 -> 8 -> ... -> 1 -> 0
for i in (0..10).rev() {
current_chunk = Arc::new(AudioChunk {
samples: vec![i as f32].into_boxed_slice(),
sample_count: 1,
next: Some(current_chunk),
});
}
assert_eq!(current_chunk.len(), 11); // 11 chunks, each with 1 sample
}
#[test]
fn test_allocate_creates_empty_chunk() {
let chunk = AudioChunk::allocate(1024);

View File

@ -6,9 +6,10 @@ mod looper_error;
mod metronome;
mod midi;
mod notification_handler;
mod post_record_handler;
mod process_handler;
mod state;
mod state_manager;
mod persistence_manager;
mod track;
use std::sync::Arc;
@ -19,14 +20,18 @@ use chunk_factory::ChunkFactory;
use connection_manager::ConnectionManager;
use looper_error::LooperError;
use looper_error::Result;
use metronome::Metronome;
use metronome::generate_beep;
use notification_handler::JackNotification;
use notification_handler::NotificationHandler;
use post_record_handler::PostRecordHandler;
use post_record_handler::PostRecordController;
use process_handler::ProcessHandler;
use state::State;
use state_manager::StateManager;
use persistence_manager::PersistenceManager;
use track::Track;
use track::TrackState;
use track::TrackTiming;
pub struct JackPorts {
pub audio_in: jack::Port<jack::AudioIn>,
@ -51,20 +56,25 @@ async fn main() {
let notification_handler = NotificationHandler::new();
let mut notification_channel = notification_handler.subscribe();
let (mut state_manager, state_rx) = StateManager::new(notification_handler.subscribe());
let (mut persistence_manager, state_watch) = PersistenceManager::new(notification_handler.subscribe());
// Load state values for metronome configuration
let initial_state = state_rx.borrow().clone();
let initial_state = state_watch.borrow().clone();
// Create post-record handler and get controller for ProcessHandler
let (mut post_record_handler, post_record_controller) = PostRecordHandler::new()
.expect("Could not create post-record handler");
let process_handler = ProcessHandler::new(
ports,
allocator,
beep_samples,
&initial_state,
post_record_controller,
).expect("Could not create process handler");
let mut connection_manager = ConnectionManager::new(
state_rx,
state_watch,
notification_handler.subscribe(),
jack_client.name().to_string(),
)
@ -82,7 +92,7 @@ async fn main() {
break;
}
}
result = state_manager.run() => {
result = persistence_manager.run() => {
if let Err(e) = result {
log::error!("StateManager task failed: {}", e);
}
@ -94,6 +104,12 @@ async fn main() {
}
break;
}
result = post_record_handler.run() => {
if let Err(e) = result {
log::error!("PostRecordHandler task failed: {}", e);
}
break;
}
_ = tokio::signal::ctrl_c() => {
log::info!("Stopping");
break;

View File

@ -2,7 +2,7 @@ use crate::*;
use tokio::sync::{broadcast, watch};
pub struct ConnectionManager {
state_rx: watch::Receiver<State>,
state_watch: watch::Receiver<State>,
notification_rx: broadcast::Receiver<JackNotification>,
jack_client: jack::Client,
jack_client_name: String,
@ -10,7 +10,7 @@ pub struct ConnectionManager {
impl ConnectionManager {
pub fn new(
state_rx: watch::Receiver<State>,
state_watch: watch::Receiver<State>,
notification_rx: broadcast::Receiver<JackNotification>,
jack_client_name: String,
) -> Result<Self> {
@ -21,7 +21,7 @@ impl ConnectionManager {
.map_err(|_| LooperError::JackConnection(std::panic::Location::caller()))?;
Ok(Self {
state_rx,
state_watch,
notification_rx,
jack_client,
jack_client_name,
@ -39,7 +39,7 @@ impl ConnectionManager {
}
async fn restore_connections(&self) {
let state = self.state_rx.borrow();
let state = self.state_watch.borrow();
for external_port in &state.connections.midi_in {
let our_port = format!("{}:midi_in", self.jack_client_name);

View File

@ -63,14 +63,7 @@ impl jack::NotificationHandler for NotificationHandler {
.expect("Could not send port connection notification");
}
fn port_registration(&mut self, client: &jack::Client, port_id: jack::PortId, register: bool) {
let Some(port_name) = client.port_by_id(port_id) else {
return;
};
let Ok(port_name) = port_name.name() else {
return;
};
fn port_registration(&mut self, _client: &jack::Client, _port_id: jack::PortId, register: bool) {
if register {
let notification = JackNotification::PortRegistered {};
self.channel

View File

@ -2,14 +2,14 @@ use crate::*;
use std::path::PathBuf;
use tokio::sync::{broadcast, watch};
pub struct StateManager {
pub struct PersistenceManager {
state: State,
state_tx: watch::Sender<State>,
notification_rx: broadcast::Receiver<JackNotification>,
state_file_path: PathBuf,
}
impl StateManager {
impl PersistenceManager {
pub fn new(
notification_rx: broadcast::Receiver<JackNotification>,
) -> (Self, watch::Receiver<State>) {

173
src/post_record_handler.rs Normal file
View File

@ -0,0 +1,173 @@
use crate::*;
use std::path::PathBuf;
/// Request to process a recorded chunk chain
#[derive(Debug)]
pub struct PostRecordRequest {
pub chunk_chain: Arc<AudioChunk>,
pub sample_rate: u32,
}
/// Response containing the consolidated chunk
#[derive(Debug)]
pub struct PostRecordResponse {
pub consolidated_chunk: Arc<AudioChunk>,
}
/// RT-side interface for post-record operations
#[derive(Debug)]
pub struct PostRecordController {
request_sender: kanal::Sender<PostRecordRequest>,
response_receiver: kanal::Receiver<PostRecordResponse>,
}
impl PostRecordController {
/// Send a post-record processing request (RT-safe)
pub fn send_request(&self, chunk_chain: Arc<AudioChunk>, sample_rate: u32) -> Result<()> {
let request = PostRecordRequest { chunk_chain, sample_rate };
match self.request_sender.try_send(request) {
Ok(true) => Ok(()), // Successfully sent
Ok(false) => Err(LooperError::ChunkAllocation(std::panic::Location::caller())), // Channel full
Err(_) => Err(LooperError::ChunkAllocation(std::panic::Location::caller())), // Channel closed
}
}
/// Try to receive a processing response (RT-safe)
pub fn try_recv_response(&self) -> Option<PostRecordResponse> {
match self.response_receiver.try_recv() {
Ok(Some(response)) => Some(response),
Ok(None) | Err(_) => None, // No message or channel closed = None
}
}
}
/// Handles post-record processing: consolidation and saving
pub struct PostRecordHandler {
request_receiver: kanal::AsyncReceiver<PostRecordRequest>,
response_sender: kanal::AsyncSender<PostRecordResponse>,
directory: PathBuf,
}
impl PostRecordHandler {
/// Create new handler and return RT controller
pub fn new() -> Result<(Self, PostRecordController)> {
// Create channels for bidirectional communication
let (request_sender, request_receiver) = kanal::bounded(16);
let (response_sender, response_receiver) = kanal::bounded(16);
let request_receiver = request_receiver.to_async();
let response_sender = response_sender.to_async();
let controller = PostRecordController {
request_sender,
response_receiver,
};
let handler = Self {
request_receiver,
response_sender,
directory: Self::create_directory()?,
};
Ok((handler, controller))
}
/// Run the async processing task
pub async fn run(&mut self) -> Result<()> {
while let Ok(request) = self.request_receiver.recv().await {
if let Err(e) = self.process_request(request).await {
log::error!("Failed to process post-record request: {}", e);
}
}
Ok(())
}
/// Process a single post-record request
async fn process_request(
&self,
request: PostRecordRequest,
) -> Result<()> {
log::debug!("Processing post-record request for {} samples",
request.chunk_chain.len());
// Step 1: Consolidate chunk chain (CPU intensive)
let consolidated_chunk = AudioChunk::consolidate(&request.chunk_chain);
log::debug!("Consolidated {} samples", consolidated_chunk.sample_count);
// Step 2: Send consolidated chunk back to RT thread immediately
let response = PostRecordResponse {
consolidated_chunk: consolidated_chunk.clone(),
};
if let Err(_) = self.response_sender.send(response).await {
log::warn!("Failed to send consolidated chunk to RT thread");
}
// Step 3: Save WAV file in background (I/O intensive)
let file_path = self.get_file_path();
match self.save_wav_file(&consolidated_chunk, request.sample_rate, &file_path).await {
Ok(_) => log::info!("Saved recording to {:?}", file_path),
Err(e) => log::error!("Failed to save recording to {:?}: {}", file_path, e),
}
Ok(())
}
/// Save consolidated chunk as WAV file
async fn save_wav_file(
&self,
chunk: &AudioChunk,
sample_rate: u32,
file_path: &PathBuf,
) -> Result<()> {
// Run WAV writing in blocking task to avoid blocking async runtime
let chunk_samples: Vec<f32> = chunk.samples[..chunk.sample_count].to_vec();
let file_path_clone = file_path.clone();
tokio::task::spawn_blocking(move || {
let spec = hound::WavSpec {
channels: 1,
sample_rate,
bits_per_sample: 32,
sample_format: hound::SampleFormat::Float,
};
let mut writer = hound::WavWriter::create(&file_path_clone, spec)
.map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?;
// Write all samples from the chunk
for sample in chunk_samples {
writer.write_sample(sample)
.map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?;
}
writer.finalize()
.map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?;
Ok::<(), LooperError>(())
})
.await
.map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?
}
/// Create save directory and return path
fn create_directory() -> Result<PathBuf> {
let mut path = dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."));
path.push(".fcb_looper");
std::fs::create_dir_all(&path)
.map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?;
Ok(path)
}
/// Get file path for track recording
fn get_file_path(&self) -> PathBuf {
self.directory.join("track.wav")
}
}

View File

@ -1,6 +1,4 @@
use crate::*;
use crate::metronome::Metronome;
use crate::track::{TrackTiming, TrackState};
pub struct ProcessHandler<F: ChunkFactory> {
track: Track,
@ -8,6 +6,7 @@ pub struct ProcessHandler<F: ChunkFactory> {
pub ports: JackPorts,
chunk_factory: F,
metronome: Metronome,
post_record_controller: PostRecordController,
}
impl<F: ChunkFactory> ProcessHandler<F> {
@ -16,6 +15,7 @@ impl<F: ChunkFactory> ProcessHandler<F> {
mut chunk_factory: F,
beep_samples: Arc<AudioChunk>,
state: &State,
post_record_controller: PostRecordController,
) -> Result<Self> {
Ok(Self {
track: Track::new(&mut chunk_factory)?,
@ -23,6 +23,7 @@ impl<F: ChunkFactory> ProcessHandler<F> {
ports,
chunk_factory,
metronome: Metronome::new(beep_samples, state),
post_record_controller,
})
}
@ -66,18 +67,27 @@ impl<F: ChunkFactory> jack::ProcessHandler for ProcessHandler<F> {
let state_before = self.track.current_state().clone();
// Calculate timing information for track processing
let timing = self.calculate_track_timing(beat_sample_index, buffer_size, &state_before);
let timing = self.calculate_track_timing(beat_sample_index, &state_before);
// Process track audio with calculated timing
if let Err(e) = self.track.process(
let should_consolidate = match self.track.process(
ps,
&mut self.ports,
timing,
&mut self.chunk_factory
) {
Ok(consolidate) => consolidate,
Err(e) => {
log::error!("Error processing track: {}", e);
return jack::Control::Quit;
}
};
// Handle post-record processing
if let Err(e) = self.handle_post_record_processing(should_consolidate, client.sample_rate() as u32) {
log::error!("Error handling post-record processing: {}", e);
return jack::Control::Quit;
}
// Update playback position based on what happened
self.update_playback_position(beat_sample_index, buffer_size, &state_before);
@ -87,11 +97,35 @@ impl<F: ChunkFactory> jack::ProcessHandler for ProcessHandler<F> {
}
impl<F: ChunkFactory> ProcessHandler<F> {
/// Handle post-record processing: send requests and swap chunks
fn handle_post_record_processing(&mut self, should_consolidate: bool, sample_rate: u32) -> Result<()> {
// Send chunk chain for processing if track indicates consolidation needed
if should_consolidate {
let chunk_chain = self.track.get_audio_chunk();
log::debug!("Sending chunk chain with {} samples for post-record processing",
chunk_chain.len());
if let Err(e) = self.post_record_controller.send_request(chunk_chain, sample_rate) {
log::warn!("Failed to send post-record request: {}", e);
}
}
// Check for consolidation response
if let Some(response) = self.post_record_controller.try_recv_response() {
log::debug!("Received consolidated chunk with {} samples, swapping in track",
response.consolidated_chunk.len());
// Swap the consolidated chunk into the track
self.track.set_audio_chunk(response.consolidated_chunk)?;
}
Ok(())
}
/// Calculate timing information for track processing
fn calculate_track_timing(
&self,
beat_sample_index: Option<u32>,
buffer_size: usize,
state_before: &TrackState,
) -> TrackTiming {
match beat_sample_index {

View File

@ -21,7 +21,7 @@ pub enum TrackTiming {
}
pub struct Track {
audio_chunks: Option<Arc<AudioChunk>>,
audio_chunk: Arc<AudioChunk>,
length: usize,
current_state: TrackState,
next_state: TrackState,
@ -31,7 +31,7 @@ pub struct Track {
impl Track {
pub fn new<F: ChunkFactory>(chunk_factory: &mut F) -> Result<Self> {
Ok(Self {
audio_chunks: None,
audio_chunk: chunk_factory.create_chunk()?,
length: 0,
current_state: TrackState::Empty,
next_state: TrackState::Empty,
@ -40,18 +40,19 @@ impl Track {
}
/// Main audio processing method called from ProcessHandler
/// Returns true if track should be consolidated and saved
pub fn process<F: ChunkFactory>(
&mut self,
ps: &jack::ProcessScope,
ports: &mut JackPorts,
timing: TrackTiming,
chunk_factory: &mut F,
) -> Result<()> {
) -> Result<bool> {
let input_buffer = ports.audio_in.as_slice(ps);
let output_buffer = ports.audio_out.as_mut_slice(ps);
let buffer_size = output_buffer.len();
match timing {
let should_consolidate = match timing {
TrackTiming::NoBeat { position } => {
// No beat in this buffer - process entire buffer with current state
self.process_audio_range(
@ -62,6 +63,7 @@ impl Track {
position,
chunk_factory,
)?;
false // No state transition possible without beat
}
TrackTiming::Beat {
pre_beat_position,
@ -80,8 +82,8 @@ impl Track {
)?;
}
// Apply state transition at beat boundary
self.apply_state_transition(chunk_factory)?;
// Apply state transition at beat boundary and check if consolidation needed
let should_consolidate = self.apply_state_transition(chunk_factory)?;
if beat_sample_index < buffer_size {
// Process samples after beat with new current state
@ -94,10 +96,12 @@ impl Track {
chunk_factory,
)?;
}
}
}
Ok(())
should_consolidate
}
};
Ok(should_consolidate)
}
/// Process audio for a specific range within the buffer
@ -142,19 +146,24 @@ impl Track {
}
/// Apply state transition from next_state to current_state
/// Returns true if track should be consolidated and saved
fn apply_state_transition<F: ChunkFactory>(
&mut self,
chunk_factory: &mut F
) -> Result<()> {
) -> Result<bool> {
// Check if this is a Recording → Playing transition (consolidation trigger)
let should_consolidate = self.current_state == TrackState::Recording
&& self.next_state == TrackState::Playing;
// Handle transitions that require setup
match (&self.current_state, &self.next_state) {
(_, TrackState::Recording) => {
(current_state, TrackState::Recording) if *current_state != TrackState::Recording => {
// Starting to record - clear previous data and create new chunk
self.clear_audio_data(chunk_factory)?;
}
(_, TrackState::Playing) => {
// Starting playback - check if we have audio data
if self.audio_chunks.is_none() || self.length == 0 {
if self.length == 0 {
// No audio data - transition to Idle instead
self.next_state = TrackState::Idle;
}
@ -162,7 +171,7 @@ impl Track {
(_, TrackState::Empty) => {
// Clear operation - remove audio data
// Note: Actual deallocation will happen later via IO thread
self.audio_chunks = None;
self.audio_chunk = chunk_factory.create_chunk()?;
self.length = 0;
}
_ => {
@ -173,7 +182,7 @@ impl Track {
// Apply the state transition
self.current_state = self.next_state.clone();
Ok(())
Ok(should_consolidate)
}
/// Copy samples from track audio to output buffer with looping
@ -183,7 +192,6 @@ impl Track {
sample_count: usize,
mut playback_position: usize,
) -> Result<()> {
if let Some(ref audio_chunks) = self.audio_chunks {
if self.length == 0 {
output_slice.fill(0.0);
return Ok(());
@ -196,8 +204,8 @@ impl Track {
let samples_until_loop = self.length - playback_position;
let samples_to_copy = samples_remaining.min(samples_until_loop);
// Copy directly from audio chunks to output slice
audio_chunks.copy_samples(
// Copy directly from audio chunk to output slice
self.audio_chunk.copy_samples(
&mut output_slice[samples_written..samples_written + samples_to_copy],
playback_position,
)?;
@ -215,10 +223,6 @@ impl Track {
playback_position = 0;
}
}
} else {
output_slice.fill(0.0);
}
Ok(())
}
@ -228,15 +232,8 @@ impl Track {
samples: &[f32],
chunk_factory: &mut F,
) -> Result<()> {
if self.audio_chunks.is_none() {
self.audio_chunks = Some(chunk_factory.create_chunk()?);
}
if let Some(ref mut chunks) = self.audio_chunks {
chunks.append_samples(samples, chunk_factory)?;
self.audio_chunk.append_samples(samples, chunk_factory)?;
self.length += samples.len();
}
Ok(())
}
@ -245,11 +242,25 @@ impl Track {
&mut self,
chunk_factory: &mut F,
) -> Result<()> {
self.audio_chunks = Some(chunk_factory.create_chunk()?);
self.audio_chunk = chunk_factory.create_chunk()?;
self.length = 0;
Ok(())
}
/// Get audio chunk for post-record processing (read-only access)
pub fn get_audio_chunk(&self) -> Arc<AudioChunk> {
self.audio_chunk.clone()
}
/// Set audio chunk (for swapping in consolidated chunk)
pub fn set_audio_chunk(&mut self, chunk: Arc<AudioChunk>) -> Result<()> {
if chunk.len() != self.length {
return Err(LooperError::OutOfBounds(std::panic::Location::caller()));
}
self.audio_chunk = chunk;
Ok(())
}
// Public accessors and commands for MIDI handling
pub fn current_state(&self) -> &TrackState {
&self.current_state
@ -259,10 +270,6 @@ impl Track {
&self.next_state
}
pub fn set_next_state(&mut self, state: TrackState) {
self.next_state = state;
}
pub fn len(&self) -> usize {
self.length
}