use crate::Result; use super::{ controller::OscController, message::{OscMessage, ShadowState}, platform::{create_listener, PlatformListener, PlatformStream}, }; use bytes::Bytes; use futures::SinkExt; use tokio::sync::broadcast; use tokio_util::codec::{Framed, LengthDelimitedCodec}; const CLIENT_BUFFER_SIZE: usize = 32; pub struct Osc { receiver: kanal::AsyncReceiver, listener: PlatformListener, broadcaster: broadcast::Sender, shadow_state: ShadowState, } impl Osc { /// Create new OSC server and controller with same interface as before pub async fn new(socket_path: &str) -> Result<(Self, OscController)> { // Create platform listener (server) let listener = create_listener(socket_path).await?; // Create communication channels let (sender, receiver) = kanal::bounded(64); let receiver = receiver.to_async(); let (broadcaster, _) = broadcast::channel(CLIENT_BUFFER_SIZE); let controller = OscController::new(sender); let server = Self { receiver, listener, broadcaster, shadow_state: ShadowState::new(5, 5), // Default 5x5 matrix }; Ok((server, controller)) } /// Main server loop - same interface as before pub async fn run(&mut self) -> Result<()> { loop { tokio::select! { // Handle messages from OscController msg = self.receiver.recv() => { if let Ok(msg) = msg { self.shadow_state.update(&msg); // Broadcast to all connected clients (ignore if no clients) let _ = self.broadcaster.send(msg); } else { // Channel closed, exit break; } } // Accept new client connections connection = self.listener.accept() => { if let Ok(stream) = connection { self.spawn_client_task(stream); } } } } Ok(()) } fn spawn_client_task(&self, stream: PlatformStream) { let mut receiver = self.broadcaster.subscribe(); let state_dump = self.shadow_state.create_state_dump(); tokio::spawn(async move { let mut framed = Framed::new(stream, LengthDelimitedCodec::new()); // Send current state dump immediately for msg in state_dump { if let Err(_) = Self::send_osc_message(&mut framed, msg).await { return; // Client disconnected during state dump } } // Forward broadcast messages until connection dies while let Ok(msg) = receiver.recv().await { if let Err(_) = Self::send_osc_message(&mut framed, msg).await { break; // Client disconnected } } }); } async fn send_osc_message( framed: &mut Framed, message: OscMessage, ) -> Result<()> { let osc_packet = Self::message_to_osc_packet(message)?; let osc_bytes = rosc::encoder::encode(&osc_packet) .map_err(|_| crate::LooperError::StateSave(std::panic::Location::caller()))?; framed .send(Bytes::from(osc_bytes)) .await .map_err(|_| crate::LooperError::StateSave(std::panic::Location::caller()))?; Ok(()) } fn message_to_osc_packet(message: OscMessage) -> Result { match message { OscMessage::TrackStateChanged { column, row, state } => { let osc_state = Self::track_state_to_osc_string(state); let address = format!("/looper/cell/{}/{}/state", column + 1, row + 1); // 1-based indexing Ok(rosc::OscPacket::Message(rosc::OscMessage { addr: address, args: vec![rosc::OscType::String(osc_state)], })) } OscMessage::TrackVolumeChanged { column, row, volume } => { let address = format!("/looper/cell/{}/{}/volume", column + 1, row + 1); // 1-based indexing Ok(rosc::OscPacket::Message(rosc::OscMessage { addr: address, args: vec![rosc::OscType::Float(volume)], })) } OscMessage::SelectedColumnChanged { column } => { let address = "/looper/selected/column".to_string(); Ok(rosc::OscPacket::Message(rosc::OscMessage { addr: address, args: vec![rosc::OscType::Int((column + 1) as i32)], // 1-based indexing })) } OscMessage::SelectedRowChanged { row } => { let address = "/looper/selected/row".to_string(); Ok(rosc::OscPacket::Message(rosc::OscMessage { addr: address, args: vec![rosc::OscType::Int((row + 1) as i32)], // 1-based indexing })) } } } fn track_state_to_osc_string(state: crate::TrackState) -> String { match state { crate::TrackState::Empty => "empty".to_string(), crate::TrackState::Idle => "ready".to_string(), crate::TrackState::Recording { .. } => "recording".to_string(), crate::TrackState::RecordingAutoStop { .. } => "recording".to_string(), crate::TrackState::Playing => "playing".to_string(), } } }