Skip to content

Commit

Permalink
improve facade entry point
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 20, 2023
1 parent 63c0911 commit d71678a
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 139 deletions.
6 changes: 3 additions & 3 deletions examples/n2n-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use pallas::network::{
facades::PeerClient,
miniprotocols::{chainsync, Point, MAINNET_MAGIC},
multiplexer::Bearer,
};
use tokio::time::Instant;
use tracing::info;
Expand Down Expand Up @@ -66,8 +65,9 @@ async fn main() {

// setup a TCP socket to act as data bearer between our agents and the remote
// relay.
let bearer = Bearer::connect_tcp("relays-new.cardano-mainnet.iohk.io:3001").unwrap();
let mut peer = PeerClient::connect(bearer, MAINNET_MAGIC).await.unwrap();
let mut peer = PeerClient::connect("relays-new.cardano-mainnet.iohk.io:3001", MAINNET_MAGIC)
.await
.unwrap();

// fetch an arbitrary batch of block
do_blockfetch(&mut peer).await;
Expand Down
238 changes: 151 additions & 87 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::net::{SocketAddr, TcpListener};
use std::os::unix::net::UnixListener;

Check failure on line 2 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

failed to resolve: could not find `unix` in `os`
use std::path::Path;
use thiserror::Error;
use tracing::error;

use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber, VersionTable};
use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber};

Check warning on line 7 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused imports: `Confirmation`, `VersionNumber`, `n2c`

use crate::miniprotocols::{
blockfetch, chainsync, handshake, keepalive, localstate, txsubmission, PROTOCOL_N2C_CHAIN_SYNC,
PROTOCOL_N2C_HANDSHAKE, PROTOCOL_N2C_STATE_QUERY, PROTOCOL_N2N_BLOCK_FETCH,
PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE,
PROTOCOL_N2N_TX_SUBMISSION,
};
use crate::multiplexer::{self, Bearer};

use crate::multiplexer::{self, Bearer, RunningPlexer};

#[derive(Debug, Error)]
pub enum Error {
Expand All @@ -29,15 +33,15 @@ pub enum Error {
/// Client of N2N Ouroboros
pub struct PeerClient {
plexer: RunningPlexer,
pub handshake: handshake::Confirmation<handshake::n2n::VersionData>,
pub chainsync: chainsync::N2NClient,
pub blockfetch: blockfetch::Client,
pub txsubmission: txsubmission::Client,
pub keepalive: keepalive::Client,
handshake: handshake::N2NClient,
chainsync: chainsync::N2NClient,
blockfetch: blockfetch::Client,
txsubmission: txsubmission::Client,
keepalive: keepalive::Client,
}

impl PeerClient {
pub async fn connect(bearer: Bearer, magic: u64) -> Result<Self, Error> {
pub fn new(bearer: Bearer) -> Self {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_client(PROTOCOL_N2N_HANDSHAKE);
Expand All @@ -48,10 +52,28 @@ impl PeerClient {

let plexer = plexer.spawn();

Self {
plexer,
handshake: handshake::Client::new(hs_channel),
chainsync: chainsync::Client::new(cs_channel),
blockfetch: blockfetch::Client::new(bf_channel),
txsubmission: txsubmission::Client::new(txsub_channel),
keepalive: keepalive::Client::new(keepalive_channel),
}
}

pub async fn connect(addr: &'static str, magic: u64) -> Result<Self, Error> {
let bearer = tokio::task::spawn_blocking(move || Bearer::connect_tcp(addr))
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer);

let versions = handshake::n2n::VersionTable::v7_and_above(magic);
let mut client = handshake::Client::new(hs_channel);

let handshake = client
.handshake()
.handshake(versions)
.await
.map_err(Error::HandshakeProtocol)?;
Expand All @@ -61,14 +83,11 @@ impl PeerClient {
return Err(Error::IncompatibleVersion);
}

Ok(Self {
plexer,
handshake,
chainsync: chainsync::Client::new(cs_channel),
blockfetch: blockfetch::Client::new(bf_channel),
txsubmission: txsubmission::Client::new(txsub_channel),
keepalive: keepalive::Client::new(keepalive_channel),
})
Ok(client)
}

pub fn handshake(&mut self) -> &mut handshake::N2NClient {
&mut self.handshake
}

pub fn chainsync(&mut self) -> &mut chainsync::N2NClient {
Expand All @@ -94,48 +113,74 @@ impl PeerClient {

/// Server of N2N Ouroboros
pub struct PeerServer {
pub plexer: RunningPlexer,
pub version: (VersionNumber, n2n::VersionData),
pub chainsync: chainsync::N2NServer,
pub blockfetch: blockfetch::Server,
pub txsubmission: txsubmission::Server,
plexer: RunningPlexer,
handshake: handshake::N2NServer,
chainsync: chainsync::N2NServer,
blockfetch: blockfetch::Server,
txsubmission: txsubmission::Server,
accepted_address: Option<SocketAddr>,
accepted_version: Option<u64>,
}

impl PeerServer {
pub async fn serve(bearer: Bearer, magic: u64) -> Result<Self, Error> {
pub fn new(bearer: Bearer) -> Self {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_server(PROTOCOL_N2N_HANDSHAKE);
let cs_channel = plexer.subscribe_server(PROTOCOL_N2N_CHAIN_SYNC);
let bf_channel = plexer.subscribe_server(PROTOCOL_N2N_BLOCK_FETCH);
let txsub_channel = plexer.subscribe_server(PROTOCOL_N2N_TX_SUBMISSION);

let mut server_hs: handshake::Server<n2n::VersionData> = handshake::Server::new(hs_channel);
let server_cs = chainsync::N2NServer::new(cs_channel);
let server_bf = blockfetch::Server::new(bf_channel);
let server_txsub = txsubmission::Server::new(txsub_channel);
let hs = handshake::N2NServer::new(hs_channel);
let cs = chainsync::N2NServer::new(cs_channel);
let bf = blockfetch::Server::new(bf_channel);
let txsub = txsubmission::Server::new(txsub_channel);

let plexer = plexer.spawn();

let accepted_version = server_hs
Self {
plexer,
handshake: hs,
chainsync: cs,
blockfetch: bf,
txsubmission: txsub,
accepted_address: None,
accepted_version: None,
}
}

pub async fn accept(
listener: impl AsRef<TcpListener> + Send + 'static,
magic: u64,
) -> Result<Self, Error> {
let (bearer, address) =
tokio::task::spawn_blocking(move || Bearer::accept_tcp(listener.as_ref()))
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer);

let accepted_version = client
.handshake()
.handshake(n2n::VersionTable::v7_and_above(magic))
.await
.map_err(Error::HandshakeProtocol)?;

if let Some(ver) = accepted_version {
Ok(Self {
plexer,
version: ver,
chainsync: server_cs,
blockfetch: server_bf,
txsubmission: server_txsub,
})
if let Some((version, _)) = accepted_version {
client.accepted_address = Some(address);
client.accepted_version = Some(version);
Ok(client)
} else {
plexer.abort();
client.abort();
Err(Error::IncompatibleVersion)
}
}

pub fn handshake(&mut self) -> &mut handshake::N2NServer {
&mut self.handshake
}

pub fn chainsync(&mut self) -> &mut chainsync::N2NServer {
&mut self.chainsync
}
Expand All @@ -156,16 +201,13 @@ impl PeerServer {
/// Client of N2C Ouroboros
pub struct NodeClient {
plexer: RunningPlexer,
pub handshake: handshake::Confirmation<handshake::n2c::VersionData>,
pub chainsync: chainsync::N2CClient,
pub statequery: localstate::Client,
handshake: handshake::N2CClient,
chainsync: chainsync::N2CClient,
statequery: localstate::Client,
}

impl NodeClient {
async fn connect_bearer(
bearer: Bearer,
versions: VersionTable<n2c::VersionData>,
) -> Result<Self, Error> {
pub fn new(bearer: Bearer) -> Self {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_client(PROTOCOL_N2C_HANDSHAKE);
Expand All @@ -174,47 +216,39 @@ impl NodeClient {

let plexer = plexer.spawn();

let mut client = handshake::Client::new(hs_channel);

let handshake = client
.handshake(versions)
.await
.map_err(Error::HandshakeProtocol)?;

if let handshake::Confirmation::Rejected(reason) = handshake {
error!(?reason, "handshake refused");
return Err(Error::IncompatibleVersion);
}

Ok(Self {
Self {
plexer,
handshake,
handshake: handshake::Client::new(hs_channel),
chainsync: chainsync::Client::new(cs_channel),
statequery: localstate::Client::new(sq_channel),
})
}

#[cfg(unix)]
pub async fn connect(bearer: Bearer, magic: u64) -> Result<Self, Error> {
let versions = handshake::n2c::VersionTable::v10_and_above(magic);

Self::connect_bearer(bearer, versions).await
}
}

#[cfg(windows)]
pub async fn connect(
pipe_name: impl AsRef<std::ffi::OsStr>,
path: impl AsRef<Path> + Send + 'static,
magic: u64,
) -> Result<Self, Error> {
debug!("connecting");

let bearer = Bearer::connect_named_pipe(pipe_name)
let bearer = tokio::task::spawn_blocking(move || Bearer::connect_unix(path))

Check failure on line 231 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

no variant or associated item named `connect_unix` found for enum `Bearer` in the current scope
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer);

let versions = handshake::n2c::VersionTable::v10_and_above(magic);

Self::connect_bearer(bearer, versions).await
let handshake = client
.handshake()
.handshake(versions)
.await
.map_err(Error::HandshakeProtocol)?;

if let handshake::Confirmation::Rejected(reason) = handshake {
error!(?reason, "handshake refused");
return Err(Error::IncompatibleVersion);
}

Ok(client)
}

#[cfg(unix)]
Expand Down Expand Up @@ -252,6 +286,10 @@ impl NodeClient {
}
}

pub fn handshake(&mut self) -> &mut handshake::N2CClient {
&mut self.handshake
}

pub fn chainsync(&mut self) -> &mut chainsync::N2CClient {
&mut self.chainsync
}
Expand All @@ -268,45 +306,71 @@ impl NodeClient {
/// Server of N2C Ouroboros.
#[cfg(unix)]
pub struct NodeServer {
pub plexer: RunningPlexer,
pub version: (VersionNumber, n2c::VersionData),
pub chainsync: chainsync::N2CServer,
pub statequery: localstate::Server,
plexer: RunningPlexer,
handshake: handshake::N2CServer,
chainsync: chainsync::N2CServer,
statequery: localstate::Server,
accepted_address: Option<std::os::unix::net::SocketAddr>,
accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
}

#[cfg(unix)]
impl NodeServer {
pub async fn serve(bearer: Bearer, magic: u64) -> Result<Self, Error> {
pub async fn new(bearer: Bearer) -> Self {
let mut plexer = multiplexer::Plexer::new(bearer);

let hs_channel = plexer.subscribe_server(PROTOCOL_N2C_HANDSHAKE);
let cs_channel = plexer.subscribe_server(PROTOCOL_N2C_CHAIN_SYNC);
let sq_channel = plexer.subscribe_server(PROTOCOL_N2C_STATE_QUERY);

let mut server_hs: handshake::Server<n2c::VersionData> = handshake::Server::new(hs_channel);
let server_hs = handshake::Server::<n2c::VersionData>::new(hs_channel);
let server_cs = chainsync::N2CServer::new(cs_channel);
let server_sq = localstate::Server::new(sq_channel);

let plexer = plexer.spawn();

let accepted_version = server_hs
Self {
plexer,
handshake: server_hs,
chainsync: server_cs,
statequery: server_sq,
accepted_address: None,
accpeted_version: None,
}
}

pub async fn accept(
listener: impl AsRef<UnixListener> + Send + 'static,
magic: u64,
) -> Result<Self, Error> {
let (bearer, address) =
tokio::task::spawn_blocking(move || Bearer::accept_unix(listener.as_ref()))
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer).await;

let accepted_version = client
.handshake()
.handshake(n2c::VersionTable::v10_and_above(magic))
.await
.map_err(Error::HandshakeProtocol)?;

if let Some(ver) = accepted_version {
Ok(Self {
plexer,
version: ver,
chainsync: server_cs,
statequery: server_sq,
})
if let Some(version) = accepted_version {
client.accepted_address = Some(address);
client.accpeted_version = Some(version);
Ok(client)
} else {
plexer.abort();
client.abort();
Err(Error::IncompatibleVersion)
}
}

pub fn handshake(&mut self) -> &mut handshake::N2CServer {
&mut self.handshake
}

pub fn chainsync(&mut self) -> &mut chainsync::N2CServer {
&mut self.chainsync
}
Expand Down
Loading

0 comments on commit d71678a

Please sign in to comment.