From ba0b93c8cf7082d6b87c6debe4272f45f551f98d Mon Sep 17 00:00:00 2001 From: Erik Natanael Gustafsson Date: Wed, 7 Jun 2023 21:40:31 +0200 Subject: [PATCH] Sync clocks between graphs to fix scheduling in late started graphs --- src/graph.rs | 67 +++++++++++++++++++++++++++++++++++++++--- src/graph/graph_gen.rs | 6 ++++ src/graph/run_graph.rs | 5 ++-- 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/src/graph.rs b/src/graph.rs index 0f031b1..249bf8e 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -1348,9 +1348,22 @@ impl Graph { .. } = &mut ggc.scheduler { + let clock_update = ClockUpdate { + timestamp: ggc.timestamp.clone(), + clock_sample_rate: self.sample_rate, + }; + let current_time = Superseconds::from_samples( + ggc.timestamp.load(Ordering::SeqCst), + self.sample_rate as u64, + ); let latency = Duration::from_secs_f64(*latency_in_samples / self.sample_rate as f64); - graph.start_scheduler(latency, start_ts.clone(), musical_time_map.clone()); + graph.start_scheduler( + latency, + start_ts.clone(), + Some(clock_update), + musical_time_map.clone(), + ); } } self.graphs_per_node @@ -1720,9 +1733,13 @@ impl Graph { &mut self, latency: Duration, start_ts: Instant, + clock_update: Option, musical_time_map: Arc>, ) { if let Some(ggc) = &mut self.graph_gen_communicator { + if let Some(clock_update) = &clock_update { + ggc.send_clock_update(clock_update.clone()); // Make sure all the clocks in the GraphGens are in sync. + } ggc.scheduler.start( self.sample_rate * self.oversampling.as_usize() as f32, self.block_size * self.oversampling.as_usize(), @@ -1732,7 +1749,12 @@ impl Graph { ); } for (_key, graph) in &mut self.graphs_per_node { - graph.start_scheduler(latency, start_ts, musical_time_map.clone()); + graph.start_scheduler( + latency, + start_ts, + clock_update.clone(), + musical_time_map.clone(), + ); } } /// Returns the current audio thread time in Superbeats based on the @@ -3141,12 +3163,15 @@ impl Graph { let scheduler_buffer_size = self.ring_buffer_size; let (scheduled_change_producer, rb_consumer) = RingBuffer::new(scheduler_buffer_size); - let schedule_receiver = ScheduleReceiver::new(rb_consumer, scheduler_buffer_size); + let (clock_update_producer, clock_update_consumer) = RingBuffer::new(10); + let schedule_receiver = + ScheduleReceiver::new(rb_consumer, clock_update_consumer, scheduler_buffer_size); let graph_gen_communicator = GraphGenCommunicator { free_node_queue_consumer, scheduler, scheduled_change_producer, + clock_update_producer, task_data_to_be_dropped_consumer, new_task_data_producer, next_change_flag: task_data.applied.clone(), @@ -3591,18 +3616,46 @@ impl Scheduler { } } } +#[derive(Clone, Debug)] +struct ClockUpdate { + /// A sample counter in a different graph. This Arc must never deallocate when dropped. + timestamp: Arc, + /// The sample rate of the graph that the timestamp above comes from. Used + /// to convert between timestamps in different sample rates. + clock_sample_rate: f32, +} struct ScheduleReceiver { rb_consumer: rtrb::Consumer, schedule_queue: Vec, + clock_update_consumer: rtrb::Consumer, } impl ScheduleReceiver { - fn new(rb_consumer: rtrb::Consumer, capacity: usize) -> Self { + fn new( + rb_consumer: rtrb::Consumer, + clock_update_consumer: rtrb::Consumer, + capacity: usize, + ) -> Self { Self { rb_consumer, schedule_queue: Vec::with_capacity(capacity), + clock_update_consumer, } } + fn clock_update(&mut self, sample_rate: f32) -> Option { + let mut new_timestamp = None; + while let Ok(clock) = self.clock_update_consumer.pop() { + let samples = clock.timestamp.load(Ordering::SeqCst); + if sample_rate == clock.clock_sample_rate { + new_timestamp = Some(samples); + } else { + new_timestamp = Some( + ((samples as f64 / clock.clock_sample_rate as f64) * sample_rate as f64) as u64, + ); + } + } + new_timestamp + } /// TODO: Return only a slice of changes that should be applied this block and then remove them all at once. fn changes(&mut self) -> &mut Vec { let num_new_changes = self.rb_consumer.slots(); @@ -3649,6 +3702,8 @@ struct GraphGenCommunicator { // `updates_available` every time it finishes a block. It is a u16 so that // when it overflows generation - some_generation_number fits in an i32. scheduler: Scheduler, + /// For sending clock updates to the audio thread + clock_update_producer: rtrb::Producer, /// The ring buffer for sending scheduled changes to the audio thread scheduled_change_producer: rtrb::Producer, timestamp: Arc, @@ -3682,6 +3737,10 @@ impl GraphGenCommunicator { } } + fn send_clock_update(&mut self, clock_update: ClockUpdate) { + self.clock_update_producer.push(clock_update).unwrap(); + } + /// Sends the updated tasks to the GraphGen. NB: Always check if any /// resoruces in the Graph can be freed before running this. /// GraphGenCommunicator will free its own resources. diff --git a/src/graph/graph_gen.rs b/src/graph/graph_gen.rs index 2bbeeb2..4228fa7 100644 --- a/src/graph/graph_gen.rs +++ b/src/graph/graph_gen.rs @@ -541,6 +541,12 @@ impl Gen for GraphGen { // TODO: Support output with a different block size, i.e. local buffering and running this graph more or less often than the parent graph // // + // Check if there is a new clock to update to + if let Some(new_sample_counter) = + self.schedule_receiver.clock_update(self.sample_rate) + { + self.sample_counter = new_sample_counter; + } let mut do_empty_buffer = None; let mut do_mend_connections = None; let num_new_task_data = self.new_task_data_consumer.slots(); diff --git a/src/graph/run_graph.rs b/src/graph/run_graph.rs index 7be3aa0..f68e68e 100644 --- a/src/graph/run_graph.rs +++ b/src/graph/run_graph.rs @@ -10,9 +10,9 @@ use std::{ use rtrb::RingBuffer; -use crate::{scheduling::MusicalTimeMap, Resources}; +use crate::{prelude::Superseconds, scheduling::MusicalTimeMap, Resources}; -use super::{node::Node, Graph, NodeBufferRef, Sample}; +use super::{node::Node, ClockUpdate, Graph, NodeBufferRef, Sample}; /// Wrapper around a [`Graph`] `Node` with convenience methods to run the /// Graph, either from an audio thread or for non-real time purposes. @@ -78,6 +78,7 @@ impl RunGraph { graph.start_scheduler( settings.scheduling_latency, scheduler_start_time_stamp, + None, musical_time_map.clone(), ); // Run a first update to make sure any queued changes get sent to the GraphGen