diff --git a/Cargo.lock b/Cargo.lock index 4a5d24c..b0eff59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,7 @@ dependencies = [ "libipld", "postcard", "quick_cache", + "quinn", "rand", "serde", "tempfile", @@ -490,10 +491,9 @@ dependencies = [ [[package]] name = "car-mirror" version = "0.1.0" -source = "git+https://github.com/fission-codes/rs-car-mirror.git?branch=matheus23/upgrade-wnfs#1645c9064f8a2db2be7a7bcba817bf1c67ccad56" +source = "git+https://github.com/fission-codes/rs-car-mirror.git?branch=matheus23/streaming#956a5c0d6da42f53e2c42f9c344f4468d5a65511" dependencies = [ "anyhow", - "async-stream", "async-trait", "bytes", "deterministic-bloom", @@ -507,7 +507,6 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-subscriber", "wnfs-common", ] diff --git a/Cargo.toml b/Cargo.toml index 3f850c5..b75a296 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,7 @@ futures = "0.3.25" bytes = "1.4" blake3 = "1.3" chrono = "0.4.24" -car-mirror = { git = "https://github.com/fission-codes/rs-car-mirror.git", branch = "matheus23/upgrade-wnfs", features = ["quick_cache"] } +car-mirror = { git = "https://github.com/fission-codes/rs-car-mirror.git", branch = "matheus23/streaming", features = ["quick_cache"] } cid = "0.10" clap = { version = "4.2", features = ["derive"] } ignore = "0.4.20" @@ -31,6 +31,7 @@ iroh-base = "0.12.0" tokio-util = { version = "0.7.10", features = ["codec"] } async-recursion = "1.0.5" quick_cache = "0.4.0" +quinn = "0.10.2" # enable the "rug" feature (using GMP) for rs-wnfs, speeding up nameaccumulator operations [target.'cfg(unix)'.dependencies] diff --git a/src/commands/listen_sync.rs b/src/commands/listen_sync.rs index a31bb32..dba354e 100644 --- a/src/commands/listen_sync.rs +++ b/src/commands/listen_sync.rs @@ -1,14 +1,10 @@ -use std::io::Cursor; -use std::sync::Arc; - use crate::state::{ Appa, ALPN_APPA_CAR_MIRROR_PULL, ALPN_APPA_KEY_VALUE_FETCH, DATA_ROOT, PRIVATE_ACCESS_KEY, ROOT_DIR, }; use crate::store::cache_missing::CacheMissing; use crate::store::flatfs::Flatfs; -use anyhow::Result; -use car_mirror::common::CarFile; +use anyhow::{bail, Result}; use car_mirror::incremental_verification::IncrementalDagVerification; use car_mirror::messages::{Bloom, PullRequest}; use car_mirror::traits::InMemoryCache; @@ -17,8 +13,11 @@ use futures::{SinkExt, TryStreamExt}; use iroh_base::ticket::Ticket; use iroh_net::magic_endpoint::accept_conn; use iroh_net::ticket::NodeTicket; -use iroh_net::MagicEndpoint; - +use iroh_net::{MagicEndpoint, NodeId}; +use quinn::Connection; +use std::io::Cursor; +use std::sync::Arc; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::LengthDelimitedCodec; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -78,76 +77,99 @@ pub async fn listen() -> Result<()> { "new connection from {peer_id} with ALPN {alpn} (coming from {})", conn.remote_address() ); - match alpn.as_bytes() { - ALPN_APPA_KEY_VALUE_FETCH => { - let (mut send, mut recv) = conn.accept_bi().await?; - tracing::info!("Waiting for data..."); - let msg = recv.read_to_end(100).await?; - tracing::info!("Got data!"); - let key = String::from_utf8(msg)?; - let value = appa.fs.store.get(&key)?; - send.write_all(&postcard::to_stdvec(&value)?).await?; - send.finish().await?; - } - ALPN_APPA_CAR_MIRROR_PULL => { - let appa = appa.clone(); - let endpoint = Arc::clone(&endpoint); - let config = car_mirror_config(); - let cache = cache.clone(); - tokio::spawn(async move { - let (send, recv) = conn.accept_bi().await?; - tracing::debug!("accepted bi stream, waiting for data..."); - - let mut recv = LengthDelimitedCodec::builder() - .max_frame_length(128 * 1024) - .new_read(recv); - - let mut send = LengthDelimitedCodec::builder() - .max_frame_length(config.receive_maximum) - .new_write(send); - - loop { - let typ = endpoint - .connection_info(peer_id) - .await? - .map(|info| info.conn_type.to_string()) - .unwrap_or("None".into()); - tracing::info!("Endpoint connection type: {typ}"); - - let Some(message) = recv.try_next().await? else { - tracing::info!("Got EOF, closing."); - return Ok::<_, anyhow::Error>(()); - }; - tracing::info!("got pull message"); - - let (root, request) = - postcard::from_bytes::(&message)?.into_parts()?; - - tracing::info!("decoded msg"); - - let response = car_mirror::pull::response( - root, - request, - &config, - &appa.fs.store, - &cache, - ) - .await?; - - tracing::info!("Sending response ({} bytes)", response.bytes.len()); - send.send(response.bytes).await?; - tracing::info!("Sent."); - } - }); - } - _ => { - println!("Unsupported protocol identifier (ALPN): {alpn}"); - } + + if let Err(e) = + accept_connection(peer_id, alpn.as_bytes(), conn, &appa, &endpoint, &cache).await + { + tracing::error!(?e, alpn, "Failed running protocol. Continuing."); } } Ok(()) } +async fn accept_connection( + peer_id: NodeId, + alpn: &[u8], + conn: Connection, + appa: &Appa, + endpoint: &Arc, + cache: &InMemoryCache, +) -> Result<(), anyhow::Error> { + Ok(match alpn { + ALPN_APPA_KEY_VALUE_FETCH => { + let (mut send, mut recv) = conn.accept_bi().await?; + tracing::info!("Waiting for data..."); + let msg = recv.read_to_end(100).await?; + tracing::info!("Got data!"); + let key = String::from_utf8(msg)?; + let value = appa.fs.store.get(&key)?; + send.write_all(&postcard::to_stdvec(&value)?).await?; + send.finish().await?; + } + ALPN_APPA_CAR_MIRROR_PULL => { + let appa = appa.clone(); + let endpoint = Arc::clone(endpoint); + let cache = cache.clone(); + tokio::spawn(async move { + loop { + let Ok((send, recv)) = conn.accept_bi().await else { + return; + }; + let appa = appa.clone(); + let endpoint = Arc::clone(&endpoint); + let cache = cache.clone(); + tokio::spawn(async move { + server_respond_pull(recv, send, endpoint, peer_id, appa, cache).await + }); + } + }); + } + _ => { + bail!("Unsupported protocol identifier"); + } + }) +} + +async fn server_respond_pull( + recv: impl AsyncRead + Unpin, + send: impl AsyncWrite + Send + Unpin, + endpoint: Arc, + peer_id: iroh_net::NodeId, + appa: Appa, + cache: InMemoryCache, +) -> Result<()> { + tracing::debug!("accepted bi stream"); + let mut recv = LengthDelimitedCodec::builder() + .max_frame_length(128 * 1024) + .new_read(recv); + + let typ = endpoint + .connection_info(peer_id) + .await? + .map(|info| info.conn_type.to_string()) + .unwrap_or("None".into()); + tracing::info!("Endpoint connection type: {typ}"); + + let Some(message) = recv.try_next().await? else { + tracing::info!("Got EOF, closing."); + return Ok(()); + }; + + tracing::info!("got pull message"); + let (root, request) = postcard::from_bytes::(&message)?.into_parts()?; + + tracing::info!("responding with car stream"); + car_mirror::common::block_send_car_stream( + root, + Some(request.into()), + send, + &appa.fs.store, + &cache, + ) + .await?; + Ok(()) +} + pub async fn sync(ticket: String) -> Result<()> { let ticket: NodeTicket = Ticket::deserialize(ticket.as_ref())?; let store = CacheMissing::new(150_000, Flatfs::new(ROOT_DIR)?); @@ -198,18 +220,7 @@ pub async fn sync(ticket: String) -> Result<()> { .connect(ticket.node_addr().clone(), ALPN_APPA_CAR_MIRROR_PULL) .await?; - let (send, recv) = connection.open_bi().await?; - let mut send = LengthDelimitedCodec::builder() - .max_frame_length(128 * 1024) - .new_write(send); - let mut recv = LengthDelimitedCodec::builder() - .max_frame_length(config.receive_maximum) - .new_read(recv); - - let mut last_response = None; loop { - let req = car_mirror::pull::request(root, last_response, &config, &store, &cache).await?; - let dag_verification = IncrementalDagVerification::new([root], &store, &cache).await?; tracing::info!( num_blocks_want = dag_verification.want_cids.len(), @@ -217,34 +228,40 @@ pub async fn sync(ticket: String) -> Result<()> { "State of transfer" ); + let mut req: PullRequest = dag_verification + .into_receiver_state(config.bloom_fpr) + .into(); + if req.indicates_finished() { println!("Done!"); store.inner.put(PRIVATE_ACCESS_KEY, &access_key_bytes)?; store.inner.put(DATA_ROOT, root.to_bytes())?; break; } - let msg = postcard::to_stdvec(&PullMsg::new(root, req))?; + + req.resources.truncate(config.max_roots_per_round); + + tracing::info!("Opening new connection."); + let (send, recv) = connection.open_bi().await?; + let mut send = LengthDelimitedCodec::builder() + .max_frame_length(128 * 1024) + .new_write(send); + tracing::info!("Sending pull msg"); + let msg = postcard::to_stdvec(&PullMsg::new(root, req))?; send.send(msg.into()).await?; tracing::info!("Pull msg sent, waiting for response"); - let Some(bytes) = recv.try_next().await? else { - println!("Prematurely closed stream! Aborting."); - break; - }; - tracing::info!("Response received, {} bytes", bytes.len()); - let response = CarFile { - bytes: bytes.into(), - }; - - last_response = Some(response); + if let Err(e) = + car_mirror::common::block_receive_car_stream(root, recv, &config, &store, &cache).await + { + tracing::warn!(?e, "Got error on receive, continuing."); + } } Ok(()) } pub fn car_mirror_config() -> car_mirror::common::Config { - let mut config = car_mirror::common::Config::default(); - config.receive_maximum *= 100; - config.send_minimum *= 180; + let config = car_mirror::common::Config::default(); config } diff --git a/src/main.rs b/src/main.rs index 4f5c345..4c7c373 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,7 +13,7 @@ use wnfs::common::Storable; #[derive(Debug, Parser)] #[command(name = "appa")] -#[command(about = "A simple wnfs interface, syncing with iroh", long_about = None)] +#[command(about = "A simple wnfs interface, syncing with iroh-net and car-mirror", long_about = None)] struct Cli { #[command(subcommand)] command: Commands,