Skip to content

Commit

Permalink
CloseAll
Browse files Browse the repository at this point in the history
  • Loading branch information
timotheyca committed Mar 10, 2024
1 parent 077d2af commit 0a9eee2
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
members = ["internal/ruchei-sample"]

[workspace.package]
version = "0.0.71" # ad7038ef3b571dc133c108e14e6bb0f8cdcd812d and earlier have invalid versions
version = "0.0.72" # ad7038ef3b571dc133c108e14e6bb0f8cdcd812d and earlier have invalid versions
edition = "2021"
publish = true
license = "MIT OR Apache-2.0"
Expand Down
163 changes: 163 additions & 0 deletions src/close_all.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use std::{
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use futures_util::{
future::FusedFuture,
lock::{Mutex, OwnedMutexGuard, OwnedMutexLockFuture},
ready,
stream::FusedStream,
Future, Sink, Stream,
};
use pin_project::pin_project;

struct Closed;

/// Yielded by [`CloseAll`]. Gets closed when incoming stream terminates.
#[pin_project]
pub struct CloseOne<S, Out> {
#[pin]
stream: S,
#[pin]
closing: OwnedMutexLockFuture<Closed>,
terminated: bool,
_out: PhantomData<Out>,
}

impl<In, Out, E, S: Stream<Item = Result<In, E>> + Sink<Out, Error = E>> Stream
for CloseOne<S, Out>
{
type Item = Result<In, E>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if *this.terminated {
Poll::Ready(None)
} else if this.closing.is_terminated() || this.closing.poll(cx).is_ready() {
let r = ready!(this.stream.poll_close(cx));
*this.terminated = true;
r?;
Poll::Ready(None)
} else {
Poll::Ready(match ready!(this.stream.poll_next(cx)) {
Some(Ok(item)) => Some(Ok(item)),
Some(Err(e)) => {
*this.terminated = true;
Some(Err(e))
}
None => {
*this.terminated = true;
None
}
})
}
}
}

impl<In, Out, E, S: Stream<Item = Result<In, E>> + Sink<Out, Error = E>> FusedStream
for CloseOne<S, Out>
{
fn is_terminated(&self) -> bool {
self.terminated
}
}

impl<In, Out, E, S: Stream<Item = Result<In, E>> + Sink<Out, Error = E>> Sink<Out>
for CloseOne<S, Out>
{
type Error = E;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
if this.closing.is_terminated() {
Poll::Pending
} else {
this.stream.poll_ready(cx)
}
}

fn start_send(self: Pin<&mut Self>, item: Out) -> Result<(), Self::Error> {
let this = self.project();
if this.closing.is_terminated() {
Ok(())
} else {
this.stream.start_send(item)
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
if this.closing.is_terminated() {
Poll::Pending
} else {
this.stream.poll_flush(cx)
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let this = self.project();
if *this.terminated {
Poll::Ready(Ok(()))
} else {
let r = ready!(this.stream.poll_close(cx));
*this.terminated = true;
Poll::Ready(r)
}
}
}

/// Closes all yielded streams ([`CloseOne`]s) on termination of incoming stream.
#[pin_project]
pub struct CloseAll<R, Out> {
#[pin]
stream: R,
guard: Option<OwnedMutexGuard<Closed>>,
lock: Arc<Mutex<Closed>>,
_out: PhantomData<Out>,
}

impl<Out, S, R: Stream<Item = S>> Stream for CloseAll<R, Out> {
type Item = CloseOne<S, Out>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
Poll::Ready(match ready!(this.stream.poll_next(cx)) {
Some(stream) => Some(CloseOne {
stream,
closing: this.lock.clone().lock_owned(),
terminated: false,
_out: PhantomData,
}),
None => {
this.guard.take();
None
}
})
}
}

impl<Out, S, R: Stream<Item = S>> FusedStream for CloseAll<R, Out> {
fn is_terminated(&self) -> bool {
self.guard.is_none()
}
}

pub trait CloseAllExt<Out>: Sized {
fn close_all(self) -> CloseAll<Self, Out>;
}

impl<Out, S: Sink<Out>, R: Stream<Item = S>> CloseAllExt<Out> for R {
fn close_all(self) -> CloseAll<Self, Out> {
let lock = Arc::new(Mutex::new(Closed));
let guard = lock.clone().try_lock_owned().unwrap();
CloseAll {
stream: self,
guard: Some(guard),
lock,
_out: PhantomData,
}
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
extern crate self as ruchei;

pub mod callback;
pub mod close_all;
pub mod concurrent;
pub mod echo;
pub mod group_by_key;
Expand Down

0 comments on commit 0a9eee2

Please sign in to comment.