Skip to content

Commit

Permalink
Sync clocks between graphs to fix scheduling in late started graphs
Browse files Browse the repository at this point in the history
  • Loading branch information
ErikNatanael committed Jun 7, 2023
1 parent 7a594a0 commit ba0b93c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 6 deletions.
67 changes: 63 additions & 4 deletions src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1348,9 +1348,22 @@ impl Graph {
..
} = &mut ggc.scheduler
{
let clock_update = ClockUpdate {
timestamp: ggc.timestamp.clone(),
clock_sample_rate: self.sample_rate,
};
let current_time = Superseconds::from_samples(
ggc.timestamp.load(Ordering::SeqCst),
self.sample_rate as u64,
);
let latency =
Duration::from_secs_f64(*latency_in_samples / self.sample_rate as f64);
graph.start_scheduler(latency, start_ts.clone(), musical_time_map.clone());
graph.start_scheduler(
latency,
start_ts.clone(),
Some(clock_update),
musical_time_map.clone(),
);
}
}
self.graphs_per_node
Expand Down Expand Up @@ -1720,9 +1733,13 @@ impl Graph {
&mut self,
latency: Duration,
start_ts: Instant,
clock_update: Option<ClockUpdate>,
musical_time_map: Arc<RwLock<MusicalTimeMap>>,
) {
if let Some(ggc) = &mut self.graph_gen_communicator {
if let Some(clock_update) = &clock_update {
ggc.send_clock_update(clock_update.clone()); // Make sure all the clocks in the GraphGens are in sync.
}
ggc.scheduler.start(
self.sample_rate * self.oversampling.as_usize() as f32,
self.block_size * self.oversampling.as_usize(),
Expand All @@ -1732,7 +1749,12 @@ impl Graph {
);
}
for (_key, graph) in &mut self.graphs_per_node {
graph.start_scheduler(latency, start_ts, musical_time_map.clone());
graph.start_scheduler(
latency,
start_ts,
clock_update.clone(),
musical_time_map.clone(),
);
}
}
/// Returns the current audio thread time in Superbeats based on the
Expand Down Expand Up @@ -3141,12 +3163,15 @@ impl Graph {

let scheduler_buffer_size = self.ring_buffer_size;
let (scheduled_change_producer, rb_consumer) = RingBuffer::new(scheduler_buffer_size);
let schedule_receiver = ScheduleReceiver::new(rb_consumer, scheduler_buffer_size);
let (clock_update_producer, clock_update_consumer) = RingBuffer::new(10);
let schedule_receiver =
ScheduleReceiver::new(rb_consumer, clock_update_consumer, scheduler_buffer_size);

let graph_gen_communicator = GraphGenCommunicator {
free_node_queue_consumer,
scheduler,
scheduled_change_producer,
clock_update_producer,
task_data_to_be_dropped_consumer,
new_task_data_producer,
next_change_flag: task_data.applied.clone(),
Expand Down Expand Up @@ -3591,18 +3616,46 @@ impl Scheduler {
}
}
}
#[derive(Clone, Debug)]
struct ClockUpdate {
/// A sample counter in a different graph. This Arc must never deallocate when dropped.
timestamp: Arc<AtomicU64>,
/// The sample rate of the graph that the timestamp above comes from. Used
/// to convert between timestamps in different sample rates.
clock_sample_rate: f32,
}

struct ScheduleReceiver {
rb_consumer: rtrb::Consumer<ScheduledChange>,
schedule_queue: Vec<ScheduledChange>,
clock_update_consumer: rtrb::Consumer<ClockUpdate>,
}
impl ScheduleReceiver {
fn new(rb_consumer: rtrb::Consumer<ScheduledChange>, capacity: usize) -> Self {
fn new(
rb_consumer: rtrb::Consumer<ScheduledChange>,
clock_update_consumer: rtrb::Consumer<ClockUpdate>,
capacity: usize,
) -> Self {
Self {
rb_consumer,
schedule_queue: Vec::with_capacity(capacity),
clock_update_consumer,
}
}
fn clock_update(&mut self, sample_rate: f32) -> Option<u64> {
let mut new_timestamp = None;
while let Ok(clock) = self.clock_update_consumer.pop() {
let samples = clock.timestamp.load(Ordering::SeqCst);
if sample_rate == clock.clock_sample_rate {
new_timestamp = Some(samples);
} else {
new_timestamp = Some(
((samples as f64 / clock.clock_sample_rate as f64) * sample_rate as f64) as u64,
);
}
}
new_timestamp
}
/// TODO: Return only a slice of changes that should be applied this block and then remove them all at once.
fn changes(&mut self) -> &mut Vec<ScheduledChange> {
let num_new_changes = self.rb_consumer.slots();
Expand Down Expand Up @@ -3649,6 +3702,8 @@ struct GraphGenCommunicator {
// `updates_available` every time it finishes a block. It is a u16 so that
// when it overflows generation - some_generation_number fits in an i32.
scheduler: Scheduler,
/// For sending clock updates to the audio thread
clock_update_producer: rtrb::Producer<ClockUpdate>,
/// The ring buffer for sending scheduled changes to the audio thread
scheduled_change_producer: rtrb::Producer<ScheduledChange>,
timestamp: Arc<AtomicU64>,
Expand Down Expand Up @@ -3682,6 +3737,10 @@ impl GraphGenCommunicator {
}
}

fn send_clock_update(&mut self, clock_update: ClockUpdate) {
self.clock_update_producer.push(clock_update).unwrap();
}

/// Sends the updated tasks to the GraphGen. NB: Always check if any
/// resoruces in the Graph can be freed before running this.
/// GraphGenCommunicator will free its own resources.
Expand Down
6 changes: 6 additions & 0 deletions src/graph/graph_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,12 @@ impl Gen for GraphGen {
// TODO: Support output with a different block size, i.e. local buffering and running this graph more or less often than the parent graph
//
//
// Check if there is a new clock to update to
if let Some(new_sample_counter) =
self.schedule_receiver.clock_update(self.sample_rate)
{
self.sample_counter = new_sample_counter;
}
let mut do_empty_buffer = None;
let mut do_mend_connections = None;
let num_new_task_data = self.new_task_data_consumer.slots();
Expand Down
5 changes: 3 additions & 2 deletions src/graph/run_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use std::{

use rtrb::RingBuffer;

use crate::{scheduling::MusicalTimeMap, Resources};
use crate::{prelude::Superseconds, scheduling::MusicalTimeMap, Resources};

use super::{node::Node, Graph, NodeBufferRef, Sample};
use super::{node::Node, ClockUpdate, Graph, NodeBufferRef, Sample};

/// Wrapper around a [`Graph`] `Node` with convenience methods to run the
/// Graph, either from an audio thread or for non-real time purposes.
Expand Down Expand Up @@ -78,6 +78,7 @@ impl RunGraph {
graph.start_scheduler(
settings.scheduling_latency,
scheduler_start_time_stamp,
None,
musical_time_map.clone(),
);
// Run a first update to make sure any queued changes get sent to the GraphGen
Expand Down

0 comments on commit ba0b93c

Please sign in to comment.