This repository has been archived by the owner on Dec 26, 2024. It is now read-only.

refactor(network): remove InternalQuery (#2048)
ShahakShama authored May 26, 2024
1 parent dbbe9e5 commit 8c4fac4
Showing 13 changed files with 84 additions and 94 deletions.
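At a high level, the commit merges the two query types into one: the old public `Query` (a `BlockNumber` start with `usize` limit and step) is replaced by what used to be `InternalQuery` (a `BlockHashOrNumber` start with `u64` fields), now simply named `Query`. The following standalone Rust sketch shows the resulting shape; the `BlockNumber`, `BlockHash`, and `#[default]` choices are simplified stand-ins for illustration, not the real `starknet_api` definitions.

```rust
// Standalone sketch of the unified query type after this commit.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct BlockNumber(pub u64);

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub struct BlockHash(pub u64);

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(test, derive(Hash))]
pub enum BlockHashOrNumber {
    Hash(BlockHash),
    Number(BlockNumber),
}

#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
pub enum Direction {
    #[default]
    Forward,
    Backward,
}

// Previously there were two types: a public `Query` (BlockNumber start,
// usize limit/step) and an `InternalQuery` with the fields below. The commit
// keeps only this shape, under the name `Query`.
#[derive(Default, Clone, Debug, PartialEq, Eq)]
#[cfg_attr(test, derive(Hash))]
pub struct Query {
    pub start_block: BlockHashOrNumber,
    pub direction: Direction,
    pub limit: u64,
    pub step: u64,
}

// BlockHashOrNumber has no derivable Default, so the commit adds one by hand,
// letting `Query` keep its Default derive.
impl Default for BlockHashOrNumber {
    fn default() -> Self {
        Self::Number(BlockNumber::default())
    }
}

fn main() {
    let q = Query::default();
    assert_eq!(q.start_block, BlockHashOrNumber::Number(BlockNumber(0)));
    println!("{q:?}");
}
```

The test-only `Hash` derive is what lets the network-manager tests further down key a `HashMap` by `Query`.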
@@ -3,7 +3,7 @@ use starknet_api::data_availability::{DataAvailabilityMode, L1DataAvailabilityMo

use super::ProtobufConversionError;
use crate::protobuf_messages::protobuf::{self};
-use crate::{BlockHashOrNumber, Direction, InternalQuery, Query};
+use crate::{BlockHashOrNumber, Direction, Query};

#[cfg(test)]
#[allow(dead_code)]
@@ -199,7 +199,7 @@ pub(super) fn volition_domain_to_enum_int(value: DataAvailabilityMode) -> i32 {
}
}

-impl TryFrom<protobuf::Iteration> for InternalQuery {
+impl TryFrom<protobuf::Iteration> for Query {
type Error = ProtobufConversionError;

fn try_from(value: protobuf::Iteration) -> Result<Self, Self::Error> {
@@ -226,21 +226,28 @@ impl TryFrom<protobuf::Iteration> for InternalQuery {
};
let limit = value.limit;
let step = value.step;
-Ok(InternalQuery { start_block, direction, limit, step })
+Ok(Query { start_block, direction, limit, step })
}
}

impl From<Query> for protobuf::Iteration {
fn from(value: Query) -> Self {
-let start = protobuf::iteration::Start::BlockNumber(value.start_block.0);
+let start = match value.start_block {
+BlockHashOrNumber::Number(BlockNumber(number)) => {
+protobuf::iteration::Start::BlockNumber(number)
+}
+BlockHashOrNumber::Hash(block_hash) => {
+protobuf::iteration::Start::Header(block_hash.into())
+}
+};
Self {
start: Some(start),
direction: match value.direction {
Direction::Forward => 0,
Direction::Backward => 1,
},
-limit: value.limit as u64,
-step: value.step as u64,
+limit: value.limit,
+step: value.step,
}
}
}
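The conversions above now go straight between `protobuf::Iteration` and the unified `Query`, with the wire-level `start` field mapping to either a block number or a block hash. As a rough, self-contained illustration of the pattern (using simplified local stand-ins rather than the prost-generated types and the crate's `ProtobufConversionError`), a round trip might look like this:

```rust
// Standalone sketch of the conversion pattern in the file above; all types
// here are simplified stand-ins, not the crate's own.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct BlockNumber(u64);
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct BlockHash(u64);

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum BlockHashOrNumber {
    Hash(BlockHash),
    Number(BlockNumber),
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Direction {
    Forward,
    Backward,
}

#[derive(Clone, Debug, PartialEq, Eq)]
struct Query {
    start_block: BlockHashOrNumber,
    direction: Direction,
    limit: u64,
    step: u64,
}

// Simplified mirror of the wire message: `start` is optional on the wire.
#[derive(Clone, Debug)]
enum Start {
    BlockNumber(u64),
    Header(BlockHash),
}

#[derive(Clone, Debug)]
struct Iteration {
    start: Option<Start>,
    direction: i32, // 0 = forward, 1 = backward
    limit: u64,
    step: u64,
}

#[derive(Debug)]
enum ConversionError {
    MissingField(&'static str),
    OutOfRange(&'static str),
}

impl TryFrom<Iteration> for Query {
    type Error = ConversionError;

    fn try_from(value: Iteration) -> Result<Self, Self::Error> {
        // A missing or out-of-range field on the wire becomes an error.
        let start_block = match value.start.ok_or(ConversionError::MissingField("start"))? {
            Start::BlockNumber(n) => BlockHashOrNumber::Number(BlockNumber(n)),
            Start::Header(hash) => BlockHashOrNumber::Hash(hash),
        };
        let direction = match value.direction {
            0 => Direction::Forward,
            1 => Direction::Backward,
            _ => return Err(ConversionError::OutOfRange("direction")),
        };
        Ok(Query { start_block, direction, limit: value.limit, step: value.step })
    }
}

impl From<Query> for Iteration {
    fn from(value: Query) -> Self {
        // Both start variants are representable, matching the new match arm above.
        let start = match value.start_block {
            BlockHashOrNumber::Number(BlockNumber(number)) => Start::BlockNumber(number),
            BlockHashOrNumber::Hash(hash) => Start::Header(hash),
        };
        Iteration {
            start: Some(start),
            direction: match value.direction {
                Direction::Forward => 0,
                Direction::Backward => 1,
            },
            limit: value.limit,
            step: value.step,
        }
    }
}

fn main() {
    let query = Query {
        start_block: BlockHashOrNumber::Hash(BlockHash(7)),
        direction: Direction::Backward,
        limit: 10,
        step: 2,
    };
    let wire: Iteration = query.clone().into();
    let round_tripped = Query::try_from(wire).expect("well-formed iteration");
    assert_eq!(round_tripped, query);
}
```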
@@ -22,7 +22,7 @@ use starknet_api::crypto::Signature;
use super::common::{enum_int_to_l1_data_availability_mode, l1_data_availability_mode_to_enum_int};
use super::ProtobufConversionError;
use crate::protobuf_messages::protobuf::{self};
-use crate::{InternalQuery, Query, SignedBlockHeader};
+use crate::{Query, SignedBlockHeader};

impl TryFrom<protobuf::BlockHeadersResponse> for Option<SignedBlockHeader> {
type Error = ProtobufConversionError;
@@ -286,7 +286,7 @@ impl From<Option<SignedBlockHeader>> for protobuf::BlockHeadersResponse {
}
}

-impl TryFrom<protobuf::BlockHeadersRequest> for InternalQuery {
+impl TryFrom<protobuf::BlockHeadersRequest> for Query {
type Error = ProtobufConversionError;
fn try_from(value: protobuf::BlockHeadersRequest) -> Result<Self, Self::Error> {
let value = value.iteration.ok_or(ProtobufConversionError::MissingField {
@@ -7,7 +7,7 @@ use starknet_api::state::{StorageKey, ThinStateDiff};
use super::common::volition_domain_to_enum_int;
use super::ProtobufConversionError;
use crate::protobuf_messages::protobuf;
-use crate::{InternalQuery, Query};
+use crate::Query;

impl TryFrom<protobuf::StateDiffsResponse> for Option<ThinStateDiff> {
type Error = ProtobufConversionError;
@@ -223,7 +223,7 @@ impl From<ThinStateDiff> for StateDiffsResponseVec {
}
}

-impl TryFrom<protobuf::StateDiffsRequest> for InternalQuery {
+impl TryFrom<protobuf::StateDiffsRequest> for Query {
type Error = ProtobufConversionError;
fn try_from(value: protobuf::StateDiffsRequest) -> Result<Self, Self::Error> {
let value = value.iteration.ok_or(ProtobufConversionError::MissingField {
10 changes: 5 additions & 5 deletions crates/papyrus_network/src/db_executor/mod.rs
@@ -20,7 +20,7 @@ use tokio::task::JoinHandle;

use crate::converters::protobuf_conversion::state_diff::StateDiffsResponseVec;
use crate::protobuf_messages::protobuf;
-use crate::{BlockHashOrNumber, DataType, InternalQuery, SignedBlockHeader};
+use crate::{BlockHashOrNumber, DataType, Query, SignedBlockHeader};

#[cfg(test)]
mod test;
@@ -138,7 +138,7 @@ pub enum DBExecutorError {
#[error(
"Block number is out of range. Query: {query:?}, counter: {counter}, query_id: {query_id}"
)]
-BlockNumberOutOfRange { query: InternalQuery, counter: u64, query_id: QueryId },
+BlockNumberOutOfRange { query: Query, counter: u64, query_id: QueryId },
// TODO: add data type to the error message.
#[error("Block not found. Block: {block_hash_or_number:?}, query_id: {query_id}")]
BlockNotFound { block_hash_or_number: BlockHashOrNumber, query_id: QueryId },
@@ -188,7 +188,7 @@ pub trait DBExecutorTrait: Stream<Item = Result<QueryId, DBExecutorError>> + Unp
// TODO: add writer functionality
fn register_query(
&mut self,
-query: InternalQuery,
+query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
sender: Sender<Data>,
) -> QueryId;
@@ -210,7 +210,7 @@ impl DBExecutor {
impl DBExecutorTrait for DBExecutor {
fn register_query(
&mut self,
-query: InternalQuery,
+query: Query,
data_type: impl FetchBlockDataFromDb + Send + 'static,
mut sender: Sender<Data>,
) -> QueryId {
@@ -239,7 +239,7 @@ impl DBExecutorTrait for DBExecutor {
};
for block_counter in 0..query.limit {
let block_number = BlockNumber(utils::calculate_block_number(
-query,
+&query,
start_block_number,
block_counter,
query_id,
14 changes: 7 additions & 7 deletions crates/papyrus_network/src/db_executor/test.rs
@@ -21,7 +21,7 @@ use crate::db_executor::{
MockFetchBlockDataFromDb,
QueryId,
};
-use crate::{BlockHashOrNumber, DataType, Direction, InternalQuery, SignedBlockHeader};
+use crate::{BlockHashOrNumber, DataType, Direction, Query, SignedBlockHeader};
const BUFFER_SIZE: usize = 10;

#[tokio::test]
@@ -34,7 +34,7 @@ async fn header_db_executor_can_register_and_run_a_query() {
insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer);

// register a query.
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Number(BlockNumber(0)),
direction: Direction::Forward,
limit: NUM_OF_BLOCKS,
@@ -44,7 +44,7 @@ async fn header_db_executor_can_register_and_run_a_query() {
enum_iterator::all::<DataType>()
.map(|data_type| {
let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
-let query_id = db_executor.register_query(query, data_type, sender);
+let query_id = db_executor.register_query(query.clone(), data_type, sender);
(query_id, (receiver, data_type))
})
.unzip();
@@ -117,7 +117,7 @@ async fn header_db_executor_start_block_given_by_hash() {

// register a query.
let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Hash(block_hash),
direction: Direction::Forward,
limit: NUM_OF_BLOCKS,
@@ -151,7 +151,7 @@ async fn header_db_executor_query_of_missing_block() {
const BLOCKS_DELTA: u64 = 5;
// register a query.
let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Number(BlockNumber(NUM_OF_BLOCKS - BLOCKS_DELTA)),
direction: Direction::Forward,
limit: NUM_OF_BLOCKS,
@@ -203,7 +203,7 @@ async fn header_db_executor_can_receive_queries_after_stream_is_exhausted() {
for _ in 0..2 {
// register a query.
let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Number(BlockNumber(0)),
direction: Direction::Forward,
limit: NUM_OF_BLOCKS,
@@ -240,7 +240,7 @@ async fn header_db_executor_drop_receiver_before_query_is_done() {
insert_to_storage_test_blocks_up_to(NUM_OF_BLOCKS, &mut storage_writer);

let (sender, receiver) = futures::channel::mpsc::channel(BUFFER_SIZE);
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Number(BlockNumber(1)),
direction: Direction::Forward,
limit: NUM_OF_BLOCKS,
7 changes: 4 additions & 3 deletions crates/papyrus_network/src/db_executor/utils.rs
@@ -1,8 +1,8 @@
use super::{DBExecutorError, QueryId};
-use crate::{Direction, InternalQuery};
+use crate::{Direction, Query};

pub(crate) fn calculate_block_number(
-query: InternalQuery,
+query: &Query,
start_block: u64,
read_blocks_counter: u64,
query_id: QueryId,
@@ -11,11 +11,12 @@ pub(crate) fn calculate_block_number(
Direction::Forward => 1,
Direction::Backward => -1,
};
+// TODO(shahak): Fix this code.
let blocks_delta: i128 = direction_factor * (query.step * read_blocks_counter) as i128;
let block_number: i128 = start_block as i128 + blocks_delta;
if block_number < 0 || block_number > u64::MAX as i128 {
return Err(DBExecutorError::BlockNumberOutOfRange {
-query,
+query: query.clone(),
counter: read_blocks_counter,
query_id,
});
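`calculate_block_number` now borrows the query and clones it only on the error path. Its arithmetic walks `read_blocks_counter` steps of size `query.step` from `start_block`, forward or backward, and rejects results outside the `u64` range; note that the in-tree version still multiplies `query.step * read_blocks_counter` in `u64` before widening, which is presumably the kind of issue the newly added TODO refers to. A standalone sketch of the same calculation, with the product widened as well (types are simplified stand-ins, not the crate's own):

```rust
// Standalone sketch of the block-number arithmetic in calculate_block_number.
#[derive(Clone, Copy)]
enum Direction {
    Forward,
    Backward,
}

fn nth_block_number(
    direction: Direction,
    start_block: u64,
    step: u64,
    read_blocks_counter: u64,
) -> Option<u64> {
    let direction_factor: i128 = match direction {
        Direction::Forward => 1,
        Direction::Backward => -1,
    };
    // Widen before multiplying so step * counter cannot wrap in u64, then apply direction.
    let blocks_delta = (step as i128)
        .checked_mul(read_blocks_counter as i128)?
        .checked_mul(direction_factor)?;
    let block_number = (start_block as i128).checked_add(blocks_delta)?;
    // Anything outside the valid u64 range is rejected; the real code turns this
    // case into DBExecutorError::BlockNumberOutOfRange with a cloned query.
    u64::try_from(block_number).ok()
}

fn main() {
    // Forward from block 10 with step 2: the 4th block read is number 18.
    assert_eq!(nth_block_number(Direction::Forward, 10, 2, 4), Some(18));
    // Walking backward past genesis is out of range.
    assert_eq!(nth_block_number(Direction::Backward, 3, 2, 4), None);
}
```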
38 changes: 12 additions & 26 deletions crates/papyrus_network/src/lib.rs
@@ -78,12 +78,13 @@ impl From<DataType> for Protocol {
}

/// This struct represents a query that can be sent to a peer.
-#[derive(Default, Debug, PartialEq, Eq)]
+#[derive(Default, Clone, Debug, PartialEq, Eq)]
+#[cfg_attr(test, derive(Hash))]
pub struct Query {
-pub start_block: BlockNumber,
+pub start_block: BlockHashOrNumber,
pub direction: Direction,
-pub limit: usize,
-pub step: usize,
+pub limit: u64,
+pub step: u64,
}

#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
@@ -101,23 +102,19 @@ pub struct SignedBlockHeader {
pub signatures: Vec<BlockSignature>,
}

-// TODO(shahak): Internalize this when we have a mixed behaviour.
-#[derive(Debug, PartialEq, Eq, Clone, Copy)]
-#[cfg_attr(test, derive(Hash))]
-pub struct InternalQuery {
-pub start_block: BlockHashOrNumber,
-pub direction: Direction,
-pub limit: u64,
-pub step: u64,
-}

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(test, derive(Hash))]
pub enum BlockHashOrNumber {
Hash(BlockHash),
Number(BlockNumber),
}

+impl Default for BlockHashOrNumber {
+fn default() -> Self {
+Self::Number(BlockNumber::default())
+}
+}

pub type SignedBlockHeaderStream = Pin<Box<dyn Stream<Item = Option<SignedBlockHeader>> + Send>>;
pub type StateDiffStream = Pin<Box<dyn Stream<Item = Option<ThinStateDiff>> + Send>>;

@@ -145,7 +142,7 @@ impl Protocol {
}
}

-pub fn bytes_query_to_protobuf_request(&self, query: Vec<u8>) -> InternalQuery {
+pub fn bytes_query_to_protobuf_request(&self, query: Vec<u8>) -> Query {
// TODO: make this function return errors instead of panicking.
match self {
Protocol::SignedBlockHeader => protobuf::BlockHeadersRequest::decode(&query[..])
@@ -244,14 +241,3 @@ impl Default for NetworkConfig {
}
}
}

-impl From<Query> for InternalQuery {
-fn from(query: Query) -> InternalQuery {
-InternalQuery {
-start_block: BlockHashOrNumber::Number(query.start_block),
-direction: query.direction,
-limit: query.limit as u64,
-step: query.step as u64,
-}
-}
-}
25 changes: 12 additions & 13 deletions crates/papyrus_network/src/network_manager/test.rs
@@ -41,7 +41,6 @@ use crate::{
BlockHashOrNumber,
DataType,
Direction,
-InternalQuery,
Query,
SignedBlockHeader,
};
@@ -51,7 +50,7 @@ const TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Default)]
struct MockSwarm {
pub pending_events: Queue<Event>,
-pub sent_queries: Vec<(InternalQuery, PeerId)>,
+pub sent_queries: Vec<(Query, PeerId)>,
broadcasted_messages_senders: Vec<UnboundedSender<(Bytes, Topic)>>,
inbound_session_id_to_data_sender: HashMap<InboundSessionId, UnboundedSender<Data>>,
next_outbound_session_id: usize,
@@ -99,7 +98,7 @@ impl MockSwarm {

fn create_received_data_events_for_query(
&self,
-query: InternalQuery,
+query: Query,
outbound_session_id: OutboundSessionId,
) {
let BlockHashOrNumber::Number(BlockNumber(start_block_number)) = query.start_block else {
@@ -162,11 +161,11 @@ impl SwarmTrait for MockSwarm {
peer_id: PeerId,
_protocol: crate::Protocol,
) -> Result<OutboundSessionId, PeerNotConnected> {
-let query = protobuf::BlockHeadersRequest::decode(&query[..])
+let query: Query = protobuf::BlockHeadersRequest::decode(&query[..])
.expect("failed to decode protobuf BlockHeadersRequest")
.try_into()
.expect("failed to convert BlockHeadersRequest");
-self.sent_queries.push((query, peer_id));
+self.sent_queries.push((query.clone(), peer_id));
let outbound_session_id = OutboundSessionId { value: self.next_outbound_session_id };
self.create_received_data_events_for_query(query, outbound_session_id);
self.next_outbound_session_id += 1;
@@ -205,7 +204,7 @@ impl SwarmTrait for MockSwarm {
#[derive(Default)]
struct MockDBExecutor {
next_query_id: usize,
-pub query_to_headers: HashMap<InternalQuery, Vec<BlockHeader>>,
+pub query_to_headers: HashMap<Query, Vec<BlockHeader>>,
query_execution_set: FuturesUnordered<JoinHandle<Result<QueryId, DBExecutorError>>>,
}

@@ -221,7 +220,7 @@ impl DBExecutorTrait for MockDBExecutor {
// TODO(shahak): Consider fixing code duplication with DBExecutor.
fn register_query(
&mut self,
-query: InternalQuery,
+query: Query,
_data_type: impl FetchBlockDataFromDb + Send,
mut sender: Sender<Data>,
) -> QueryId {
@@ -265,12 +264,12 @@ async fn register_subscriber_and_use_channels() {
let mut network_manager =
GenericNetworkManager::generic_new(mock_swarm, MockDBExecutor::default(), BUFFER_SIZE);
// define query
-let query_limit = 5;
+let query_limit: usize = 5;
let start_block_number = 0;
let query = Query {
-start_block: BlockNumber(start_block_number),
+start_block: BlockHashOrNumber::Number(BlockNumber(start_block_number)),
direction: Direction::Forward,
-limit: query_limit,
+limit: query_limit.try_into().unwrap(),
step: 1,
};

@@ -308,7 +307,7 @@ async fn register_subscriber_and_use_channels() {
async fn process_incoming_query() {
// Create data for test.
const BLOCK_NUM: u64 = 0;
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Number(BlockNumber(BLOCK_NUM)),
direction: Direction::Forward,
limit: 5,
@@ -322,7 +321,7 @@ async fn process_incoming_query() {

// Setup mock DB executor and tell it to reply to the query with the given headers.
let mut mock_db_executor = MockDBExecutor::default();
-mock_db_executor.query_to_headers.insert(query, headers.clone());
+mock_db_executor.query_to_headers.insert(query.clone(), headers.clone());

// Setup mock swarm and tell it to return an event of new inbound query.
let mut mock_swarm = MockSwarm::default();
@@ -380,7 +379,7 @@ async fn close_inbound_session() {
// define query
let query_limit = 5;
let start_block_number = 0;
-let query = InternalQuery {
+let query = Query {
start_block: BlockHashOrNumber::Number(BlockNumber(start_block_number)),
direction: Direction::Forward,
limit: query_limit,
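A recurring pattern across the test changes is the new `.clone()` calls: the removed `InternalQuery` derived `Copy`, while the unified `Query` is only `Clone` (plus `Hash` under `#[cfg_attr(test, derive(Hash))]`), so call sites that register the same query several times or use it as a `HashMap` key now clone explicitly. A small standalone sketch of that pattern, with a deliberately simplified `Query`:

```rust
use std::collections::HashMap;

// Minimal stand-in: the crate's Query derives Clone (and Hash in tests) but not
// Copy, unlike the removed InternalQuery, hence the explicit .clone() calls.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Query {
    start_block: u64, // simplified; the real field is a BlockHashOrNumber
    limit: u64,
    step: u64,
}

// Stands in for DBExecutorTrait::register_query, which takes the query by value.
fn register_query(query: Query) -> u64 {
    query.limit
}

fn main() {
    let query = Query { start_block: 0, limit: 10, step: 1 };

    // Registering the same query for several data types: each call takes
    // ownership, so every iteration clones.
    let ids: Vec<u64> = (0..3).map(|_| register_query(query.clone())).collect();
    assert_eq!(ids, vec![10, 10, 10]);

    // Using the query as a map key while keeping the original around.
    let mut query_to_headers: HashMap<Query, Vec<u64>> = HashMap::new();
    query_to_headers.insert(query.clone(), vec![1, 2, 3]);
    assert!(query_to_headers.contains_key(&query));
}
```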