From 09376521c98ddc68fb1f72d273a2ee5bb8bccf89 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 1 Nov 2024 15:29:58 +0800 Subject: [PATCH 1/5] feat: simple limit impl in PartSort Signed-off-by: Ruihang Xia --- src/query/src/optimizer/windowed_sort.rs | 1 + src/query/src/part_sort.rs | 18 +++++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index 63150fc1f896..edf819ce2ba1 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -98,6 +98,7 @@ impl WindowedSortPhysicalRule { } else { Arc::new(PartSortExec::new( first_sort_expr.clone(), + sort_exec.fetch(), scanner_info.partition_ranges.clone(), sort_exec.input().clone(), )) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 2b258187b5b6..57280a8dac56 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -47,6 +47,7 @@ use crate::downcast_ts_array; pub struct PartSortExec { /// Physical sort expressions(that is, sort by timestamp) expression: PhysicalSortExpr, + limit: Option, input: Arc, /// Execution metrics metrics: ExecutionPlanMetricsSet, @@ -57,6 +58,7 @@ pub struct PartSortExec { impl PartSortExec { pub fn new( expression: PhysicalSortExpr, + limit: Option, partition_ranges: Vec>, input: Arc, ) -> Self { @@ -69,6 +71,7 @@ impl PartSortExec { Self { expression, + limit, input, metrics, partition_ranges, @@ -95,6 +98,7 @@ impl PartSortExec { let df_stream = Box::pin(PartSortStream::new( context, self, + self.limit, input_stream, self.partition_ranges[partition].clone(), partition, @@ -138,6 +142,7 @@ impl ExecutionPlan for PartSortExec { }; Ok(Arc::new(Self::new( self.expression.clone(), + self.limit, self.partition_ranges.clone(), new_input.clone(), ))) @@ -170,6 +175,7 @@ struct PartSortStream { reservation: MemoryReservation, buffer: Vec, expression: PhysicalSortExpr, + limit: Option, produced: usize, input: DfSendableRecordBatchStream, input_complete: bool, @@ -185,6 +191,7 @@ impl PartSortStream { fn new( context: Arc, sort: &PartSortExec, + limit: Option, input: DfSendableRecordBatchStream, partition_ranges: Vec, partition: usize, @@ -194,6 +201,7 @@ impl PartSortStream { .register(&context.runtime_env().memory_pool), buffer: Vec::new(), expression: sort.expression.clone(), + limit, produced: 0, input, input_complete: false, @@ -294,13 +302,20 @@ impl PartSortStream { ) })?; - let indices = sort_to_indices(&sort_column, opt, None).map_err(|e| { + let mut indices = sort_to_indices(&sort_column, opt, None).map_err(|e| { DataFusionError::ArrowError( e, Some(format!("Fail to sort to indices at {}", location!())), ) })?; + // apply limit if specified + if let Some(limit) = self.limit + && limit < indices.len() + { + indices = indices.slice(0, limit); + } + self.check_in_range( &sort_column, ( @@ -674,6 +689,7 @@ mod test { expr: Arc::new(Column::new("ts", 0)), options: opt, }, + None, vec![ranges], Arc::new(mock_input), ); From e726c9394612dd8a60875ebb76caa73d7e0802bd Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 1 Nov 2024 15:30:48 +0800 Subject: [PATCH 2/5] fix: update time_index method to return a non-optional String Co-authored-by: Yingwen Signed-off-by: Ruihang Xia --- src/query/src/optimizer/windowed_sort.rs | 2 +- src/table/src/table/scan.rs | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/query/src/optimizer/windowed_sort.rs b/src/query/src/optimizer/windowed_sort.rs index edf819ce2ba1..6d944a44c00a 100644 --- a/src/query/src/optimizer/windowed_sort.rs +++ b/src/query/src/optimizer/windowed_sort.rs @@ -150,7 +150,7 @@ fn fetch_partition_range(input: Arc) -> DataFusionResult() { partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges()); - time_index = region_scan_exec.time_index(); + time_index = Some(region_scan_exec.time_index()); tag_columns = Some(region_scan_exec.tag_columns()); // set distinguish_partition_ranges to true, this is an incorrect workaround diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index cc94a054de84..0eac7c0c354f 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -146,13 +146,15 @@ impl RegionScanExec { let _ = scanner.prepare(partition_ranges, distinguish_partition_range); } - pub fn time_index(&self) -> Option { + pub fn time_index(&self) -> String { self.scanner .lock() .unwrap() - .schema() - .timestamp_column() - .map(|x| x.name.clone()) + .metadata() + .time_index_column() + .column_schema + .name + .clone() } pub fn tag_columns(&self) -> Vec { From e6fc57b951872dfef7fdb8f5b65dbc1b451d7b57 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 1 Nov 2024 16:01:22 +0800 Subject: [PATCH 3/5] use builtin limit Signed-off-by: Ruihang Xia --- src/query/src/part_sort.rs | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 57280a8dac56..3768ba473816 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -302,20 +302,13 @@ impl PartSortStream { ) })?; - let mut indices = sort_to_indices(&sort_column, opt, None).map_err(|e| { + let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| { DataFusionError::ArrowError( e, Some(format!("Fail to sort to indices at {}", location!())), ) })?; - // apply limit if specified - if let Some(limit) = self.limit - && limit < indices.len() - { - indices = indices.slice(0, limit); - } - self.check_in_range( &sort_column, ( From d9766d99caafe9090b8eb5546b8e4918e0a30766 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 1 Nov 2024 16:19:49 +0800 Subject: [PATCH 4/5] add more info to analyze display Signed-off-by: Ruihang Xia --- src/query/src/part_sort.rs | 11 ++++++++++- src/query/src/window_sort.rs | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 3768ba473816..2828db202d9d 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -110,7 +110,16 @@ impl PartSortExec { impl DisplayAs for PartSortExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "PartSortExec {}", self.expression) + write!( + f, + "PartSortExec: expr={} num_ranges={}", + self.expression, + self.partition_ranges.len(), + )?; + if let Some(limit) = self.limit { + write!(f, " limit={}", limit)?; + } + Ok(()) } } diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs index 38b64e29aaa0..435a255beb95 100644 --- a/src/query/src/window_sort.rs +++ b/src/query/src/window_sort.rs @@ -169,7 +169,16 @@ impl WindowedSortExec { impl DisplayAs for WindowedSortExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "WindowedSortExec") + write!( + f, + "WindowedSortExec: expr={} num_ranges={}", + self.expression, + self.ranges.len() + )?; + if let Some(fetch) = self.fetch { + write!(f, " fetch={}", fetch)?; + } + Ok(()) } } From 880e599a850bca3e328e8e8046498215635086fb Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Fri, 1 Nov 2024 17:04:08 +0800 Subject: [PATCH 5/5] update sqlness Signed-off-by: Ruihang Xia --- .../standalone/common/order/windowed_sort.result | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/cases/standalone/common/order/windowed_sort.result b/tests/cases/standalone/common/order/windowed_sort.result index 9ecec83d2053..13b3503fb943 100644 --- a/tests/cases/standalone/common/order/windowed_sort.result +++ b/tests/cases/standalone/common/order/windowed_sort.result @@ -69,7 +69,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED -|_|_|_WindowedSortExec REDACTED +|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -101,8 +101,8 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED -|_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@1 DESC REDACTED +|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -183,8 +183,8 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED -|_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@2 ASC NULLS LAST REDACTED +|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 limit=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_| @@ -216,8 +216,8 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5; |_|_|_| | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED |_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED -|_|_|_WindowedSortExec REDACTED -|_|_|_PartSortExec t@2 DESC REDACTED +|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=2 fetch=5 REDACTED +|_|_|_PartSortExec: expr=t@2 DESC num_ranges=2 limit=5 REDACTED |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED |_|_|_| |_|_| Total rows: 5_|