diff --git a/examples/n2c-miniprotocols/src/main.rs b/examples/n2c-miniprotocols/src/main.rs index 00072ba4..ea253f2e 100644 --- a/examples/n2c-miniprotocols/src/main.rs +++ b/examples/n2c-miniprotocols/src/main.rs @@ -58,7 +58,7 @@ async fn do_chainsync(client: &mut NodeClient) { // environment const SOCKET_PATH: &str = "/tmp/node.socket"; -#[cfg(target_family = "unix")] +#[cfg(unix)] #[tokio::main] async fn main() { tracing::subscriber::set_global_default( diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 31a55390..8ba4f366 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -5,15 +5,10 @@ use tokio::net::TcpListener; use tokio::task::JoinHandle; use tracing::{debug, error}; -#[cfg(not(target_os = "windows"))] +#[cfg(unix)] use tokio::net::UnixListener; -use crate::miniprotocols::handshake::{ - n2c::VersionData, - n2n, VersionTable, - Confirmation, - VersionNumber -}; +use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable}; use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE; use crate::{ @@ -160,9 +155,10 @@ pub struct NodeClient { } impl NodeClient { - - async fn connect_bearer(bearer:Bearer,versions: VersionTable) -> Result { - + async fn connect_bearer( + bearer: Bearer, + versions: VersionTable, + ) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE); @@ -191,8 +187,7 @@ impl NodeClient { }) } - - #[cfg(not(target_os = "windows"))] + #[cfg(unix)] pub async fn connect(path: impl AsRef, magic: u64) -> Result { debug!("connecting"); @@ -202,25 +197,26 @@ impl NodeClient { let versions = handshake::n2c::VersionTable::v10_and_above(magic); - Self::connect_bearer(bearer,versions).await + Self::connect_bearer(bearer, versions).await } - #[cfg(target_os = "windows")] - pub async fn connect(pipe_name: impl AsRef, magic: u64) -> Result { + #[cfg(windows)] + pub async fn connect( + pipe_name: impl AsRef, + magic: u64, + ) -> Result { debug!("connecting"); let bearer = Bearer::connect_named_pipe(pipe_name) .await .map_err(Error::ConnectFailure)?; - - let versions = - handshake::n2c::VersionTable::only_v10(magic); - Self::connect_bearer(bearer,versions).await + let versions = handshake::n2c::VersionTable::v10_and_above(magic); + Self::connect_bearer(bearer, versions).await } - #[cfg(not(target_os = "windows"))] + #[cfg(unix)] pub async fn handshake_query( path: impl AsRef, magic: u64, @@ -275,7 +271,7 @@ impl NodeClient { } /// Server of N2C Ouroboros. -#[cfg(not(target_os = "windows"))] +#[cfg(unix)] pub struct NodeServer { pub plexer_handle: JoinHandle>, pub version: (VersionNumber, n2c::VersionData), @@ -283,7 +279,7 @@ pub struct NodeServer { pub statequery: localstate::Server, } -#[cfg(not(target_os = "windows"))] +#[cfg(unix)] impl NodeServer { pub async fn accept(listener: &UnixListener, magic: u64) -> Result { let (bearer, _) = Bearer::accept_unix(listener) diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index 57d23d3a..219e1f49 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -11,12 +11,9 @@ use tokio::sync::mpsc::error::SendError; use tokio::time::Instant; use tracing::{debug, error, trace}; -#[cfg(not(target_os = "windows"))] +#[cfg(unix)] use tokio::net::{UnixListener, UnixStream}; -#[cfg(not(target_os = "windows"))] -use tokio::net::UnixListener; - #[cfg(windows)] use tokio::net::windows::named_pipe::NamedPipeClient; @@ -65,16 +62,15 @@ pub struct Segment { pub payload: Payload, } -#[cfg(target_os = "windows")] +#[cfg(unix)] pub enum Bearer { Tcp(TcpStream), - NamedPipe(NamedPipeClient) -} -#[cfg(not(target_os = "windows"))] -pub enum Bearer { - Tcp(TcpStream), + #[cfg(unix)] Unix(UnixStream), + + #[cfg(windows)] + NamedPipe(NamedPipeClient), } const BUFFER_LEN: usize = 1024 * 10; @@ -90,7 +86,13 @@ impl Bearer { Ok((Self::Tcp(stream), addr)) } - #[cfg(not(target_os = "windows"))] + #[cfg(unix)] + pub async fn connect_unix(path: impl AsRef) -> Result { + let stream = UnixStream::connect(path).await?; + Ok(Self::Unix(stream)) + } + + #[cfg(unix)] pub async fn accept_unix( listener: &UnixListener, ) -> tokio::io::Result<(Self, tokio::net::unix::SocketAddr)> { @@ -99,53 +101,60 @@ impl Bearer { } #[cfg(windows)] - pub async fn connect_named_pipe(pipe_name: impl AsRef) -> Result{ + pub async fn connect_named_pipe( + pipe_name: impl AsRef, + ) -> Result { + // TODO: revisit if busy wait logic is required let client = loop { match tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name) { Ok(client) => break client, - Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) => (), + Err(e) + if e.raw_os_error() + == Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) => + { + () + } Err(e) => return Err(e), } - + tokio::time::sleep(std::time::Duration::from_millis(50)).await; }; - Ok(Self::NamedPipe(client)) - } - #[cfg(not(target_os = "windows"))] - pub async fn connect_unix(path: impl AsRef) -> Result { - let stream = UnixStream::connect(path).await?; - Ok(Self::Unix(stream)) + Ok(Self::NamedPipe(client)) } pub async fn readable(&mut self) -> tokio::io::Result<()> { match self { Bearer::Tcp(x) => x.readable().await, - #[cfg(not(target_os = "windows"))] + + #[cfg(unix)] Bearer::Unix(x) => x.readable().await, - #[cfg(target_os = "windows")] - Bearer::NamedPipe(x) => x.readable().await - + #[cfg(windows)] + Bearer::NamedPipe(x) => x.readable().await, } } fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result { match self { Bearer::Tcp(x) => x.try_read(buf), - #[cfg(not(target_os = "windows"))] + + #[cfg(unix)] Bearer::Unix(x) => x.try_read(buf), - #[cfg(target_os = "windows")] - Bearer::NamedPipe(x) => x.try_read(buf) + + #[cfg(windows)] + Bearer::NamedPipe(x) => x.try_read(buf), } } async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> { match self { Bearer::Tcp(x) => x.write_all(buf).await, - #[cfg(not(target_os = "windows"))] + + #[cfg(unix)] Bearer::Unix(x) => x.write_all(buf).await, - #[cfg(target_os = "windows")] + + #[cfg(windows)] Bearer::NamedPipe(x) => x.write_all(buf).await, } } @@ -153,9 +162,11 @@ impl Bearer { async fn flush(&mut self) -> tokio::io::Result<()> { match self { Bearer::Tcp(x) => x.flush().await, - #[cfg(not(target_os = "windows"))] + + #[cfg(unix)] Bearer::Unix(x) => x.flush().await, - #[cfg(target_os = "windows")] + + #[cfg(windows)] Bearer::NamedPipe(x) => x.flush().await, } } @@ -203,7 +214,7 @@ impl SegmentBuffer { let remaining = required - self.1.len(); let mut buf = vec![0u8; remaining]; - + match self.0.try_read(&mut buf) { Ok(0) => { error!("empty bearer");