Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sync): sleep when sync receives None and remove timeout hack #1800

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@
"privacy": "Public",
"value": 10000
},
"p2p_sync.query_timeout": {
"description": "Time in seconds to wait for query responses until we mark it as failed",
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"privacy": "Public",
"value": 5
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"p2p_sync.query_timeout": {
"description": "Time in seconds to wait for query responses until we mark it as failed",
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"value": {
"$serde_json::private::Number": "5"
},
Expand Down
65 changes: 37 additions & 28 deletions crates/papyrus_p2p_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use tokio::time::timeout;
use tracing::{debug, info, instrument};

const STEP: usize = 1;
const ALLOWED_SIGNATURES_LENGTH: usize = 1;

const NETWORK_DATA_TIMEOUT: Duration = Duration::from_secs(300);

#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct P2PSyncConfig {
pub num_headers_per_query: usize,
// TODO(shahak): Remove timeout and check if query finished when the network reports it.
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub query_timeout: Duration,
pub wait_period_for_new_data: Duration,
}

impl SerializeConfig for P2PSyncConfig {
Expand All @@ -38,9 +38,10 @@ impl SerializeConfig for P2PSyncConfig {
ParamPrivacyInput::Public,
),
ser_param(
"query_timeout",
&self.query_timeout.as_secs(),
"Time in seconds to wait for query responses until we mark it as failed",
"wait_period_for_new_data",
&self.wait_period_for_new_data.as_secs(),
"Time in seconds to wait when a query returned with partial data before sending a \
new query",
ParamPrivacyInput::Public,
),
])
Expand All @@ -49,12 +50,16 @@ impl SerializeConfig for P2PSyncConfig {

impl Default for P2PSyncConfig {
fn default() -> Self {
P2PSyncConfig { num_headers_per_query: 10000, query_timeout: Duration::from_secs(5) }
P2PSyncConfig {
num_headers_per_query: 10000,
wait_period_for_new_data: Duration::from_secs(5),
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum P2PSyncError {
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
// TODO(shahak): Consider removing this error and handling unordered headers without failing.
#[error(
"Blocks returned unordered from the network. Expected header with \
Expand All @@ -63,19 +68,25 @@ pub enum P2PSyncError {
HeadersUnordered { expected_block_number: BlockNumber, actual_block_number: BlockNumber },
#[error("Expected to receive one signature from the network. got {signatures:?} instead.")]
// TODO(shahak): Move this error to network.
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
WrongSignaturesLength { signatures: Vec<BlockSignature> },
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error("Network returned more responses than expected for a query.")]
TooManyResponses,
// TODO(shahak): Replicate this error for each data type.
#[error("The sender end of the response receivers was closed.")]
ReceiverChannelTerminated,
#[error(transparent)]
NetworkTimeout(#[from] tokio::time::error::Elapsed),
#[error(transparent)]
StorageError(#[from] StorageError),
#[error(transparent)]
SendError(#[from] SendError),
}

enum P2PSyncControl {
ContinueDownloading,
PeerFinished,
QueryFinishedPartially,
}

pub struct P2PSync {
Expand All @@ -101,10 +112,15 @@ impl P2PSync {
pub async fn run(mut self) -> Result<(), P2PSyncError> {
let mut current_block_number = self.storage_reader.begin_ro_txn()?.get_header_marker()?;
// TODO: make control more substantial once we have more peers and peer management.
#[allow(unused_variables)]
let mut control = P2PSyncControl::ContinueDownloading;
#[allow(unused_assignments)]
loop {
if matches!(control, P2PSyncControl::QueryFinishedPartially) {
debug!(
"Query returned with partial data. Waiting {:?} before sending another query.",
self.config.wait_period_for_new_data
);
tokio::time::sleep(self.config.wait_period_for_new_data).await;
}
let end_block_number = current_block_number.0
+ u64::try_from(self.config.num_headers_per_query)
.expect("Failed converting usize to u64");
Expand All @@ -129,29 +145,17 @@ impl P2PSync {
end_block_number: u64,
) -> Result<P2PSyncControl, P2PSyncError> {
while current_block_number.0 < end_block_number {
// Adding timeout because the network currently doesn't report when a query
// finished because the peers don't know about these blocks. If not all expected
// responses returned we will retry the query from the last received block.
// TODO(shahak): Once network reports finished queries, remove this timeout and add
// a sleep when a query finished with partial responses.
let Ok(maybe_signed_header_stream_result) = timeout(
self.config.query_timeout,
let maybe_signed_header_stream_result = tokio::time::timeout(
NETWORK_DATA_TIMEOUT,
self.response_receivers.signed_headers_receiver.next(),
)
.await
else {
debug!(
"Other peer returned headers until {:?} when we requested until {:?}",
current_block_number, end_block_number
);
return Ok(P2PSyncControl::ContinueDownloading);
};
.await?;
let Some(maybe_signed_header) = maybe_signed_header_stream_result else {
return Err(P2PSyncError::ReceiverChannelTerminated);
};
let Some(SignedBlockHeader { block_header, signatures }) = maybe_signed_header else {
debug!("Handle empty signed headers -> mark that peer sent Fin");
return Ok(P2PSyncControl::PeerFinished);
debug!("Header query sent to network finished");
return Ok(P2PSyncControl::QueryFinishedPartially);
};
// TODO(shahak): Check that parent_hash is the same as the previous block's hash
// and handle reverts.
Expand All @@ -178,6 +182,11 @@ impl P2PSync {
info!("Added block {}.", current_block_number);
*current_block_number = current_block_number.next();
}
Ok(P2PSyncControl::ContinueDownloading)
// Consume the None message signaling the end of the query.
match self.response_receivers.signed_headers_receiver.next().await {
Some(None) => Ok(P2PSyncControl::ContinueDownloading),
Some(Some(_)) => Err(P2PSyncError::TooManyResponses),
None => Err(P2PSyncError::ReceiverChannelTerminated),
}
}
}
38 changes: 17 additions & 21 deletions crates/papyrus_p2p_sync/src/p2p_sync_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ use super::{P2PSync, P2PSyncConfig};
const BUFFER_SIZE: usize = 1000;
const QUERY_LENGTH: usize = 5;
const DURATION_BEFORE_CHECKING_STORAGE: Duration = Duration::from_millis(10);
const QUERY_TIMEOUT: Duration = Duration::from_millis(50);
const TIMEOUT_AFTER_QUERY_TIMEOUTED_IN_SYNC: Duration = QUERY_TIMEOUT.saturating_mul(5);
const WAIT_PERIOD_FOR_NEW_DATA: Duration = Duration::from_millis(50);
const TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE: Duration =
WAIT_PERIOD_FOR_NEW_DATA.saturating_mul(5);

lazy_static! {
static ref TEST_CONFIG: P2PSyncConfig =
P2PSyncConfig { num_headers_per_query: QUERY_LENGTH, query_timeout: QUERY_TIMEOUT };
static ref TEST_CONFIG: P2PSyncConfig = P2PSyncConfig {
num_headers_per_query: QUERY_LENGTH,
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA
};
}

fn setup() -> (P2PSync, StorageReader, Receiver<Query>, Sender<Option<SignedBlockHeader>>) {
Expand Down Expand Up @@ -83,13 +86,13 @@ async fn signed_headers_basic_flow() {
}
);

// Send responses
for (i, (block_hash, block_signature)) in block_hashes_and_signatures
.iter()
.enumerate()
.take(end_block_number)
.skip(start_block_number)
{
// Send responses
signed_headers_sender
.send(Some(SignedBlockHeader {
block_header: BlockHeader {
Expand All @@ -101,31 +104,23 @@ async fn signed_headers_basic_flow() {
}))
.await
.unwrap();
}

tokio::time::sleep(DURATION_BEFORE_CHECKING_STORAGE).await;

// Check responses were written to the storage.
let txn = storage_reader.begin_ro_txn().unwrap();
assert_eq!(
u64::try_from(end_block_number).unwrap(),
txn.get_header_marker().unwrap().0
);
tokio::time::sleep(DURATION_BEFORE_CHECKING_STORAGE).await;

for (i, (block_hash, block_signature)) in block_hashes_and_signatures
.iter()
.enumerate()
.take(end_block_number)
.skip(start_block_number)
{
// Check that the response was written to the storage. Checking after every send verifies
// that the sync writes each response to the storage as it receives it, rather than only
// after all of the query's responses were sent.
let block_number = BlockNumber(i.try_into().unwrap());
let txn = storage_reader.begin_ro_txn().unwrap();
assert_eq!(block_number.next(), txn.get_header_marker().unwrap());
let block_header = txn.get_block_header(block_number).unwrap().unwrap();
assert_eq!(block_number, block_header.block_number);
assert_eq!(*block_hash, block_header.block_hash);
let actual_block_signature =
txn.get_block_signature(block_number).unwrap().unwrap();
assert_eq!(*block_signature, actual_block_signature);
}
signed_headers_sender.send(None).await.unwrap();
}
};

Expand Down Expand Up @@ -163,9 +158,10 @@ async fn sync_sends_new_query_if_it_got_partial_responses() {
.await
.unwrap();
}
signed_headers_sender.send(None).await.unwrap();

// First unwrap is for the timeout. Second unwrap is for the Option returned from Stream.
let query = timeout(TIMEOUT_AFTER_QUERY_TIMEOUTED_IN_SYNC, query_receiver.next())
let query = timeout(TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, query_receiver.next())
.await
.unwrap()
.unwrap();
Expand Down
Loading