Skip to content

Commit

Permalink
fix(relay): relayed external address not expiring when stopping to li…
Browse files Browse the repository at this point in the history
…sten through relay
  • Loading branch information
stormshield-frb committed Aug 29, 2024
1 parent cefd22b commit eeefabb
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 120 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ libp2p-ping = { version = "0.45.0", path = "protocols/ping" }
libp2p-plaintext = { version = "0.42.0", path = "transports/plaintext" }
libp2p-pnet = { version = "0.25.0", path = "transports/pnet" }
libp2p-quic = { version = "0.11.1", path = "transports/quic" }
libp2p-relay = { version = "0.18.0", path = "protocols/relay" }
libp2p-relay = { version = "0.18.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.15.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.27.0", path = "protocols/request-response" }
libp2p-server = { version = "0.12.7", path = "misc/server" }
Expand Down
6 changes: 3 additions & 3 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ async fn connect() {

let dst_relayed_addr = relay_tcp_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(dst_peer_id));
.with(Protocol::P2pCircuit);
dst.listen_on(dst_relayed_addr.clone()).unwrap();

wait_for_reservation(
Expand All @@ -70,7 +69,8 @@ async fn connect() {
.await;
async_std::task::spawn(dst.loop_on_next());

src.dial_and_wait(dst_relayed_addr.clone()).await;
src.dial_and_wait(dst_relayed_addr.with(Protocol::P2p(dst_peer_id)))
.await;

let dst_addr = dst_tcp_addr.with(Protocol::P2p(dst_peer_id));

Expand Down
6 changes: 6 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.18.1

- Fix relayed `ExternalAddr` not expiring when stopping to listen through relay.
Removing `/p2p/<local_peer_id>` part from the listen and external addresses reported by the relay `Behaviour`.
See [PR 5577](https://github.com/libp2p/rust-libp2p/pull/5577).

## 0.18.0

<!-- Update to libp2p-swarm v0.45.0 -->
Expand Down
2 changes: 1 addition & 1 deletion protocols/relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libp2p-relay"
edition = "2021"
rust-version = { workspace = true }
description = "Communications relaying for libp2p"
version = "0.18.0"
version = "0.18.1"
authors = ["Parity Technologies <[email protected]>", "Max Inden <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
Expand Down
84 changes: 21 additions & 63 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ use futures::future::{BoxFuture, FutureExt};
use futures::io::{AsyncRead, AsyncWrite};
use futures::ready;
use futures::stream::StreamExt;
use libp2p_core::multiaddr::Protocol;
use libp2p_core::transport::PortUse;
use libp2p_core::{Endpoint, Multiaddr};
use libp2p_identity::PeerId;
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm};
use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, ExpiredListenAddr,
NetworkBehaviour, NewListenAddr, NotifyHandler, Stream, THandler, THandlerInEvent,
THandlerOutEvent, ToSwarm,
};
use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice};
Expand Down Expand Up @@ -71,12 +71,6 @@ pub enum Event {
},
}

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ReservationStatus {
Pending,
Confirmed,
}

/// [`NetworkBehaviour`] implementation of the relay client
/// functionality of the circuit relay v2 protocol.
pub struct Behaviour {
Expand All @@ -87,11 +81,6 @@ pub struct Behaviour {
/// connection.
directly_connected_peers: HashMap<PeerId, Vec<ConnectionId>>,

/// Stores the address of a pending or confirmed reservation.
///
/// This is indexed by the [`ConnectionId`] to a relay server and the address is the `/p2p-circuit` address we reserved on it.
reservation_addresses: HashMap<ConnectionId, (Multiaddr, ReservationStatus)>,

/// Queue of actions to return when polled.
queued_actions: VecDeque<ToSwarm<Event, Either<handler::In, Void>>>,

Expand All @@ -105,7 +94,6 @@ pub fn new(local_peer_id: PeerId) -> (Transport, Behaviour) {
local_peer_id,
from_transport,
directly_connected_peers: Default::default(),
reservation_addresses: Default::default(),
queued_actions: Default::default(),
pending_handler_commands: Default::default(),
};
Expand Down Expand Up @@ -140,12 +128,6 @@ impl Behaviour {
unreachable!("`on_connection_closed` for unconnected peer.")
}
};
if let Some((addr, ReservationStatus::Confirmed)) =
self.reservation_addresses.remove(&connection_id)
{
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr));
}
}
}
}
Expand Down Expand Up @@ -220,8 +202,19 @@ impl NetworkBehaviour for Behaviour {
FromSwarm::ConnectionClosed(connection_closed) => {
self.on_connection_closed(connection_closed)
}
FromSwarm::NewListenAddr(NewListenAddr { addr, .. }) => {
if addr.is_relayed() {
self.queued_actions
.push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
}
}
FromSwarm::ExpiredListenAddr(ExpiredListenAddr { addr, .. }) => {
if addr.is_relayed() {
self.queued_actions
.push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
}
}
FromSwarm::DialFailure(DialFailure { connection_id, .. }) => {
self.reservation_addresses.remove(&connection_id);
self.pending_handler_commands.remove(&connection_id);
}
_ => {}
Expand All @@ -231,7 +224,7 @@ impl NetworkBehaviour for Behaviour {
fn on_connection_handler_event(
&mut self,
event_source: PeerId,
connection: ConnectionId,
_: ConnectionId,
handler_event: THandlerOutEvent<Self>,
) {
let handler_event = match handler_event {
Expand All @@ -241,17 +234,6 @@ impl NetworkBehaviour for Behaviour {

let event = match handler_event {
handler::Event::ReservationReqAccepted { renewal, limit } => {
let (addr, status) = self
.reservation_addresses
.get_mut(&connection)
.expect("Relay connection exist");

if !renewal && *status == ReservationStatus::Pending {
*status = ReservationStatus::Confirmed;
self.queued_actions
.push_back(ToSwarm::ExternalAddrConfirmed(addr.clone()));
}

Event::ReservationReqAccepted {
relay_peer_id: event_source,
renewal,
Expand Down Expand Up @@ -292,42 +274,18 @@ impl NetworkBehaviour for Behaviour {
.get(&relay_peer_id)
.and_then(|cs| cs.first())
{
Some(connection_id) => {
self.reservation_addresses.insert(
*connection_id,
(
relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(self.local_peer_id)),
ReservationStatus::Pending,
),
);

ToSwarm::NotifyHandler {
peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::Reserve { to_listener }),
}
}
Some(connection_id) => ToSwarm::NotifyHandler {
peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::Reserve { to_listener }),
},
None => {
let opts = DialOpts::peer_id(relay_peer_id)
.addresses(vec![relay_addr.clone()])
.extend_addresses_through_behaviour()
.build();
let relayed_connection_id = opts.connection_id();

self.reservation_addresses.insert(
relayed_connection_id,
(
relay_addr
.with(Protocol::P2p(relay_peer_id))
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(self.local_peer_id)),
ReservationStatus::Pending,
),
);

self.pending_handler_commands
.insert(relayed_connection_id, handler::In::Reserve { to_listener });
ToSwarm::Dial { opts }
Expand Down
48 changes: 41 additions & 7 deletions protocols/relay/src/priv_client/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use futures::stream::{Stream, StreamExt};
use libp2p_core::multiaddr::{Multiaddr, Protocol};
use libp2p_core::transport::{DialOpts, ListenerId, TransportError, TransportEvent};
use libp2p_identity::PeerId;
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use thiserror::Error;
Expand Down Expand Up @@ -154,6 +154,7 @@ impl libp2p_core::Transport for Transport {
from_behaviour,
is_closed: false,
waker: None,
listened_addrs: Default::default(),
};
self.listeners.push(listener);
Ok(())
Expand Down Expand Up @@ -311,6 +312,8 @@ pub(crate) struct Listener {
/// the sender side of the `from_behaviour` channel is dropped.
is_closed: bool,
waker: Option<Waker>,
/// Addresses listened through relay
listened_addrs: HashSet<Multiaddr>,
}

impl Listener {
Expand Down Expand Up @@ -368,14 +371,45 @@ impl Stream for Listener {
self.queued_events.is_empty(),
"Assert empty due to previous `pop_front` attempt."
);
// Returned as [`ListenerEvent::NewAddress`] in next iteration of loop.
self.queued_events = addrs

let reserved_addresses = addrs
.into_iter()
.map(|listen_addr| TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr,
.map(|mut addr| {
// Every transport (tcp / quic / etc) gives addresses without the last
// p2p part, so pop it if present.
if matches!(addr.iter().last(), Some(Protocol::P2p(_))) {
addr.pop();
}
addr
})
.collect();
.collect::<HashSet<_>>();

let expired_addresses = self
.listened_addrs
.difference(&reserved_addresses)
.map(Clone::clone)
.collect::<Vec<_>>();
let new_addresses = reserved_addresses
.difference(&self.listened_addrs)
.map(Clone::clone)
.collect::<Vec<_>>();
let listener_id = self.listener_id;

for listen_addr in expired_addresses {
self.listened_addrs.remove(&listen_addr);
self.queued_events
.push_back(TransportEvent::AddressExpired {
listener_id,
listen_addr,
});
}
for listen_addr in new_addresses {
self.listened_addrs.insert(listen_addr.clone());
self.queued_events.push_back(TransportEvent::NewAddress {
listener_id,
listen_addr,
});
}
}
ToListenerMsg::IncomingRelayedConnection {
stream,
Expand Down
Loading

0 comments on commit eeefabb

Please sign in to comment.