diff --git a/audio_engine/src/args.rs b/audio_engine/src/args.rs index a52ab3f..dd9fab9 100644 --- a/audio_engine/src/args.rs +++ b/audio_engine/src/args.rs @@ -4,10 +4,10 @@ use crate::*; #[derive(Parser)] #[command(name = "audio_engine")] -#[command(about = "FCB1010 Looper Pedal Audio Engine")] +#[command(version, about = "FCB1010 Looper Pedal Audio Engine")] pub struct Args { /// Path to Unix socket or named pipe name - #[arg(short = 's', long = "socket", env = "SOCKET", default_value = "/tmp/fcb_looper.sock")] + #[arg(short = 's', long = "socket", env = "SOCKET", default_value = "fcb_looper.sock")] pub socket: String, /// Path to configuration file diff --git a/audio_engine/src/osc.rs b/audio_engine/src/osc.rs deleted file mode 100644 index 884f57c..0000000 --- a/audio_engine/src/osc.rs +++ /dev/null @@ -1,175 +0,0 @@ -use crate::*; -use tokio_util::codec::{Framed, LengthDelimitedCodec}; -use futures::SinkExt; - -#[cfg(unix)] -use tokio::net::UnixStream; -#[cfg(windows)] -use tokio::net::windows::named_pipe::ClientOptions; - -#[cfg(unix)] -type PlatformStream = UnixStream; -#[cfg(windows)] -type PlatformStream = tokio::net::windows::named_pipe::NamedPipeClient; - -#[derive(Debug, Clone)] -enum OscMessage { - TrackStateChanged { - column: usize, - row: usize, - state: TrackState, - }, - SelectedColumnChanged { - column: usize, - }, - SelectedRowChanged { - row: usize, - }, -} - -#[derive(Debug)] -pub struct OscController { - sender: kanal::Sender, -} - -impl OscController { - pub fn track_state_changed( - &self, - column: usize, - row: usize, - state: TrackState, - ) -> Result<()> { - let message = OscMessage::TrackStateChanged { column, row, state }; - match self.sender.try_send(message) { - Ok(true) => Ok(()), - Ok(false) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel full - Err(_) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel closed - } - } - - pub fn selected_column_changed(&self, column: usize) -> Result<()> { - let message = OscMessage::SelectedColumnChanged { column }; - match self.sender.try_send(message) { - Ok(true) => Ok(()), - Ok(false) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel full - Err(_) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel closed - } - } - - pub fn selected_row_changed(&self, row: usize) -> Result<()> { - let message = OscMessage::SelectedRowChanged { row }; - match self.sender.try_send(message) { - Ok(true) => Ok(()), - Ok(false) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel full - Err(_) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel closed - } - } -} - -pub struct Osc { - receiver: kanal::AsyncReceiver, - framed_socket: Framed, -} - -impl Osc { - /// # Platform Notes - /// - Unix: socket_path should be a file path (e.g., "/tmp/fcb_looper.sock") - /// - Windows: socket_path will be converted to named pipe format (e.g., "fcb_looper" -> "\\.\pipe\fcb_looper") - pub async fn new(socket_path: &str) -> Result<(Self, OscController)> { - // Connect to platform-specific IPC - let stream = Self::connect_platform_stream(socket_path).await?; - - // Wrap with length-delimited codec - let framed_socket = Framed::new(stream, LengthDelimitedCodec::new()); - - // Create communication channel - let (sender, receiver) = kanal::bounded(64); - let receiver = receiver.to_async(); - - let controller = OscController { sender }; - - let server = Self { - receiver, - framed_socket, - }; - - Ok((server, controller)) - } - - #[cfg(unix)] - async fn connect_platform_stream(socket_path: &str) -> Result { - UnixStream::connect(socket_path) - .await - .map_err(|_| LooperError::JackConnection(std::panic::Location::caller())) - } - - #[cfg(windows)] - async fn connect_platform_stream(pipe_name: &str) -> Result { - ClientOptions::new() - .open(pipe_name) - .map_err(|_| LooperError::JackConnection(std::panic::Location::caller())) - } - - pub async fn run(&mut self) -> Result<()> { - while let Ok(message) = self.receiver.recv().await { - if let Err(e) = self.send_osc_message(message).await { - log::error!("Failed to send OSC message: {}", e); - // Continue processing other messages - } - } - Ok(()) - } - - async fn send_osc_message(&mut self, message: OscMessage) -> Result<()> { - let osc_packet = self.message_to_osc_packet(message)?; - - let osc_bytes = rosc::encoder::encode(&osc_packet) - .map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?; - - self.framed_socket - .send(bytes::Bytes::from(osc_bytes)) - .await - .map_err(|_| LooperError::StateSave(std::panic::Location::caller()))?; - - Ok(()) - } - - fn message_to_osc_packet(&self, message: OscMessage) -> Result { - match message { - OscMessage::TrackStateChanged { column, row, state } => { - let osc_state = self.track_state_to_osc_string(state); - let address = format!("/looper/cell/{}/{}/state", column + 1, row + 1); // 1-based indexing - - Ok(rosc::OscPacket::Message(rosc::OscMessage { - addr: address, - args: vec![rosc::OscType::String(osc_state)], - })) - } - OscMessage::SelectedColumnChanged { column } => { - let address = "/looper/selected/column".to_string(); - - Ok(rosc::OscPacket::Message(rosc::OscMessage { - addr: address, - args: vec![rosc::OscType::Int((column + 1) as i32)], // 1-based indexing - })) - } - OscMessage::SelectedRowChanged { row } => { - let address = "/looper/selected/row".to_string(); - - Ok(rosc::OscPacket::Message(rosc::OscMessage { - addr: address, - args: vec![rosc::OscType::Int((row + 1) as i32)], // 1-based indexing - })) - } - } - } - - fn track_state_to_osc_string(&self, state: TrackState) -> String { - match state { - TrackState::Empty => "empty".to_string(), - TrackState::Idle => "ready".to_string(), // Assuming track has audio data - TrackState::Recording | TrackState::RecordingAutoStop { .. } => "recording".to_string(), - TrackState::Playing => "playing".to_string(), - } - } -} \ No newline at end of file diff --git a/audio_engine/src/osc/controller.rs b/audio_engine/src/osc/controller.rs new file mode 100644 index 0000000..1532b76 --- /dev/null +++ b/audio_engine/src/osc/controller.rs @@ -0,0 +1,45 @@ +use crate::{LooperError, Result, TrackState}; +use super::message::OscMessage; + +#[derive(Debug)] +pub struct OscController { + sender: kanal::Sender, +} + +impl OscController { + pub(crate) fn new(sender: kanal::Sender) -> Self { + Self { sender } + } + + pub fn track_state_changed( + &self, + column: usize, + row: usize, + state: TrackState, + ) -> Result<()> { + let message = OscMessage::TrackStateChanged { column, row, state }; + match self.sender.try_send(message) { + Ok(true) => Ok(()), + Ok(false) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel full + Err(_) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel closed + } + } + + pub fn selected_column_changed(&self, column: usize) -> Result<()> { + let message = OscMessage::SelectedColumnChanged { column }; + match self.sender.try_send(message) { + Ok(true) => Ok(()), + Ok(false) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel full + Err(_) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel closed + } + } + + pub fn selected_row_changed(&self, row: usize) -> Result<()> { + let message = OscMessage::SelectedRowChanged { row }; + match self.sender.try_send(message) { + Ok(true) => Ok(()), + Ok(false) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel full + Err(_) => Err(LooperError::Osc(std::panic::Location::caller())), // Channel closed + } + } +} \ No newline at end of file diff --git a/audio_engine/src/osc/message.rs b/audio_engine/src/osc/message.rs new file mode 100644 index 0000000..8ec8f99 --- /dev/null +++ b/audio_engine/src/osc/message.rs @@ -0,0 +1,86 @@ +use crate::TrackState; + +#[derive(Debug, Clone)] +pub(crate) enum OscMessage { + TrackStateChanged { + column: usize, + row: usize, + state: TrackState, + }, + SelectedColumnChanged { + column: usize, + }, + SelectedRowChanged { + row: usize, + }, +} + +#[derive(Debug, Clone)] +pub(crate) struct ShadowState { + pub selected_column: usize, // 0-based (converted to 1-based for OSC) + pub selected_row: usize, // 0-based (converted to 1-based for OSC) + pub cells: Vec>, + pub columns: usize, + pub rows: usize, +} + +impl ShadowState { + pub fn new(columns: usize, rows: usize) -> Self { + let cells = (0..columns) + .map(|_| vec![TrackState::Empty; rows]) + .collect(); + + Self { + selected_column: 0, + selected_row: 0, + cells, + columns, + rows, + } + } + + pub fn update(&mut self, message: &OscMessage) { + match message { + OscMessage::TrackStateChanged { column, row, state } => { + if *column < self.columns && *row < self.rows { + self.cells[*column][*row] = state.clone(); + } + } + OscMessage::SelectedColumnChanged { column } => { + if *column < self.columns { + self.selected_column = *column; + } + } + OscMessage::SelectedRowChanged { row } => { + if *row < self.rows { + self.selected_row = *row; + } + } + } + } + + pub fn create_state_dump(&self) -> Vec { + let mut messages = Vec::new(); + + // Send current selections + messages.push(OscMessage::SelectedColumnChanged { + column: self.selected_column, + }); + messages.push(OscMessage::SelectedRowChanged { + row: self.selected_row, + }); + + // Send all cell states + for (column, column_cells) in self.cells.iter().enumerate() { + for (row, cell_state) in column_cells.iter().enumerate() { + messages.push(OscMessage::TrackStateChanged { + column, + row, + state: cell_state.clone(), + }); + } + } + + messages + } +} \ No newline at end of file diff --git a/audio_engine/src/osc/mod.rs b/audio_engine/src/osc/mod.rs new file mode 100644 index 0000000..65280e9 --- /dev/null +++ b/audio_engine/src/osc/mod.rs @@ -0,0 +1,8 @@ +mod controller; +mod message; +mod platform; +mod server; + +// Re-export public interface - same as before +pub use controller::OscController; +pub use server::Osc; \ No newline at end of file diff --git a/audio_engine/src/osc/platform/mod.rs b/audio_engine/src/osc/platform/mod.rs new file mode 100644 index 0000000..00f1745 --- /dev/null +++ b/audio_engine/src/osc/platform/mod.rs @@ -0,0 +1,23 @@ +#[cfg(unix)] +mod unix; +#[cfg(windows)] +mod windows; + +#[cfg(unix)] +pub(crate) use unix::{PlatformListener, PlatformStream}; + +#[cfg(windows)] +pub(crate) use windows::{PlatformListener, PlatformStream}; + +use crate::Result; + +pub(crate) async fn create_listener(socket_path: &str) -> Result { + #[cfg(unix)] + { + unix::create_listener(socket_path).await + } + #[cfg(windows)] + { + windows::create_listener(socket_path).await + } +} \ No newline at end of file diff --git a/audio_engine/src/osc/platform/unix.rs b/audio_engine/src/osc/platform/unix.rs new file mode 100644 index 0000000..a0113b2 --- /dev/null +++ b/audio_engine/src/osc/platform/unix.rs @@ -0,0 +1,26 @@ +use crate::{LooperError, Result}; +use tokio::net::{UnixListener, UnixStream}; + +pub(crate) type PlatformStream = UnixStream; + +pub(crate) struct PlatformListener { + listener: UnixListener, +} + +impl PlatformListener { + pub async fn accept(&mut self) -> Result { + let (stream, _) = self.listener.accept().await + .map_err(|_| LooperError::JackConnection(std::panic::Location::caller()))?; + Ok(stream) + } +} + +pub(crate) async fn create_listener(socket_path: &str) -> Result { + // Remove existing socket file if it exists + let _ = std::fs::remove_file(socket_path); + + let listener = UnixListener::bind(socket_path) + .map_err(|_| LooperError::JackConnection(std::panic::Location::caller()))?; + + Ok(PlatformListener { listener }) +} \ No newline at end of file diff --git a/audio_engine/src/osc/platform/windows.rs b/audio_engine/src/osc/platform/windows.rs new file mode 100644 index 0000000..b45588a --- /dev/null +++ b/audio_engine/src/osc/platform/windows.rs @@ -0,0 +1,45 @@ +use crate::{LooperError, Result}; +use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions}; + +pub(crate) type PlatformStream = NamedPipeServer; + +pub(crate) struct PlatformListener { + pipe_name: String, + current_server: Option, +} + +impl PlatformListener { + pub async fn accept(&mut self) -> Result { + // Create new pipe instance if needed + if self.current_server.is_none() { + self.current_server = Some(create_pipe_server(&self.pipe_name)?); + } + + let server = self.current_server.take().unwrap(); + server.connect().await + .map_err(|_| LooperError::JackConnection(std::panic::Location::caller()))?; + + Ok(server) + } +} + +pub(crate) async fn create_listener(pipe_name: &str) -> Result { + // Convert pipe name to Windows format if needed + let pipe_path = if pipe_name.starts_with(r"\\.\pipe\") { + pipe_name.to_string() + } else { + format!(r"\\.\pipe\{}", pipe_name) + }; + + Ok(PlatformListener { + pipe_name: pipe_path, + current_server: None, + }) +} + +fn create_pipe_server(pipe_name: &str) -> Result { + ServerOptions::new() + .first_pipe_instance(false) + .create(pipe_name) + .map_err(|_| LooperError::JackConnection(std::panic::Location::caller())) +} \ No newline at end of file diff --git a/audio_engine/src/osc/server.rs b/audio_engine/src/osc/server.rs new file mode 100644 index 0000000..86b8716 --- /dev/null +++ b/audio_engine/src/osc/server.rs @@ -0,0 +1,149 @@ +use crate::Result; +use super::{ + controller::OscController, + message::{OscMessage, ShadowState}, + platform::{create_listener, PlatformListener, PlatformStream}, +}; +use bytes::Bytes; +use futures::SinkExt; +use tokio::sync::broadcast; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; + +const CLIENT_BUFFER_SIZE: usize = 32; + +pub struct Osc { + receiver: kanal::AsyncReceiver, + listener: PlatformListener, + broadcaster: broadcast::Sender, + shadow_state: ShadowState, +} + +impl Osc { + /// Create new OSC server and controller with same interface as before + pub async fn new(socket_path: &str) -> Result<(Self, OscController)> { + // Create platform listener (server) + let listener = create_listener(socket_path).await?; + + // Create communication channels + let (sender, receiver) = kanal::bounded(64); + let receiver = receiver.to_async(); + let (broadcaster, _) = broadcast::channel(CLIENT_BUFFER_SIZE); + + let controller = OscController::new(sender); + + let server = Self { + receiver, + listener, + broadcaster, + shadow_state: ShadowState::new(5, 5), // Default 5x5 matrix + }; + + Ok((server, controller)) + } + + /// Main server loop - same interface as before + pub async fn run(&mut self) -> Result<()> { + loop { + tokio::select! { + // Handle messages from OscController + msg = self.receiver.recv() => { + if let Ok(msg) = msg { + self.shadow_state.update(&msg); + // Broadcast to all connected clients (ignore if no clients) + let _ = self.broadcaster.send(msg); + } else { + // Channel closed, exit + break; + } + } + + // Accept new client connections + connection = self.listener.accept() => { + if let Ok(stream) = connection { + self.spawn_client_task(stream); + } + } + } + } + Ok(()) + } + + fn spawn_client_task(&self, stream: PlatformStream) { + let mut receiver = self.broadcaster.subscribe(); + let state_dump = self.shadow_state.create_state_dump(); + + tokio::spawn(async move { + let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); + + // Send current state dump immediately + for msg in state_dump { + if let Err(_) = Self::send_osc_message(&mut framed, msg).await { + return; // Client disconnected during state dump + } + } + + // Forward broadcast messages until connection dies + while let Ok(msg) = receiver.recv().await { + if let Err(_) = Self::send_osc_message(&mut framed, msg).await { + break; // Client disconnected + } + } + }); + } + + async fn send_osc_message( + framed: &mut Framed, + message: OscMessage, + ) -> Result<()> { + let osc_packet = Self::message_to_osc_packet(message)?; + + let osc_bytes = rosc::encoder::encode(&osc_packet) + .map_err(|_| crate::LooperError::StateSave(std::panic::Location::caller()))?; + + framed + .send(Bytes::from(osc_bytes)) + .await + .map_err(|_| crate::LooperError::StateSave(std::panic::Location::caller()))?; + + Ok(()) + } + + fn message_to_osc_packet(message: OscMessage) -> Result { + match message { + OscMessage::TrackStateChanged { column, row, state } => { + let osc_state = Self::track_state_to_osc_string(state); + let address = format!("/looper/cell/{}/{}/state", column + 1, row + 1); // 1-based indexing + + Ok(rosc::OscPacket::Message(rosc::OscMessage { + addr: address, + args: vec![rosc::OscType::String(osc_state)], + })) + } + OscMessage::SelectedColumnChanged { column } => { + let address = "/looper/selected/column".to_string(); + + Ok(rosc::OscPacket::Message(rosc::OscMessage { + addr: address, + args: vec![rosc::OscType::Int((column + 1) as i32)], // 1-based indexing + })) + } + OscMessage::SelectedRowChanged { row } => { + let address = "/looper/selected/row".to_string(); + + Ok(rosc::OscPacket::Message(rosc::OscMessage { + addr: address, + args: vec![rosc::OscType::Int((row + 1) as i32)], // 1-based indexing + })) + } + } + } + + fn track_state_to_osc_string(state: crate::TrackState) -> String { + match state { + crate::TrackState::Empty => "empty".to_string(), + crate::TrackState::Idle => "ready".to_string(), + crate::TrackState::Recording | crate::TrackState::RecordingAutoStop { .. } => "recording".to_string(), + crate::TrackState::Playing => "playing".to_string(), + } + } +} \ No newline at end of file