fix(sync): sleep when sync receives None and remove timeout hack
ShahakShama committed Mar 11, 2024
1 parent 01b38b7 commit 436be8f
Showing 4 changed files with 50 additions and 55 deletions.
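The gist of the change: the sync loop previously wrapped every read of the response stream in tokio::time::timeout and treated a timeout as the end of a query; now the end of a query is signaled explicitly by a None response, and the sync simply sleeps before re-querying whenever a query came back with partial data. A minimal sketch of the new control flow (simplified from the diff below; P2PSyncControl matches the real code, but the query plumbing is stubbed out):

use std::time::Duration;

enum P2PSyncControl {
    ContinueDownloading,
    QueryFinishedPartially,
}

// Stub standing in for the real header-download logic in this commit.
async fn send_query_and_download() -> P2PSyncControl {
    P2PSyncControl::ContinueDownloading
}

async fn sync_loop(wait_period_for_new_data: Duration) {
    let mut control = P2PSyncControl::ContinueDownloading;
    loop {
        // A partially answered query no longer times out; instead the sync
        // waits a configured period before retrying from the last received
        // block.
        if matches!(control, P2PSyncControl::QueryFinishedPartially) {
            tokio::time::sleep(wait_period_for_new_data).await;
        }
        control = send_query_and_download().await;
    }
}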
4 changes: 2 additions & 2 deletions config/default_config.json
@@ -154,8 +154,8 @@
"privacy": "Public",
"value": 10000
},
"p2p_sync.query_timeout": {
"description": "Time in seconds to wait for query responses until we mark it as failed",
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"privacy": "Public",
"value": 5
},
@@ -178,8 +178,8 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"p2p_sync.query_timeout": {
"description": "Time in seconds to wait for query responses until we mark it as failed",
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"value": {
"$serde_json::private::Number": "5"
},
59 changes: 29 additions & 30 deletions crates/papyrus_p2p_sync/src/lib.rs
@@ -14,7 +14,6 @@ use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use tokio::time::timeout;
use tracing::{debug, info, instrument};

const STEP: usize = 1;
@@ -23,9 +22,8 @@ const ALLOWED_SIGNATURES_LENGTH: usize = 1;
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct P2PSyncConfig {
pub num_headers_per_query: usize,
// TODO(shahak): Remove timeout and check if query finished when the network reports it.
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub query_timeout: Duration,
pub wait_period_for_new_data: Duration,
}

impl SerializeConfig for P2PSyncConfig {
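For context, the deserialize_seconds_to_duration adapter referenced in the #[serde(...)] attribute above is what lets the JSON config hold a plain integer ("value": 5) while the struct field is a Duration. A sketch of how such an adapter typically looks, assuming the papyrus_config helper behaves like a standard serde deserialize_with function (the real implementation may differ):

use std::time::Duration;

use serde::{Deserialize, Deserializer};

// Hypothetical re-implementation for illustration: read an integer number
// of seconds and convert it into a Duration.
fn deserialize_seconds_to_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
    D: Deserializer<'de>,
{
    let seconds = u64::deserialize(deserializer)?;
    Ok(Duration::from_secs(seconds))
}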
@@ -38,9 +36,10 @@ impl SerializeConfig for P2PSyncConfig {
ParamPrivacyInput::Public,
),
ser_param(
"query_timeout",
&self.query_timeout.as_secs(),
"Time in seconds to wait for query responses until we mark it as failed",
"wait_period_for_new_data",
&self.wait_period_for_new_data.as_secs(),
"Time in seconds to wait when a query returned with partial data before sending a \
new query",
ParamPrivacyInput::Public,
),
])
@@ -49,7 +48,10 @@

impl Default for P2PSyncConfig {
fn default() -> Self {
P2PSyncConfig { num_headers_per_query: 10000, query_timeout: Duration::from_secs(5) }
P2PSyncConfig {
num_headers_per_query: 10000,
wait_period_for_new_data: Duration::from_secs(5),
}
}
}

@@ -64,6 +66,8 @@ pub enum P2PSyncError {
#[error("Expected to receive one signature from the network. got {signatures:?} instead.")]
// TODO(shahak): Move this error to network.
WrongSignaturesLength { signatures: Vec<BlockSignature> },
#[error("Network returned more responses than expected for a query.")]
TooManyResponses,
// TODO(shahak): Replicate this error for each data type.
#[error("The sender end of the response receivers was closed.")]
ReceiverChannelTerminated,
@@ -75,7 +79,7 @@

enum P2PSyncControl {
ContinueDownloading,
PeerFinished,
QueryFinishedPartially,
}

pub struct P2PSync {
@@ -101,10 +105,15 @@ impl P2PSync {
pub async fn run(mut self) -> Result<(), P2PSyncError> {
let mut current_block_number = self.storage_reader.begin_ro_txn()?.get_header_marker()?;
// TODO: make control more substantial once we have more peers and peer management.
#[allow(unused_variables)]
let mut control = P2PSyncControl::ContinueDownloading;
#[allow(unused_assignments)]
loop {
if matches!(control, P2PSyncControl::QueryFinishedPartially) {
debug!(
"Query returned with partial data. Waiting {:?} before sending another query.",
self.config.wait_period_for_new_data
);
tokio::time::sleep(self.config.wait_period_for_new_data).await;
}
let end_block_number = current_block_number.0
+ u64::try_from(self.config.num_headers_per_query)
.expect("Failed converting usize to u64");
@@ -129,29 +138,14 @@
end_block_number: u64,
) -> Result<P2PSyncControl, P2PSyncError> {
while current_block_number.0 < end_block_number {
// Adding timeout because the network currently doesn't report when a query
// finished because the peers don't know about these blocks. If not all expected
// responses returned we will retry the query from the last received block.
// TODO(shahak): Once network reports finished queries, remove this timeout and add
// a sleep when a query finished with partial responses.
let Ok(maybe_signed_header_stream_result) = timeout(
self.config.query_timeout,
self.response_receivers.signed_headers_receiver.next(),
)
.await
else {
debug!(
"Other peer returned headers until {:?} when we requested until {:?}",
current_block_number, end_block_number
);
return Ok(P2PSyncControl::ContinueDownloading);
};
let maybe_signed_header_stream_result =
self.response_receivers.signed_headers_receiver.next().await;
let Some(maybe_signed_header) = maybe_signed_header_stream_result else {
return Err(P2PSyncError::ReceiverChannelTerminated);
};
let Some(SignedBlockHeader { block_header, signatures }) = maybe_signed_header else {
debug!("Handle empty signed headers -> mark that peer sent Fin");
return Ok(P2PSyncControl::PeerFinished);
debug!("Header query sent to network finished");
return Ok(P2PSyncControl::QueryFinishedPartially);
};
// TODO(shahak): Check that parent_hash is the same as the previous block's hash
// and handle reverts.
@@ -178,6 +172,11 @@ impl P2PSync {
info!("Added block {}.", current_block_number);
*current_block_number = current_block_number.next();
}
Ok(P2PSyncControl::ContinueDownloading)
// Consume the None message signaling the end of the query.
match self.response_receivers.signed_headers_receiver.next().await {
Some(None) => Ok(P2PSyncControl::ContinueDownloading),
Some(Some(_)) => Err(P2PSyncError::TooManyResponses),
None => Err(P2PSyncError::ReceiverChannelTerminated),
}
}
}
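The match added at the end of download_headers_for_query is where the removed timeout hack's job now lives. The response receiver is a stream of Option<SignedBlockHeader>, which gives three distinct outcomes to tell apart. A self-contained sketch of the same pattern, with a placeholder item type standing in for SignedBlockHeader (the real code matches on self.response_receivers.signed_headers_receiver):

use futures::{Stream, StreamExt};

// Placeholder for SignedBlockHeader in this sketch.
struct Data;

enum TerminatorError {
    // A data message arrived after all expected responses: the peer sent
    // more than was asked for.
    TooManyResponses,
    // The sender side of the channel was dropped entirely.
    ReceiverChannelTerminated,
}

// Consume the None message that marks the end of a fully answered query.
async fn consume_query_terminator<S>(stream: &mut S) -> Result<(), TerminatorError>
where
    S: Stream<Item = Option<Data>> + Unpin,
{
    match stream.next().await {
        Some(None) => Ok(()),
        Some(Some(_)) => Err(TerminatorError::TooManyResponses),
        None => Err(TerminatorError::ReceiverChannelTerminated),
    }
}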
38 changes: 17 additions & 21 deletions crates/papyrus_p2p_sync/src/p2p_sync_test.rs
Expand Up @@ -17,12 +17,15 @@ use super::{P2PSync, P2PSyncConfig};
const BUFFER_SIZE: usize = 1000;
const QUERY_LENGTH: usize = 5;
const DURATION_BEFORE_CHECKING_STORAGE: Duration = Duration::from_millis(10);
const QUERY_TIMEOUT: Duration = Duration::from_millis(50);
const TIMEOUT_AFTER_QUERY_TIMEOUTED_IN_SYNC: Duration = QUERY_TIMEOUT.saturating_mul(5);
const WAIT_PERIOD_FOR_NEW_DATA: Duration = Duration::from_millis(50);
const TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE: Duration =
WAIT_PERIOD_FOR_NEW_DATA.saturating_mul(5);

lazy_static! {
static ref TEST_CONFIG: P2PSyncConfig =
P2PSyncConfig { num_headers_per_query: QUERY_LENGTH, query_timeout: QUERY_TIMEOUT };
static ref TEST_CONFIG: P2PSyncConfig = P2PSyncConfig {
num_headers_per_query: QUERY_LENGTH,
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA
};
}

fn setup() -> (P2PSync, StorageReader, Receiver<Query>, Sender<Option<SignedBlockHeader>>) {
@@ -83,13 +86,13 @@ async fn signed_headers_basic_flow() {
}
);

// Send responses
for (i, (block_hash, block_signature)) in block_hashes_and_signatures
.iter()
.enumerate()
.take(end_block_number)
.skip(start_block_number)
{
// Send responses
signed_headers_sender
.send(Some(SignedBlockHeader {
block_header: BlockHeader {
Expand All @@ -101,31 +104,23 @@ async fn signed_headers_basic_flow() {
}))
.await
.unwrap();
}

tokio::time::sleep(DURATION_BEFORE_CHECKING_STORAGE).await;

// Check responses were written to the storage.
let txn = storage_reader.begin_ro_txn().unwrap();
assert_eq!(
u64::try_from(end_block_number).unwrap(),
txn.get_header_marker().unwrap().0
);
tokio::time::sleep(DURATION_BEFORE_CHECKING_STORAGE).await;

for (i, (block_hash, block_signature)) in block_hashes_and_signatures
.iter()
.enumerate()
.take(end_block_number)
.skip(start_block_number)
{
// Check responses were written to the storage. This way we make sure that the sync
// writes to the storage each response it receives before all query responses were
// sent.
let block_number = BlockNumber(i.try_into().unwrap());
let txn = storage_reader.begin_ro_txn().unwrap();
assert_eq!(block_number.next(), txn.get_header_marker().unwrap());
let block_header = txn.get_block_header(block_number).unwrap().unwrap();
assert_eq!(block_number, block_header.block_number);
assert_eq!(*block_hash, block_header.block_hash);
let actual_block_signature =
txn.get_block_signature(block_number).unwrap().unwrap();
assert_eq!(*block_signature, actual_block_signature);
}
signed_headers_sender.send(None).await.unwrap();
}
};

@@ -163,9 +158,10 @@ async fn sync_sends_new_query_if_it_got_partial_responses() {
.await
.unwrap();
}
signed_headers_sender.send(None).await.unwrap();

// First unwrap is for the timeout. Second unwrap is for the Option returned from Stream.
let query = timeout(TIMEOUT_AFTER_QUERY_TIMEOUTED_IN_SYNC, query_receiver.next())
let query = timeout(TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, query_receiver.next())
.await
.unwrap()
.unwrap();
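A note on the updated test strategy: rather than letting a timeout fire inside the sync, the test now ends the partial query itself by sending None, then asserts that a new query arrives within a bound derived from the configured wait period. The timing pattern, sketched with an illustrative message type (the real test uses the Query type and the receivers created in setup):

use std::time::Duration;

use futures::{channel::mpsc, StreamExt};
use tokio::time::timeout;

const WAIT_PERIOD_FOR_NEW_DATA: Duration = Duration::from_millis(50);
// Give the sync several wait periods before declaring the test failed.
const TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE: Duration =
    WAIT_PERIOD_FOR_NEW_DATA.saturating_mul(5);

// Illustrative assertion: a new query (u64 stands in for the real Query
// type) must arrive before the bound elapses.
async fn expect_new_query(query_receiver: &mut mpsc::Receiver<u64>) -> u64 {
    // First unwrap: the timeout did not fire. Second unwrap: the stream
    // yielded an item rather than terminating.
    timeout(TIMEOUT_FOR_NEW_QUERY_AFTER_PARTIAL_RESPONSE, query_receiver.next())
        .await
        .unwrap()
        .unwrap()
}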
