Skip to content

Commit

Permalink
use Notify for a more deterministic approach
Browse files Browse the repository at this point in the history
  • Loading branch information
juan518munoz committed Sep 27, 2024
1 parent cc954c4 commit 3150e0d
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions core/node/da_dispatcher/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use anyhow::Context;
use chrono::Utc;
use futures::future::join_all;
use rand::Rng;
use tokio::sync::{mpsc, watch::Receiver};
use tokio::sync::{mpsc, watch::Receiver, Notify};
use zksync_config::DADispatcherConfig;
use zksync_da_client::{
types::{DAError, InclusionData},
Expand Down Expand Up @@ -122,8 +122,10 @@ impl DataAvailabilityDispatcher {
let config = self.config.clone();
let client = self.client.clone();
let request_semaphore = self.request_semaphore.clone();
let notifier = Arc::new(Notify::new());
let pending_blobs_sender = tokio::spawn(async move {
let mut spawned_requests = vec![];
let notifier = notifier.clone();
loop {
if *stop_receiver.borrow() {
break;
Expand All @@ -141,6 +143,7 @@ impl DataAvailabilityDispatcher {
let pool = pool.clone();
let config = config.clone();
let next_expected_batch = next_expected_batch.clone();
let notifier = notifier.clone();
let request = tokio::spawn(async move {
let _permit = permit; // move permit into scope
let dispatch_latency = METRICS.blob_dispatch_latency.start();
Expand All @@ -165,13 +168,7 @@ impl DataAvailabilityDispatcher {
while batch.l1_batch_number.0 as i64
> next_expected_batch.load(std::sync::atomic::Ordering::Relaxed)
{
tracing::info!(
"batch_number: {} finished DA dispatch, but the current expected batch is: {}, waiting for the correct order",
batch.l1_batch_number, next_expected_batch.load(std::sync::atomic::Ordering::Relaxed));
// Wait a base time of 5 seconds plus an additional 1 second per batch number difference
let waiting_time = 5 + (batch.l1_batch_number.0 as i64)
- next_expected_batch.load(std::sync::atomic::Ordering::Relaxed);
tokio::time::sleep(Duration::from_secs(waiting_time as u64)).await;
notifier.clone().notified().await;
}

let mut conn = pool.connection_tagged("da_dispatcher").await?;
Expand All @@ -186,6 +183,7 @@ impl DataAvailabilityDispatcher {

// Update the next expected batch number
next_expected_batch.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
notifier.notify_waiters();

METRICS
.last_dispatched_l1_batch
Expand Down

0 comments on commit 3150e0d

Please sign in to comment.