From 1286def371904ca642b1f78d0ad1556c593f2a5e Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 2 Oct 2024 19:14:32 -0300 Subject: [PATCH] move const to config --- core/lib/config/src/configs/da_dispatcher.rs | 4 ++++ core/lib/config/src/testonly.rs | 1 + core/lib/env_config/src/da_dispatcher.rs | 5 ++++- core/lib/protobuf_config/src/da_dispatcher.rs | 2 ++ .../src/proto/config/da_dispatcher.proto | 1 + core/node/da_dispatcher/src/da_dispatcher.rs | 20 +++++++++++++++---- 6 files changed, 28 insertions(+), 5 deletions(-) diff --git a/core/lib/config/src/configs/da_dispatcher.rs b/core/lib/config/src/configs/da_dispatcher.rs index e9ad6bd3c07..c8bf1b3b899 100644 --- a/core/lib/config/src/configs/da_dispatcher.rs +++ b/core/lib/config/src/configs/da_dispatcher.rs @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000; pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100; pub const DEFAULT_MAX_RETRIES: u16 = 5; pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false; +pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100; #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct DADispatcherConfig { @@ -19,6 +20,8 @@ pub struct DADispatcherConfig { // TODO: run a verification task to check if the L1 contract expects the inclusion proofs to // avoid the scenario where contracts expect real proofs, and server is using dummy proofs. pub use_dummy_inclusion_data: Option, + /// The maximun number of concurrent request to send to the DA server. + pub max_concurrent_requests: Option, } impl DADispatcherConfig { @@ -28,6 +31,7 @@ impl DADispatcherConfig { max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH), max_retries: Some(DEFAULT_MAX_RETRIES), use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA), + max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS), } } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 4a2858b9cbf..a106acd5a2f 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -948,6 +948,7 @@ impl Distribution for EncodeDist { max_rows_to_dispatch: self.sample(rng), max_retries: self.sample(rng), use_dummy_inclusion_data: self.sample(rng), + max_concurrent_requests: self.sample(rng), } } } diff --git a/core/lib/env_config/src/da_dispatcher.rs b/core/lib/env_config/src/da_dispatcher.rs index 246752db91a..805e6b2234b 100644 --- a/core/lib/env_config/src/da_dispatcher.rs +++ b/core/lib/env_config/src/da_dispatcher.rs @@ -21,12 +21,14 @@ mod tests { interval: u32, rows_limit: u32, max_retries: u16, + max_concurrent_requests: u32, ) -> DADispatcherConfig { DADispatcherConfig { polling_interval_ms: Some(interval), max_rows_to_dispatch: Some(rows_limit), max_retries: Some(max_retries), use_dummy_inclusion_data: Some(true), + max_concurrent_requests: Some(max_concurrent_requests), } } @@ -38,9 +40,10 @@ mod tests { DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60 DA_DISPATCHER_MAX_RETRIES=7 DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true" + DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10 "#; lock.set_env(config); let actual = DADispatcherConfig::from_env().unwrap(); - assert_eq!(actual, expected_da_layer_config(5000, 60, 7)); + assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10)); } } diff --git a/core/lib/protobuf_config/src/da_dispatcher.rs b/core/lib/protobuf_config/src/da_dispatcher.rs index d77073bd32c..e85ff5ae76e 100644 --- a/core/lib/protobuf_config/src/da_dispatcher.rs +++ b/core/lib/protobuf_config/src/da_dispatcher.rs @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: self.max_rows_to_dispatch, max_retries: self.max_retries.map(|x| x as u16), use_dummy_inclusion_data: self.use_dummy_inclusion_data, + max_concurrent_requests: self.max_concurrent_requests, }) } @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher { max_rows_to_dispatch: this.max_rows_to_dispatch, max_retries: this.max_retries.map(Into::into), use_dummy_inclusion_data: this.use_dummy_inclusion_data, + max_concurrent_requests: this.max_concurrent_requests, } } } diff --git a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto index dd366bd5b92..d6329d14b28 100644 --- a/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto +++ b/core/lib/protobuf_config/src/proto/config/da_dispatcher.proto @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher { optional uint32 max_rows_to_dispatch = 2; optional uint32 max_retries = 3; optional bool use_dummy_inclusion_data = 4; + optional uint32 max_concurrent_requests = 5; } diff --git a/core/node/da_dispatcher/src/da_dispatcher.rs b/core/node/da_dispatcher/src/da_dispatcher.rs index ccc7a73610b..4a9ad49e751 100644 --- a/core/node/da_dispatcher/src/da_dispatcher.rs +++ b/core/node/da_dispatcher/src/da_dispatcher.rs @@ -5,7 +5,7 @@ use chrono::Utc; use futures::future::join_all; use rand::Rng; use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify}; -use zksync_config::DADispatcherConfig; +use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig}; use zksync_da_client::{ types::{DAError, InclusionData}, DataAvailabilityClient, @@ -29,7 +29,11 @@ impl DataAvailabilityDispatcher { config: DADispatcherConfig, client: Box, ) -> Self { - let request_semaphore = Arc::new(tokio::sync::Semaphore::new(100)); + let request_semaphore = Arc::new(tokio::sync::Semaphore::new( + config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + )); Self { pool, config, @@ -59,7 +63,11 @@ impl DataAvailabilityDispatcher { } async fn dispatch_batches(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel(100); + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + ); let next_expected_batch = Arc::new(Mutex::new(None)); @@ -210,7 +218,11 @@ impl DataAvailabilityDispatcher { } async fn inclusion_poller(&self, stop_receiver: Receiver) -> anyhow::Result<()> { - let (tx, mut rx) = mpsc::channel(100); + let (tx, mut rx) = mpsc::channel( + self.config + .max_concurrent_requests + .unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize, + ); let stop_receiver_clone = stop_receiver.clone(); let pool_clone = self.pool.clone();