From debc136a1f4f751c8a81774c16dffb76785da965 Mon Sep 17 00:00:00 2001
From: Santiago Carmuega
Date: Thu, 21 Dec 2023 09:31:26 -0300
Subject: [PATCH] back to async again

---
 examples/n2n-miniprotocols/src/main.rs |  87 ++++++------
 pallas-network/src/facades.rs          |  78 +++++------
 pallas-network/src/multiplexer.rs      | 182 ++++++++++++-------
 pallas-network/tests/plexer.rs         |  24 ++--
 pallas-network/tests/protocols.rs      |  31 +++--
 5 files changed, 204 insertions(+), 198 deletions(-)

diff --git a/examples/n2n-miniprotocols/src/main.rs b/examples/n2n-miniprotocols/src/main.rs
index 7c5f1877..25ecae86 100644
--- a/examples/n2n-miniprotocols/src/main.rs
+++ b/examples/n2n-miniprotocols/src/main.rs
@@ -1,10 +1,13 @@
-use pallas::{network::{
-    facades::PeerClient,
-    miniprotocols::{chainsync, Point, MAINNET_MAGIC, blockfetch, keepalive},
-}, ledger::traverse::MultiEraHeader};
-use tokio::{time::Instant, select};
+use pallas::{
+    ledger::traverse::MultiEraHeader,
+    network::{
+        facades::PeerClient,
+        miniprotocols::{blockfetch, chainsync, keepalive, Point, MAINNET_MAGIC},
+    },
+};
+use std::time::Duration;
 use thiserror::Error;
-use futures::{future::FutureExt, pin_mut};
+use tokio::{select, time::Instant};
 
 #[derive(Error, Debug)]
 pub enum Error {
@@ -24,17 +27,27 @@ pub enum Error {
     PallasTraverseError(#[from] pallas::ledger::traverse::Error),
 }
 
-async fn do_blockfetch(blockfetch_client: &mut blockfetch::Client, range: (Point, Point)) -> Result<(), Error> {
+async fn do_blockfetch(
+    blockfetch_client: &mut blockfetch::Client,
+    range: (Point, Point),
+) -> Result<(), Error> {
     let blocks = blockfetch_client.fetch_range(range.clone()).await?;
 
     for block in &blocks {
         tracing::trace!("received block of size: {}", block.len());
     }
 
-    tracing::info!("received {} blocks. last slot: {}", blocks.len(), range.1.slot_or_default());
+    tracing::info!(
+        "received {} blocks. last slot: {}",
+        blocks.len(),
+        range.1.slot_or_default()
+    );
 
     Ok(())
 }
 
-async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_client: &mut blockfetch::Client) -> Result<(), Error> {
+async fn do_chainsync(
+    mut chainsync_client: chainsync::N2NClient,
+    mut blockfetch_client: blockfetch::Client,
+) -> Result<(), Error> {
     let known_points = vec![Point::Specific(
         43847831u64,
         hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")?,
@@ -64,18 +77,18 @@ async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_cl
                             MultiEraHeader::EpochBoundary(_) => {
                                 tracing::info!("epoch boundary");
                                 None
-                            },
+                            }
                             MultiEraHeader::AlonzoCompatible(_) | MultiEraHeader::Babbage(_) => {
                                 if next_log.elapsed().as_secs() > 1 {
                                     tracing::info!("chainsync block header: {}", number);
                                     next_log = Instant::now();
                                 }
                                 Some(Point::Specific(slot, hash))
-                            },
+                            }
                             MultiEraHeader::Byron(_) => {
                                 tracing::info!("ignoring byron header");
                                 None
-                            },
+                            }
                         }
                     }
                     Some(_) => {
@@ -88,14 +101,17 @@ async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_cl
                         block_count += 1;
                         if block_count == 1 {
                             start_point = p;
-                        }
-                        else if block_count == 10 {
+                        } else if block_count == 10 {
                             end_point = p;
-                            do_blockfetch(blockfetch_client, (start_point.clone(), end_point.clone())).await?;
+                            do_blockfetch(
+                                &mut blockfetch_client,
+                                (start_point.clone(), end_point.clone()),
+                            )
+                            .await?;
                             block_count = 0;
                         }
-                    },
-                    None => {},
+                    }
+                    None => {}
                 };
             }
             chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x),
@@ -104,15 +120,11 @@
     }
 }
 
-async fn do_keepalive(keepalive_client: &mut keepalive::Client) -> Result<(), Error> {
-    let mut keepalive_timer = Instant::now();
+async fn do_keepalive(mut keepalive_client: keepalive::Client) -> Result<(), Error> {
     loop {
-        if keepalive_timer.elapsed().as_secs() > 20 {
-            tracing::info!("sending keepalive...");
-            keepalive_client.send_keepalive().await?;
-            tracing::info!("keepalive sent");
-            keepalive_timer = Instant::now();
-        }
+        tokio::time::sleep(Duration::from_secs(20)).await;
+        keepalive_client.send_keepalive().await?;
+        tracing::info!("keepalive sent");
     }
 }
 
@@ -130,20 +142,18 @@ async fn main() {
         // relay.
         let server = "backbone.cardano-mainnet.iohk.io:3001";
         // let server = "localhost:6000";
-        let mut peer = PeerClient::connect(server, MAINNET_MAGIC)
-            .await
-            .unwrap();
+        let peer = PeerClient::connect(server, MAINNET_MAGIC).await.unwrap();
 
-        let chainsync_handle = tokio::spawn(async move {
-            do_chainsync(&mut peer.chainsync, &mut peer.blockfetch).await?;
-            Ok::<(), Error>(())
-        }).fuse();
-        let keepalive_handle = tokio::spawn(async move {
-            do_keepalive(&mut peer.keepalive).await?;
-            Ok::<(), Error>(())
-        }).fuse();
+        let PeerClient {
+            plexer,
+            chainsync,
+            blockfetch,
+            keepalive,
+            ..
+        } = peer;
 
-        pin_mut!(chainsync_handle, keepalive_handle);
+        let chainsync_handle = tokio::spawn(do_chainsync(chainsync, blockfetch));
+        let keepalive_handle = tokio::spawn(do_keepalive(keepalive));
 
         // If any of these concurrent tasks exit or fail, the others are canceled.
         select! {
@@ -178,7 +188,8 @@ async fn main() {
                 }
             }
         }
-        peer.plexer_handle.abort();
+
+        plexer.abort().await;
 
         tracing::info!("waiting 10 seconds before reconnecting...");
         tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs
index 740eba95..2b3c508b 100644
--- a/pallas-network/src/facades.rs
+++ b/pallas-network/src/facades.rs
@@ -1,10 +1,12 @@
-use std::net::{SocketAddr, TcpListener};
+use std::net::SocketAddr;
 use std::path::Path;
 
 use thiserror::Error;
 use tracing::error;
 
+use tokio::net::TcpListener;
+
 #[cfg(unix)]
-use std::os::unix::net::UnixListener;
+use tokio::net::{unix::SocketAddr as UnixSocketAddr, UnixListener};
 
 use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber};
@@ -34,12 +36,12 @@ pub enum Error {
 
 /// Client of N2N Ouroboros
 pub struct PeerClient {
-    plexer: RunningPlexer,
-    handshake: handshake::N2NClient,
-    chainsync: chainsync::N2NClient,
-    blockfetch: blockfetch::Client,
-    txsubmission: txsubmission::Client,
-    keepalive: keepalive::Client,
+    pub plexer: RunningPlexer,
+    pub handshake: handshake::N2NClient,
+    pub chainsync: chainsync::N2NClient,
+    pub blockfetch: blockfetch::Client,
+    pub txsubmission: txsubmission::Client,
+    pub keepalive: keepalive::Client,
 }
 
 impl PeerClient {
@@ -65,9 +67,8 @@ impl PeerClient {
     }
 
     pub async fn connect(addr: &'static str, magic: u64) -> Result<Self, Error> {
-        let bearer = tokio::task::spawn_blocking(move || Bearer::connect_tcp(addr))
+        let bearer = Bearer::connect_tcp(addr)
             .await
-            .expect("can't join tokio thread")
             .map_err(Error::ConnectFailure)?;
 
         let mut client = Self::new(bearer);
@@ -96,6 +97,15 @@ impl PeerClient {
         &mut self.chainsync
     }
 
+    pub async fn with_chainsync<T, O, Fut>(&mut self, op: T) -> tokio::task::JoinHandle<O>
+    where
+        T: FnOnce(&mut chainsync::N2NClient) -> Fut,
+        Fut: std::future::Future<Output = O> + Send + 'static,
+        O: Send + 'static,
+    {
+        tokio::spawn(op(&mut self.chainsync))
+    }
+
     pub fn blockfetch(&mut self) -> &mut blockfetch::Client {
         &mut self.blockfetch
     }
@@ -108,8 +118,8 @@ impl PeerClient {
         &mut self.keepalive
     }
 
-    pub fn abort(&self) {
-        self.plexer.abort();
+    pub async fn abort(self) {
+        self.plexer.abort().await
     }
 }
@@ -151,15 +161,10 @@ impl PeerServer {
         }
     }
 
-    pub async fn accept(
-        listener: impl AsRef<TcpListener> + Send + 'static,
-        magic: u64,
-    ) -> Result<Self, Error> {
-        let (bearer, address) =
-            tokio::task::spawn_blocking(move || Bearer::accept_tcp(listener.as_ref()))
-                .await
-                .expect("can't join tokio thread")
-                .map_err(Error::ConnectFailure)?;
+    pub async fn accept(listener: &TcpListener, magic: u64) -> Result<Self, Error> {
+        let (bearer, address) = Bearer::accept_tcp(listener)
+            .await
+            .map_err(Error::ConnectFailure)?;
 
         let mut client = Self::new(bearer);
@@ -174,7 +179,7 @@ impl PeerServer {
             client.accepted_version = Some(version);
             Ok(client)
         } else {
-            client.abort();
+            client.abort().await;
             Err(Error::IncompatibleVersion)
         }
     }
@@ -195,8 +200,8 @@ impl PeerServer {
         &mut self.txsubmission
     }
 
-    pub fn abort(&self) {
-        self.plexer.abort();
+    pub async fn abort(self) {
+        self.plexer.abort().await
     }
 }
@@ -231,9 +236,8 @@ impl NodeClient {
         path: impl AsRef<Path> + Send + 'static,
         magic: u64,
     ) -> Result<Self, Error> {
-        let bearer = tokio::task::spawn_blocking(move || Bearer::connect_unix(path))
+        let bearer = Bearer::connect_unix(path)
             .await
-            .expect("can't join tokio thread")
            .map_err(Error::ConnectFailure)?;
 
         let mut client = Self::new(bearer);
@@ -311,7 +315,7 @@ impl NodeClient {
                 Err(Error::IncompatibleVersion)
             }
             Confirmation::QueryReply(version_table) => {
-                plexer.abort();
+                plexer.abort().await;
                 Ok(version_table)
             }
         }
@@ -329,8 +333,8 @@ impl NodeClient {
         &mut self.statequery
     }
 
-    pub fn abort(&self) {
-        self.plexer.abort();
+    pub async fn abort(self) {
+        self.plexer.abort().await
    }
 }
@@ -341,7 +345,7 @@ pub struct NodeServer {
     handshake: handshake::N2CServer,
     chainsync: chainsync::N2CServer,
     statequery: localstate::Server,
-    accepted_address: Option<std::os::unix::net::SocketAddr>,
+    accepted_address: Option<UnixSocketAddr>,
     accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
 }
@@ -374,11 +378,9 @@ impl NodeServer {
         listener: impl AsRef<UnixListener> + Send + 'static,
         magic: u64,
     ) -> Result<Self, Error> {
-        let (bearer, address) =
-            tokio::task::spawn_blocking(move || Bearer::accept_unix(listener.as_ref()))
-                .await
-                .expect("can't join tokio thread")
-                .map_err(Error::ConnectFailure)?;
+        let (bearer, address) = Bearer::accept_unix(listener.as_ref())
+            .await
+            .map_err(Error::ConnectFailure)?;
 
         let mut client = Self::new(bearer).await;
@@ -393,7 +395,7 @@ impl NodeServer {
             client.accpeted_version = Some(version);
             Ok(client)
         } else {
-            client.abort();
+            client.abort().await;
             Err(Error::IncompatibleVersion)
         }
     }
@@ -410,7 +412,7 @@ impl NodeServer {
         &mut self.statequery
     }
 
-    pub fn abort(&self) {
-        self.plexer.abort();
+    pub async fn abort(self) {
+        self.plexer.abort().await
     }
 }
diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs
index 6b192b2b..f3ab7f97 100644
--- a/pallas-network/src/multiplexer.rs
+++ b/pallas-network/src/multiplexer.rs
@@ -2,25 +2,19 @@ use byteorder::{ByteOrder, NetworkEndian};
 use pallas_codec::{minicbor, Fragment};
-use std::{
-    io::{Read, Write},
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        Arc,
-    },
-    thread::JoinHandle,
-};
 use thiserror::Error;
-use tokio::sync::mpsc::error::SendError;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::task::JoinHandle;
 use tokio::time::Instant;
+use tokio::{select, sync::mpsc::error::SendError};
 use tracing::{debug, error, trace};
 
-type IOResult<T> = std::io::Result<T>;
+type IOResult<T> = tokio::io::Result<T>;
 
-use std::net as tcp;
+use tokio::net as tcp;
 
 #[cfg(unix)]
-use std::os::unix::net as unix;
+use tokio::net as unix;
 
 #[cfg(windows)]
 use tokio::net::windows::named_pipe::NamedPipeClient;
@@ -81,17 +75,20 @@ pub enum Bearer {
 }
 
 impl Bearer {
-    pub async fn connect_tcp(addr: impl ToSocketAddrs) -> Result<Self, std::io::Error> {
-        let stream = TcpStream::connect(addr).await?;
-        // add tcp_keepalive
+    fn configure_tcp(stream: &tcp::TcpStream) -> IOResult<()> {
         let sock_ref = socket2::SockRef::from(&stream);
 
         let mut tcp_keepalive = socket2::TcpKeepalive::new();
         tcp_keepalive = tcp_keepalive.with_time(tokio::time::Duration::from_secs(20));
         tcp_keepalive = tcp_keepalive.with_interval(tokio::time::Duration::from_secs(20));
-        let _ = sock_ref.set_tcp_keepalive(&tcp_keepalive);
-        // add tcp_nodelay
-        let _ = sock_ref.set_nodelay(true);
+        sock_ref.set_tcp_keepalive(&tcp_keepalive)?;
+        sock_ref.set_nodelay(true)?;
 
+        Ok(())
+    }
+
+    pub async fn connect_tcp(addr: impl tcp::ToSocketAddrs) -> Result<Self, std::io::Error> {
+        let stream = tcp::TcpStream::connect(addr).await?;
+        Self::configure_tcp(&stream)?;
         Ok(Self::Tcp(stream))
     }
 
     pub async fn connect_tcp_timeout(
         addr: impl tcp::ToSocketAddrs,
         timeout: std::time::Duration,
     ) -> IOResult<Self> {
-        let addr = addr.to_socket_addrs()?.next().unwrap();
-        let stream = tcp::TcpStream::connect_timeout(&addr, timeout)?;
-        stream.set_nodelay(true)?;
-        Ok(Self::Tcp(stream))
+        select! {
+            result = Self::connect_tcp(addr) => result,
+            _ = tokio::time::sleep(timeout) => Err(tokio::io::Error::new(tokio::io::ErrorKind::TimedOut, "connect timeout")),
+        }
     }
 
-    pub fn accept_tcp(listener: &tcp::TcpListener) -> IOResult<(Self, tcp::SocketAddr)> {
-        let (stream, addr) = listener.accept()?;
-        stream.set_nodelay(true)?;
+    pub async fn accept_tcp(listener: &tcp::TcpListener) -> IOResult<(Self, std::net::SocketAddr)> {
+        let (stream, addr) = listener.accept().await?;
+        Self::configure_tcp(&stream)?;
         Ok((Self::Tcp(stream), addr))
     }
 
     #[cfg(unix)]
-    pub fn connect_unix(path: impl AsRef<Path>) -> IOResult<Self> {
-        let stream = unix::UnixStream::connect(path)?;
+    pub async fn connect_unix(path: impl AsRef<Path>) -> IOResult<Self> {
+        let stream = unix::UnixStream::connect(path).await?;
 
         Ok(Self::Unix(stream))
     }
 
     #[cfg(unix)]
-    pub fn accept_unix(listener: &unix::UnixListener) -> IOResult<(Self, unix::SocketAddr)> {
-        let (stream, addr) = listener.accept()?;
+    pub async fn accept_unix(
+        listener: &unix::UnixListener,
+    ) -> IOResult<(Self, unix::unix::SocketAddr)> {
+        let (stream, addr) = listener.accept().await?;
 
         Ok((Self::Unix(stream), addr))
     }
@@ -133,60 +132,60 @@ impl Bearer {
     pub fn into_split(self) -> (BearerReadHalf, BearerWriteHalf) {
         match self {
             Bearer::Tcp(x) => {
-                let y = x.try_clone().unwrap();
-                (BearerReadHalf::Tcp(x), BearerWriteHalf::Tcp(y))
+                let (r, w) = x.into_split();
+                (BearerReadHalf::Tcp(r), BearerWriteHalf::Tcp(w))
             }
 
             #[cfg(unix)]
             Bearer::Unix(x) => {
-                let y = x.try_clone().unwrap();
-                (BearerReadHalf::Unix(x), BearerWriteHalf::Unix(y))
+                let (r, w) = x.into_split();
+                (BearerReadHalf::Unix(r), BearerWriteHalf::Unix(w))
             }
         }
     }
 }
 
 pub enum BearerReadHalf {
-    Tcp(tcp::TcpStream),
+    Tcp(tcp::tcp::OwnedReadHalf),
 
     #[cfg(unix)]
-    Unix(unix::UnixStream),
+    Unix(unix::unix::OwnedReadHalf),
 }
 
 impl BearerReadHalf {
-    fn read_exact(&mut self, buf: &mut [u8]) -> IOResult<()> {
+    async fn read_exact(&mut self, buf: &mut [u8]) -> IOResult<usize> {
         match self {
-            BearerReadHalf::Tcp(x) => x.read_exact(buf),
+            BearerReadHalf::Tcp(x) => x.read_exact(buf).await,
 
             #[cfg(unix)]
-            BearerReadHalf::Unix(x) => x.read_exact(buf),
+            BearerReadHalf::Unix(x) => x.read_exact(buf).await,
         }
     }
 }
 
 pub enum BearerWriteHalf {
-    Tcp(tcp::TcpStream),
+    Tcp(tcp::tcp::OwnedWriteHalf),
 
     #[cfg(unix)]
-    Unix(unix::UnixStream),
+    Unix(unix::unix::OwnedWriteHalf),
 }
 
 impl BearerWriteHalf {
-    fn write_all(&mut self, buf: &[u8]) -> IOResult<()> {
+    async fn write_all(&mut self, buf: &[u8]) -> IOResult<()> {
         match self {
-            Self::Tcp(x) => x.write_all(buf),
+            Self::Tcp(x) => x.write_all(buf).await,
 
             #[cfg(unix)]
-            Self::Unix(x) => x.write_all(buf),
+            Self::Unix(x) => x.write_all(buf).await,
         }
     }
 
-    fn flush(&mut self) -> IOResult<()> {
+    async fn flush(&mut self) -> IOResult<()> {
         match self {
-            Self::Tcp(x) => x.flush(),
+            Self::Tcp(x) => x.flush().await,
 
             #[cfg(unix)]
-            Self::Unix(x) => x.flush(),
+            Self::Unix(x) => x.flush().await,
         }
     }
 }
@@ -216,6 +215,9 @@ pub enum Error {
 
     #[error("plexer failed to mux chunk")]
     PlexerMux,
+
+    #[error("failure to abort the plexer threads")]
+    AbortFailure,
 }
 
 type Egress = (
@@ -227,20 +229,20 @@
 pub struct Demuxer(BearerReadHalf, Egress);
 
 impl Demuxer {
     pub fn new(bearer: BearerReadHalf) -> Self {
-        let egress = tokio::sync::broadcast::channel(100);
+        let egress = tokio::sync::broadcast::channel(10_000);
         Self(bearer, egress)
     }
 
-    pub fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> {
+    pub async fn read_segment(&mut self) -> Result<(Protocol, Payload), Error> {
         trace!("waiting for segment header");
         let mut buf = vec![0u8; HEADER_LEN];
-        self.0.read_exact(&mut buf).map_err(Error::BearerIo)?;
+        self.0.read_exact(&mut buf).await.map_err(Error::BearerIo)?;
         let header = Header::from(buf.as_slice());
 
         trace!("waiting for full segment");
         let segment_size = header.payload_len as usize;
         let mut buf = vec![0u8; segment_size];
-        self.0.read_exact(&mut buf).map_err(Error::BearerIo)?;
+        self.0.read_exact(&mut buf).await.map_err(Error::BearerIo)?;
 
         Ok((header.protocol, buf))
     }
@@ -262,11 +264,19 @@ impl Demuxer {
         self.1 .0.subscribe()
     }
 
-    pub fn tick(&mut self) -> Result<(), Error> {
-        let (protocol, payload) = self.read_segment()?;
+    pub async fn tick(&mut self) -> Result<(), Error> {
+        let (protocol, payload) = self.read_segment().await?;
         trace!(protocol, "demux happening");
         self.demux(protocol, payload)
     }
+
+    pub async fn run(&mut self) -> Result<(), Error> {
+        loop {
+            if let Err(err) = self.tick().await {
+                break Err(err);
+            }
+        }
+    }
 }
 
 type Ingress = (
@@ -285,7 +295,7 @@ pub struct Muxer(BearerWriteHalf, Instant, Ingress);
 
 impl Muxer {
         Self(bearer, clock, ingress)
     }
 
-    fn write_segment(&mut self, protocol: u16, payload: &[u8]) -> Result<(), std::io::Error> {
+    async fn write_segment(&mut self, protocol: u16, payload: &[u8]) -> Result<(), std::io::Error> {
         let header = Header {
             protocol,
             timestamp: self.1.elapsed().as_micros() as u32,
@@ -293,16 +303,17 @@ impl Muxer {
         };
 
         let buf: [u8; 8] = header.into();
-        self.0.write_all(&buf)?;
-        self.0.write_all(payload)?;
+        self.0.write_all(&buf).await?;
+        self.0.write_all(payload).await?;
 
-        self.0.flush()?;
+        self.0.flush().await?;
 
         Ok(())
     }
 
-    pub fn mux(&mut self, msg: (Protocol, Payload)) -> Result<(), Error> {
+    pub async fn mux(&mut self, msg: (Protocol, Payload)) -> Result<(), Error> {
         self.write_segment(msg.0, &msg.1)
+            .await
             .map_err(|_| Error::PlexerMux)?;
 
         if tracing::event_enabled!(tracing::Level::TRACE) {
@@ -320,16 +331,24 @@ impl Muxer {
         self.2 .0.clone()
     }
 
-    pub fn tick(&mut self) -> Result<(), Error> {
-        let msg = self.2 .1.blocking_recv();
+    pub async fn tick(&mut self) -> Result<(), Error> {
+        let msg = self.2 .1.recv().await;
 
         if let Some(x) = msg {
             trace!(protocol = x.0, "mux happening");
-            self.mux(x)?
+            self.mux(x).await?
         }
 
         Ok(())
     }
+
+    pub async fn run(&mut self) -> Result<(), Error> {
+        loop {
+            if let Err(err) = self.tick().await {
+                break Err(err);
+            }
+        }
+    }
 }
 
 type ToPlexerPort = tokio::sync::mpsc::Sender<(Protocol, Payload)>;
@@ -395,19 +414,12 @@ impl AgentChannel {
 pub struct RunningPlexer {
     demuxer: JoinHandle<Result<(), Error>>,
     muxer: JoinHandle<Result<(), Error>>,
-    abort: Arc<AtomicBool>,
 }
 
 impl RunningPlexer {
-    pub fn abort(&self) {
-        self.abort.store(true, Ordering::Relaxed);
-    }
-
-    pub fn join(self) -> Result<(), Error> {
-        self.demuxer.join().expect("couldn't join demuxer thread")?;
-        self.muxer.join().expect("couldn't join muxer thread")?;
-
-        Ok(())
+    pub async fn abort(self) {
+        self.demuxer.abort();
+        self.muxer.abort();
     }
 }
@@ -441,35 +453,11 @@ impl Plexer {
     pub fn spawn(self) -> RunningPlexer {
         let mut demuxer = self.demuxer;
         let mut muxer = self.muxer;
-        let abort = Arc::new(AtomicBool::new(false));
-
-        let demuxer_abort = abort.clone();
-        let demuxer = std::thread::spawn(move || loop {
-            if demuxer_abort.load(Ordering::Relaxed) {
-                break Ok(());
-            }
-            if let Err(err) = demuxer.tick() {
-                break Err(err);
-            }
-        });
-
-        let muxer_abort = abort.clone();
-        let muxer = std::thread::spawn(move || loop {
-            if muxer_abort.load(Ordering::Relaxed) {
-                break Ok(());
-            }
+        let demuxer = tokio::spawn(async move { demuxer.run().await });
+        let muxer = tokio::spawn(async move { muxer.run().await });
 
-            if let Err(err) = muxer.tick() {
-                break Err(err);
-            }
-        });
-
-        RunningPlexer {
-            demuxer,
-            muxer,
-            abort,
-        }
+        RunningPlexer { demuxer, muxer }
     }
 }
diff --git a/pallas-network/tests/plexer.rs b/pallas-network/tests/plexer.rs
index 0f078e3c..13de2c7a 100644
--- a/pallas-network/tests/plexer.rs
+++ b/pallas-network/tests/plexer.rs
@@ -2,20 +2,24 @@ use std::net::{Ipv4Addr, SocketAddrV4};
 
 use pallas_network::multiplexer::{Bearer, Plexer};
 use rand::{distributions::Uniform, Rng};
-use std::net::TcpListener;
+use tokio::net::TcpListener;
 
-fn setup_passive_muxer<const P: u16>() -> Plexer {
-    let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
+async fn setup_passive_muxer<const P: u16>() -> Plexer {
+    let server = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P))
+        .await
+        .unwrap();
 
     println!("listening for connections on port {P}");
 
-    let (bearer, _) = Bearer::accept_tcp(&server).unwrap();
+    let (bearer, _) = Bearer::accept_tcp(&server).await.unwrap();
 
     Plexer::new(bearer)
 }
 
-fn setup_active_muxer<const P: u16>() -> Plexer {
-    let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P)).unwrap();
+async fn setup_active_muxer<const P: u16>() -> Plexer {
+    let bearer = Bearer::connect_tcp(SocketAddrV4::new(Ipv4Addr::LOCALHOST, P))
+        .await
+        .unwrap();
 
     println!("active plexer connected");
 
     Plexer::new(bearer)
 }
 
@@ -29,13 +33,13 @@ fn random_payload(size: usize) -> Vec<u8> {
 
 #[tokio::test]
 async fn one_way_small_sequence_of_payloads() {
-    let passive = tokio::task::spawn_blocking(setup_passive_muxer::<50301>);
+    let passive = tokio::task::spawn(setup_passive_muxer::<50301>());
 
     // HACK: a small sleep seems to be required for Github actions runner to
     // formally expose the port
     tokio::time::sleep(std::time::Duration::from_secs(1)).await;
 
-    let mut active = setup_active_muxer::<50301>();
+    let mut active = setup_active_muxer::<50301>().await;
 
     let mut passive = passive.await.unwrap();
 
@@ -53,6 +57,6 @@ async fn one_way_small_sequence_of_payloads() {
         assert_eq!(payload, received_payload);
     }
 
-    passive.abort();
-    active.abort();
+    passive.abort().await;
+    active.abort().await;
 }
diff --git a/pallas-network/tests/protocols.rs b/pallas-network/tests/protocols.rs
index 9cbc2b9e..3984d3a0 100644
--- a/pallas-network/tests/protocols.rs
+++ b/pallas-network/tests/protocols.rs
@@ -19,11 +19,12 @@ use pallas_network::miniprotocols::{
 };
 use pallas_network::miniprotocols::{handshake, localstate, txsubmission, MAINNET_MAGIC};
 use pallas_network::multiplexer::{Bearer, Plexer};
-use std::net::TcpListener;
 use std::path::Path;
 
+use tokio::net::TcpListener;
+
 #[cfg(unix)]
-use std::os::unix::net::UnixListener;
+use tokio::net::UnixListener;
 
 #[tokio::test]
 #[ignore]
@@ -176,8 +177,9 @@ pub async fn blockfetch_server_and_client_happy_path() {
         hex::decode("deadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef").unwrap(),
     );
 
-    let listener =
-        Arc::new(TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30003)).unwrap());
+    let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30003))
+        .await
+        .unwrap();
 
     let server = tokio::spawn({
         let bodies = block_bodies.clone();
@@ -185,7 +187,7 @@ pub async fn blockfetch_server_and_client_happy_path() {
 
         async move {
             // server setup
-            let mut peer_server = PeerServer::accept(listener, 0).await.unwrap();
+            let mut peer_server = PeerServer::accept(&listener, 0).await.unwrap();
 
             let server_bf = peer_server.blockfetch();
@@ -270,13 +272,11 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {
         async move {
             // server setup
-            let server_listener =
-                TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30002)).unwrap();
-
-            let bearer =
-                tokio::task::spawn_blocking(move || Bearer::accept_tcp(&server_listener).unwrap());
+            let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30002))
+                .await
+                .unwrap();
 
-            let (bearer, _) = bearer.await.unwrap();
+            let (bearer, _) = Bearer::accept_tcp(&server_listener).await.unwrap();
 
             let mut server_plexer = Plexer::new(bearer);
@@ -381,7 +381,7 @@ pub async fn chainsync_server_and_client_happy_path_n2n() {
             assert!(server_cs.recv_while_idle().await.unwrap().is_none());
             assert_eq!(*server_cs.state(), chainsync::State::Done);
 
-            server_plexer.abort();
+            server_plexer.abort().await;
         }
     });
@@ -780,13 +780,14 @@ pub async fn local_state_query_server_and_client_happy_path() {
 pub async fn txsubmission_server_and_client_happy_path_n2n() {
     let test_txs = vec![(vec![0], vec![0, 0, 0]), (vec![1], vec![1, 1, 1])];
 
-    let server_listener =
-        Arc::new(TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001)).unwrap());
+    let server_listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 30001))
+        .await
+        .unwrap();
 
     let server = tokio::spawn({
         let test_txs = test_txs.clone();
         async move {
-            let mut peer_server = PeerServer::accept(server_listener, 0).await.unwrap();
+            let mut peer_server = PeerServer::accept(&server_listener, 0).await.unwrap();
 
            let server_txsub = peer_server.txsubmission();
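
Note on the abort semantics introduced above: `RunningPlexer::abort` now calls `tokio::task::JoinHandle::abort` on the demuxer and muxer tasks instead of flipping a shared `AtomicBool` and joining OS threads. Cancellation therefore takes effect at the task's next `.await` point, and unlike the removed `join`, any error an in-flight tick would have returned is discarded. A minimal, self-contained sketch of that mechanism in plain tokio (not pallas code; the looping task body is illustrative):

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        // Stand-in for the muxer/demuxer run() loops: a task that never
        // finishes on its own.
        let task: tokio::task::JoinHandle<Result<(), ()>> = tokio::spawn(async {
            loop {
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // What RunningPlexer::abort now does per handle: request cancellation.
        task.abort();

        // The handle then resolves to a JoinError marked as cancelled, so the
        // task's own Ok/Err outcome is no longer observable by the caller.
        let err = task.await.unwrap_err();
        assert!(err.is_cancelled());
    }

This is why the example program above can simply `select!` over the spawned chainsync and keepalive handles and then call `plexer.abort().await`: dropping cooperative flag checks from the hot mux/demux loops is traded for fire-and-forget cancellation.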