From 464b26634ca2cb881deff1787ec759e11b081097 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 6 Nov 2024 11:31:31 +0100 Subject: [PATCH 1/6] add async unit test without contention (works as expected) --- Cargo.lock | 4 + crates/store/re_dataframe/Cargo.toml | 13 ++- crates/store/re_dataframe/src/query.rs | 125 ++++++++++++++++++++++++- 3 files changed, 139 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ade2a683a34..1ecd15633953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5581,6 +5581,7 @@ dependencies = [ "anyhow", "itertools 0.13.0", "nohash-hasher", + "rayon", "re_arrow2", "re_chunk", "re_chunk_store", @@ -5591,6 +5592,8 @@ dependencies = [ "re_types", "re_types_core", "similar-asserts", + "tokio", + "tokio-stream", "unindent", ] @@ -8378,6 +8381,7 @@ dependencies = [ "libc", "mio 1.0.2", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/crates/store/re_dataframe/Cargo.toml b/crates/store/re_dataframe/Cargo.toml index 1370ca068596..f88dc0e92d1f 100644 --- a/crates/store/re_dataframe/Cargo.toml +++ b/crates/store/re_dataframe/Cargo.toml @@ -12,9 +12,11 @@ repository.workspace = true rust-version.workspace = true version.workspace = true + [lints] workspace = true + [package.metadata.docs.rs] all-features = true @@ -32,15 +34,22 @@ re_log_types.workspace = true re_query.workspace = true re_tracing.workspace = true re_types_core.workspace = true - # External dependencies: anyhow.workspace = true arrow2.workspace = true itertools.workspace = true nohash-hasher.workspace = true +rayon.workspace = true [dev-dependencies] +# Rerun dependencies: re_types.workspace = true - +# External dependencies: similar-asserts.workspace = true +tokio = { workspace = true, features = [ + "macros", + "rt-multi-thread", + "signal", +] } +tokio-stream.workspace = true unindent.workspace = true diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 2ee6c5aaaf27..6979cad68d83 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -1254,7 +1254,9 @@ impl QueryHandle { mod tests { use std::sync::Arc; - use re_chunk::{util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint}; + use re_chunk::{ + util::concatenate_record_batches, Chunk, ChunkId, RowId, TimePoint, TransportChunk, + }; use re_chunk_store::{ ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ResolvedTimeRange, TimeInt, }; @@ -1263,6 +1265,7 @@ mod tests { example_components::{MyColor, MyLabel, MyPoint}, EntityPath, Timeline, }; + use re_query::StorageEngine; use re_types::components::ClearIsRecursive; use re_types_core::Loggable as _; @@ -2436,6 +2439,126 @@ mod tests { Ok(()) } + #[tokio::test] + async fn async_barebones() -> anyhow::Result<()> { + use tokio_stream::StreamExt as _; + + re_log::setup_logging(); + + /// Wraps a [`QueryHandle`] in a [`Stream`]. + pub struct QueryHandleStream(pub QueryHandle); + impl tokio_stream::Stream for QueryHandleStream { + type Item = TransportChunk; + + #[inline] + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let fut = self.0.next_row_batch_async(); + let fut = std::pin::pin!(fut); + + use std::future::Future; + fut.poll(cx) + } + } + + let store = ChunkStoreHandle::new(create_nasty_store()?); + eprintln!("{store}"); + let query_cache = QueryCache::new_handle(store.clone()); + let query_engine = QueryEngine::new(store.clone(), query_cache.clone()); + + let filtered_index = Some(Timeline::new_sequence("frame_nr")); + + // static + tokio::spawn({ + let query_engine = query_engine.clone(); + async move { + let query = QueryExpression::default(); + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + QueryHandleStream(query_engine.query(query.clone())) + .collect::>() + .await + .len() as u64, + query_handle.num_rows() + ); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &QueryHandleStream(query_engine.query(query.clone())) + .collect::>() + .await, + )?; + eprintln!("{dataframe}"); + + let got = format!("{:#?}", dataframe.data.iter().collect_vec()); + let expected = unindent::unindent( + "\ + [ + Int64[None], + Timestamp(Nanosecond, None)[None], + ListArray[None], + ListArray[[c]], + ListArray[None], + ]\ + ", + ); + + similar_asserts::assert_eq!(expected, got); + + Ok::<_, anyhow::Error>(()) + } + }); + + // temporal + tokio::spawn({ + async move { + let query = QueryExpression { + filtered_index, + ..Default::default() + }; + eprintln!("{query:#?}:"); + + let query_handle = query_engine.query(query.clone()); + assert_eq!( + QueryHandleStream(query_engine.query(query.clone())) + .collect::>() + .await + .len() as u64, + query_handle.num_rows() + ); + let dataframe = concatenate_record_batches( + query_handle.schema().clone(), + &QueryHandleStream(query_engine.query(query.clone())) + .collect::>() + .await, + )?; + eprintln!("{dataframe}"); + + let got = format!("{:#?}", dataframe.data.iter().collect_vec()); + let expected = unindent::unindent( + "\ + [ + Int64[10, 20, 30, 40, 50, 60, 70], + Timestamp(Nanosecond, None)[1970-01-01 00:00:00.000000010, None, None, None, 1970-01-01 00:00:00.000000050, None, 1970-01-01 00:00:00.000000070], + ListArray[None, None, [2], [3], [4], None, [6]], + ListArray[[c], [c], [c], [c], [c], [c], [c]], + ListArray[[{x: 0, y: 0}], [{x: 1, y: 1}], [{x: 2, y: 2}], [{x: 3, y: 3}], [{x: 4, y: 4}], [{x: 5, y: 5}], [{x: 8, y: 8}]], + ]\ + " + ); + + similar_asserts::assert_eq!(expected, got); + + Ok::<_, anyhow::Error>(()) + } + }); + + Ok(()) + } + /// Returns a very nasty [`ChunkStore`] with all kinds of partial updates, chunk overlaps, /// repeated timestamps, duplicated chunks, partial multi-timelines, flat and recursive clears, etc. fn create_nasty_store() -> anyhow::Result { From 139767ba451a2f288741483e5c2f5dd1c4d71b4c Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 6 Nov 2024 12:49:33 +0100 Subject: [PATCH 2/6] demonstrate the issue when ending in a pending state (this deadlocks) --- crates/store/re_dataframe/src/query.rs | 68 ++++++++++++++++++++++++-- 1 file changed, 65 insertions(+), 3 deletions(-) diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 6979cad68d83..46d7034ece3e 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -1207,7 +1207,10 @@ impl QueryHandle { } #[inline] - pub async fn next_row_batch_async(&self) -> Option { + pub async fn next_row_batch_async(&self) -> Option + where + E: 'static + Send + Clone, + { let row = self.next_row_async().await?; // If we managed to get a row, then the state must be initialized already. @@ -2468,10 +2471,12 @@ mod tests { let query_cache = QueryCache::new_handle(store.clone()); let query_engine = QueryEngine::new(store.clone(), query_cache.clone()); + let engine_guard = query_engine.engine.write_arc(); + let filtered_index = Some(Timeline::new_sequence("frame_nr")); // static - tokio::spawn({ + let handle_static = tokio::spawn({ let query_engine = query_engine.clone(); async move { let query = QueryExpression::default(); @@ -2513,7 +2518,7 @@ mod tests { }); // temporal - tokio::spawn({ + let handle_temporal = tokio::spawn({ async move { let query = QueryExpression { filtered_index, @@ -2556,6 +2561,63 @@ mod tests { } }); + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + + let handle_queries = tokio::spawn(async move { + let mut handle_static = std::pin::pin!(handle_static); + let mut handle_temporal = std::pin::pin!(handle_temporal); + + // Poll the query handles, just once. + // + // Because the storage engine is already held by a writer, this will put them in a pending state, + // waiting to be woken up. If nothing wakes them up, then this will simply deadlock. + { + // Although it might look scary, all we're doing is crafting a noop waker manually, + // because `std::task::Waker::noop` is unstable. + // + // We'll use this to build a noop async context, so that we can poll our promises + // manually. + const RAW_WAKER_NOOP: std::task::RawWaker = { + const VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new( + |_| RAW_WAKER_NOOP, // Cloning just returns a new no-op raw waker + |_| {}, // `wake` does nothing + |_| {}, // `wake_by_ref` does nothing + |_| {}, // Dropping does nothing as we don't allocate anything + ); + std::task::RawWaker::new(std::ptr::null(), &VTABLE) + }; + + #[allow(unsafe_code)] + let mut cx = std::task::Context::from_waker( + // Safety: a Waker is just a privacy-preserving wrapper around a RawWaker. + unsafe { + std::mem::transmute::<&std::task::RawWaker, &std::task::Waker>( + &RAW_WAKER_NOOP, + ) + }, + ); + + use std::future::Future as _; + assert!(handle_static.as_mut().poll(&mut cx).is_pending()); + assert!(handle_temporal.as_mut().poll(&mut cx).is_pending()); + } + + tx.send(()).unwrap(); + + handle_static.await??; + handle_temporal.await??; + + Ok::<_, anyhow::Error>(()) + }); + + rx.await?; + + // Release the writer: the queries should now be able to stream to completion, provided + // that _something_ wakes them up appropriately. + drop(engine_guard); + + handle_queries.await??; + Ok(()) } From 677d99e36b4b74097207a673c0879d483a2fcafd Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 6 Nov 2024 12:52:09 +0100 Subject: [PATCH 3/6] properly schedule waking job (fixes deadlock) --- crates/store/re_dataframe/src/query.rs | 30 +++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 46d7034ece3e..6920adf19810 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -813,14 +813,38 @@ impl QueryHandle { /// ``` pub fn next_row_async( &self, - ) -> impl std::future::Future>>> { + ) -> impl std::future::Future>>> + where + E: 'static + Send + Clone, + { let res: Option> = self .engine .try_with(|store, cache| self._next_row(store, cache)); - std::future::poll_fn(move |_cx| match &res { + let engine = self.engine.clone(); + std::future::poll_fn(move |cx| match &res { Some(row) => std::task::Poll::Ready(row.clone()), - None => std::task::Poll::Pending, + None => { + // The lock is already held by a writer, we have to yield control back to the async + // runtime, for now. + // Before we do so, we need to schedule a callback that will be in charge of waking up + // the async task once we can possibly make progress once again. + + // Commenting out this code should make the `async_barebones` test deadlock. + rayon::spawn({ + let engine = engine.clone(); + let waker = cx.waker().clone(); + move || { + engine.with(|_store, _cache| { + // This is of course optimistic -- we might end up right back here on + // next tick. That's fine. + waker.wake(); + }); + } + }); + + std::task::Poll::Pending + } }) } From 300a2f62a7ccf0bee139ae82bfc4a2aff3f9d347 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 6 Nov 2024 12:57:23 +0100 Subject: [PATCH 4/6] lint --- crates/store/re_dataframe/src/query.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 6920adf19810..140bffdacc06 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -2474,6 +2474,7 @@ mod tests { /// Wraps a [`QueryHandle`] in a [`Stream`]. pub struct QueryHandleStream(pub QueryHandle); + impl tokio_stream::Stream for QueryHandleStream { type Item = TransportChunk; From e172d6d64628a07119d3b9d32467ffba8bbd51fe Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 6 Nov 2024 13:00:15 +0100 Subject: [PATCH 5/6] fmt --- crates/store/re_dataframe/Cargo.toml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/store/re_dataframe/Cargo.toml b/crates/store/re_dataframe/Cargo.toml index f88dc0e92d1f..eb15df0a9b11 100644 --- a/crates/store/re_dataframe/Cargo.toml +++ b/crates/store/re_dataframe/Cargo.toml @@ -46,10 +46,6 @@ rayon.workspace = true re_types.workspace = true # External dependencies: similar-asserts.workspace = true -tokio = { workspace = true, features = [ - "macros", - "rt-multi-thread", - "signal", -] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "signal"] } tokio-stream.workspace = true unindent.workspace = true From 06cb77377bae5d46b50d5009ba59f3f369963f5f Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 6 Nov 2024 13:45:07 +0100 Subject: [PATCH 6/6] we'll add support for wasm32 when we need to (if ever) --- crates/store/re_dataframe/src/query.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 140bffdacc06..6ad5e8ce9f00 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -811,6 +811,7 @@ impl QueryHandle { /// // … /// } /// ``` + #[cfg(not(target_arch = "wasm32"))] pub fn next_row_async( &self, ) -> impl std::future::Future>>> @@ -1231,6 +1232,7 @@ impl QueryHandle { } #[inline] + #[cfg(not(target_arch = "wasm32"))] pub async fn next_row_batch_async(&self) -> Option where E: 'static + Send + Clone,