Skip to content

Commit

Permalink
refactor(iroh-cli): avoid futures crate
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Mar 22, 2024
1 parent 9fa2af7 commit 975c903
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 29 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ console = { version = "0.15.5" }
derive_more = { version = "1.0.0-beta.1", features = ["display"] }
dialoguer = { version = "0.11.0", default-features = false }
dirs-next = { version = "2.0.0" }
futures = "0.3.30"
futures-buffered = "0.2.4"
futures-lite = "2.3"
hex = "0.4.3"
human-time = { version = "0.1.6" }
indicatif = { version = "0.17", features = ["tokio"] }
Expand Down
277 changes: 277 additions & 0 deletions iroh-cli/src/commands/#start.rs#
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
use std::{net::SocketAddr, path::Path, time::Duration, future::Future};

use anyhow::Result;
use colored::Colorize;
use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle};
use iroh::node::Node;
use iroh::{
net::relay::{RelayMap, RelayMode},
node::RpcStatus,
};
use tracing::{info_span, Instrument};

use crate::config::NodeConfig;

/// Whether to stop the node after running a command or run forever until stopped.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum RunType {
/// Run a single command, and then shutdown the node. Allow to abort with Ctrl-C.
SingleCommandAbortable,
/// Run a single command, and then shutdown the node. Do not abort on Ctrl-C (expects Ctrl-C to be handled internally).
SingleCommandNoAbort,
/// Run until manually stopped (through Ctrl-C or shutdown RPC command)
UntilStopped,
}

#[derive(thiserror::Error, Debug)]
#[error("iroh is already running on port {0}")]
pub struct AlreadyRunningError(u16);

pub async fn run_with_command<F, T>(
config: &NodeConfig,
iroh_data_root: &Path,
run_type: RunType,
command: F,
) -> Result<()>
where
F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static,
T: Future<Output = Result<()>> + 'static,
{
let metrics_fut = start_metrics_server(config.metrics_addr);

let res = run_with_command_inner(config, iroh_data_root, run_type, command).await;

if let Some(metrics_fut) = metrics_fut {
metrics_fut.abort();
}

let (clear_rpc, res) = match res {
Ok(()) => (true, res),
Err(e) => match e.downcast::<AlreadyRunningError>() {
// iroh is already running in a different process, do no remove the rpc lockfile
Ok(already_running) => (false, Err(already_running.into())),
Err(e) => (true, Err(e)),
},
};

if clear_rpc {
RpcStatus::clear(iroh_data_root).await?;
}

res
}

async fn run_with_command_inner<F, T>(
config: &NodeConfig,
iroh_data_root: &Path,
run_type: RunType,
command: F,
) -> Result<()>
where
F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static,
T: Future<Output = Result<()>> + 'static,
{
let relay_map = config.relay_map()?;

let spinner = create_spinner("Iroh booting...");
let node = start_node(iroh_data_root, relay_map).await?;
drop(spinner);

eprintln!("{}", welcome_message(&node)?);

let client = node.client().clone();

let mut command_task = node.local_pool_handle().spawn_pinned(move || {
async move {
match command(client).await {
Err(err) => Err(err),
Ok(()) => {
// keep the task open forever if not running in single-command mode
if run_type == RunType::UntilStopped {
futures_lite::future::pending().await
}
Ok(())
}
}
}
.instrument(info_span!("command"))
});

let node2 = node.clone();
tokio::select! {
biased;
// always abort on signal-c
_ = tokio::signal::ctrl_c(), if run_type != RunType::SingleCommandNoAbort => {
command_task.abort();
node.shutdown();
// node.await?;
}
// abort if the command task finishes (will run forever if not in single-command mode)
res = &mut command_task => {
node.shutdown();
// let _ = node.await;
res??;
}
// abort if the node future completes (shutdown called or error)
res = node2 => {
command_task.abort();
res?;
}
}
Ok(())
}

pub(crate) async fn start_node(
iroh_data_root: &Path,
relay_map: Option<RelayMap>,
) -> Result<Node<iroh::bytes::store::fs::Store>> {
let rpc_status = RpcStatus::load(iroh_data_root).await?;
match rpc_status {
RpcStatus::Running { port, .. } => {
return Err(AlreadyRunningError(port).into());
}
RpcStatus::Stopped => {
// all good, we can go ahead
}
}

let relay_mode = match relay_map {
None => RelayMode::Default,
Some(relay_map) => RelayMode::Custom(relay_map),
};

Node::persistent(iroh_data_root)
.await?
.relay_mode(relay_mode)
.enable_rpc()
.await?
.spawn()
.await
}

fn welcome_message<B: iroh::bytes::store::Store>(node: &Node<B>) -> Result<String> {
let msg = format!(
"{}\nNode ID: {}\n",
"Iroh is running".green(),
node.node_id()
);

Ok(msg)
}

/// Create a nice spinner.
fn create_spinner(msg: &'static str) -> ProgressBar {
let pb = ProgressBar::new_spinner();
pb.enable_steady_tick(Duration::from_millis(80));
pb.set_draw_target(ProgressDrawTarget::stderr());
pb.set_style(
ProgressStyle::with_template("{spinner:.blue} {msg}")
.unwrap()
.tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
);
pb.set_message(msg);
pb.with_finish(indicatif::ProgressFinish::AndClear)
}

pub fn start_metrics_server(
metrics_addr: Option<SocketAddr>,
) -> Option<tokio::task::JoinHandle<()>> {
// doesn't start the server if the address is None
if let Some(metrics_addr) = metrics_addr {
// metrics are initilaized in iroh::node::Node::spawn
// here we only start the server
return Some(tokio::task::spawn(async move {
if let Err(e) = iroh_metrics::metrics::start_metrics_server(metrics_addr).await {
eprintln!("Failed to start metrics server: {e}");
}
}));
}
tracing::info!("Metrics server not started, no address provided");
None
}

#[cfg(test)]
mod tests {
use super::*;
use anyhow::bail;
use iroh::util::path::IrohPaths;

#[tokio::test]
async fn test_run_rpc_lock_file() -> Result<()> {
let data_dir = tempfile::TempDir::with_prefix("rpc-lock-file-")?;
let lock_file_path = data_dir
.path()
.join(IrohPaths::RpcLock.with_root(data_dir.path()));
let data_dir_path = data_dir.path().to_path_buf();

let (ready_s, ready_r) = tokio::sync::oneshot::channel();
let (close_s, close_r) = tokio::sync::oneshot::channel();

// run the first start command, using channels to coordinate so we know when the node has fully booted up, and when we need to shut the node down
let fut1 = run_with_command(
&NodeConfig::default(),
&data_dir_path,
RunType::SingleCommandAbortable,
|_| async move {
// inform the test the node is booted up
ready_s.send(()).unwrap();

// wait until the test tells us to shut down the node
close_r.await?;
Ok(())
},
)
.await;

// allow ample time for iroh to boot up
tokio::time::timeout(Duration::from_millis(20000), {
fut1.race(ready_r)
}).await.unwrap();

bail!("First `run_with_command` call never started");
}

// ensure the rpc lock file exists
if !lock_file_path.try_exists()? {
start.abort();
bail!("First `run_with_command` call never made the rpc lockfile");
}

// run the second command, this should fail
if run_with_command(
&NodeConfig::default(),
data_dir.path(),
RunType::SingleCommandAbortable,
|_| async move { Ok(()) },
)
.await
.is_ok()
{
start.abort();
bail!("Second `run_with_command` call should return error");
}

// ensure the rpc lock file still exists
if !lock_file_path.try_exists()? {
start.abort();
bail!("Second `run_with_command` removed the rpc lockfile");
}

// inform the node it should close
close_s.send(()).unwrap();

// wait for the node to close
if tokio::time::timeout(Duration::from_millis(1000), start)
.await
.is_err()
{
bail!("First `run_with_command` never closed");
}

// ensure the lockfile no longer exists
if lock_file_path.try_exists()? {
bail!("First `run_with_command` closed without removing the rpc lockfile");
}
Ok(())
}
}
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/author.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::{bail, Result};
use clap::Parser;
use futures::TryStreamExt;
use futures_lite::StreamExt;
use iroh::base::base32::fmt_short;

use iroh::sync::AuthorId;
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use anyhow::{anyhow, bail, ensure, Context, Result};
use clap::Subcommand;
use console::{style, Emoji};
use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use indicatif::{
HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState,
ProgressStyle,
Expand Down
38 changes: 22 additions & 16 deletions iroh-cli/src/commands/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use anyhow::{anyhow, bail, Context, Result};
use clap::Parser;
use colored::Colorize;
use dialoguer::Confirm;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_buffered::BufferedStreamExt;
use futures_lite::{Stream, StreamExt};
use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle};
use iroh::base::base32::fmt_short;
use quic_rpc::ServiceConnection;
Expand Down Expand Up @@ -799,8 +800,11 @@ where
(String, u64, Option<Hash>, u64),
>::new()));

let _stats: Vec<u64> = blob_add_progress
.filter_map(|item| async {
let doc2 = doc.clone();
let imp2 = task_imp.clone();

let _stats: Vec<_> = blob_add_progress
.filter_map(|item| {
let item = match item.context("Error adding files") {
Err(e) => return Some(Err(e)),
Ok(item) => item,
Expand Down Expand Up @@ -871,20 +875,22 @@ where
}
}
})
.try_chunks(1024)
.map_ok(|chunks| {
futures::stream::iter(chunks.into_iter().map(|(key, hash, size)| {
let doc = doc.clone();
let imp = task_imp.clone();
Ok(async move {
doc.set_hash(author_id, key, hash, size).await?;
imp.import_progress();
anyhow::Ok(size)
})
}))
.map(move |res| {
let doc = doc2.clone();
let imp = imp2.clone();
async move {
match res {
Ok((key, hash, size)) => {
let doc = doc.clone();
doc.set_hash(author_id, key, hash, size).await?;
imp.import_progress();
Ok(size)
}
Err(err) => Err(err),
}
}
})
.try_flatten()
.try_buffer_unordered(64)
.buffered_unordered(128)
.try_collect()
.await?;

Expand Down
4 changes: 2 additions & 2 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::config::{iroh_data_root, NodeConfig};

use anyhow::Context;
use clap::Subcommand;
use futures::StreamExt;
use futures_lite::StreamExt;
use indicatif::{HumanBytes, MultiProgress, ProgressBar};
use iroh::{
base::ticket::Ticket,
Expand Down Expand Up @@ -200,7 +200,7 @@ fn update_pb(
}
})
} else {
tokio::spawn(futures::future::ready(()))
tokio::spawn(std::future::ready(()))
}
}

Expand Down
Loading

0 comments on commit 975c903

Please sign in to comment.