diff --git a/core/lib/config/src/configs/da_dispatcher.rs b/core/lib/config/src/configs/da_dispatcher.rs index e9ad6bd3c07..c8bf1b3b899 100644 --- a/core/lib/config/src/configs/da_dispatcher.rs +++ b/core/lib/config/src/configs/da_dispatcher.rs @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000; pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100; pub const DEFAULT_MAX_RETRIES: u16 = 5; pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false; +pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100; #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct DADispatcherConfig { @@ -19,6 +20,8 @@ pub struct DADispatcherConfig { // TODO: run a verification task to check if the L1 contract expects the inclusion proofs to // avoid the scenario where contracts expect real proofs, and server is using dummy proofs. pub use_dummy_inclusion_data: Option, + /// The maximun number of concurrent request to send to the DA server. + pub max_concurrent_requests: Option, } impl DADispatcherConfig { @@ -28,6 +31,7 @@ impl DADispatcherConfig { max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH), max_retries: Some(DEFAULT_MAX_RETRIES), use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA), + max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS), } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 4a2858b9cbf..a106acd5a2f 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -948,6 +948,7 @@ impl Distribution for EncodeDist { max_rows_to_dispatch: self.sample(rng), max_retries: self.sample(rng), use_dummy_inclusion_data: self.sample(rng), + max_concurrent_requests: self.sample(rng), } } } diff --git a/core/lib/env_config/src/da_dispatcher.rs b/core/lib/env_config/src/da_dispatcher.rs index 246752db91a..805e6b2234b 100644 --- a/core/lib/env_config/src/da_dispatcher.rs +++ b/core/lib/env_config/src/da_dispatcher.rs @@ -21,12 +21,14 @@ mod tests { interval: u32, rows_limit: u32, max_retries: u16, + max_concurrent_requests: u32, ) -> DADispatcherConfig { DADispatcherConfig { polling_interval_ms: Some(interval), max_rows_to_dispatch: Some(rows_limit), max_retries: Some(max_retries), use_dummy_inclusion_data: Some(true), + max_concurrent_requests: Some(max_concurrent_requests), } } @@ -38,9 +40,10 @@ mod tests { DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60 DA_DISPATCHER_MAX_RETRIES=7 DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true" + DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10 "#; lock.set_env(config); let actual = DADispatcherConfig::from_env().unwrap(); - assert_eq!(actual, expected_da_layer_config(5000, 60, 7)); + assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10)); } } diff --git a/core/lib/protobuf_config/src/da_dispatcher.rs b/core/lib/protobuf_config/src/da_dispatcher.rs index d77073bd32c..e85ff5ae76e 100644 --- a/core/lib/protobuf_config/src/da_dispatcher.rs +++ b/core/lib/protobuf_config/src/da_dispatcher.rs @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: self.max_rows_to_dispatch, max_retries: self.max_retries.map(|x| x as u16), use_dummy_inclusion_data: self.use_dummy_inclusion_data, + max_concurrent_requests: self.max_concurrent_requests, }) } @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: this.max_rows_to_dispatch, max_retries: this.max_retries.map(Into::into), use_dummy_inclusion_data: this.use_dummy_inclusion_data, + max_concurrent_requests: this.max_concurrent_requests, } } } diff --git a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto index dd366bd5b92..d6329d14b28 100644 --- a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto +++ b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher { optional uint32 max_rows_to_dispatch = 2; optional uint32 max_retries = 3; optional bool use_dummy_inclusion_data = 4; + optional uint32 max_concurrent_requests = 5; } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index a476add4a70..4a9ad49e751 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -1,10 +1,11 @@ -use std::{future::Future, time::Duration}; +use std::{collections::HashSet, future::Future, sync::Arc, time::Duration}; use anyhow::Context; use chrono::Utc; +use futures::future::join_all; use rand::Rng; -use tokio::sync::watch::Receiver; -use zksync_config::DADispatcherConfig; +use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; +use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -19,6 +20,7 @@ pub struct DataAvailabilityDispatcher { client: Box, pool: ConnectionPool, config: DADispatcherConfig, + request_semaphore: Arc, } impl DataAvailabilityDispatcher { @@ -27,178 +29,312 @@ impl DataAvailabilityDispatcher { config: DADispatcherConfig, client: Box, ) -> Self { + let request_semaphore = Arc::new(tokio::sync::Semaphore::new( + config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + )); Self { pool, config, client, + request_semaphore, } } - pub async fn run(self, mut stop_receiver: Receiver) -> anyhow::Result<()> { - loop { - if *stop_receiver.borrow() { - break; - } + pub async fn run(self, stop_receiver: Receiver) -> anyhow::Result<()> { + let subtasks = futures::future::join( + async { + if let Err(err) = self.dispatch_batches(stop_receiver.clone()).await { + tracing::error!("dispatch error {err:?}"); + } + }, + async { + if let Err(err) = self.inclusion_poller(stop_receiver.clone()).await { + tracing::error!("poll_for_inclusion error {err:?}"); + } + }, + ); + + tokio::select! { + _ = subtasks => {}, + } + Ok(()) + } + + async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + ); + + let next_expected_batch = Arc::new(Mutex::new(None)); + + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let config_clone = self.config.clone(); + let next_expected_batch_clone = next_expected_batch.clone(); + let pending_blobs_reader = tokio::spawn(async move { + // Used to avoid sending the same batch multiple times + let mut pending_batches = HashSet::new(); + // let pair = cvar_pair_clone.clone(); + loop { + if *stop_receiver_clone.borrow() { + tracing::info!("Stop signal received, da_dispatcher is shutting down"); + break; + } - let subtasks = futures::future::join( - async { - if let Err(err) = self.dispatch().await { - tracing::error!("dispatch error {err:?}"); + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + let batches = conn + .data_availability_dal() + .get_ready_for_da_dispatch_l1_batches( + config_clone.max_rows_to_dispatch() as usize + ) + .await?; + drop(conn); + for batch in batches { + if pending_batches.contains(&batch.l1_batch_number.0) { + continue; } - }, - async { - if let Err(err) = self.poll_for_inclusion().await { - tracing::error!("poll_for_inclusion error {err:?}"); + + // This should only happen once. + // We can't assume that the first batch is always 1 because the dispatcher can be restarted + // and resume from a different batch. + let mut next_expected_batch_lock = next_expected_batch_clone.lock().await; + if next_expected_batch_lock.is_none() { + next_expected_batch_lock.replace(batch.l1_batch_number); } - }, - ); - let subtasks = futures::future::join(subtasks, async { - if let Err(err) = self.update_metrics().await { - tracing::error!("update_metrics error {err:?}"); + pending_batches.insert(batch.l1_batch_number.0); + METRICS.blobs_pending_dispatch.inc_by(1); + tx.send(batch).await?; } - }); - tokio::select! { - _ = subtasks => {}, - _ = stop_receiver.changed() => { + tokio::time::sleep(Duration::from_secs(5)).await; + } + Ok::<(), anyhow::Error>(()) + }); + + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let request_semaphore = self.request_semaphore.clone(); + let notifier = Arc::new(Notify::new()); + let pending_blobs_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + let notifier = notifier.clone(); + loop { + if *stop_receiver.borrow() { break; } - } - if tokio::time::timeout(self.config.polling_interval(), stop_receiver.changed()) - .await - .is_ok() - { - break; - } - } + let batch = match rx.recv().await { + Some(batch) => batch, + None => continue, // Should never happen + }; - tracing::info!("Stop signal received, da_dispatcher is shutting down"); - Ok(()) - } + // Block until we can send the request + let permit = request_semaphore.clone().acquire_owned().await?; - /// Dispatches the blobs to the data availability layer, and saves the blob_id in the database. - async fn dispatch(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) - .await?; - drop(conn); - - for batch in batches { - let dispatch_latency = METRICS.blob_dispatch_latency.start(); - let dispatch_response = retry(self.config.max_retries(), batch.l1_batch_number, || { - self.client - .dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) - }) - .await - .with_context(|| { - format!( - "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", - batch.l1_batch_number, - batch.pubdata.len() - ) - })?; - let dispatch_latency_duration = dispatch_latency.observe(); - - let sent_at = Utc::now().naive_utc(); - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .insert_l1_batch_da( - batch.l1_batch_number, - dispatch_response.blob_id.as_str(), - sent_at, - ) - .await?; - drop(conn); - - METRICS - .last_dispatched_l1_batch - .set(batch.l1_batch_number.0 as usize); - METRICS.blob_size.observe(batch.pubdata.len()); - METRICS.blobs_dispatched.inc_by(1); - tracing::info!( - "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", - batch.l1_batch_number, - batch.pubdata.len(), - ); - } + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let next_expected_batch = next_expected_batch.clone(); + let notifier = notifier.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let dispatch_latency = METRICS.blob_dispatch_latency.start(); + let dispatch_response = + retry(config.max_retries(), batch.l1_batch_number, || { + client.dispatch_blob(batch.l1_batch_number.0, batch.pubdata.clone()) + }) + .await + .with_context(|| { + format!( + "failed to dispatch a blob with batch_number: {}, pubdata_len: {}", + batch.l1_batch_number, + batch.pubdata.len() + ) + })?; + let dispatch_latency_duration = dispatch_latency.observe(); + + let sent_at = Utc::now().naive_utc(); + + // Before saving the blob in the database, we need to be sure that we are doing it + // in the correct order. + while next_expected_batch + .lock() + .await + .map_or(true, |next_expected_batch| { + batch.l1_batch_number > next_expected_batch + }) + { + notifier.clone().notified().await; + } + + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .insert_l1_batch_da( + batch.l1_batch_number, + dispatch_response.blob_id.as_str(), + sent_at, + ) + .await?; + drop(conn); + // Update the next expected batch number + next_expected_batch + .lock() + .await + .replace(batch.l1_batch_number + 1); + notifier.notify_waiters(); + + METRICS + .last_dispatched_l1_batch + .set(batch.l1_batch_number.0 as usize); + METRICS.blob_size.observe(batch.pubdata.len()); + METRICS.blobs_dispatched.inc_by(1); + METRICS.blobs_pending_dispatch.dec_by(1); + tracing::info!( + "Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}", + batch.l1_batch_number, + batch.pubdata.len(), + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_blobs_reader, pending_blobs_sender]).await; + for result in results { + result??; + } Ok(()) } - /// Polls the data availability layer for inclusion data, and saves it in the database. - async fn poll_for_inclusion(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let blob_info = conn - .data_availability_dal() - .get_first_da_blob_awaiting_inclusion() - .await?; - drop(conn); - - let Some(blob_info) = blob_info else { - return Ok(()); - }; - - let inclusion_data = if self.config.use_dummy_inclusion_data() { - self.client - .get_inclusion_data(blob_info.blob_id.as_str()) - .await - .with_context(|| { - format!( - "failed to get inclusion data for blob_id: {}, batch_number: {}", - blob_info.blob_id, blob_info.l1_batch_number - ) - })? - } else { - // if the inclusion verification is disabled, we don't need to wait for the inclusion - // data before committing the batch, so simply return an empty vector - Some(InclusionData { data: vec![] }) - }; - - let Some(inclusion_data) = inclusion_data else { - return Ok(()); - }; - - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - conn.data_availability_dal() - .save_l1_batch_inclusion_data( - L1BatchNumber(blob_info.l1_batch_number.0), - inclusion_data.data.as_slice(), - ) - .await?; - drop(conn); - - let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); - if let Ok(latency) = inclusion_latency.to_std() { - METRICS.inclusion_latency.observe(latency); - } - METRICS - .last_included_l1_batch - .set(blob_info.l1_batch_number.0 as usize); - METRICS.blobs_included.inc_by(1); - - tracing::info!( - "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", - blob_info.l1_batch_number, - inclusion_latency.num_seconds() + async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, ); - Ok(()) - } + let stop_receiver_clone = stop_receiver.clone(); + let pool_clone = self.pool.clone(); + let pending_inclusion_reader = tokio::spawn(async move { + let mut pending_inclusions = HashSet::new(); + loop { + if *stop_receiver_clone.borrow() { + break; + } + + let mut conn = pool_clone.connection_tagged("da_dispatcher").await?; + // TODO: this query might always return the same blob if the blob is not included + // we should probably change the query to return all blobs that are not included + let blob_info = conn + .data_availability_dal() + .get_first_da_blob_awaiting_inclusion() + .await?; + drop(conn); + + let Some(blob_info) = blob_info else { + tokio::time::sleep(Duration::from_secs(5)).await; + continue; + }; + + if pending_inclusions.contains(&blob_info.blob_id) { + continue; + } + pending_inclusions.insert(blob_info.blob_id.clone()); + tx.send(blob_info).await?; + } + Ok::<(), anyhow::Error>(()) + }); + + let pool = self.pool.clone(); + let config = self.config.clone(); + let client = self.client.clone(); + let semaphore = self.request_semaphore.clone(); + let pending_inclusion_sender = tokio::spawn(async move { + let mut spawned_requests = vec![]; + loop { + if *stop_receiver.borrow() { + break; + } + let blob_info = match rx.recv().await { + Some(blob_info) => blob_info, + None => continue, // Should never happen + }; + + // Block until we can send the request + let permit = semaphore.clone().acquire_owned().await?; + + let client = client.clone(); + let pool = pool.clone(); + let config = config.clone(); + let request = tokio::spawn(async move { + let _permit = permit; // move permit into scope + let inclusion_data = if config.use_dummy_inclusion_data() { + client + .get_inclusion_data(blob_info.blob_id.as_str()) + .await + .with_context(|| { + format!( + "failed to get inclusion data for blob_id: {}, batch_number: {}", + blob_info.blob_id, blob_info.l1_batch_number + ) + })? + } else { + // if the inclusion verification is disabled, we don't need to wait for the inclusion + // data before committing the batch, so simply return an empty vector + Some(InclusionData { data: vec![] }) + }; - async fn update_metrics(&self) -> anyhow::Result<()> { - let mut conn = self.pool.connection_tagged("da_dispatcher").await?; - let batches = conn - .data_availability_dal() - .get_ready_for_da_dispatch_l1_batches(self.config.max_rows_to_dispatch() as usize) - .await?; - drop(conn); - METRICS.blobs_pending_dispatch.set(batches.len()); + let Some(inclusion_data) = inclusion_data else { + return Ok(()); + }; + let mut conn = pool.connection_tagged("da_dispatcher").await?; + conn.data_availability_dal() + .save_l1_batch_inclusion_data( + L1BatchNumber(blob_info.l1_batch_number.0), + inclusion_data.data.as_slice(), + ) + .await?; + drop(conn); + + let inclusion_latency = Utc::now().signed_duration_since(blob_info.sent_at); + if let Ok(latency) = inclusion_latency.to_std() { + METRICS.inclusion_latency.observe(latency); + } + METRICS + .last_included_l1_batch + .set(blob_info.l1_batch_number.0 as usize); + METRICS.blobs_included.inc_by(1); + + tracing::info!( + "Received an inclusion data for a batch_number: {}, inclusion_latency_seconds: {}", + blob_info.l1_batch_number, + inclusion_latency.num_seconds() + ); + + Ok::<(), anyhow::Error>(()) + }); + spawned_requests.push(request); + } + join_all(spawned_requests).await; + Ok::<(), anyhow::Error>(()) + }); + + let results = join_all(vec![pending_inclusion_reader, pending_inclusion_sender]).await; + for result in results { + result??; + } Ok(()) } }