Skip to content

Commit

Permalink
feat(sync): stop sync at block number from config for profiling
Browse files Browse the repository at this point in the history
  • Loading branch information
nagmo-starkware committed Mar 25, 2024
1 parent 57dffa7 commit 5193e8a
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 6 deletions.
12 changes: 11 additions & 1 deletion config/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@
"privacy": "Public",
"value": 10000
},
"p2p_sync.stop_sync_at_block_number": {
"description": "Stops the sync at given block number and closes the node cleanly. Used to run profiling on the node.",
"privacy": "Public",
"value": 1000
},
"p2p_sync.stop_sync_at_block_number.#is_none": {
"description": "Flag for an optional field",
"privacy": "TemporaryValue",
"value": true
},
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"privacy": "Public",
Expand Down Expand Up @@ -304,4 +314,4 @@
"privacy": "Public",
"value": true
}
}
}
8 changes: 7 additions & 1 deletion crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,11 @@ async fn main() -> anyhow::Result<()> {
}

info!("Booting up.");
run_threads(config).await
let res = run_threads(config.clone()).await;
if config.p2p_sync.is_some_and(|c| c.stop_sync_at_block_number.is_some()) {
if let Err(err) = res {
error!("Error: {err}");
};
}
Ok(())
}
19 changes: 16 additions & 3 deletions crates/papyrus_p2p_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;

use futures::channel::mpsc::{SendError, Sender};
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::{DataType, Query, ResponseReceivers};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
Expand All @@ -33,11 +33,12 @@ pub struct P2PSyncConfig {
pub num_block_state_diffs_per_query: usize,
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub wait_period_for_new_data: Duration,
pub stop_sync_at_block_number: Option<BlockNumber>,
}

impl SerializeConfig for P2PSyncConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
let mut config = BTreeMap::from_iter([
ser_param(
"num_headers_per_query",
&self.num_headers_per_query,
Expand All @@ -57,7 +58,16 @@ impl SerializeConfig for P2PSyncConfig {
new query",
ParamPrivacyInput::Public,
),
])
]);
config.extend(ser_optional_param(
&self.stop_sync_at_block_number,
BlockNumber(1000),
"stop_sync_at_block_number",
"Stops the sync at given block number and closes the node cleanly. Used to run \
profiling on the node.",
ParamPrivacyInput::Public,
));
config
}
}

Expand All @@ -69,6 +79,7 @@ impl Default for P2PSyncConfig {
// messages in the network buffers.
num_block_state_diffs_per_query: 100,
wait_period_for_new_data: Duration::from_secs(5),
stop_sync_at_block_number: None,
}
}
}
Expand Down Expand Up @@ -149,6 +160,7 @@ impl P2PSync {
self.storage_reader.clone(),
self.config.wait_period_for_new_data,
self.config.num_headers_per_query,
self.config.stop_sync_at_block_number,
);

let state_diff_stream = StateDiffStreamFactory::create_stream(
Expand All @@ -159,6 +171,7 @@ impl P2PSync {
self.storage_reader,
self.config.wait_period_for_new_data,
self.config.num_block_state_diffs_per_query,
self.config.stop_sync_at_block_number,
);

let mut data_stream = header_stream.merge(state_diff_stream);
Expand Down
3 changes: 2 additions & 1 deletion crates/papyrus_p2p_sync/src/p2p_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ lazy_static! {
static ref TEST_CONFIG: P2PSyncConfig = P2PSyncConfig {
num_headers_per_query: HEADER_QUERY_LENGTH,
num_block_state_diffs_per_query: STATE_DIFF_QUERY_LENGTH,
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA,
stop_sync_at_block_number: None,
};
}

Expand Down
6 changes: 6 additions & 0 deletions crates/papyrus_p2p_sync/src/stream_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) trait DataStreamFactory {
storage_reader: StorageReader,
wait_period_for_new_data: Duration,
num_blocks_per_query: usize,
stop_sync_at_block_number: Option<BlockNumber>,
) -> BoxStream<'static, Result<Box<dyn BlockData>, P2PSyncError>> {
stream! {
let mut current_block_number = Self::get_start_block_number(&storage_reader)?;
Expand Down Expand Up @@ -109,6 +110,11 @@ pub(crate) trait DataStreamFactory {
}
info!("Added {:?} for block {}.", Self::DATA_TYPE, current_block_number);
current_block_number = current_block_number.unchecked_next();
if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| {
current_block_number >= stop_sync_at_block_number
}) {
return;
}
}

// Consume the None message signaling the end of the query.
Expand Down

0 comments on commit 5193e8a

Please sign in to comment.