Skip to content

Commit

Permalink
Add a few more instances for AsyncStreamReader
Browse files Browse the repository at this point in the history
especially the ability to turn an AsyncSliceReader into an AsyncStreamReader
  • Loading branch information
rklaehn committed Apr 9, 2024
1 parent 145d855 commit 2f64498
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 34 deletions.
61 changes: 59 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
//! an allocation.
#![deny(missing_docs, rustdoc::broken_intra_doc_links)]

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use std::future::Future;
use std::io;
use std::io::{self, Cursor};

/// A trait to abstract async reading from different resource.
///
Expand Down Expand Up @@ -188,6 +188,31 @@ impl AsyncStreamReader for Bytes {
}
}

impl AsyncStreamReader for &[u8] {
async fn read(&mut self, len: usize) -> io::Result<Bytes> {
let len = len.min(self.len());
let res = Bytes::copy_from_slice(&self[..len]);
*self = &self[len..];
Ok(res)
}
}

impl AsyncStreamReader for BytesMut {
async fn read(&mut self, len: usize) -> io::Result<Bytes> {
let res = self.split_to(len.min(BytesMut::len(self)));
Ok(res.freeze())
}
}

impl<T: AsyncSliceReader> AsyncStreamReader for Cursor<T> {
async fn read(&mut self, len: usize) -> io::Result<Bytes> {
let offset = self.position();
let res = self.get_mut().read_at(offset, len).await?;
self.set_position(offset + res.len() as u64);
Ok(res)
}
}

/// A non seekable writer, e.g. a network socket.
pub trait AsyncStreamWriter {
/// Write the entire slice.
Expand Down Expand Up @@ -218,6 +243,38 @@ impl<T: AsyncStreamWriter> AsyncStreamWriter for &mut T {
}
}

impl AsyncStreamWriter for Vec<u8> {
async fn write(&mut self, data: &[u8]) -> io::Result<()> {
self.extend_from_slice(data);
Ok(())
}

async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
self.extend_from_slice(data.as_ref());
Ok(())
}

async fn sync(&mut self) -> io::Result<()> {
Ok(())
}
}

impl AsyncStreamWriter for BytesMut {
async fn write(&mut self, data: &[u8]) -> io::Result<()> {
self.extend_from_slice(data);
Ok(())
}

async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
self.extend_from_slice(data.as_ref());
Ok(())
}

async fn sync(&mut self) -> io::Result<()> {
Ok(())
}
}

#[cfg(feature = "tokio-io")]
mod tokio_io;
#[cfg(feature = "tokio-io")]
Expand Down
32 changes: 0 additions & 32 deletions src/mem.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::AsyncStreamWriter;

use super::{AsyncSliceReader, AsyncSliceWriter};
use bytes::{Bytes, BytesMut};
use std::io;
Expand Down Expand Up @@ -64,36 +62,6 @@ impl AsyncSliceWriter for Vec<u8> {
}
}

impl AsyncStreamWriter for Vec<u8> {
async fn write(&mut self, data: &[u8]) -> io::Result<()> {
self.extend_from_slice(data);
Ok(())
}

async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
self.write(&data).await
}

async fn sync(&mut self) -> io::Result<()> {
Ok(())
}
}

impl AsyncStreamWriter for bytes::BytesMut {
async fn write(&mut self, data: &[u8]) -> io::Result<()> {
self.extend_from_slice(data);
Ok(())
}

async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
self.write(&data).await
}

async fn sync(&mut self) -> io::Result<()> {
Ok(())
}
}

pub(crate) fn limited_range(offset: u64, len: usize, buf_len: usize) -> std::ops::Range<usize> {
if offset < buf_len as u64 {
let start = offset as usize;
Expand Down

0 comments on commit 2f64498

Please sign in to comment.