Skip to content

Commit

Permalink
fixup: iroh-bytes downloader
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed Mar 22, 2024
1 parent fad95c1 commit c3b46aa
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 15 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tracing-futures = "0.2.5"
[dev-dependencies]
http-body = "0.4.5"
iroh-test = { path = "../iroh-test" }
futures-buffered = "0.2.4"
proptest = "1.0.0"
serde_json = "1.0.107"
serde_test = "1.0.176"
Expand All @@ -63,9 +64,10 @@ tempfile = "3.10.0"

[features]
default = ["fs-store"]
fs-store = ["reflink-copy", "redb"]
downloader = ["iroh-net", "parking_lot", "tokio-util/time"]
metrics = ["iroh-metrics"]
fs-store = ["dep:reflink-copy", "redb"]
redb = ["dep:redb"]
downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time"]
metrics = ["dep:iroh-metrics"]

[[example]]
name = "provide-bytes"
Expand Down
10 changes: 4 additions & 6 deletions iroh-bytes/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use std::{

use crate::{get::Stats, protocol::RangeSpecSeq, store::Store, Hash, HashAndFormat};
use bao_tree::ChunkRanges;
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use futures_lite::{future::BoxedLocal, StreamExt, Stream};
use iroh_net::{MagicEndpoint, NodeId};
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -64,9 +64,7 @@ const SERVICE_CHANNEL_CAPACITY: usize = 128;
pub type Id = u64;

/// Trait modeling a dialer. This allows for IO-less testing.
pub trait Dialer:
futures::Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin
{
pub trait Dialer: Stream<Item = (NodeId, anyhow::Result<Self::Connection>)> + Unpin {
/// Type of connections returned by the Dialer.
type Connection: Clone;
/// Dial a node.
Expand All @@ -89,7 +87,7 @@ pub enum FailureAction {
}

/// Future of a get request.
type GetFut = LocalBoxFuture<'static, Result<Stats, FailureAction>>;
type GetFut = BoxedLocal<Result<Stats, FailureAction>>;

/// Trait modelling performing a single request over a connection. This allows for IO-less testing.
pub trait Getter {
Expand Down Expand Up @@ -205,7 +203,7 @@ impl std::future::Future for DownloadHandle {
use std::task::Poll::*;
// make it easier on holders of the handle to poll the result, removing the receiver error
// from the middle
match self.receiver.poll_unpin(cx) {
match std::pin::Pin::new(&mut self.receiver).poll(cx) {
Ready(Ok(result)) => Ready(result),
Ready(Err(recv_err)) => Ready(Err(anyhow::anyhow!("oneshot error: {recv_err}"))),
Pending => Pending,
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
store::Store,
util::progress::IgnoreProgressSender,
};
use futures::FutureExt;
use futures_lite::FutureExt;
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};

Expand Down
6 changes: 3 additions & 3 deletions iroh-bytes/src/downloader/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn deduplication() {
handles.push(h);
}
assert!(
futures::future::join_all(handles)
futures_buffered::join_all(handles)
.await
.into_iter()
.all(|r| r.is_ok()),
Expand Down Expand Up @@ -168,7 +168,7 @@ async fn max_concurrent_requests() {
}

assert!(
futures::future::join_all(handles)
futures_buffered::join_all(handles)
.await
.into_iter()
.all(|r| r.is_ok()),
Expand Down Expand Up @@ -211,7 +211,7 @@ async fn max_concurrent_requests_per_peer() {
handles.push(h);
}

futures::future::join_all(handles).await;
futures_buffered::join_all(handles).await;
}

/// Tests that providers are preferred over candidates.
Expand Down
2 changes: 1 addition & 1 deletion iroh-bytes/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Dialer for TestingDialer {
}
}

impl futures::Stream for TestingDialer {
impl Stream for TestingDialer {
type Item = (NodeId, anyhow::Result<NodeId>);

fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
3 changes: 2 additions & 1 deletion iroh-bytes/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::{sync::Arc, time::Duration};

use futures_lite::FutureExt;
use parking_lot::RwLock;

use super::*;
Expand Down Expand Up @@ -30,7 +31,7 @@ impl Getter for TestingGetter {
tokio::time::sleep(request_duration).await;
Ok(Stats::default())
}
.boxed_local()
.boxed_local()
}
}

Expand Down

0 comments on commit c3b46aa

Please sign in to comment.