Skip to content

Commit

Permalink
feat: support any precision in PromQL
Browse files Browse the repository at this point in the history
  • Loading branch information
Taylor-lagrange committed May 14, 2024
1 parent 60eb5de commit 721a7d4
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 11 deletions.
6 changes: 4 additions & 2 deletions src/promql/src/extension_plan/instant_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,14 @@ impl InstantManipulateStream {
// and the function `vectorSelectorSingle`
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
let mut take_indices = vec![];
// TODO(ruihang): maybe the input is not timestamp millisecond array

let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;

// field column for staleness check
let field_column = self
Expand Down
5 changes: 3 additions & 2 deletions src/promql/src/extension_plan/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,13 @@ pub struct SeriesNormalizeStream {

impl SeriesNormalizeStream {
pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
// TODO(ruihang): maybe the input is not timestamp millisecond array
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;

// bias the timestamp column by offset
let ts_column_biased = if self.offset == 0 {
Expand Down
13 changes: 9 additions & 4 deletions src/promql/src/extension_plan/range_manipulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ impl RangeManipulateStream {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let (aligned_ts, ranges) = self.calculate_range(&input);
let (aligned_ts, ranges) = self.calculate_range(&input)?;
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
Expand Down Expand Up @@ -472,12 +472,17 @@ impl RangeManipulateStream {
.map_err(|e| DataFusionError::ArrowError(e, None))
}

fn calculate_range(&self, input: &RecordBatch) -> (ArrayRef, Vec<(u32, u32)>) {
fn calculate_range(
&self,
input: &RecordBatch,
) -> DataFusionResult<(ArrayRef, Vec<(u32, u32)>)> {
let ts_column = input
.column(self.time_index)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
.ok_or(DataFusionError::Execution(
"Time index Column downcast to TimestampMillisecondArray failed".into(),
))?;

let mut aligned_ts = vec![];
let mut ranges = vec![];
Expand Down Expand Up @@ -506,7 +511,7 @@ impl RangeManipulateStream {

let aligned_ts_array = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as _;

(aligned_ts_array, ranges)
Ok((aligned_ts_array, ranges))
}
}

Expand Down
63 changes: 60 additions & 3 deletions src/promql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use datafusion::prelude::{Column, Expr as DfExpr, JoinType};
use datafusion::scalar::ScalarValue;
use datafusion::sql::TableReference;
use datafusion_expr::utils::conjunction;
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, TimeUnit as ArrowTimeUnit};
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
use promql_parser::label::{MatchOp, Matcher, Matchers, METRIC_NAME};
use promql_parser::parser::{
Expand Down Expand Up @@ -910,9 +911,65 @@ impl PromPlanner {
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?;
// Safety: `scan_filters` is not empty.
let result = LogicalPlanBuilder::scan(table_ref, provider, None)
let table_schema = provider
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
.table_provider
.as_any()
.downcast_ref::<DfTableProviderAdapter>()
.context(UnknownTableSnafu)?
.table()
.schema();

let time_index_type = &table_schema
.timestamp_column()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.data_type;

let is_time_index_ms = *time_index_type == ConcreteDataType::time_millisecond_datatype()
|| *time_index_type == ConcreteDataType::timestamp_millisecond_datatype();

let mut scan_plan = LogicalPlanBuilder::scan(table_ref.clone(), provider, None)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;

if !is_time_index_ms {
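// The PromQL extension plans downcast the time index to `TimestampMillisecondArray`,
// so cast it to millisecond precision when the table stores it in another unit.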
let expr: Vec<_> = self
.ctx
.field_columns
.iter()
.map(|col| DfExpr::Column(Column::new(Some(table_ref.clone()), col.clone())))
.chain(self.create_tag_column_exprs()?)
.chain(Some(DfExpr::Alias(Alias {
expr: Box::new(DfExpr::Cast(Cast {
expr: Box::new(self.create_time_index_column_expr()?),
data_type: ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
})),
relation: Some(table_ref.clone()),
name: self
.ctx
.time_index_column
.as_ref()
.with_context(|| TimeIndexNotFoundSnafu {
table: table_ref.to_quoted_string(),
})?
.clone(),
})))
.collect::<Vec<_>>();
scan_plan = LogicalPlanBuilder::from(scan_plan)
.project(expr)
.context(DataFusionPlanningSnafu)?
.build()
.context(DataFusionPlanningSnafu)?;
}

// Safety: `scan_filters` is not empty.
let result = LogicalPlanBuilder::from(scan_plan)
.filter(conjunction(filter).unwrap())
.context(DataFusionPlanningSnafu)?
.build()
Expand Down
121 changes: 121 additions & 0 deletions tests/cases/standalone/common/promql/precisions.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
CREATE TABLE host_sec (
ts timestamp(0) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

Affected Rows: 0

INSERT INTO TABLE host_sec VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5, 'host1', 3),
(5, 'host2', 4),
(10, 'host1', 5),
(10, 'host2', 6),
(15, 'host1', 7),
(15, 'host2', 8);

Affected Rows: 8

CREATE TABLE host_micro (
ts timestamp(6) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

Affected Rows: 0

INSERT INTO TABLE host_micro VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5000000, 'host1', 3),
(5000000, 'host2', 4),
(10000000, 'host1', 5),
(10000000, 'host2', 6),
(15000000, 'host1', 7),
(15000000, 'host2', 8);

Affected Rows: 8

-- Test on Timestamps of different precisions
-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"};

+-----+-------+---------------------+
| val | host | ts |
+-----+-------+---------------------+
| 1.0 | host1 | 1970-01-01T00:00:00 |
| 3.0 | host1 | 1970-01-01T00:00:05 |
| 5.0 | host1 | 1970-01-01T00:00:10 |
| 7.0 | host1 | 1970-01-01T00:00:15 |
+-----+-------+---------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]);

+---------------------+----------------------------------+-------+
| ts | prom_avg_over_time(ts_range,val) | host |
+---------------------+----------------------------------+-------+
| 1970-01-01T00:00:00 | 1.0 | host1 |
| 1970-01-01T00:00:05 | 2.0 | host1 |
| 1970-01-01T00:00:10 | 4.0 | host1 |
| 1970-01-01T00:00:15 | 6.0 | host1 |
+---------------------+----------------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_micro{host="host1"};

+-----+-------+---------------------+
| val | host | ts |
+-----+-------+---------------------+
| 1.0 | host1 | 1970-01-01T00:00:00 |
| 3.0 | host1 | 1970-01-01T00:00:05 |
| 5.0 | host1 | 1970-01-01T00:00:10 |
| 7.0 | host1 | 1970-01-01T00:00:15 |
+-----+-------+---------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]);

+---------------------+----------------------------------+-------+
| ts | prom_avg_over_time(ts_range,val) | host |
+---------------------+----------------------------------+-------+
| 1970-01-01T00:00:00 | 1.0 | host1 |
| 1970-01-01T00:00:05 | 2.0 | host1 |
| 1970-01-01T00:00:10 | 4.0 | host1 |
| 1970-01-01T00:00:15 | 6.0 | host1 |
+---------------------+----------------------------------+-------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"};

+-------+---------------------+-------------------------------+
| host | ts | host_sec.val + host_micro.val |
+-------+---------------------+-------------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 6.0 |
| host1 | 1970-01-01T00:00:10 | 10.0 |
| host1 | 1970-01-01T00:00:15 | 14.0 |
+-------+---------------------+-------------------------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]);

+-------+---------------------+-----------------------------------------------------------------------------------------+
| host | ts | host_sec.prom_avg_over_time(ts_range,val) + host_micro.prom_avg_over_time(ts_range,val) |
+-------+---------------------+-----------------------------------------------------------------------------------------+
| host1 | 1970-01-01T00:00:00 | 2.0 |
| host1 | 1970-01-01T00:00:05 | 4.0 |
| host1 | 1970-01-01T00:00:10 | 8.0 |
| host1 | 1970-01-01T00:00:15 | 12.0 |
+-------+---------------------+-----------------------------------------------------------------------------------------+

DROP TABLE host_sec;

Affected Rows: 0

DROP TABLE host_micro;

Affected Rows: 0

55 changes: 55 additions & 0 deletions tests/cases/standalone/common/promql/precisions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
CREATE TABLE host_sec (
ts timestamp(0) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

INSERT INTO TABLE host_sec VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5, 'host1', 3),
(5, 'host2', 4),
(10, 'host1', 5),
(10, 'host2', 6),
(15, 'host1', 7),
(15, 'host2', 8);

CREATE TABLE host_micro (
ts timestamp(6) time index,
host STRING PRIMARY KEY,
val DOUBLE,
);

INSERT INTO TABLE host_micro VALUES
(0, 'host1', 1),
(0, 'host2', 2),
(5000000, 'host1', 3),
(5000000, 'host2', 4),
(10000000, 'host1', 5),
(10000000, 'host2', 6),
(15000000, 'host1', 7),
(15000000, 'host2', 8);

-- Test on Timestamps of different precisions

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]);

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_micro{host="host1"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_micro{host="host1"}[5s]);

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') host_sec{host="host1"} + host_micro{host="host1"};

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') avg_over_time(host_sec{host="host1"}[5s]) + avg_over_time(host_micro{host="host1"}[5s]);

DROP TABLE host_sec;

DROP TABLE host_micro;

0 comments on commit 721a7d4

Please sign in to comment.