
Commit

fix the view tests
goldmedal committed Jul 28, 2024
1 parent 2f5ba33 commit bdf474e
Showing 6 changed files with 98 additions and 35 deletions.
24 changes: 18 additions & 6 deletions datafusion/core/src/datasource/view.rs
@@ -19,17 +19,19 @@
use std::{any::Any, sync::Arc};

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan},
physical_plan::ExecutionPlan,
};
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::Analyzer;

use crate::datasource::{TableProvider, TableType};

@@ -50,6 +52,7 @@ impl ViewTable {
logical_plan: LogicalPlan,
definition: Option<String>,
) -> Result<Self> {
let logical_plan = Self::apply_required_rule(logical_plan)?;
let table_schema = logical_plan.schema().as_ref().to_owned().into();

let view = Self {
@@ -61,6 +64,15 @@ impl ViewTable {
Ok(view)
}

fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
let options = ConfigOptions::default();
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
logical_plan,
&options,
|_, _| {},
)
}

/// Get definition ref
pub fn definition(&self) -> Option<&String> {
self.definition.as_ref()
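For context on the new apply_required_rule step: a view whose definition projects a bare wildcard now stores an expanded schema rather than a field literally named "*". The following standalone sketch is not part of this commit; the table name t, the MemTable source, and the tokio runtime are assumptions for illustration only.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::view::ViewTable;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::logical_expr::wildcard;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // A made-up two-column table to act as the view's source.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    ctx.register_table("t", Arc::new(MemTable::try_new(schema, vec![vec![]])?))?;

    // Build a view definition plan that projects a bare wildcard.
    let plan = ctx
        .table("t")
        .await?
        .select(vec![wildcard()])?
        .into_unoptimized_plan();

    // try_new now runs ExpandWildcardRule over the plan, so the table schema
    // exposed by the view should list concrete columns, not a "*" placeholder.
    let view = ViewTable::try_new(plan, Some("SELECT * FROM t".to_string()))?;
    for field in view.schema().fields() {
        assert_ne!(field.name(), "*");
    }
    Ok(())
}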
1 change: 0 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
@@ -712,7 +712,6 @@ impl SessionContext {
}
(_, Err(_)) => {
let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);

self.register_table(name, table)?;
self.return_empty_dataframe()
}
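The SessionContext arm shown here is where CREATE VIEW definitions reach ViewTable::try_new, so SELECT * view definitions get expanded at creation time. A hedged end-to-end sketch with made-up table and view names, assuming the SQL is accepted as written:

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // A small source table created through CTAS; VALUES names its columns column1, column2.
    ctx.sql("CREATE TABLE t AS VALUES (1, 'x'), (2, 'y')").await?;

    // The view definition uses a wildcard; its plan is analyzed when the view is registered.
    ctx.sql("CREATE VIEW v AS SELECT * FROM t").await?;

    // Querying the view works against the expanded schema.
    let df = ctx.sql("SELECT * FROM v").await?;
    df.show().await?;
    Ok(())
}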
1 change: 1 addition & 0 deletions datafusion/expr/src/utils.rs
@@ -420,6 +420,7 @@ pub fn expand_qualified_wildcard(
schema: &DFSchema,
wildcard_options: Option<&WildcardAdditionalOptions>,
) -> Result<Vec<Expr>> {
dbg!(&schema);
let qualified_indices = schema.fields_indices_with_qualified(qualifier);
let projected_func_dependencies = schema
.functional_dependencies()
101 changes: 76 additions & 25 deletions datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs
@@ -18,9 +18,10 @@
use crate::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_common::{plan_err, Result};
use datafusion_common::{Column, DataFusionError, Result};
use datafusion_expr::utils::{expand_qualified_wildcard, expand_wildcard};
use datafusion_expr::{Expr, LogicalPlan, Projection};
use datafusion_expr::{Expr, LogicalPlan, Projection, SubqueryAlias};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

@@ -37,15 +38,15 @@ impl AnalyzerRule for ExpandWildcardRule {
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
// Because the wildcard expansion is based on the schema of the input plan,
// using `transform_up_with_subqueries` here.
plan.transform_up_with_subqueries(analyzer_internal).data()
plan.transform_up_with_subqueries(expand_internal).data()
}

fn name(&self) -> &str {
"expand_wildcard_rule"
}
}

fn analyzer_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Projection(Projection { expr, input, .. }) => {
let mut projected_expr = vec![];
@@ -66,46 +67,76 @@ fn analyzer_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
)?);
}
}
// A workaround to handle the case when the column name is "*".
// We transform the expression to an Expr::Column through [Column::from_name] in many places.
// It would also convert the wildcard expression to a column expression with name "*".
Expr::Column(Column {
ref relation,
ref name,
}) => {
if name.eq("*") {
if let Some(qualifier) = relation {
projected_expr.extend(expand_qualified_wildcard(
qualifier,
input.schema(),
None,
)?);
} else {
projected_expr.extend(expand_wildcard(
input.schema(),
&input,
None,
)?);
}
} else {
projected_expr.push(e.clone());
}
}
_ => projected_expr.push(e),
}
}
validate_unique_names("Projections", projected_expr.iter())?;
Ok(Transformed::yes(
Projection::try_new(projected_expr, Arc::clone(&input))
.map(LogicalPlan::Projection)?,
Projection::try_new(
to_unique_names(projected_expr.iter())?,
Arc::clone(&input),
)
.map(LogicalPlan::Projection)?,
))
}
// The schema of the plan should also be updated if the child plan is transformed.
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
Ok(Transformed::yes(
SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias)?,
))
}
_ => Ok(Transformed::no(plan)),
}
}

fn validate_unique_names<'a>(
node_name: &str,
fn to_unique_names<'a>(
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
) -> Result<Vec<Expr>> {
let mut unique_names = HashMap::new();

expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
let name = expr.display_name()?;
match unique_names.get(&name) {
None => {
unique_names.insert(name, (position, expr));
Ok(())
},
Some((existing_position, existing_expr)) => {
plan_err!("{node_name} require unique expression names \
but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
at position {position} have the same name. Consider aliasing (\"AS\") one of them."
)
let mut unique_expr = vec![];
expressions
.into_iter()
.enumerate()
.try_for_each(|(position, expr)| {
let name = expr.display_name()?;
if let Entry::Vacant(e) = unique_names.entry(name) {
e.insert((position, expr));
unique_expr.push(expr.to_owned());
}
}
})
Ok::<(), DataFusionError>(())
})?;
Ok(unique_expr)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
use crate::Analyzer;
use datafusion_common::TableReference;
use datafusion_expr::{
col, in_subquery, qualified_wildcard, wildcard, LogicalPlanBuilder,
@@ -181,4 +212,24 @@ mod tests {
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
assert_plan_eq(plan, expected)
}

#[test]
fn test_subquery_schema() -> Result<()> {
let analyzer = Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
let options = ConfigOptions::default();
let subquery = LogicalPlanBuilder::from(test_table_scan()?)
.project(vec![wildcard()])?
.build()?;
let plan = LogicalPlanBuilder::from(subquery)
.alias("sub")?
.project(vec![wildcard()])?
.build()?;
let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _| {})?;
for x in analyzed_plan.inputs() {
for field in x.schema().fields() {
assert_ne!(field.name(), "*");
}
}
Ok(())
}
}
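As background for the workaround in expand_internal above: a column built through Column::from_name with the display text of a wildcard is an ordinary Expr::Column whose name is the literal string "*", and it renders exactly like a real wildcard. A small standalone sketch under that assumption (not part of this diff):

use datafusion_common::Column;
use datafusion_expr::{wildcard, Expr};

fn main() {
    // Many call sites convert strings into column expressions via Column::from_name;
    // doing that with "*" yields a column literally named "*", not an Expr::Wildcard.
    let star_column = Expr::Column(Column::from_name("*"));
    assert!(matches!(star_column, Expr::Column(ref c) if c.name == "*"));

    // Both forms display as "*", which is why the rule must expand either one.
    assert_eq!(star_column.to_string(), wildcard().to_string());
}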
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/analyzer/mod.rs
@@ -91,9 +91,11 @@ impl Analyzer {
pub fn new() -> Self {
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
Arc::new(InlineTableScan::new()),
// Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule].
Arc::new(ExpandWildcardRule::new()),
// [Expr::Wildcard] should be expanded before [TypeCoercion]
Arc::new(TypeCoercion::new()),
Arc::new(CountWildcardRule::new()),
Arc::new(ExpandWildcardRule::new()),
];
Self::with_rules(rules)
}
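The ordering comments above are the contract to keep in mind when assembling a custom analyzer as well: any rule that may still emit Expr::Wildcard has to run before ExpandWildcardRule, and ExpandWildcardRule has to run before TypeCoercion. A minimal sketch of a hand-built Analyzer that respects that order (an assumed example, not code from this commit):

use std::sync::Arc;

use datafusion_optimizer::analyzer::count_wildcard_rule::CountWildcardRule;
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::analyzer::type_coercion::TypeCoercion;
use datafusion_optimizer::{Analyzer, AnalyzerRule};

fn main() {
    // Wildcards are expanded first, then types are coerced on the concrete columns,
    // mirroring the default rule order after this change.
    let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
        Arc::new(ExpandWildcardRule::new()),
        Arc::new(TypeCoercion::new()),
        Arc::new(CountWildcardRule::new()),
    ];
    let _analyzer = Analyzer::with_rules(rules);
}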
2 changes: 0 additions & 2 deletions datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -1348,7 +1348,6 @@ mod test {
.eq(cast(lit("1998-03-18"), DataType::Date32));
let empty = empty();
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?);
dbg!(&plan);
let expected =
"Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None)) = CAST(CAST(Utf8(\"1998-03-18\") AS Date32) AS Timestamp(Nanosecond, None))\n EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?;
@@ -1535,7 +1534,6 @@ mod test {
));
let empty = empty();
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?);
dbg!(&plan);
let expected =
"Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None)) - CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None))\n EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?;
