Task not get rescheduled randomly #22

Open
sgdxbc opened this issue Mar 22, 2022 · 1 comment

sgdxbc commented Mar 22, 2022

Code to reproduce:

```rust
use std::{
    cell::RefCell,
    thread::{self, sleep},
    time::{Duration, Instant},
};

use async_task::{Runnable, Task};
use futures::{channel::mpsc, Future, StreamExt};

thread_local! {
    // Queue of runnables ready to be polled; each thread has its own copy.
    static RUNNABLE_LIST: RefCell<Vec<Runnable>> = RefCell::new(Vec::new());
}

// Drains this thread's queue, runs each runnable once, then wakes it again
// unconditionally. Returns how many runnables were run.
fn poll_once() -> usize {
    let runnable_list: Vec<_> =
        RUNNABLE_LIST.with(|runnable_list| runnable_list.borrow_mut().drain(..).collect());
    let count = runnable_list.len();
    for runnable in runnable_list {
        let waker = runnable.waker();
        runnable.run();
        waker.wake();
    }
    count
}

fn spawn(task: impl Future<Output = ()> + Send + 'static) -> Task<()> {
    // Schedule function: push the runnable onto the calling thread's RUNNABLE_LIST.
    let (runnable, handle) = async_task::spawn(task, |runnable| {
        RUNNABLE_LIST.with(|runnable_list| runnable_list.borrow_mut().push(runnable));
    });
    runnable.schedule();
    handle
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded();
    // The task loops forever, awaiting the next message from the channel.
    let handle = spawn(async move {
        loop {
            let _: () = rx.next().await.unwrap();
        }
    });

    // Another thread sends a message every 20 ms.
    thread::spawn(move || loop {
        sleep(Duration::from_millis(20));
        tx.unbounded_send(()).unwrap();
    });

    // Drive the executor on the main thread for one second.
    let start = Instant::now();
    while Instant::now() - start < Duration::from_millis(1 * 1000) {
        // assert_eq!(poll_once(), 1);
        poll_once();
    }
    assert_eq!(poll_once(), 1);
    drop(handle);
}
```

I understand that waking the runnable immediately and unconditionally wastes a lot of CPU, but I have a good reason to do it in my case.

The problem is that poll_once starts returning 0 after a random amount of time, usually before the 1-second loop ends. That means the task disappeared from the system at some point. From what I observed, it never reappears after that, even when the channel's tx sends a new message.

Interestingly, when I simplify the async task to futures::pending(), it never disappears. Is there anything I did incorrectly?

sbarral commented Aug 5, 2022

I could indeed reproduce this, but I have a theory: I suspect this might happen if the call to waker.wake() happens while the task is being concurrently woken by the sender. When this happens, the first caller to wake (in this case the sender thread) is responsible for scheduling the task.

In such a scenario, the task may not yet be visible in the queue the next time poll_once is called, possibly because the sender was preempted before it could reschedule the task, because of inter-thread communication latency, or simply because scheduling takes more instructions. So my hunch is that this is a concurrency bug in the user code rather than in async-task.
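To make the "first caller to wake schedules the task" idea concrete, here is a simplified std-only model. It is not async-task's actual code: the hypothetical `TaskState` uses an `AtomicBool` as a stand-in for a "scheduled" flag, and only the thread whose `compare_exchange` succeeds counts as having scheduled the task. The names `TaskState` and `concurrent_wake` are made up for illustration.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical, simplified task state: `scheduled` stands in for a
/// "scheduled" flag. This models the idea; it is NOT async-task's code.
struct TaskState {
    scheduled: AtomicBool,
    schedule_calls: AtomicUsize,
}

impl TaskState {
    /// Returns true if this caller won the race and scheduled the task.
    fn wake(&self) -> bool {
        let won = self
            .scheduled
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok();
        if won {
            // A real executor would call its schedule function here,
            // e.g. push the task onto a run queue owned by this thread.
            self.schedule_calls.fetch_add(1, Ordering::Relaxed);
        }
        won
    }
}

/// Wakes one idle task from `wakers` threads at once and returns
/// (number of wake calls that won, number of times the task was scheduled).
fn concurrent_wake(wakers: usize) -> (usize, usize) {
    let state = Arc::new(TaskState {
        scheduled: AtomicBool::new(false),
        schedule_calls: AtomicUsize::new(0),
    });
    let handles: Vec<_> = (0..wakers)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || state.wake())
        })
        .collect();
    let winners = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&won| won)
        .count();
    (winners, state.schedule_calls.load(Ordering::Relaxed))
}

fn main() {
    // Two wakers, like the sender thread and the poll_once loop.
    let (winners, calls) = concurrent_wake(2);
    println!("winners = {winners}, schedule calls = {calls}");
}
```

Exactly one waker wins regardless of timing, so whichever thread gets there first is the one whose schedule function runs; the losing `wake` is a no-op.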

Edit: I just realized that the queue is thread-local, so what happens is probably simpler. If at any moment the receiver finds the channel empty, it registers its waker, and the next time the sender sends an item, the task is pushed to the wrong queue (i.e. the queue owned by the sender thread).
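The thread-local pitfall can be reproduced without async-task. In this std-only sketch (function names are hypothetical), an item pushed into a `thread_local!` queue from a spawned thread is invisible to the calling thread, while an item pushed into an `Arc<Mutex<VecDeque<_>>>` shared by both threads is visible. One possible fix for the original code, assumed here rather than confirmed in this thread, is to have the schedule closure push into such a shared queue (or a channel) instead of `RUNNABLE_LIST`.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

thread_local! {
    // Like RUNNABLE_LIST in the report: every thread gets its own copy.
    static LOCAL_QUEUE: RefCell<Vec<u32>> = RefCell::new(Vec::new());
}

/// Pushes from a spawned thread into LOCAL_QUEUE, then returns how many
/// items the calling thread's LOCAL_QUEUE holds.
fn thread_local_push_from_other_thread() -> usize {
    thread::spawn(|| {
        // This lands in the spawned thread's LOCAL_QUEUE, not the caller's.
        LOCAL_QUEUE.with(|q| q.borrow_mut().push(1));
    })
    .join()
    .unwrap();
    LOCAL_QUEUE.with(|q| q.borrow().len())
}

/// Same experiment, but both threads use one queue behind Arc<Mutex<_>>.
fn shared_push_from_other_thread() -> usize {
    let queue: Arc<Mutex<VecDeque<u32>>> = Arc::new(Mutex::new(VecDeque::new()));
    let producer = Arc::clone(&queue);
    thread::spawn(move || {
        producer.lock().unwrap().push_back(1);
    })
    .join()
    .unwrap();
    let len = queue.lock().unwrap().len();
    len
}

fn main() {
    println!("thread-local queue sees {} item(s)", thread_local_push_from_other_thread());
    println!("shared queue sees {} item(s)", shared_push_from_other_thread());
}
```

Because each thread has its own `LOCAL_QUEUE`, the first function returns 0 and the second returns 1. This also fits the observation that the task survives when it is reduced to a future that only the main thread ever wakes.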
