From 20878b9ecafb6f5e3959fb3271939750ccead19e Mon Sep 17 00:00:00 2001 From: Paul Sbarra Date: Mon, 6 Nov 2023 21:37:17 -0600 Subject: [PATCH] provide a non-destructive mechanism to determine if a sink/stream are paired --- futures-util/src/lock/bilock.rs | 7 ++- futures-util/src/stream/stream/split.rs | 81 +++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/futures-util/src/lock/bilock.rs b/futures-util/src/lock/bilock.rs index 7ddc66ad2c..bde7f71abc 100644 --- a/futures-util/src/lock/bilock.rs +++ b/futures-util/src/lock/bilock.rs @@ -69,6 +69,11 @@ impl BiLock { (Self { arc: arc.clone() }, Self { arc }) } + /// Returns `true` only if the two `BiLock`s originated from the same call to `BiLock::new`. + pub fn are_paired(first: &Self, second: &Self) -> bool { + Arc::ptr_eq(&first.arc, &second.arc) + } + /// Attempt to acquire this lock, returning `Pending` if it can't be /// acquired. /// @@ -156,7 +161,7 @@ impl BiLock { where T: Unpin, { - if Arc::ptr_eq(&self.arc, &other.arc) { + if Self::are_paired(&self, &other) { drop(other); let inner = Arc::try_unwrap(self.arc) .ok() diff --git a/futures-util/src/stream/stream/split.rs b/futures-util/src/stream/stream/split.rs index e2034e0c27..dd00db54dd 100644 --- a/futures-util/src/stream/stream/split.rs +++ b/futures-util/src/stream/stream/split.rs @@ -15,6 +15,13 @@ pub struct SplitStream(BiLock); impl Unpin for SplitStream {} +impl SplitStream { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair(&self, other: &SplitSink) -> bool { + other.is_pair(&self) + } +} + impl SplitStream { /// Attempts to put the two "halves" of a split `Stream + Sink` back /// together. Succeeds only if the `SplitStream` and `SplitSink` are @@ -60,6 +67,13 @@ impl + Unpin, Item> SplitSink { } } +impl SplitSink { + /// Returns `true` if the `SplitStream` and `SplitSink` originate from the same call to `StreamExt::split`. + pub fn is_pair(&self, other: &SplitStream) -> bool { + BiLock::are_paired(&self.lock, &other.0) + } +} + impl, Item> SplitSink { fn poll_flush_slot( mut inner: Pin<&mut S>, @@ -142,3 +156,70 @@ impl fmt::Display for ReuniteError { #[cfg(feature = "std")] impl std::error::Error for ReuniteError {} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{sink::Sink, stream::StreamExt}; + use core::marker::PhantomData; + + struct NopStream { + phantom: PhantomData, + } + + impl Stream for NopStream { + type Item = Item; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + todo!() + } + } + + impl Sink for NopStream { + type Error = (); + + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn start_send(self: Pin<&mut Self>, _item: Item) -> Result<(), Self::Error> { + todo!() + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + todo!() + } + } + + #[test] + #[cfg_attr(docsrs, doc(cfg(feature = "sink")))] + fn test_pairing() { + let s1 = NopStream::<()> { phantom: PhantomData }; + let (sink1, stream1) = s1.split(); + assert!(sink1.is_pair(&stream1)); + assert!(stream1.is_pair(&sink1)); + + let s2 = NopStream::<()> { phantom: PhantomData }; + let (sink2, stream2) = s2.split(); + assert!(sink2.is_pair(&stream2)); + assert!(stream2.is_pair(&sink2)); + + assert!(!sink1.is_pair(&stream2)); + assert!(!stream1.is_pair(&sink2)); + assert!(!sink2.is_pair(&stream1)); + assert!(!stream2.is_pair(&sink1)); + } +}