Skip to content

Commit

Permalink
refactor(iroh): avoid futures crate
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Mar 22, 2024
1 parent c3b46aa commit 9fa2af7
Show file tree
Hide file tree
Showing 20 changed files with 122 additions and 109 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{

use crate::{get::Stats, protocol::RangeSpecSeq, store::Store, Hash, HashAndFormat};
use bao_tree::ChunkRanges;
use futures_lite::{future::BoxedLocal, StreamExt, Stream};
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use iroh_net::{MagicEndpoint, NodeId};
use tokio::{
sync::{mpsc, oneshot},
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl Getter for TestingGetter {
tokio::time::sleep(request_duration).await;
Ok(Stats::default())
}
.boxed_local()
.boxed_local()
}
}

Expand Down
4 changes: 3 additions & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ bytes = "1"
data-encoding = "2.4.0"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
flume = "0.11"
futures = "0.3.25"
futures-buffered = "0.2.4"
futures-lite = "2.3"
futures-util = "0.3"
genawaiter = { version = "0.99", default-features = false, features = ["futures03"] }
hashlink = "0.8.4"
hex = { version = "0.4.3" }
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! TODO: Contains only iroh sync related methods. Add other methods.

use futures::{Stream, StreamExt};
use futures_lite::{Stream, StreamExt};
use quic_rpc::{RpcClient, ServiceConnection};

use crate::rpc_protocol::ProviderService;
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/client/authors.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use futures::{Stream, TryStreamExt};
use futures_lite::{stream::StreamExt, Stream};
use iroh_sync::AuthorId;
use quic_rpc::{RpcClient, ServiceConnection};

Expand All @@ -26,6 +26,6 @@ where
/// List document authors for which we have a secret key.
pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
let stream = self.rpc.server_streaming(AuthorListRequest {}).await?;
Ok(flatten(stream).map_ok(|res| res.author_id))
Ok(flatten(stream).map(|res| res.map(|res| res.author_id)))
}
}
20 changes: 11 additions & 9 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
future::Future,
io,
path::PathBuf,
pin::Pin,
Expand All @@ -8,7 +9,8 @@ use std::{

use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt};
use futures_lite::{Stream, StreamExt};
use futures_util::SinkExt;
use iroh_base::ticket::BlobTicket;
use iroh_bytes::{
export::ExportProgress,
Expand Down Expand Up @@ -180,7 +182,7 @@ where

/// Write a blob by passing bytes.
pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<BlobAddOutcome> {
let input = futures::stream::once(futures::future::ready(Ok(bytes.into())));
let input = futures_lite::stream::once(Ok(bytes.into()));
self.add_stream(input, SetTagOption::Auto).await?.await
}

Expand All @@ -190,7 +192,7 @@ where
bytes: impl Into<Bytes>,
name: impl Into<Tag>,
) -> anyhow::Result<BlobAddOutcome> {
let input = futures::stream::once(futures::future::ready(Ok(bytes.into())));
let input = futures_lite::stream::once(Ok(bytes.into()));
self.add_stream(input, SetTagOption::Named(name.into()))
.await?
.await
Expand All @@ -207,14 +209,14 @@ where
.rpc
.server_streaming(BlobValidateRequest { repair })
.await?;
Ok(stream.map_err(anyhow::Error::from))
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

/// Download a blob from another node and add it to the local database.
pub async fn download(&self, req: BlobDownloadRequest) -> Result<BlobDownloadProgress> {
let stream = self.rpc.server_streaming(req).await?;
Ok(BlobDownloadProgress::new(
stream.map_err(anyhow::Error::from),
stream.map(|res| res.map_err(anyhow::Error::from)),
))
}

Expand Down Expand Up @@ -414,7 +416,7 @@ impl BlobAddProgress {
impl Stream for BlobAddProgress {
type Item = Result<AddProgress>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
Pin::new(&mut self.stream).poll_next(cx)
}
}

Expand All @@ -423,7 +425,7 @@ impl Future for BlobAddProgress {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.stream.poll_next_unpin(cx) {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
Expand Down Expand Up @@ -519,7 +521,7 @@ impl BlobDownloadProgress {
impl Stream for BlobDownloadProgress {
type Item = Result<DownloadProgress>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
Pin::new(&mut self.stream).poll_next(cx)
}
}

Expand All @@ -528,7 +530,7 @@ impl Future for BlobDownloadProgress {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match self.stream.poll_next_unpin(cx) {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
return Poll::Ready(Err(anyhow!("Response stream ended prematurely")))
Expand Down
18 changes: 10 additions & 8 deletions iroh/src/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_base::key::PublicKey;
use iroh_bytes::{export::ExportProgress, store::ExportMode, Hash};
use iroh_net::NodeAddr;
Expand Down Expand Up @@ -72,7 +72,7 @@ where
/// List all documents.
pub async fn list(&self) -> Result<impl Stream<Item = Result<(NamespaceId, CapabilityKind)>>> {
let stream = self.rpc.server_streaming(DocListRequest {}).await?;
Ok(flatten(stream).map_ok(|res| (res.id, res.capability)))
Ok(flatten(stream).map(|res| res.map(|res| (res.id, res.capability))))
}

/// Get a [`Doc`] client for a single document. Return None if the document cannot be found.
Expand Down Expand Up @@ -293,7 +293,7 @@ where
query: query.into(),
})
.await?;
Ok(flatten(stream).map_ok(|res| res.entry.into()))
Ok(flatten(stream).map(|res| res.map(|res| res.entry.into())))
}

/// Get a single entry.
Expand Down Expand Up @@ -340,9 +340,10 @@ where
.rpc
.server_streaming(DocSubscribeRequest { doc_id: self.id() })
.await?;
Ok(flatten(stream)
.map_ok(|res| res.event.into())
.map_err(Into::into))
Ok(flatten(stream).map(|res| match res {
Ok(res) => Ok(res.event.into()),
Err(err) => Err(err.into()),
}))
}

/// Get status info for this document
Expand Down Expand Up @@ -588,7 +589,7 @@ pub struct DocImportFileOutcome {
impl Stream for DocImportFileProgress {
type Item = Result<DocImportProgress>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
Pin::new(&mut self.stream).poll_next(cx)
}
}

Expand Down Expand Up @@ -653,8 +654,9 @@ pub struct DocExportFileOutcome {

impl Stream for DocExportFileProgress {
type Item = Result<ExportProgress>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.poll_next_unpin(cx)
Pin::new(&mut self.stream).poll_next(cx)
}
}

Expand Down
4 changes: 2 additions & 2 deletions iroh/src/client/node.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;

use anyhow::Result;
use futures::{Stream, TryStreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_base::key::PublicKey;
use iroh_net::magic_endpoint::ConnectionInfo;
use quic_rpc::{RpcClient, ServiceConnection};
Expand Down Expand Up @@ -32,7 +32,7 @@ where
/// Get information about the different connections we have made
pub async fn connections(&self) -> Result<impl Stream<Item = Result<ConnectionInfo>>> {
let stream = self.rpc.server_streaming(NodeConnectionsRequest {}).await?;
Ok(flatten(stream).map_ok(|res| res.conn_info))
Ok(flatten(stream).map(|res| res.map(|res| res.conn_info)))
}

/// Get connection information about a node
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/client/tags.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use futures::{Stream, TryStreamExt};
use futures_lite::{Stream, StreamExt};
use iroh_bytes::Tag;
use quic_rpc::{RpcClient, ServiceConnection};

Expand All @@ -18,7 +18,7 @@ where
/// List all tags.
pub async fn list(&self) -> Result<impl Stream<Item = Result<ListTagsResponse>>> {
let stream = self.rpc.server_streaming(ListTagsRequest).await?;
Ok(stream.map_err(anyhow::Error::from))
Ok(stream.map(|res| res.map_err(anyhow::Error::from)))
}

/// Delete a tag.
Expand Down
17 changes: 9 additions & 8 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use std::sync::Arc;
use std::task::Poll;

use anyhow::{anyhow, Result};
use futures::future::{BoxFuture, Shared};
use futures::{FutureExt, StreamExt};
use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
use iroh_bytes::store::Store as BaoStore;
use iroh_bytes::BlobFormat;
use iroh_bytes::Hash;
Expand Down Expand Up @@ -46,7 +45,7 @@ mod rpc_status;
pub use builder::{Builder, GcPolicy, StorageConfig};
pub use rpc_status::RpcStatus;

type EventCallback = Box<dyn Fn(Event) -> BoxFuture<'static, ()> + 'static + Sync + Send>;
type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;

#[derive(Default, derive_more::Debug, Clone)]
struct Callbacks(#[debug("..")] Arc<RwLock<Vec<EventCallback>>>);
Expand All @@ -67,8 +66,9 @@ impl Callbacks {

impl iroh_bytes::provider::EventSender for Callbacks {
fn send(&self, event: iroh_bytes::provider::Event) -> BoxFuture<()> {
let this = self.clone();
async move {
let cbs = self.0.read().await;
let cbs = this.0.read().await;
for cb in &*cbs {
cb(Event::ByteProvide(event.clone())).await;
}
Expand All @@ -90,7 +90,7 @@ impl iroh_bytes::provider::EventSender for Callbacks {
#[derive(Debug, Clone)]
pub struct Node<D> {
inner: Arc<NodeInner<D>>,
task: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
task: (), // Arc<BoxFuture<anyhow::Result<()>>>,
client: crate::client::mem::Iroh,
}

Expand All @@ -102,7 +102,7 @@ struct NodeInner<D> {
cancel_token: CancellationToken,
controller: FlumeConnection<ProviderResponse, ProviderRequest>,
#[debug("callbacks: Sender<Box<dyn Fn(Event)>>")]
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<'static, ()> + Send + Sync + 'static>>,
cb_sender: mpsc::Sender<Box<dyn Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>>,
callbacks: Callbacks,
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
Expand Down Expand Up @@ -190,7 +190,7 @@ impl<D: BaoStore> Node<D> {
/// progress.
///
/// Warning: The callback must complete quickly, as otherwise it will block ongoing work.
pub async fn subscribe<F: Fn(Event) -> BoxFuture<'static, ()> + Send + Sync + 'static>(
pub async fn subscribe<F: Fn(Event) -> BoxFuture<()> + Send + Sync + 'static>(
&self,
cb: F,
) -> Result<()> {
Expand Down Expand Up @@ -254,7 +254,8 @@ impl<D> Future for Node<D> {
type Output = Result<(), Arc<JoinError>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.task).poll(cx)
// Pin::new(&mut self.task).poll(cx)
todo!()
}
}

Expand Down
12 changes: 10 additions & 2 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use anyhow::{bail, Context, Result};
use futures::{FutureExt, StreamExt, TryFutureExt};
use futures_lite::{FutureExt, StreamExt};
use iroh_base::key::SecretKey;
use iroh_bytes::{
downloader::Downloader,
Expand Down Expand Up @@ -382,9 +382,17 @@ where
)
};

let task = Arc::new(
async move {
task.await?;
anyhow::Ok(())
}
.boxed(),
);

let node = Node {
inner,
task: task.map_err(Arc::new).boxed().shared(),
task: (),
client,
};

Expand Down
Loading

0 comments on commit 9fa2af7

Please sign in to comment.