Skip to content

Commit

Permalink
Moving min and max to new API and removing from protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
edmondop committed Jul 20, 2024
1 parent 5da7ab3 commit 804976a
Show file tree
Hide file tree
Showing 31 changed files with 1,704 additions and 358 deletions.
1 change: 1 addition & 0 deletions datafusion-examples/examples/dataframe_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;

use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::logical_expr::test::function_stub::max;
use datafusion::prelude::*;
use datafusion::test_util::arrow_test_data;
use datafusion_common::ScalarValue;
Expand Down
6 changes: 4 additions & 2 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,11 @@ use datafusion_common::{
};
use datafusion_expr::{case, is_null, lit};
use datafusion_expr::{
max, min, utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
};
use datafusion_functions_aggregate::expr_fn::{
avg, count, max, median, min, stddev, sum,
};
use datafusion_functions_aggregate::expr_fn::{avg, count, median, stddev, sum};

use async_trait::async_trait;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ fn is_min(agg_expr: &dyn AggregateExpr) -> bool {
}

if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
if agg_expr.fun().name() == "min" {
if agg_expr.fun().name().to_lowercase() == "min" {
return true;
}
}
Expand All @@ -299,7 +299,7 @@ fn is_max(agg_expr: &dyn AggregateExpr) -> bool {
}

if let Some(agg_expr) = agg_expr.as_any().downcast_ref::<AggregateFunctionExpr>() {
if agg_expr.fun().name() == "max" {
if agg_expr.fun().name().to_lowercase() == "max" {
return true;
}
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_expr::expr::{GroupingSet, Sort};
use datafusion_expr::var_provider::{VarProvider, VarType};
use datafusion_expr::{
cast, col, exists, expr, in_subquery, lit, max, out_ref_col, placeholder,
cast, col, exists, expr, in_subquery, lit, out_ref_col, placeholder,
scalar_subquery, when, wildcard, Expr, ExprSchemable, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunctionDefinition,
};
use datafusion_functions_aggregate::expr_fn::{array_agg, avg, count, sum};
use datafusion_functions_aggregate::expr_fn::{array_agg, avg, count, max, sum};

#[tokio::test]
async fn test_count_wildcard_on_sort() -> Result<()> {
Expand Down
9 changes: 5 additions & 4 deletions datafusion/core/tests/fuzz_cases/window_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::type_coercion::aggregates::coerce_types;
use datafusion_expr::type_coercion::functions::data_types_with_aggregate_udf;
use datafusion_expr::{
AggregateFunction, BuiltInWindowFunction, WindowFrame, WindowFrameBound,
WindowFrameUnits, WindowFunctionDefinition,
BuiltInWindowFunction, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition,
};
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::min_max::{max_udaf, min_udaf};
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::{cast, col, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
Expand Down Expand Up @@ -360,14 +361,14 @@ fn get_random_function(
window_fn_map.insert(
"min",
(
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Min),
WindowFunctionDefinition::AggregateUDF(min_udaf()),
vec![arg.clone()],
),
);
window_fn_map.insert(
"max",
(
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Max),
WindowFunctionDefinition::AggregateUDF(max_udaf()),
vec![arg.clone()],
),
);
Expand Down
27 changes: 0 additions & 27 deletions datafusion/expr/src/aggregate_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ use strum_macros::EnumIter;
// https://datafusion.apache.org/contributor-guide/index.html#how-to-add-a-new-aggregate-function
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash, EnumIter)]
pub enum AggregateFunction {
/// Minimum
Min,
/// Maximum
Max,
/// Aggregation into an array
ArrayAgg,
}
Expand All @@ -45,8 +41,6 @@ impl AggregateFunction {
pub fn name(&self) -> &str {
use AggregateFunction::*;
match self {
Min => "MIN",
Max => "MAX",
ArrayAgg => "ARRAY_AGG",
}
}
Expand All @@ -62,9 +56,6 @@ impl FromStr for AggregateFunction {
type Err = DataFusionError;
fn from_str(name: &str) -> Result<AggregateFunction> {
Ok(match name {
// general
"max" => AggregateFunction::Max,
"min" => AggregateFunction::Min,
"array_agg" => AggregateFunction::ArrayAgg,
_ => {
return plan_err!("There is no built-in function named {name}");
Expand Down Expand Up @@ -100,11 +91,6 @@ impl AggregateFunction {
})?;

match self {
AggregateFunction::Max | AggregateFunction::Min => {
// For min and max agg function, the returned type is same as input type.
// The coerced_data_types is same with input_types.
Ok(coerced_data_types[0].clone())
}
AggregateFunction::ArrayAgg => Ok(DataType::List(Arc::new(Field::new(
"item",
coerced_data_types[0].clone(),
Expand All @@ -117,7 +103,6 @@ impl AggregateFunction {
/// nullability
pub fn nullable(&self) -> Result<bool> {
match self {
AggregateFunction::Max | AggregateFunction::Min => Ok(true),
AggregateFunction::ArrayAgg => Ok(true),
}
}
Expand All @@ -129,18 +114,6 @@ impl AggregateFunction {
// note: the physical expression must accept the type returned by this function or the execution panics.
match self {
AggregateFunction::ArrayAgg => Signature::any(1, Volatility::Immutable),
AggregateFunction::Min | AggregateFunction::Max => {
let valid = STRINGS
.iter()
.chain(NUMERICS.iter())
.chain(TIMESTAMPS.iter())
.chain(DATES.iter())
.chain(TIMES.iter())
.chain(BINARYS.iter())
.cloned()
.collect::<Vec<_>>();
Signature::uniform(1, valid, Volatility::Immutable)
}
}
}
}
Expand Down
14 changes: 0 additions & 14 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2524,8 +2524,6 @@ mod test {
"first_value",
"last_value",
"nth_value",
"min",
"max",
];
for name in names {
let fun = find_df_window_func(name).unwrap();
Expand All @@ -2542,18 +2540,6 @@ mod test {

#[test]
fn test_find_df_window_function() {
assert_eq!(
find_df_window_func("max"),
Some(WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::Max
))
);
assert_eq!(
find_df_window_func("min"),
Some(WindowFunctionDefinition::AggregateFunction(
aggregate_function::AggregateFunction::Min
))
);
assert_eq!(
find_df_window_func("cume_dist"),
Some(WindowFunctionDefinition::BuiltInWindowFunction(
Expand Down
34 changes: 5 additions & 29 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@
//! Functions for creating logical expressions

use crate::expr::{
AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
Placeholder, TryCast, Unnest,
BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery, Placeholder, TryCast,
Unnest,
};
use crate::function::{
AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
StateFieldsArgs,
};
use crate::{
aggregate_function, conditional_expressions::CaseBuilder, logical_plan::Subquery,
AggregateUDF, Expr, LogicalPlan, Operator, ScalarFunctionImplementation, ScalarUDF,
Signature, Volatility,
conditional_expressions::CaseBuilder, logical_plan::Subquery, AggregateUDF, Expr,
LogicalPlan, Operator, ScalarFunctionImplementation, ScalarUDF, Signature,
Volatility,
};
use crate::{AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowUDF, WindowUDFImpl};
use arrow::compute::kernels::cast_utils::{
Expand Down Expand Up @@ -147,30 +147,6 @@ pub fn not(expr: Expr) -> Expr {
expr.not()
}

/// Create an expression to represent the min() aggregate function
pub fn min(expr: Expr) -> Expr {
Expr::AggregateFunction(AggregateFunction::new(
aggregate_function::AggregateFunction::Min,
vec![expr],
false,
None,
None,
None,
))
}

/// Create an expression to represent the max() aggregate function
pub fn max(expr: Expr) -> Expr {
Expr::AggregateFunction(AggregateFunction::new(
aggregate_function::AggregateFunction::Max,
vec![expr],
false,
None,
None,
None,
))
}

/// Return a new expression with bitwise AND
pub fn bitwise_and(left: Expr, right: Expr) -> Expr {
Expr::BinaryExpr(BinaryExpr::new(
Expand Down
6 changes: 4 additions & 2 deletions datafusion/expr/src/expr_rewriter/order_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,13 @@ mod test {
use arrow::datatypes::{DataType, Field, Schema};

use crate::{
cast, col, lit, logical_plan::builder::LogicalTableSource, min,
test::function_stub::avg, try_cast, LogicalPlanBuilder,
cast, col, lit, logical_plan::builder::LogicalTableSource, try_cast,
LogicalPlanBuilder,
};

use super::*;
use crate::test::function_stub::avg;
use crate::test::function_stub::min;

#[test]
fn rewrite_sort_cols_by_agg() {
Expand Down
Loading

0 comments on commit 804976a

Please sign in to comment.