QueryHandle: automatically scheduled wake-up call #8017

Merged 6 commits on Nov 6, 2024
4 changes: 4 additions & 0 deletions Cargo.lock
@@ -5581,6 +5581,7 @@ dependencies = [
"anyhow",
"itertools 0.13.0",
"nohash-hasher",
"rayon",
"re_arrow2",
"re_chunk",
"re_chunk_store",
@@ -5591,6 +5592,8 @@ dependencies = [
"re_types",
"re_types_core",
"similar-asserts",
"tokio",
"tokio-stream",
"unindent",
]

@@ -8378,6 +8381,7 @@ dependencies = [
"libc",
"mio 1.0.2",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
9 changes: 7 additions & 2 deletions crates/store/re_dataframe/Cargo.toml
@@ -12,9 +12,11 @@ repository.workspace = true
rust-version.workspace = true
version.workspace = true


[lints]
workspace = true


[package.metadata.docs.rs]
all-features = true

@@ -32,15 +34,18 @@ re_log_types.workspace = true
re_query.workspace = true
re_tracing.workspace = true
re_types_core.workspace = true

# External dependencies:
anyhow.workspace = true
arrow2.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
rayon.workspace = true

[dev-dependencies]
# Rerun dependencies:
re_types.workspace = true

# External dependencies:
similar-asserts.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] }
tokio-stream.workspace = true
unindent.workspace = true
222 changes: 217 additions & 5 deletions crates/store/re_dataframe/src/query.rs
@@ -811,16 +811,41 @@ impl<E: StorageEngineLike> QueryHandle<E> {
/// // …
/// }
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub fn next_row_async(
&self,
) -> impl std::future::Future<Output = Option<Vec<Box<dyn ArrowArray>>>> {
) -> impl std::future::Future<Output = Option<Vec<Box<dyn ArrowArray>>>>
where
E: 'static + Send + Clone,
{
let res: Option<Option<_>> = self
.engine
.try_with(|store, cache| self._next_row(store, cache));

std::future::poll_fn(move |_cx| match &res {
let engine = self.engine.clone();
std::future::poll_fn(move |cx| match &res {
Some(row) => std::task::Poll::Ready(row.clone()),
None => std::task::Poll::Pending,
None => {
// The lock is already held by a writer, we have to yield control back to the async
// runtime, for now.
// Before we do so, we need to schedule a callback that will be in charge of waking up
// the async task once we can possibly make progress once again.

// Commenting out this code should make the `async_barebones` test deadlock.
rayon::spawn({
let engine = engine.clone();
let waker = cx.waker().clone();
move || {
engine.with(|_store, _cache| {
// This is of course optimistic -- we might end up right back here on
// next tick. That's fine.
waker.wake();
});
}
});

std::task::Poll::Pending
}
})
}
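
The mechanism above is the heart of this PR: when the storage engine is write-locked, the future parks itself and schedules a rayon job that re-acquires the lock and wakes the task. Below is a minimal, self-contained sketch of that pattern, using a plain `std::sync::RwLock` and `std::thread::spawn` as stand-ins for the storage engine and `rayon::spawn` — none of the names in it are part of the crate's API.

```rust
use std::sync::{Arc, RwLock};
use std::task::Poll;

/// Resolves to the locked value, yielding (instead of blocking) while a writer holds the lock.
fn read_async(lock: Arc<RwLock<i32>>) -> impl std::future::Future<Output = i32> {
    std::future::poll_fn(move |cx| {
        if let Ok(guard) = lock.try_read() {
            return Poll::Ready(*guard);
        }

        // A writer holds the lock: schedule a wake-up call before yielding back to the runtime.
        let lock = Arc::clone(&lock);
        let waker = cx.waker().clone();
        std::thread::spawn(move || {
            // Blocks until the writer releases the lock, then wakes the task.
            // This is optimistic: another writer might grab the lock before the task is
            // re-polled, in which case it simply ends up back here. That's fine.
            let _guard = lock.read();
            waker.wake();
        });

        Poll::Pending
    })
}
```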

@@ -1207,7 +1232,11 @@ impl<E: StorageEngineLike> QueryHandle<E> {
}

#[inline]
pub async fn next_row_batch_async(&self) -> Option<RecordBatch> {
#[cfg(not(target_arch = "wasm32"))]
pub async fn next_row_batch_async(&self) -> Option<RecordBatch>
where
E: 'static + Send + Clone,
{
let row = self.next_row_async().await?;

// If we managed to get a row, then the state must be initialized already.
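
A small consumer sketch for the method above (not part of the diff): it assumes it lives alongside the code in this crate, so `QueryHandle`, `RecordBatch`, and the `StorageEngineLike` bound are already in scope, and it simply drains batches inside an async task.

```rust
// Hypothetical helper, for illustration only: drain every record batch from a handle.
// Assumes `QueryHandle`, `RecordBatch`, and `StorageEngineLike` from the surrounding crate.
#[cfg(not(target_arch = "wasm32"))]
async fn collect_batches_async<E>(handle: QueryHandle<E>) -> Vec<RecordBatch>
where
    E: StorageEngineLike + Send + Clone + 'static,
{
    let mut batches = Vec::new();
    while let Some(batch) = handle.next_row_batch_async().await {
        batches.push(batch);
    }
    batches
}
```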
@@ -1254,7 +1283,9 @@ impl<E: StorageEngineLike> QueryHandle<E> {
mod tests {
use std::sync::Arc;

use re_chunk::{util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint};
use re_chunk::{
util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, TransportChunk,
};
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ResolvedTimeRange, TimeInt,
};
@@ -1263,6 +1294,7 @@ mod tests {
example_components::{MyColor, MyLabel, MyPoint},
EntityPath, Timeline,
};
use re_query::StorageEngine;
use re_types::components::ClearIsRecursive;
use re_types_core::Loggable as _;

@@ -2436,6 +2468,186 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn async_barebones() -> anyhow::Result<()> {
use tokio_stream::StreamExt as _;

re_log::setup_logging();

/// Wraps a [`QueryHandle`] in a [`Stream`].
pub struct QueryHandleStream(pub QueryHandle<StorageEngine>);

impl tokio_stream::Stream for QueryHandleStream {
type Item = TransportChunk;

#[inline]
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let fut = self.0.next_row_batch_async();
let fut = std::pin::pin!(fut);

use std::future::Future;
fut.poll(cx)
}
}

let store = ChunkStoreHandle::new(create_nasty_store()?);
eprintln!("{store}");
let query_cache = QueryCache::new_handle(store.clone());
let query_engine = QueryEngine::new(store.clone(), query_cache.clone());

let engine_guard = query_engine.engine.write_arc();

let filtered_index = Some(Timeline::new_sequence("frame_nr"));

// static
let handle_static = tokio::spawn({
let query_engine = query_engine.clone();
async move {
let query = QueryExpression::default();
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
assert_eq!(
QueryHandleStream(query_engine.query(query.clone()))
.collect::<Vec<_>>()
.await
.len() as u64,
query_handle.num_rows()
);
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&QueryHandleStream(query_engine.query(query.clone()))
.collect::<Vec<_>>()
.await,
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
let expected = unindent::unindent(
"\
[
Int64[None],
Timestamp(Nanosecond, None)[None],
ListArray[None],
ListArray[[c]],
ListArray[None],
]\
",
);

similar_asserts::assert_eq!(expected, got);

Ok::<_, anyhow::Error>(())
}
});

// temporal
let handle_temporal = tokio::spawn({
async move {
let query = QueryExpression {
filtered_index,
..Default::default()
};
eprintln!("{query:#?}:");

let query_handle = query_engine.query(query.clone());
assert_eq!(
QueryHandleStream(query_engine.query(query.clone()))
.collect::<Vec<_>>()
.await
.len() as u64,
query_handle.num_rows()
);
let dataframe = concatenate_record_batches(
query_handle.schema().clone(),
&QueryHandleStream(query_engine.query(query.clone()))
.collect::<Vec<_>>()
.await,
)?;
eprintln!("{dataframe}");

let got = format!("{:#?}", dataframe.data.iter().collect_vec());
let expected = unindent::unindent(
"\
[
Int64[10, 20, 30, 40, 50, 60, 70],
Timestamp(Nanosecond, None)[1970-01-01 00:00:00.000000010, None, None, None, 1970-01-01 00:00:00.000000050, None, 1970-01-01 00:00:00.000000070],
ListArray[None, None, [2], [3], [4], None, [6]],
ListArray[[c], [c], [c], [c], [c], [c], [c]],
ListArray[[{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [{x: 5, y: 5}], [{x: 8, y: 8}]],
]\
"
);

similar_asserts::assert_eq!(expected, got);

Ok::<_, anyhow::Error>(())
}
});

let (tx, rx) = tokio::sync::oneshot::channel::<()>();

let handle_queries = tokio::spawn(async move {
let mut handle_static = std::pin::pin!(handle_static);
let mut handle_temporal = std::pin::pin!(handle_temporal);

// Poll the query handles, just once.
//
// Because the storage engine is already held by a writer, this will put them in a pending state,
// waiting to be woken up. If nothing wakes them up, then this will simply deadlock.
{
// Although it might look scary, all we're doing is crafting a noop waker manually,
// because `std::task::Waker::noop` is unstable.
//
// We'll use this to build a noop async context, so that we can poll our promises
// manually.
const RAW_WAKER_NOOP: std::task::RawWaker = {
Contributor: it's some voodoo!

Member Author: Yup... 😞

const VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(
|_| RAW_WAKER_NOOP, // Cloning just returns a new no-op raw waker
|_| {}, // `wake` does nothing
|_| {}, // `wake_by_ref` does nothing
|_| {}, // Dropping does nothing as we don't allocate anything
);
std::task::RawWaker::new(std::ptr::null(), &VTABLE)
};

#[allow(unsafe_code)]
let mut cx = std::task::Context::from_waker(
// Safety: a Waker is just a privacy-preserving wrapper around a RawWaker.
unsafe {
std::mem::transmute::<&std::task::RawWaker, &std::task::Waker>(
&RAW_WAKER_NOOP,
)
},
);

use std::future::Future as _;
assert!(handle_static.as_mut().poll(&mut cx).is_pending());
assert!(handle_temporal.as_mut().poll(&mut cx).is_pending());
}

tx.send(()).unwrap();

handle_static.await??;
handle_temporal.await??;

Ok::<_, anyhow::Error>(())
});

rx.await?;

// Release the writer: the queries should now be able to stream to completion, provided
// that _something_ wakes them up appropriately.
drop(engine_guard);

handle_queries.await??;

Ok(())
}
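
Side note on the no-op waker "voodoo" discussed in the review comments above — this is an alternative sketch, not what the PR does: if a dev-dependency on the `futures` crate is acceptable, the same single poll can be performed without any `unsafe`, since `futures::task::noop_waker()` builds the do-nothing waker directly.

```rust
// Alternative sketch (not part of this PR): poll a future exactly once with a no-op waker,
// avoiding the manual `RawWaker` + `transmute` construction used in the test above.
// Assumes a dev-dependency on the `futures` crate.
use std::future::Future;
use std::task::{Context, Poll};

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = futures::task::noop_waker();
    let mut cx = Context::from_waker(&waker);
    std::pin::Pin::new(fut).poll(&mut cx)
}
```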

/// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps,
/// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive clears, etc.
fn create_nasty_store() -> anyhow::Result<ChunkStore> {