Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds RegionScanner trait #3948

Merged
merged 16 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-physical-plan = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "34eda15b73a9e278af8844b30ed2f1c21c10359c" }
derive_builder = "0.12"
Expand Down
7 changes: 3 additions & 4 deletions src/datanode/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use common_function::function::FunctionRef;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
Expand All @@ -32,7 +31,7 @@ use query::query_engine::DescribeResult;
use query::{QueryEngine, QueryEngineContext};
use session::context::QueryContextRef;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use table::TableRef;
Expand Down Expand Up @@ -189,11 +188,11 @@ impl RegionEngine for MockRegionEngine {
Ok(RegionResponse::new(0))
}

async fn handle_query(
async fn handle_partitioned_query(
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
) -> Result<RegionScannerRef, BoxedError> {
evenyag marked this conversation as resolved.
Show resolved Hide resolved
unimplemented!()
}

Expand Down
32 changes: 22 additions & 10 deletions src/file-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use common_telemetry::{error, info};
use object_store::ObjectStore;
use snafu::{ensure, OptionExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionCreateRequest, RegionDropRequest, RegionOpenRequest,
RegionRequest,
Expand All @@ -49,6 +51,20 @@ impl FileRegionEngine {
inner: Arc::new(EngineInner::new(object_store)),
}
}

async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
}
}

#[async_trait]
Expand All @@ -68,18 +84,14 @@ impl RegionEngine for FileRegionEngine {
.map_err(BoxedError::new)
}

async fn handle_query(
async fn handle_partitioned_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.get_region(region_id)
.await
.context(RegionNotFoundSnafu { region_id })
.map_err(BoxedError::new)?
.query(request)
.map_err(BoxedError::new)
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
Expand Down
28 changes: 20 additions & 8 deletions src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse, SinglePartitionScanner,
};
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

Expand Down Expand Up @@ -155,16 +157,14 @@ impl RegionEngine for MetricEngine {
})
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
async fn handle_partitioned_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}

/// Retrieves region's metadata.
Expand Down Expand Up @@ -251,6 +251,18 @@ impl MetricEngine {
.logical_regions(physical_region_id)
.await
}

/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
self.inner
.read_region(region_id, request)
.await
.map_err(BoxedError::new)
}
}

struct MetricEngineInner {
Expand Down
35 changes: 28 additions & 7 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use object_store::manager::ObjectStoreManagerRef;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -115,11 +115,35 @@ impl MitoEngine {
Ok(region.region_usage().await)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
pub async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}

/// Returns a scanner to scan for `request`.
fn scanner(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
self.scan_region(region_id, request)?.scanner()
}

/// Returns a region scanner to scan the region for `request`.
async fn region_scanner(
&self,
region_id: RegionId,
request: ScanRequest,
) -> Result<RegionScannerRef> {
let scanner = self.scanner(region_id, request)?;
scanner.region_scanner().await
}

/// Scans a region.
fn scan_region(&self, region_id: RegionId, request: ScanRequest) -> Result<ScanRegion> {
self.inner.handle_query(region_id, request)
Expand Down Expand Up @@ -312,16 +336,13 @@ impl RegionEngine for MitoEngine {
.map_err(BoxedError::new)
}

/// Handle substrait query and return a stream of record batches
#[tracing::instrument(skip_all)]
async fn handle_query(
async fn handle_partitioned_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
) -> Result<RegionScannerRef, BoxedError> {
self.region_scanner(region_id, request)
.await
.map_err(BoxedError::new)
}
Expand Down
9 changes: 9 additions & 0 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::time::Instant;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner};
use store_api::storage::ScanRequest;
use table::predicate::{Predicate, TimeRangePredicateBuilder};
use tokio::sync::{mpsc, Semaphore};
Expand Down Expand Up @@ -57,6 +58,14 @@ impl Scanner {
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}

/// Returns a [RegionScanner] to scan the region.
pub(crate) async fn region_scanner(&self) -> Result<RegionScannerRef> {
let stream = self.scan().await?;
let scanner = SinglePartitionScanner::new(stream);

Ok(Arc::new(scanner))
}
}

#[cfg(test)]
Expand Down
16 changes: 8 additions & 8 deletions src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ impl UnorderedScan {
}
}

#[cfg(test)]
impl UnorderedScan {
/// Returns the input.
pub(crate) fn input(&self) -> &ScanInput {
&self.input
}
}
evenyag marked this conversation as resolved.
Show resolved Hide resolved

/// Metrics for [UnorderedScan].
#[derive(Debug, Default)]
struct Metrics {
Expand All @@ -216,11 +224,3 @@ struct Metrics {
/// Number of rows returned.
num_rows: usize,
}

#[cfg(test)]
impl UnorderedScan {
/// Returns the input.
pub(crate) fn input(&self) -> &ScanInput {
&self.input
}
}
8 changes: 4 additions & 4 deletions src/query/src/dummy_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::storage::{RegionId, ScanRequest};
use table::table::scan::StreamScanAdapter;
use table::table::scan::ReadFromRegion;

use crate::error::{GetRegionMetadataSnafu, Result};

Expand Down Expand Up @@ -168,12 +168,12 @@ impl TableProvider for DummyTableProvider {
.collect();
request.limit = limit;

let stream = self
let scanner = self
.engine
.handle_query(self.region_id, request)
.handle_partitioned_query(self.region_id, request)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(Arc::new(StreamScanAdapter::new(stream)))
Ok(Arc::new(ReadFromRegion::new(scanner)))
evenyag marked this conversation as resolved.
Show resolved Hide resolved
}

fn supports_filters_pushdown(
Expand Down
7 changes: 3 additions & 4 deletions src/query/src/optimizer/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ use api::v1::SemanticType;
use async_trait::async_trait;
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::ColumnSchema;
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_engine::{RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse};
use store_api::region_request::RegionRequest;
use store_api::storage::{ConcreteDataType, RegionId, ScanRequest};

Expand Down Expand Up @@ -63,11 +62,11 @@ impl RegionEngine for MetaRegionEngine {
unimplemented!()
}

async fn handle_query(
async fn handle_partitioned_query(
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> Result<SendableRecordBatchStream, BoxedError> {
) -> Result<RegionScannerRef, BoxedError> {
unimplemented!()
}

Expand Down
1 change: 1 addition & 0 deletions src/store-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-wal.workspace = true
datafusion-physical-plan.workspace = true
datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true
Expand Down
Loading
Loading