feat: enhance windowed-sort optimizer rule (#4910)
* add RegionScanner::metadata

Signed-off-by: Ruihang Xia <[email protected]>

* skip PartSort when there is no tag column

Signed-off-by: Ruihang Xia <[email protected]>

* add more sqlness tests

Signed-off-by: Ruihang Xia <[email protected]>

* handle desc

Signed-off-by: Ruihang Xia <[email protected]>

* fix: should keep part sort on DESC

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
waynexia authored Oct 31, 2024
1 parent ea6df9b commit 8b60c27
Showing 11 changed files with 320 additions and 26 deletions.
3 changes: 2 additions & 1 deletion src/file-engine/src/engine.rs
@@ -91,8 +91,9 @@ impl RegionEngine for FileRegionEngine {
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let metadata = self.get_metadata(region_id).await?;
// We don't support enabling append mode for file engine.
let scanner = Box::new(SinglePartitionScanner::new(stream, false));
let scanner = Box::new(SinglePartitionScanner::new(stream, false, metadata));
Ok(scanner)
}

5 changes: 5 additions & 0 deletions src/mito2/src/read/seq_scan.rs
@@ -27,6 +27,7 @@ use common_telemetry::tracing;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};
use store_api::storage::TimeSeriesRowSelector;
use tokio::sync::Semaphore;
@@ -321,6 +322,10 @@ impl RegionScanner for SeqScan {
let predicate = self.stream_ctx.input.predicate();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

fn metadata(&self) -> RegionMetadataRef {
self.stream_ctx.input.mapper.metadata().clone()
}
}

impl DisplayAs for SeqScan {
5 changes: 5 additions & 0 deletions src/mito2/src/read/unordered_scan.rs
@@ -26,6 +26,7 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties};

use crate::error::{PartitionOutOfRangeSnafu, Result};
@@ -229,6 +230,10 @@ impl RegionScanner for UnorderedScan {
let predicate = self.stream_ctx.input.predicate();
predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
}

fn metadata(&self) -> RegionMetadataRef {
self.stream_ctx.input.mapper.metadata().clone()
}
}

impl DisplayAs for UnorderedScan {
29 changes: 24 additions & 5 deletions src/query/src/optimizer/parallelize_scan.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
use common_telemetry::debug;
use datafusion::config::ConfigOptions;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, Result};
@@ -48,9 +49,16 @@ impl ParallelizeScan {
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut first_order_expr = None;

let result = plan
.transform_down(|plan| {
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
// save the first order expr
first_order_expr = sort_exec.expr().first().cloned();
} else if let Some(region_scan_exec) =
plan.as_any().downcast_ref::<RegionScanExec>()
{
if region_scan_exec.is_partition_set() {
return Ok(Transformed::no(plan));
}
@@ -66,10 +74,21 @@
"Assign {total_range_num} ranges to {expected_partition_num} partitions"
);

// sort the ranges in each partition
// TODO(ruihang): smart sort!
for ranges in partition_ranges.iter_mut() {
ranges.sort_by(|a, b| a.start.cmp(&b.start));
// Sort the ranges in each partition based on the order expr.
//
// This optimistically assumes that the first order expr is on the time index
// column and skips validating it, since the result remains correct even if
// the assumption does not hold.
if let Some(order_expr) = &first_order_expr
&& order_expr.options.descending
{
for ranges in partition_ranges.iter_mut() {
ranges.sort_by(|a, b| b.end.cmp(&a.end));
}
} else {
for ranges in partition_ranges.iter_mut() {
ranges.sort_by(|a, b| a.start.cmp(&b.start));
}
}

// update the partition ranges
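The range ordering above is the heart of the "handle desc" change: each partition's ranges are arranged so the scan emits data in roughly the requested time order. Below is a minimal standalone sketch of the two orderings, using a simplified stand-in for `PartitionRange` (the real type lives in `store_api::region_engine` and carries more fields):

/// Simplified stand-in for `store_api::region_engine::PartitionRange`;
/// only the fields the sort touches are modeled here.
#[derive(Debug, Clone, Copy)]
struct Range {
    start: i64, // start timestamp of the range
    end: i64,   // end timestamp of the range
}

/// Order ranges the way the rewritten rule does: ascending scans read
/// ranges by increasing `start`, descending scans by decreasing `end`.
fn sort_ranges(ranges: &mut [Range], descending: bool) {
    if descending {
        ranges.sort_by(|a, b| b.end.cmp(&a.end));
    } else {
        ranges.sort_by(|a, b| a.start.cmp(&b.start));
    }
}

fn main() {
    let mut ranges = vec![
        Range { start: 2000, end: 3000 },
        Range { start: 0, end: 1000 },
        Range { start: 1000, end: 2000 },
    ];

    sort_ranges(&mut ranges, false);
    println!("ASC:  {ranges:?}"); // starts: 0, 1000, 2000

    sort_ranges(&mut ranges, true);
    println!("DESC: {ranges:?}"); // ends: 3000, 2000, 1000
}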
29 changes: 21 additions & 8 deletions src/query/src/optimizer/windowed_sort.rs
@@ -77,7 +77,6 @@ impl WindowedSortPhysicalRule {
};

if let Some(first_sort_expr) = sort_exec.expr().first()
&& !first_sort_expr.options.descending
&& let Some(column_expr) = first_sort_expr
.expr
.as_any()
@@ -87,18 +86,28 @@
} else {
return Ok(Transformed::no(plan));
}

let first_sort_expr = sort_exec.expr().first().unwrap().clone();
let part_sort_exec = Arc::new(PartSortExec::new(
first_sort_expr.clone(),
scanner_info.partition_ranges.clone(),
sort_exec.input().clone(),
));

// PartSortExec is unnecessary if:
// - there is no tag column, and
// - the sort is ascending on the time index column
let new_input = if scanner_info.tag_columns.is_empty()
&& !first_sort_expr.options.descending
{
sort_exec.input().clone()
} else {
Arc::new(PartSortExec::new(
first_sort_expr.clone(),
scanner_info.partition_ranges.clone(),
sort_exec.input().clone(),
))
};

let windowed_sort_exec = WindowedSortExec::try_new(
first_sort_expr,
sort_exec.fetch(),
scanner_info.partition_ranges,
part_sort_exec,
new_input,
)?;

return Ok(Transformed {
@@ -119,11 +128,13 @@
struct ScannerInfo {
partition_ranges: Vec<Vec<PartitionRange>>,
time_index: String,
tag_columns: Vec<String>,
}

fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
let mut partition_ranges = None;
let mut time_index = None;
let mut tag_columns = None;

input.transform_up(|plan| {
// Inapplicable case, reset the state.
@@ -139,6 +150,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
time_index = region_scan_exec.time_index();
tag_columns = Some(region_scan_exec.tag_columns());

// set distinguish_partition_range to true; this is an incorrect workaround
region_scan_exec.with_distinguish_partition_range(true);
@@ -151,6 +163,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
ScannerInfo {
partition_ranges: partition_ranges?,
time_index: time_index?,
tag_columns: tag_columns?,
}
};

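The condition deciding whether PartSortExec can be dropped is compact enough to state as a predicate. Below is a hedged sketch of that logic; needs_part_sort is a hypothetical helper, not the repo's API, and it assumes (as the rule does) that the scanner emits rows in ascending time order within each partition range:

/// Hypothetical helper mirroring the check in WindowedSortPhysicalRule:
/// PartSortExec is skippable only when there is no tag column (rows inside
/// a range are then already ordered by the time index) and the requested
/// sort is ascending (the order the scanner is assumed to produce).
fn needs_part_sort(tag_columns: &[String], descending: bool) -> bool {
    !(tag_columns.is_empty() && !descending)
}

fn main() {
    assert!(!needs_part_sort(&[], false)); // no tags, ASC: skip PartSortExec
    assert!(needs_part_sort(&[], true)); // DESC still needs per-range sorting
    assert!(needs_part_sort(&["host".to_string()], false)); // tags interleave rows
}

The DESC branch is the subject of the "fix: should keep part sort on DESC" bullet above: even without tag columns, a descending query must re-sort each range before WindowedSortExec merges them.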
1 change: 1 addition & 0 deletions src/query/src/part_sort.rs
@@ -175,6 +175,7 @@ struct PartSortStream {
input_complete: bool,
schema: SchemaRef,
partition_ranges: Vec<PartitionRange>,
#[allow(dead_code)] // only used under #[cfg(debug_assertions)]
partition: usize,
cur_part_idx: usize,
metrics: BaselineMetrics,
1 change: 1 addition & 0 deletions src/query/src/window_sort.rs
@@ -270,6 +270,7 @@ pub struct WindowedSortStream {
/// Working ranges promise that once the input stream yields a value outside the current range, no future value will fall within that range
all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
/// The input partition ranges
#[allow(dead_code)] // only used under #[cfg(debug_assertions)]
ranges: Vec<PartitionRange>,
/// Execution metrics
metrics: BaselineMetrics,
17 changes: 15 additions & 2 deletions src/store-api/src/region_engine.rs
@@ -265,6 +265,9 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
/// Returns the schema of the record batches.
fn schema(&self) -> SchemaRef;

/// Returns the metadata of the region.
fn metadata(&self) -> RegionMetadataRef;

/// Prepares the scanner with the given partition ranges.
///
/// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
@@ -414,18 +417,24 @@ pub struct SinglePartitionScanner {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
properties: ScannerProperties,
metadata: RegionMetadataRef,
}

impl SinglePartitionScanner {
/// Creates a new [SinglePartitionScanner] with the given stream.
pub fn new(stream: SendableRecordBatchStream, append_mode: bool) -> Self {
/// Creates a new [SinglePartitionScanner] with the given stream and metadata.
pub fn new(
stream: SendableRecordBatchStream,
append_mode: bool,
metadata: RegionMetadataRef,
) -> Self {
let schema = stream.schema();
Self {
stream: Mutex::new(Some(stream)),
schema,
properties: ScannerProperties::default()
.with_parallelism(1)
.with_append_mode(append_mode),
metadata,
}
}
}
@@ -468,6 +477,10 @@ impl RegionScanner for SinglePartitionScanner {
fn has_predicate(&self) -> bool {
false
}

fn metadata(&self) -> RegionMetadataRef {
self.metadata.clone()
}
}

impl DisplayAs for SinglePartitionScanner {
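With metadata() on the trait, planner-side code can recover column roles from any scanner without knowing the concrete engine; the tag_columns helper added to RegionScanExec below does exactly this. A minimal sketch of such a consumer (the free function is illustrative, not part of this commit):

use store_api::region_engine::RegionScanner;

/// Illustrative consumer of the new trait method: collect the names of the
/// primary-key (tag) columns exposed by an arbitrary scanner.
fn scanner_tag_columns(scanner: &dyn RegionScanner) -> Vec<String> {
    scanner
        .metadata()
        .primary_key_columns()
        .map(|col| col.column_schema.name.clone())
        .collect()
}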
59 changes: 50 additions & 9 deletions src/table/src/table/scan.rs
@@ -154,6 +154,16 @@ impl RegionScanExec {
.timestamp_column()
.map(|x| x.name.clone())
}

pub fn tag_columns(&self) -> Vec<String> {
self.scanner
.lock()
.unwrap()
.metadata()
.primary_key_columns()
.map(|col| col.column_schema.name.clone())
.collect()
}
}

impl ExecutionPlan for RegionScanExec {
@@ -301,41 +311,72 @@ impl DfRecordBatchStream for StreamWithMetricWrapper {
mod test {
use std::sync::Arc;

use api::v1::SemanticType;
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::prelude::SessionContext;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Int32Vector;
use datatypes::vectors::{Int32Vector, TimestampMillisecondVector};
use futures::TryStreamExt;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::region_engine::SinglePartitionScanner;
use store_api::storage::RegionId;

use super::*;

#[tokio::test]
async fn test_simple_table_scan() {
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
false,
)]));
let schema = Arc::new(Schema::new(vec![
ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
]));

let batch1 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
vec![
Arc::new(Int32Vector::from_slice([1, 2])) as _,
Arc::new(TimestampMillisecondVector::from_slice([1000, 2000])) as _,
],
)
.unwrap();
let batch2 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
vec![
Arc::new(Int32Vector::from_slice([3, 4, 5])) as _,
Arc::new(TimestampMillisecondVector::from_slice([3000, 4000, 5000])) as _,
],
)
.unwrap();

let recordbatches =
RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
let stream = recordbatches.as_stream();

let scanner = Box::new(SinglePartitionScanner::new(stream, false));
let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int32_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![1]);
let region_metadata = Arc::new(builder.build().unwrap());

let scanner = Box::new(SinglePartitionScanner::new(stream, false, region_metadata));
let plan = RegionScanExec::new(scanner);
let actual: SchemaRef = Arc::new(
plan.properties
