From 0b43401056fbbade5ab6107990626ca6ce4e4d5d Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sun, 28 Apr 2024 00:56:03 +0800 Subject: [PATCH 1/2] use AsyncWaitGroup instead of JoinHandle --- Cargo.toml | 2 +- core/Cargo.toml | 4 +-- core/src/base.rs | 30 +++++++--------- core/src/network/packet/handler.rs | 13 +++---- core/src/network/packet/listener.rs | 10 +++--- core/src/network/stream.rs | 12 +++---- core/src/state.rs | 56 ++++++++++++++--------------- transports/net/Cargo.toml | 4 +-- transports/net/src/lib.rs | 25 ++++++++----- transports/quic/Cargo.toml | 8 ++--- transports/quic/src/lib.rs | 38 ++++++++++++-------- transports/quic/src/processor.rs | 10 ++++++ transports/quic/src/tests.rs | 4 +-- 13 files changed, 112 insertions(+), 104 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fb4106a6..94dee334 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ rustdoc-args = ["--cfg", "docsrs"] [workspace.dependencies] auto_impl = "1" -atomic_refcell = "0.1" agnostic-lite = { version = "0.3", features = ["time"] } agnostic = "0.3.5" async-lock = "3" @@ -57,6 +56,7 @@ transformable = { version = "0.1.6", features = ["smol_str", "bytes"] } thiserror = "1" tracing = "0.1" viewit = "0.1.5" +wg = { version = "0.9", default-features = false, features = ["future", "std", "triomphe"] } memberlist-core = { version = "0.2", path = "core", default-features = false } memberlist-net = { version = "0.2", path = "transports/net", default-features = false } diff --git a/core/Cargo.toml b/core/Cargo.toml index c006577c..567b2fdd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -42,11 +42,10 @@ once_cell = "1.17" rustix = { version = "0.38", features = ["system"] } [target.'cfg(windows)'.dependencies] -hostname = "0.3" +hostname = "0.4" [dependencies] auto_impl.workspace = true -atomic_refcell.workspace = true agnostic-lite.workspace = true async-channel.workspace = true async-lock.workspace = true @@ -68,6 +67,7 @@ memberlist-types.workspace = true thiserror.workspace = true tracing.workspace = true viewit.workspace = true +wg.workspace = true base64 = { version = "0.22", optional = true } diff --git a/core/src/base.rs b/core/src/base.rs index 1f4688e7..f4a840bd 100644 --- a/core/src/base.rs +++ b/core/src/base.rs @@ -6,13 +6,12 @@ use std::{ }, }; -use agnostic_lite::{AsyncSpawner, RuntimeLite}; +use agnostic_lite::RuntimeLite; use async_channel::{Receiver, Sender}; use async_lock::{Mutex, RwLock}; -use atomic_refcell::AtomicRefCell; -use futures::stream::FuturesUnordered; use nodecraft::{resolver::AddressResolver, CheapClone, Node}; +use wg::AsyncWaitGroup; use super::{ awareness::Awareness, @@ -284,9 +283,7 @@ where pub(crate) leave_broadcast_tx: Sender<()>, pub(crate) leave_lock: Mutex<()>, pub(crate) leave_broadcast_rx: Receiver<()>, - pub(crate) handles: AtomicRefCell< - FuturesUnordered<<::Spawner as AsyncSpawner>::JoinHandle<()>>, - >, + pub(crate) wg: AsyncWaitGroup, pub(crate) probe_index: AtomicUsize, pub(crate) handoff_tx: Sender<()>, pub(crate) handoff_rx: Receiver<()>, @@ -416,7 +413,7 @@ where leave_lock: Mutex::new(()), leave_broadcast_rx, probe_index: AtomicUsize::new(0), - handles: AtomicRefCell::new(FuturesUnordered::new()), + wg: AsyncWaitGroup::new(), handoff_tx, handoff_rx, queue: Mutex::new(MessageQueue::new()), @@ -431,12 +428,11 @@ where }; { - let handles = this.inner.handles.borrow(); - handles.push(this.stream_listener(shutdown_rx.clone())); - handles.push(this.packet_handler(shutdown_rx.clone())); - handles.push(this.packet_listener(shutdown_rx.clone())); + this.stream_listener(shutdown_rx.clone()); + this.packet_handler(shutdown_rx.clone()); + this.packet_listener(shutdown_rx.clone()); #[cfg(feature = "metrics")] - handles.push(this.check_broadcast_queue_depth(shutdown_rx.clone())); + this.check_broadcast_queue_depth(shutdown_rx.clone()); } Ok((shutdown_rx, this.inner.advertise.cheap_clone(), this)) @@ -468,16 +464,14 @@ where } #[cfg(feature = "metrics")] - fn check_broadcast_queue_depth( - &self, - shutdown_rx: Receiver<()>, - ) -> <::Spawner as AsyncSpawner>::JoinHandle<()> { + fn check_broadcast_queue_depth(&self, shutdown_rx: Receiver<()>) { use futures::{FutureExt, StreamExt}; let queue_check_interval = self.inner.opts.queue_check_interval; let this = self.clone(); - - ::spawn(async move { + let wg = this.inner.wg.add(1); + ::spawn_detach(async move { + scopeguard::defer!(wg.done();); let tick = ::interval(queue_check_interval); futures::pin_mut!(tick); loop { diff --git a/core/src/network/packet/handler.rs b/core/src/network/packet/handler.rs index 37633f46..0064d056 100644 --- a/core/src/network/packet/handler.rs +++ b/core/src/network/packet/handler.rs @@ -1,7 +1,5 @@ use crate::base::MessageHandoff; -use agnostic_lite::AsyncSpawner; - use super::*; impl Memberlist @@ -12,13 +10,12 @@ where /// a long running thread that processes messages received /// over the packet interface, but is decoupled from the listener to avoid /// blocking the listener which may cause ping/ack messages to be delayed. - pub(crate) fn packet_handler( - &self, - shutdown_rx: async_channel::Receiver<()>, - ) -> <::Spawner as AsyncSpawner>::JoinHandle<()> { + pub(crate) fn packet_handler(&self, shutdown_rx: async_channel::Receiver<()>) { let this = self.clone(); let handoff_rx = this.inner.handoff_rx.clone(); - ::spawn(async move { + let wg = this.inner.wg.add(1); + ::spawn_detach(async move { + scopeguard::defer!(wg.done();); loop { futures::select! { _ = shutdown_rx.recv().fuse() => { @@ -38,7 +35,7 @@ where } } } - }) + }); } /// Returns the next message to process in priority order, using LIFO diff --git a/core/src/network/packet/listener.rs b/core/src/network/packet/listener.rs index 5d1567a2..703e910a 100644 --- a/core/src/network/packet/listener.rs +++ b/core/src/network/packet/listener.rs @@ -1,5 +1,4 @@ use crate::{base::MessageHandoff, transport::Wire}; -use agnostic_lite::AsyncSpawner; use either::Either; use super::*; @@ -35,13 +34,12 @@ where D: Delegate::ResolvedAddress>, T: Transport, { - pub(crate) fn packet_listener( - &self, - shutdown_rx: async_channel::Receiver<()>, - ) -> <::Spawner as AsyncSpawner>::JoinHandle<()> { + pub(crate) fn packet_listener(&self, shutdown_rx: async_channel::Receiver<()>) { let this = self.clone(); let packet_rx = this.inner.transport.packet(); - ::spawn(async move { + let wg = this.inner.wg.add(1); + ::spawn_detach(async move { + scopeguard::defer!(wg.done();); 'outer: loop { futures::select! { _ = shutdown_rx.recv().fuse() => { diff --git a/core/src/network/stream.rs b/core/src/network/stream.rs index a1791682..98eb282e 100644 --- a/core/src/network/stream.rs +++ b/core/src/network/stream.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use agnostic_lite::AsyncSpawner; use smol_str::SmolStr; use crate::delegate::DelegateError; @@ -15,13 +14,12 @@ where { /// A long running thread that pulls incoming streams from the /// transport and hands them off for processing. - pub(crate) fn stream_listener( - &self, - shutdown_rx: async_channel::Receiver<()>, - ) -> <::Spawner as AsyncSpawner>::JoinHandle<()> { + pub(crate) fn stream_listener(&self, shutdown_rx: async_channel::Receiver<()>) { let this = self.clone(); let transport_rx = this.inner.transport.stream(); - ::spawn(async move { + let wg = this.inner.wg.add(1); + ::spawn_detach(async move { + scopeguard::defer!(wg.done();); tracing::debug!("memberlist: stream listener start"); loop { futures::select! { @@ -49,7 +47,7 @@ where } } } - }) + }); } /// Used to merge the remote state with our local state diff --git a/core/src/state.rs b/core/src/state.rs index d73d3340..ac46a5ae 100644 --- a/core/src/state.rs +++ b/core/src/state.rs @@ -18,7 +18,7 @@ use super::{ Member, Members, }; -use agnostic_lite::{AsyncSpawner, RuntimeLite}; +use agnostic_lite::RuntimeLite; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use nodecraft::{resolver::AddressResolver, CheapClone, Node}; @@ -635,12 +635,14 @@ where macro_rules! bail_trigger { ($fn:ident) => { paste::paste! { - async fn [](&self, stagger: Duration, interval: Duration, stop_rx: async_channel::Receiver<()>) -> <::Spawner as AsyncSpawner>::JoinHandle<()> + async fn [](&self, stagger: Duration, interval: Duration, stop_rx: async_channel::Receiver<()>) { let this = self.clone(); // Use a random stagger to avoid syncronizing let rand_stagger = random_stagger(stagger); - ::spawn(async move { + let wg = this.inner.wg.add(1); + ::spawn_detach(async move { + scopeguard::defer!(wg.done();); let delay = ::sleep(rand_stagger); futures::select! { @@ -667,7 +669,7 @@ macro_rules! bail_trigger { } tracing::debug!(concat!("memberlist.state: ", stringify!($fn), " trigger exits")); - }) + }); } } }; @@ -680,36 +682,31 @@ where { /// Used to ensure the Tick is performed periodically. pub(crate) async fn schedule(&self, shutdown_rx: async_channel::Receiver<()>) { - let handles = self.inner.handles.borrow(); // Create a new probeTicker if self.inner.opts.probe_interval > Duration::ZERO { - handles.push( - self - .trigger_probe( - self.inner.opts.probe_interval, - self.inner.opts.probe_interval, - shutdown_rx.clone(), - ) - .await, - ); + self + .trigger_probe( + self.inner.opts.probe_interval, + self.inner.opts.probe_interval, + shutdown_rx.clone(), + ) + .await; } // Create a push pull ticker if needed if self.inner.opts.push_pull_interval > Duration::ZERO { - handles.push(self.trigger_push_pull(shutdown_rx.clone()).await); + self.trigger_push_pull(shutdown_rx.clone()).await; } // Create a gossip ticker if needed if self.inner.opts.gossip_interval > Duration::ZERO && self.inner.opts.gossip_nodes > 0 { - handles.push( - self - .trigger_gossip( - self.inner.opts.gossip_interval, - self.inner.opts.gossip_interval, - shutdown_rx.clone(), - ) - .await, - ); + self + .trigger_gossip( + self.inner.opts.gossip_interval, + self.inner.opts.gossip_interval, + shutdown_rx.clone(), + ) + .await; } } @@ -717,17 +714,16 @@ where bail_trigger!(gossip); - async fn trigger_push_pull( - &self, - stop_rx: async_channel::Receiver<()>, - ) -> <::Spawner as AsyncSpawner>::JoinHandle<()> { + async fn trigger_push_pull(&self, stop_rx: async_channel::Receiver<()>) { let interval = self.inner.opts.push_pull_interval; let this = self.clone(); + let wg = this.inner.wg.add(1); // Use a random stagger to avoid syncronizing let mut rng = rand::thread_rng(); let rand_stagger = Duration::from_millis(rng.gen_range(0..interval.as_millis() as u64)); - ::spawn(async move { + ::spawn_detach(async move { + scopeguard::defer!(wg.done();); futures::select! { _ = ::sleep(rand_stagger).fuse() => {}, _ = stop_rx.recv().fuse() => { @@ -750,7 +746,7 @@ where }, } } - }) + }); } // Used to perform a single round of failure detection and gossip diff --git a/transports/net/Cargo.toml b/transports/net/Cargo.toml index 58709f4f..ab7c0fc0 100644 --- a/transports/net/Cargo.toml +++ b/transports/net/Cargo.toml @@ -64,7 +64,6 @@ dnssec = ["dns", "nodecraft/dnssec"] getrandom = { version = "0.2", features = ["js"] } [dependencies] -atomic_refcell.workspace = true agnostic.workspace = true async-channel.workspace = true async-lock.workspace = true @@ -83,9 +82,10 @@ memberlist-core.workspace = true thiserror.workspace = true tracing.workspace = true viewit.workspace = true +wg.workspace = true # tls -futures-rustls = { version = "0.25", optional = true } +futures-rustls = { version = "0.26", optional = true } # native-tls async-native-tls = { version = "0.5", optional = true } diff --git a/transports/net/src/lib.rs b/transports/net/src/lib.rs index 10f5726d..63226b2b 100644 --- a/transports/net/src/lib.rs +++ b/transports/net/src/lib.rs @@ -19,9 +19,8 @@ use std::{ use agnostic::{ net::{Net, UdpSocket}, - AsyncSpawner, Runtime, RuntimeLite, + Runtime, RuntimeLite, }; -use atomic_refcell::AtomicRefCell; use byteorder::{ByteOrder, NetworkEndian}; use bytes::{BufMut, BytesMut}; use checksum::CHECKSUM_SIZE; @@ -38,6 +37,7 @@ use memberlist_core::{ util::{batch, Batch, IsGlobalIp}, }; use peekable::future::{AsyncPeekExt, AsyncPeekable}; +use wg::AsyncWaitGroup; /// Compress/decompress related. #[cfg(feature = "compression")] @@ -148,7 +148,7 @@ where stream_layer: Arc, #[cfg(feature = "encryption")] encryptor: Option, - handles: AtomicRefCell::JoinHandle<()>>>, + wg: AsyncWaitGroup, resolver: Arc, shutdown_tx: async_channel::Sender<()>, _marker: PhantomData, @@ -291,7 +291,7 @@ where let advertise_addr = resolved_bind_address[expose_addr_index]; let self_addr = opts.bind_addresses[expose_addr_index].cheap_clone(); let shutdown = Arc::new(AtomicBool::new(false)); - let handles = FuturesUnordered::new(); + let wg = AsyncWaitGroup::new(); // Fire them up start that we've been able to create them all. // keep the first tcp and udp listener, gossip protocol, we made sure there's at least one // udp and tcp listener can @@ -306,7 +306,11 @@ where shutdown_rx: shutdown_rx.clone(), local_addr: *promised_addr, }; - handles.push(R::spawn(processor.run())); + let pwg = wg.add(1); + R::spawn_detach(async move { + processor.run().await; + pwg.done(); + }); let processor = PacketProcessor:: { packet_tx: packet_tx.clone(), @@ -326,7 +330,11 @@ where skip_inbound_label_check: opts.skip_inbound_label_check, }; - handles.push(R::spawn(processor.run())); + let pwg = wg.add(1); + R::spawn_detach(async move { + processor.run().await; + pwg.done(); + }); } // find final advertise address @@ -365,7 +373,7 @@ where opts, packet_rx, stream_rx, - handles: AtomicRefCell::new(handles), + wg: AsyncWaitGroup::new(), v4_sockets: v4_sockets.into_iter().map(|(ln, _)| ln).collect(), v4_round_robin: AtomicUsize::new(0), v6_sockets: v6_sockets.into_iter().map(|(ln, _)| ln).collect(), @@ -664,8 +672,7 @@ where return Ok(()); } - let mut handles = core::mem::take(&mut *self.handles.borrow_mut()); - while handles.next().await.is_some() {} + self.wg.wait().await; Ok(()) } } diff --git a/transports/quic/Cargo.toml b/transports/quic/Cargo.toml index c2846f10..9718b0c4 100644 --- a/transports/quic/Cargo.toml +++ b/transports/quic/Cargo.toml @@ -48,7 +48,6 @@ test = ["memberlist-core/test", "rcgen"] [dependencies] auto_impl.workspace = true -atomic_refcell.workspace = true agnostic-lite.workspace = true agnostic = { workspace = true, optional = true, features = ["net"] } async-channel.workspace = true @@ -67,6 +66,7 @@ thiserror.workspace = true smol_str.workspace = true scopeguard.workspace = true viewit.workspace = true +wg.workspace = true # serde serde = { workspace = true, optional = true } @@ -80,11 +80,11 @@ quinn = { version = "0.10.2", default-features = false, optional = true, feature rustls = { version = "0.21.9", default-features = false, optional = true, features = ["dangerous_configuration"] } # test -rcgen = { version = "0.12", optional = true } +rcgen = { version = "0.13", optional = true } # s2n -s2n-quic = { version = "1.36", optional = true } -s2n-quic-transport = { version = "0.36", optional = true } +s2n-quic = { version = "1.37", optional = true } +s2n-quic-transport = { version = "0.37", optional = true } # compression rayon = { version = "1.8", optional = true } diff --git a/transports/quic/src/lib.rs b/transports/quic/src/lib.rs index 58f64b2f..0ccfce65 100644 --- a/transports/quic/src/lib.rs +++ b/transports/quic/src/lib.rs @@ -16,9 +16,7 @@ use std::{ time::{Duration, Instant}, }; -use agnostic::AsyncSpawner; use agnostic_lite::RuntimeLite; -use atomic_refcell::AtomicRefCell; use byteorder::{ByteOrder, NetworkEndian}; use bytes::Bytes; use crossbeam_skiplist::SkipMap; @@ -55,6 +53,7 @@ pub use options::*; /// Abstract the [`StremLayer`](crate::stream_layer::StreamLayer) for [`QuicTransport`]. pub mod stream_layer; use stream_layer::*; +use wg::AsyncWaitGroup; const MAX_MESSAGE_LEN_SIZE: usize = core::mem::size_of::(); const MAX_MESSAGE_SIZE: usize = u32::MAX as usize; @@ -112,7 +111,7 @@ where v4_connectors: SmallVec, v6_round_robin: AtomicUsize, v6_connectors: SmallVec, - handles: AtomicRefCell::JoinHandle<()>>>, + wg: AsyncWaitGroup, resolver: A, shutdown_tx: async_channel::Sender<()>, @@ -148,6 +147,7 @@ where let mut v4_acceptors = SmallVec::with_capacity(opts.bind_addresses.len()); let mut v6_acceptors = SmallVec::with_capacity(opts.bind_addresses.len()); let mut resolved_bind_address = SmallVec::new(); + let wg = AsyncWaitGroup::new(); for addr in opts.bind_addresses.iter() { let addr = resolver @@ -198,7 +198,6 @@ where let expose_addr_index = Self::find_advertise_addr_index(&resolved_bind_address); let advertise_addr = resolved_bind_address[expose_addr_index]; let self_addr = opts.bind_addresses[expose_addr_index].cheap_clone(); - let handles = FuturesUnordered::new(); // Fire them up start that we've been able to create them all. // keep the first tcp and udp listener, gossip protocol, we made sure there's at least one @@ -212,6 +211,7 @@ where local_addr, timeout: opts.timeout, shutdown_rx: shutdown_rx.clone(), + wg: wg.clone(), skip_inbound_label_check: opts.skip_inbound_label_check, #[cfg(feature = "compression")] offload_size: opts.offload_size, @@ -219,7 +219,11 @@ where metric_labels: opts.metric_labels.clone().unwrap_or_default(), }; - handles.push(R::spawn(processor.run())); + let pwg = wg.add(1); + R::spawn_detach(async move { + processor.run().await; + pwg.done(); + }); } // find final advertise address @@ -237,12 +241,17 @@ where let interval = ::interval(opts.connection_pool_cleanup_period); let pool = connection_pool.clone(); let shutdown_rx = shutdown_rx.clone(); - handles.push(R::spawn(Self::connection_pool_cleaner( - pool, - interval, - shutdown_rx, - opts.connection_ttl.unwrap_or(Duration::ZERO), - ))); + let pwg = wg.add(1); + R::spawn_detach(async move { + Self::connection_pool_cleaner( + pool, + interval, + shutdown_rx, + opts.connection_ttl.unwrap_or(Duration::ZERO), + ) + .await; + pwg.done(); + }); Ok(Self { advertise_addr: final_advertise_addr, @@ -252,7 +261,7 @@ where opts, packet_rx, stream_rx, - handles: AtomicRefCell::new(handles), + wg, v4_connectors, v6_connectors, v4_round_robin: AtomicUsize::new(0), @@ -682,9 +691,8 @@ where } } - // Block until all the listener threads have died. - let mut handles = core::mem::take(&mut *self.handles.borrow_mut()); - while handles.next().await.is_some() {} + self.wg.wait().await; + Ok(()) } } diff --git a/transports/quic/src/processor.rs b/transports/quic/src/processor.rs index 07563159..34d433da 100644 --- a/transports/quic/src/processor.rs +++ b/transports/quic/src/processor.rs @@ -16,6 +16,7 @@ pub(super) struct Processor< pub(super) shutdown_rx: async_channel::Receiver<()>, + pub(super) wg: AsyncWaitGroup, pub(super) skip_inbound_label_check: bool, pub(super) timeout: Option, @@ -42,6 +43,7 @@ where label, skip_inbound_label_check, timeout, + wg, #[cfg(feature = "compression")] offload_size, #[cfg(feature = "metrics")] @@ -57,6 +59,7 @@ where shutdown_rx, skip_inbound_label_check, timeout, + wg, #[cfg(feature = "compression")] offload_size, #[cfg(feature = "metrics")] @@ -75,6 +78,7 @@ where shutdown_rx: async_channel::Receiver<()>, skip_inbound_label_check: bool, timeout: Option, + wg: AsyncWaitGroup, #[cfg(feature = "compression")] offload_size: usize, #[cfg(feature = "metrics")] metric_labels: Arc, ) { @@ -104,6 +108,7 @@ where let label = label.cheap_clone(); #[cfg(feature = "metrics")] let metric_labels = metric_labels.clone(); + let twg = wg.add(1); ::spawn_detach(async move { Self::handle_connection( connection, @@ -115,9 +120,11 @@ where timeout, skip_inbound_label_check, shutdown_rx, + twg.clone(), #[cfg(feature = "compression")] offload_size, #[cfg(feature = "metrics")] metric_labels, ).await; + twg.done(); }); } Err(e) => { @@ -159,6 +166,7 @@ where timeout: Option, skip_inbound_label_check: bool, shutdown_rx: async_channel::Receiver<()>, + wg: AsyncWaitGroup, #[cfg(feature = "compression")] offload_size: usize, #[cfg(feature = "metrics")] metric_labels: Arc, ) { @@ -187,6 +195,7 @@ where let label = label.cheap_clone(); #[cfg(feature = "metrics")] let metric_labels = metric_labels.clone(); + let twg = wg.add(1); ::spawn_detach(async move { Self::handle_packet( stream, @@ -199,6 +208,7 @@ where #[cfg(feature = "compression")] offload_size, #[cfg(feature = "metrics")] metric_labels, ).await; + twg.done(); }); } } diff --git a/transports/quic/src/tests.rs b/transports/quic/src/tests.rs index 6b8cdf03..58d253c9 100644 --- a/transports/quic/src/tests.rs +++ b/transports/quic/src/tests.rs @@ -540,8 +540,8 @@ mod quinn_stream_layer { fn configure_server() -> Result> { let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap(); - let cert_der = cert.serialize_der().unwrap(); - let priv_key = cert.serialize_private_key_der(); + let cert_der = cert.cert.der().to_vec(); + let priv_key = cert.key_pair.serialize_der(); let priv_key = rustls::PrivateKey(priv_key); let cert_chain = vec![rustls::Certificate(cert_der.clone())]; From ac132fb1eeb5bd3de521fab65ccd4dd2549de616 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Sun, 28 Apr 2024 01:18:48 +0800 Subject: [PATCH 2/2] WIP --- core/src/lib.rs | 64 ++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index befb1f0d..f4cd686c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -60,8 +60,8 @@ pub mod tests { use std::net::SocketAddr; use nodecraft::resolver::AddressResolver; - #[cfg(not(windows))] - use parking_lot::Mutex; + // #[cfg(not(windows))] + // use parking_lot::Mutex; pub use paste; use self::{delegate::Delegate, error::Error, transport::Transport}; @@ -120,44 +120,44 @@ pub mod tests { /// Any error type used for testing. pub type AnyError = Box; - #[cfg(not(windows))] - static IPV4_BIND_NUM: Mutex = Mutex::new(10); - #[cfg(not(windows))] - static IPV6_BIND_NUM: Mutex = Mutex::new(10); + // #[cfg(not(windows))] + // static IPV4_BIND_NUM: Mutex = Mutex::new(10); + // #[cfg(not(windows))] + // static IPV6_BIND_NUM: Mutex = Mutex::new(10); /// Returns the next socket addr v4 pub fn next_socket_addr_v4(_network: u8) -> SocketAddr { - #[cfg(not(windows))] - { - let mut mu = IPV4_BIND_NUM.lock(); - let addr: SocketAddr = format!("127.0.{}.{}:0", _network, *mu).parse().unwrap(); - *mu += 1; - if *mu > 255 { - *mu = 10; - } - - addr - } - - #[cfg(windows)] + // #[cfg(not(windows))] + // { + // let mut mu = IPV4_BIND_NUM.lock(); + // let addr: SocketAddr = format!("127.0.{}.{}:0", _network, *mu).parse().unwrap(); + // *mu += 1; + // if *mu > 255 { + // *mu = 10; + // } + + // addr + // } + + // #[cfg(windows)] "127.0.0.1:0".parse().unwrap() } /// Returns the next socket addr v6 pub fn next_socket_addr_v6() -> SocketAddr { - #[cfg(not(windows))] - { - let mut mu = IPV6_BIND_NUM.lock(); - let addr: SocketAddr = format!("[fc00::1:{}]:0", *mu).parse().unwrap(); - *mu += 1; - if *mu > 255 { - *mu = 10; - } - - addr - } - - #[cfg(windows)] + // #[cfg(not(windows))] + // { + // let mut mu = IPV6_BIND_NUM.lock(); + // let addr: SocketAddr = format!("[fc00::1:{}]:0", *mu).parse().unwrap(); + // *mu += 1; + // if *mu > 255 { + // *mu = 10; + // } + + // addr + // } + + // #[cfg(windows)] "[::1]:0".parse().unwrap() }