Skip to content

Commit

Permalink
move const to config
Browse files Browse the repository at this point in the history
  • Loading branch information
juan518munoz committed Oct 2, 2024
1 parent b108adc commit 1286def
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 5 deletions.
4 changes: 4 additions & 0 deletions core/lib/config/src/configs/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub const DEFAULT_POLLING_INTERVAL_MS: u32 = 5000;
pub const DEFAULT_MAX_ROWS_TO_DISPATCH: u32 = 100;
pub const DEFAULT_MAX_RETRIES: u16 = 5;
pub const DEFAULT_USE_DUMMY_INCLUSION_DATA: bool = false;
pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u32 = 100;

#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct DADispatcherConfig {
Expand All @@ -19,6 +20,8 @@ pub struct DADispatcherConfig {
// TODO: run a verification task to check if the L1 contract expects the inclusion proofs to
// avoid the scenario where contracts expect real proofs, and server is using dummy proofs.
pub use_dummy_inclusion_data: Option<bool>,
/// The maximun number of concurrent request to send to the DA server.
pub max_concurrent_requests: Option<u32>,
}

impl DADispatcherConfig {
Expand All @@ -28,6 +31,7 @@ impl DADispatcherConfig {
max_rows_to_dispatch: Some(DEFAULT_MAX_ROWS_TO_DISPATCH),
max_retries: Some(DEFAULT_MAX_RETRIES),
use_dummy_inclusion_data: Some(DEFAULT_USE_DUMMY_INCLUSION_DATA),
max_concurrent_requests: Some(DEFAULT_MAX_CONCURRENT_REQUESTS),
}
}

Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ impl Distribution<configs::da_dispatcher::DADispatcherConfig> for EncodeDist {
max_rows_to_dispatch: self.sample(rng),
max_retries: self.sample(rng),
use_dummy_inclusion_data: self.sample(rng),
max_concurrent_requests: self.sample(rng),
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion core/lib/env_config/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ mod tests {
interval: u32,
rows_limit: u32,
max_retries: u16,
max_concurrent_requests: u32,
) -> DADispatcherConfig {
DADispatcherConfig {
polling_interval_ms: Some(interval),
max_rows_to_dispatch: Some(rows_limit),
max_retries: Some(max_retries),
use_dummy_inclusion_data: Some(true),
max_concurrent_requests: Some(max_concurrent_requests),
}
}

Expand All @@ -38,9 +40,10 @@ mod tests {
DA_DISPATCHER_MAX_ROWS_TO_DISPATCH=60
DA_DISPATCHER_MAX_RETRIES=7
DA_DISPATCHER_USE_DUMMY_INCLUSION_DATA="true"
DA_DISPATCHER_MAX_CONCURRENT_REQUESTS=10
"#;
lock.set_env(config);
let actual = DADispatcherConfig::from_env().unwrap();
assert_eq!(actual, expected_da_layer_config(5000, 60, 7));
assert_eq!(actual, expected_da_layer_config(5000, 60, 7, 10));
}
}
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher {
max_rows_to_dispatch: self.max_rows_to_dispatch,
max_retries: self.max_retries.map(|x| x as u16),
use_dummy_inclusion_data: self.use_dummy_inclusion_data,
max_concurrent_requests: self.max_concurrent_requests,
})
}

Expand All @@ -21,6 +22,7 @@ impl ProtoRepr for proto::DataAvailabilityDispatcher {
max_rows_to_dispatch: this.max_rows_to_dispatch,
max_retries: this.max_retries.map(Into::into),
use_dummy_inclusion_data: this.use_dummy_inclusion_data,
max_concurrent_requests: this.max_concurrent_requests,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ message DataAvailabilityDispatcher {
optional uint32 max_rows_to_dispatch = 2;
optional uint32 max_retries = 3;
optional bool use_dummy_inclusion_data = 4;
optional uint32 max_concurrent_requests = 5;
}
20 changes: 16 additions & 4 deletions core/node/da_dispatcher/src/da_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use chrono::Utc;
use futures::future::join_all;
use rand::Rng;
use tokio::sync::{mpsc, watch::Receiver, Mutex, Notify};
use zksync_config::DADispatcherConfig;
use zksync_config::{configs::da_dispatcher::DEFAULT_MAX_CONCURRENT_REQUESTS, DADispatcherConfig};
use zksync_da_client::{
types::{DAError, InclusionData},
DataAvailabilityClient,
Expand All @@ -29,7 +29,11 @@ impl DataAvailabilityDispatcher {
config: DADispatcherConfig,
client: Box<dyn DataAvailabilityClient>,
) -> Self {
let request_semaphore = Arc::new(tokio::sync::Semaphore::new(100));
let request_semaphore = Arc::new(tokio::sync::Semaphore::new(
config
.max_concurrent_requests
.unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize,
));
Self {
pool,
config,
Expand Down Expand Up @@ -59,7 +63,11 @@ impl DataAvailabilityDispatcher {
}

async fn dispatch_batches(&self, stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
let (tx, mut rx) = mpsc::channel(100);
let (tx, mut rx) = mpsc::channel(
self.config
.max_concurrent_requests
.unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize,
);

let next_expected_batch = Arc::new(Mutex::new(None));

Expand Down Expand Up @@ -210,7 +218,11 @@ impl DataAvailabilityDispatcher {
}

async fn inclusion_poller(&self, stop_receiver: Receiver<bool>) -> anyhow::Result<()> {
let (tx, mut rx) = mpsc::channel(100);
let (tx, mut rx) = mpsc::channel(
self.config
.max_concurrent_requests
.unwrap_or(DEFAULT_MAX_CONCURRENT_REQUESTS) as usize,
);

let stop_receiver_clone = stop_receiver.clone();
let pool_clone = self.pool.clone();
Expand Down

0 comments on commit 1286def

Please sign in to comment.