113 lines
3.6 KiB
Rust

use crate::*;
use futures::SinkExt;
const CLIENT_BUFFER_SIZE: usize = 32;
pub struct Osc {
receiver: kanal::AsyncReceiver<osc::Message>,
listener: osc_server::platform::PlatformListener,
broadcaster: tokio::sync::broadcast::Sender<osc::Message>,
shadow_state: osc::State,
}
impl Osc {
/// Create new OSC server and controller with configurable matrix size and tempo
pub async fn new(
socket_path: &str,
columns: usize,
rows: usize,
tempo: f32,
) -> Result<(Self, OscController)> {
// Create platform listener (server)
let listener = osc_server::platform::create_listener(socket_path).await?;
// Create communication channels
let (sender, receiver) = kanal::bounded(64);
let receiver = receiver.to_async();
let (broadcaster, _) = tokio::sync::broadcast::channel(CLIENT_BUFFER_SIZE);
let controller = OscController::new(sender);
let server = Self {
receiver,
listener,
broadcaster,
shadow_state: osc::State::new(columns, rows, tempo),
};
Ok((server, controller))
}
/// Main server loop - same interface as before
pub async fn run(&mut self) -> Result<()> {
loop {
tokio::select! {
// Handle messages from OscController
msg = self.receiver.recv() => {
if let Ok(msg) = msg {
self.shadow_state.update(&msg);
// Broadcast to all connected clients (ignore if no clients)
let _ = self.broadcaster.send(msg);
} else {
// Channel closed, exit
break;
}
}
// Accept new client connections
connection = self.listener.accept() => {
if let Ok(stream) = connection {
self.spawn_client_task(stream);
}
}
}
}
Ok(())
}
fn spawn_client_task(&self, stream: osc_server::platform::PlatformStream) {
let mut receiver = self.broadcaster.subscribe();
let state_dump = self.shadow_state.create_state_dump();
tokio::spawn(async move {
let mut framed = tokio_util::codec::Framed::new(
stream,
tokio_util::codec::LengthDelimitedCodec::new(),
);
// Send current state dump immediately
for msg in state_dump {
if let Err(_) = Self::send_osc_message(&mut framed, msg).await {
return; // Client disconnected during state dump
}
}
// Forward broadcast messages until connection dies
while let Ok(msg) = receiver.recv().await {
if let Err(_) = Self::send_osc_message(&mut framed, msg).await {
break; // Client disconnected
}
}
});
}
async fn send_osc_message(
framed: &mut tokio_util::codec::Framed<
osc_server::platform::PlatformStream,
tokio_util::codec::LengthDelimitedCodec,
>,
message: osc::Message,
) -> Result<()> {
let osc_packet = message.to_osc_packet();
let osc_bytes = rosc::encoder::encode(&osc_packet)
.map_err(|_| crate::LooperError::StateSave(std::panic::Location::caller()))?;
framed
.send(bytes::Bytes::from(osc_bytes))
.await
.map_err(|_| crate::LooperError::StateSave(std::panic::Location::caller()))?;
Ok(())
}
}