Skip to content

Commit

Permalink
refactor behaviour to reduce the amount of code
Browse files Browse the repository at this point in the history
namely in the poll function.
  • Loading branch information
jxs committed Jul 25, 2023
1 parent e24cc16 commit b0f8f7a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 107 deletions.
228 changes: 124 additions & 104 deletions protocols/upnp/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use std::{
error::Error,
hash::{Hash, Hasher},
marker::PhantomData,
net::SocketAddrV4,
net::{Ipv4Addr, SocketAddrV4},
ops::{Deref, DerefMut},
pin::Pin,
task::{Context, Poll},
time::Duration,
Expand All @@ -53,7 +54,10 @@ const MAPPING_TIMEOUT: u64 = MAPPING_DURATION as u64 / 2;
/// A [`Gateway`] Request.
#[derive(Debug)]
pub(crate) enum GatewayRequest {
AddMapping(Mapping, Option<u32>),
AddMapping {
mapping: Mapping,
duration: Option<u32>,
},
RemoveMapping(Mapping),
}

Expand All @@ -66,7 +70,7 @@ pub(crate) enum GatewayEvent {
MapFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
/// Port was successfully removed.
Removed(Mapping),
/// There was a failure removing the mapping port.
/// There was a failure removing the mapped port.
RemovalFailure(Mapping, Box<dyn Error + Send + Sync + 'static>),
}

Expand All @@ -79,6 +83,16 @@ pub(crate) struct Mapping {
pub(crate) internal_addr: SocketAddrV4,
}

impl Mapping {
/// Given the input gateway address, calculate the
/// open external `Multiaddr`.
fn external_addr(&self, gateway_addr: Ipv4Addr) -> Multiaddr {
self.multiaddr
.replace(0, |_| Some(multiaddr::Protocol::Ip4(gateway_addr)))
.expect("multiaddr should be valid")
}
}

impl Hash for Mapping {
fn hash<H: Hasher>(&self, state: &mut H) {
self.listener_id.hash(state);
Expand All @@ -100,6 +114,7 @@ impl Borrow<ListenerId> for Mapping {
}

/// Current state of a [`Mapping`].
#[derive(Debug)]
enum MappingState {
/// Port mapping is inactive, will be requested or re-requested on the next iteration.
Inactive,
Expand Down Expand Up @@ -131,8 +146,67 @@ pub enum Event {
GatewayNotFound,
}

/// A `NetworkBehaviour` for UPnP port mapping. Automatically tries to map the external port
/// to an internal address on the gateway on a `FromSwarm::NewListenAddr`.
/// A list of port mappings and its state.
#[derive(Debug, Default)]
struct MappingList(HashMap<Mapping, MappingState>);

impl Deref for MappingList {
type Target = HashMap<Mapping, MappingState>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl DerefMut for MappingList {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}

impl MappingList {
/// Queue for renewal the current mapped ports on the `Gateway` that are expiring,
/// and try to activate the inactive.
fn renew(&mut self, config: &Config, gateway: &mut Gateway, cx: &mut Context<'_>) {
for (mapping, state) in self.iter_mut() {
match state {
MappingState::Inactive | MappingState::Failed => {
let duration = config.temporary.then_some(MAPPING_DURATION);
if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
mapping: mapping.clone(),
duration,
}) {
log::debug!(
"could not request port mapping for {} on the gateway: {}",
mapping.multiaddr,
err
);
}
*state = MappingState::Pending;
}
MappingState::Active(timeout) => {
if Pin::new(timeout).poll(cx).is_ready() {
let duration = config.temporary.then_some(MAPPING_DURATION);
if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
mapping: mapping.clone(),
duration,
}) {
log::debug!(
"could not request port mapping for {} on the gateway: {}",
mapping.multiaddr,
err
);
}
}
}
MappingState::Pending | MappingState::Permanent => {}
}
}
}
}

/// A [`NetworkBehaviour`] for UPnP port mapping. Automatically tries to map the external port
/// to an internal address on the gateway on a [`FromSwarm::NewListenAddr`].
pub struct Behaviour<P>
where
P: Provider,
Expand All @@ -143,7 +217,7 @@ where
state: GatewayState,

/// List of port mappings.
mappings: HashMap<Mapping, MappingState>,
mappings: MappingList,

/// Pending behaviour events to be emitted.
pending_events: VecDeque<Event>,
Expand Down Expand Up @@ -213,7 +287,7 @@ where
}) => {
let (addr, protocol) = match multiaddr_to_socketaddr_protocol(multiaddr.clone()) {
Ok(addr_port) => addr_port,
Err(_) => {
Err(()) => {
log::debug!("multiaddress not supported for UPnP {multiaddr}");
return;
}
Expand Down Expand Up @@ -251,10 +325,10 @@ where
};

let duration = self.config.temporary.then_some(MAPPING_DURATION);
if let Err(err) = gateway
.sender
.try_send(GatewayRequest::AddMapping(mapping.clone(), duration))
{
if let Err(err) = gateway.sender.try_send(GatewayRequest::AddMapping {
mapping: mapping.clone(),
duration,
}) {
log::debug!(
"could not request port mapping for {} on the gateway: {}",
mapping.multiaddr,
Expand Down Expand Up @@ -319,13 +393,14 @@ where
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, libp2p_swarm::THandlerInEvent<Self>>> {
loop {
// If there are pending addresses to be emitted we emit them first.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}
// If there are pending addresses to be emitted we emit them.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ToSwarm::GenerateEvent(event));
}

// We then check the `Gateway` current state.
// Loop through the gateway state so that if it changes from `Searching` to `Available`
// we poll the pending mapping requests.
loop {
match self.state {
GatewayState::Searching(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Ready(result) => match result {
Expand All @@ -341,81 +416,62 @@ where
Poll::Pending => return Poll::Pending,
},
GatewayState::Available(ref mut gateway) => {
// Check pending mappings.
// Poll pending mapping requests.
if let Poll::Ready(Some(result)) = gateway.receiver.poll_next_unpin(cx) {
match result {
GatewayEvent::Mapped(mapping) => {
let state = self
let new_state = if self.config.temporary {
MappingState::Active(Delay::new(Duration::from_secs(
MAPPING_TIMEOUT,
)))
} else {
MappingState::Permanent
};

match self
.mappings
.get_mut(&mapping)
.expect("mapping should exist");
match state {
.insert(mapping.clone(), new_state)
.expect("mapping should exist")
{
MappingState::Pending => {
let external_multiaddr =
mapping.external_addr(gateway.addr);
self.pending_events.push_back(Event::NewExternalAddr(
external_multiaddr.clone(),
));
log::debug!(
"succcessfuly UPnP mapped {} for {} protocol",
"succcessfully mapped UPnP {} for {} protocol",
mapping.internal_addr,
mapping.protocol
);
let external_multiaddr = mapping
.multiaddr
.replace(0, |_| {
Some(multiaddr::Protocol::Ip4(gateway.addr))
})
.expect("multiaddr should be valid");
*state = if self.config.temporary {
MappingState::Permanent
} else {
MappingState::Active(Delay::new(Duration::from_secs(
MAPPING_TIMEOUT,
)))
};

self.pending_events.push_back(Event::NewExternalAddr(
external_multiaddr.clone(),
));
return Poll::Ready(ToSwarm::ExternalAddrConfirmed(
external_multiaddr,
));
}
MappingState::Active(_) => {
*state = MappingState::Active(Delay::new(
Duration::from_secs(MAPPING_TIMEOUT),
));

log::debug!(
"succcessfuly remapped UPnP {} for {} protocol",
"succcessfully renewed UPnP mapping {} for {} protocol",
mapping.internal_addr,
mapping.protocol
);
}
MappingState::Inactive
| MappingState::Permanent
| MappingState::Failed => {
unreachable!()
}
_ => unreachable!(),
}
}
GatewayEvent::MapFailure(mapping, err) => {
let state = self
match self
.mappings
.get_mut(&mapping)
.expect("mapping should exist");

match state {
.insert(mapping.clone(), MappingState::Failed)
.expect("mapping should exist")
{
MappingState::Active(_) => {
log::debug!(
"failed to remap UPnP mapped {} for {} protocol: {err}",
mapping.internal_addr,
mapping.protocol
);
*state = MappingState::Failed;
let external_multiaddr = mapping
.multiaddr
.replace(0, |_| {
Some(multiaddr::Protocol::Ip4(gateway.addr))
})
.expect("multiaddr should be valid");

let external_multiaddr =
mapping.external_addr(gateway.addr);
self.pending_events.push_back(Event::ExpiredExternalAddr(
external_multiaddr.clone(),
));
Expand All @@ -429,18 +485,15 @@ where
mapping.internal_addr,
mapping.protocol
);
*state = MappingState::Failed;
}
MappingState::Inactive
| MappingState::Permanent
| MappingState::Failed => {
_ => {
unreachable!()
}
}
}
GatewayEvent::Removed(mapping) => {
log::debug!(
"succcessfuly removed UPnP mapping {} for {} protocol",
"succcessfully removed UPnP mapping {} for {} protocol",
mapping.internal_addr,
mapping.protocol
);
Expand Down Expand Up @@ -469,40 +522,7 @@ where
}

// Renew expired and request inactive mappings.
for (mapping, state) in self.mappings.iter_mut() {
match state {
MappingState::Inactive | MappingState::Failed => {
let duration = self.config.temporary.then_some(MAPPING_DURATION);
if let Err(err) = gateway
.sender
.try_send(GatewayRequest::AddMapping(mapping.clone(), duration))
{
log::debug!(
"could not request port mapping for {} on the gateway: {}",
mapping.multiaddr,
err
);
}
*state = MappingState::Pending;
}
MappingState::Active(timeout) => {
if Pin::new(timeout).poll(cx).is_ready() {
let duration =
self.config.temporary.then_some(MAPPING_DURATION);
if let Err(err) = gateway.sender.try_send(
GatewayRequest::AddMapping(mapping.clone(), duration),
) {
log::debug!(
"could not request port mapping for {} on the gateway: {}",
mapping.multiaddr,
err
);
}
}
}
MappingState::Pending | MappingState::Permanent => {}
}
}
self.mappings.renew(&mut self.config, gateway, cx);
return Poll::Pending;
}
GatewayState::GatewayNotFound => {
Expand All @@ -513,9 +533,9 @@ where
}
}

/// Extracts a `SocketAddr` and `Protocol` from a given `Multiaddr`.
/// Extracts a [`SocketAddrV4`] and [`Protocol`] from a given [`Multiaddr`].
///
/// Fails if the given `Multiaddr` does not begin with an IP
/// Fails if the given [`Multiaddr`] does not begin with an IP
/// protocol encapsulating a TCP or UDP port.
fn multiaddr_to_socketaddr_protocol(mut addr: Multiaddr) -> Result<(SocketAddrV4, Protocol), ()> {
let mut port = None;
Expand Down
4 changes: 2 additions & 2 deletions protocols/upnp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! Implementation of UPnP port mapping for libp2p.
//!
//! This crate provides a `tokio::Behaviour` and `async_std::Behaviour`, depending on the enabled features, which
//! implements the `NetworkBehaviour` trait. This struct will automatically try to map the ports externally to internal
//! implements the [`NetworkBehaviour`] trait. This struct will automatically try to map the ports externally to internal
//! addresses on the gateway.
//!

Expand Down Expand Up @@ -64,7 +64,7 @@ impl Config {
}
}

/// Configures if the port mappings be temporary (1 hour) or permanent.
/// Configures the port mappings to be temporary (1 hour) or permanent.
pub fn temporary(self, temporary: bool) -> Self {
Self { temporary, ..self }
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/upnp/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ macro_rules! impl_provider {
select! {
req = task_receiver.select_next_some() => {
let fut = match req {
GatewayRequest::AddMapping(mapping, duration) => {
GatewayRequest::AddMapping{ mapping, duration } => {
let duration = duration.unwrap_or(0);
let gateway = gateway.clone();
async move {
Expand Down

0 comments on commit b0f8f7a

Please sign in to comment.