From 721a7d4fb57f907c60d0cc9d05790ae58df2b5ce Mon Sep 17 00:00:00 2001 From: WUJingdi Date: Tue, 14 May 2024 14:35:12 +0800 Subject: [PATCH] feat: support any precision in PromQL --- .../src/extension_plan/instant_manipulate.rs | 6 +- src/promql/src/extension_plan/normalize.rs | 5 +- .../src/extension_plan/range_manipulate.rs | 13 +- src/promql/src/planner.rs | 63 ++++++++- .../common/promql/precisions.result | 121 ++++++++++++++++++ .../standalone/common/promql/precisions.sql | 55 ++++++++ 6 files changed, 252 insertions(+), 11 deletions(-) create mode 100644 tests/cases/standalone/common/promql/precisions.result create mode 100644 tests/cases/standalone/common/promql/precisions.sql diff --git a/src/promql/src/extension_plan/instant_manipulate.rs b/src/promql/src/extension_plan/instant_manipulate.rs index 03e2c373eed8..a25994caf283 100644 --- a/src/promql/src/extension_plan/instant_manipulate.rs +++ b/src/promql/src/extension_plan/instant_manipulate.rs @@ -342,12 +342,14 @@ impl InstantManipulateStream { // and the function `vectorSelectorSingle` pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult { let mut take_indices = vec![]; - // TODO(ruihang): maybe the input is not timestamp millisecond array + let ts_column = input .column(self.time_index) .as_any() .downcast_ref::() - .unwrap(); + .ok_or(DataFusionError::Execution( + "Time index Column downcast to TimestampMillisecondArray failed".into(), + ))?; // field column for staleness check let field_column = self diff --git a/src/promql/src/extension_plan/normalize.rs b/src/promql/src/extension_plan/normalize.rs index 957c55fade54..56a60c0ef46b 100644 --- a/src/promql/src/extension_plan/normalize.rs +++ b/src/promql/src/extension_plan/normalize.rs @@ -250,12 +250,13 @@ pub struct SeriesNormalizeStream { impl SeriesNormalizeStream { pub fn normalize(&self, input: RecordBatch) -> DataFusionResult { - // TODO(ruihang): maybe the input is not timestamp millisecond array let ts_column = input .column(self.time_index) .as_any() .downcast_ref::() - .unwrap(); + .ok_or(DataFusionError::Execution( + "Time index Column downcast to TimestampMillisecondArray failed".into(), + ))?; // bias the timestamp column by offset let ts_column_biased = if self.offset == 0 { diff --git a/src/promql/src/extension_plan/range_manipulate.rs b/src/promql/src/extension_plan/range_manipulate.rs index 49002dabfa74..d29465ae1239 100644 --- a/src/promql/src/extension_plan/range_manipulate.rs +++ b/src/promql/src/extension_plan/range_manipulate.rs @@ -433,7 +433,7 @@ impl RangeManipulateStream { pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult> { let mut other_columns = (0..input.columns().len()).collect::>(); // calculate the range - let (aligned_ts, ranges) = self.calculate_range(&input); + let (aligned_ts, ranges) = self.calculate_range(&input)?; // ignore this if all ranges are empty if ranges.iter().all(|(_, len)| *len == 0) { return Ok(None); @@ -472,12 +472,17 @@ impl RangeManipulateStream { .map_err(|e| DataFusionError::ArrowError(e, None)) } - fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) { + fn calculate_range( + &self, + input: &RecordBatch, + ) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> { let ts_column = input .column(self.time_index) .as_any() .downcast_ref::() - .unwrap(); + .ok_or(DataFusionError::Execution( + "Time index Column downcast to TimestampMillisecondArray failed".into(), + ))?; let mut aligned_ts = vec![]; let mut ranges = vec![]; @@ -506,7 +511,7 @@ impl RangeManipulateStream { let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _; - (aligned_ts_array, ranges) + Ok((aligned_ts_array, ranges)) } } diff --git a/src/promql/src/planner.rs b/src/promql/src/planner.rs index 0af53088388a..3f8a2706b4d8 100644 --- a/src/promql/src/planner.rs +++ b/src/promql/src/planner.rs @@ -35,7 +35,8 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType}; use datafusion::scalar::ScalarValue; use datafusion::sql::TableReference; use datafusion_expr::utils::conjunction; -use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit}; +use datatypes::data_type::ConcreteDataType; use itertools::Itertools; use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME}; use promql_parser::parser::{ @@ -910,9 +911,65 @@ impl PromPlanner { .resolve_table(table_ref.clone()) .await .context(CatalogSnafu)?; - // Safety: `scan_filters` is not empty. - let result = LogicalPlanBuilder::scan(table_ref, provider, None) + let table_schema = provider + .as_any() + .downcast_ref::() + .context(UnknownTableSnafu)? + .table_provider + .as_any() + .downcast_ref::() + .context(UnknownTableSnafu)? + .table() + .schema(); + + let time_index_type = &table_schema + .timestamp_column() + .with_context(|| TimeIndexNotFoundSnafu { + table: table_ref.to_quoted_string(), + })? + .data_type; + + let is_time_index_ms = *time_index_type == ConcreteDataType::time_millisecond_datatype() + || *time_index_type == ConcreteDataType::timestamp_millisecond_datatype(); + + let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None) .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + + if !is_time_index_ms { + // cast to ms if time_index not in Millisecond precision + let expr: Vec<_> = self + .ctx + .field_columns + .iter() + .map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone()))) + .chain(self.create_tag_column_exprs()?) + .chain(Some(DfExpr::Alias(Alias { + expr: Box::new(DfExpr::Cast(Cast { + expr: Box::new(self.create_time_index_column_expr()?), + data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None), + })), + relation: Some(table_ref.clone()), + name: self + .ctx + .time_index_column + .as_ref() + .with_context(|| TimeIndexNotFoundSnafu { + table: table_ref.to_quoted_string(), + })? + .clone(), + }))) + .collect::>(); + scan_plan = LogicalPlanBuilder::from(scan_plan) + .project(expr) + .context(DataFusionPlanningSnafu)? + .build() + .context(DataFusionPlanningSnafu)?; + } + + // Safety: `scan_filters` is not empty. + let result = LogicalPlanBuilder::from(scan_plan) .filter(conjunction(filter).unwrap()) .context(DataFusionPlanningSnafu)? .build() diff --git a/tests/cases/standalone/common/promql/precisions.result b/tests/cases/standalone/common/promql/precisions.result new file mode 100644 index 000000000000..a86cc1e188f7 --- /dev/null +++ b/tests/cases/standalone/common/promql/precisions.result @@ -0,0 +1,121 @@ +CREATE TABLE host_sec ( + ts timestamp(0) time index, + host STRING PRIMARY KEY, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE host_sec VALUES + (0, 'host1', 1), + (0, 'host2', 2), + (5, 'host1', 3), + (5, 'host2', 4), + (10, 'host1', 5), + (10, 'host2', 6), + (15, 'host1', 7), + (15, 'host2', 8); + +Affected Rows: 8 + +CREATE TABLE host_micro ( + ts timestamp(6) time index, + host STRING PRIMARY KEY, + val DOUBLE, +); + +Affected Rows: 0 + +INSERT INTO TABLE host_micro VALUES + (0, 'host1', 1), + (0, 'host2', 2), + (5000000, 'host1', 3), + (5000000, 'host2', 4), + (10000000, 'host1', 5), + (10000000, 'host2', 6), + (15000000, 'host1', 7), + (15000000, 'host2', 8); + +Affected Rows: 8 + +-- Test on Timestamps of different precisions +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host_sec{host="host1"}; + ++-----+-------+---------------------+ +| val | host | ts | ++-----+-------+---------------------+ +| 1.0 | host1 | 1970-01-01T00:00:00 | +| 3.0 | host1 | 1970-01-01T00:00:05 | +| 5.0 | host1 | 1970-01-01T00:00:10 | +| 7.0 | host1 | 1970-01-01T00:00:15 | ++-----+-------+---------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]); + ++---------------------+----------------------------------+-------+ +| ts | prom_avg_over_time(ts_range,val) | host | ++---------------------+----------------------------------+-------+ +| 1970-01-01T00:00:00 | 1.0 | host1 | +| 1970-01-01T00:00:05 | 2.0 | host1 | +| 1970-01-01T00:00:10 | 4.0 | host1 | +| 1970-01-01T00:00:15 | 6.0 | host1 | ++---------------------+----------------------------------+-------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host_micro{host="host1"}; + ++-----+-------+---------------------+ +| val | host | ts | ++-----+-------+---------------------+ +| 1.0 | host1 | 1970-01-01T00:00:00 | +| 3.0 | host1 | 1970-01-01T00:00:05 | +| 5.0 | host1 | 1970-01-01T00:00:10 | +| 7.0 | host1 | 1970-01-01T00:00:15 | ++-----+-------+---------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]); + ++---------------------+----------------------------------+-------+ +| ts | prom_avg_over_time(ts_range,val) | host | ++---------------------+----------------------------------+-------+ +| 1970-01-01T00:00:00 | 1.0 | host1 | +| 1970-01-01T00:00:05 | 2.0 | host1 | +| 1970-01-01T00:00:10 | 4.0 | host1 | +| 1970-01-01T00:00:15 | 6.0 | host1 | ++---------------------+----------------------------------+-------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"}; + ++-------+---------------------+-------------------------------+ +| host | ts | host_sec.val + host_micro.val | ++-------+---------------------+-------------------------------+ +| host1 | 1970-01-01T00:00:00 | 2.0 | +| host1 | 1970-01-01T00:00:05 | 6.0 | +| host1 | 1970-01-01T00:00:10 | 10.0 | +| host1 | 1970-01-01T00:00:15 | 14.0 | ++-------+---------------------+-------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]); + ++-------+---------------------+-----------------------------------------------------------------------------------------+ +| host | ts | host_sec.prom_avg_over_time(ts_range,val) + host_micro.prom_avg_over_time(ts_range,val) | ++-------+---------------------+-----------------------------------------------------------------------------------------+ +| host1 | 1970-01-01T00:00:00 | 2.0 | +| host1 | 1970-01-01T00:00:05 | 4.0 | +| host1 | 1970-01-01T00:00:10 | 8.0 | +| host1 | 1970-01-01T00:00:15 | 12.0 | ++-------+---------------------+-----------------------------------------------------------------------------------------+ + +DROP TABLE host_sec; + +Affected Rows: 0 + +DROP TABLE host_micro; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/promql/precisions.sql b/tests/cases/standalone/common/promql/precisions.sql new file mode 100644 index 000000000000..6966e5bca205 --- /dev/null +++ b/tests/cases/standalone/common/promql/precisions.sql @@ -0,0 +1,55 @@ +CREATE TABLE host_sec ( + ts timestamp(0) time index, + host STRING PRIMARY KEY, + val DOUBLE, +); + +INSERT INTO TABLE host_sec VALUES + (0, 'host1', 1), + (0, 'host2', 2), + (5, 'host1', 3), + (5, 'host2', 4), + (10, 'host1', 5), + (10, 'host2', 6), + (15, 'host1', 7), + (15, 'host2', 8); + +CREATE TABLE host_micro ( + ts timestamp(6) time index, + host STRING PRIMARY KEY, + val DOUBLE, +); + +INSERT INTO TABLE host_micro VALUES + (0, 'host1', 1), + (0, 'host2', 2), + (5000000, 'host1', 3), + (5000000, 'host2', 4), + (10000000, 'host1', 5), + (10000000, 'host2', 6), + (15000000, 'host1', 7), + (15000000, 'host2', 8); + +-- Test on Timestamps of different precisions + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host_sec{host="host1"}; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host_micro{host="host1"}; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]); + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"}; + +-- SQLNESS SORT_RESULT 3 1 +TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]); + +DROP TABLE host_sec; + +DROP TABLE host_micro;