Skip to content

Commit

Permalink
p2p: check for the endpoints before listen/connect to them
Browse files Browse the repository at this point in the history
  • Loading branch information
hozan23 committed Jul 16, 2024
1 parent 6795c2a commit e7b4296
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 11 deletions.
3 changes: 1 addition & 2 deletions p2p/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ impl Connection {
pub async fn recv<P: Protocol>(&self) -> Result<ProtocolEvent> {
match self.listeners.get(&P::id()) {
Some(l) => l.recv().await.map_err(Error::from),
// TODO
None => todo!(),
None => Err(Error::UnsupportedProtocol(P::id())),
}
}

Expand Down
8 changes: 8 additions & 0 deletions p2p/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ impl Connector {

async fn dial(&self, endpoint: &Endpoint, peer_id: &Option<PeerID>) -> Result<Conn<NetMsg>> {
if self.enable_tls {
if !endpoint.is_tcp() && !endpoint.is_tls() {
return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
}

let tls_config = tls::ClientTlsConfig {
tcp_config: Default::default(),
client_config: tls_client_config(&self.key_pair, peer_id.clone())?,
Expand All @@ -157,6 +161,10 @@ impl Connector {
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
} else {
if !endpoint.is_tcp() {
return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
}

tcp::dial(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Conn<NetMsg>)
Expand Down
27 changes: 19 additions & 8 deletions p2p/src/discovery/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct LookupService {
/// Resolved listen endpoint
listen_endpoint: RwLock<Option<Endpoint>>,

/// Resolved discovery endpoint
discovery_endpoint: RwLock<Option<Endpoint>>,

/// Holds the configuration for the P2P network.
config: Arc<Config>,

Expand All @@ -52,7 +55,6 @@ impl LookupService {
/// Creates a new lookup service
pub fn new(
key_pair: &KeyPair,
id: &PeerID,
table: Arc<RoutingTable>,
config: Arc<Config>,
monitor: Arc<Monitor>,
Expand All @@ -78,13 +80,18 @@ impl LookupService {
ex,
);

let id = key_pair
.public()
.try_into()
.expect("Get PeerID from KeyPair");
Self {
id: id.clone(),
id,
table,
listener,
connector,
outbound_slots,
listen_endpoint: RwLock::new(None),
discovery_endpoint: RwLock::new(None),
config,
monitor,
}
Expand All @@ -98,19 +105,23 @@ impl LookupService {

/// Set the resolved listen endpoint.
pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
let resolved_endpoint = Endpoint::Tcp(
let discovery_endpoint = Endpoint::Tcp(
resolved_endpoint.addr()?.clone(),
self.config.discovery_port,
);
*self.listen_endpoint.write() = Some(resolved_endpoint);
*self.listen_endpoint.write() = Some(resolved_endpoint.clone());
*self.discovery_endpoint.write() = Some(discovery_endpoint.clone());
Ok(())
}

/// Get the listening endpoint.
pub fn listen_endpoint(&self) -> Option<Endpoint> {
self.listen_endpoint.read().clone()
}

pub fn discovery_endpoint(&self) -> Option<Endpoint> {
self.discovery_endpoint.read().clone()
}

/// Shuts down the lookup service.
pub async fn shutdown(&self) {
self.connector.shutdown().await;
Expand Down Expand Up @@ -278,7 +289,7 @@ impl LookupService {

trace!("Send Peer msg");
if let Some(endpoint) = self.listen_endpoint() {
self.send_peer_msg(&conn, endpoint.clone()).await?;
self.send_peer_msg(&conn, endpoint).await?;
}

trace!("Send Shutdown msg");
Expand All @@ -289,8 +300,8 @@ impl LookupService {

/// Start a listener.
async fn start_listener(self: &Arc<Self>) -> Result<()> {
let endpoint: Endpoint = match self.listen_endpoint() {
Some(e) => e.clone(),
let endpoint: Endpoint = match self.discovery_endpoint() {
Some(e) => e,
None => return Ok(()),
};

Expand Down
1 change: 0 additions & 1 deletion p2p/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl Discovery {

let lookup_service = Arc::new(LookupService::new(
key_pair,
peer_id,
table.clone(),
config.clone(),
monitor.clone(),
Expand Down
3 changes: 3 additions & 0 deletions p2p/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ pub enum Error {
#[error("Unsupported protocol error: {0}")]
UnsupportedProtocol(String),

#[error("Unsupported Endpoint: {0}")]
UnsupportedEndpoint(String),

#[error("PeerID try from PublicKey Error")]
PeerIDTryFromPublicKey,

Expand Down
8 changes: 8 additions & 0 deletions p2p/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ impl Listener {

async fn listen(&self, endpoint: &Endpoint) -> Result<karyon_net::Listener<NetMsg>> {
if self.enable_tls {
if !endpoint.is_tcp() && !endpoint.is_tls() {
return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
}

let tls_config = tls::ServerTlsConfig {
tcp_config: Default::default(),
server_config: tls_server_config(&self.key_pair)?,
Expand All @@ -165,6 +169,10 @@ impl Listener {
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
} else {
if !endpoint.is_tcp() {
return Err(Error::UnsupportedEndpoint(endpoint.to_string()));
}

tcp::listen(endpoint, tcp::TcpConfig::default(), NetMsgCodec::new())
.await
.map(|l| Box::new(l) as karyon_net::Listener<NetMsg>)
Expand Down

0 comments on commit e7b4296

Please sign in to comment.