Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use AsyncWaitGroup instead of JoinHandles #43

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ rustdoc-args = ["--cfg", "docsrs"]

[workspace.dependencies]
auto_impl = "1"
atomic_refcell = "0.1"
agnostic-lite = { version = "0.3", features = ["time"] }
agnostic = "0.3.5"
async-lock = "3"
Expand Down Expand Up @@ -57,6 +56,7 @@ transformable = { version = "0.1.6", features = ["smol_str", "bytes"] }
thiserror = "1"
tracing = "0.1"
viewit = "0.1.5"
wg = { version = "0.9", default-features = false, features = ["future", "std", "triomphe"] }

memberlist-core = { version = "0.2", path = "core", default-features = false }
memberlist-net = { version = "0.2", path = "transports/net", default-features = false }
Expand Down
4 changes: 2 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ once_cell = "1.17"
rustix = { version = "0.38", features = ["system"] }

[target.'cfg(windows)'.dependencies]
hostname = "0.3"
hostname = "0.4"

[dependencies]
auto_impl.workspace = true
atomic_refcell.workspace = true
agnostic-lite.workspace = true
async-channel.workspace = true
async-lock.workspace = true
Expand All @@ -68,6 +67,7 @@ memberlist-types.workspace = true
thiserror.workspace = true
tracing.workspace = true
viewit.workspace = true
wg.workspace = true

base64 = { version = "0.22", optional = true }

Expand Down
30 changes: 12 additions & 18 deletions core/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@ use std::{
},
};

use agnostic_lite::{AsyncSpawner, RuntimeLite};
use agnostic_lite::RuntimeLite;
use async_channel::{Receiver, Sender};
use async_lock::{Mutex, RwLock};

use atomic_refcell::AtomicRefCell;
use futures::stream::FuturesUnordered;
use nodecraft::{resolver::AddressResolver, CheapClone, Node};
use wg::AsyncWaitGroup;

use super::{
awareness::Awareness,
Expand Down Expand Up @@ -284,9 +283,7 @@ where
pub(crate) leave_broadcast_tx: Sender<()>,
pub(crate) leave_lock: Mutex<()>,
pub(crate) leave_broadcast_rx: Receiver<()>,
pub(crate) handles: AtomicRefCell<
FuturesUnordered<<<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()>>,
>,
pub(crate) wg: AsyncWaitGroup,
pub(crate) probe_index: AtomicUsize,
pub(crate) handoff_tx: Sender<()>,
pub(crate) handoff_rx: Receiver<()>,
Expand Down Expand Up @@ -416,7 +413,7 @@ where
leave_lock: Mutex::new(()),
leave_broadcast_rx,
probe_index: AtomicUsize::new(0),
handles: AtomicRefCell::new(FuturesUnordered::new()),
wg: AsyncWaitGroup::new(),
handoff_tx,
handoff_rx,
queue: Mutex::new(MessageQueue::new()),
Expand All @@ -431,12 +428,11 @@ where
};

{
let handles = this.inner.handles.borrow();
handles.push(this.stream_listener(shutdown_rx.clone()));
handles.push(this.packet_handler(shutdown_rx.clone()));
handles.push(this.packet_listener(shutdown_rx.clone()));
this.stream_listener(shutdown_rx.clone());
this.packet_handler(shutdown_rx.clone());
this.packet_listener(shutdown_rx.clone());
#[cfg(feature = "metrics")]
handles.push(this.check_broadcast_queue_depth(shutdown_rx.clone()));
this.check_broadcast_queue_depth(shutdown_rx.clone());
}

Ok((shutdown_rx, this.inner.advertise.cheap_clone(), this))
Expand Down Expand Up @@ -468,16 +464,14 @@ where
}

#[cfg(feature = "metrics")]
fn check_broadcast_queue_depth(
&self,
shutdown_rx: Receiver<()>,
) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
fn check_broadcast_queue_depth(&self, shutdown_rx: Receiver<()>) {
use futures::{FutureExt, StreamExt};

let queue_check_interval = self.inner.opts.queue_check_interval;
let this = self.clone();

<T::Runtime as RuntimeLite>::spawn(async move {
let wg = this.inner.wg.add(1);
<T::Runtime as RuntimeLite>::spawn_detach(async move {
scopeguard::defer!(wg.done(););
let tick = <T::Runtime as RuntimeLite>::interval(queue_check_interval);
futures::pin_mut!(tick);
loop {
Expand Down
64 changes: 32 additions & 32 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ pub mod tests {
use std::net::SocketAddr;

use nodecraft::resolver::AddressResolver;
#[cfg(not(windows))]
use parking_lot::Mutex;
// #[cfg(not(windows))]
// use parking_lot::Mutex;
pub use paste;

use self::{delegate::Delegate, error::Error, transport::Transport};
Expand Down Expand Up @@ -120,44 +120,44 @@ pub mod tests {
/// Any error type used for testing.
pub type AnyError = Box<dyn std::error::Error + Send + Sync + 'static>;

#[cfg(not(windows))]
static IPV4_BIND_NUM: Mutex<usize> = Mutex::new(10);
#[cfg(not(windows))]
static IPV6_BIND_NUM: Mutex<usize> = Mutex::new(10);
// #[cfg(not(windows))]
// static IPV4_BIND_NUM: Mutex<usize> = Mutex::new(10);
// #[cfg(not(windows))]
// static IPV6_BIND_NUM: Mutex<usize> = Mutex::new(10);

/// Returns the next socket addr v4
pub fn next_socket_addr_v4(_network: u8) -> SocketAddr {
#[cfg(not(windows))]
{
let mut mu = IPV4_BIND_NUM.lock();
let addr: SocketAddr = format!("127.0.{}.{}:0", _network, *mu).parse().unwrap();
*mu += 1;
if *mu > 255 {
*mu = 10;
}

addr
}

#[cfg(windows)]
// #[cfg(not(windows))]
// {
// let mut mu = IPV4_BIND_NUM.lock();
// let addr: SocketAddr = format!("127.0.{}.{}:0", _network, *mu).parse().unwrap();
// *mu += 1;
// if *mu > 255 {
// *mu = 10;
// }

// addr
// }

// #[cfg(windows)]
"127.0.0.1:0".parse().unwrap()
}

/// Returns the next socket addr v6
pub fn next_socket_addr_v6() -> SocketAddr {
#[cfg(not(windows))]
{
let mut mu = IPV6_BIND_NUM.lock();
let addr: SocketAddr = format!("[fc00::1:{}]:0", *mu).parse().unwrap();
*mu += 1;
if *mu > 255 {
*mu = 10;
}

addr
}

#[cfg(windows)]
// #[cfg(not(windows))]
// {
// let mut mu = IPV6_BIND_NUM.lock();
// let addr: SocketAddr = format!("[fc00::1:{}]:0", *mu).parse().unwrap();
// *mu += 1;
// if *mu > 255 {
// *mu = 10;
// }

// addr
// }

// #[cfg(windows)]
"[::1]:0".parse().unwrap()
}

Expand Down
13 changes: 5 additions & 8 deletions core/src/network/packet/handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use crate::base::MessageHandoff;

use agnostic_lite::AsyncSpawner;

use super::*;

impl<D, T> Memberlist<T, D>
Expand All @@ -12,13 +10,12 @@ where
/// a long running thread that processes messages received
/// over the packet interface, but is decoupled from the listener to avoid
/// blocking the listener which may cause ping/ack messages to be delayed.
pub(crate) fn packet_handler(
&self,
shutdown_rx: async_channel::Receiver<()>,
) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
pub(crate) fn packet_handler(&self, shutdown_rx: async_channel::Receiver<()>) {
let this = self.clone();
let handoff_rx = this.inner.handoff_rx.clone();
<T::Runtime as RuntimeLite>::spawn(async move {
let wg = this.inner.wg.add(1);
<T::Runtime as RuntimeLite>::spawn_detach(async move {
scopeguard::defer!(wg.done(););
loop {
futures::select! {
_ = shutdown_rx.recv().fuse() => {
Expand All @@ -38,7 +35,7 @@ where
}
}
}
})
});
}

/// Returns the next message to process in priority order, using LIFO
Expand Down
10 changes: 4 additions & 6 deletions core/src/network/packet/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{base::MessageHandoff, transport::Wire};
use agnostic_lite::AsyncSpawner;
use either::Either;

use super::*;
Expand Down Expand Up @@ -35,13 +34,12 @@ where
D: Delegate<Id = T::Id, Address = <T::Resolver as AddressResolver>::ResolvedAddress>,
T: Transport,
{
pub(crate) fn packet_listener(
&self,
shutdown_rx: async_channel::Receiver<()>,
) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
pub(crate) fn packet_listener(&self, shutdown_rx: async_channel::Receiver<()>) {
let this = self.clone();
let packet_rx = this.inner.transport.packet();
<T::Runtime as RuntimeLite>::spawn(async move {
let wg = this.inner.wg.add(1);
<T::Runtime as RuntimeLite>::spawn_detach(async move {
scopeguard::defer!(wg.done(););
'outer: loop {
futures::select! {
_ = shutdown_rx.recv().fuse() => {
Expand Down
12 changes: 5 additions & 7 deletions core/src/network/stream.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use agnostic_lite::AsyncSpawner;
use smol_str::SmolStr;

use crate::delegate::DelegateError;
Expand All @@ -15,13 +14,12 @@ where
{
/// A long running thread that pulls incoming streams from the
/// transport and hands them off for processing.
pub(crate) fn stream_listener(
&self,
shutdown_rx: async_channel::Receiver<()>,
) -> <<T::Runtime as RuntimeLite>::Spawner as AsyncSpawner>::JoinHandle<()> {
pub(crate) fn stream_listener(&self, shutdown_rx: async_channel::Receiver<()>) {
let this = self.clone();
let transport_rx = this.inner.transport.stream();
<T::Runtime as RuntimeLite>::spawn(async move {
let wg = this.inner.wg.add(1);
<T::Runtime as RuntimeLite>::spawn_detach(async move {
scopeguard::defer!(wg.done(););
tracing::debug!("memberlist: stream listener start");
loop {
futures::select! {
Expand Down Expand Up @@ -49,7 +47,7 @@ where
}
}
}
})
});
}

/// Used to merge the remote state with our local state
Expand Down
Loading
Loading