Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic peer manager based on libp2p peer_store and connection_limits #126

Merged
merged 24 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,028 changes: 585 additions & 443 deletions Cargo.lock

Large diffs are not rendered by default.

40 changes: 22 additions & 18 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,26 @@ ssv_types = { path = "anchor/common/ssv_types" }
subnet_tracker = { path = "anchor/subnet_tracker" }
version = { path = "anchor/common/version" }

beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
bls = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
eth2 = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
eth2_config = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
eth2_network_config = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
health_metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
lighthouse_network = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
safe_arith = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
sensitive_url = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
slashing_protection = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
slot_clock = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
task_executor = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0", default-features = false, features = [
beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
bls = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
eth2 = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
eth2_config = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
eth2_network_config = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
health_metrics = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
lighthouse_network = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
metrics = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
safe_arith = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
sensitive_url = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
slashing_protection = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
slot_clock = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
task_executor = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2", default-features = false, features = [
"tracing",
] }
types = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
unused_port = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
validator_metrics = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
validator_services = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
validator_store = { git = "https://github.com/sigp/lighthouse", rev = "1a77f7a0" }
types = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
unused_port = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
validator_metrics = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
validator_services = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }
validator_store = { git = "https://github.com/sigp/lighthouse", rev = "b71b5f2" }

alloy = { version = "0.11.0", features = [
"sol-types",
Expand Down Expand Up @@ -121,6 +121,10 @@ zeroize = "1.8.1"
[patch.crates-io]
# todo: remove when https://github.com/supranational/blst/pull/248 is merged
blst = { git = "https://github.com/dknopik/blst", branch = "sk-conversion" }
# todo: remove when libp2p versions are aligned again. this is only needed because cargo audit crashes otherwise
libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }
libp2p-mplex = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }
quick-protobuf-codec = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }

[profile.maxperf]
inherits = "release"
Expand Down
7 changes: 4 additions & 3 deletions anchor/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ ethereum_ssz = { workspace = true }
ethereum_ssz_derive = { workspace = true }
futures = { workspace = true }
hex = "0.4.3"
libp2p = { version = "0.54", default-features = false, features = [
libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", default-features = false, features = [
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! I didn't know one could do this. I suggest we use our own fork or yours, Daniel, as Dr Huang may rebase and I fear that commit may disappear, wdyt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that can happen. The commit can disappear from the branch, but it stays in the repo if it was pushed. For example, this commit is still accessible via the link: sigp/lighthouse@7f33d35, even though it is no longer in the current commit chain of the PR branch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thanks for confirming Daniel :)

"identify",
"yamux",
"noise",
Expand All @@ -23,6 +23,7 @@ libp2p = { version = "0.54", default-features = false, features = [
"gossipsub",
"quic",
"ping",
"peer-store",
"request-response",
] }
lighthouse_network = { workspace = true }
Expand All @@ -40,7 +41,7 @@ version = { workspace = true }

[dev-dependencies]
async-channel = { workspace = true }
libp2p-swarm = { version = "0.45.1", features = ["macros"] }
libp2p-swarm-test = { version = "0.4.0" }
libp2p-swarm = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16", features = ["macros"] }
libp2p-swarm-test = { git = "https://github.com/libp2p/rust-libp2p.git", rev = "082eb16" }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tracing-subscriber = { workspace = true }
3 changes: 3 additions & 0 deletions anchor/network/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::discovery::Discovery;
use crate::handshake;
use crate::peer_manager::PeerManager;
use libp2p::swarm::NetworkBehaviour;
use libp2p::{gossipsub, identify, ping};

Expand All @@ -13,6 +14,8 @@ pub struct AnchorBehaviour {
pub gossipsub: gossipsub::Behaviour,
/// Discv5 Discovery protocol.
pub discovery: Discovery,
/// Anchor peer manager, wrapping libp2p behaviours with minimal added logic for peer selection.
pub peer_manager: PeerManager,

pub handshake: handshake::Behaviour,
}
11 changes: 5 additions & 6 deletions anchor/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use futures::StreamExt;
use libp2p::bytes::Bytes;
use libp2p::core::transport::PortUse;
use libp2p::core::Endpoint;
use libp2p::swarm::dummy::ConnectionHandler;
use libp2p::swarm::{
ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
dummy, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use lighthouse_network::discovery::enr_ext::{QUIC6_ENR_KEY, QUIC_ENR_KEY};
Expand Down Expand Up @@ -432,7 +431,7 @@ impl Discovery {

impl NetworkBehaviour for Discovery {
// Discovery is not a real NetworkBehaviour...
type ConnectionHandler = ConnectionHandler;
type ConnectionHandler = dummy::ConnectionHandler;
type ToSwarm = DiscoveredPeers;

fn handle_established_inbound_connection(
Expand All @@ -442,7 +441,7 @@ impl NetworkBehaviour for Discovery {
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
Ok(dummy::ConnectionHandler)
}

fn handle_established_outbound_connection(
Expand All @@ -453,7 +452,7 @@ impl NetworkBehaviour for Discovery {
_role_override: Endpoint,
_port_use: PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
Ok(dummy::ConnectionHandler)
}

fn on_swarm_event(&mut self, event: FromSwarm) {
Expand Down Expand Up @@ -592,7 +591,7 @@ pub fn subnet_predicate(subnets: Vec<SubnetId>) -> impl Fn(&Enr) -> bool + Send
.any(|&s| committee_bitfield.get(*s as usize).unwrap_or(false));

if !predicate {
debug!(
trace!(
peer_id = %enr.peer_id(),
"Peer found but not on any of the desired subnets",
);
Expand Down
29 changes: 8 additions & 21 deletions anchor/network/src/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ pub fn handle_event(
match event {
Event::Message {
peer,
message:
Message::Request {
request_id: _,
request,
channel,
},
message: Message::Request {
request, channel, ..
},
..
} => Some(handle_request(
our_node_info,
behaviour,
Expand All @@ -89,25 +87,14 @@ pub fn handle_event(
)),
Event::Message {
peer,
message:
Message::Response {
request_id: _,
response,
},
message: Message::Response { response, .. },
..
} => Some(handle_response(our_node_info, peer, response)),
Event::OutboundFailure {
peer,
request_id: _,
error,
} => Some(Err(Failed {
Event::OutboundFailure { peer, error, .. } => Some(Err(Failed {
peer_id: peer,
error: Box::new(Error::Outbound(error)),
})),
Event::InboundFailure {
peer,
request_id: _,
error,
} => Some(Err(Failed {
Event::InboundFailure { peer, error, .. } => Some(Err(Failed {
peer_id: peer,
error: Box::new(Error::Inbound(error)),
})),
Expand Down
2 changes: 2 additions & 0 deletions anchor/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod discovery;
mod handshake;
mod keypair_utils;
mod network;
mod peer_manager;
mod transport;
pub use config::Config;
pub use lighthouse_network::{ListenAddr, ListenAddress};
Expand All @@ -14,3 +15,4 @@ pub use network::Network;
pub type Enr = discv5::enr::Enr<discv5::enr::CombinedKey>;

pub const SUBNET_COUNT: usize = 128;
type SubnetBits = [u8; SUBNET_COUNT / 8];
104 changes: 41 additions & 63 deletions anchor/network/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::num::{NonZeroU8, NonZeroUsize};
use std::pin::Pin;
use std::time::Duration;
use std::time::{Duration, Instant};

use futures::StreamExt;
use libp2p::core::muxing::StreamMuxerBox;
Expand All @@ -15,21 +16,21 @@ use libp2p::{
};
use lighthouse_network::discovery::DiscoveredPeers;
use lighthouse_network::discv5::enr::k256::sha2::{Digest, Sha256};
use lighthouse_network::EnrExt;
use ssv_types::message::SignedSSVMessage;
use ssz::Decode;
use subnet_tracker::{SubnetEvent, SubnetId};
use task_executor::TaskExecutor;
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info};

use crate::behaviour::AnchorBehaviour;
use crate::behaviour::AnchorBehaviourEvent;
use crate::discovery::{Discovery, DiscoveryError, FIND_NODE_QUERY_CLOSEST_PEERS};
use crate::handshake::node_info::{NodeInfo, NodeMetadata};
use crate::keypair_utils::load_private_key;
use crate::peer_manager::{PeerManager, SubnetConnectActions};
use crate::transport::build_transport;
use crate::{handshake, Config};
use crate::{handshake, Config, Enr};

use crate::network::NetworkError::{Gossipsub, SwarmConfig};
use thiserror::Error;
Expand Down Expand Up @@ -135,17 +136,6 @@ impl Network {

/// Main loop for polling and handling swarm and channels.
pub async fn run(mut self) {
let topic = IdentTopic::new("ssv.v2.9");

match self.swarm.behaviour_mut().gossipsub.subscribe(&topic) {
Err(e) => {
warn!(topic = %topic, "error" = ?e, "Failed to subscribe to topic");
}
Ok(_) => {
debug!(topic = %topic, "Subscribed to topic");
}
}

loop {
tokio::select! {
swarm_message = self.swarm.select_next_some() => {
Expand Down Expand Up @@ -179,21 +169,8 @@ impl Network {
}
// TODO handle gossipsub events
},
// Inform the peer manager about discovered peers.
//
// The peer manager will subsequently decide which peers need to be dialed and then dial
// them.
AnchorBehaviourEvent::Discovery(DiscoveredPeers { peers }) => {
//self.peer_manager_mut().peers_discovered(peers);
debug!(peers = ?peers, "Peers discovered");
for (enr, _) in peers {
for tcp in enr.multiaddr_tcp() {
trace!(address = ?tcp, "Dialing peer");
if let Err(e) = self.swarm.dial(tcp.clone()) {
error!(address = ?tcp, error = ?e, "Error dialing peer");
}
}
}
self.on_discovered_peers(peers);
}
AnchorBehaviourEvent::Handshake(event) => {
if let Some(result) = handshake::handle_event(
Expand Down Expand Up @@ -240,6 +217,19 @@ impl Network {
}
}

fn on_discovered_peers(&mut self, peers: HashMap<Enr, Option<Instant>>) {
debug!(peers = ?peers, "Peers discovered");
let manager = self.peer_manager();
// need to collect to avoid double borrow
let to_dial = peers
.into_iter()
.filter_map(|(enr, _)| manager.discovered_peer(enr))
.collect::<Vec<_>>();
for dial in to_dial {
let _ = self.swarm.dial(dial);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event only allows us to dial a single peer.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it creates an event for each peer. But honestly, I don't know the reason. Maybe decoupling?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jxs: Is there a relevant difference between these approaches to dial?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also encapsulate this code inside the Peer Manager. Similar to https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/service/mod.rs#L1845

Copy link
Member

@jxs jxs Feb 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, missed this!

@jxs: Is there a relevant difference between these approaches to dial?

no, one calls the other underneath :)

}
}

fn on_subnet_tracker_event(&mut self, event: SubnetEvent) {
match event {
SubnetEvent::Join(subnet) => {
Expand All @@ -251,24 +241,31 @@ impl Network {
{
error!(?err, subnet = *subnet, "can't subscribe");
}
self.swarm
.behaviour_mut()
.discovery
.start_subnet_query(vec![subnet]);
let SubnetConnectActions { dial, discover } =
self.peer_manager().join_subnet(subnet);
for peer in dial {
let _ = self.swarm.dial(peer);
}
if discover {
self.swarm
.behaviour_mut()
.discovery
.start_subnet_query(vec![subnet]);
}
}
SubnetEvent::Leave(subnet) => {
if let Err(err) = self
.swarm
self.swarm
.behaviour_mut()
.gossipsub
.unsubscribe(&subnet_to_topic(subnet))
{
error!(?err, subnet = *subnet, "can't unsubscribe");
}
.unsubscribe(&subnet_to_topic(subnet));
}
}
}

/// Returns a mutable reference to the `peer_manager` field of the swarm's behaviour.
///
/// Convenience accessor so callers do not have to spell out
/// `self.swarm.behaviour_mut().peer_manager` each time. The returned borrow
/// holds `self` mutably for as long as it is alive.
fn peer_manager(&mut self) -> &mut PeerManager {
&mut self.swarm.behaviour_mut().peer_manager
}

fn handle_handshake_result(&mut self, result: Result<handshake::Completed, handshake::Failed>) {
match result {
Ok(handshake::Completed {
Expand All @@ -286,7 +283,7 @@ impl Network {
}

fn subnet_to_topic(subnet: SubnetId) -> IdentTopic {
IdentTopic::new(format!("ssv.{}", *subnet))
IdentTopic::new(format!("ssv.v2.{}", *subnet))
}

async fn build_anchor_behaviour(
Expand Down Expand Up @@ -339,13 +336,16 @@ async fn build_anchor_behaviour(
discovery
};

let handshake = handshake::create_behaviour(local_keypair.clone());
let peer_manager = PeerManager::new(network_config);

let handshake = handshake::create_behaviour(local_keypair);

Ok(AnchorBehaviour {
identify,
ping: ping::Behaviour::default(),
gossipsub,
discovery,
peer_manager,
handshake,
})
}
Expand All @@ -370,28 +370,6 @@ fn build_swarm(
let dial_concurrency_factor = NonZeroU8::new(1)
.ok_or_else(|| SwarmConfig("dial_concurrency_factor cannot be 0".to_string()))?;

// TODO: revisit once peer manager is integrated
// let connection_limits = {
// let limits = libp2p::connection_limits::ConnectionLimits::default()
// .with_max_pending_incoming(Some(5))
// .with_max_pending_outgoing(Some(16))
// .with_max_established_incoming(Some(
// (config.target_peers as f32
// * (1.0 + PEER_EXCESS_FACTOR - MIN_OUTBOUND_ONLY_FACTOR))
// .ceil() as u32,
// ))
// .with_max_established_outgoing(Some(
// (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR)).ceil() as u32,
// ))
// .with_max_established(Some(
// (config.target_peers as f32 * (1.0 + PEER_EXCESS_FACTOR + PRIORITY_PEER_EXCESS))
// .ceil() as u32,
// ))
// .with_max_established_per_peer(Some(1));
//
// libp2p::connection_limits::Behaviour::new(limits)
// };

let swarm_config = libp2p::swarm::Config::with_executor(Executor(executor))
.with_notify_handler_buffer_size(notify_handler_buffer_size)
.with_per_connection_event_buffer_size(4)
Expand Down
Loading