Lost message when selecting on MPMC in loop from multiple threads #135

Open
asonix opened this issue Nov 25, 2023 · 2 comments

asonix commented Nov 25, 2023

I have tried to make this example as minimal as I could. The program below hangs after a short time on my computer (Intel i7-1165G7, Linux 6.6.2, glibc).

The basic premise is this:

  • a single thread loops, sending messages into a channel
  • sometimes it spawns background threads, which select on that channel and a "signal" channel
  • sometimes it sends a message to the signal channel, which causes one background thread to exit
  • it cycles between spawning up to 3 threads and reaping down to 1 thread
  • each message sent on the MPMC channel wraps a sender for a second channel, and sends on that channel when dropped
  • the main thread waits for one of the background threads to drop each message before sending another

Very often, during the first thread-reaping cycle, a message that was sent is never dropped. The main thread then never makes progress, even though there are still threads selecting on the channel. I believe this is due to a race in the Selector implementation.

Without further ado, here's the code:

fn main() {
    let (sender, receiver) = flume::bounded(8);

    let mut signals: Vec<flume::Sender<()>> = Vec::new();

    let mut launch = true;

    for i in 0u64.. {
        if i % 100000 == 0 {
            println!("looping");
        }

        if i % 10000 == 0 {
            if signals.len() >= 3 || !launch {
                launch = false;

                if let Some(signal_tx) = signals.pop() {
                    signal_tx.send(()).expect("Sent");
                    drop(signal_tx);
                }
            }

            if signals.len() <= 1 || launch {
                launch = true;

                let (signal_tx, signal) = flume::bounded(1);

                signals.push(signal_tx);

                let rx2 = receiver.clone();
                std::thread::spawn(move || {
                    println!("Launching thread");
                    while !race(&rx2, &signal) {
                        // spin
                    }
                    println!("Closing thread");
                });
            }
        }

        let (dropper_tx, dropper) = flume::bounded(1);

        sender.send(Dropper { sender: dropper_tx }).expect("sent");

        dropper.recv().expect("received");
    }
}

struct Dropper {
    sender: flume::Sender<()>,
}

impl Drop for Dropper {
    fn drop(&mut self) {
        self.sender.send(()).expect("sent");
    }
}

fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
    flume::Selector::new()
        .recv(receiver, |res| {
            let out = res.is_err();
            drop(res);
            out
        })
        .recv(signal, |_res| true)
        .wait()
}
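
For anyone trying to reproduce this, it may help to make the hang observable instead of blocking forever in `dropper.recv()`. The sketch below shows the same send-on-drop handshake with a bounded wait. It uses `std::sync::mpsc` in place of flume so that it runs without dependencies, and the names `Ack`, `Handshake`, `send_and_wait` and `spawn_consumer` are made up for illustration; they are not part of the reproduction or of flume.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, SyncSender};
use std::thread;
use std::time::Duration;

/// Message that acknowledges itself when dropped, like `Dropper` in the reproduction.
struct Ack {
    sender: SyncSender<()>,
}

impl Drop for Ack {
    fn drop(&mut self) {
        // Ignore the error: the waiting side may already have timed out and gone away.
        let _ = self.sender.send(());
    }
}

/// Outcome of waiting for one message to be dropped by a consumer.
#[derive(Debug, PartialEq)]
enum Handshake {
    Acked,
    Lost,
}

/// Sends one `Ack` into `work` and waits at most `limit` for a consumer to drop it.
fn send_and_wait(work: &SyncSender<Ack>, limit: Duration) -> Handshake {
    let (ack_tx, ack_rx) = mpsc::sync_channel(1);
    work.send(Ack { sender: ack_tx }).expect("work channel closed");

    match ack_rx.recv_timeout(limit) {
        Ok(()) => Handshake::Acked,
        // Nobody dropped the message within `limit`.
        Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => Handshake::Lost,
    }
}

/// Consumer that receives messages and drops them, which triggers each acknowledgement.
fn spawn_consumer(work: Receiver<Ack>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for msg in work {
            drop(msg);
        }
    })
}
```

With a live consumer, `send_and_wait` should report `Handshake::Acked`; if the message stays in the queue, it reports `Handshake::Lost` once the limit expires. As far as I know flume's `Receiver` has a `recv_timeout` of the same shape, so the same bounded wait should carry over to the reproduction, but treat that as an assumption to check against the flume docs.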

asonix commented Nov 25, 2023

Edit: I'm very sorry about the formatting of the folded code.

I'll add that the program does not exhibit this behavior when flume::Selector is replaced by a simple async block_on + select + recv_async.

I'll provide a simple working example here. This is what a working race looks like:

fn race(receiver: &flume::Receiver<Dropper>, signal: &flume::Receiver<()>) -> bool {
    match selector::blocking_select(receiver.recv_async(), signal.recv_async()) {
        selector::Either::Left(res) => {
            let out = res.is_err();
            drop(res);
            out
        }
        selector::Either::Right(_res) => true,
    }
}
Implementation of the simple async executor (behind the fold):

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    task::{Context, Poll, Wake, Waker},
};

struct ThreadWaker {
    thread: std::thread::Thread,
}

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.thread.unpark();
    }

    fn wake_by_ref(self: &Arc<Self>) {
        self.thread.unpark();
    }
}

pub(super) enum Either<L, R> {
    Left(L),
    Right(R),
}

struct Select<F1, F2> {
    left: F1,
    left_woken: Arc<AtomicBool>,

    right: F2,
    right_woken: Arc<AtomicBool>,
}

struct SelectWaker {
    inner: Waker,
    flag: Arc<AtomicBool>,
}

impl Wake for SelectWaker {
    fn wake_by_ref(self: &Arc<Self>) {
        self.flag.store(true, Ordering::Release);

        self.inner.wake_by_ref();
    }

    fn wake(self: Arc<Self>) {
        self.flag.store(true, Ordering::Release);

        match Arc::try_unwrap(self) {
            Ok(this) => this.inner.wake(),
            Err(this) => this.inner.wake_by_ref(),
        }
    }
}

impl<F1, F2> Future for Select<F1, F2>
where
    F1: Future + Unpin,
    F2: Future + Unpin,
{
    type Output = Either<F1::Output, F2::Output>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let left_waker: Waker = Arc::new(SelectWaker {
            inner: cx.waker().clone(),
            flag: self.left_woken.clone(),
        })
        .into();

        let mut left_ctx = Context::from_waker(&left_waker);

        if let Poll::Ready(left_out) = Pin::new(&mut self.left).poll(&mut left_ctx) {
            return Poll::Ready(Either::Left(left_out));
        }

        let right_waker: Waker = Arc::new(SelectWaker {
            inner: cx.waker().clone(),
            flag: self.right_woken.clone(),
        })
        .into();

        let mut right_ctx = Context::from_waker(&right_waker);

        if let Poll::Ready(right_out) = Pin::new(&mut self.right).poll(&mut right_ctx) {
            return Poll::Ready(Either::Right(right_out));
        }

        Poll::Pending
    }
}

pub(super) fn blocking_select<Left, Right>(
    left: Left,
    right: Right,
) -> Either<Left::Output, Right::Output>
where
    Left: Future,
    Right: Future,
{
    block_on(select(left, right))
}

fn block_on<F>(fut: F) -> F::Output
where
    F: Future,
{
    let thread_waker: Waker = Arc::new(ThreadWaker {
        thread: std::thread::current(),
    })
    .into();

    let mut ctx = Context::from_waker(&thread_waker);

    let mut fut = std::pin::pin!(fut);

    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut ctx) {
            return out;
        }

        // doesn't race - unpark followed by park will result in park returning immediately
        std::thread::park();
    }
}

async fn select<Left, Right>(left: Left, right: Right) -> Either<Left::Output, Right::Output>
where
    Left: Future,
    Right: Future,
{
    let left = std::pin::pin!(left);
    let right = std::pin::pin!(right);

    Select {
        left,
        left_woken: Arc::new(AtomicBool::new(true)),

        right,
        right_woken: Arc::new(AtomicBool::new(true)),
    }
    .await
}
```
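
The "doesn't race" comment in `block_on` relies on the park token in `std::thread`: an `unpark` that arrives before the matching `park` is remembered, so the later `park` returns instead of sleeping. A minimal sketch of that behavior, using only the standard library (the function name `park_after_unpark` is made up for illustration):

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Unparks the current thread first, then parks it, and reports how long `park` blocked.
fn park_after_unpark() -> Duration {
    let start = Instant::now();

    // Sets the thread's park token even though the thread is not parked yet.
    thread::current().unpark();

    // Consumes the token and returns right away instead of sleeping.
    thread::park();

    start.elapsed()
}
```

Calling `park_after_unpark()` should return almost immediately. This is why a wake-up that lands between the `poll` call and `std::thread::park()` in the executor above is not lost.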

asonix commented Nov 25, 2023

I've put all the demo code in a repo on my Forgejo: https://git.asonix.dog/asonix/flume-deadlock
