Skip to content

Commit

Permalink
fix: pipeline polling order
Browse files Browse the repository at this point in the history
  • Loading branch information
cchudant committed Feb 2, 2025
1 parent b0e2b6e commit c0a7b29
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 66 deletions.
2 changes: 1 addition & 1 deletion crates/madara/client/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl MadaraP2p {
.flat_map(|peers_in_bucket| peers_in_bucket.into_iter())
.collect::<std::collections::HashSet<_>>();
tracing::info!("P2P {peers} peers IN: {connections_in} OUT: {connections_out} Pending: {pending_connections}");
tracing::info!("DHT {dht:?}");
tracing::trace!("DHT {dht:?}");
}

// Make progress on the swarm and handle the events it yields
Expand Down
69 changes: 69 additions & 0 deletions crates/madara/client/sync2/src/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use std::{
collections::VecDeque,
time::{Duration, Instant},
};

/// Rolling average implementation.
pub struct ThroughputCounter {
buckets: VecDeque<(Instant, u64)>,
bucket_size: Duration,
window_size: Duration,
current_count: u64,
current_bucket_start: Instant,
}
impl ThroughputCounter {
pub fn new(window_size: Duration) -> Self {
let now = Instant::now();
Self {
buckets: VecDeque::new(),
bucket_size: window_size / 60,
window_size,
current_count: 0,
current_bucket_start: now,
}
}

pub fn increment(&mut self) {
let now = Instant::now();
if now.duration_since(self.current_bucket_start) >= self.bucket_size {
// Clean-up expired buckets.
while let Some((time, _)) = self.buckets.front() {
if now.duration_since(*time) < self.window_size {
break;
}
self.buckets.pop_front();
}

// Make a new bucket.
if self.current_count > 0 {
self.buckets.push_back((self.current_bucket_start, self.current_count));
}
self.current_count = 0;
self.current_bucket_start = now;
}
self.current_count += 1;
}

/// Returns ops/s
pub fn get_throughput(&self) -> f64 {
let now = Instant::now();
let total_ops = self
.buckets
.iter()
.skip_while(|(time, _)| now.duration_since(*time) >= self.window_size)
.map(|(_, count)| count)
.sum::<u64>()
+ self.current_count;
let window_duration = if let Some((oldest_time, _)) = self.buckets.front() {
now.duration_since(*oldest_time).as_secs_f64()
} else {
now.duration_since(self.current_bucket_start).as_secs_f64()
};

if window_duration > 0.0 {
total_ops as f64 / window_duration
} else {
0.0
}
}
}
71 changes: 52 additions & 19 deletions crates/madara/client/sync2/src/gateway/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
apply_state::ApplyStateSync,
counter::ThroughputCounter,
import::BlockImporter,
pipeline::{ApplyOutcome, PipelineController, PipelineSteps},
sync::{ForwardPipeline, Probe, SyncController, SyncControllerConfig},
Expand All @@ -16,7 +17,7 @@ use mp_gateway::state_update::{ProviderStateUpdateWithBlock, ProviderStateUpdate
use mp_receipt::EventWithTransactionHash;
use mp_state_update::StateDiff;
use starknet_core::types::Felt;
use std::{iter, ops::Range, sync::Arc};
use std::{iter, ops::Range, sync::Arc, time::Duration};

mod classes;

Expand Down Expand Up @@ -239,7 +240,7 @@ impl Default for ForwardSyncConfig {
classes_parallelization: 200,
classes_batch_size: 1,
apply_state_parallelization: 3,
apply_state_batch_size: 20,
apply_state_batch_size: 5,
disable_tries: false,
}
}
Expand Down Expand Up @@ -271,6 +272,8 @@ pub struct GatewayForwardSync {
blocks_pipeline: GatewayBlockSync,
classes_pipeline: ClassesSync,
apply_state_pipeline: ApplyStateSync,
counter: ThroughputCounter,
backend: Arc<MadaraBackend>,
}

impl GatewayForwardSync {
Expand Down Expand Up @@ -301,7 +304,13 @@ impl GatewayForwardSync {
config.apply_state_batch_size,
config.disable_tries,
);
Self { blocks_pipeline, classes_pipeline, apply_state_pipeline }
Self {
blocks_pipeline,
classes_pipeline,
apply_state_pipeline,
counter: ThroughputCounter::new(Duration::from_secs(5 * 60)),
backend,
}
}
}

Expand All @@ -314,21 +323,30 @@ impl ForwardPipeline for GatewayForwardSync {
let next_input_block_n = self.blocks_pipeline.next_input_block_n();
self.blocks_pipeline.push(next_input_block_n..next_input_block_n + 1, iter::once(()));
}

let next_full_block = self.backend.head_status().next_full_block();

tokio::select! {
Some(res) = self.blocks_pipeline.next(), if self.classes_pipeline.can_schedule_more() && self.apply_state_pipeline.can_schedule_more() => {
let (range, state_diffs) = res?;
self.classes_pipeline.push(range.clone(), state_diffs.iter().map(|s| s.all_declared_classes()));
self.apply_state_pipeline.push(range, state_diffs);
Some(res) = self.apply_state_pipeline.next() => {
res?;
}
Some(res) = self.classes_pipeline.next() => {
res?;
}
Some(res) = self.apply_state_pipeline.next() => {
res?;
Some(res) = self.blocks_pipeline.next(), if self.classes_pipeline.can_schedule_more() && self.apply_state_pipeline.can_schedule_more() => {
let (range, state_diffs) = res?;
self.classes_pipeline.push(range.clone(), state_diffs.iter().map(|s| s.all_declared_classes()));
self.apply_state_pipeline.push(range, state_diffs);
}
// all pipelines are empty, we're done :)
else => break Ok(())
}

let new_next_full_block = self.backend.head_status().next_full_block();
for _block_n in next_full_block..new_next_full_block {
// Notify of a new full block here.
self.counter.increment();
}
}
}

Expand All @@ -355,7 +373,6 @@ impl ForwardPipeline for GatewayForwardSync {
f: &mut fmt::Formatter<'_>,
name: &str,
pipeline: &PipelineController<S>,
target_height: Option<u64>,
) -> fmt::Result {
let last_applied_block_n = pipeline.last_applied_block_n();
write!(f, "{name}: ")?;
Expand All @@ -366,12 +383,6 @@ impl ForwardPipeline for GatewayForwardSync {
write!(f, "N")?;
}

if let Some(target_height) = target_height {
write!(f, "/{target_height}")?;
} else {
write!(f, "/?")?;
}

write!(f, " [{}", pipeline.queue_len())?;
if pipeline.is_applying() {
write!(f, "+")?;
Expand All @@ -380,15 +391,37 @@ impl ForwardPipeline for GatewayForwardSync {

Ok(())
}
// blocks/s
let throughput_sec = self.counter.get_throughput();
let latest_block = self.backend.head_status().latest_full_block_n();

tracing::info!(
"{}",
DisplayFromFn(move |f| {
show_pipeline(f, "Blocks", &self.blocks_pipeline, target_height)?;
show_pipeline(f, "Blocks", &self.blocks_pipeline)?;
write!(f, " | ")?;
show_pipeline(f, "Classes", &self.classes_pipeline, target_height)?;
show_pipeline(f, "Classes", &self.classes_pipeline)?;
write!(f, " | ")?;
show_pipeline(f, "State", &self.apply_state_pipeline, target_height)?;
show_pipeline(f, "State", &self.apply_state_pipeline)?;
Ok(())
})
);
tracing::info!(
"{}",
DisplayFromFn(move |f| {
write!(f, "🔗 Sync is at ")?;
if let Some(latest_block) = latest_block {
write!(f, "{latest_block}")?;
} else {
write!(f, "-")?;
}
write!(f, "/")?;
if let Some(target_height) = target_height {
write!(f, "{target_height}")?;
} else {
write!(f, "?")?;
}
write!(f, " [{throughput_sec:.2} blocks/s]")?;
Ok(())
})
);
Expand Down
1 change: 1 addition & 0 deletions crates/madara/client/sync2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod apply_state;
mod pipeline;
mod sync;
mod util;
mod counter;

pub use sync::SyncControllerConfig;

Expand Down
Loading

0 comments on commit c0a7b29

Please sign in to comment.