Skip to content

Commit

Permalink
provide a non-destructive mechanism to determine if a sink/stream are…
Browse files Browse the repository at this point in the history
… paired
  • Loading branch information
tones111 committed Nov 7, 2023
1 parent 0b0fc12 commit 20878b9
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 1 deletion.
7 changes: 6 additions & 1 deletion futures-util/src/lock/bilock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ impl<T> BiLock<T> {
(Self { arc: arc.clone() }, Self { arc })
}

/// Returns `true` only if the two `BiLock<T>`s originated from the same call to `BiLock::new`.
pub fn are_paired(first: &Self, second: &Self) -> bool {
Arc::ptr_eq(&first.arc, &second.arc)
}

/// Attempt to acquire this lock, returning `Pending` if it can't be
/// acquired.
///
Expand Down Expand Up @@ -156,7 +161,7 @@ impl<T> BiLock<T> {
where
T: Unpin,
{
if Arc::ptr_eq(&self.arc, &other.arc) {
if Self::are_paired(&self, &other) {
drop(other);
let inner = Arc::try_unwrap(self.arc)
.ok()
Expand Down
81 changes: 81 additions & 0 deletions futures-util/src/stream/stream/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ pub struct SplitStream<S>(BiLock<S>);

impl<S> Unpin for SplitStream<S> {}

impl<S> SplitStream<S> {
/// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
pub fn is_pair<Item>(&self, other: &SplitSink<S, Item>) -> bool {
other.is_pair(&self)
}
}

impl<S: Unpin> SplitStream<S> {
/// Attempts to put the two "halves" of a split `Stream + Sink` back
/// together. Succeeds only if the `SplitStream<S>` and `SplitSink<S>` are
Expand Down Expand Up @@ -60,6 +67,13 @@ impl<S: Sink<Item> + Unpin, Item> SplitSink<S, Item> {
}
}

impl<S, Item> SplitSink<S, Item> {
/// Returns `true` if the `SplitStream<S>` and `SplitSink<S>` originate from the same call to `StreamExt::split`.
pub fn is_pair(&self, other: &SplitStream<S>) -> bool {
BiLock::are_paired(&self.lock, &other.0)
}
}

impl<S: Sink<Item>, Item> SplitSink<S, Item> {
fn poll_flush_slot(
mut inner: Pin<&mut S>,
Expand Down Expand Up @@ -142,3 +156,70 @@ impl<T, Item> fmt::Display for ReuniteError<T, Item> {

#[cfg(feature = "std")]
impl<T: core::any::Any, Item> std::error::Error for ReuniteError<T, Item> {}

#[cfg(test)]
mod tests {
use super::*;
use crate::{sink::Sink, stream::StreamExt};
use core::marker::PhantomData;

struct NopStream<Item> {
phantom: PhantomData<Item>,
}

impl<Item> Stream for NopStream<Item> {
type Item = Item;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}

impl<Item> Sink<Item> for NopStream<Item> {
type Error = ();

fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
todo!()
}

fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> {
todo!()
}

fn poll_flush(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
todo!()
}

fn poll_close(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
todo!()
}
}

#[test]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn test_pairing() {
let s1 = NopStream::<()> { phantom: PhantomData };
let (sink1, stream1) = s1.split();
assert!(sink1.is_pair(&stream1));
assert!(stream1.is_pair(&sink1));

let s2 = NopStream::<()> { phantom: PhantomData };
let (sink2, stream2) = s2.split();
assert!(sink2.is_pair(&stream2));
assert!(stream2.is_pair(&sink2));

assert!(!sink1.is_pair(&stream2));
assert!(!stream1.is_pair(&sink2));
assert!(!sink2.is_pair(&stream1));
assert!(!stream2.is_pair(&sink1));
}
}

0 comments on commit 20878b9

Please sign in to comment.