diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c26bfb5..fbbcf5b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ on: - main env: - MSRV: "1.63" + MSRV: "1.75" RUST_BACKTRACE: 1 RUSTFLAGS: -Dwarnings @@ -100,13 +100,13 @@ jobs: - name: cargo check run: cargo check --workspace --all-features --lib --bins - minimal-crates: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - - uses: dtolnay/rust-toolchain@nightly - - uses: swatinem/rust-cache@v2 - - name: cargo check - run: | - rm -f Cargo.lock - cargo +nightly check -Z minimal-versions --workspace --all-features --lib --bins + # minimal-crates: + # runs-on: ubuntu-latest + # steps: + # - uses: actions/checkout@v2 + # - uses: dtolnay/rust-toolchain@nightly + # - uses: swatinem/rust-cache@v2 + # - name: cargo check + # run: | + # rm -f Cargo.lock + # cargo +nightly check -Z minimal-versions --workspace --all-features --lib --bins diff --git a/Cargo.lock b/Cargo.lock index e7b25fb..13e0ea2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,7 +452,7 @@ checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" [[package]] name = "iroh-io" -version = "0.3.0" +version = "0.4.0" dependencies = [ "axum", "bytes", diff --git a/Cargo.toml b/Cargo.toml index ebb5fc4..0966e5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iroh-io" -version = "0.3.0" +version = "0.4.0" edition = "2021" license = "Apache-2.0 OR MIT" authors = ["rklaehn ", "n0 team"] @@ -8,6 +8,7 @@ repository = "https://github.com/n0-computer/iroh" description = "async local io" keywords = ["io", "async", "http"] categories = ["asynchronous"] +rust-version = "1.75" [dependencies] bytes = "1.4.0" diff --git a/src/http.rs b/src/http.rs index 96ce4b8..e4a68d1 100644 --- a/src/http.rs +++ b/src/http.rs @@ -3,7 +3,7 @@ //! Uses the [reqwest](https://docs.rs/reqwest) crate. Somewhat inspired by //! use super::*; -use futures::{future::LocalBoxFuture, FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; use reqwest::{ header::{HeaderMap, HeaderValue}, Method, StatusCode, Url, @@ -121,32 +121,6 @@ pub mod http_adapter { use super::*; - newtype_future!( - /// The future returned by [`HttpAdapter::read_at`] - ReadAtFuture, - LocalBoxFuture<'a, io::Result>, - io::Result - ); - - impl fmt::Debug for ReadAtFuture<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ReadAtFuture").finish_non_exhaustive() - } - } - - newtype_future!( - /// The future returned by [`HttpAdapter::len`] - LenFuture, - LocalBoxFuture<'a, io::Result>, - io::Result - ); - - impl fmt::Debug for LenFuture<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("LenFuture").finish_non_exhaustive() - } - } - /// Options for [HttpAdapter] #[derive(Debug, Clone, Default)] pub struct Opts { @@ -154,55 +128,40 @@ pub mod http_adapter { } impl AsyncSliceReader for HttpAdapter { - type ReadAtFuture<'a> = ReadAtFuture<'a>; - - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - ReadAtFuture( - async move { - let mut stream = self.get_stream_at(offset, len).await?; - let mut res = BytesMut::with_capacity(len.min(1024)); - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - res.extend_from_slice(&chunk); - if BytesMut::len(&res) >= len { - break; - } - } - // we do not want to rely on the server sending the exact amount of bytes - res.truncate(len); - Ok(res.freeze()) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + let mut stream = self.get_stream_at(offset, len).await?; + let mut res = BytesMut::with_capacity(len.min(1024)); + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + res.extend_from_slice(&chunk); + if BytesMut::len(&res) >= len { + break; } - .boxed_local(), - ) + } + // we do not want to rely on the server sending the exact amount of bytes + res.truncate(len); + Ok(res.freeze()) } - type LenFuture<'a> = LenFuture<'a>; - - fn len(&mut self) -> Self::LenFuture<'_> { - LenFuture( - async move { - let io_err = |text: &str| io::Error::new(io::ErrorKind::Other, text); - let head_response = self - .head_request() - .await - .map_err(|_| io_err("head request failed"))?; - if !head_response.status().is_success() { - return Err(io_err("head request failed")); - } - let size = head_response - .headers() - .get("content-length") - .ok_or_else(|| io_err("content-length header missing"))?; - let text = size - .to_str() - .map_err(|_| io_err("content-length malformed"))?; - let size = - u64::from_str(text).map_err(|_| io_err("content-length malformed"))?; - self.size = Some(size); - Ok(size) - } - .boxed_local(), - ) + async fn len(&mut self) -> io::Result { + let io_err = |text: &str| io::Error::new(io::ErrorKind::Other, text); + let head_response = self + .head_request() + .await + .map_err(|_| io_err("head request failed"))?; + if !head_response.status().is_success() { + return Err(io_err("head request failed")); + } + let size = head_response + .headers() + .get("content-length") + .ok_or_else(|| io_err("content-length header missing"))?; + let text = size + .to_str() + .map_err(|_| io_err("content-length malformed"))?; + let size = u64::from_str(text).map_err(|_| io_err("content-length malformed"))?; + self.size = Some(size); + Ok(size) } } } diff --git a/src/lib.rs b/src/lib.rs index 96d68c3..1d1fc29 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,7 @@ #![deny(missing_docs, rustdoc::broken_intra_doc_links)] use bytes::Bytes; -use futures::{future::Either, Future}; +use futures::Future; use std::io; /// A trait to abstract async reading from different resource. @@ -52,10 +52,6 @@ use std::io; /// See xRead, xFileSize in #[allow(clippy::len_without_is_empty)] pub trait AsyncSliceReader { - /// The future returned by read_at - type ReadAtFuture<'a>: Future> + 'a - where - Self: 'a; /// Read the entire buffer at the given position. /// /// Will return at most `len` bytes, but may return fewer if the resource is smaller. @@ -64,48 +60,39 @@ pub trait AsyncSliceReader { /// It will never return an io error independent of the range as long as the underlying /// resource is valid. #[must_use = "io futures must be polled to completion"] - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_>; + fn read_at(&mut self, offset: u64, len: usize) -> impl Future>; - /// The future returned by len - type LenFuture<'a>: Future> + 'a - where - Self: 'a; /// Get the length of the resource #[must_use = "io futures must be polled to completion"] - fn len(&mut self) -> Self::LenFuture<'_>; + fn len(&mut self) -> impl Future>; } impl<'b, T: AsyncSliceReader> AsyncSliceReader for &'b mut T { - type ReadAtFuture<'a> = T::ReadAtFuture<'a> where T: 'a, 'b: 'a; - type LenFuture<'a> = T::LenFuture<'a> where T: 'a, 'b: 'a; - - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - (**self).read_at(offset, len) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + (**self).read_at(offset, len).await } - fn len(&mut self) -> Self::LenFuture<'_> { - (**self).len() + async fn len(&mut self) -> io::Result { + (**self).len().await } } impl AsyncSliceReader for Box { - type ReadAtFuture<'a> = T::ReadAtFuture<'a> where T: 'a; - type LenFuture<'a> = T::LenFuture<'a> where T: 'a; - - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - (**self).read_at(offset, len) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + (**self).read_at(offset, len).await } - fn len(&mut self) -> Self::LenFuture<'_> { - (**self).len() + async fn len(&mut self) -> io::Result { + (**self).len().await } } /// Extension trait for [AsyncSliceReader]. pub trait AsyncSliceReaderExt: AsyncSliceReader { /// Read the entire resource into a [bytes::Bytes] buffer, if possible. - fn read_to_end(&mut self) -> Self::ReadAtFuture<'_> { - self.read_at(0, usize::MAX) + #[allow(async_fn_in_trait)] + async fn read_to_end(&mut self) -> io::Result { + self.read_at(0, usize::MAX).await } } @@ -121,187 +108,116 @@ impl AsyncSliceReaderExt for T {} /// This is similar to the io interface of sqlite. /// See xWrite in pub trait AsyncSliceWriter: Sized { - /// The future returned by write_at - type WriteAtFuture<'a>: Future> + 'a - where - Self: 'a; /// Write the entire slice at the given position. /// /// if self.len < offset + data.len(), the underlying resource will be extended. /// if self.len < offset, the gap will be filled with zeros. #[must_use = "io futures must be polled to completion"] - fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a>; + fn write_at(&mut self, offset: u64, data: &[u8]) -> impl Future>; - /// The future returned by write_bytes_at - type WriteBytesAtFuture<'a>: Future> + 'a - where - Self: 'a; /// Write the entire Bytes at the given position. /// /// Use this if you have a Bytes, to avoid allocations. /// Other than that it is equivalent to [AsyncSliceWriter::write_at]. #[must_use = "io futures must be polled to completion"] - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_>; + fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> impl Future>; - /// The future returned by set_len - type SetLenFuture<'a>: Future> + 'a - where - Self: 'a; /// Set the length of the underlying storage. #[must_use = "io futures must be polled to completion"] - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_>; + fn set_len(&mut self, len: u64) -> impl Future>; - /// The future returned by sync - type SyncFuture<'a>: Future> + 'a - where - Self: 'a; /// Sync any buffers to the underlying storage. #[must_use = "io futures must be polled to completion"] - fn sync(&mut self) -> Self::SyncFuture<'_>; + fn sync(&mut self) -> impl Future>; } impl<'b, T: AsyncSliceWriter> AsyncSliceWriter for &'b mut T { - type WriteAtFuture<'a> = T::WriteAtFuture<'a> where T: 'a, 'b: 'a; - type WriteBytesAtFuture<'a> = T::WriteBytesAtFuture<'a> where T: 'a, 'b: 'a; - type SetLenFuture<'a> = T::SetLenFuture<'a> where T: 'a, 'b: 'a; - type SyncFuture<'a> = T::SyncFuture<'a> where T: 'a, 'b: 'a; - - fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> { - (**self).write_at(offset, data) + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { + (**self).write_at(offset, data).await } - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { - (**self).write_bytes_at(offset, data) + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { + (**self).write_bytes_at(offset, data).await } - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { - (**self).set_len(len) + async fn set_len(&mut self, len: u64) -> io::Result<()> { + (**self).set_len(len).await } - fn sync(&mut self) -> Self::SyncFuture<'_> { - (**self).sync() + async fn sync(&mut self) -> io::Result<()> { + (**self).sync().await } } impl AsyncSliceWriter for Box { - type WriteAtFuture<'a> = T::WriteAtFuture<'a> where T: 'a; - type WriteBytesAtFuture<'a> = T::WriteBytesAtFuture<'a> where T: 'a; - type SetLenFuture<'a> = T::SetLenFuture<'a> where T: 'a; - type SyncFuture<'a> = T::SyncFuture<'a> where T: 'a; - - fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> { - (**self).write_at(offset, data) + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { + (**self).write_at(offset, data).await } - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { - (**self).write_bytes_at(offset, data) + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { + (**self).write_bytes_at(offset, data).await } - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { - (**self).set_len(len) + async fn set_len(&mut self, len: u64) -> io::Result<()> { + (**self).set_len(len).await } - fn sync(&mut self) -> Self::SyncFuture<'_> { - (**self).sync() + async fn sync(&mut self) -> io::Result<()> { + (**self).sync().await } } /// A non seekable reader, e.g. a network socket. pub trait AsyncStreamReader { - /// Future returned by read - type ReadFuture<'a>: Future> + 'a - where - Self: 'a; /// Read at most `len` bytes. To read to the end, pass u64::MAX. /// /// returns an empty buffer to indicate EOF. - fn read(&mut self, len: usize) -> Self::ReadFuture<'_>; + fn read(&mut self, len: usize) -> impl Future>; } impl AsyncStreamReader for &mut T { - type ReadFuture<'a> = T::ReadFuture<'a> where Self: 'a; - - fn read(&mut self, len: usize) -> Self::ReadFuture<'_> { - (**self).read(len) + async fn read(&mut self, len: usize) -> io::Result { + (**self).read(len).await } } impl AsyncStreamReader for Bytes { - type ReadFuture<'a> = futures::future::Ready>; - - fn read(&mut self, len: usize) -> Self::ReadFuture<'_> { + async fn read(&mut self, len: usize) -> io::Result { let res = self.split_to(len.min(Bytes::len(self))); - futures::future::ok(res) + Ok(res) } } /// A non seekable writer, e.g. a network socket. pub trait AsyncStreamWriter { - /// Future returned by write - type WriteFuture<'a>: Future> + 'a - where - Self: 'a; /// Write the entire slice. /// /// In case of an error, some bytes may have been written. - fn write<'a>(&'a mut self, data: &'a [u8]) -> Self::WriteFuture<'a>; + fn write(&mut self, data: &[u8]) -> impl Future>; - /// Future returned by write - type WriteBytesFuture<'a>: Future> + 'a - where - Self: 'a; /// Write the entire bytes. /// /// In case of an error, some bytes may have been written. - fn write_bytes(&mut self, data: Bytes) -> Self::WriteBytesFuture<'_>; + fn write_bytes(&mut self, data: Bytes) -> impl Future>; - /// Future returned by sync - type SyncFuture<'a>: Future> + 'a - where - Self: 'a; /// Sync any buffers to the underlying storage. - fn sync(&mut self) -> Self::SyncFuture<'_>; + fn sync(&mut self) -> impl Future>; } impl AsyncStreamWriter for &mut T { - type WriteFuture<'a> = T::WriteFuture<'a> where Self: 'a; - - fn write<'a>(&'a mut self, data: &'a [u8]) -> Self::WriteFuture<'a> { - (**self).write(data) + async fn write(&mut self, data: &[u8]) -> io::Result<()> { + (**self).write(data).await } - type SyncFuture<'a> = T::SyncFuture<'a> where Self: 'a; - - fn sync(&mut self) -> Self::SyncFuture<'_> { - (**self).sync() + async fn sync(&mut self) -> io::Result<()> { + (**self).sync().await } - type WriteBytesFuture<'a> = T::WriteBytesFuture<'a> where Self: 'a; - - fn write_bytes(&mut self, data: Bytes) -> Self::WriteBytesFuture<'_> { - (**self).write_bytes(data) + async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> { + (**self).write_bytes(data).await } } -#[cfg(any(feature = "tokio-io", feature = "http"))] -/// A macro to create a newtype wrapper for a future, to hide the underlying type. -macro_rules! newtype_future { - ($(#[$outer:meta])* $name:ident, $inner:ty, $output:ty) => { - #[repr(transparent)] - #[pin_project::pin_project] - #[must_use] - $(#[$outer])* - pub struct $name<'a>(#[pin] $inner); - - impl<'a> futures::Future for $name<'a> { - type Output = $output; - fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll { - self.project().0.poll(cx) - } - } - }; -} - #[cfg(feature = "tokio-io")] mod tokio_io; #[cfg(feature = "tokio-io")] @@ -323,19 +239,17 @@ where L: AsyncSliceReader + 'static, R: AsyncSliceReader + 'static, { - type ReadAtFuture<'a> = Either, R::ReadAtFuture<'a>>; - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { match self { - futures::future::Either::Left(l) => Either::Left(l.read_at(offset, len)), - futures::future::Either::Right(r) => Either::Right(r.read_at(offset, len)), + Self::Left(l) => l.read_at(offset, len).await, + Self::Right(r) => r.read_at(offset, len).await, } } - type LenFuture<'a> = Either, R::LenFuture<'a>>; - fn len(&mut self) -> Self::LenFuture<'_> { + async fn len(&mut self) -> io::Result { match self { - futures::future::Either::Left(l) => Either::Left(l.len()), - futures::future::Either::Right(r) => Either::Right(r.len()), + Self::Left(l) => l.len().await, + Self::Right(r) => r.len().await, } } } @@ -345,35 +259,31 @@ where L: AsyncSliceWriter + 'static, R: AsyncSliceWriter + 'static, { - type WriteBytesAtFuture<'a> = Either, R::WriteBytesAtFuture<'a>>; - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { match self { - futures::future::Either::Left(l) => Either::Left(l.write_bytes_at(offset, data)), - futures::future::Either::Right(r) => Either::Right(r.write_bytes_at(offset, data)), + Self::Left(l) => l.write_bytes_at(offset, data).await, + Self::Right(r) => r.write_bytes_at(offset, data).await, } } - type WriteAtFuture<'a> = Either, R::WriteAtFuture<'a>>; - fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> { + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { match self { - futures::future::Either::Left(l) => Either::Left(l.write_at(offset, data)), - futures::future::Either::Right(r) => Either::Right(r.write_at(offset, data)), + Self::Left(l) => l.write_at(offset, data).await, + Self::Right(r) => r.write_at(offset, data).await, } } - type SyncFuture<'a> = Either, R::SyncFuture<'a>>; - fn sync(&mut self) -> Self::SyncFuture<'_> { + async fn sync(&mut self) -> io::Result<()> { match self { - futures::future::Either::Left(l) => Either::Left(l.sync()), - futures::future::Either::Right(r) => Either::Right(r.sync()), + Self::Left(l) => l.sync().await, + Self::Right(r) => r.sync().await, } } - type SetLenFuture<'a> = Either, R::SetLenFuture<'a>>; - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { + async fn set_len(&mut self, len: u64) -> io::Result<()> { match self { - futures::future::Either::Left(l) => Either::Left(l.set_len(len)), - futures::future::Either::Right(r) => Either::Right(r.set_len(len)), + Self::Left(l) => l.set_len(len).await, + Self::Right(r) => r.set_len(len).await, } } } @@ -384,19 +294,17 @@ where L: AsyncSliceReader + 'static, R: AsyncSliceReader + 'static, { - type ReadAtFuture<'a> = Either, R::ReadAtFuture<'a>>; - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { match self { - tokio_util::either::Either::Left(l) => Either::Left(l.read_at(offset, len)), - tokio_util::either::Either::Right(r) => Either::Right(r.read_at(offset, len)), + Self::Left(l) => l.read_at(offset, len).await, + Self::Right(r) => r.read_at(offset, len).await, } } - type LenFuture<'a> = Either, R::LenFuture<'a>>; - fn len(&mut self) -> Self::LenFuture<'_> { + async fn len(&mut self) -> io::Result { match self { - tokio_util::either::Either::Left(l) => Either::Left(l.len()), - tokio_util::either::Either::Right(r) => Either::Right(r.len()), + Self::Left(l) => l.len().await, + Self::Right(r) => r.len().await, } } } @@ -407,35 +315,31 @@ where L: AsyncSliceWriter + 'static, R: AsyncSliceWriter + 'static, { - type WriteBytesAtFuture<'a> = Either, R::WriteBytesAtFuture<'a>>; - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { match self { - tokio_util::either::Either::Left(l) => Either::Left(l.write_bytes_at(offset, data)), - tokio_util::either::Either::Right(r) => Either::Right(r.write_bytes_at(offset, data)), + Self::Left(l) => l.write_bytes_at(offset, data).await, + Self::Right(r) => r.write_bytes_at(offset, data).await, } } - type WriteAtFuture<'a> = Either, R::WriteAtFuture<'a>>; - fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> { + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { match self { - tokio_util::either::Either::Left(l) => Either::Left(l.write_at(offset, data)), - tokio_util::either::Either::Right(r) => Either::Right(r.write_at(offset, data)), + Self::Left(l) => l.write_at(offset, data).await, + Self::Right(r) => r.write_at(offset, data).await, } } - type SyncFuture<'a> = Either, R::SyncFuture<'a>>; - fn sync(&mut self) -> Self::SyncFuture<'_> { + async fn sync(&mut self) -> io::Result<()> { match self { - tokio_util::either::Either::Left(l) => Either::Left(l.sync()), - tokio_util::either::Either::Right(r) => Either::Right(r.sync()), + Self::Left(l) => l.sync().await, + Self::Right(r) => r.sync().await, } } - type SetLenFuture<'a> = Either, R::SetLenFuture<'a>>; - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { + async fn set_len(&mut self, len: u64) -> io::Result<()> { match self { - tokio_util::either::Either::Left(l) => Either::Left(l.set_len(len)), - tokio_util::either::Either::Right(r) => Either::Right(r.set_len(len)), + Self::Left(l) => l.set_len(len).await, + Self::Right(r) => r.set_len(len).await, } } } diff --git a/src/mem.rs b/src/mem.rs index ea237ab..1f4ad45 100644 --- a/src/mem.rs +++ b/src/mem.rs @@ -2,114 +2,95 @@ use crate::AsyncStreamWriter; use super::{AsyncSliceReader, AsyncSliceWriter}; use bytes::{Bytes, BytesMut}; -use futures::future; use std::io; impl AsyncSliceReader for bytes::Bytes { - type ReadAtFuture<'a> = future::Ready>; - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - future::ok(get_limited_slice(self, offset, len)) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + Ok(get_limited_slice(self, offset, len)) } - type LenFuture<'a> = future::Ready>; - fn len(&mut self) -> Self::LenFuture<'_> { - future::ok(Bytes::len(self) as u64) + async fn len(&mut self) -> io::Result { + Ok(Bytes::len(self) as u64) } } impl AsyncSliceReader for bytes::BytesMut { - type ReadAtFuture<'a> = future::Ready>; - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - future::ok(copy_limited_slice(self, offset, len)) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + Ok(copy_limited_slice(self, offset, len)) } - type LenFuture<'a> = future::Ready>; - fn len(&mut self) -> Self::LenFuture<'_> { - future::ok(BytesMut::len(self) as u64) + async fn len(&mut self) -> io::Result { + Ok(BytesMut::len(self) as u64) } } impl AsyncSliceWriter for bytes::BytesMut { - type WriteBytesAtFuture<'a> = future::Ready>; - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { - future::ready(write_extend(self, offset, &data)) + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { + write_extend(self, offset, &data) } - type WriteAtFuture<'a> = future::Ready>; - fn write_at(&mut self, offset: u64, data: &[u8]) -> Self::WriteAtFuture<'_> { - future::ready(write_extend(self, offset, data)) + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { + write_extend(self, offset, data) } - type SetLenFuture<'a> = future::Ready>; - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { + async fn set_len(&mut self, len: u64) -> io::Result<()> { let len = len.try_into().unwrap_or(usize::MAX); self.resize(len, 0); - future::ok(()) + Ok(()) } - type SyncFuture<'a> = future::Ready>; - fn sync(&mut self) -> Self::SyncFuture<'_> { - future::ok(()) + async fn sync(&mut self) -> io::Result<()> { + Ok(()) } } impl AsyncSliceWriter for Vec { - type WriteBytesAtFuture<'a> = future::Ready>; - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { - future::ready(write_extend_vec(self, offset, &data)) + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { + write_extend_vec(self, offset, &data) } - type WriteAtFuture<'a> = future::Ready>; - fn write_at(&mut self, offset: u64, data: &[u8]) -> Self::WriteAtFuture<'_> { - future::ready(write_extend_vec(self, offset, data)) + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { + write_extend_vec(self, offset, data) } - type SetLenFuture<'a> = future::Ready>; - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { + async fn set_len(&mut self, len: u64) -> io::Result<()> { let len = len.try_into().unwrap_or(usize::MAX); self.resize(len, 0); - future::ok(()) + Ok(()) } - type SyncFuture<'a> = future::Ready>; - fn sync(&mut self) -> Self::SyncFuture<'_> { - future::ok(()) + async fn sync(&mut self) -> io::Result<()> { + Ok(()) } } impl AsyncStreamWriter for Vec { - type WriteFuture<'a> = futures::future::Ready>; - fn write(&mut self, data: &[u8]) -> Self::WriteFuture<'_> { + async fn write(&mut self, data: &[u8]) -> io::Result<()> { self.extend_from_slice(data); - futures::future::ok(()) + Ok(()) } - type WriteBytesFuture<'a> = Self::WriteFuture<'a>; - fn write_bytes(&mut self, data: Bytes) -> Self::WriteBytesFuture<'_> { - self.write(&data) + async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> { + self.write(&data).await } - type SyncFuture<'a> = futures::future::Ready>; - fn sync(&mut self) -> Self::SyncFuture<'_> { - futures::future::ok(()) + async fn sync(&mut self) -> io::Result<()> { + Ok(()) } } impl AsyncStreamWriter for bytes::BytesMut { - type WriteFuture<'a> = futures::future::Ready>; - fn write(&mut self, data: &[u8]) -> Self::WriteFuture<'_> { + async fn write(&mut self, data: &[u8]) -> io::Result<()> { self.extend_from_slice(data); - futures::future::ok(()) + Ok(()) } - type WriteBytesFuture<'a> = Self::WriteFuture<'a>; - fn write_bytes(&mut self, data: Bytes) -> Self::WriteBytesFuture<'_> { - self.write(&data) + async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> { + self.write(&data).await } - type SyncFuture<'a> = futures::future::Ready>; - fn sync(&mut self) -> Self::SyncFuture<'_> { - futures::future::ok(()) + async fn sync(&mut self) -> io::Result<()> { + Ok(()) } } diff --git a/src/stats.rs b/src/stats.rs index f7421af..b66f0ee 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -7,11 +7,13 @@ //! Statistics are always added using saturating arithmetic, so there can't be //! a panic even in unlikely scenarios. use std::{ + io, pin::Pin, task::{Context, Poll}, time::Duration, }; +use bytes::Bytes; use futures::prelude::*; use pin_project::pin_project; @@ -141,16 +143,13 @@ impl TrackingStreamWriter { } impl AsyncStreamWriter for TrackingStreamWriter { - type WriteFuture<'a> = AggregateStats<'a, W::WriteFuture<'a>> where Self: 'a; - - fn write<'a>(&'a mut self, data: &'a [u8]) -> Self::WriteFuture<'a> { + async fn write(&mut self, data: &[u8]) -> io::Result<()> { // increase the size by the length of the data, even if the write fails self.stats.write.size = self.stats.write.size.saturating_add(data.len() as u64); - AggregateStats::new(self.inner.write(data), &mut self.stats.write.stats) + AggregateStats::new(self.inner.write(data), &mut self.stats.write.stats).await } - type WriteBytesFuture<'a> = AggregateStats<'a, W::WriteBytesFuture<'a>> where Self: 'a; - fn write_bytes(&mut self, data: bytes::Bytes) -> Self::WriteBytesFuture<'_> { + async fn write_bytes(&mut self, data: bytes::Bytes) -> io::Result<()> { // increase the size by the length of the data, even if the write fails self.stats.write_bytes.size = self .stats @@ -161,12 +160,11 @@ impl AsyncStreamWriter for TrackingStreamWriter { self.inner.write_bytes(data), &mut self.stats.write_bytes.stats, ) + .await } - type SyncFuture<'a> = AggregateStats<'a, W::SyncFuture<'a>> where Self: 'a; - - fn sync(&mut self) -> Self::SyncFuture<'_> { - AggregateStats::new(self.inner.sync(), &mut self.stats.sync) + async fn sync(&mut self) -> io::Result<()> { + AggregateStats::new(self.inner.sync(), &mut self.stats.sync).await } } @@ -224,10 +222,8 @@ impl TrackingStreamReader { } impl AsyncStreamReader for TrackingStreamReader { - type ReadFuture<'a> = AggregateSizeAndStats<'a, W::ReadFuture<'a>> where Self: 'a; - - fn read(&mut self, len: usize) -> Self::ReadFuture<'_> { - AggregateSizeAndStats::new(self.inner.read(len), &mut self.stats.read) + async fn read(&mut self, len: usize) -> io::Result { + AggregateSizeAndStats::new(self.inner.read(len), &mut self.stats.read).await } } @@ -288,16 +284,12 @@ impl TrackingSliceReader { } impl AsyncSliceReader for TrackingSliceReader { - type ReadAtFuture<'a> = AggregateSizeAndStats<'a, R::ReadAtFuture<'a>> where Self: 'a; - - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - AggregateSizeAndStats::new(self.inner.read_at(offset, len), &mut self.stats.read_at) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + AggregateSizeAndStats::new(self.inner.read_at(offset, len), &mut self.stats.read_at).await } - type LenFuture<'a> = AggregateStats<'a, R::LenFuture<'a>> where Self: 'a; - - fn len(&mut self) -> Self::LenFuture<'_> { - AggregateStats::new(self.inner.len(), &mut self.stats.len) + async fn len(&mut self) -> io::Result { + AggregateStats::new(self.inner.len(), &mut self.stats.len).await } } @@ -364,20 +356,17 @@ impl TrackingSliceWriter { } impl AsyncSliceWriter for TrackingSliceWriter { - type WriteAtFuture<'a> = AggregateStats<'a, W::WriteAtFuture<'a>> where Self: 'a; - - fn write_at<'a>(&'a mut self, offset: u64, data: &'a [u8]) -> Self::WriteAtFuture<'a> { + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { // increase the size by the length of the data, even if the write fails self.stats.write_at.size = self.stats.write_at.size.saturating_add(data.len() as u64); AggregateStats::new( self.inner.write_at(offset, data), &mut self.stats.write_at.stats, ) + .await } - type WriteBytesAtFuture<'a> = AggregateStats<'a, W::WriteBytesAtFuture<'a>> where Self: 'a; - - fn write_bytes_at(&mut self, offset: u64, data: bytes::Bytes) -> Self::WriteBytesAtFuture<'_> { + async fn write_bytes_at(&mut self, offset: u64, data: bytes::Bytes) -> io::Result<()> { // increase the size by the length of the data, even if the write fails self.stats.write_bytes_at.size = self .stats @@ -388,18 +377,15 @@ impl AsyncSliceWriter for TrackingSliceWriter { self.inner.write_bytes_at(offset, data), &mut self.stats.write_bytes_at.stats, ) + .await } - type SetLenFuture<'a> = AggregateStats<'a, W::SetLenFuture<'a>> where Self: 'a; - - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { - AggregateStats::new(self.inner.set_len(len), &mut self.stats.set_len) + async fn set_len(&mut self, len: u64) -> io::Result<()> { + AggregateStats::new(self.inner.set_len(len), &mut self.stats.set_len).await } - type SyncFuture<'a> = AggregateStats<'a, W::SyncFuture<'a>> where Self: 'a; - - fn sync(&mut self) -> Self::SyncFuture<'_> { - AggregateStats::new(self.inner.sync(), &mut self.stats.sync) + async fn sync(&mut self) -> io::Result<()> { + AggregateStats::new(self.inner.sync(), &mut self.stats.sync).await } } diff --git a/src/tokio_io.rs b/src/tokio_io.rs index 3e07a74..1e964c9 100644 --- a/src/tokio_io.rs +++ b/src/tokio_io.rs @@ -1,6 +1,6 @@ //! Blocking io for [std::fs::File], using the tokio blocking task pool. use bytes::Bytes; -use futures::{future::LocalBoxFuture, Future, FutureExt}; +use futures::Future; use pin_project::pin_project; use std::{ io::{self, Read, Seek, SeekFrom}, @@ -65,101 +65,41 @@ pub mod file { use super::*; - newtype_future!( - /// The future returned by [File::read_at] - #[derive(Debug)] - ReadAtFuture, - Asyncify<'a, Bytes, FileAdapterFsm>, - io::Result - ); - newtype_future!( - /// The future returned by [File::len] - #[derive(Debug)] - LenFuture, - Asyncify<'a, u64, FileAdapterFsm>, - io::Result - ); - newtype_future!( - /// The future returned by [File::write_bytes_at] - #[derive(Debug)] - WriteBytesAtFuture, - Asyncify<'a, (), FileAdapterFsm>, - io::Result<()> - ); - newtype_future!( - /// The future returned by [File::write_at] - #[derive(Debug)] - WriteAtFuture, - Asyncify<'a, (), FileAdapterFsm>, - io::Result<()> - ); - newtype_future!( - /// The future returned by [File::set_len] - #[derive(Debug)] - SetLenFuture, - Asyncify<'a, (), FileAdapterFsm>, - io::Result<()> - ); - newtype_future!( - /// The future returned by [File::sync] - #[derive(Debug)] - SyncFuture, - Asyncify<'a, (), FileAdapterFsm>, - io::Result<()> - ); - impl AsyncSliceReader for File { - type ReadAtFuture<'a> = file::ReadAtFuture<'a>; - - fn read_at(&mut self, offset: u64, len: usize) -> Self::ReadAtFuture<'_> { - let fut = self - .0 - .take() - .map(|t| (t.read_at(offset, len), &mut self.0)) - .into(); - ReadAtFuture(fut) + async fn read_at(&mut self, offset: u64, len: usize) -> io::Result { + Asyncify::from(self.0.take().map(|t| (t.read_at(offset, len), &mut self.0))).await } - type LenFuture<'a> = LenFuture<'a>; - - fn len(&mut self) -> Self::LenFuture<'_> { - let fut = self.0.take().map(|t| (t.len(), &mut self.0)).into(); - LenFuture(fut) + async fn len(&mut self) -> io::Result { + Asyncify::from(self.0.take().map(|t| (t.len(), &mut self.0))).await } } impl AsyncSliceWriter for File { - type WriteBytesAtFuture<'a> = WriteBytesAtFuture<'a>; - - fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { - let fut = self - .0 - .take() - .map(|t| (t.write_bytes_at(offset, data), &mut self.0)) - .into(); - WriteBytesAtFuture(fut) + async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> { + Asyncify::from( + self.0 + .take() + .map(|t| (t.write_bytes_at(offset, data), &mut self.0)), + ) + .await } - type WriteAtFuture<'a> = WriteAtFuture<'a>; - fn write_at(&mut self, offset: u64, data: &[u8]) -> Self::WriteAtFuture<'_> { - let fut = self - .0 - .take() - .map(|t| (t.write_at(offset, data), &mut self.0)) - .into(); - WriteAtFuture(fut) + async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> { + Asyncify::from( + self.0 + .take() + .map(|t| (t.write_at(offset, data), &mut self.0)), + ) + .await } - type SyncFuture<'a> = SyncFuture<'a>; - fn sync(&mut self) -> Self::SyncFuture<'_> { - let fut = self.0.take().map(|t| (t.sync(), &mut self.0)).into(); - SyncFuture(fut) + async fn sync(&mut self) -> io::Result<()> { + Asyncify::from(self.0.take().map(|t| (t.sync(), &mut self.0))).await } - type SetLenFuture<'a> = SetLenFuture<'a>; - fn set_len(&mut self, len: u64) -> Self::SetLenFuture<'_> { - let fut = self.0.take().map(|t| (t.set_len(len), &mut self.0)).into(); - SetLenFuture(fut) + async fn set_len(&mut self, len: u64) -> io::Result<()> { + Asyncify::from(self.0.take().map(|t| (t.set_len(len), &mut self.0))).await } } } @@ -313,24 +253,20 @@ impl ConcatenateSliceWriter { } impl AsyncSliceWriter for ConcatenateSliceWriter { - type WriteBytesAtFuture<'a> = concatenate_slice_writer::WriteBytesAt<'a, W>; - fn write_bytes_at(&mut self, _offset: u64, data: Bytes) -> Self::WriteBytesAtFuture<'_> { - tokio_helper::write_bytes(&mut self.0, data) + async fn write_bytes_at(&mut self, _offset: u64, data: Bytes) -> io::Result<()> { + tokio_helper::write_bytes(&mut self.0, data).await } - type WriteAtFuture<'a> = concatenate_slice_writer::WriteAt<'a, W>; - fn write_at<'a>(&'a mut self, _offset: u64, bytes: &'a [u8]) -> Self::WriteAtFuture<'a> { - tokio_helper::write(&mut self.0, bytes) + async fn write_at(&mut self, _offset: u64, bytes: &[u8]) -> io::Result<()> { + tokio_helper::write(&mut self.0, bytes).await } - type SyncFuture<'a> = concatenate_slice_writer::Sync<'a, W>; - fn sync(&mut self) -> Self::SyncFuture<'_> { - tokio_helper::flush(&mut self.0) + async fn sync(&mut self) -> io::Result<()> { + tokio_helper::flush(&mut self.0).await } - type SetLenFuture<'a> = futures::future::Ready>; - fn set_len(&mut self, _len: u64) -> Self::SetLenFuture<'_> { - futures::future::ready(io::Result::Ok(())) + async fn set_len(&mut self, _len: u64) -> io::Result<()> { + io::Result::Ok(()) } } @@ -346,22 +282,16 @@ pub mod concatenate_slice_writer { pub struct TokioStreamWriter(pub T); impl AsyncStreamWriter for TokioStreamWriter { - type WriteFuture<'a> = tokio_stream_writer::Write<'a, T> where Self: 'a; - - fn write<'a>(&'a mut self, data: &'a [u8]) -> Self::WriteFuture<'a> { - tokio_helper::write(&mut self.0, data) + async fn write(&mut self, data: &[u8]) -> io::Result<()> { + tokio_helper::write(&mut self.0, data).await } - type WriteBytesFuture<'a> = tokio_stream_writer::WriteBytes<'a, T> where Self: 'a; - - fn write_bytes(&mut self, data: Bytes) -> Self::WriteBytesFuture<'_> { - tokio_helper::write_bytes(&mut self.0, data) + async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> { + tokio_helper::write_bytes(&mut self.0, data).await } - type SyncFuture<'a> = tokio_stream_writer::Sync<'a, T> where Self: 'a; - - fn sync(&mut self) -> Self::SyncFuture<'_> { - tokio_helper::flush(&mut self.0) + async fn sync(&mut self) -> io::Result<()> { + tokio_helper::flush(&mut self.0).await } } @@ -370,17 +300,10 @@ impl AsyncStreamWriter for TokioStreamWriter(T); impl AsyncStreamReader for TokioStreamReader { - type ReadFuture<'a> = LocalBoxFuture<'a, io::Result> - where - Self: 'a; - - fn read(&mut self, len: usize) -> Self::ReadFuture<'_> { - async move { - let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC)); - (&mut self.0).take(len as u64).read_to_end(&mut buf).await?; - Ok(buf.into()) - } - .boxed_local() + async fn read(&mut self, len: usize) -> io::Result { + let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC)); + (&mut self.0).take(len as u64).read_to_end(&mut buf).await?; + Ok(buf.into()) } }