Skip to content

Commit

Permalink
fix: bugs related to merge scan (#2118)
Browse files Browse the repository at this point in the history
* fix: prevent optimize merge scan, mark distinct as unsupported

Signed-off-by: Ruihang Xia <[email protected]>

* fix some other problems

Signed-off-by: Ruihang Xia <[email protected]>

* fix unit tests

Signed-off-by: Ruihang Xia <[email protected]>

* remove dead code

Signed-off-by: Ruihang Xia <[email protected]>

* add some comments

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/query/src/optimizer/type_conversion.rs

Co-authored-by: Lei, HUANG <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Lei, HUANG <[email protected]>
  • Loading branch information
waynexia and v0y4g3r authored Aug 8, 2023
1 parent 57836e7 commit 4c69379
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 84 deletions.
19 changes: 6 additions & 13 deletions src/query/src/dist_plan/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,13 @@ impl AnalyzerRule for DistPlannerAnalyzer {
plan: LogicalPlan,
_config: &ConfigOptions,
) -> datafusion_common::Result<LogicalPlan> {
// (1) add merge scan
let plan = plan.transform(&Self::add_merge_scan)?;

// (2) transform up merge scan
// (1) transform up merge scan
let mut visitor = CommutativeVisitor::new();
let _ = plan.visit(&mut visitor)?;
let state = ExpandState::new();
let plan = plan.transform_down(&|plan| Self::expand(plan, &visitor, &state))?;

// (3) remove placeholder merge scan
// (2) remove placeholder merge scan
let plan = plan.transform(&Self::remove_placeholder_merge_scan)?;

Ok(plan)
Expand All @@ -59,6 +56,7 @@ impl AnalyzerRule for DistPlannerAnalyzer {

impl DistPlannerAnalyzer {
/// Add [MergeScanLogicalPlan] before the table scan
#[allow(dead_code)]
fn add_merge_scan(plan: LogicalPlan) -> datafusion_common::Result<Transformed<LogicalPlan>> {
Ok(match plan {
LogicalPlan::TableScan(table_scan) => {
Expand Down Expand Up @@ -326,8 +324,7 @@ mod test {
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = String::from(
"Aggregate: groupBy=[[]], aggr=[[AVG(t.number)]]\
\n MergeScan [is_placeholder=false]\
\n TableScan: t",
\n MergeScan [is_placeholder=false]",
);
assert_eq!(expected, format!("{:?}", result));
}
Expand All @@ -353,9 +350,7 @@ mod test {
let expected = String::from(
"Sort: t.number ASC NULLS LAST\
\n Distinct:\
\n MergeScan [is_placeholder=false]\
\n Distinct:\
\n TableScan: t",
\n MergeScan [is_placeholder=false]",
);
assert_eq!(expected, format!("{:?}", result));
}
Expand All @@ -378,9 +373,7 @@ mod test {
let result = DistPlannerAnalyzer {}.analyze(plan, &config).unwrap();
let expected = String::from(
"Limit: skip=0, fetch=1\
\n MergeScan [is_placeholder=false]\
\n Limit: skip=0, fetch=1\
\n TableScan: t",
\n MergeScan [is_placeholder=false]",
);
assert_eq!(expected, format!("{:?}", result));
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/src/dist_plan/commutativity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl Categorizer {
LogicalPlan::Extension(extension) => {
Self::check_extension_plan(extension.node.as_ref() as _)
}
LogicalPlan::Distinct(_) => Commutativity::PartialCommutative,
LogicalPlan::Distinct(_) => Commutativity::Unimplemented,
LogicalPlan::Unnest(_) => Commutativity::Commutative,
LogicalPlan::Statement(_) => Commutativity::Unsupported,
LogicalPlan::Values(_) => Commutativity::Unsupported,
Expand Down
18 changes: 8 additions & 10 deletions src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use snafu::ResultExt;

use crate::error::{ConvertSchemaSnafu, RemoteRequestSnafu, UnexpectedOutputKindSnafu};

#[derive(Debug, Hash, PartialEq, Eq)]
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
/// In logical plan phase it only contains one input
input: LogicalPlan,
Expand All @@ -53,29 +53,27 @@ impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
Self::name()
}

// Prevent further optimization.
// The input can be retrieved by `self.input()`
fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
vec![]
}

fn schema(&self) -> &datafusion_common::DFSchemaRef {
self.input.schema()
}

// Prevent further optimization
fn expressions(&self) -> Vec<datafusion_expr::Expr> {
self.input.expressions()
vec![]
}

fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MergeScan [is_placeholder={}]", self.is_placeholder)
}

// todo: maybe contains exprs will be useful
// todo: add check for inputs' length
fn from_template(&self, _exprs: &[datafusion_expr::Expr], inputs: &[LogicalPlan]) -> Self {
Self {
input: inputs[0].clone(),
is_placeholder: self.is_placeholder,
}
fn from_template(&self, _exprs: &[datafusion_expr::Expr], _inputs: &[LogicalPlan]) -> Self {
self.clone()
}
}

Expand Down
22 changes: 17 additions & 5 deletions src/query/src/dist_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::{DataFusionError, TableReference};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_optimizer::analyzer::Analyzer;
use partition::manager::PartitionRuleManager;
use snafu::{OptionExt, ResultExt};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
Expand Down Expand Up @@ -82,8 +83,10 @@ impl ExtensionPlanner for DistExtensionPlanner {
} else {
// TODO(ruihang): generate different execution plans for different variant merge operation
let input_plan = merge_scan.input();
let optimized_input =
self.optimize_input_logical_plan(session_state, input_plan)?;
let input_physical_plan = planner
.create_physical_plan(input_plan, session_state)
.create_physical_plan(&optimized_input, session_state)
.await?;
let Some(table_name) = self.get_table_name(input_plan)? else {
// no relation found in input plan, going to execute them locally
Expand Down Expand Up @@ -116,10 +119,7 @@ impl ExtensionPlanner for DistExtensionPlanner {

Ok(Some(Arc::new(exec) as _))
}
Err(_) => planner
.create_physical_plan(&input_plan, session_state)
.await
.map(Some),
Err(_) => Ok(Some(input_physical_plan)),
}
}
} else {
Expand Down Expand Up @@ -165,6 +165,18 @@ impl DistExtensionPlanner {
})
.map_err(|e| DataFusionError::External(Box::new(e)))
}

/// Optimizes the logical plan that sits underneath a merge scan.
///
/// Works on a clone of `session_state`, so the caller's state is left untouched.
/// The clone's analyzer rules are replaced with the rules of `Analyzer::default()`
/// before `optimize` is invoked on `plan`.
///
/// NOTE(review): presumably the rule swap keeps analyzer rules registered on the
/// session from being applied to this sub-plan again — confirm against the
/// session setup.
// TODO(ruihang): find a more elegant way to optimize input logical plan
fn optimize_input_logical_plan(
    &self,
    session_state: &SessionState,
    plan: &LogicalPlan,
) -> Result<LogicalPlan> {
    let default_rules = Analyzer::default().rules;
    session_state
        .clone()
        .with_analyzer_rules(default_rules)
        .optimize(plan)
}
}

/// Visitor to extract table name from logical plan (TableScan node)
Expand Down
9 changes: 8 additions & 1 deletion src/query/src/optimizer/type_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,19 @@ impl TypeConverter {
_ => return Ok((left.clone(), right.clone())),
};

// only try to convert timestamp or boolean types
if !matches!(left_type, DataType::Timestamp(_, _))
&& !matches!(left_type, DataType::Boolean)
{
return Ok((left.clone(), right.clone()));
}

match (left, right) {
(Expr::Column(col), Expr::Literal(value)) => {
let casted_right = Self::cast_scalar_value(value, left_type)?;
if casted_right.is_null() {
return Err(DataFusionError::Plan(format!(
"column:{col:?} value:{value:?} is invalid",
"column:{col:?}. Casting value:{value:?} to {left_type:?} is invalid",
)));
}
if reverse {
Expand Down
14 changes: 10 additions & 4 deletions src/servers/src/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_grpc::writer::{LinesWriter, Precision};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_time::timestamp::TimeUnit;
use datafusion::prelude::{col, lit, regexp_match, Expr};
use datafusion_common::ScalarValue;
use datatypes::prelude::{ConcreteDataType, Value};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use query::dataframe::DataFrame;
Expand Down Expand Up @@ -72,8 +73,9 @@ pub fn query_to_plan(dataframe: DataFrame, q: &Query) -> Result<LogicalPlan> {

let mut conditions = Vec::with_capacity(label_matches.len() + 1);

conditions.push(col(TIMESTAMP_COLUMN_NAME).gt_eq(lit(start_timestamp_ms)));
conditions.push(col(TIMESTAMP_COLUMN_NAME).lt_eq(lit(end_timestamp_ms)));
conditions
.push(col(TIMESTAMP_COLUMN_NAME).gt_eq(lit_timestamp_millisecond(start_timestamp_ms)));
conditions.push(col(TIMESTAMP_COLUMN_NAME).lt_eq(lit_timestamp_millisecond(end_timestamp_ms)));

for m in label_matches {
let name = &m.name;
Expand Down Expand Up @@ -121,6 +123,10 @@ fn new_label(name: String, value: String) -> Label {
Label { name, value }
}

/// Builds a literal expression from `ts` typed as a millisecond timestamp
/// (`ScalarValue::TimestampMillisecond`) rather than a plain integer.
///
/// The scalar's second field is left as `None` (the timezone slot, per the
/// DataFusion `ScalarValue` definition — confirm for the pinned version).
fn lit_timestamp_millisecond(ts: i64) -> Expr {
    let scalar = ScalarValue::TimestampMillisecond(Some(ts), None);
    Expr::Literal(scalar)
}

// A timeseries id
#[derive(Debug)]
struct TimeSeriesId {
Expand Down Expand Up @@ -526,7 +532,7 @@ mod tests {
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());

assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000)\n TableScan: ?table?", display_string);
assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None)\n TableScan: ?table?", display_string);

let q = Query {
start_timestamp_ms: 1000,
Expand Down Expand Up @@ -555,7 +561,7 @@ mod tests {
let plan = query_to_plan(DataFrame::DataFusion(dataframe), &q).unwrap();
let display_string = format!("{}", plan.display_indent());

assert_eq!("Filter: ?table?.greptime_timestamp >= Int64(1000) AND ?table?.greptime_timestamp <= Int64(2000) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
assert_eq!("Filter: ?table?.greptime_timestamp >= TimestampMillisecond(1000, None) AND ?table?.greptime_timestamp <= TimestampMillisecond(2000, None) AND regexp_match(?table?.job, Utf8(\"*prom*\")) IS NOT NULL AND ?table?.instance != Utf8(\"localhost\")\n TableScan: ?table?", display_string);
}

#[test]
Expand Down
6 changes: 0 additions & 6 deletions tests/cases/distributed/explain/order_by.result
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ EXPLAIN SELECT DISTINCT i%2 FROM integers ORDER BY 1;
| logical_plan_| Sort: integers.i % Int64(2) ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_Aggregate: groupBy=[[integers.i % Int64(2)]], aggr=[[]]_|
|_|_Projection: integers.i % Int64(2)_|
|_|_TableScan: integers projection=[i]_|
| physical_plan | SortPreservingMergeExec: [integers.i % Int64(2)@0 ASC NULLS LAST]_|
|_|_SortExec: expr=[integers.i % Int64(2)@0 ASC NULLS LAST]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[integers.i % Int64(2)@0 as integers.i % Int64(2)], aggr=[] |
Expand Down Expand Up @@ -49,7 +46,6 @@ EXPLAIN SELECT a, b FROM test ORDER BY a, b;
+-+-+
| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: test projection=[a, b]_|
| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST] |
|_|_MergeScanExec: peers=[REDACTED
|_|_|
Expand All @@ -68,8 +64,6 @@ EXPLAIN SELECT DISTINCT a, b FROM test ORDER BY a, b;
| logical_plan_| Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST_|
|_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_|
|_|_MergeScan [is_placeholder=false]_|
|_|_Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]_|
|_|_TableScan: test projection=[a, b]_|
| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST]_|
|_|_SortExec: expr=[a@0 ASC NULLS LAST,b@1 ASC NULLS LAST]_|
|_|_AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]_|
Expand Down
3 changes: 0 additions & 3 deletions tests/cases/distributed/explain/single_partition.result
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ EXPLAIN SELECT COUNT(*) FROM single_partition;
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]_|
|_|_Projection:_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: single_partition projection=[i, j, k]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]_|
Expand All @@ -38,7 +37,6 @@ EXPLAIN SELECT SUM(i) FROM single_partition;
| logical_plan_| Aggregate: groupBy=[[]], aggr=[[SUM(single_partition.i)]]_|
|_|_Projection: single_partition.i_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: single_partition projection=[i, j, k]_|
| physical_plan | AggregateExec: mode=Final, gby=[], aggr=[SUM(single_partition.i)]_|
|_|_CoalescePartitionsExec_|
|_|_AggregateExec: mode=Partial, gby=[], aggr=[SUM(single_partition.i)]_|
Expand All @@ -60,7 +58,6 @@ EXPLAIN SELECT * FROM single_partition ORDER BY i DESC;
+-+-+
| logical_plan_| Sort: single_partition.i DESC NULLS FIRST_|
|_|_MergeScan [is_placeholder=false]_|
|_|_TableScan: single_partition projection=[i, j, k] |
| physical_plan | SortExec: expr=[i@0 DESC]_|
|_|_MergeScanExec: peers=[REDACTED
|_|_|
Expand Down
17 changes: 13 additions & 4 deletions tests/cases/distributed/optimizer/filter_push_down.result
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,18 @@ Error: 1003(Internal), This feature is not implemented: Unsupported expression:

SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE i2.i>1 ORDER BY 2;

++
++
+---+---+
| i | i |
+---+---+
| 1 | 2 |
| 2 | 2 |
| 3 | 2 |
| | 2 |
| 1 | 3 |
| 2 | 3 |
| 3 | 3 |
| | 3 |
+---+---+

SELECT i1.i,i2.i FROM integers i1 LEFT OUTER JOIN integers i2 ON 1=0 WHERE CASE WHEN i2.i IS NULL THEN False ELSE True END ORDER BY 2;

Expand Down Expand Up @@ -197,8 +207,7 @@ SELECT i FROM (SELECT * FROM integers i1 UNION SELECT * FROM integers i2) a WHER
-- SELECT * FROM (SELECT i1.i AS a, i2.i AS b, row_number() OVER (ORDER BY i1.i, i2.i) FROM integers i1, integers i2 WHERE i1.i IS NOT NULL AND i2.i IS NOT NULL) a1 WHERE a=b ORDER BY 1;
SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2) a1 WHERE cond ORDER BY 1;

++
++
Error: 1003(Internal), Invalid argument error: must either specify a row count or at least one column

SELECT * FROM (SELECT 0=1 AS cond FROM integers i1, integers i2 GROUP BY 1) a1 WHERE cond ORDER BY 1;

Expand Down
Loading

0 comments on commit 4c69379

Please sign in to comment.