Skip to content

Commit

Permalink
update provider to handle tasks sequentially instead of concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
jxs committed Jul 25, 2023
1 parent b0f8f7a commit 4a90a78
Showing 1 changed file with 41 additions and 46 deletions.
87 changes: 41 additions & 46 deletions protocols/upnp/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ pub trait Provider {

macro_rules! impl_provider {
($impl:ident, $executor: ident, $gateway:ident, $protocol: ident, $options: ident) => {
use crate::behaviour::{GatewayRequest, GatewayEvent};
use super::{Gateway, Config, Protocol};
use super::{Config, Gateway, Protocol};
use crate::behaviour::{GatewayEvent, GatewayRequest};

use async_trait::async_trait;
use futures::{channel::mpsc, SinkExt, StreamExt};
use std::error::Error;
use futures::{channel::mpsc, select, stream::FuturesUnordered, FutureExt, StreamExt, SinkExt};

impl From<Protocol> for $protocol {
fn from(protocol: Protocol) -> Self {
Expand All @@ -86,54 +86,49 @@ macro_rules! impl_provider {
let gateway = $gateway::search_gateway(options).await?;
let addr = gateway.get_external_ip().await?;

let (events_sender, mut task_receiver) = mpsc::channel(5);
let (events_sender, mut task_receiver) = mpsc::channel(10);
let (mut task_sender, events_queue) = mpsc::channel(0);

$executor::spawn(async move {
let mut futs = FuturesUnordered::new();
loop {
select! {
req = task_receiver.select_next_some() => {
let fut = match req {
GatewayRequest::AddMapping{ mapping, duration } => {
let duration = duration.unwrap_or(0);
let gateway = gateway.clone();
async move {
match gateway
.add_port(
mapping.protocol.into(),
mapping.internal_addr.port(),
mapping.internal_addr,
duration,
"rust-libp2p mapping",
).await {
Ok(()) => GatewayEvent::Mapped(mapping),
Err(err) => GatewayEvent::MapFailure(mapping, err.into()),
}
}.boxed()
}
GatewayRequest::RemoveMapping(mapping) => {
let gateway = gateway.clone();
async move {
match gateway
.remove_port(
mapping.protocol.into(),
mapping.internal_addr.port()
).await {
Ok(()) => GatewayEvent::Removed(mapping),
Err(err) => GatewayEvent::RemovalFailure(mapping, err.into()),
}
}.boxed()
}

};
futs.push(fut);
},
event = futs.select_next_some() => {
task_sender.send(event).await.expect("receiver should be available");
// The task sender has dropped so we can return.
let Some(req) = task_receiver.next().await else { return; };
let event = match req {
GatewayRequest::AddMapping { mapping, duration } => {
let duration = duration.unwrap_or(0);
let gateway = gateway.clone();
match gateway
.add_port(
mapping.protocol.into(),
mapping.internal_addr.port(),
mapping.internal_addr,
duration,
"rust-libp2p mapping",
)
.await
{
Ok(()) => GatewayEvent::Mapped(mapping),
Err(err) => GatewayEvent::MapFailure(mapping, err.into()),
}
}
complete => break,
}
GatewayRequest::RemoveMapping(mapping) => {
let gateway = gateway.clone();
match gateway
.remove_port(
mapping.protocol.into(),
mapping.internal_addr.port(),
)
.await
{
Ok(()) => GatewayEvent::Removed(mapping),
Err(err) => GatewayEvent::RemovalFailure(mapping, err.into()),
}
}
};
task_sender
.send(event)
.await
.expect("receiver should be available");
}
});

Expand Down

0 comments on commit 4a90a78

Please sign in to comment.