Skip to content

Commit

Permalink
feat(sync): add state diff sync to p2p sync
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Mar 14, 2024
1 parent a74c8d2 commit 2bf0963
Show file tree
Hide file tree
Showing 10 changed files with 498 additions and 43 deletions.
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ simple_logger = "4.0.0"
starknet_api = "0.10.0"
starknet-core = "0.6.0"
starknet-crypto = "0.5.1"
static_assertions = "1.1.0"
strum = "0.25.0"
strum_macros = "0.25.2"
tempfile = "3.3.0"
Expand All @@ -125,4 +126,4 @@ url = "2.2.2"
validator = "0.12"

[patch.crates-io]
starknet_api = { git = "https://github.com/starkware-libs/starknet-api", rev = "fa2cf6d6da46c43efe141e9ffc044ae9ebbf4541" }
starknet_api = { git = "https://github.com/starkware-libs/starknet-api", rev = "4409d371a1f85b6e0d745cf23250ecfaae1b6f42" }
5 changes: 5 additions & 0 deletions config/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@
"privacy": "TemporaryValue",
"value": true
},
"p2p_sync.num_block_state_diffs_per_query": {
    "description": "The maximum number of block state diffs to ask from peers in each iteration.",
"privacy": "Public",
"value": 1000
},
"p2p_sync.num_headers_per_query": {
    "description": "The maximum number of headers to ask from peers in each iteration.",
"privacy": "Public",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ expression: dumped_default_config
"value": true,
"privacy": "TemporaryValue"
},
"p2p_sync.num_block_state_diffs_per_query": {
    "description": "The maximum number of block state diffs to ask from peers in each iteration.",
"value": {
"$serde_json::private::Number": "1000"
},
"privacy": "Public"
},
"p2p_sync.num_headers_per_query": {
    "description": "The maximum number of headers to ask from peers in each iteration.",
"value": {
Expand Down
5 changes: 5 additions & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,20 @@ license-file.workspace = true
[dependencies]
async-stream.workspace = true
futures.workspace = true
indexmap.workspace = true
papyrus_config = { path = "../papyrus_config", version = "0.3.0" }
papyrus_network = { path = "../papyrus_network", version = "0.3.0" }
papyrus_storage = { path = "../papyrus_storage", version = "0.3.0" }
serde.workspace = true
starknet_api.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tracing.workspace = true

[dev-dependencies]
lazy_static.workspace = true
papyrus_storage = { path = "../papyrus_storage", features = ["testing"] }
static_assertions.workspace = true
rand.workspace = true
test_utils = { path = "../test_utils" }
6 changes: 2 additions & 4 deletions crates/papyrus_p2p_sync/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use papyrus_network::{DataType, SignedBlockHeader};
use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use tracing::debug;

use crate::stream_factory::{BlockData, BlockNumberLimit, DataStreamFactory};
use crate::{P2PSyncError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT};
Expand Down Expand Up @@ -41,7 +40,7 @@ impl DataStreamFactory for HeaderStreamFactory {

const DATA_TYPE: DataType = DataType::SignedBlockHeader;
const BLOCK_NUMBER_LIMIT: BlockNumberLimit = BlockNumberLimit::Unlimited;
const SHOULD_LOG_ADDED_BLOCK: bool = true;
const SHOULD_LOG_ADDED_BLOCK: bool = false;

fn parse_data_for_block<'a>(
signed_headers_receiver: &'a mut Pin<
Expand All @@ -54,10 +53,9 @@ impl DataStreamFactory for HeaderStreamFactory {
let maybe_signed_header_stream_result =
tokio::time::timeout(NETWORK_DATA_TIMEOUT, signed_headers_receiver.next()).await?;
let Some(maybe_signed_header) = maybe_signed_header_stream_result else {
return Err(P2PSyncError::ReceiverChannelTerminated);
return Err(P2PSyncError::ReceiverChannelTerminated { data_type: Self::DATA_TYPE });
};
let Some(signed_block_header) = maybe_signed_header else {
debug!("Header query sent to network finished");
return Ok(None);
};
// TODO(shahak): Check that parent_hash is the same as the previous block's hash
Expand Down
54 changes: 47 additions & 7 deletions crates/papyrus_p2p_sync/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
mod header;
#[cfg(test)]
mod p2p_sync_test;
mod state_diff;
mod stream_factory;

use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::mpsc::{SendError, Sender};
use futures::StreamExt;
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::{Query, ResponseReceivers};
use papyrus_network::{DataType, Query, ResponseReceivers};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use tokio_stream::StreamExt;
use tracing::instrument;

use crate::header::HeaderStreamFactory;
use crate::state_diff::StateDiffStreamFactory;
use crate::stream_factory::DataStreamFactory;

const STEP: usize = 1;
Expand All @@ -28,6 +30,7 @@ const NETWORK_DATA_TIMEOUT: Duration = Duration::from_secs(300);
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct P2PSyncConfig {
pub num_headers_per_query: usize,
pub num_block_state_diffs_per_query: usize,
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub wait_period_for_new_data: Duration,
}
Expand All @@ -41,6 +44,12 @@ impl SerializeConfig for P2PSyncConfig {
"The maximum amount of headers to ask from peers in each iteration.",
ParamPrivacyInput::Public,
),
ser_param(
"num_block_state_diffs_per_query",
&self.num_block_state_diffs_per_query,
"The maximum amount of block's state diffs to ask from peers in each iteration.",
ParamPrivacyInput::Public,
),
ser_param(
"wait_period_for_new_data",
&self.wait_period_for_new_data.as_secs(),
Expand All @@ -56,6 +65,7 @@ impl Default for P2PSyncConfig {
fn default() -> Self {
P2PSyncConfig {
num_headers_per_query: 10000,
num_block_state_diffs_per_query: 1000,
wait_period_for_new_data: Duration::from_secs(5),
}
}
Expand All @@ -75,11 +85,29 @@ pub enum P2PSyncError {
// Right now we support only one signature. In the future we will support many signatures.
WrongSignaturesLength { signatures: Vec<BlockSignature> },
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error(
"Expected state diff of length {expected_length}. The possible options for state diff \
length got are {possible_lengths:?}."
)]
WrongStateDiffLength { expected_length: usize, possible_lengths: Vec<usize> },
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error("Two state diff parts for the same state diff are conflicting.")]
ConflictingStateDiffParts,
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error(
"Received an empty state diff part from the network (this is a potential DDoS vector)."
)]
EmptyStateDiffPart,
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error("Network returned more responses than expected for a query.")]
TooManyResponses,
// TODO(shahak): Replicate this error for each data type.
#[error("The sender end of the response receivers was closed.")]
ReceiverChannelTerminated,
#[error(
"Encountered an old header in the storage at {block_number:?} that's missing the field \
{missing_field}"
)]
OldHeaderInStorage { block_number: BlockNumber, missing_field: &'static str },
#[error("The sender end of the response receivers for {data_type:?} was closed.")]
ReceiverChannelTerminated { data_type: DataType },
#[error(transparent)]
NetworkTimeout(#[from] tokio::time::error::Elapsed),
#[error(transparent)]
Expand Down Expand Up @@ -109,16 +137,28 @@ impl P2PSync {

#[instrument(skip(self), level = "debug", err)]
pub async fn run(mut self) -> Result<(), P2PSyncError> {
let mut data_stream = HeaderStreamFactory::create_stream(
let header_stream = HeaderStreamFactory::create_stream(
self.response_receivers
.signed_headers_receiver
.expect("p2p sync needs a signed headers receiver"),
self.query_sender.clone(),
self.storage_reader.clone(),
self.config.wait_period_for_new_data,
self.config.num_headers_per_query,
);

let state_diff_stream = StateDiffStreamFactory::create_stream(
self.response_receivers
.state_diffs_receiver
.expect("p2p sync needs a state diffs receiver"),
self.query_sender,
self.storage_reader,
self.config.wait_period_for_new_data,
self.config.num_headers_per_query,
self.config.num_block_state_diffs_per_query,
);

let mut data_stream = header_stream.merge(state_diff_stream);

loop {
let data = data_stream.next().await.expect("Sync data stream should never end")?;
data.write_to_storage(&mut self.storage_writer)?;
Expand Down
Loading

0 comments on commit 2bf0963

Please sign in to comment.