From 436be8fe8af4d544ad13341b5e67030ee2deedd4 Mon Sep 17 00:00:00 2001 From: Shahak Shama Date: Mon, 11 Mar 2024 09:31:10 +0200 Subject: [PATCH] fix(sync): sleep when sync receives None and remove timeout hack --- config/default_config.json | 4 +- ...fig__config_test__dump_default_config.snap | 4 +- crates/papyrus_p2p_sync/src/lib.rs | 59 +++++++++---------- crates/papyrus_p2p_sync/src/p2p_sync_test.rs | 38 ++++++------ 4 files changed, 50 insertions(+), 55 deletions(-) diff --git a/config/default_config.json b/config/default_config.json index e973d863c7..7e478e1f59 100644 --- a/config/default_config.json +++ b/config/default_config.json @@ -154,8 +154,8 @@ "privacy": "Public", "value": 10000 }, - "p2p_sync.query_timeout": { - "description": "Time in seconds to wait for query responses until we mark it as failed", + "p2p_sync.wait_period_for_new_data": { + "description": "Time in seconds to wait when a query returned with partial data before sending a new query", "privacy": "Public", "value": 5 }, diff --git a/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap b/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap index cdcd99dea7..7c5258c816 100644 --- a/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap +++ b/crates/papyrus_node/src/config/snapshots/papyrus_node__config__config_test__dump_default_config.snap @@ -178,8 +178,8 @@ expression: dumped_default_config }, "privacy": "Public" }, - "p2p_sync.query_timeout": { - "description": "Time in seconds to wait for query responses until we mark it as failed", + "p2p_sync.wait_period_for_new_data": { + "description": "Time in seconds to wait when a query returned with partial data before sending a new query", "value": { "$serde_json::private::Number": "5" }, diff --git a/crates/papyrus_p2p_sync/src/lib.rs b/crates/papyrus_p2p_sync/src/lib.rs index 820c038765..b8515be8fd 
100644 --- a/crates/papyrus_p2p_sync/src/lib.rs +++ b/crates/papyrus_p2p_sync/src/lib.rs @@ -14,7 +14,6 @@ use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter}; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use serde::{Deserialize, Serialize}; use starknet_api::block::{BlockNumber, BlockSignature}; -use tokio::time::timeout; use tracing::{debug, info, instrument}; const STEP: usize = 1; @@ -23,9 +22,8 @@ const ALLOWED_SIGNATURES_LENGTH: usize = 1; #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] pub struct P2PSyncConfig { pub num_headers_per_query: usize, - // TODO(shahak): Remove timeout and check if query finished when the network reports it. #[serde(deserialize_with = "deserialize_seconds_to_duration")] - pub query_timeout: Duration, + pub wait_period_for_new_data: Duration, } impl SerializeConfig for P2PSyncConfig { @@ -38,9 +36,10 @@ impl SerializeConfig for P2PSyncConfig { ParamPrivacyInput::Public, ), ser_param( - "query_timeout", - &self.query_timeout.as_secs(), - "Time in seconds to wait for query responses until we mark it as failed", + "wait_period_for_new_data", + &self.wait_period_for_new_data.as_secs(), + "Time in seconds to wait when a query returned with partial data before sending a \ + new query", ParamPrivacyInput::Public, ), ]) @@ -49,7 +48,10 @@ impl SerializeConfig for P2PSyncConfig { impl Default for P2PSyncConfig { fn default() -> Self { - P2PSyncConfig { num_headers_per_query: 10000, query_timeout: Duration::from_secs(5) } + P2PSyncConfig { + num_headers_per_query: 10000, + wait_period_for_new_data: Duration::from_secs(5), + } } } @@ -64,6 +66,8 @@ pub enum P2PSyncError { #[error("Expected to receive one signature from the network. got {signatures:?} instead.")] // TODO(shahak): Move this error to network. 
WrongSignaturesLength { signatures: Vec<BlockSignature> }, + #[error("Network returned more responses than expected for a query.")] + TooManyResponses, // TODO(shahak): Replicate this error for each data type. #[error("The sender end of the response receivers was closed.")] ReceiverChannelTerminated, @@ -75,7 +79,7 @@ pub enum P2PSyncError { enum P2PSyncControl { ContinueDownloading, - PeerFinished, + QueryFinishedPartially, } pub struct P2PSync { @@ -101,10 +105,15 @@ impl P2PSync { pub async fn run(mut self) -> Result<(), P2PSyncError> { let mut current_block_number = self.storage_reader.begin_ro_txn()?.get_header_marker()?; // TODO: make control more substantial once we have more peers and peer management. - #[allow(unused_variables)] let mut control = P2PSyncControl::ContinueDownloading; - #[allow(unused_assignments)] loop { + if matches!(control, P2PSyncControl::QueryFinishedPartially) { + debug!( + "Query returned with partial data. Waiting {:?} before sending another query.", + self.config.wait_period_for_new_data + ); + tokio::time::sleep(self.config.wait_period_for_new_data).await; + } let end_block_number = current_block_number.0 + u64::try_from(self.config.num_headers_per_query) .expect("Failed converting usize to u64"); @@ -129,29 +138,14 @@ impl P2PSync { end_block_number: u64, ) -> Result<P2PSyncControl, P2PSyncError> { while current_block_number.0 < end_block_number {
- let Ok(maybe_signed_header_stream_result) = timeout( - self.config.query_timeout, - self.response_receivers.signed_headers_receiver.next(), - ) - .await - else { - debug!( - "Other peer returned headers until {:?} when we requested until {:?}", - current_block_number, end_block_number - ); - return Ok(P2PSyncControl::ContinueDownloading); - }; + let maybe_signed_header_stream_result = + self.response_receivers.signed_headers_receiver.next().await; let Some(maybe_signed_header) = maybe_signed_header_stream_result else { return Err(P2PSyncError::ReceiverChannelTerminated); }; let Some(SignedBlockHeader { block_header, signatures }) = maybe_signed_header else { - debug!("Handle empty signed headers -> mark that peer sent Fin"); - return Ok(P2PSyncControl::PeerFinished); + debug!("Header query sent to network finished"); + return Ok(P2PSyncControl::QueryFinishedPartially); }; // TODO(shahak): Check that parent_hash is the same as the previous block's hash // and handle reverts. @@ -178,6 +172,11 @@ impl P2PSync { info!("Added block {}.", current_block_number); *current_block_number = current_block_number.next(); } - Ok(P2PSyncControl::ContinueDownloading) + // Consume the None message signaling the end of the query. 
+ match self.response_receivers.signed_headers_receiver.next().await { + Some(None) => Ok(P2PSyncControl::ContinueDownloading), + Some(Some(_)) => Err(P2PSyncError::TooManyResponses), + None => Err(P2PSyncError::ReceiverChannelTerminated), + } } } diff --git a/crates/papyrus_p2p_sync/src/p2p_sync_test.rs b/crates/papyrus_p2p_sync/src/p2p_sync_test.rs index 0d7169d995..7ec444a8df 100644 --- a/crates/papyrus_p2p_sync/src/p2p_sync_test.rs +++ b/crates/papyrus_p2p_sync/src/p2p_sync_test.rs @@ -17,12 +17,15 @@ use super::{P2PSync, P2PSyncConfig}; const BUFFER_SIZE: usize = 1000; const QUERY_LENGTH: usize = 5; const DURATION_BEFORE_CHECKING_STORAGE: Duration = Duration::from_millis(10); -const QUERY_TIMEOUT: Duration = Duration::from_millis(50); -const TIMEOUT_AFTER_QUERY_TIMEOUTED_IN_SYNC: Duration = QUERY_TIMEOUT.saturating_mul(5); +const WAIT_PERIOD_FOR_NEW_DATA: Duration = Duration::from_millis(50); +const TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE: Duration = + WAIT_PERIOD_FOR_NEW_DATA.saturating_mul(5); lazy_static! { - static ref TEST_CONFIG: P2PSyncConfig = - P2PSyncConfig { num_headers_per_query: QUERY_LENGTH, query_timeout: QUERY_TIMEOUT }; + static ref TEST_CONFIG: P2PSyncConfig = P2PSyncConfig { + num_headers_per_query: QUERY_LENGTH, + wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA + }; } fn setup() -> (P2PSync, StorageReader, Receiver<Query>, Sender<Option<SignedBlockHeader>>) { @@ -83,13 +86,13 @@ async fn signed_headers_basic_flow() { } ); - // Send responses for (i, (block_hash, block_signature)) in block_hashes_and_signatures .iter() .enumerate() .take(end_block_number) .skip(start_block_number) { + // Send responses signed_headers_sender .send(Some(SignedBlockHeader { block_header: BlockHeader { @@ -101,24 +104,15 @@ async fn signed_headers_basic_flow() { })) .await .unwrap(); - } - - tokio::time::sleep(DURATION_BEFORE_CHECKING_STORAGE).await; - // Check responses were written to the storage.
- let txn = storage_reader.begin_ro_txn().unwrap(); - assert_eq!( - u64::try_from(end_block_number).unwrap(), - txn.get_header_marker().unwrap().0 - ); + tokio::time::sleep(DURATION_BEFORE_CHECKING_STORAGE).await; - for (i, (block_hash, block_signature)) in block_hashes_and_signatures - .iter() - .enumerate() - .take(end_block_number) - .skip(start_block_number) - { + // Check responses were written to the storage. This way we make sure that the sync + // writes to the storage each response it receives before all query responses were + // sent. let block_number = BlockNumber(i.try_into().unwrap()); + let txn = storage_reader.begin_ro_txn().unwrap(); + assert_eq!(block_number.next(), txn.get_header_marker().unwrap()); let block_header = txn.get_block_header(block_number).unwrap().unwrap(); assert_eq!(block_number, block_header.block_number); assert_eq!(*block_hash, block_header.block_hash); @@ -126,6 +120,7 @@ async fn signed_headers_basic_flow() { txn.get_block_signature(block_number).unwrap().unwrap(); assert_eq!(*block_signature, actual_block_signature); } + signed_headers_sender.send(None).await.unwrap(); } }; @@ -163,9 +158,10 @@ async fn sync_sends_new_query_if_it_got_partial_responses() { .await .unwrap(); } + signed_headers_sender.send(None).await.unwrap(); // First unwrap is for the timeout. Second unwrap is for the Option returned from Stream. - let query = timeout(TIMEOUT_AFTER_QUERY_TIMEOUTED_IN_SYNC, query_receiver.next()) + let query = timeout(TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, query_receiver.next()) .await .unwrap() .unwrap();