From d6bcffe1b5b9c3b9a0d31a4504da3f51bf2376ed Mon Sep 17 00:00:00 2001 From: Olof Blomqvist Date: Thu, 16 Nov 2023 14:06:34 +0100 Subject: [PATCH] feat(network): implement windows named pipes connections (#279) --- pallas-network/Cargo.toml | 4 +++ pallas-network/src/facades.rs | 50 ++++++++++++++++++++++++------- pallas-network/src/multiplexer.rs | 38 ++++++++++++++++++++--- 3 files changed, 78 insertions(+), 14 deletions(-) diff --git a/pallas-network/Cargo.toml b/pallas-network/Cargo.toml index 6dbb6207..174c2c8d 100644 --- a/pallas-network/Cargo.toml +++ b/pallas-network/Cargo.toml @@ -23,6 +23,10 @@ thiserror = "1.0.31" tokio = { version = "1", features = ["net", "io-util", "time", "sync", "macros"] } tracing = "0.1.37" +[target.'cfg(windows)'.dependencies] +tokio-named-pipes = "0.1.0" +windows-sys = "0.48.0" + [dev-dependencies] tracing-subscriber = "0.3.16" tokio = { version = "1", features = ["full"] } diff --git a/pallas-network/src/facades.rs b/pallas-network/src/facades.rs index 491fdbd8..31a55390 100644 --- a/pallas-network/src/facades.rs +++ b/pallas-network/src/facades.rs @@ -8,7 +8,13 @@ use tracing::{debug, error}; #[cfg(not(target_os = "windows"))] use tokio::net::UnixListener; -use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber}; +use crate::miniprotocols::handshake::{ + n2c::VersionData, + n2n, VersionTable, + Confirmation, + VersionNumber +}; + use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE; use crate::{ miniprotocols::{ @@ -154,13 +160,8 @@ pub struct NodeClient { } impl NodeClient { - #[cfg(not(target_os = "windows"))] - pub async fn connect(path: impl AsRef, magic: u64) -> Result { - debug!("connecting"); - - let bearer = Bearer::connect_unix(path) - .await - .map_err(Error::ConnectFailure)?; + + async fn connect_bearer(bearer:Bearer,versions: VersionTable) -> Result { let mut plexer = multiplexer::Plexer::new(bearer); @@ -170,7 +171,6 @@ impl NodeClient { let plexer_handle = tokio::spawn(async move { plexer.run().await }); - let versions = handshake::n2c::VersionTable::v10_and_above(magic); let mut client = handshake::Client::new(hs_channel); let handshake = client @@ -191,6 +191,35 @@ impl NodeClient { }) } + + #[cfg(not(target_os = "windows"))] + pub async fn connect(path: impl AsRef, magic: u64) -> Result { + debug!("connecting"); + + let bearer = Bearer::connect_unix(path) + .await + .map_err(Error::ConnectFailure)?; + + let versions = handshake::n2c::VersionTable::v10_and_above(magic); + + Self::connect_bearer(bearer,versions).await + } + + #[cfg(target_os = "windows")] + pub async fn connect(pipe_name: impl AsRef, magic: u64) -> Result { + debug!("connecting"); + + let bearer = Bearer::connect_named_pipe(pipe_name) + .await + .map_err(Error::ConnectFailure)?; + + let versions = + handshake::n2c::VersionTable::only_v10(magic); + + Self::connect_bearer(bearer,versions).await + + } + #[cfg(not(target_os = "windows"))] pub async fn handshake_query( path: impl AsRef, @@ -245,7 +274,8 @@ impl NodeClient { } } -/// Server of N2C Ouroboros +/// Server of N2C Ouroboros. +#[cfg(not(target_os = "windows"))] pub struct NodeServer { pub plexer_handle: JoinHandle>, pub version: (VersionNumber, n2c::VersionData), diff --git a/pallas-network/src/multiplexer.rs b/pallas-network/src/multiplexer.rs index ffd5b054..57d23d3a 100644 --- a/pallas-network/src/multiplexer.rs +++ b/pallas-network/src/multiplexer.rs @@ -3,7 +3,6 @@ use byteorder::{ByteOrder, NetworkEndian}; use pallas_codec::{minicbor, Fragment}; use std::net::SocketAddr; -use std::path::Path; use thiserror::Error; use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; @@ -15,6 +14,12 @@ use tracing::{debug, error, trace}; #[cfg(not(target_os = "windows"))] use tokio::net::{UnixListener, UnixStream}; +#[cfg(not(target_os = "windows"))] +use tokio::net::UnixListener; + +#[cfg(windows)] +use tokio::net::windows::named_pipe::NamedPipeClient; + const HEADER_LEN: usize = 8; pub type Timestamp = u32; @@ -63,6 +68,7 @@ pub struct Segment { #[cfg(target_os = "windows")] pub enum Bearer { Tcp(TcpStream), + NamedPipe(NamedPipeClient) } #[cfg(not(target_os = "windows"))] @@ -92,17 +98,35 @@ impl Bearer { Ok((Self::Unix(stream), addr)) } + #[cfg(windows)] + pub async fn connect_named_pipe(pipe_name: impl AsRef) -> Result{ + let client = loop { + match tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) => (), + Err(e) => return Err(e), + } + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + }; + Ok(Self::NamedPipe(client)) + } + #[cfg(not(target_os = "windows"))] - pub async fn connect_unix(path: impl AsRef) -> Result { + pub async fn connect_unix(path: impl AsRef) -> Result { let stream = UnixStream::connect(path).await?; Ok(Self::Unix(stream)) } - pub async fn readable(&self) -> tokio::io::Result<()> { + pub async fn readable(&mut self) -> tokio::io::Result<()> { match self { Bearer::Tcp(x) => x.readable().await, #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.readable().await, + + #[cfg(target_os = "windows")] + Bearer::NamedPipe(x) => x.readable().await + } } @@ -111,6 +135,8 @@ impl Bearer { Bearer::Tcp(x) => x.try_read(buf), #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.try_read(buf), + #[cfg(target_os = "windows")] + Bearer::NamedPipe(x) => x.try_read(buf) } } @@ -119,6 +145,8 @@ impl Bearer { Bearer::Tcp(x) => x.write_all(buf).await, #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.write_all(buf).await, + #[cfg(target_os = "windows")] + Bearer::NamedPipe(x) => x.write_all(buf).await, } } @@ -127,6 +155,8 @@ impl Bearer { Bearer::Tcp(x) => x.flush().await, #[cfg(not(target_os = "windows"))] Bearer::Unix(x) => x.flush().await, + #[cfg(target_os = "windows")] + Bearer::NamedPipe(x) => x.flush().await, } } } @@ -173,7 +203,7 @@ impl SegmentBuffer { let remaining = required - self.1.len(); let mut buf = vec![0u8; remaining]; - + match self.0.try_read(&mut buf) { Ok(0) => { error!("empty bearer");