refactor(sync): make changes to allow easily adding more types to sync
ShahakShama committed Mar 12, 2024
1 parent bade592 commit 8507d96
Showing 5 changed files with 201 additions and 86 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/papyrus_p2p_sync/Cargo.toml
@@ -6,6 +6,7 @@ repository.workspace = true
license-file.workspace = true

[dependencies]
async-stream.workspace = true
futures.workspace = true
papyrus_config = { path = "../papyrus_config", version = "0.3.0" }
papyrus_network = { path = "../papyrus_network", version = "0.3.0" }
81 changes: 81 additions & 0 deletions crates/papyrus_p2p_sync/src/header.rs
@@ -0,0 +1,81 @@
use std::pin::Pin;

use futures::future::BoxFuture;
use futures::{FutureExt, Stream, StreamExt};
use papyrus_network::{DataType, SignedBlockHeader};
use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use tracing::debug;

use crate::stream_factory::{BlockData, BlockNumberLimit, DataStreamFactory};
use crate::{P2PSyncError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT};

impl BlockData for SignedBlockHeader {
    fn write_to_storage(&self, storage_writer: &mut StorageWriter) -> Result<(), StorageError> {
        storage_writer
            .begin_rw_txn()?
            .append_header(self.block_header.block_number, &self.block_header)?
            .append_block_signature(
                self.block_header.block_number,
                self.signatures
                    // In the future we will support multiple signatures.
                    .first()
                    // The verification that the size of the vector is 1 is done in the data
                    // verification.
                    .expect("Vec::first should return a value on a vector of size 1"),
            )?
            .commit()
    }
}

pub(crate) struct HeaderStreamFactory;

impl DataStreamFactory for HeaderStreamFactory {
    type InputFromNetwork = SignedBlockHeader;
    type Output = SignedBlockHeader;

    const DATA_TYPE: DataType = DataType::SignedBlockHeader;
    const BLOCK_NUMBER_LIMIT: BlockNumberLimit = BlockNumberLimit::Unlimited;
    const SHOULD_LOG_ADDED_BLOCK: bool = true;

    fn parse_data_for_block<'a>(
        signed_headers_receiver: &'a mut Pin<
            Box<dyn Stream<Item = Option<Self::InputFromNetwork>> + Send>,
        >,
        block_number: BlockNumber,
        _storage_reader: &'a StorageReader,
    ) -> BoxFuture<'a, Result<Option<Self::Output>, P2PSyncError>> {
        async move {
            let maybe_signed_header_stream_result =
                tokio::time::timeout(NETWORK_DATA_TIMEOUT, signed_headers_receiver.next()).await?;
            let Some(maybe_signed_header) = maybe_signed_header_stream_result else {
                return Err(P2PSyncError::ReceiverChannelTerminated);
            };
            let Some(signed_block_header) = maybe_signed_header else {
                debug!("Header query sent to network finished");
                return Ok(None);
            };
            // TODO(shahak): Check that parent_hash is the same as the previous block's hash
            // and handle reverts.
            if block_number != signed_block_header.block_header.block_number {
                return Err(P2PSyncError::HeadersUnordered {
                    expected_block_number: block_number,
                    actual_block_number: signed_block_header.block_header.block_number,
                });
            }
            if signed_block_header.signatures.len() != ALLOWED_SIGNATURES_LENGTH {
                return Err(P2PSyncError::WrongSignaturesLength {
                    signatures: signed_block_header.signatures,
                });
            }
            Ok(Some(signed_block_header))
        }
        .boxed()
    }

    fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
        storage_reader.begin_ro_txn()?.get_header_marker()
    }
}
105 changes: 19 additions & 86 deletions crates/papyrus_p2p_sync/src/lib.rs
@@ -1,20 +1,24 @@
mod header;
#[cfg(test)]
mod p2p_sync_test;
mod stream_factory;

use std::collections::BTreeMap;
use std::time::Duration;

use futures::channel::mpsc::{SendError, Sender};
use futures::{SinkExt, StreamExt};
use futures::StreamExt;
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::{DataType, Direction, Query, ResponseReceivers, SignedBlockHeader};
use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_network::{Query, ResponseReceivers};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use tracing::{debug, info, instrument};
use tracing::instrument;

use crate::header::HeaderStreamFactory;
use crate::stream_factory::DataStreamFactory;

const STEP: usize = 1;
const ALLOWED_SIGNATURES_LENGTH: usize = 1;
@@ -67,8 +71,8 @@ pub enum P2PSyncError {
    )]
    HeadersUnordered { expected_block_number: BlockNumber, actual_block_number: BlockNumber },
    #[error("Expected to receive one signature from the network. Got {signatures:?} instead.")]
    // TODO(shahak): Move this error to network.
    // TODO(shahak): Remove this and report to network on invalid data once that's possible.
    // Right now we support only one signature. In the future we will support many signatures.
    WrongSignaturesLength { signatures: Vec<BlockSignature> },
    // TODO(shahak): Remove this and report to network on invalid data once that's possible.
    #[error("Network returned more responses than expected for a query.")]
@@ -84,11 +88,6 @@ pub enum P2PSyncError {
    SendError(#[from] SendError),
}

enum P2PSyncControl {
    ContinueDownloading,
    QueryFinishedPartially,
}

pub struct P2PSync {
    config: P2PSyncConfig,
    storage_reader: StorageReader,
@@ -110,83 +109,17 @@ impl P2PSync {

    #[instrument(skip(self), level = "debug", err)]
    pub async fn run(mut self) -> Result<(), P2PSyncError> {
        let mut current_block_number = self.storage_reader.begin_ro_txn()?.get_header_marker()?;
        // TODO: make control more substantial once we have more peers and peer management.
        let mut control = P2PSyncControl::ContinueDownloading;
        loop {
            if matches!(control, P2PSyncControl::QueryFinishedPartially) {
                debug!(
                    "Query returned with partial data. Waiting {:?} before sending another query.",
                    self.config.wait_period_for_new_data
                );
                tokio::time::sleep(self.config.wait_period_for_new_data).await;
            }
            let end_block_number = current_block_number.0
                + u64::try_from(self.config.num_headers_per_query)
                    .expect("Failed converting usize to u64");
            debug!("Downloading blocks [{}, {})", current_block_number.0, end_block_number);
            self.query_sender
                .send(Query {
                    start_block: current_block_number,
                    direction: Direction::Forward,
                    limit: self.config.num_headers_per_query,
                    step: STEP,
                    data_type: DataType::SignedBlockHeader,
                })
                .await?;
            control = self.parse_headers(&mut current_block_number, end_block_number).await?;
        }
    }
        let mut data_stream = HeaderStreamFactory::create_stream(
            self.response_receivers.signed_headers_receiver,
            self.query_sender,
            self.storage_reader,
            self.config.wait_period_for_new_data,
            self.config.num_headers_per_query,
        );

    #[instrument(skip(self), level = "debug", err)]
    async fn parse_headers(
        &mut self,
        current_block_number: &mut BlockNumber,
        end_block_number: u64,
    ) -> Result<P2PSyncControl, P2PSyncError> {
        while current_block_number.0 < end_block_number {
            let maybe_signed_header_stream_result = tokio::time::timeout(
                NETWORK_DATA_TIMEOUT,
                self.response_receivers.signed_headers_receiver.next(),
            )
            .await?;
            let Some(maybe_signed_header) = maybe_signed_header_stream_result else {
                return Err(P2PSyncError::ReceiverChannelTerminated);
            };
            let Some(SignedBlockHeader { block_header, signatures }) = maybe_signed_header else {
                debug!("Header query sent to network finished");
                return Ok(P2PSyncControl::QueryFinishedPartially);
            };
            // TODO(shahak): Check that parent_hash is the same as the previous block's hash
            // and handle reverts.
            if *current_block_number != block_header.block_number {
                return Err(P2PSyncError::HeadersUnordered {
                    expected_block_number: *current_block_number,
                    actual_block_number: block_header.block_number,
                });
            }
            if signatures.len() != ALLOWED_SIGNATURES_LENGTH {
                return Err(P2PSyncError::WrongSignaturesLength { signatures });
            }
            self.storage_writer
                .begin_rw_txn()?
                .append_header(*current_block_number, &block_header)?
                .append_block_signature(
                    *current_block_number,
                    signatures.first().expect(
                        "Calling first on a vector of size {ALLOWED_SIGNATURES_LENGTH} returned \
                         None",
                    ),
                )?
                .commit()?;
            info!("Added block {}.", current_block_number);
            *current_block_number = current_block_number.next();
        }
        // Consume the None message signaling the end of the query.
        match self.response_receivers.signed_headers_receiver.next().await {
            Some(None) => Ok(P2PSyncControl::ContinueDownloading),
            Some(Some(_)) => Err(P2PSyncError::TooManyResponses),
            None => Err(P2PSyncError::ReceiverChannelTerminated),
        loop {
            let data = data_stream.next().await.expect("Sync data stream should never end")?;
            data.write_to_storage(&mut self.storage_writer)?;
        }
    }
}
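
Note: the run loop is now agnostic to the concrete data type; it sees only a stream of Box<dyn BlockData> and writes each item to storage. As a hedged sketch of where this is headed (not part of this commit): a second factory's stream could be merged into the same loop. StateDiffStreamFactory and the state_diffs_receiver field are hypothetical (a sketch of such a factory follows the stream_factory.rs diff below), and this assumes the query sender and storage reader are cheaply cloneable.

// Hypothetical sketch: driving two data types with one write loop.
let header_stream = HeaderStreamFactory::create_stream(
    self.response_receivers.signed_headers_receiver,
    self.query_sender.clone(),
    self.storage_reader.clone(),
    self.config.wait_period_for_new_data,
    self.config.num_headers_per_query,
);
let state_diff_stream = StateDiffStreamFactory::create_stream(
    self.response_receivers.state_diffs_receiver, // hypothetical receiver field
    self.query_sender,
    self.storage_reader,
    self.config.wait_period_for_new_data,
    self.config.num_headers_per_query,
);
// futures::stream::select interleaves items from both streams as they become
// ready; every item is a Box<dyn BlockData>, so the write loop stays unchanged.
let mut data_stream = futures::stream::select(header_stream, state_diff_stream);
loop {
    let data = data_stream.next().await.expect("Sync data stream should never end")?;
    data.write_to_storage(&mut self.storage_writer)?;
}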
99 changes: 99 additions & 0 deletions crates/papyrus_p2p_sync/src/stream_factory.rs
@@ -0,0 +1,99 @@
use std::pin::Pin;
use std::time::Duration;

use async_stream::stream;
use futures::channel::mpsc::Sender;
use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt};
use papyrus_network::{DataType, Direction, Query};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use tracing::{debug, info};

use crate::{P2PSyncError, STEP};

pub(crate) trait BlockData: Send {
    fn write_to_storage(&self, storage_writer: &mut StorageWriter) -> Result<(), StorageError>;
}

pub(crate) enum BlockNumberLimit {
    Unlimited,
    // TODO(shahak): Add variant for header marker once we support state diff sync.
    // TODO(shahak): Add variant for state diff marker once we support classes sync.
}

pub(crate) trait DataStreamFactory {
    type InputFromNetwork: Send + 'static;
    type Output: BlockData + 'static;

    const DATA_TYPE: DataType;
    const BLOCK_NUMBER_LIMIT: BlockNumberLimit;
    const SHOULD_LOG_ADDED_BLOCK: bool;

    // Async functions in traits don't work well with argument references.
    fn parse_data_for_block<'a>(
        data_receiver: &'a mut Pin<Box<dyn Stream<Item = Option<Self::InputFromNetwork>> + Send>>,
        block_number: BlockNumber,
        storage_reader: &'a StorageReader,
    ) -> BoxFuture<'a, Result<Option<Self::Output>, P2PSyncError>>;

    fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError>;

    fn create_stream(
        mut data_receiver: Pin<Box<dyn Stream<Item = Option<Self::InputFromNetwork>> + Send>>,
        mut query_sender: Sender<Query>,
        storage_reader: StorageReader,
        wait_period_for_new_data: Duration,
        num_blocks_per_query: usize,
    ) -> BoxStream<'static, Result<Box<dyn BlockData>, P2PSyncError>> {
        stream! {
            let mut current_block_number = Self::get_start_block_number(&storage_reader)?;
            'send_query_and_parse_responses: loop {
                let end_block_number = current_block_number.0
                    + u64::try_from(num_blocks_per_query)
                        .expect("Failed converting usize to u64");
                debug!(
                    "Downloading {:?} for blocks [{}, {})",
                    Self::DATA_TYPE, current_block_number.0, end_block_number
                );
                query_sender
                    .send(Query {
                        start_block: current_block_number,
                        direction: Direction::Forward,
                        limit: num_blocks_per_query,
                        step: STEP,
                        data_type: Self::DATA_TYPE,
                    })
                    .await?;

                while current_block_number.0 < end_block_number {
                    match Self::parse_data_for_block(
                        &mut data_receiver, current_block_number, &storage_reader
                    ).await? {
                        Some(output) => yield Ok(Box::<dyn BlockData>::from(Box::new(output))),
                        None => {
                            debug!(
                                "Query for {:?} returned with partial data. Waiting {:?} before \
                                sending another query.",
                                Self::DATA_TYPE,
                                wait_period_for_new_data
                            );
                            tokio::time::sleep(wait_period_for_new_data).await;
                            continue 'send_query_and_parse_responses;
                        }
                    }
                    if Self::SHOULD_LOG_ADDED_BLOCK {
                        info!("Added block {}.", current_block_number);
                    }
                    current_block_number = current_block_number.next();
                }

                // Consume the None message signaling the end of the query.
                match data_receiver.next().await {
                    Some(None) => {},
                    Some(Some(_)) => Err(P2PSyncError::TooManyResponses)?,
                    None => Err(P2PSyncError::ReceiverChannelTerminated)?,
                }
            }
        }
        .boxed()
    }
}
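
Note: with the traits above, syncing an additional data type should come down to two impls. The following is a rough sketch of that shape for a hypothetical state-diff sync; StateDiff, DataType::StateDiff, append_state_diff, and the marker getter are illustrative placeholders, not APIs introduced by this commit.

// Hypothetical sketch only (imports as in header.rs); all state-diff names
// below are placeholders for a type that does not exist in this commit.
impl BlockData for StateDiff {
    fn write_to_storage(&self, storage_writer: &mut StorageWriter) -> Result<(), StorageError> {
        storage_writer
            .begin_rw_txn()?
            .append_state_diff(self.block_number, &self.state_diff)? // placeholder API
            .commit()
    }
}

pub(crate) struct StateDiffStreamFactory;

impl DataStreamFactory for StateDiffStreamFactory {
    type InputFromNetwork = StateDiff;
    type Output = StateDiff;

    const DATA_TYPE: DataType = DataType::StateDiff; // placeholder variant
    // A real implementation would likely cap this at the header marker
    // (see the TODOs on BlockNumberLimit above).
    const BLOCK_NUMBER_LIMIT: BlockNumberLimit = BlockNumberLimit::Unlimited;
    const SHOULD_LOG_ADDED_BLOCK: bool = false;

    fn parse_data_for_block<'a>(
        data_receiver: &'a mut Pin<Box<dyn Stream<Item = Option<StateDiff>> + Send>>,
        block_number: BlockNumber,
        _storage_reader: &'a StorageReader,
    ) -> BoxFuture<'a, Result<Option<StateDiff>, P2PSyncError>> {
        async move {
            // Same timeout-then-validate shape as HeaderStreamFactory.
            let maybe_state_diff_result =
                tokio::time::timeout(NETWORK_DATA_TIMEOUT, data_receiver.next()).await?;
            let Some(maybe_state_diff) = maybe_state_diff_result else {
                return Err(P2PSyncError::ReceiverChannelTerminated);
            };
            let Some(state_diff) = maybe_state_diff else {
                debug!("State diff query sent to network finished");
                return Ok(None);
            };
            // Ordering and validity checks against `block_number` would go here.
            let _ = block_number;
            Ok(Some(state_diff))
        }
        .boxed()
    }

    fn get_start_block_number(storage_reader: &StorageReader) -> Result<BlockNumber, StorageError> {
        storage_reader.begin_ro_txn()?.get_state_marker() // placeholder marker getter
    }
}

Everything else — sending queries, pacing on partial data, and consuming the end-of-query None — would be inherited from the generic create_stream.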
