Skip to content

Commit

Permalink
fix: fix conditional code for windows builds
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Nov 16, 2023
1 parent d6bcffe commit 105da86
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 55 deletions.
2 changes: 1 addition & 1 deletion examples/n2c-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn do_chainsync(client: &mut NodeClient) {
// environment
const SOCKET_PATH: &str = "/tmp/node.socket";

#[cfg(target_family = "unix")]
#[cfg(unix)]
#[tokio::main]
async fn main() {
tracing::subscriber::set_global_default(
Expand Down
40 changes: 18 additions & 22 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,10 @@ use tokio::net::TcpListener;
use tokio::task::JoinHandle;
use tracing::{debug, error};

#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
use tokio::net::UnixListener;

use crate::miniprotocols::handshake::{
n2c::VersionData,
n2n, VersionTable,
Confirmation,
VersionNumber
};
use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable};

use crate::miniprotocols::PROTOCOL_N2N_HANDSHAKE;
use crate::{
Expand Down Expand Up @@ -160,9 +155,10 @@ pub struct NodeClient {
}

impl NodeClient {

async fn connect_bearer(bearer:Bearer,versions: VersionTable<VersionData>) -> Result<Self, Error> {

async fn connect_bearer(
bearer: Bearer,
versions: VersionTable<n2c::VersionData>,
) -> Result<Self, Error> {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
Expand Down Expand Up @@ -191,8 +187,7 @@ impl NodeClient {
})
}


#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
pub async fn connect(path: impl AsRef<Path>, magic: u64) -> Result<Self, Error> {
debug!("connecting");

Expand All @@ -202,25 +197,26 @@ impl NodeClient {

let versions = handshake::n2c::VersionTable::v10_and_above(magic);

Self::connect_bearer(bearer,versions).await
Self::connect_bearer(bearer, versions).await
}

#[cfg(target_os = "windows")]
pub async fn connect(pipe_name: impl AsRef<std::ffi::OsStr>, magic: u64) -> Result<Self, Error> {
#[cfg(windows)]
pub async fn connect(
pipe_name: impl AsRef<std::ffi::OsStr>,
magic: u64,
) -> Result<Self, Error> {
debug!("connecting");

let bearer = Bearer::connect_named_pipe(pipe_name)
.await
.map_err(Error::ConnectFailure)?;

let versions =
handshake::n2c::VersionTable::only_v10(magic);

Self::connect_bearer(bearer,versions).await
let versions = handshake::n2c::VersionTable::v10_and_above(magic);

Self::connect_bearer(bearer, versions).await
}

#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
pub async fn handshake_query(
path: impl AsRef<Path>,
magic: u64,
Expand Down Expand Up @@ -275,15 +271,15 @@ impl NodeClient {
}

/// Server of N2C Ouroboros.
#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
pub struct NodeServer {
pub plexer_handle: JoinHandle<Result<(), crate::multiplexer::Error>>,
pub version: (VersionNumber, n2c::VersionData),
pub chainsync: chainsync::N2CServer,
pub statequery: localstate::Server,
}

#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
impl NodeServer {
pub async fn accept(listener: &UnixListener, magic: u64) -> Result<Self, Error> {
let (bearer, _) = Bearer::accept_unix(listener)
Expand Down
75 changes: 43 additions & 32 deletions pallas-network/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ use tokio::sync::mpsc::error::SendError;
use tokio::time::Instant;
use tracing::{debug, error, trace};

#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
use tokio::net::{UnixListener, UnixStream};

#[cfg(not(target_os = "windows"))]
use tokio::net::UnixListener;

#[cfg(windows)]
use tokio::net::windows::named_pipe::NamedPipeClient;

Expand Down Expand Up @@ -65,16 +62,15 @@ pub struct Segment {
pub payload: Payload,
}

#[cfg(target_os = "windows")]
#[cfg(unix)]
pub enum Bearer {
Tcp(TcpStream),
NamedPipe(NamedPipeClient)
}

#[cfg(not(target_os = "windows"))]
pub enum Bearer {
Tcp(TcpStream),
#[cfg(unix)]
Unix(UnixStream),

#[cfg(windows)]
NamedPipe(NamedPipeClient),
}

const BUFFER_LEN: usize = 1024 * 10;
Expand All @@ -90,7 +86,13 @@ impl Bearer {
Ok((Self::Tcp(stream), addr))
}

#[cfg(not(target_os = "windows"))]
#[cfg(unix)]
pub async fn connect_unix(path: impl AsRef<std::path::Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::Unix(stream))
}

#[cfg(unix)]
pub async fn accept_unix(
listener: &UnixListener,
) -> tokio::io::Result<(Self, tokio::net::unix::SocketAddr)> {
Expand All @@ -99,63 +101,72 @@ impl Bearer {
}

#[cfg(windows)]
pub async fn connect_named_pipe(pipe_name: impl AsRef<std::ffi::OsStr>) -> Result<Self, tokio::io::Error>{
pub async fn connect_named_pipe(
pipe_name: impl AsRef<std::ffi::OsStr>,
) -> Result<Self, tokio::io::Error> {
// TODO: revisit if busy wait logic is required
let client = loop {
match tokio::net::windows::named_pipe::ClientOptions::new().open(&pipe_name) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) => (),
Err(e)
if e.raw_os_error()
== Some(windows_sys::Win32::Foundation::ERROR_PIPE_BUSY as i32) =>
{
()
}
Err(e) => return Err(e),
}

tokio::time::sleep(std::time::Duration::from_millis(50)).await;
};
Ok(Self::NamedPipe(client))
}

#[cfg(not(target_os = "windows"))]
pub async fn connect_unix(path: impl AsRef<std::path::Path>) -> Result<Self, tokio::io::Error> {
let stream = UnixStream::connect(path).await?;
Ok(Self::Unix(stream))
Ok(Self::NamedPipe(client))
}

pub async fn readable(&mut self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.readable().await,
#[cfg(not(target_os = "windows"))]

#[cfg(unix)]
Bearer::Unix(x) => x.readable().await,

#[cfg(target_os = "windows")]
Bearer::NamedPipe(x) => x.readable().await

#[cfg(windows)]
Bearer::NamedPipe(x) => x.readable().await,
}
}

fn try_read(&mut self, buf: &mut [u8]) -> tokio::io::Result<usize> {
match self {
Bearer::Tcp(x) => x.try_read(buf),
#[cfg(not(target_os = "windows"))]

#[cfg(unix)]
Bearer::Unix(x) => x.try_read(buf),
#[cfg(target_os = "windows")]
Bearer::NamedPipe(x) => x.try_read(buf)

#[cfg(windows)]
Bearer::NamedPipe(x) => x.try_read(buf),
}
}

async fn write_all(&mut self, buf: &[u8]) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.write_all(buf).await,
#[cfg(not(target_os = "windows"))]

#[cfg(unix)]
Bearer::Unix(x) => x.write_all(buf).await,
#[cfg(target_os = "windows")]

#[cfg(windows)]
Bearer::NamedPipe(x) => x.write_all(buf).await,
}
}

async fn flush(&mut self) -> tokio::io::Result<()> {
match self {
Bearer::Tcp(x) => x.flush().await,
#[cfg(not(target_os = "windows"))]

#[cfg(unix)]
Bearer::Unix(x) => x.flush().await,
#[cfg(target_os = "windows")]

#[cfg(windows)]
Bearer::NamedPipe(x) => x.flush().await,
}
}
Expand Down Expand Up @@ -203,7 +214,7 @@ impl SegmentBuffer {

let remaining = required - self.1.len();
let mut buf = vec![0u8; remaining];

match self.0.try_read(&mut buf) {
Ok(0) => {
error!("empty bearer");
Expand Down

0 comments on commit 105da86

Please sign in to comment.