Skip to content

Commit

Permalink
feat(network): add bootstrap node to config
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama committed Apr 18, 2024
1 parent f09ee9c commit a766658
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 67 deletions.
28 changes: 14 additions & 14 deletions config/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,36 +104,36 @@
"privacy": "TemporaryValue",
"value": true
},
"network.header_buffer_size": {
"description": "Size of the buffer for headers read from the storage.",
"privacy": "Public",
"value": 100000
},
"network.idle_connection_timeout": {
"description": "Amount of time in seconds that a connection with no active sessions will stay alive.",
"privacy": "Public",
"value": 10
},
"network.peer.#is_none": {
"network.bootstrap_peer.#is_none": {
"description": "Flag for an optional field",
"privacy": "TemporaryValue",
"value": true
},
"network.peer.ip": {
"network.bootstrap_peer.ip": {
"description": "The ipv4 address of another peer that the node will dial to.",
"privacy": "Public",
"value": "127.0.0.1"
},
"network.peer.peer_id": {
"network.bootstrap_peer.peer_id": {
"description": "Peer ID to send requests to. If not set, the node will not send requests. for info: https://docs.libp2p.io/concepts/fundamentals/peers/",
"privacy": "Public",
"value": "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N"
},
"network.peer.tcp_port": {
"network.bootstrap_peer.tcp_port": {
"description": "The port on the other peer that the node will dial to to use for TCP transport.",
"privacy": "Public",
"value": 10002
},
"network.header_buffer_size": {
"description": "Size of the buffer for headers read from the storage.",
"privacy": "Public",
"value": 100000
},
"network.idle_connection_timeout": {
"description": "Amount of time in seconds that a connection with no active sessions will stay alive.",
"privacy": "Public",
"value": 10
},
"network.quic_port": {
"description": "The port that the node listens on for incoming quic connections.",
"privacy": "Public",
Expand Down
7 changes: 4 additions & 3 deletions crates/papyrus_network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ pub struct NetworkConfig {
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub idle_connection_timeout: Duration,
pub header_buffer_size: usize,
pub peer: Option<PeerAddressConfig>,
/// If None, the node won't discover other peers but will still be discoverable by other peers.
pub bootstrap_peer: Option<PeerAddressConfig>,
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
Expand Down Expand Up @@ -245,7 +246,7 @@ impl SerializeConfig for NetworkConfig {
ParamPrivacyInput::Public,
),
]);
config.extend(ser_optional_sub_config(&self.peer, "peer"));
config.extend(ser_optional_sub_config(&self.bootstrap_peer, "bootstrap_peer"));
config
}
}
Expand All @@ -258,7 +259,7 @@ impl Default for NetworkConfig {
session_timeout: Duration::from_secs(10),
idle_connection_timeout: Duration::from_secs(10),
header_buffer_size: 100000,
peer: None,
bootstrap_peer: None,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/papyrus_network/src/main_behaviour/mixed_behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use libp2p::kad::store::MemoryStore;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{identify, kad};

Expand All @@ -10,7 +11,7 @@ use crate::{discovery, peer_manager, streamed_bytes};
#[behaviour(out_event = "Event")]
pub struct MixedBehaviour {
pub peer_manager: peer_manager::PeerManager<peer_manager::peer::Peer>,
pub discovery: discovery::Behaviour,
pub discovery: Toggle<discovery::Behaviour>,
pub identify: identify::Behaviour,
// TODO(shahak): Consider using a different store.
pub kademlia: kad::Behaviour<MemoryStore>,
Expand Down
4 changes: 3 additions & 1 deletion crates/papyrus_network/src/main_behaviour/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ impl NetworkBehaviour for MainBehaviour {
self.mixed_behaviour.kademlia.on_other_behaviour_event(internal_event)
}
mixed_behaviour::InternalEvent::NotifyDiscovery(_) => {
self.mixed_behaviour.discovery.on_other_behaviour_event(internal_event)
if let Some(discovery) = self.mixed_behaviour.discovery.as_mut() {
discovery.on_other_behaviour_event(internal_event);
}
}
mixed_behaviour::InternalEvent::NotifyStreamedBytes(_) => {
self.mixed_behaviour.streamed_bytes.on_other_behaviour_event(internal_event)
Expand Down
46 changes: 21 additions & 25 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::stream::{self, BoxStream, SelectAll};
use futures::{FutureExt, StreamExt};
use libp2p::kad::store::MemoryStore;
use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::{identify, kad, Multiaddr, PeerId, Swarm};
use libp2p::{identify, kad, multiaddr, Multiaddr, PeerId, Swarm};
use metrics::gauge;
use papyrus_common::metrics as papyrus_metrics;
use papyrus_storage::StorageReader;
Expand All @@ -32,16 +32,7 @@ use crate::streamed_bytes::{
OutboundSessionId,
SessionId,
};
use crate::{
discovery,
peer_manager,
DataType,
NetworkConfig,
PeerAddressConfig,
Protocol,
Query,
ResponseReceivers,
};
use crate::{discovery, peer_manager, DataType, NetworkConfig, Protocol, Query, ResponseReceivers};

type StreamCollection = SelectAll<BoxStream<'static, (Data, InboundSessionId)>>;
type SubscriberChannels = (Receiver<Query>, Router);
Expand All @@ -59,7 +50,6 @@ pub struct GenericNetworkManager<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> {
query_results_router: StreamCollection,
sync_subscriber_channels: Option<SubscriberChannels>,
query_id_to_inbound_session_id: HashMap<QueryId, InboundSessionId>,
peer: Option<PeerAddressConfig>,
outbound_session_id_to_protocol: HashMap<OutboundSessionId, Protocol>,
peer_id: Option<PeerId>,
// Fields for metrics
Expand All @@ -69,12 +59,6 @@ pub struct GenericNetworkManager<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> {

impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecutorT, SwarmT> {
pub async fn run(mut self) -> Result<(), NetworkError> {
if let Some(peer) = self.peer.clone() {
debug!("Starting network manager connected to peer: {peer:?}");
self.swarm.dial(peer)?;
} else {
debug!("Starting network manager not connected to any peer.");
}
loop {
tokio::select! {
Some(event) = self.swarm.next() => self.handle_swarm_event(event),
Expand All @@ -91,7 +75,6 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
swarm: SwarmT,
db_executor: DBExecutorT,
header_buffer_size: usize,
peer: Option<PeerAddressConfig>,
) -> Self {
gauge!(papyrus_metrics::PAPYRUS_NUM_CONNECTED_PEERS, 0f64);
Self {
Expand All @@ -101,7 +84,6 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
query_results_router: StreamCollection::new(),
sync_subscriber_channels: None,
query_id_to_inbound_session_id: HashMap::new(),
peer,
outbound_session_id_to_protocol: HashMap::new(),
peer_id: None,
num_active_inbound_sessions: 0,
Expand Down Expand Up @@ -222,7 +204,9 @@ impl<DBExecutorT: DBExecutor, SwarmT: SwarmTrait> GenericNetworkManager<DBExecut
self.swarm.behaviour_mut().kademlia.on_other_behaviour_event(event)
}
mixed_behaviour::InternalEvent::NotifyDiscovery(_) => {
self.swarm.behaviour_mut().discovery.on_other_behaviour_event(event)
if let Some(discovery) = self.swarm.behaviour_mut().discovery.as_mut() {
discovery.on_other_behaviour_event(event);
}
}
mixed_behaviour::InternalEvent::NotifyStreamedBytes(_) => {
self.swarm.behaviour_mut().streamed_bytes.on_other_behaviour_event(event)
Expand Down Expand Up @@ -426,7 +410,7 @@ impl NetworkManager {
session_timeout,
idle_connection_timeout,
header_buffer_size,
peer,
bootstrap_peer,
} = config;

let listen_addresses = vec![
Expand All @@ -441,8 +425,20 @@ impl NetworkManager {
let local_peer_id = PeerId::from_public_key(&key);
mixed_behaviour::MixedBehaviour {
peer_manager: peer_manager::PeerManager::new(PeerManagerConfig::default()),
// TODO: add real bootstrap peer
discovery: discovery::Behaviour::new(PeerId::random(), Multiaddr::empty()),
discovery: bootstrap_peer
.as_ref()
.map(|bootstrap_peer| {
discovery::Behaviour::new(
bootstrap_peer.peer_id,
format!("/ip4/{}", bootstrap_peer.ip)
.parse::<Multiaddr>()
.unwrap_or_else(|_| {
panic!("Wrong ip4 address format {}", bootstrap_peer.ip)
})
.with(multiaddr::Protocol::Tcp(bootstrap_peer.tcp_port)),
)
})
.into(),
identify: identify::Behaviour::new(identify::Config::new(
"/staknet/identify/0.1.0-rc.0".to_string(),
key,
Expand All @@ -460,7 +456,7 @@ impl NetworkManager {
let swarm = build_swarm(listen_addresses, idle_connection_timeout, behaviour);

let db_executor = BlockHeaderDBExecutor::new(storage_reader);
Self::generic_new(swarm, db_executor, header_buffer_size, peer)
Self::generic_new(swarm, db_executor, header_buffer_size)
}

pub fn get_own_peer_id(&self) -> String {
Expand Down
7 changes: 2 additions & 5 deletions crates/papyrus_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ async fn register_subscriber_and_use_channels() {
mock_swarm,
MockDBExecutor::default(),
HEADER_BUFFER_SIZE,
Some(PeerAddressConfig { peer_id, ..Default::default() }),
);
// define query
let query_limit = 5;
Expand Down Expand Up @@ -328,7 +327,7 @@ async fn process_incoming_query() {
let get_data_fut = mock_swarm.get_data_sent_to_inbound_session(inbound_session_id);

let network_manager =
GenericNetworkManager::generic_new(mock_swarm, mock_db_executor, HEADER_BUFFER_SIZE, None);
GenericNetworkManager::generic_new(mock_swarm, mock_db_executor, HEADER_BUFFER_SIZE);

select! {
inbound_session_data = get_data_fut => {
Expand Down Expand Up @@ -357,7 +356,6 @@ async fn sync_subscriber_query_before_established_connection() {
MockSwarm::default(),
MockDBExecutor::default(),
HEADER_BUFFER_SIZE,
Some(PeerAddressConfig { peer_id: PeerId::random(), ..Default::default() }),
);
// define query
let query_limit = 5;
Expand Down Expand Up @@ -435,7 +433,7 @@ async fn close_inbound_session() {

// Create network manager and run it
let network_manager =
GenericNetworkManager::generic_new(mock_swarm, mock_db_executor, HEADER_BUFFER_SIZE, None);
GenericNetworkManager::generic_new(mock_swarm, mock_db_executor, HEADER_BUFFER_SIZE);
tokio::select! {
_ = network_manager.run() => panic!("network manager ended"),
_ = inbound_session_closed_receiver => {}
Expand All @@ -452,7 +450,6 @@ async fn return_fin_to_subscriber_unit_test() {
MockSwarm::default(),
MockDBExecutor::default(),
HEADER_BUFFER_SIZE,
None,
);
// register subscriber
let (_, response_receivers) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,42 +114,42 @@ expression: dumped_default_config
"value": true,
"privacy": "TemporaryValue"
},
"network.header_buffer_size": {
"description": "Size of the buffer for headers read from the storage.",
"value": {
"$serde_json::private::Number": "100000"
},
"privacy": "Public"
},
"network.idle_connection_timeout": {
"description": "Amount of time in seconds that a connection with no active sessions will stay alive.",
"value": {
"$serde_json::private::Number": "10"
},
"privacy": "Public"
},
"network.peer.#is_none": {
"network.bootstrap_peer.#is_none": {
"description": "Flag for an optional field",
"value": true,
"privacy": "TemporaryValue"
},
"network.peer.ip": {
"network.bootstrap_peer.ip": {
"description": "The ipv4 address of another peer that the node will dial to.",
"value": "127.0.0.1",
"privacy": "Public"
},
"network.peer.peer_id": {
"network.bootstrap_peer.peer_id": {
"description": "Peer ID to send requests to. If not set, the node will not send requests. for info: https://docs.libp2p.io/concepts/fundamentals/peers/",
"value": "QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N",
"privacy": "Public"
},
"network.peer.tcp_port": {
"network.bootstrap_peer.tcp_port": {
"description": "The port on the other peer that the node will dial to to use for TCP transport.",
"value": {
"$serde_json::private::Number": "10002"
},
"privacy": "Public"
},
"network.header_buffer_size": {
"description": "Size of the buffer for headers read from the storage.",
"value": {
"$serde_json::private::Number": "100000"
},
"privacy": "Public"
},
"network.idle_connection_timeout": {
"description": "Amount of time in seconds that a connection with no active sessions will stay alive.",
"value": {
"$serde_json::private::Number": "10"
},
"privacy": "Public"
},
"network.quic_port": {
"description": "The port that the node listens on for incoming quic connections.",
"value": {
Expand Down

0 comments on commit a766658

Please sign in to comment.