diff --git a/Cargo.lock b/Cargo.lock index a4f63ea2a72b..84941ae3a226 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10169,6 +10169,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-wal", + "datafusion-physical-plan 37.0.0", "datatypes", "derive_builder 0.12.0", "futures", diff --git a/Cargo.toml b/Cargo.toml index 13a62f6682c3..f3f99f9c63bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,6 +110,7 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } +datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" } derive_builder = "0.12" diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 04af03ec857d..327e1be46256 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -23,7 +23,6 @@ use common_function::function::FunctionRef; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::prelude::ScalarUdf; use common_query::Output; -use common_recordbatch::SendableRecordBatchStream; use common_runtime::Runtime; use query::dataframe::DataFrame; use query::plan::LogicalPlan; @@ -32,7 +31,7 @@ use query::query_engine::DescribeResult; use query::{QueryEngine, QueryEngineContext}; use session::context::QueryContextRef; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; +use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use table::TableRef; @@ -193,7 +192,7 @@ impl RegionEngine for MockRegionEngine { &self, _region_id: RegionId, _request: ScanRequest, - ) -> Result { + ) -> Result { unimplemented!() } diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs index fa4f2c5a3f0d..e0a3a6ebdc42 100644 --- a/src/file-engine/src/engine.rs +++ b/src/file-engine/src/engine.rs @@ -25,7 +25,9 @@ use common_telemetry::{error, info}; use object_store::ObjectStore; use snafu::{ensure, OptionExt}; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; +use store_api::region_engine::{ + RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner, +}; use store_api::region_request::{ AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest, RegionRequest, @@ -49,6 +51,20 @@ impl FileRegionEngine { inner: Arc::new(EngineInner::new(object_store)), } } + + async fn handle_query( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + self.inner + .get_region(region_id) + .await + .context(RegionNotFoundSnafu { region_id }) + .map_err(BoxedError::new)? + .query(request) + .map_err(BoxedError::new) + } } #[async_trait] @@ -72,14 +88,10 @@ impl RegionEngine for FileRegionEngine { &self, region_id: RegionId, request: ScanRequest, - ) -> Result { - self.inner - .get_region(region_id) - .await - .context(RegionNotFoundSnafu { region_id }) - .map_err(BoxedError::new)? - .query(request) - .map_err(BoxedError::new) + ) -> Result { + let stream = self.handle_query(region_id, request).await?; + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + Ok(scanner) } async fn get_metadata(&self, region_id: RegionId) -> Result { diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 1cf36f5661a0..20c7e0d050ae 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -35,7 +35,9 @@ use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; -use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; +use store_api::region_engine::{ + RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner, +}; use store_api::region_request::RegionRequest; use store_api::storage::{RegionId, ScanRequest}; @@ -155,16 +157,14 @@ impl RegionEngine for MetricEngine { }) } - /// Handles substrait query and return a stream of record batches async fn handle_query( &self, region_id: RegionId, request: ScanRequest, - ) -> Result { - self.inner - .read_region(region_id, request) - .await - .map_err(BoxedError::new) + ) -> Result { + let stream = self.handle_query(region_id, request).await?; + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + Ok(scanner) } /// Retrieves region's metadata. @@ -251,6 +251,18 @@ impl MetricEngine { .logical_regions(physical_region_id) .await } + + /// Handles substrait query and return a stream of record batches + async fn handle_query( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + self.inner + .read_region(region_id, request) + .await + .map_err(BoxedError::new) + } } struct MetricEngineInner { diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index 6093d41cd2be..ed4d6b6e4f7a 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -62,7 +62,7 @@ impl MetricEngineInner { .start_timer(); self.mito - .handle_query(region_id, request) + .scan_to_stream(region_id, request) .await .context(MitoReadOperationSnafu) } @@ -82,7 +82,7 @@ impl MetricEngineInner { .transform_request(physical_region_id, logical_region_id, request) .await?; self.mito - .handle_query(data_region_id, request) + .scan_to_stream(data_region_id, request) .await .context(MitoReadOperationSnafu) } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 73ad6e9b3286..6c869c6e3f2f 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -300,7 +300,7 @@ impl MetadataRegion { let scan_req = Self::build_read_request(key); let record_batch_stream = self .mito - .handle_query(region_id, scan_req) + .scan_to_stream(region_id, scan_req) .await .context(MitoReadOperationSnafu)?; let scan_result = collect(record_batch_stream) @@ -317,7 +317,7 @@ impl MetadataRegion { let scan_req = Self::build_read_request(key); let record_batch_stream = self .mito - .handle_query(region_id, scan_req) + .scan_to_stream(region_id, scan_req) .await .context(MitoReadOperationSnafu)?; let scan_result = collect(record_batch_stream) @@ -351,7 +351,7 @@ impl MetadataRegion { }; let record_batch_stream = self .mito - .handle_query(region_id, scan_req) + .scan_to_stream(region_id, scan_req) .await .context(MitoReadOperationSnafu)?; let scan_result = collect(record_batch_stream) @@ -590,7 +590,7 @@ mod test { let scan_req = MetadataRegion::build_read_request("test_key"); let record_batch_stream = metadata_region .mito - .handle_query(region_id, scan_req) + .scan_to_stream(region_id, scan_req) .await .unwrap(); let scan_result = collect(record_batch_stream).await.unwrap(); diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 71954678ad04..e0223a5585ee 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -62,7 +62,7 @@ use object_store::manager::ObjectStoreManagerRef; use snafu::{ensure, OptionExt, ResultExt}; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; -use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; +use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; use store_api::region_request::{AffectedRows, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::oneshot; @@ -115,11 +115,35 @@ impl MitoEngine { Ok(region.region_usage().await) } + /// Handle substrait query and return a stream of record batches + #[tracing::instrument(skip_all)] + pub async fn scan_to_stream( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> std::result::Result { + self.scanner(region_id, request) + .map_err(BoxedError::new)? + .scan() + .await + .map_err(BoxedError::new) + } + /// Returns a scanner to scan for `request`. fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result { self.scan_region(region_id, request)?.scanner() } + /// Returns a region scanner to scan the region for `request`. + async fn region_scanner( + &self, + region_id: RegionId, + request: ScanRequest, + ) -> Result { + let scanner = self.scanner(region_id, request)?; + scanner.region_scanner().await + } + /// Scans a region. fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result { self.inner.handle_query(region_id, request) @@ -312,16 +336,13 @@ impl RegionEngine for MitoEngine { .map_err(BoxedError::new) } - /// Handle substrait query and return a stream of record batches #[tracing::instrument(skip_all)] async fn handle_query( &self, region_id: RegionId, request: ScanRequest, - ) -> std::result::Result { - self.scanner(region_id, request) - .map_err(BoxedError::new)? - .scan() + ) -> Result { + self.region_scanner(region_id, request) .await .map_err(BoxedError::new) } diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index ba25cc1a638e..29f1ecc18895 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -245,7 +245,7 @@ async fn test_put_after_alter() { | | b | 2.0 | 1970-01-01T00:00:02 | +-------+-------+---------+---------------------+"; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); } diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index d1fc41739034..85509094fed7 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -63,7 +63,7 @@ async fn test_append_mode_write_query() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -183,7 +183,7 @@ async fn test_append_mode_compaction() { // Reopens the region. reopen_region(&engine, region_id, region_dir, false, region_opts).await; let stream = engine - .handle_query(region_id, ScanRequest::default()) + .scan_to_stream(region_id, ScanRequest::default()) .await .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index a0f6b6df441b..dbe33ff37f89 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -128,7 +128,7 @@ async fn test_region_replay() { assert_eq!(0, result.affected_rows); let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::()); @@ -166,7 +166,7 @@ async fn test_write_query_region() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -227,7 +227,7 @@ async fn test_different_order() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+-------+---------+---------+---------------------+ @@ -289,7 +289,7 @@ async fn test_different_order_and_type() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+-------+---------+---------+---------------------+ @@ -341,7 +341,7 @@ async fn test_put_delete() { delete_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -383,7 +383,7 @@ async fn test_delete_not_null_fields() { delete_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -398,7 +398,7 @@ async fn test_delete_not_null_fields() { // Reopen and scan again. reopen_region(&engine, region_id, region_dir, false, HashMap::new()).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); } @@ -447,7 +447,7 @@ async fn test_put_overwrite() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -688,7 +688,7 @@ async fn test_cache_null_primary_key() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+-------+---------+---------------------+ diff --git a/src/mito2/src/engine/catchup_test.rs b/src/mito2/src/engine/catchup_test.rs index b9779b2ea1c4..3d0a04017ef2 100644 --- a/src/mito2/src/engine/catchup_test.rs +++ b/src/mito2/src/engine/catchup_test.rs @@ -104,7 +104,7 @@ async fn test_catchup_with_last_entry_id() { // Scans let request = ScanRequest::default(); let stream = follower_engine - .handle_query(region_id, request) + .scan_to_stream(region_id, request) .await .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -264,7 +264,7 @@ async fn test_catchup_without_last_entry_id() { let request = ScanRequest::default(); let stream = follower_engine - .handle_query(region_id, request) + .scan_to_stream(region_id, request) .await .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); @@ -367,7 +367,7 @@ async fn test_catchup_with_manifest_update() { let request = ScanRequest::default(); let stream = follower_engine - .handle_query(region_id, request) + .scan_to_stream(region_id, request) .await .unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index 602eea30bde1..9ce3c53b7661 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -231,7 +231,7 @@ async fn test_engine_create_with_memtable_opts() { put_rows(&engine, region_id, rows).await; let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index c3c35f9ba0c8..4d123a89b8ba 100644 --- a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -69,7 +69,7 @@ async fn test_scan_without_filtering_deleted() { // scan let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/engine/open_test.rs b/src/mito2/src/engine/open_test.rs index dc0590cdd079..3cf4a21e561a 100644 --- a/src/mito2/src/engine/open_test.rs +++ b/src/mito2/src/engine/open_test.rs @@ -276,7 +276,7 @@ async fn test_open_region_skip_wal_replay() { .unwrap(); let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -305,7 +305,7 @@ async fn test_open_region_skip_wal_replay() { .unwrap(); let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/engine/parallel_test.rs b/src/mito2/src/engine/parallel_test.rs index c6c97f68d269..cc5d98291230 100644 --- a/src/mito2/src/engine/parallel_test.rs +++ b/src/mito2/src/engine/parallel_test.rs @@ -57,7 +57,7 @@ async fn scan_in_parallel( .unwrap(); let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/engine/projection_test.rs b/src/mito2/src/engine/projection_test.rs index af08acfba0d0..6e31a56c8b37 100644 --- a/src/mito2/src/engine/projection_test.rs +++ b/src/mito2/src/engine/projection_test.rs @@ -79,7 +79,7 @@ async fn test_scan_projection() { output_ordering: None, limit: None, }; - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 27a66d68c6d2..7c20dd8a5d10 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -53,7 +53,7 @@ async fn check_prune_row_groups(expr: DfExpr, expected: &str) { flush_region(&engine, region_id, Some(5)).await; let stream = engine - .handle_query( + .scan_to_stream( region_id, ScanRequest { filters: vec![Expr::from(expr)], @@ -186,7 +186,7 @@ async fn test_prune_memtable() { .await; let stream = engine - .handle_query( + .scan_to_stream( region_id, ScanRequest { filters: vec![time_range_expr(0, 20)], @@ -238,7 +238,7 @@ async fn test_prune_memtable_complex_expr() { let filters = vec![time_range_expr(4, 7), Expr::from(col("tag_0").lt(lit("6")))]; let stream = engine - .handle_query( + .scan_to_stream( region_id, ScanRequest { filters, diff --git a/src/mito2/src/engine/truncate_test.rs b/src/mito2/src/engine/truncate_test.rs index b9336afad3d1..91c08f3b3e78 100644 --- a/src/mito2/src/engine/truncate_test.rs +++ b/src/mito2/src/engine/truncate_test.rs @@ -55,7 +55,7 @@ async fn test_engine_truncate_region_basic() { // Scan the region. let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -75,7 +75,7 @@ async fn test_engine_truncate_region_basic() { // Scan the region. let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "++\n++"; assert_eq!(expected, batches.pretty_print().unwrap()); @@ -104,7 +104,7 @@ async fn test_engine_put_data_after_truncate() { // Scan the region let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -131,7 +131,7 @@ async fn test_engine_put_data_after_truncate() { // Scan the region. let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ @@ -261,7 +261,7 @@ async fn test_engine_truncate_reopen() { // Scan the region. let request = ScanRequest::default(); - let stream = engine.handle_query(region_id, request).await.unwrap(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "++\n++"; assert_eq!(expected, batches.pretty_print().unwrap()); diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index a33765479b11..f619744dc0c9 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -20,6 +20,7 @@ use std::time::Instant; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; +use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; use store_api::storage::ScanRequest; use table::predicate::{Predicate, TimeRangePredicateBuilder}; use tokio::sync::{mpsc, Semaphore}; @@ -57,6 +58,14 @@ impl Scanner { Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await, } } + + /// Returns a [RegionScanner] to scan the region. + pub(crate) async fn region_scanner(&self) -> Result { + let stream = self.scan().await?; + let scanner = SinglePartitionScanner::new(stream); + + Ok(Arc::new(scanner)) + } } #[cfg(test)] diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index f725d83817ac..5042764ede8c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -198,6 +198,14 @@ impl UnorderedScan { } } +#[cfg(test)] +impl UnorderedScan { + /// Returns the input. + pub(crate) fn input(&self) -> &ScanInput { + &self.input + } +} + /// Metrics for [UnorderedScan]. #[derive(Debug, Default)] struct Metrics { @@ -216,11 +224,3 @@ struct Metrics { /// Number of rows returned. num_rows: usize, } - -#[cfg(test)] -impl UnorderedScan { - /// Returns the input. - pub(crate) fn input(&self) -> &ScanInput { - &self.input - } -} diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 71ec0d4ac7e0..c39b6bad59ee 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -31,7 +31,7 @@ use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; use store_api::storage::{RegionId, ScanRequest}; -use table::table::scan::StreamScanAdapter; +use table::table::scan::RegionScanExec; use crate::error::{GetRegionMetadataSnafu, Result}; @@ -168,12 +168,12 @@ impl TableProvider for DummyTableProvider { .collect(); request.limit = limit; - let stream = self + let scanner = self .engine .handle_query(self.region_id, request) .await .map_err(|e| DataFusionError::External(Box::new(e)))?; - Ok(Arc::new(StreamScanAdapter::new(stream))) + Ok(Arc::new(RegionScanExec::new(scanner))) } fn supports_filters_pushdown( diff --git a/src/query/src/optimizer/test_util.rs b/src/query/src/optimizer/test_util.rs index ea18e54a09b4..4ffc3e28e08f 100644 --- a/src/query/src/optimizer/test_util.rs +++ b/src/query/src/optimizer/test_util.rs @@ -23,12 +23,11 @@ use api::v1::SemanticType; use async_trait::async_trait; use common_error::ext::{BoxedError, PlainError}; use common_error::status_code::StatusCode; -use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::ColumnSchema; use store_api::metadata::{ ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, }; -use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse}; +use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse}; use store_api::region_request::RegionRequest; use store_api::storage::{ConcreteDataType, RegionId, ScanRequest}; @@ -67,7 +66,7 @@ impl RegionEngine for MetaRegionEngine { &self, _region_id: RegionId, _request: ScanRequest, - ) -> Result { + ) -> Result { unimplemented!() } diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 2c9a42a9d8e4..7068f6686d1b 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -17,6 +17,7 @@ common-macro.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-wal.workspace = true +datafusion-physical-plan.workspace = true datatypes.workspace = true derive_builder.workspace = true futures.workspace = true diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 7e461617aa21..f4b5aec37e7d 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -15,15 +15,19 @@ //! Region Engine's definition use std::any::Any; -use std::fmt::Display; -use std::sync::Arc; +use std::fmt::{Debug, Display}; +use std::sync::{Arc, Mutex}; use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole}; use api::region::RegionResponse; use async_trait::async_trait; use common_error::ext::BoxedError; +use common_query::error::ExecuteRepeatedlySnafu; use common_recordbatch::SendableRecordBatchStream; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType}; +use datatypes::schema::SchemaRef; use serde::{Deserialize, Serialize}; +use snafu::OptionExt; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; @@ -120,6 +124,57 @@ impl From for RegionRole { } } +/// Output partition properties of the [RegionScanner]. +#[derive(Debug)] +pub enum ScannerPartitioning { + /// Unknown partitioning scheme with a known number of partitions + Unknown(usize), +} + +impl ScannerPartitioning { + /// Returns the number of partitions. + pub fn num_partitions(&self) -> usize { + match self { + ScannerPartitioning::Unknown(num_partitions) => *num_partitions, + } + } +} + +/// Properties of the [RegionScanner]. +#[derive(Debug)] +pub struct ScannerProperties { + /// Partitions to scan. + partitioning: ScannerPartitioning, +} + +impl ScannerProperties { + /// Creates a new [ScannerProperties] with the given partitioning. + pub fn new(partitioning: ScannerPartitioning) -> Self { + Self { partitioning } + } + + /// Returns properties of partitions to scan. + pub fn partitioning(&self) -> &ScannerPartitioning { + &self.partitioning + } +} + +/// A scanner that provides a way to scan the region concurrently. +/// The scanner splits the region into partitions so that each partition can be scanned concurrently. +/// You can use this trait to implement an [ExecutionPlan](datafusion_physical_plan::ExecutionPlan). +pub trait RegionScanner: Debug + DisplayAs + Send + Sync { + /// Returns the properties of the scanner. + fn properties(&self) -> &ScannerProperties; + + /// Returns the schema of the record batches. + fn schema(&self) -> SchemaRef; + + /// Scans the partition and returns a stream of record batches. + fn scan_partition(&self, partition: usize) -> Result; +} + +pub type RegionScannerRef = Arc; + #[async_trait] pub trait RegionEngine: Send + Sync { /// Name of this engine @@ -132,12 +187,12 @@ pub trait RegionEngine: Send + Sync { request: RegionRequest, ) -> Result; - /// Handles substrait query and return a stream of record batches + /// Handles query and return a scanner that can be used to scan the region concurrently. async fn handle_query( &self, region_id: RegionId, request: ScanRequest, - ) -> Result; + ) -> Result; /// Retrieves region's metadata. async fn get_metadata(&self, region_id: RegionId) -> Result; @@ -172,3 +227,52 @@ pub trait RegionEngine: Send + Sync { } pub type RegionEngineRef = Arc; + +/// A [RegionScanner] that only scans a single partition. +pub struct SinglePartitionScanner { + stream: Mutex>, + schema: SchemaRef, + properties: ScannerProperties, +} + +impl SinglePartitionScanner { + /// Creates a new [SinglePartitionScanner] with the given stream. + pub fn new(stream: SendableRecordBatchStream) -> Self { + let schema = stream.schema(); + Self { + stream: Mutex::new(Some(stream)), + schema, + properties: ScannerProperties::new(ScannerPartitioning::Unknown(1)), + } + } +} + +impl Debug for SinglePartitionScanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SinglePartitionScanner: ") + } +} + +impl RegionScanner for SinglePartitionScanner { + fn properties(&self) -> &ScannerProperties { + &self.properties + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn scan_partition(&self, _partition: usize) -> Result { + let mut stream = self.stream.lock().unwrap(); + stream + .take() + .context(ExecuteRepeatedlySnafu) + .map_err(BoxedError::new) + } +} + +impl DisplayAs for SinglePartitionScanner { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs index ffc6618a548a..d00988958531 100644 --- a/src/table/src/table/adapter.rs +++ b/src/table/src/table/adapter.rs @@ -26,9 +26,10 @@ use datafusion_expr::expr::Expr as DfExpr; use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown; use datafusion_physical_expr::expressions::Column; use datafusion_physical_expr::PhysicalSortExpr; +use store_api::region_engine::SinglePartitionScanner; use store_api::storage::ScanRequest; -use crate::table::scan::StreamScanAdapter; +use crate::table::scan::RegionScanExec; use crate::table::{TableRef, TableType}; /// Adapt greptime's [TableRef] to DataFusion's [TableProvider]. @@ -110,11 +111,12 @@ impl TableProvider for DfTableProviderAdapter { .collect::>() }); - let mut stream_adapter = StreamScanAdapter::new(stream); + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let mut plan = RegionScanExec::new(scanner); if let Some(sort_expr) = sort_expr { - stream_adapter = stream_adapter.with_output_ordering(sort_expr); + plan = plan.with_output_ordering(sort_expr); } - Ok(Arc::new(stream_adapter)) + Ok(Arc::new(plan)) } fn supports_filters_pushdown( diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 4c09c8904781..c612ff3746cb 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -13,12 +13,10 @@ // limitations under the License. use std::any::Any; -use std::fmt::{self, Debug, Formatter}; use std::pin::Pin; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; -use common_query::error::ExecuteRepeatedlySnafu; use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream, SendableRecordBatchStream}; use common_telemetry::tracing::Span; use common_telemetry::tracing_context::TracingContext; @@ -32,59 +30,54 @@ use datafusion::physical_plan::{ use datafusion_common::DataFusionError; use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr}; use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; -use snafu::OptionExt; +use store_api::region_engine::RegionScannerRef; use crate::table::metrics::MemoryUsageMetrics; -/// Adapt greptime's [SendableRecordBatchStream] to [ExecutionPlan]. -pub struct StreamScanAdapter { - stream: Mutex>, - schema: SchemaRef, +/// A plan to read multiple partitions from a region of a table. +#[derive(Debug)] +pub struct RegionScanExec { + scanner: RegionScannerRef, + arrow_schema: ArrowSchemaRef, + /// The expected output ordering for the plan. output_ordering: Option>, metric: ExecutionPlanMetricsSet, properties: PlanProperties, } -impl Debug for StreamScanAdapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("StreamScanAdapter") - .field("stream", &"") - .finish() - } -} - -impl StreamScanAdapter { - pub fn new(stream: SendableRecordBatchStream) -> Self { - let schema = stream.schema(); +impl RegionScanExec { + pub fn new(scanner: RegionScannerRef) -> Self { + let arrow_schema = scanner.schema().arrow_schema().clone(); + let scanner_props = scanner.properties(); let properties = PlanProperties::new( - EquivalenceProperties::new(schema.arrow_schema().clone()), - Partitioning::UnknownPartitioning(1), + EquivalenceProperties::new(arrow_schema.clone()), + Partitioning::UnknownPartitioning(scanner_props.partitioning().num_partitions()), ExecutionMode::Bounded, ); Self { - stream: Mutex::new(Some(stream)), - schema, + scanner, + arrow_schema, output_ordering: None, metric: ExecutionPlanMetricsSet::new(), properties, } } + /// Set the expected output ordering for the plan. pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { self.output_ordering = Some(output_ordering); self } } -impl ExecutionPlan for StreamScanAdapter { +impl ExecutionPlan for RegionScanExec { fn as_any(&self) -> &dyn Any { self } fn schema(&self) -> ArrowSchemaRef { - self.schema.arrow_schema().clone() + self.arrow_schema.clone() } fn properties(&self) -> &PlanProperties { @@ -98,7 +91,7 @@ impl ExecutionPlan for StreamScanAdapter { fn with_new_children( self: Arc, _children: Vec>, - ) -> DfResult> { + ) -> datafusion_common::Result> { Ok(self) } @@ -106,12 +99,15 @@ impl ExecutionPlan for StreamScanAdapter { &self, partition: usize, context: Arc, - ) -> DfResult { + ) -> datafusion_common::Result { let tracing_context = TracingContext::from_json(context.session_id().as_str()); - let span = tracing_context.attach(common_telemetry::tracing::info_span!("stream_adapter")); + let span = + tracing_context.attach(common_telemetry::tracing::info_span!("read_from_region")); - let mut stream = self.stream.lock().unwrap(); - let stream = stream.take().context(ExecuteRepeatedlySnafu)?; + let stream = self + .scanner + .scan_partition(partition) + .map_err(|e| DataFusionError::External(Box::new(e)))?; let mem_usage_metrics = MemoryUsageMetrics::new(&self.metric, partition); Ok(Box::pin(StreamWithMetricWrapper { stream, @@ -125,9 +121,10 @@ impl ExecutionPlan for StreamScanAdapter { } } -impl DisplayAs for StreamScanAdapter { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:?}", self) +impl DisplayAs for RegionScanExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + // The scanner contains all information needed to display the plan. + write!(f, "{:?}", self.scanner) } } @@ -177,12 +174,15 @@ impl DfRecordBatchStream for StreamWithMetricWrapper { #[cfg(test)] mod test { + use std::sync::Arc; + use common_recordbatch::{RecordBatch, RecordBatches}; use datafusion::prelude::SessionContext; use datatypes::data_type::ConcreteDataType; - use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use datatypes::vectors::Int32Vector; use futures::TryStreamExt; + use store_api::region_engine::SinglePartitionScanner; use super::*; @@ -210,9 +210,10 @@ mod test { RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); let stream = recordbatches.as_stream(); - let scan = StreamScanAdapter::new(stream); + let scanner = Arc::new(SinglePartitionScanner::new(stream)); + let plan = RegionScanExec::new(scanner); let actual: SchemaRef = Arc::new( - scan.properties + plan.properties .eq_properties .schema() .clone() @@ -221,12 +222,12 @@ mod test { ); assert_eq!(actual, schema); - let stream = scan.execute(0, ctx.task_ctx()).unwrap(); + let stream = plan.execute(0, ctx.task_ctx()).unwrap(); let recordbatches = stream.try_collect::>().await.unwrap(); assert_eq!(batch1.df_record_batch(), &recordbatches[0]); assert_eq!(batch2.df_record_batch(), &recordbatches[1]); - let result = scan.execute(0, ctx.task_ctx()); + let result = plan.execute(0, ctx.task_ctx()); assert!(result.is_err()); match result { Err(e) => assert!(e diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result index 1ce443adfff6..762f70bdace4 100644 --- a/tests/cases/distributed/explain/analyze.result +++ b/tests/cases/distributed/explain/analyze.result @@ -35,7 +35,7 @@ explain analyze SELECT count(*) FROM system_metrics; |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(greptime.public.system_REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_SinglePartitionScanner: REDACTED |_|_|_| |_|_| Total rows: 1_| +-+-+-+ diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result index 88ce54ce5de9..5c03ac107c16 100644 --- a/tests/cases/distributed/optimizer/order_by.result +++ b/tests/cases/distributed/optimizer/order_by.result @@ -1,61 +1,61 @@ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers; -+---------------+-------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | StreamScanAdapter { stream: "" } | -| | | -+---------------+-------------------------------------------------------------+ ++---------------+-----------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SinglePartitionScanner: | +| | | ++---------------+-----------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number desc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number asc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number desc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+---------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+---------------------------------------------------------+ -- SQLNESS REPLACE (peers.*) REDACTED explain select * from numbers order by number asc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+------------------------------------------------------------+ diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index 4484d96c699e..5ccc155b05fe 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -74,7 +74,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; | 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_StreamScanAdapter { stream: "" } REDACTED +| 1_| 0_|_SinglePartitionScanner: REDACTED |_|_|_| |_|_| Total rows: 10_| +-+-+-+ diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index 8b5b09dc803a..e8e388b916c0 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -30,7 +30,7 @@ TQL ANALYZE (0, 10, '5s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_SinglePartitionScanner: REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -59,7 +59,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_SinglePartitionScanner: REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -87,7 +87,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_SinglePartitionScanner: REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -117,7 +117,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_StreamScanAdapter { stream: "" } REDACTED +|_|_|_SinglePartitionScanner: REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result index 574f753e4073..49996d130e78 100644 --- a/tests/cases/standalone/optimizer/order_by.result +++ b/tests/cases/standalone/optimizer/order_by.result @@ -1,56 +1,56 @@ explain select * from numbers; -+---------------+-------------------------------------------------------------+ -| plan_type | plan | -+---------------+-------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | StreamScanAdapter { stream: "" } | -| | | -+---------------+-------------------------------------------------------------+ ++---------------+-----------------------------------------------------+ +| plan_type | plan | ++---------------+-----------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SinglePartitionScanner: | +| | | ++---------------+-----------------------------------------------------+ explain select * from numbers order by number desc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ explain select * from numbers order by number asc; -+---------------+---------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+---------------------------------------------------------------+ ++---------------+-------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+-------------------------------------------------------+ explain select * from numbers order by number desc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+---------------------------------------------------------+ +| plan_type | plan | ++---------------+---------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 DESC] | +| | SinglePartitionScanner: | +| | | ++---------------+---------------------------------------------------------+ explain select * from numbers order by number asc limit 10; -+---------------+-----------------------------------------------------------------+ -| plan_type | plan | -+---------------+-----------------------------------------------------------------+ -| logical_plan | MergeScan [is_placeholder=false] | -| physical_plan | GlobalLimitExec: skip=0, fetch=10 | -| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | -| | StreamScanAdapter { stream: "" } | -| | | -+---------------+-----------------------------------------------------------------+ ++---------------+------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------+ +| logical_plan | MergeScan [is_placeholder=false] | +| physical_plan | GlobalLimitExec: skip=0, fetch=10 | +| | SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST] | +| | SinglePartitionScanner: | +| | | ++---------------+------------------------------------------------------------+