Skip to content

Commit

Permalink
feat: Adds RegionScanner trait (#3948)
Browse files Browse the repository at this point in the history
* feat: define region scanner

* feat: single partition scanner

* feat: use single partition scanner

* feat: implement ExecutionPlan wip

* feat: mito engine returns single partition scanner

* feat: implement DisplayAs for region server

* feat: dummy table provider use handle_partitioned_query()

* test: update sqlness test

* feat: table provider use ReadFromRegion

* refactor: remove StreamScanAdapter

* chore: update lock

* style: fix clippy

* refactor: remove handle_query from the RegionEngine trait

* chore: address CR comments

* refactor: rename methods

* refactor: rename ReadFromRegion to RegionScanExec
  • Loading branch information
evenyag authored May 20, 2024
1 parent 19543f9 commit 179c8c7
Show file tree
Hide file tree
Showing 32 changed files with 371 additions and 209 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
derive_builder = "0.12"
Expand Down
5 changes: 2 additions & 3 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
Expand All @@ -32,7 +31,7 @@ use query::query_engine::DescribeResult;
use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -193,7 +192,7 @@ impl RegionEngine for MockRegionEngine {
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
) -> Result<RegionScannerRef, BoxedError> {
unimplemented!()
}

Expand Down
30 changes: 21 additions & 9 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
Expand All @@ -49,6 +51,20 @@ impl FileRegionEngine {
inner: Arc::new(EngineInner::new(object_store)),
}
}

/// Runs `request` against the region identified by `region_id` and returns
/// the resulting stream of record batches.
///
/// # Errors
///
/// Returns a boxed region-not-found error (built from `RegionNotFoundSnafu`)
/// when the region lookup yields nothing, and a boxed error when the
/// region's `query` call fails.
async fn handle_query(
    &self,
    region_id: RegionId,
    request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
    // Resolve the region first; a missing region is reported before any query work starts.
    let region = self
        .inner
        .get_region(region_id)
        .await
        .context(RegionNotFoundSnafu { region_id })
        .map_err(BoxedError::new)?;

    region.query(request).map_err(BoxedError::new)
}
}

#[async_trait]
Expand All @@ -72,14 +88,10 @@ impl RegionEngine for FileRegionEngine {
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
Expand Down
26 changes: 19 additions & 7 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -155,16 +157,14 @@ impl RegionEngine for MetricEngine {
})
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

/// Retrieves region's metadata.
Expand Down Expand Up @@ -251,6 +251,18 @@ impl MetricEngine {
.logical_regions(physical_region_id)
.await
}

/// Reads the region `region_id` as described by `request` and returns a
/// stream of record batches.
///
/// Delegates to the inner engine's `read_region`; any failure is boxed into
/// a [`BoxedError`].
async fn handle_query(
    &self,
    region_id: RegionId,
    request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
    let read_result = self.inner.read_region(region_id, request).await;
    read_result.map_err(BoxedError::new)
}
}

struct MetricEngineInner {
Expand Down
4 changes: 2 additions & 2 deletions src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl MetricEngineInner {
.start_timer();

self.mito
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.context(MitoReadOperationSnafu)
}
Expand All @@ -82,7 +82,7 @@ impl MetricEngineInner {
.transform_request(physical_region_id, logical_region_id, request)
.await?;
self.mito
.handle_query(data_region_id, request)
.scan_to_stream(data_region_id, request)
.await
.context(MitoReadOperationSnafu)
}
Expand Down
8 changes: 4 additions & 4 deletions src/metric-engine/src/metadata_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl MetadataRegion {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
Expand All @@ -317,7 +317,7 @@ impl MetadataRegion {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
Expand Down Expand Up @@ -351,7 +351,7 @@ impl MetadataRegion {
};
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
Expand Down Expand Up @@ -590,7 +590,7 @@ mod test {
let scan_req = MetadataRegion::build_read_request("test_key");
let record_batch_stream = metadata_region
.mito
.handle_query(region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.unwrap();
let scan_result = collect(record_batch_stream).await.unwrap();
Expand Down
33 changes: 27 additions & 6 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -115,11 +115,35 @@ impl MitoEngine {
Ok(region.region_usage().await)
}

/// Scans the region `region_id` for `request` and returns the result as a
/// stream of record batches.
///
/// # Errors
///
/// Returns a boxed error if the scanner cannot be built or if starting the
/// scan fails.
#[tracing::instrument(skip_all)]
pub async fn scan_to_stream(
    &self,
    region_id: RegionId,
    request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
    // Build the scanner up front so construction errors surface before scanning.
    let scanner = self.scanner(region_id, request).map_err(BoxedError::new)?;
    scanner.scan().await.map_err(BoxedError::new)
}

/// Builds the [`Scanner`] that serves `request` on the region `region_id`.
///
/// Fails if `scan_region` fails or if the scanner cannot be created from
/// the resulting `ScanRegion`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
    let scan_region = self.scan_region(region_id, request)?;
    scan_region.scanner()
}

/// Builds a region scanner (`RegionScannerRef`) that scans the region
/// `region_id` for `request`.
///
/// Fails if the underlying [`Scanner`] cannot be created or if converting
/// it into a region scanner fails.
async fn region_scanner(
    &self,
    region_id: RegionId,
    request: ScanRequest,
) -> Result<RegionScannerRef> {
    self.scanner(region_id, request)?.region_scanner().await
}

/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.handle_query(region_id, request)
Expand Down Expand Up @@ -312,16 +336,13 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
.await
.map_err(BoxedError::new)
}
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async fn test_put_after_alter() {
| | b | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/engine/append_mode_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn test_append_mode_write_query() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -183,7 +183,7 @@ async fn test_append_mode_compaction() {
// Reopens the region.
reopen_region(&engine, region_id, region_dir, false, region_opts).await;
let stream = engine
.handle_query(region_id, ScanRequest::default())
.scan_to_stream(region_id, ScanRequest::default())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
18 changes: 9 additions & 9 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ async fn test_region_replay() {
assert_eq!(0, result.affected_rows);

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(42, batches.iter().map(|b| b.num_rows()).sum::<usize>());

Expand Down Expand Up @@ -166,7 +166,7 @@ async fn test_write_query_region() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -227,7 +227,7 @@ async fn test_different_order() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------+---------------------+
Expand Down Expand Up @@ -289,7 +289,7 @@ async fn test_different_order_and_type() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------+---------------------+
Expand Down Expand Up @@ -341,7 +341,7 @@ async fn test_put_delete() {
delete_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -383,7 +383,7 @@ async fn test_delete_not_null_fields() {
delete_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand All @@ -398,7 +398,7 @@ async fn test_delete_not_null_fields() {
// Reopen and scan again.
reopen_region(&engine, region_id, region_dir, false, HashMap::new()).await;
let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
Expand Down Expand Up @@ -447,7 +447,7 @@ async fn test_put_overwrite() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
Expand Down Expand Up @@ -688,7 +688,7 @@ async fn test_cache_null_primary_key() {
put_rows(&engine, region_id, rows).await;

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+-------+---------+---------------------+
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/engine/catchup_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn test_catchup_with_last_entry_id() {
// Scans
let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -264,7 +264,7 @@ async fn test_catchup_without_last_entry_id() {

let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -367,7 +367,7 @@ async fn test_catchup_with_manifest_update() {

let request = ScanRequest::default();
let stream = follower_engine
.handle_query(region_id, request)
.scan_to_stream(region_id, request)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
Loading

0 comments on commit 179c8c7

Please sign in to comment.