Skip to content
This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

Commit

Permalink
fix(network): simplify handle_recieved_data
Browse files Browse the repository at this point in the history
  • Loading branch information
nagmo-starkware committed Jan 7, 2024
1 parent 42fccc0 commit 5128786
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 157 deletions.
136 changes: 56 additions & 80 deletions crates/papyrus_network/src/block_headers/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ use std::task::{Context, Poll};

use libp2p::core::Endpoint;
use libp2p::swarm::{
ConnectionDenied, ConnectionHandler, ConnectionId, FromSwarm, NetworkBehaviour, ToSwarm,
ConnectionDenied,
ConnectionHandler,
ConnectionId,
FromSwarm,
NetworkBehaviour,
ToSwarm,
};
use libp2p::{Multiaddr, PeerId};

Expand All @@ -25,8 +30,7 @@ where
protobuf::BlockHeadersRequest,
protobuf::BlockHeadersResponse,
>,
data_pending_pairing:
HashMap<OutboundSessionId, protobuf::block_headers_response_part::HeaderMessage>,
header_pending_pairing: HashMap<OutboundSessionId, protobuf::BlockHeader>,
outbound_sessions_pending_termination: HashSet<OutboundSessionId>,
inbound_sessions_pending_termination: HashSet<InboundSessionId>,
db_executor: Arc<DBExecutor>,
Expand All @@ -38,8 +42,6 @@ where
pub(crate) enum BehaviourInternalError {
#[error(transparent)]
ProtobufConversionError(#[from] ProtobufConversionError),
#[error("Pairing block header and signature error")]
PairingError,
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -64,7 +66,7 @@ where
pub fn new(config: Config, db_executor: Arc<DBExecutor>) -> Self {
Self {
streamed_data_behaviour: streamed_data::behaviour::Behaviour::new(config),
data_pending_pairing: HashMap::new(),
header_pending_pairing: HashMap::new(),
outbound_sessions_pending_termination: HashSet::new(),
inbound_sessions_pending_termination: HashSet::new(),
db_executor,
Expand All @@ -75,7 +77,8 @@ where

#[allow(dead_code)]
pub fn send_query(&mut self, query: protobuf::BlockHeadersRequest, peer_id: PeerId) {
// TODO: keep track of the query id and the session id so that we can map between them for reputation.
// TODO: keep track of the query id and the session id so that we can map between them for
// reputation.
let _outbound_session_id = self.streamed_data_behaviour.send_query(query, peer_id);
}

Expand Down Expand Up @@ -109,7 +112,7 @@ where
},
*inbound_session_id,
)
.map_err(|e| SessionIdNotFoundError(e))?;
.map_err(SessionIdNotFoundError)?;
if let protobuf::block_headers_response_part::HeaderMessage::Fin { .. } = header_message {
// remove the session id from the mapping here since we need the query id to find it
// in the hash map.
Expand Down Expand Up @@ -186,79 +189,56 @@ where
}
}

// this function assumes that the data and header_message are each one of block header or
// signatures but not the same one. the function will return error if both parameter will
// evaluate to none or the same type.
fn get_block_header_and_signatures_from_event_data_and_stored_data(
&self,
data: &protobuf::block_headers_response_part::HeaderMessage,
header_message: &protobuf::block_headers_response_part::HeaderMessage,
) -> Result<(BlockHeader, Vec<Signature>), BehaviourInternalError> {
let (block_header_x, signatures_x) = self.header_message_to_header_or_signatures(data)?;
let (block_header_y, signatures_y) =
self.header_message_to_header_or_signatures(header_message)?;
let Some(block_header) = block_header_x.or_else(|| block_header_y.or(None)) else {
return Err(BehaviourInternalError::PairingError {});
};
let Some(signatures) = signatures_x.or_else(|| signatures_y.or(None)) else {
return Err(BehaviourInternalError::PairingError {});
};
Ok((block_header, signatures))
}

pub(crate) fn handle_pairing_of_header_and_signature(
&mut self,
outbound_session_id: OutboundSessionId,
header_message: &protobuf::block_headers_response_part::HeaderMessage,
ignore_event_and_return_pending: &mut bool,
) -> Event {
if let Some(data) = self.data_pending_pairing.get(&outbound_session_id) {
*ignore_event_and_return_pending = false;
match self.get_block_header_and_signatures_from_event_data_and_stored_data(
data,
header_message,
) {
Ok((block_header, signatures)) => Event::RecievedData {
data: BlockHeaderData { block_header, signatures },
outbound_session_id,
},
Err(e) => match e {
BehaviourInternalError::ProtobufConversionError(e) => {
Event::ProtobufConversionError(e)
}
BehaviourInternalError::PairingError => Event::SessionFailed {
session_id: SessionId::OutboundSessionId(outbound_session_id),
session_error: SessionError::PairingError,
},
},
}
} else {
*ignore_event_and_return_pending = true;
self.data_pending_pairing.insert(outbound_session_id, header_message.clone());
Event::SessionFailed {
session_id: SessionId::OutboundSessionId(outbound_session_id),
session_error: SessionError::WaitingToCompletePairing,
}
}
}

fn handle_received_data(
&mut self,
data: protobuf::BlockHeadersResponse,
outbound_session_id: OutboundSessionId,
ignore_event_and_return_pending: &mut bool,
) -> Event {
// TODO: handle getting more then one message part in the response.
if let Some(header_message) = data.part.first().and_then(|part| part.header_message.clone())
{
match header_message {
protobuf::block_headers_response_part::HeaderMessage::Header(_)
| protobuf::block_headers_response_part::HeaderMessage::Signatures(_) => self
.handle_pairing_of_header_and_signature(
// TODO: handle getting more than one message part in the response.
if let Some(message) = data.part.first().and_then(|part| part.header_message.clone()) {
match message {
protobuf::block_headers_response_part::HeaderMessage::Header(header) => {
*ignore_event_and_return_pending = true;
// TODO: check that there is no header for this session id already.
self.header_pending_pairing.insert(outbound_session_id, header.clone());
Event::SessionFailed {
session_id: outbound_session_id.into(),
session_error: SessionError::WaitingToCompletePairing,
}
}
protobuf::block_headers_response_part::HeaderMessage::Signatures(sigs) => {
let Some(block_header) = self
.header_pending_pairing
.remove(&outbound_session_id)
.and_then(|header| header.try_into().ok())
else {
return Event::SessionFailed {
session_id: outbound_session_id.into(),
session_error: SessionError::PairingError,
};
};
let Some(signatures) = sigs
.signatures
.iter()
.try_fold(vec![], |mut acc, sig| {
sig.clone().try_into().map(|sig| {
acc.push(sig);
acc
})
})
.ok()
else {
return Event::SessionFailed {
session_id: outbound_session_id.into(),
session_error: SessionError::PairingError,
};
};
Event::RecievedData {
data: BlockHeaderData { block_header, signatures },
outbound_session_id,
&header_message,
ignore_event_and_return_pending,
),
}
}
protobuf::block_headers_response_part::HeaderMessage::Fin(_) => {
*ignore_event_and_return_pending = true;
self.close_outbound_session(outbound_session_id);
Expand All @@ -271,7 +251,7 @@ where
} else {
Event::SessionFailed {
session_id: SessionId::OutboundSessionId(OutboundSessionId { value: usize::MAX }),
session_error: SessionError::StreamedDataEventConversionError,
session_error: SessionError::IncompatibleDataError,
}
}
}
Expand Down Expand Up @@ -443,11 +423,7 @@ where
&mut ignore_event_and_return_pending,
)
});
if ignore_event_and_return_pending {
Poll::Pending
} else {
Poll::Ready(event)
}
if ignore_event_and_return_pending { Poll::Pending } else { Poll::Ready(event) }
}
Poll::Pending => Poll::Pending,
}
Expand Down
79 changes: 4 additions & 75 deletions crates/papyrus_network/src/block_headers/block_headers_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,77 +67,6 @@ fn header_message_to_header_or_signatures() {
assert!(res.is_err());
}

#[test]
fn positive_flow_pairing_header_and_signatures() {
let test_db_executor = Arc::new(TestDBExecutor {});
let mut behaviour =
block_headers::behaviour::Behaviour::new(Config::get_test_config(), test_db_executor);
let outbound_session_id_a = OutboundSessionId { value: 1 };
let outbound_session_id_b = OutboundSessionId { value: 2 };
let mut wait_to_complete_pairing = false;

let header_message =
protobuf::block_headers_response_part::HeaderMessage::Header(protobuf::BlockHeader {
parent_header: Some(protobuf::Hash { elements: [0x01].repeat(32) }),
sequencer_address: Some(protobuf::Address { elements: [0x01].repeat(32) }),
..Default::default()
});
let signatures_message =
protobuf::block_headers_response_part::HeaderMessage::Signatures(protobuf::Signatures {
block: Some(protobuf::BlockId { number: 1, ..Default::default() }),
signatures: vec![protobuf::ConsensusSignature {
r: Some(protobuf::Felt252 { elements: [0x01].repeat(32) }),
s: Some(protobuf::Felt252 { elements: [0x01].repeat(32) }),
}],
});

// sending header to session a results in instruction to wait for pairing
assert_matches!(
behaviour.handle_pairing_of_header_and_signature(
outbound_session_id_a,
&header_message,
&mut wait_to_complete_pairing
),
block_headers::Event::SessionFailed { .. }
);
assert!(wait_to_complete_pairing);

// sending signatures to session b results in instruction to wait for pairing
assert_matches!(
behaviour.handle_pairing_of_header_and_signature(
outbound_session_id_b,
&signatures_message,
&mut wait_to_complete_pairing
),
block_headers::Event::SessionFailed { .. }
);
assert!(wait_to_complete_pairing);

// sending signatures to session a results in recieved data event and no instruction to wait for
// pairing
assert_matches!(
behaviour.handle_pairing_of_header_and_signature(
outbound_session_id_a,
&signatures_message,
&mut wait_to_complete_pairing,
),
block_headers::Event::RecievedData { .. }
);
assert!(!wait_to_complete_pairing);

// sending header to session b results in recieved data event and no instruction to wait for
// pairing
assert_matches!(
behaviour.handle_pairing_of_header_and_signature(
outbound_session_id_b,
&header_message,
&mut wait_to_complete_pairing,
),
block_headers::Event::RecievedData { .. }
);
assert!(!wait_to_complete_pairing);
}

#[test]
#[ignore = "functionality not implemented completely yet"]
fn test_fin_handling() {
Expand All @@ -152,15 +81,15 @@ async fn poll_is_pending_if_streamed_data_behaviour_poll_is_pending() {

#[tokio::test]
#[ignore = "functionality not implemented completely yet"]
async fn poll_is_pending_if_streamed_data_behaviour_poll_is_ready_but_event_mapping_returns_wait_to_complete_pairing(
) {
async fn poll_is_pending_if_streamed_data_behaviour_poll_is_ready_but_event_mapping_returns_wait_to_complete_pairing()
{
unimplemented!()
}

#[tokio::test]
#[ignore = "functionality not implemented completely yet"]
async fn poll_is_ready_if_streamed_data_behaviour_poll_is_ready_and_event_mapping_returns_not_to_wait_to_complete_pairing(
) {
async fn poll_is_ready_if_streamed_data_behaviour_poll_is_ready_and_event_mapping_returns_not_to_wait_to_complete_pairing()
{
unimplemented!()
}

Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_network/src/block_headers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::{BlockQuery, Direction};
pub(crate) enum SessionError {
#[error(transparent)]
StreamedData(#[from] streamed_data::behaviour::SessionError),
#[error("Streamed data event conversion error")]
StreamedDataEventConversionError,
#[error("Incompatible data error")]
IncompatibleDataError,
#[error("Pairing of header and signature error")]
PairingError,
#[error("Session closed unexpectedly")]
Expand Down

0 comments on commit 5128786

Please sign in to comment.