Skip to content

Commit

Permalink
Get rid of tokio_helper
Browse files Browse the repository at this point in the history
we don't need the futures to be named anymore
  • Loading branch information
rklaehn committed Apr 9, 2024
1 parent bb03877 commit bee4232
Showing 1 changed file with 9 additions and 139 deletions.
148 changes: 9 additions & 139 deletions src/tokio_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ use pin_project::pin_project;
use std::{
future::Future,
io::{self, Read, Seek, SeekFrom},
marker::PhantomPinned,
path::PathBuf,
pin::Pin,
task::{ready, Context, Poll},
task::{Context, Poll},
};
use tokio::{
io::{AsyncReadExt, AsyncWrite},
Expand Down Expand Up @@ -252,15 +251,15 @@ impl<W> ConcatenateSliceWriter<W> {

impl<W: AsyncWrite + Unpin + 'static> AsyncSliceWriter for ConcatenateSliceWriter<W> {
async fn write_bytes_at(&mut self, _offset: u64, data: Bytes) -> io::Result<()> {
tokio_helper::write_bytes(&mut self.0, data).await
self.0.write_all(&data).await
}

async fn write_at(&mut self, _offset: u64, bytes: &[u8]) -> io::Result<()> {
tokio_helper::write(&mut self.0, bytes).await
self.0.write_all(bytes).await
}

async fn sync(&mut self) -> io::Result<()> {
tokio_helper::flush(&mut self.0).await
self.0.flush().await
}

async fn set_len(&mut self, _len: u64) -> io::Result<()> {
Expand All @@ -272,17 +271,19 @@ impl<W: AsyncWrite + Unpin + 'static> AsyncSliceWriter for ConcatenateSliceWrite
#[derive(Debug, Clone)]
pub struct TokioStreamWriter<T>(pub T);

use tokio::io::AsyncWriteExt;

impl<T: tokio::io::AsyncWrite + Unpin> AsyncStreamWriter for TokioStreamWriter<T> {
async fn write(&mut self, data: &[u8]) -> io::Result<()> {
tokio_helper::write(&mut self.0, data).await
self.0.write_all(data).await
}

async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
tokio_helper::write_bytes(&mut self.0, data).await
self.0.write_all(&data).await
}

async fn sync(&mut self) -> io::Result<()> {
tokio_helper::flush(&mut self.0).await
self.0.flush().await
}
}

Expand All @@ -297,134 +298,3 @@ impl<T: tokio::io::AsyncRead + Unpin> AsyncStreamReader for TokioStreamReader<T>
Ok(buf.into())
}
}

/// Futures copied from tokio, because they are private in tokio
mod tokio_helper {
use bytes::Buf;

use super::*;

/// Future that writes a slice to a writer
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct Write<'a, W: ?Sized> {
writer: &'a mut W,
buf: &'a [u8],
// Make this future `!Unpin` for compatibility with async trait methods.
#[pin]
_pin: PhantomPinned,
}

pub(crate) fn write<'a, W>(writer: &'a mut W, buf: &'a [u8]) -> Write<'a, W>
where
W: AsyncWrite + Unpin + ?Sized,
{
Write {
writer,
buf,
_pin: PhantomPinned,
}
}

impl<W> Future for Write<'_, W>
where
W: AsyncWrite + Unpin + ?Sized,
{
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let me = self.project();
while !me.buf.is_empty() {
let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.buf))?;
{
let (_, rest) = std::mem::take(&mut *me.buf).split_at(n);
*me.buf = rest;
}
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
}

Poll::Ready(Ok(()))
}
}

/// Future that writes Bytes to a writer
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct WriteBytes<'a, W: ?Sized> {
writer: &'a mut W,
bytes: Bytes,
// Make this future `!Unpin` for compatibility with async trait methods.
#[pin]
_pin: PhantomPinned,
}

pub(crate) fn write_bytes<W>(writer: &mut W, bytes: Bytes) -> WriteBytes<'_, W>
where
W: AsyncWrite + Unpin + ?Sized,
{
WriteBytes {
writer,
bytes,
_pin: PhantomPinned,
}
}

impl<W> Future for WriteBytes<'_, W>
where
W: AsyncWrite + Unpin + ?Sized,
{
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let me = self.project();
while !me.bytes.is_empty() {
let n = ready!(Pin::new(&mut *me.writer).poll_write(cx, me.bytes))?;
// this could panic if n > len, but that would violate the contract of AsyncWrite
me.bytes.advance(n);
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
}

Poll::Ready(Ok(()))
}
}

/// Future that flushes a writer
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project]
pub struct Flush<'a, A: ?Sized> {
a: &'a mut A,
// Make this future `!Unpin` for compatibility with async trait methods.
#[pin]
_pin: PhantomPinned,
}

/// Creates a future which will entirely flush an I/O object.
pub(crate) fn flush<A>(a: &mut A) -> Flush<'_, A>
where
A: AsyncWrite + Unpin + ?Sized,
{
Flush {
a,
_pin: PhantomPinned,
}
}

impl<A> Future for Flush<'_, A>
where
A: AsyncWrite + Unpin + ?Sized,
{
type Output = io::Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();
Pin::new(&mut *me.a).poll_flush(cx)
}
}
}

0 comments on commit bee4232

Please sign in to comment.