Skip to content

Commit

Permalink
feat(cubesql): Tableau Standard Gregorian missing date groupings support through SQL push down and some other functions
Browse files Browse the repository at this point in the history
  • Loading branch information
paveltiunov committed Sep 26, 2023
1 parent a32b619 commit 1da0cbf
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 5 deletions.
8 changes: 8 additions & 0 deletions packages/cubejs-schema-compiler/src/adapter/BaseQuery.js
Original file line number Diff line number Diff line change
Expand Up @@ -2432,6 +2432,14 @@ class BaseQuery {

COALESCE: 'COALESCE({{ args_concat }})',
CONCAT: 'CONCAT({{ args_concat }})',
FLOOR: 'FLOOR({{ args_concat }})',
CEIL: 'CEIL({{ args_concat }})',
TRUNC: 'TRUNC({{ args_concat }})',
LEAST: 'LEAST({{ args_concat }})',
LOWER: 'LOWER({{ args_concat }})',
UPPER: 'UPPER({{ args_concat }})',
LEFT: 'LEFT({{ args_concat }})',
RIGHT: 'RIGHT({{ args_concat }})',
},
statements: {
select: 'SELECT {{ select_concat | map(attribute=\'aliased\') | join(\', \') }} \n' +
Expand Down
36 changes: 33 additions & 3 deletions rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ impl CubeScanWrapperNode {
);
Ok((resulting_sql, sql_query))
}
// Expr::ScalarUDF { .. } => {}

// Expr::TableUDF { .. } => {}
Expr::Literal(literal) => {
Ok(match literal {
Expand Down Expand Up @@ -1127,6 +1127,36 @@ impl CubeScanWrapperNode {
}
})
}
Expr::ScalarUDF { fun, args } => {
let mut sql_args = Vec::new();
for arg in args {
let (sql, query) = Self::generate_sql_for_expr(
plan.clone(),
sql_query,
sql_generator.clone(),
arg,
ungrouped_scan_node.clone(),
)
.await?;

Check warning on line 1140 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L1140

Added line #L1140 was not covered by tests
sql_query = query;
sql_args.push(sql);
}
Ok((
Self::escape_interpolation_quotes(
sql_generator
.get_sql_templates()
.scalar_function(fun.name.to_string(), sql_args, None)
.map_err(|e| {
DataFusionError::Internal(format!(

Check warning on line 1150 in rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/engine/df/wrapper.rs#L1149-L1150

Added lines #L1149 - L1150 were not covered by tests
"Can't generate SQL for scalar function: {}",
e
))
})?,
ungrouped_scan_node.is_some(),
),
sql_query,
))
}
Expr::ScalarFunction { fun, args } => {
if let BuiltinScalarFunction::DatePart = &fun {
if args.len() >= 2 {
Expand Down Expand Up @@ -1199,7 +1229,7 @@ impl CubeScanWrapperNode {
Self::escape_interpolation_quotes(
sql_generator
.get_sql_templates()
.scalar_function(fun, sql_args, date_part)
.scalar_function(fun.to_string(), sql_args, date_part)
.map_err(|e| {
DataFusionError::Internal(format!(
"Can't generate SQL for scalar function: {}",
Expand Down Expand Up @@ -1260,7 +1290,7 @@ impl CubeScanWrapperNode {
// Expr::QualifiedWildcard { .. } => {}
x => {
return Err(DataFusionError::Internal(format!(
"Can't generate SQL for expr: {:?}",
"SQL generation for expression is not supported: {:?}",
x
)))
}
Expand Down
87 changes: 87 additions & 0 deletions rust/cubesql/cubesql/src/compile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18690,6 +18690,93 @@ ORDER BY \"COUNT(count)\" DESC"
.contains("EXTRACT"));
}

/// Checks that Tableau's "Week Number" date grouping is pushed down into the wrapped SQL.
///
/// The grouping expression is `CAST(FLOOR((7 + EXTRACT(DOY ...) - 1 + EXTRACT(DOW ...)) / 7) AS INT)`,
/// so the generated SQL has to carry both the `EXTRACT` calls and the `FLOOR` call.
/// Asserting on `EXTRACT` alone would not catch a regression in `FLOOR` push down.
///
/// The test is a no-op when SQL push down is disabled.
#[tokio::test]
async fn test_wrapper_tableau_week_number() {
    if !Rewriter::sql_push_down_enabled() {
        return;
    }
    init_logger();

    let query_plan = convert_select_to_query_plan(
        "SELECT CAST(FLOOR((7 + EXTRACT(DOY FROM order_date) - 1 + EXTRACT(DOW FROM DATE_TRUNC('YEAR', order_date))) / 7) AS INT) AS \"wk:created_at:ok\", AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
            .to_string(),
        DatabaseProtocol::PostgreSQL,
    )
    .await;

    let physical_plan = query_plan.as_physical_plan().await.unwrap();
    println!(
        "Physical plan: {}",
        displayable(physical_plan.as_ref()).indent()
    );

    let logical_plan = query_plan.as_logical_plan();
    let wrapped_sql = logical_plan.find_cube_scan_wrapper().wrapped_sql.unwrap();
    assert!(
        wrapped_sql.sql.contains("EXTRACT"),
        "EXTRACT is missing from the pushed down SQL: {}",
        wrapped_sql.sql
    );
    // FLOOR is the function this test targets; it is rendered through the `functions/FLOOR` template.
    assert!(
        wrapped_sql.sql.contains("FLOOR("),
        "FLOOR is missing from the pushed down SQL: {}",
        wrapped_sql.sql
    );
}

/// Checks that Tableau's "MMMM YYYY" date grouping is pushed down into the wrapped SQL.
///
/// NOTE(review): despite "week" in the test name, the query groups by
/// `year * 100 + month` (alias `my:created_at:ok`); the name is kept as is.
///
/// The grouping expression wraps every `EXTRACT` in `TRUNC`, so the generated SQL has to
/// carry both. Asserting on `EXTRACT` alone would not catch a regression in `TRUNC` push down.
///
/// The test is a no-op when SQL push down is disabled.
#[tokio::test]
async fn test_wrapper_tableau_week_mmmm_yyyy() {
    if !Rewriter::sql_push_down_enabled() {
        return;
    }
    init_logger();

    let query_plan = convert_select_to_query_plan(
        "SELECT ((CAST(TRUNC(EXTRACT(YEAR FROM order_date)) AS INT) * 100) + CAST(TRUNC(EXTRACT(MONTH FROM order_date)) AS INT)) AS \"my:created_at:ok\", AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
            .to_string(),
        DatabaseProtocol::PostgreSQL,
    )
    .await;

    let physical_plan = query_plan.as_physical_plan().await.unwrap();
    println!(
        "Physical plan: {}",
        displayable(physical_plan.as_ref()).indent()
    );

    let logical_plan = query_plan.as_logical_plan();
    let wrapped_sql = logical_plan.find_cube_scan_wrapper().wrapped_sql.unwrap();
    assert!(
        wrapped_sql.sql.contains("EXTRACT"),
        "EXTRACT is missing from the pushed down SQL: {}",
        wrapped_sql.sql
    );
    // TRUNC is the function this test targets. Occurrences that are the tail of `DATE_TRUNC(`
    // are skipped so that only a real `TRUNC(` call satisfies the assertion.
    let has_trunc_call = wrapped_sql
        .sql
        .match_indices("TRUNC(")
        .any(|(idx, _)| !wrapped_sql.sql[..idx].ends_with("DATE_"));
    assert!(
        has_trunc_call,
        "TRUNC is missing from the pushed down SQL: {}",
        wrapped_sql.sql
    );
}

/// Checks that Tableau's "ISO Quarter" date grouping is pushed down into the wrapped SQL.
///
/// The grouping expression is `LEAST(CAST((EXTRACT(WEEK ...) - 1) AS BIGINT) / 13, 3) + 1`,
/// so the generated SQL has to carry both the `EXTRACT` call and the `LEAST` call.
/// Asserting on `EXTRACT` alone would not catch a regression in `LEAST` push down.
///
/// The test is a no-op when SQL push down is disabled.
#[tokio::test]
async fn test_wrapper_tableau_iso_quarter() {
    if !Rewriter::sql_push_down_enabled() {
        return;
    }
    init_logger();

    let query_plan = convert_select_to_query_plan(
        "SELECT (LEAST(CAST((EXTRACT(WEEK FROM order_date) - 1) AS BIGINT) / 13, 3) + 1) AS \"iqr:created_at:ok\", AVG(avgPrice) mp FROM KibanaSampleDataEcommerce a GROUP BY 1 ORDER BY 1 DESC"
            .to_string(),
        DatabaseProtocol::PostgreSQL,
    )
    .await;

    let physical_plan = query_plan.as_physical_plan().await.unwrap();
    println!(
        "Physical plan: {}",
        displayable(physical_plan.as_ref()).indent()
    );

    let logical_plan = query_plan.as_logical_plan();
    let wrapped_sql = logical_plan.find_cube_scan_wrapper().wrapped_sql.unwrap();
    assert!(
        wrapped_sql.sql.contains("EXTRACT"),
        "EXTRACT is missing from the pushed down SQL: {}",
        wrapped_sql.sql
    );
    // LEAST is the function this test targets; it is rendered through the `functions/LEAST` template.
    assert!(
        wrapped_sql.sql.contains("LEAST("),
        "LEAST is missing from the pushed down SQL: {}",
        wrapped_sql.sql
    );
}

#[tokio::test]
async fn test_thoughtspot_pg_date_trunc_year() {
init_logger();
Expand Down
2 changes: 2 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod order;
mod projection;
mod scalar_function;
mod sort_expr;
mod udf_function;
mod wrapper_pull_up;

use crate::compile::{
Expand Down Expand Up @@ -44,6 +45,7 @@ impl RewriteRules for WrapperRules {
self.order_rules(&mut rules);
self.aggregate_function_rules(&mut rules);
self.scalar_function_rules(&mut rules);
self.udf_function_rules(&mut rules);
self.extract_rules(&mut rules);
self.alias_rules(&mut rules);
self.case_rules(&mut rules);
Expand Down
149 changes: 149 additions & 0 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/udf_function.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use crate::{
compile::rewrite::{
analysis::LogicalPlanAnalysis, rewrite, rules::wrapper::WrapperRules, transforming_rewrite,
udf_expr_var_arg, udf_fun_expr_args, udf_fun_expr_args_empty_tail, wrapper_pullup_replacer,
wrapper_pushdown_replacer, LogicalPlanLanguage, ScalarUDFExprFun,
WrapperPullupReplacerAliasToCube,
},
var, var_iter,
};
use egg::{EGraph, Rewrite, Subst};

impl WrapperRules {
pub fn udf_function_rules(
&self,
rules: &mut Vec<Rewrite<LogicalPlanLanguage, LogicalPlanAnalysis>>,
) {
rules.extend(vec![
rewrite(
"wrapper-push-down-udf",
wrapper_pushdown_replacer(
udf_expr_var_arg("?fun", "?args"),
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
udf_expr_var_arg(
"?fun",
wrapper_pushdown_replacer(
"?args",
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
),
),
transforming_rewrite(
"wrapper-pull-up-udf",
udf_expr_var_arg(
"?fun",
wrapper_pullup_replacer(
"?args",
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
),
wrapper_pullup_replacer(
udf_expr_var_arg("?fun", "?args"),
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
self.transform_udf_expr("?fun", "?alias_to_cube"),
),
rewrite(
"wrapper-push-down-udf-args",
wrapper_pushdown_replacer(
udf_fun_expr_args("?left", "?right"),
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
udf_fun_expr_args(
wrapper_pushdown_replacer(
"?left",
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
wrapper_pushdown_replacer(
"?right",
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
),
),
rewrite(
"wrapper-pull-up-udf-args",
udf_fun_expr_args(
wrapper_pullup_replacer(
"?left",
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
wrapper_pullup_replacer(
"?right",
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
),
wrapper_pullup_replacer(
udf_fun_expr_args("?left", "?right"),
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
),
rewrite(
"wrapper-push-down-udf-empty-tail",
wrapper_pushdown_replacer(
udf_fun_expr_args_empty_tail(),
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
wrapper_pullup_replacer(
udf_fun_expr_args_empty_tail(),
"?alias_to_cube",
"?ungrouped",
"?cube_members",
),
),
]);
}

fn transform_udf_expr(
&self,
fun_var: &'static str,
alias_to_cube_var: &'static str,
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
let fun_var = var!(fun_var);
let alias_to_cube_var = var!(alias_to_cube_var);
let meta = self.cube_context.meta.clone();
move |egraph, subst| {
for alias_to_cube in var_iter!(
egraph[subst[alias_to_cube_var]],
WrapperPullupReplacerAliasToCube
)
.cloned()
{
if let Some(sql_generator) = meta.sql_generator_by_alias_to_cube(&alias_to_cube) {
for fun in var_iter!(egraph[subst[fun_var]], ScalarUDFExprFun).cloned() {
if sql_generator
.get_sql_templates()
.templates
.contains_key(&format!("functions/{}", fun.to_uppercase()))

Check warning on line 139 in rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/udf_function.rs

View check run for this annotation

Codecov / codecov/patch

rust/cubesql/cubesql/src/compile/rewrite/rules/wrapper/udf_function.rs#L139

Added line #L139 was not covered by tests
{
return true;
}
}
}
}
false
}
}
}
3 changes: 3 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ pub fn get_test_tenant_ctx() -> Arc<MetaContext> {
),
("functions/DATETRUNC".to_string(), "DATE_TRUNC({{ args_concat }})".to_string()),
("functions/DATEPART".to_string(), "DATE_PART({{ args_concat }})".to_string()),
("functions/FLOOR".to_string(), "FLOOR({{ args_concat }})".to_string()),
("functions/TRUNC".to_string(), "TRUNC({{ args_concat }})".to_string()),
("functions/LEAST".to_string(), "LEAST({{ args_concat }})".to_string()),
("expressions/extract".to_string(), "EXTRACT({{ date_part }} FROM {{ expr }})".to_string()),
(
"statements/select".to_string(),
Expand Down
4 changes: 2 additions & 2 deletions rust/cubesql/cubesql/src/transport/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use cubeclient::{

use datafusion::{
arrow::{datatypes::SchemaRef, record_batch::RecordBatch},
physical_plan::{aggregates::AggregateFunction, functions::BuiltinScalarFunction},
physical_plan::aggregates::AggregateFunction,
};
use minijinja::{context, value::Value, Environment};
use serde_derive::*;
Expand Down Expand Up @@ -392,7 +392,7 @@ impl SqlTemplates {

pub fn scalar_function(
&self,
scalar_function: BuiltinScalarFunction,
scalar_function: String,
args: Vec<String>,
date_part: Option<String>,
) -> Result<String, CubeError> {
Expand Down

0 comments on commit 1da0cbf

Please sign in to comment.