Switched to Kanal buffers

This commit is contained in:
Geens 2025-06-05 13:47:52 +02:00
parent 457e634626
commit 292087be3e
6 changed files with 29 additions and 10 deletions

17
Cargo.lock generated
View File

@ -81,6 +81,12 @@ dependencies = [
"powerfmt", "powerfmt",
] ]
[[package]]
name = "futures-core"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.31.1" version = "0.31.1"
@ -120,6 +126,16 @@ dependencies = [
"pkg-config", "pkg-config",
] ]
[[package]]
name = "kanal"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574"
dependencies = [
"futures-core",
"lock_api",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.5.0" version = "1.5.0"
@ -163,6 +179,7 @@ name = "looper"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"jack", "jack",
"kanal",
"log", "log",
"simple_logger", "simple_logger",
"thiserror", "thiserror",

View File

@ -9,3 +9,4 @@ jack = "0.13"
tokio = { version = "1.0", features = ["full"] } tokio = { version = "1.0", features = ["full"] }
log = "0.4.27" log = "0.4.27"
simple_logger = "5.0.0" simple_logger = "5.0.0"
kanal = "0.1.1"

View File

@ -94,7 +94,7 @@ GlobalState
## Ring buffers ## Ring buffers
Communication between real-time and non-real-time threads requires lock-free data structures to avoid blocking the RT thread. Communication between real-time and non-real-time threads requires lock-free data structures to avoid blocking the RT thread.
Tokio bounded channels fulfill this requirement and provide both synchronous and asynchronous interfaces, The Rust Kanal crate fulfills this requirement and provides both synchronous and asynchronous interfaces,
making it ideal for bridging real-time and async-based systems. making it ideal for bridging real-time and async-based systems.
The RT thread uses the synchronous, non-blocking interface for immediate data transfer. The RT thread uses the synchronous, non-blocking interface for immediate data transfer.

View File

@ -1,24 +1,24 @@
use crate::*; use crate::*;
pub struct Allocator { pub struct Allocator {
channel: tokio::sync::mpsc::Receiver<Arc<AudioChunk>>, channel: kanal::Receiver<Arc<AudioChunk>>,
} }
impl Allocator { impl Allocator {
pub fn spawn(buffer_size: usize, pool_size: usize) -> (Self, tokio::task::JoinHandle<()>) { pub fn spawn(buffer_size: usize, pool_size: usize) -> (Self, tokio::task::JoinHandle<()>) {
let (allocated_buffer_sender, allocated_buffer_receiver) = let (allocated_buffer_sender, allocated_buffer_receiver) = kanal::bounded(pool_size);
tokio::sync::mpsc::channel(pool_size);
// Pre-fill the channel with initial buffers // Pre-fill the channel with initial buffers
while allocated_buffer_sender while let Ok(true) = allocated_buffer_sender
.try_send(AudioChunk::allocate(buffer_size)) .try_send(AudioChunk::allocate(buffer_size))
.is_ok()
{} {}
let allocator = Allocator { let allocator = Allocator {
channel: allocated_buffer_receiver, channel: allocated_buffer_receiver,
}; };
let allocated_buffer_sender = allocated_buffer_sender.to_async();
// Spawn the background task that continuously allocates buffers // Spawn the background task that continuously allocates buffers
let join_handle = tokio::runtime::Handle::current().spawn(async move { let join_handle = tokio::runtime::Handle::current().spawn(async move {
loop { loop {
@ -35,9 +35,10 @@ impl Allocator {
impl ChunkFactory for Allocator { impl ChunkFactory for Allocator {
fn create_chunk(&mut self) -> Result<std::sync::Arc<AudioChunk>> { fn create_chunk(&mut self) -> Result<std::sync::Arc<AudioChunk>> {
//match self.channel.try_recv_realtime() {
match self.channel.try_recv() { match self.channel.try_recv() {
Ok(chunk) => Ok(chunk), Ok(Some(chunk)) => Ok(chunk),
Err(_) => Err(LooperError::ChunkAllocation(std::panic::Location::caller())), _ => Err(LooperError::ChunkAllocation(std::panic::Location::caller())),
} }
} }
} }

View File

@ -24,7 +24,7 @@ async fn main() {
let (jack_client, audio_in, audio_out) = setup_jack(); let (jack_client, audio_in, audio_out) = setup_jack();
let (allocator, factory_handle) = Allocator::spawn(jack_client.buffer_size() as usize * 10, 100); let (allocator, factory_handle) = Allocator::spawn(jack_client.sample_rate(), 3);
let process_handler = ProcessHandler::new(audio_in, audio_out, allocator) let process_handler = ProcessHandler::new(audio_in, audio_out, allocator)
.expect("Could not create process handler"); .expect("Could not create process handler");

View File

@ -30,7 +30,7 @@ impl<F: ChunkFactory> jack::ProcessHandler for ProcessHandler<F> {
let output_buffer = self.output_port.as_mut_slice(ps); let output_buffer = self.output_port.as_mut_slice(ps);
let jack_buffer_size = client.buffer_size() as usize; let jack_buffer_size = client.buffer_size() as usize;
let recording_samples = client.sample_rate() * 5; // 5 seconds let recording_samples = (client.sample_rate() as f64 * 0.6) as usize; // 0.6 seconds
let mut index = 0; let mut index = 0;