Skip to content

Commit

Permalink
[feat] add shrink_to_fit() to Sender<T> and Receiver<T>
Browse files Browse the repository at this point in the history
  • Loading branch information
rakbladsvalsen committed May 3, 2024
1 parent fcf3849 commit b317170
Show file tree
Hide file tree
Showing 12 changed files with 500 additions and 265 deletions.
221 changes: 157 additions & 64 deletions benches/basic.rs

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions examples/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ fn main() {
let (blue_tx, blue_rx) = flume::unbounded();

// Spawn two threads that each send a message into their respective channel
std::thread::spawn(move || { let _ = red_tx.send("Red"); });
std::thread::spawn(move || { let _ = blue_tx.send("Blue"); });
std::thread::spawn(move || {
let _ = red_tx.send("Red");
});
std::thread::spawn(move || {
let _ = blue_tx.send("Blue");
});

// Race them to see which one sends their message first
let winner = Selector::new()
Expand Down
118 changes: 71 additions & 47 deletions src/async.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
//! Futures and other types that allow asynchronous interaction with channels.
use crate::*;
use futures_core::{
future::FusedFuture,
stream::{FusedStream, Stream},
};
use futures_sink::Sink;
use spin1::Mutex as Spinlock;
use std::{
any::Any,
future::Future,
ops::Deref,
pin::Pin,
task::{Context, Poll, Waker},
any::Any,
ops::Deref,
};
use crate::*;
use futures_core::{stream::{Stream, FusedStream}, future::FusedFuture};
use futures_sink::Sink;
use spin1::Mutex as Spinlock;

struct AsyncSignal {
waker: Spinlock<Waker>,
Expand All @@ -35,8 +38,12 @@ impl Signal for AsyncSignal {
self.stream
}

fn as_any(&self) -> &(dyn Any + 'static) { self }
fn as_ptr(&self) -> *const () { self as *const _ as *const () }
fn as_any(&self) -> &(dyn Any + 'static) {
self
}
fn as_ptr(&self) -> *const () {
self as *const _ as *const ()
}
}

impl<T> Hook<T, AsyncSignal> {
Expand Down Expand Up @@ -149,9 +156,11 @@ impl<'a, T> SendFut<'a, T> {
fn reset_hook(&mut self) {
if let Some(SendState::QueuedItem(hook)) = self.hook.take() {
let hook: Arc<Hook<T, dyn Signal>> = hook;
wait_lock(&self.sender.shared.chan).sending
wait_lock(&self.sender.shared.chan)
.sending
.as_mut()
.unwrap().1
.unwrap()
.1
.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
}
}
Expand Down Expand Up @@ -188,7 +197,6 @@ impl<'a, T> Drop for SendFut<'a, T> {
}
}


impl<'a, T> Future for SendFut<'a, T> {
type Output = Result<(), SendError<T>>;

Expand All @@ -211,24 +219,28 @@ impl<'a, T> Future for SendFut<'a, T> {
let this = self.get_mut();
let (shared, this_hook) = (&this.sender.shared, &mut this.hook);

shared.send(
// item
item,
// should_block
true,
// make_signal
|msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
// do_block
|hook| {
*this_hook = Some(SendState::QueuedItem(hook));
Poll::Pending
},
)
.map(|r| r.map_err(|err| match err {
TrySendTimeoutError::Disconnected(msg) => SendError(msg),
_ => unreachable!(),
}))
} else { // Nothing to do
shared
.send(
// item
item,
// should_block
true,
// make_signal
|msg| Hook::slot(Some(msg), AsyncSignal::new(cx, false)),
// do_block
|hook| {
*this_hook = Some(SendState::QueuedItem(hook));
Poll::Pending
},
)
.map(|r| {
r.map_err(|err| match err {
TrySendTimeoutError::Disconnected(msg) => SendError(msg),
_ => unreachable!(),
})
})
} else {
// Nothing to do
Poll::Ready(Ok(()))
}
}
Expand Down Expand Up @@ -365,8 +377,16 @@ impl<'a, T> RecvFut<'a, T> {
let hook: Arc<Hook<T, dyn Signal>> = hook;
let mut chan = wait_lock(&self.receiver.shared.chan);
// We'd like to use `Arc::ptr_eq` here but it doesn't seem to work consistently with wide pointers?
chan.waiting.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
if hook.signal().as_any().downcast_ref::<AsyncSignal>().unwrap().woken.load(Ordering::SeqCst) {
chan.waiting
.retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
if hook
.signal()
.as_any()
.downcast_ref::<AsyncSignal>()
.unwrap()
.woken
.load(Ordering::SeqCst)
{
// If this signal has been fired, but we're being dropped (and so not listening to it),
// pass the signal on to another receiver
chan.try_wake_receiver_if_pending();
Expand Down Expand Up @@ -415,21 +435,24 @@ impl<'a, T> RecvFut<'a, T> {
let mut_self = self.get_mut();
let (shared, this_hook) = (&mut_self.receiver.shared, &mut mut_self.hook);

shared.recv(
// should_block
true,
// make_signal
|| Hook::trigger(AsyncSignal::new(cx, stream)),
// do_block
|hook| {
*this_hook = Some(hook);
Poll::Pending
},
)
.map(|r| r.map_err(|err| match err {
TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
_ => unreachable!(),
}))
shared
.recv(
// should_block
true,
// make_signal
|| Hook::trigger(AsyncSignal::new(cx, stream)),
// do_block
|hook| {
*this_hook = Some(hook);
Poll::Pending
},
)
.map(|r| {
r.map_err(|err| match err {
TryRecvTimeoutError::Disconnected => RecvError::Disconnected,
_ => unreachable!(),
})
})
}
}

Expand Down Expand Up @@ -526,7 +549,8 @@ impl<'a, T> Stream for RecvStream<'a, T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_inner(cx, true) { // stream = true
match Pin::new(&mut self.0).poll_inner(cx, true) {
// stream = true
Poll::Pending => Poll::Pending,
Poll::Ready(item) => {
self.0.reset_hook();
Expand Down
Loading

0 comments on commit b317170

Please sign in to comment.