From 975c9033ee672a36da4213f94de0df742f84cf6c Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 22 Mar 2024 15:35:05 +0100 Subject: [PATCH] refactor(iroh-cli): avoid futures crate --- Cargo.lock | 3 +- iroh-cli/Cargo.toml | 3 +- iroh-cli/src/commands/#start.rs# | 277 +++++++++++++++++++++++++++++++ iroh-cli/src/commands/author.rs | 2 +- iroh-cli/src/commands/blob.rs | 2 +- iroh-cli/src/commands/doc.rs | 38 +++-- iroh-cli/src/commands/doctor.rs | 4 +- iroh-cli/src/commands/node.rs | 2 +- iroh-cli/src/commands/start.rs | 5 +- iroh-cli/src/commands/tag.rs | 2 +- iroh/src/node/builder.rs | 4 +- 11 files changed, 313 insertions(+), 29 deletions(-) create mode 100644 iroh-cli/src/commands/#start.rs# diff --git a/Cargo.lock b/Cargo.lock index 154ac4a2842..eecefbec634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2453,7 +2453,8 @@ dependencies = [ "dialoguer", "dirs-next", "duct", - "futures", + "futures-buffered", + "futures-lite", "hex", "human-time", "indicatif", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index 6c926d0d0fb..114a04cf397 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -29,7 +29,8 @@ console = { version = "0.15.5" } derive_more = { version = "1.0.0-beta.1", features = ["display"] } dialoguer = { version = "0.11.0", default-features = false } dirs-next = { version = "2.0.0" } -futures = "0.3.30" +futures-buffered = "0.2.4" +futures-lite = "2.3" hex = "0.4.3" human-time = { version = "0.1.6" } indicatif = { version = "0.17", features = ["tokio"] } diff --git a/iroh-cli/src/commands/#start.rs# b/iroh-cli/src/commands/#start.rs# new file mode 100644 index 00000000000..9cad900e5a7 --- /dev/null +++ b/iroh-cli/src/commands/#start.rs# @@ -0,0 +1,277 @@ +use std::{net::SocketAddr, path::Path, time::Duration, future::Future}; + +use anyhow::Result; +use colored::Colorize; +use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; +use iroh::node::Node; +use iroh::{ + net::relay::{RelayMap, RelayMode}, + node::RpcStatus, +}; +use tracing::{info_span, Instrument}; + +use crate::config::NodeConfig; + +/// Whether to stop the node after running a command or run forever until stopped. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum RunType { + /// Run a single command, and then shutdown the node. Allow to abort with Ctrl-C. + SingleCommandAbortable, + /// Run a single command, and then shutdown the node. Do not abort on Ctrl-C (expects Ctrl-C to be handled internally). + SingleCommandNoAbort, + /// Run until manually stopped (through Ctrl-C or shutdown RPC command) + UntilStopped, +} + +#[derive(thiserror::Error, Debug)] +#[error("iroh is already running on port {0}")] +pub struct AlreadyRunningError(u16); + +pub async fn run_with_command( + config: &NodeConfig, + iroh_data_root: &Path, + run_type: RunType, + command: F, +) -> Result<()> +where + F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static, + T: Future> + 'static, +{ + let metrics_fut = start_metrics_server(config.metrics_addr); + + let res = run_with_command_inner(config, iroh_data_root, run_type, command).await; + + if let Some(metrics_fut) = metrics_fut { + metrics_fut.abort(); + } + + let (clear_rpc, res) = match res { + Ok(()) => (true, res), + Err(e) => match e.downcast::() { + // iroh is already running in a different process, do no remove the rpc lockfile + Ok(already_running) => (false, Err(already_running.into())), + Err(e) => (true, Err(e)), + }, + }; + + if clear_rpc { + RpcStatus::clear(iroh_data_root).await?; + } + + res +} + +async fn run_with_command_inner( + config: &NodeConfig, + iroh_data_root: &Path, + run_type: RunType, + command: F, +) -> Result<()> +where + F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static, + T: Future> + 'static, +{ + let relay_map = config.relay_map()?; + + let spinner = create_spinner("Iroh booting..."); + let node = start_node(iroh_data_root, relay_map).await?; + drop(spinner); + + eprintln!("{}", welcome_message(&node)?); + + let client = node.client().clone(); + + let mut command_task = node.local_pool_handle().spawn_pinned(move || { + async move { + match command(client).await { + Err(err) => Err(err), + Ok(()) => { + // keep the task open forever if not running in single-command mode + if run_type == RunType::UntilStopped { + futures_lite::future::pending().await + } + Ok(()) + } + } + } + .instrument(info_span!("command")) + }); + + let node2 = node.clone(); + tokio::select! { + biased; + // always abort on signal-c + _ = tokio::signal::ctrl_c(), if run_type != RunType::SingleCommandNoAbort => { + command_task.abort(); + node.shutdown(); + // node.await?; + } + // abort if the command task finishes (will run forever if not in single-command mode) + res = &mut command_task => { + node.shutdown(); + // let _ = node.await; + res??; + } + // abort if the node future completes (shutdown called or error) + res = node2 => { + command_task.abort(); + res?; + } + } + Ok(()) +} + +pub(crate) async fn start_node( + iroh_data_root: &Path, + relay_map: Option, +) -> Result> { + let rpc_status = RpcStatus::load(iroh_data_root).await?; + match rpc_status { + RpcStatus::Running { port, .. } => { + return Err(AlreadyRunningError(port).into()); + } + RpcStatus::Stopped => { + // all good, we can go ahead + } + } + + let relay_mode = match relay_map { + None => RelayMode::Default, + Some(relay_map) => RelayMode::Custom(relay_map), + }; + + Node::persistent(iroh_data_root) + .await? + .relay_mode(relay_mode) + .enable_rpc() + .await? + .spawn() + .await +} + +fn welcome_message(node: &Node) -> Result { + let msg = format!( + "{}\nNode ID: {}\n", + "Iroh is running".green(), + node.node_id() + ); + + Ok(msg) +} + +/// Create a nice spinner. +fn create_spinner(msg: &'static str) -> ProgressBar { + let pb = ProgressBar::new_spinner(); + pb.enable_steady_tick(Duration::from_millis(80)); + pb.set_draw_target(ProgressDrawTarget::stderr()); + pb.set_style( + ProgressStyle::with_template("{spinner:.blue} {msg}") + .unwrap() + .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]), + ); + pb.set_message(msg); + pb.with_finish(indicatif::ProgressFinish::AndClear) +} + +pub fn start_metrics_server( + metrics_addr: Option, +) -> Option> { + // doesn't start the server if the address is None + if let Some(metrics_addr) = metrics_addr { + // metrics are initilaized in iroh::node::Node::spawn + // here we only start the server + return Some(tokio::task::spawn(async move { + if let Err(e) = iroh_metrics::metrics::start_metrics_server(metrics_addr).await { + eprintln!("Failed to start metrics server: {e}"); + } + })); + } + tracing::info!("Metrics server not started, no address provided"); + None +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::bail; + use iroh::util::path::IrohPaths; + + #[tokio::test] + async fn test_run_rpc_lock_file() -> Result<()> { + let data_dir = tempfile::TempDir::with_prefix("rpc-lock-file-")?; + let lock_file_path = data_dir + .path() + .join(IrohPaths::RpcLock.with_root(data_dir.path())); + let data_dir_path = data_dir.path().to_path_buf(); + + let (ready_s, ready_r) = tokio::sync::oneshot::channel(); + let (close_s, close_r) = tokio::sync::oneshot::channel(); + + // run the first start command, using channels to coordinate so we know when the node has fully booted up, and when we need to shut the node down + let fut1 = run_with_command( + &NodeConfig::default(), + &data_dir_path, + RunType::SingleCommandAbortable, + |_| async move { + // inform the test the node is booted up + ready_s.send(()).unwrap(); + + // wait until the test tells us to shut down the node + close_r.await?; + Ok(()) + }, + ) + .await; + + // allow ample time for iroh to boot up + tokio::time::timeout(Duration::from_millis(20000), { + fut1.race(ready_r) + }).await.unwrap(); + + bail!("First `run_with_command` call never started"); + } + + // ensure the rpc lock file exists + if !lock_file_path.try_exists()? { + start.abort(); + bail!("First `run_with_command` call never made the rpc lockfile"); + } + + // run the second command, this should fail + if run_with_command( + &NodeConfig::default(), + data_dir.path(), + RunType::SingleCommandAbortable, + |_| async move { Ok(()) }, + ) + .await + .is_ok() + { + start.abort(); + bail!("Second `run_with_command` call should return error"); + } + + // ensure the rpc lock file still exists + if !lock_file_path.try_exists()? { + start.abort(); + bail!("Second `run_with_command` removed the rpc lockfile"); + } + + // inform the node it should close + close_s.send(()).unwrap(); + + // wait for the node to close + if tokio::time::timeout(Duration::from_millis(1000), start) + .await + .is_err() + { + bail!("First `run_with_command` never closed"); + } + + // ensure the lockfile no longer exists + if lock_file_path.try_exists()? { + bail!("First `run_with_command` closed without removing the rpc lockfile"); + } + Ok(()) + } +} diff --git a/iroh-cli/src/commands/author.rs b/iroh-cli/src/commands/author.rs index c584d9d0285..c512292e42e 100644 --- a/iroh-cli/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -1,6 +1,6 @@ use anyhow::{bail, Result}; use clap::Parser; -use futures::TryStreamExt; +use futures_lite::StreamExt; use iroh::base::base32::fmt_short; use iroh::sync::AuthorId; diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index d3d3c382558..865856d0c59 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{anyhow, bail, ensure, Context, Result}; use clap::Subcommand; use console::{style, Emoji}; -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use indicatif::{ HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState, ProgressStyle, diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index c04439d6b55..c19dabee89d 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -10,7 +10,8 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; use colored::Colorize; use dialoguer::Confirm; -use futures::{Stream, StreamExt, TryStreamExt}; +use futures_buffered::BufferedStreamExt; +use futures_lite::{Stream, StreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; use iroh::base::base32::fmt_short; use quic_rpc::ServiceConnection; @@ -799,8 +800,11 @@ where (String, u64, Option, u64), >::new())); - let _stats: Vec = blob_add_progress - .filter_map(|item| async { + let doc2 = doc.clone(); + let imp2 = task_imp.clone(); + + let _stats: Vec<_> = blob_add_progress + .filter_map(|item| { let item = match item.context("Error adding files") { Err(e) => return Some(Err(e)), Ok(item) => item, @@ -871,20 +875,22 @@ where } } }) - .try_chunks(1024) - .map_ok(|chunks| { - futures::stream::iter(chunks.into_iter().map(|(key, hash, size)| { - let doc = doc.clone(); - let imp = task_imp.clone(); - Ok(async move { - doc.set_hash(author_id, key, hash, size).await?; - imp.import_progress(); - anyhow::Ok(size) - }) - })) + .map(move |res| { + let doc = doc2.clone(); + let imp = imp2.clone(); + async move { + match res { + Ok((key, hash, size)) => { + let doc = doc.clone(); + doc.set_hash(author_id, key, hash, size).await?; + imp.import_progress(); + Ok(size) + } + Err(err) => Err(err), + } + } }) - .try_flatten() - .try_buffer_unordered(64) + .buffered_unordered(128) .try_collect() .await?; diff --git a/iroh-cli/src/commands/doctor.rs b/iroh-cli/src/commands/doctor.rs index 1d774296ed8..95f50e7fe89 100644 --- a/iroh-cli/src/commands/doctor.rs +++ b/iroh-cli/src/commands/doctor.rs @@ -14,7 +14,7 @@ use crate::config::{iroh_data_root, NodeConfig}; use anyhow::Context; use clap::Subcommand; -use futures::StreamExt; +use futures_lite::StreamExt; use indicatif::{HumanBytes, MultiProgress, ProgressBar}; use iroh::{ base::ticket::Ticket, @@ -200,7 +200,7 @@ fn update_pb( } }) } else { - tokio::spawn(futures::future::ready(())) + tokio::spawn(std::future::ready(())) } } diff --git a/iroh-cli/src/commands/node.rs b/iroh-cli/src/commands/node.rs index 695e3e8dcc4..e7c75b92efd 100644 --- a/iroh-cli/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -5,7 +5,7 @@ use clap::Subcommand; use colored::Colorize; use comfy_table::Table; use comfy_table::{presets::NOTHING, Cell}; -use futures::{Stream, StreamExt}; +use futures_lite::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; use iroh::net::{key::PublicKey, magic_endpoint::ConnectionInfo, magicsock::DirectAddrInfo}; diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index ad523507a2f..2a56b40e13b 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -1,9 +1,8 @@ -use std::{net::SocketAddr, path::Path, time::Duration}; +use std::{future::Future, net::SocketAddr, path::Path, time::Duration}; use crate::config::NodeConfig; use anyhow::Result; use colored::Colorize; -use futures::Future; use indicatif::{ProgressBar, ProgressDrawTarget, ProgressStyle}; use iroh::node::Node; use iroh::{ @@ -88,7 +87,7 @@ where Ok(()) => { // keep the task open forever if not running in single-command mode if run_type == RunType::UntilStopped { - futures::future::pending().await + futures_lite::future::pending().await } Ok(()) } diff --git a/iroh-cli/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs index f3b2d011f76..69edf005c34 100644 --- a/iroh-cli/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -1,7 +1,7 @@ use anyhow::Result; use bytes::Bytes; use clap::Subcommand; -use futures::StreamExt; +use futures_lite::StreamExt; use iroh::bytes::Tag; use iroh::{client::Iroh, rpc_protocol::ProviderService}; use quic_rpc::ServiceConnection; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index ef1854cb275..63ead79b92a 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -382,13 +382,13 @@ where ) }; - let task = Arc::new( + /*let task = Arc::new( async move { task.await?; anyhow::Ok(()) } .boxed(), - ); + );*/ let node = Node { inner,