Skip to content

Commit

Permalink
back to async again
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Dec 21, 2023
1 parent 901114e commit debc136
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 198 deletions.
87 changes: 49 additions & 38 deletions examples/n2n-miniprotocols/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use pallas::{network::{
facades::PeerClient,
miniprotocols::{chainsync, Point, MAINNET_MAGIC, blockfetch, keepalive},
}, ledger::traverse::MultiEraHeader};
use tokio::{time::Instant, select};
use pallas::{
ledger::traverse::MultiEraHeader,
network::{
facades::PeerClient,
miniprotocols::{blockfetch, chainsync, keepalive, Point, MAINNET_MAGIC},
},
};
use std::time::Duration;
use thiserror::Error;
use futures::{future::FutureExt, pin_mut};
use tokio::{select, time::Instant};

#[derive(Error, Debug)]
pub enum Error {
Expand All @@ -24,17 +27,27 @@ pub enum Error {
PallasTraverseError(#[from] pallas::ledger::traverse::Error),
}

async fn do_blockfetch(blockfetch_client: &mut blockfetch::Client, range: (Point, Point)) -> Result<(), Error> {
async fn do_blockfetch(
blockfetch_client: &mut blockfetch::Client,
range: (Point, Point),
) -> Result<(), Error> {
let blocks = blockfetch_client.fetch_range(range.clone()).await?;

for block in &blocks {
tracing::trace!("received block of size: {}", block.len());
}
tracing::info!("received {} blocks. last slot: {}", blocks.len(), range.1.slot_or_default());
tracing::info!(
"received {} blocks. last slot: {}",
blocks.len(),
range.1.slot_or_default()
);
Ok(())
}

async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_client: &mut blockfetch::Client) -> Result<(), Error> {
async fn do_chainsync(
mut chainsync_client: chainsync::N2NClient,
mut blockfetch_client: blockfetch::Client,
) -> Result<(), Error> {
let known_points = vec![Point::Specific(
43847831u64,
hex::decode("15b9eeee849dd6386d3770b0745e0450190f7560e5159b1b3ab13b14b2684a45")?,
Expand Down Expand Up @@ -64,18 +77,18 @@ async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_cl
MultiEraHeader::EpochBoundary(_) => {
tracing::info!("epoch boundary");
None
},
}
MultiEraHeader::AlonzoCompatible(_) | MultiEraHeader::Babbage(_) => {
if next_log.elapsed().as_secs() > 1 {
tracing::info!("chainsync block header: {}", number);
next_log = Instant::now();
}
Some(Point::Specific(slot, hash))
},
}
MultiEraHeader::Byron(_) => {
tracing::info!("ignoring byron header");
None
},
}
}
}
Some(_) => {
Expand All @@ -88,14 +101,17 @@ async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_cl
block_count += 1;
if block_count == 1 {
start_point = p;
}
else if block_count == 10 {
} else if block_count == 10 {
end_point = p;
do_blockfetch(blockfetch_client, (start_point.clone(), end_point.clone())).await?;
do_blockfetch(
&mut blockfetch_client,
(start_point.clone(), end_point.clone()),
)
.await?;
block_count = 0;
}
},
None => {},
}
None => {}
};
}
chainsync::NextResponse::RollBackward(x, _) => log::info!("rollback to {:?}", x),
Expand All @@ -104,15 +120,11 @@ async fn do_chainsync(chainsync_client: &mut chainsync::N2NClient, blockfetch_cl
}
}

async fn do_keepalive(keepalive_client: &mut keepalive::Client) -> Result<(), Error> {
let mut keepalive_timer = Instant::now();
async fn do_keepalive(mut keepalive_client: keepalive::Client) -> Result<(), Error> {
loop {
if keepalive_timer.elapsed().as_secs() > 20 {
tracing::info!("sending keepalive...");
keepalive_client.send_keepalive().await?;
tracing::info!("keepalive sent");
keepalive_timer = Instant::now();
}
tokio::time::sleep(Duration::from_secs(20)).await;
keepalive_client.send_keepalive().await?;
tracing::info!("keepalive sent");
}
}

Expand All @@ -130,20 +142,18 @@ async fn main() {
// relay.
let server = "backbone.cardano-mainnet.iohk.io:3001";
// let server = "localhost:6000";
let mut peer = PeerClient::connect(server, MAINNET_MAGIC)
.await
.unwrap();
let peer = PeerClient::connect(server, MAINNET_MAGIC).await.unwrap();

let chainsync_handle = tokio::spawn(async move {
do_chainsync(&mut peer.chainsync, &mut peer.blockfetch).await?;
Ok::<(), Error>(())
}).fuse();
let keepalive_handle = tokio::spawn(async move {
do_keepalive(&mut peer.keepalive).await?;
Ok::<(), Error>(())
}).fuse();
let PeerClient {
plexer,
chainsync,
blockfetch,
keepalive,
..
} = peer;

pin_mut!(chainsync_handle, keepalive_handle);
let chainsync_handle = tokio::spawn(do_chainsync(chainsync, blockfetch));
let keepalive_handle = tokio::spawn(do_keepalive(keepalive));

// If any of these concurrent tasks exit or fail, the others are canceled.
select! {
Expand Down Expand Up @@ -178,7 +188,8 @@ async fn main() {
}
}
}
peer.plexer_handle.abort();

plexer.abort().await;

tracing::info!("waiting 10 seconds before reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
Expand Down
78 changes: 40 additions & 38 deletions pallas-network/src/facades.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::net::{SocketAddr, TcpListener};
use std::net::SocketAddr;
use std::path::Path;

Check warning on line 2 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused import: `std::path::Path`
use thiserror::Error;
use tracing::error;

use tokio::net::TcpListener;

#[cfg(unix)]
use std::os::unix::net::UnixListener;
use tokio::net::{unix::SocketAddr as UnixSocketAddr, UnixListener};

use crate::miniprotocols::handshake::{n2c, n2n, Confirmation, VersionNumber};

Check warning on line 11 in pallas-network/src/facades.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused imports: `Confirmation`, `VersionNumber`, `n2c`

Expand Down Expand Up @@ -34,12 +36,12 @@ pub enum Error {

/// Client of N2N Ouroboros
pub struct PeerClient {
plexer: RunningPlexer,
handshake: handshake::N2NClient,
chainsync: chainsync::N2NClient,
blockfetch: blockfetch::Client,
txsubmission: txsubmission::Client,
keepalive: keepalive::Client,
pub plexer: RunningPlexer,
pub handshake: handshake::N2NClient,
pub chainsync: chainsync::N2NClient,
pub blockfetch: blockfetch::Client,
pub txsubmission: txsubmission::Client,
pub keepalive: keepalive::Client,
}

impl PeerClient {
Expand All @@ -65,9 +67,8 @@ impl PeerClient {
}

pub async fn connect(addr: &'static str, magic: u64) -> Result<Self, Error> {
let bearer = tokio::task::spawn_blocking(move || Bearer::connect_tcp(addr))
let bearer = Bearer::connect_tcp(addr)
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer);
Expand Down Expand Up @@ -96,6 +97,15 @@ impl PeerClient {
&mut self.chainsync
}

pub async fn with_chainsync<T, O, Fut>(&mut self, op: T) -> tokio::task::JoinHandle<O>
where
T: FnOnce(&mut chainsync::N2NClient) -> Fut,
Fut: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
{
tokio::spawn(op(&mut self.chainsync))
}

pub fn blockfetch(&mut self) -> &mut blockfetch::Client {
&mut self.blockfetch
}
Expand All @@ -108,8 +118,8 @@ impl PeerClient {
&mut self.keepalive
}

pub fn abort(&self) {
self.plexer.abort();
pub async fn abort(self) {
self.plexer.abort().await
}
}

Expand Down Expand Up @@ -151,15 +161,10 @@ impl PeerServer {
}
}

pub async fn accept(
listener: impl AsRef<TcpListener> + Send + 'static,
magic: u64,
) -> Result<Self, Error> {
let (bearer, address) =
tokio::task::spawn_blocking(move || Bearer::accept_tcp(listener.as_ref()))
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;
pub async fn accept(listener: &TcpListener, magic: u64) -> Result<Self, Error> {
let (bearer, address) = Bearer::accept_tcp(listener)
.await
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer);

Expand All @@ -174,7 +179,7 @@ impl PeerServer {
client.accepted_version = Some(version);
Ok(client)
} else {
client.abort();
client.abort().await;
Err(Error::IncompatibleVersion)
}
}
Expand All @@ -195,8 +200,8 @@ impl PeerServer {
&mut self.txsubmission
}

pub fn abort(&self) {
self.plexer.abort();
pub async fn abort(self) {
self.plexer.abort().await
}
}

Expand Down Expand Up @@ -231,9 +236,8 @@ impl NodeClient {
path: impl AsRef<Path> + Send + 'static,
magic: u64,
) -> Result<Self, Error> {
let bearer = tokio::task::spawn_blocking(move || Bearer::connect_unix(path))
let bearer = Bearer::connect_unix(path)
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer);
Expand Down Expand Up @@ -311,7 +315,7 @@ impl NodeClient {
Err(Error::IncompatibleVersion)
}
Confirmation::QueryReply(version_table) => {
plexer.abort();
plexer.abort().await;
Ok(version_table)
}
}
Expand All @@ -329,8 +333,8 @@ impl NodeClient {
&mut self.statequery
}

pub fn abort(&self) {
self.plexer.abort();
pub async fn abort(self) {
self.plexer.abort().await
}
}

Expand All @@ -341,7 +345,7 @@ pub struct NodeServer {
handshake: handshake::N2CServer,
chainsync: chainsync::N2CServer,
statequery: localstate::Server,
accepted_address: Option<std::os::unix::net::SocketAddr>,
accepted_address: Option<UnixSocketAddr>,
accpeted_version: Option<(VersionNumber, n2c::VersionData)>,
}

Expand Down Expand Up @@ -374,11 +378,9 @@ impl NodeServer {
listener: impl AsRef<UnixListener> + Send + 'static,
magic: u64,
) -> Result<Self, Error> {
let (bearer, address) =
tokio::task::spawn_blocking(move || Bearer::accept_unix(listener.as_ref()))
.await
.expect("can't join tokio thread")
.map_err(Error::ConnectFailure)?;
let (bearer, address) = Bearer::accept_unix(listener.as_ref())
.await
.map_err(Error::ConnectFailure)?;

let mut client = Self::new(bearer).await;

Expand All @@ -393,7 +395,7 @@ impl NodeServer {
client.accpeted_version = Some(version);
Ok(client)
} else {
client.abort();
client.abort().await;
Err(Error::IncompatibleVersion)
}
}
Expand All @@ -410,7 +412,7 @@ impl NodeServer {
&mut self.statequery
}

pub fn abort(&self) {
self.plexer.abort();
pub async fn abort(self) {
self.plexer.abort().await
}
}
Loading

0 comments on commit debc136

Please sign in to comment.