Skip to content

Commit

Permalink
Stream car mirror responses
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Jan 3, 2024
1 parent 3642d3a commit 126f147
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 102 deletions.
5 changes: 2 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ futures = "0.3.25"
bytes = "1.4"
blake3 = "1.3"
chrono = "0.4.24"
car-mirror = { git = "https://github.com/fission-codes/rs-car-mirror.git", branch = "matheus23/upgrade-wnfs", features = ["quick_cache"] }
car-mirror = { git = "https://github.com/fission-codes/rs-car-mirror.git", branch = "matheus23/streaming", features = ["quick_cache"] }
cid = "0.10"
clap = { version = "4.2", features = ["derive"] }
ignore = "0.4.20"
Expand All @@ -31,6 +31,7 @@ iroh-base = "0.12.0"
tokio-util = { version = "0.7.10", features = ["codec"] }
async-recursion = "1.0.5"
quick_cache = "0.4.0"
quinn = "0.10.2"

# enable the "rug" feature (using GMP) for rs-wnfs, speeding up nameaccumulator operations
[target.'cfg(unix)'.dependencies]
Expand Down
211 changes: 114 additions & 97 deletions src/commands/listen_sync.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use std::io::Cursor;
use std::sync::Arc;

use crate::state::{
Appa, ALPN_APPA_CAR_MIRROR_PULL, ALPN_APPA_KEY_VALUE_FETCH, DATA_ROOT, PRIVATE_ACCESS_KEY,
ROOT_DIR,
};
use crate::store::cache_missing::CacheMissing;
use crate::store::flatfs::Flatfs;
use anyhow::Result;
use car_mirror::common::CarFile;
use anyhow::{bail, Result};
use car_mirror::incremental_verification::IncrementalDagVerification;
use car_mirror::messages::{Bloom, PullRequest};
use car_mirror::traits::InMemoryCache;
Expand All @@ -17,8 +13,11 @@ use futures::{SinkExt, TryStreamExt};
use iroh_base::ticket::Ticket;
use iroh_net::magic_endpoint::accept_conn;
use iroh_net::ticket::NodeTicket;
use iroh_net::MagicEndpoint;

use iroh_net::{MagicEndpoint, NodeId};
use quinn::Connection;
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::codec::LengthDelimitedCodec;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -78,76 +77,99 @@ pub async fn listen() -> Result<()> {
"new connection from {peer_id} with ALPN {alpn} (coming from {})",
conn.remote_address()
);
match alpn.as_bytes() {
ALPN_APPA_KEY_VALUE_FETCH => {
let (mut send, mut recv) = conn.accept_bi().await?;
tracing::info!("Waiting for data...");
let msg = recv.read_to_end(100).await?;
tracing::info!("Got data!");
let key = String::from_utf8(msg)?;
let value = appa.fs.store.get(&key)?;
send.write_all(&postcard::to_stdvec(&value)?).await?;
send.finish().await?;
}
ALPN_APPA_CAR_MIRROR_PULL => {
let appa = appa.clone();
let endpoint = Arc::clone(&endpoint);
let config = car_mirror_config();
let cache = cache.clone();
tokio::spawn(async move {
let (send, recv) = conn.accept_bi().await?;
tracing::debug!("accepted bi stream, waiting for data...");

let mut recv = LengthDelimitedCodec::builder()
.max_frame_length(128 * 1024)
.new_read(recv);

let mut send = LengthDelimitedCodec::builder()
.max_frame_length(config.receive_maximum)
.new_write(send);

loop {
let typ = endpoint
.connection_info(peer_id)
.await?
.map(|info| info.conn_type.to_string())
.unwrap_or("None".into());
tracing::info!("Endpoint connection type: {typ}");

let Some(message) = recv.try_next().await? else {
tracing::info!("Got EOF, closing.");
return Ok::<_, anyhow::Error>(());
};
tracing::info!("got pull message");

let (root, request) =
postcard::from_bytes::<PullMsg>(&message)?.into_parts()?;

tracing::info!("decoded msg");

let response = car_mirror::pull::response(
root,
request,
&config,
&appa.fs.store,
&cache,
)
.await?;

tracing::info!("Sending response ({} bytes)", response.bytes.len());
send.send(response.bytes).await?;
tracing::info!("Sent.");
}
});
}
_ => {
println!("Unsupported protocol identifier (ALPN): {alpn}");
}

if let Err(e) =
accept_connection(peer_id, alpn.as_bytes(), conn, &appa, &endpoint, &cache).await
{
tracing::error!(?e, alpn, "Failed running protocol. Continuing.");
}
}
Ok(())
}

async fn accept_connection(
peer_id: NodeId,
alpn: &[u8],
conn: Connection,
appa: &Appa,
endpoint: &Arc<MagicEndpoint>,
cache: &InMemoryCache,
) -> Result<(), anyhow::Error> {
Ok(match alpn {
ALPN_APPA_KEY_VALUE_FETCH => {
let (mut send, mut recv) = conn.accept_bi().await?;
tracing::info!("Waiting for data...");
let msg = recv.read_to_end(100).await?;
tracing::info!("Got data!");
let key = String::from_utf8(msg)?;
let value = appa.fs.store.get(&key)?;
send.write_all(&postcard::to_stdvec(&value)?).await?;
send.finish().await?;
}
ALPN_APPA_CAR_MIRROR_PULL => {
let appa = appa.clone();
let endpoint = Arc::clone(endpoint);
let cache = cache.clone();
tokio::spawn(async move {
loop {
let Ok((send, recv)) = conn.accept_bi().await else {
return;
};
let appa = appa.clone();
let endpoint = Arc::clone(&endpoint);
let cache = cache.clone();
tokio::spawn(async move {
server_respond_pull(recv, send, endpoint, peer_id, appa, cache).await
});
}
});
}
_ => {
bail!("Unsupported protocol identifier");
}
})
}

async fn server_respond_pull(
recv: impl AsyncRead + Unpin,
send: impl AsyncWrite + Send + Unpin,
endpoint: Arc<MagicEndpoint>,
peer_id: iroh_net::NodeId,
appa: Appa,
cache: InMemoryCache,
) -> Result<()> {
tracing::debug!("accepted bi stream");
let mut recv = LengthDelimitedCodec::builder()
.max_frame_length(128 * 1024)
.new_read(recv);

let typ = endpoint
.connection_info(peer_id)
.await?
.map(|info| info.conn_type.to_string())
.unwrap_or("None".into());
tracing::info!("Endpoint connection type: {typ}");

let Some(message) = recv.try_next().await? else {
tracing::info!("Got EOF, closing.");
return Ok(());
};

tracing::info!("got pull message");
let (root, request) = postcard::from_bytes::<PullMsg>(&message)?.into_parts()?;

tracing::info!("responding with car stream");
car_mirror::common::block_send_car_stream(
root,
Some(request.into()),
send,
&appa.fs.store,
&cache,
)
.await?;
Ok(())
}

pub async fn sync(ticket: String) -> Result<()> {
let ticket: NodeTicket = Ticket::deserialize(ticket.as_ref())?;
let store = CacheMissing::new(150_000, Flatfs::new(ROOT_DIR)?);
Expand Down Expand Up @@ -198,53 +220,48 @@ pub async fn sync(ticket: String) -> Result<()> {
.connect(ticket.node_addr().clone(), ALPN_APPA_CAR_MIRROR_PULL)
.await?;

let (send, recv) = connection.open_bi().await?;
let mut send = LengthDelimitedCodec::builder()
.max_frame_length(128 * 1024)
.new_write(send);
let mut recv = LengthDelimitedCodec::builder()
.max_frame_length(config.receive_maximum)
.new_read(recv);

let mut last_response = None;
loop {
let req = car_mirror::pull::request(root, last_response, &config, &store, &cache).await?;

let dag_verification = IncrementalDagVerification::new([root], &store, &cache).await?;
tracing::info!(
num_blocks_want = dag_verification.want_cids.len(),
num_blocks_have = dag_verification.have_cids.len(),
"State of transfer"
);

let mut req: PullRequest = dag_verification
.into_receiver_state(config.bloom_fpr)
.into();

if req.indicates_finished() {
println!("Done!");
store.inner.put(PRIVATE_ACCESS_KEY, &access_key_bytes)?;
store.inner.put(DATA_ROOT, root.to_bytes())?;
break;
}
let msg = postcard::to_stdvec(&PullMsg::new(root, req))?;

req.resources.truncate(config.max_roots_per_round);

tracing::info!("Opening new connection.");
let (send, recv) = connection.open_bi().await?;
let mut send = LengthDelimitedCodec::builder()
.max_frame_length(128 * 1024)
.new_write(send);

tracing::info!("Sending pull msg");
let msg = postcard::to_stdvec(&PullMsg::new(root, req))?;
send.send(msg.into()).await?;
tracing::info!("Pull msg sent, waiting for response");

let Some(bytes) = recv.try_next().await? else {
println!("Prematurely closed stream! Aborting.");
break;
};
tracing::info!("Response received, {} bytes", bytes.len());
let response = CarFile {
bytes: bytes.into(),
};

last_response = Some(response);
if let Err(e) =
car_mirror::common::block_receive_car_stream(root, recv, &config, &store, &cache).await
{
tracing::warn!(?e, "Got error on receive, continuing.");
}
}
Ok(())
}

pub fn car_mirror_config() -> car_mirror::common::Config {
let mut config = car_mirror::common::Config::default();
config.receive_maximum *= 100;
config.send_minimum *= 180;
let config = car_mirror::common::Config::default();
config
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use wnfs::common::Storable;

#[derive(Debug, Parser)]
#[command(name = "appa")]
#[command(about = "A simple wnfs interface, syncing with iroh", long_about = None)]
#[command(about = "A simple wnfs interface, syncing with iroh-net and car-mirror", long_about = None)]
struct Cli {
#[command(subcommand)]
command: Commands,
Expand Down

0 comments on commit 126f147

Please sign in to comment.