diff --git a/datafusion/expr/src/test/function_stub.rs b/datafusion/expr/src/test/function_stub.rs index 19822c92d690..8e6642e4b149 100644 --- a/datafusion/expr/src/test/function_stub.rs +++ b/datafusion/expr/src/test/function_stub.rs @@ -373,6 +373,9 @@ impl AggregateUDFImpl for Min { fn reverse_expr(&self) -> ReversedUDAF { ReversedUDAF::Identical } + fn get_minmax_desc(&self) -> Option { + Some(false) + } } create_func!(Max, max_udaf); @@ -457,6 +460,9 @@ impl AggregateUDFImpl for Max { fn reverse_expr(&self) -> ReversedUDAF { ReversedUDAF::Identical } + fn get_minmax_desc(&self) -> Option { + Some(true) + } } /// Testing stub implementation of avg aggregate diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs index 3f4a99749cf6..b4b604228475 100644 --- a/datafusion/expr/src/udaf.rs +++ b/datafusion/expr/src/udaf.rs @@ -249,6 +249,10 @@ impl AggregateUDF { pub fn simplify(&self) -> Option { self.inner.simplify() } + + pub fn get_minmax_desc(&self) -> Option { + self.inner.get_minmax_desc() + } } impl From for AggregateUDF @@ -536,6 +540,16 @@ pub trait AggregateUDFImpl: Debug + Send + Sync { self.signature().hash(hasher); hasher.finish() } + + /// If this function is max, return true + /// if the function is min, return false + /// otherwise return None (the default) + /// + /// + /// Note: this is used to use special aggregate implementations in certain conditions + fn get_minmax_desc(&self) -> Option { + None + } } pub enum ReversedUDAF { diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index ac6bd1234f5f..19daff4c9766 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -16,6 +16,7 @@ // under the License. //! [`Max`] and [`MaxAccumulator`] accumulator for the `max` function +//! 
[`Min`] and [`MinAccumulator`] accumulator for the `min` function // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file @@ -32,8 +33,6 @@ // specific language governing permissions and limitations // under the License. -//! Defines `MAX` aggregate accumulators - use arrow::array::{ ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, @@ -65,20 +64,207 @@ use datafusion_expr::{ }; use datafusion_expr::{Expr, GroupsAccumulator}; -macro_rules! typed_min_max_float { - ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{ - ScalarValue::$SCALAR(match ($VALUE, $DELTA) { - (None, None) => None, - (Some(a), None) => Some(*a), - (None, Some(b)) => Some(*b), - (Some(a), Some(b)) => match a.total_cmp(b) { - choose_min_max!($OP) => Some(*b), - _ => Some(*a), - }, - }) +// Min/max aggregation can take Dictionary-encoded input but always produces unpacked +// (aka non Dictionary) output. We need to adjust the output data type to reflect this. +// The reason min/max aggregate produces unpacked output is that there is only one +// min/max value per group; there is no need to keep them Dictionary-encoded +fn min_max_aggregate_data_type(input_type: DataType) -> DataType { + if let DataType::Dictionary(_, value_type) = input_type { + *value_type + } else { + input_type + } +} +// MAX aggregate UDF +#[derive(Debug)] +pub struct Max { + signature: Signature, + aliases: Vec, +} + +impl Max { + pub fn new() -> Self { + Self { + signature: Signature::numeric(1, Volatility::Immutable), + aliases: vec!["max".to_owned()], + } + } +} + +impl Default for Max { + fn default() -> Self { + Self::new() + } +} +/// Creates a [`PrimitiveGroupsAccumulator`] for computing `MAX` for +/// the specified [`ArrowPrimitiveType`]. +/// +/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType +macro_rules! 
instantiate_max_accumulator { + ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{ + Ok(Box::new( + PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new($DATA_TYPE, |cur, new| { + if *cur < new { + *cur = new + } + }) + // Initialize each accumulator to $NATIVE::MIN + .with_starting_value($NATIVE::MIN), + )) + }}; +} + +/// Creates a [`PrimitiveGroupsAccumulator`] for computing `MIN` +/// the specified [`ArrowPrimitiveType`]. +/// +/// +/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType +macro_rules! instantiate_min_accumulator { + ($DATA_TYPE:ident, $NATIVE:ident, $PRIMTYPE:ident) => {{ + Ok(Box::new( + PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(&$DATA_TYPE, |cur, new| { + if *cur > new { + *cur = new + } + }) + // Initialize each accumulator to $NATIVE::MAX + .with_starting_value($NATIVE::MAX), + )) }}; } +impl AggregateUDFImpl for Max { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "MAX" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + Ok(min_max_aggregate_data_type(arg_types[0].clone())) + } + + fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { + Ok(Box::new(MaxAccumulator::try_new(acc_args.input_type)?)) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { + matches!( + args.data_type, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Decimal128(_, _) + | DataType::Decimal256(_, _) + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) + ) + } + + fn create_groups_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + use DataType::*; + use TimeUnit::*; + let data_type = args.data_type; + 
match data_type { + Int8 => instantiate_max_accumulator!(data_type, i8, Int8Type), + Int16 => instantiate_max_accumulator!(data_type, i16, Int16Type), + Int32 => instantiate_max_accumulator!(data_type, i32, Int32Type), + Int64 => instantiate_max_accumulator!(data_type, i64, Int64Type), + UInt8 => instantiate_max_accumulator!(data_type, u8, UInt8Type), + UInt16 => instantiate_max_accumulator!(data_type, u16, UInt16Type), + UInt32 => instantiate_max_accumulator!(data_type, u32, UInt32Type), + UInt64 => instantiate_max_accumulator!(data_type, u64, UInt64Type), + Float32 => { + instantiate_max_accumulator!(data_type, f32, Float32Type) + } + Float64 => { + instantiate_max_accumulator!(data_type, f64, Float64Type) + } + Date32 => instantiate_max_accumulator!(data_type, i32, Date32Type), + Date64 => instantiate_max_accumulator!(data_type, i64, Date64Type), + Time32(Second) => { + instantiate_max_accumulator!(data_type, i32, Time32SecondType) + } + Time32(Millisecond) => { + instantiate_max_accumulator!(data_type, i32, Time32MillisecondType) + } + Time64(Microsecond) => { + instantiate_max_accumulator!(data_type, i64, Time64MicrosecondType) + } + Time64(Nanosecond) => { + instantiate_max_accumulator!(data_type, i64, Time64NanosecondType) + } + Timestamp(Second, _) => { + instantiate_max_accumulator!(data_type, i64, TimestampSecondType) + } + Timestamp(Millisecond, _) => { + instantiate_max_accumulator!(data_type, i64, TimestampMillisecondType) + } + Timestamp(Microsecond, _) => { + instantiate_max_accumulator!(data_type, i64, TimestampMicrosecondType) + } + Timestamp(Nanosecond, _) => { + instantiate_max_accumulator!(data_type, i64, TimestampNanosecondType) + } + Decimal128(_, _) => { + instantiate_max_accumulator!(data_type, i128, Decimal128Type) + } + Decimal256(_, _) => { + instantiate_max_accumulator!(data_type, i256, Decimal256Type) + } + + // It would be nice to have a fast implementation for Strings as well + // https://github.com/apache/datafusion/issues/6906 + + 
// This is only reached if groups_accumulator_supported is out of sync + _ => internal_err!("GroupsAccumulator not supported for max({})", data_type), + } + } + + fn create_sliding_accumulator( + &self, + args: AccumulatorArgs, + ) -> Result> { + Ok(Box::new(SlidingMaxAccumulator::try_new(args.input_type)?)) + } + + fn get_minmax_desc(&self) -> Option { + Some(true) + } +} + +// Statically-typed version of min/max(array) -> ScalarValue for string types +macro_rules! typed_min_max_batch_string { + ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ + let array = downcast_value!($VALUES, $ARRAYTYPE); + let value = compute::$OP(array); + let value = value.and_then(|e| Some(e.to_string())); + ScalarValue::$SCALAR(value) + }}; +} // Statically-typed version of min/max(array) -> ScalarValue for binay types. macro_rules! typed_min_max_batch_binary { ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ @@ -97,79 +283,284 @@ macro_rules! typed_min_max_batch { ScalarValue::$SCALAR(value, $($EXTRA_ARGS.clone()),*) }}; } -// min/max of two scalar string values. -macro_rules! typed_min_max_string { - ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{ - ScalarValue::$SCALAR(match ($VALUE, $DELTA) { - (None, None) => None, - (Some(a), None) => Some(a.clone()), - (None, Some(b)) => Some(b.clone()), - (Some(a), Some(b)) => Some((a).$OP(b).clone()), - }) - }}; -} - -// Statically-typed version of min/max(array) -> ScalarValue for string types. -macro_rules! typed_min_max_batch_string { - ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident, $OP:ident) => {{ - let array = downcast_value!($VALUES, $ARRAYTYPE); - let value = compute::$OP(array); - let value = value.and_then(|e| Some(e.to_string())); - ScalarValue::$SCALAR(value) - }}; -} -macro_rules! 
min_max { - ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ - Ok(match ($VALUE, $DELTA) { - ( - lhs @ ScalarValue::Decimal128(lhsv, lhsp, lhss), - rhs @ ScalarValue::Decimal128(rhsv, rhsp, rhss) - ) => { - if lhsp.eq(rhsp) && lhss.eq(rhss) { - typed_min_max!(lhsv, rhsv, Decimal128, $OP, lhsp, lhss) - } else { - return internal_err!( - "MIN/MAX is not expected to receive scalars of incompatible types {:?}", - (lhs, rhs) - ); - } - } - ( - lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss), - rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss) - ) => { - if lhsp.eq(rhsp) && lhss.eq(rhss) { - typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss) - } else { - return internal_err!( - "MIN/MAX is not expected to receive scalars of incompatible types {:?}", - (lhs, rhs) - ); - } - } - (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => { - typed_min_max!(lhs, rhs, Boolean, $OP) - } - (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => { - typed_min_max_float!(lhs, rhs, Float64, $OP) - } - (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => { - typed_min_max_float!(lhs, rhs, Float32, $OP) +// Statically-typed version of min/max(array) -> ScalarValue for non-string types. +// this is a macro to support both operations (min and max). +macro_rules! 
min_max_batch { + ($VALUES:expr, $OP:ident) => {{ + match $VALUES.data_type() { + DataType::Decimal128(precision, scale) => { + typed_min_max_batch!( + $VALUES, + Decimal128Array, + Decimal128, + $OP, + precision, + scale + ) } - (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => { - typed_min_max!(lhs, rhs, UInt64, $OP) + DataType::Decimal256(precision, scale) => { + typed_min_max_batch!( + $VALUES, + Decimal256Array, + Decimal256, + $OP, + precision, + scale + ) } - (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => { - typed_min_max!(lhs, rhs, UInt32, $OP) + // all types that have a natural order + DataType::Float64 => { + typed_min_max_batch!($VALUES, Float64Array, Float64, $OP) } - (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => { - typed_min_max!(lhs, rhs, UInt16, $OP) + DataType::Float32 => { + typed_min_max_batch!($VALUES, Float32Array, Float32, $OP) } - (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => { - typed_min_max!(lhs, rhs, UInt8, $OP) + DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP), + DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP), + DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP), + DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP), + DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP), + DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP), + DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP), + DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP), + DataType::Timestamp(TimeUnit::Second, tz_opt) => { + typed_min_max_batch!( + $VALUES, + TimestampSecondArray, + TimestampSecond, + $OP, + tz_opt + ) } - (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => { + DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!( + $VALUES, + TimestampMillisecondArray, + TimestampMillisecond, + $OP, + tz_opt + ), + 
DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!( + $VALUES, + TimestampMicrosecondArray, + TimestampMicrosecond, + $OP, + tz_opt + ), + DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!( + $VALUES, + TimestampNanosecondArray, + TimestampNanosecond, + $OP, + tz_opt + ), + DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP), + DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP), + DataType::Time32(TimeUnit::Second) => { + typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP) + } + DataType::Time32(TimeUnit::Millisecond) => { + typed_min_max_batch!( + $VALUES, + Time32MillisecondArray, + Time32Millisecond, + $OP + ) + } + DataType::Time64(TimeUnit::Microsecond) => { + typed_min_max_batch!( + $VALUES, + Time64MicrosecondArray, + Time64Microsecond, + $OP + ) + } + DataType::Time64(TimeUnit::Nanosecond) => { + typed_min_max_batch!( + $VALUES, + Time64NanosecondArray, + Time64Nanosecond, + $OP + ) + } + other => { + // This should have been handled before + return internal_err!( + "Min/Max accumulator not implemented for type {:?}", + other + ); + } + } + }}; +} + +/// dynamically-typed min(array) -> ScalarValue +fn min_batch(values: &ArrayRef) -> Result { + Ok(match values.data_type() { + DataType::Utf8 => { + typed_min_max_batch_string!(values, StringArray, Utf8, min_string) + } + DataType::LargeUtf8 => { + typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string) + } + DataType::Boolean => { + typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean) + } + DataType::Binary => { + typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary) + } + DataType::LargeBinary => { + typed_min_max_batch_binary!( + &values, + LargeBinaryArray, + LargeBinary, + min_binary + ) + } + _ => min_max_batch!(values, min), + }) +} + +/// dynamically-typed max(array) -> ScalarValue +fn max_batch(values: &ArrayRef) -> Result { + Ok(match 
values.data_type() { + DataType::Utf8 => { + typed_min_max_batch_string!(values, StringArray, Utf8, max_string) + } + DataType::LargeUtf8 => { + typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string) + } + DataType::Boolean => { + typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean) + } + DataType::Binary => { + typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary) + } + DataType::LargeBinary => { + typed_min_max_batch_binary!( + &values, + LargeBinaryArray, + LargeBinary, + max_binary + ) + } + _ => min_max_batch!(values, max), + }) +} + +// min/max of two non-string scalar values. +macro_rules! typed_min_max { + ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{ + ScalarValue::$SCALAR( + match ($VALUE, $DELTA) { + (None, None) => None, + (Some(a), None) => Some(*a), + (None, Some(b)) => Some(*b), + (Some(a), Some(b)) => Some((*a).$OP(*b)), + }, + $($EXTRA_ARGS.clone()),* + ) + }}; +} +macro_rules! typed_min_max_float { + ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{ + ScalarValue::$SCALAR(match ($VALUE, $DELTA) { + (None, None) => None, + (Some(a), None) => Some(*a), + (None, Some(b)) => Some(*b), + (Some(a), Some(b)) => match a.total_cmp(b) { + choose_min_max!($OP) => Some(*b), + _ => Some(*a), + }, + }) + }}; +} + +// min/max of two scalar string values. +macro_rules! typed_min_max_string { + ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident) => {{ + ScalarValue::$SCALAR(match ($VALUE, $DELTA) { + (None, None) => None, + (Some(a), None) => Some(a.clone()), + (None, Some(b)) => Some(b.clone()), + (Some(a), Some(b)) => Some((a).$OP(b).clone()), + }) + }}; +} + +macro_rules! choose_min_max { + (min) => { + std::cmp::Ordering::Greater + }; + (max) => { + std::cmp::Ordering::Less + }; +} + +macro_rules! 
interval_min_max { + ($OP:tt, $LHS:expr, $RHS:expr) => {{ + match $LHS.partial_cmp(&$RHS) { + Some(choose_min_max!($OP)) => $RHS.clone(), + Some(_) => $LHS.clone(), + None => { + return internal_err!("Comparison error while computing interval min/max") + } + } + }}; +} + +// min/max of two scalar values of the same type +macro_rules! min_max { + ($VALUE:expr, $DELTA:expr, $OP:ident) => {{ + Ok(match ($VALUE, $DELTA) { + ( + lhs @ ScalarValue::Decimal128(lhsv, lhsp, lhss), + rhs @ ScalarValue::Decimal128(rhsv, rhsp, rhss) + ) => { + if lhsp.eq(rhsp) && lhss.eq(rhss) { + typed_min_max!(lhsv, rhsv, Decimal128, $OP, lhsp, lhss) + } else { + return internal_err!( + "MIN/MAX is not expected to receive scalars of incompatible types {:?}", + (lhs, rhs) + ); + } + } + ( + lhs @ ScalarValue::Decimal256(lhsv, lhsp, lhss), + rhs @ ScalarValue::Decimal256(rhsv, rhsp, rhss) + ) => { + if lhsp.eq(rhsp) && lhss.eq(rhss) { + typed_min_max!(lhsv, rhsv, Decimal256, $OP, lhsp, lhss) + } else { + return internal_err!( + "MIN/MAX is not expected to receive scalars of incompatible types {:?}", + (lhs, rhs) + ); + } + } + (ScalarValue::Boolean(lhs), ScalarValue::Boolean(rhs)) => { + typed_min_max!(lhs, rhs, Boolean, $OP) + } + (ScalarValue::Float64(lhs), ScalarValue::Float64(rhs)) => { + typed_min_max_float!(lhs, rhs, Float64, $OP) + } + (ScalarValue::Float32(lhs), ScalarValue::Float32(rhs)) => { + typed_min_max_float!(lhs, rhs, Float32, $OP) + } + (ScalarValue::UInt64(lhs), ScalarValue::UInt64(rhs)) => { + typed_min_max!(lhs, rhs, UInt64, $OP) + } + (ScalarValue::UInt32(lhs), ScalarValue::UInt32(rhs)) => { + typed_min_max!(lhs, rhs, UInt32, $OP) + } + (ScalarValue::UInt16(lhs), ScalarValue::UInt16(rhs)) => { + typed_min_max!(lhs, rhs, UInt16, $OP) + } + (ScalarValue::UInt8(lhs), ScalarValue::UInt8(rhs)) => { + typed_min_max!(lhs, rhs, UInt8, $OP) + } + (ScalarValue::Int64(lhs), ScalarValue::Int64(rhs)) => { typed_min_max!(lhs, rhs, Int64, $OP) } (ScalarValue::Int32(lhs), 
ScalarValue::Int32(rhs)) => { @@ -323,683 +714,83 @@ macro_rules! min_max { }}; } -macro_rules! choose_min_max { - (min) => { - std::cmp::Ordering::Greater - }; - (max) => { - std::cmp::Ordering::Less - }; +/// An accumulator to compute the maximum value +#[derive(Debug)] +pub struct MaxAccumulator { + max: ScalarValue, } -macro_rules! interval_min_max { - ($OP:tt, $LHS:expr, $RHS:expr) => {{ - match $LHS.partial_cmp(&$RHS) { - Some(choose_min_max!($OP)) => $RHS.clone(), - Some(_) => $LHS.clone(), - None => { - return internal_err!("Comparison error while computing interval min/max") - } - } - }}; +impl MaxAccumulator { + /// new max accumulator + pub fn try_new(datatype: &DataType) -> Result { + Ok(Self { + max: ScalarValue::try_from(datatype)?, + }) + } } -// Statically-typed version of min/max(array) -> ScalarValue for non-string types. -// this is a macro to support both operations (min and max). -macro_rules! min_max_batch { - ($VALUES:expr, $OP:ident) => {{ - match $VALUES.data_type() { - DataType::Decimal128(precision, scale) => { - typed_min_max_batch!( - $VALUES, - Decimal128Array, - Decimal128, - $OP, - precision, - scale - ) - } - DataType::Decimal256(precision, scale) => { - typed_min_max_batch!( - $VALUES, - Decimal256Array, - Decimal256, - $OP, - precision, - scale - ) - } - // all types that have a natural order - DataType::Float64 => { - typed_min_max_batch!($VALUES, Float64Array, Float64, $OP) - } - DataType::Float32 => { - typed_min_max_batch!($VALUES, Float32Array, Float32, $OP) - } - DataType::Int64 => typed_min_max_batch!($VALUES, Int64Array, Int64, $OP), - DataType::Int32 => typed_min_max_batch!($VALUES, Int32Array, Int32, $OP), - DataType::Int16 => typed_min_max_batch!($VALUES, Int16Array, Int16, $OP), - DataType::Int8 => typed_min_max_batch!($VALUES, Int8Array, Int8, $OP), - DataType::UInt64 => typed_min_max_batch!($VALUES, UInt64Array, UInt64, $OP), - DataType::UInt32 => typed_min_max_batch!($VALUES, UInt32Array, UInt32, $OP), - 
DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP), - DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP), - DataType::Timestamp(TimeUnit::Second, tz_opt) => { - typed_min_max_batch!( - $VALUES, - TimestampSecondArray, - TimestampSecond, - $OP, - tz_opt - ) - } - DataType::Timestamp(TimeUnit::Millisecond, tz_opt) => typed_min_max_batch!( - $VALUES, - TimestampMillisecondArray, - TimestampMillisecond, - $OP, - tz_opt - ), - DataType::Timestamp(TimeUnit::Microsecond, tz_opt) => typed_min_max_batch!( - $VALUES, - TimestampMicrosecondArray, - TimestampMicrosecond, - $OP, - tz_opt - ), - DataType::Timestamp(TimeUnit::Nanosecond, tz_opt) => typed_min_max_batch!( - $VALUES, - TimestampNanosecondArray, - TimestampNanosecond, - $OP, - tz_opt - ), - DataType::Date32 => typed_min_max_batch!($VALUES, Date32Array, Date32, $OP), - DataType::Date64 => typed_min_max_batch!($VALUES, Date64Array, Date64, $OP), - DataType::Time32(TimeUnit::Second) => { - typed_min_max_batch!($VALUES, Time32SecondArray, Time32Second, $OP) - } - DataType::Time32(TimeUnit::Millisecond) => { - typed_min_max_batch!( - $VALUES, - Time32MillisecondArray, - Time32Millisecond, - $OP - ) - } - DataType::Time64(TimeUnit::Microsecond) => { - typed_min_max_batch!( - $VALUES, - Time64MicrosecondArray, - Time64Microsecond, - $OP - ) - } - DataType::Time64(TimeUnit::Nanosecond) => { - typed_min_max_batch!( - $VALUES, - Time64NanosecondArray, - Time64Nanosecond, - $OP - ) - } - other => { - // This should have been handled before - return internal_err!( - "Min/Max accumulator not implemented for type {:?}", - other - ); - } - } - }}; -} - -/// dynamically-typed max(array) -> ScalarValue -fn max_batch(values: &ArrayRef) -> Result { - Ok(match values.data_type() { - DataType::Utf8 => { - typed_min_max_batch_string!(values, StringArray, Utf8, max_string) - } - DataType::LargeUtf8 => { - typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, max_string) - } - 
DataType::Boolean => { - typed_min_max_batch!(values, BooleanArray, Boolean, max_boolean) - } - DataType::Binary => { - typed_min_max_batch_binary!(&values, BinaryArray, Binary, max_binary) - } - DataType::LargeBinary => { - typed_min_max_batch_binary!( - &values, - LargeBinaryArray, - LargeBinary, - max_binary - ) - } - _ => min_max_batch!(values, max), - }) -} - -fn min_batch(values: &ArrayRef) -> Result { - Ok(match values.data_type() { - DataType::Utf8 => { - typed_min_max_batch_string!(values, StringArray, Utf8, min_string) - } - DataType::LargeUtf8 => { - typed_min_max_batch_string!(values, LargeStringArray, LargeUtf8, min_string) - } - DataType::Boolean => { - typed_min_max_batch!(values, BooleanArray, Boolean, min_boolean) - } - DataType::Binary => { - typed_min_max_batch_binary!(&values, BinaryArray, Binary, min_binary) - } - DataType::LargeBinary => { - typed_min_max_batch_binary!( - &values, - LargeBinaryArray, - LargeBinary, - min_binary - ) - } - _ => min_max_batch!(values, min), - }) -} -// min/max of two non-string scalar values. -macro_rules! typed_min_max { - ($VALUE:expr, $DELTA:expr, $SCALAR:ident, $OP:ident $(, $EXTRA_ARGS:ident)*) => {{ - ScalarValue::$SCALAR( - match ($VALUE, $DELTA) { - (None, None) => None, - (Some(a), None) => Some(*a), - (None, Some(b)) => Some(*b), - (Some(a), Some(b)) => Some((*a).$OP(*b)), - }, - $($EXTRA_ARGS.clone()),* - ) - }}; -} - -// The implementation is taken from https://github.com/spebern/moving_min_max/blob/master/src/lib.rs. - -// Keep track of the minimum or maximum value in a sliding window. -// -// `moving min max` provides one data structure for keeping track of the -// minimum value and one for keeping track of the maximum value in a sliding -// window. -// -// Each element is stored with the current min/max. One stack to push and another one for pop. If pop stack is empty, -// push to this stack all elements popped from first stack while updating their current min/max. 
Now pop from -// the second stack (MovingMin/Max struct works as a queue). To find the minimum element of the queue, -// look at the smallest/largest two elements of the individual stacks, then take the minimum of those two values. -// -// The complexity of the operations are -// - O(1) for getting the minimum/maximum -// - O(1) for push -// - amortized O(1) for pop - -/// ``` -/// # use datafusion_physical_expr::aggregate::moving_min_max::MovingMin; -/// let mut moving_min = MovingMin::::new(); -/// moving_min.push(2); -/// moving_min.push(1); -/// moving_min.push(3); -/// -/// assert_eq!(moving_min.min(), Some(&1)); -/// assert_eq!(moving_min.pop(), Some(2)); -/// -/// assert_eq!(moving_min.min(), Some(&1)); -/// assert_eq!(moving_min.pop(), Some(1)); -/// -/// assert_eq!(moving_min.min(), Some(&3)); -/// assert_eq!(moving_min.pop(), Some(3)); -/// -/// assert_eq!(moving_min.min(), None); -/// assert_eq!(moving_min.pop(), None); -/// ``` -#[derive(Debug)] -pub struct MovingMin { - push_stack: Vec<(T, T)>, - pop_stack: Vec<(T, T)>, -} - -impl Default for MovingMin { - fn default() -> Self { - Self { - push_stack: Vec::new(), - pop_stack: Vec::new(), - } - } -} - -impl MovingMin { - /// Creates a new `MovingMin` to keep track of the minimum in a sliding - /// window. - #[inline] - pub fn new() -> Self { - Self::default() - } - - /// Creates a new `MovingMin` to keep track of the minimum in a sliding - /// window with `capacity` allocated slots. - #[inline] - pub fn with_capacity(capacity: usize) -> Self { - Self { - push_stack: Vec::with_capacity(capacity), - pop_stack: Vec::with_capacity(capacity), - } - } - - /// Returns the minimum of the sliding window or `None` if the window is - /// empty. 
- #[inline] - pub fn min(&self) -> Option<&T> { - match (self.push_stack.last(), self.pop_stack.last()) { - (None, None) => None, - (Some((_, min)), None) => Some(min), - (None, Some((_, min))) => Some(min), - (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }), - } - } - - /// Pushes a new element into the sliding window. - #[inline] - pub fn push(&mut self, val: T) { - self.push_stack.push(match self.push_stack.last() { - Some((_, min)) => { - if val > *min { - (val, min.clone()) - } else { - (val.clone(), val) - } - } - None => (val.clone(), val), - }); - } - - /// Removes and returns the last value of the sliding window. - #[inline] - pub fn pop(&mut self) -> Option { - if self.pop_stack.is_empty() { - match self.push_stack.pop() { - Some((val, _)) => { - let mut last = (val.clone(), val); - self.pop_stack.push(last.clone()); - while let Some((val, _)) = self.push_stack.pop() { - let min = if last.1 < val { - last.1.clone() - } else { - val.clone() - }; - last = (val.clone(), min); - self.pop_stack.push(last.clone()); - } - } - None => return None, - } - } - self.pop_stack.pop().map(|(val, _)| val) - } - - /// Returns the number of elements stored in the sliding window. - #[inline] - pub fn len(&self) -> usize { - self.push_stack.len() + self.pop_stack.len() - } - - /// Returns `true` if the moving window contains no elements. 
- #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} -/// ``` -/// # use datafusion_physical_expr::aggregate::moving_min_max::MovingMax; -/// let mut moving_max = MovingMax::::new(); -/// moving_max.push(2); -/// moving_max.push(3); -/// moving_max.push(1); -/// -/// assert_eq!(moving_max.max(), Some(&3)); -/// assert_eq!(moving_max.pop(), Some(2)); -/// -/// assert_eq!(moving_max.max(), Some(&3)); -/// assert_eq!(moving_max.pop(), Some(3)); -/// -/// assert_eq!(moving_max.max(), Some(&1)); -/// assert_eq!(moving_max.pop(), Some(1)); -/// -/// assert_eq!(moving_max.max(), None); -/// assert_eq!(moving_max.pop(), None); -/// ``` -#[derive(Debug)] -pub struct MovingMax { - push_stack: Vec<(T, T)>, - pop_stack: Vec<(T, T)>, -} - -impl Default for MovingMax { - fn default() -> Self { - Self { - push_stack: Vec::new(), - pop_stack: Vec::new(), - } - } -} - -impl MovingMax { - /// Creates a new `MovingMax` to keep track of the maximum in a sliding window. - #[inline] - pub fn new() -> Self { - Self::default() - } - - /// Creates a new `MovingMax` to keep track of the maximum in a sliding window with - /// `capacity` allocated slots. - #[inline] - pub fn with_capacity(capacity: usize) -> Self { - Self { - push_stack: Vec::with_capacity(capacity), - pop_stack: Vec::with_capacity(capacity), - } - } - - /// Returns the maximum of the sliding window or `None` if the window is empty. - #[inline] - pub fn max(&self) -> Option<&T> { - match (self.push_stack.last(), self.pop_stack.last()) { - (None, None) => None, - (Some((_, max)), None) => Some(max), - (None, Some((_, max))) => Some(max), - (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }), - } - } - - /// Pushes a new element into the sliding window. 
- #[inline] - pub fn push(&mut self, val: T) { - self.push_stack.push(match self.push_stack.last() { - Some((_, max)) => { - if val < *max { - (val, max.clone()) - } else { - (val.clone(), val) - } - } - None => (val.clone(), val), - }); - } - - /// Removes and returns the last value of the sliding window. - #[inline] - pub fn pop(&mut self) -> Option { - if self.pop_stack.is_empty() { - match self.push_stack.pop() { - Some((val, _)) => { - let mut last = (val.clone(), val); - self.pop_stack.push(last.clone()); - while let Some((val, _)) = self.push_stack.pop() { - let max = if last.1 > val { - last.1.clone() - } else { - val.clone() - }; - last = (val.clone(), max); - self.pop_stack.push(last.clone()); - } - } - None => return None, - } - } - self.pop_stack.pop().map(|(val, _)| val) - } - - /// Returns the number of elements stored in the sliding window. - #[inline] - pub fn len(&self) -> usize { - self.push_stack.len() + self.pop_stack.len() - } - - /// Returns `true` if the moving window contains no elements. 
- #[inline] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -make_udaf_expr_and_func!( - Max, - max, - expression, - "Returns the maximum of a group of values.", - max_udaf -); - -make_udaf_expr_and_func!( - Min, - min, - expression, - "Returns the minimum of a group of values.", - min_udaf -); - -pub fn max_distinct(expr: Expr) -> Expr { - Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf( - max_udaf(), - vec![expr], - true, - None, - None, - None, - )) -} - -pub fn min_distinct(expr: Expr) -> Expr { - Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf( - min_udaf(), - vec![expr], - true, - None, - None, - None, - )) -} -fn min_max_aggregate_data_type(input_type: DataType) -> DataType { - if let DataType::Dictionary(_, value_type) = input_type { - *value_type - } else { - input_type - } -} - -#[derive(Debug)] -pub struct Max { - signature: Signature, - aliases: Vec, -} - -impl Max { - pub fn new() -> Self { - Self { - signature: Signature::numeric(1, Volatility::Immutable), - aliases: vec!["max".to_owned()], - } - } -} - -impl Default for Max { - fn default() -> Self { - Self::new() - } -} - -impl AggregateUDFImpl for Max { - fn as_any(&self) -> &dyn std::any::Any { - self - } - - fn name(&self) -> &str { - "MAX" - } - - fn signature(&self) -> &Signature { - &self.signature - } - - fn return_type(&self, arg_types: &[DataType]) -> Result { - Ok(min_max_aggregate_data_type(arg_types[0].clone())) - } - - fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - Ok(Box::new(MaxAccumulator::try_new(acc_args.data_type)?)) +impl Accumulator for MaxAccumulator { + fn state(&mut self) -> Result> { + Ok(vec![self.evaluate()?]) } - fn aliases(&self) -> &[String] { - &self.aliases + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let values = &values[0]; + let delta = &max_batch(values)?; + let new_max: Result = + min_max!(&self.max, delta, max); + self.max = new_max?; + Ok(()) } - fn 
groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { - matches!( - _args.data_type, - DataType::Int8 - | DataType::Int16 - | DataType::Int32 - | DataType::Int64 - | DataType::UInt8 - | DataType::UInt16 - | DataType::UInt32 - | DataType::UInt64 - | DataType::Float32 - | DataType::Float64 - | DataType::Date32 - | DataType::Date64 - | DataType::Time32(TimeUnit::Second) - | DataType::Time32(TimeUnit::Millisecond) - | DataType::Time64(TimeUnit::Microsecond) - | DataType::Time64(TimeUnit::Nanosecond) - | DataType::Timestamp(TimeUnit::Second, _) - | DataType::Timestamp(TimeUnit::Millisecond, _) - | DataType::Timestamp(TimeUnit::Microsecond, _) - | DataType::Timestamp(TimeUnit::Nanosecond, _) - | DataType::Decimal128(_, _) - | DataType::Decimal256(_, _) - ) + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) } - fn create_groups_accumulator( - &self, - args: AccumulatorArgs, - ) -> Result> { - use DataType::*; - use TimeUnit::*; - let data_type = args.data_type; - macro_rules! 
helper { - ($NATIVE:ident, $PRIMTYPE:ident) => {{ - Ok(Box::new( - PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new( - data_type, - |cur, new| { - if *cur < new { - *cur = new - } - }, - ) - .with_starting_value($NATIVE::MIN), - )) - }}; - } - - match args.data_type { - Int8 => helper!(i8, Int8Type), - Int16 => helper!(i16, Int16Type), - Int32 => helper!(i32, Int32Type), - Int64 => helper!(i64, Int64Type), - UInt8 => helper!(u8, UInt8Type), - UInt16 => helper!(u16, UInt16Type), - UInt32 => helper!(u32, UInt32Type), - UInt64 => helper!(u64, UInt64Type), - Float32 => { - helper!(f32, Float32Type) - } - Float64 => { - helper!(f64, Float64Type) - } - Date32 => helper!(i32, Date32Type), - Date64 => helper!(i64, Date64Type), - Time32(Second) => { - helper!(i32, Time32SecondType) - } - Time32(Millisecond) => { - helper!(i32, Time32MillisecondType) - } - Time64(Microsecond) => { - helper!(i64, Time64MicrosecondType) - } - Time64(Nanosecond) => { - helper!(i64, Time64NanosecondType) - } - Timestamp(Second, _) => { - helper!(i64, TimestampSecondType) - } - Timestamp(Millisecond, _) => { - helper!(i64, TimestampMillisecondType) - } - Timestamp(Microsecond, _) => { - helper!(i64, TimestampMicrosecondType) - } - Timestamp(Nanosecond, _) => { - helper!(i64, TimestampNanosecondType) - } - Decimal128(_, _) => { - helper!(i128, Decimal128Type) - } - Decimal256(_, _) => { - helper!(i256, Decimal256Type) - } - - // It would be nice to have a fast implementation for Strings as well - // https://github.com/apache/datafusion/issues/6906 - - // This is only reached if groups_accumulator_supported is out of sync - _ => internal_err!("GroupsAccumulator not supported for max({})", data_type), - } + fn evaluate(&mut self) -> Result { + Ok(self.max.clone()) } - fn create_sliding_accumulator( - &self, - args: AccumulatorArgs, - ) -> Result> { - Ok(Box::new(SlidingMaxAccumulator::try_new(args.data_type)?)) + fn size(&self) -> usize { + std::mem::size_of_val(self) - 
std::mem::size_of_val(&self.max) + self.max.size() } } -/// An accumulator to compute the maximum value #[derive(Debug)] -pub struct MaxAccumulator { +pub struct SlidingMaxAccumulator { max: ScalarValue, + moving_max: MovingMax, } -impl MaxAccumulator { +impl SlidingMaxAccumulator { /// new max accumulator pub fn try_new(datatype: &DataType) -> Result { Ok(Self { max: ScalarValue::try_from(datatype)?, + moving_max: MovingMax::::new(), }) } } -impl Accumulator for MaxAccumulator { - fn state(&mut self) -> Result> { - Ok(vec![self.evaluate()?]) +impl Accumulator for SlidingMaxAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + for idx in 0..values[0].len() { + let val = ScalarValue::try_from_array(&values[0], idx)?; + self.moving_max.push(val); + } + if let Some(res) = self.moving_max.max() { + self.max = res.clone(); + } + Ok(()) } - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let values = &values[0]; - let delta = &max_batch(values)?; - let new_max: Result = - min_max!(&self.max, delta, max); - self.max = new_max?; + fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + for _idx in 0..values[0].len() { + (self.moving_max).pop(); + } + if let Some(res) = self.moving_max.max() { + self.max = res.clone(); + } Ok(()) } @@ -1007,10 +798,18 @@ impl Accumulator for MaxAccumulator { self.update_batch(states) } + fn state(&mut self) -> Result> { + Ok(vec![self.max.clone()]) + } + fn evaluate(&mut self) -> Result { Ok(self.max.clone()) } + fn supports_retract_batch(&self) -> bool { + true + } + fn size(&self) -> usize { std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size() } @@ -1030,6 +829,7 @@ impl Min { } } } + impl Default for Min { fn default() -> Self { Self::new() @@ -1054,16 +854,16 @@ impl AggregateUDFImpl for Min { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result> { - Ok(Box::new(MinAccumulator::try_new(acc_args.data_type)?)) + 
Ok(Box::new(MinAccumulator::try_new(acc_args.input_type)?)) } fn aliases(&self) -> &[String] { &self.aliases } - fn groups_accumulator_supported(&self, _args: AccumulatorArgs) -> bool { + fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { matches!( - _args.data_type, + args.data_type, DataType::Int8 | DataType::Int16 | DataType::Int32 @@ -1076,14 +876,9 @@ impl AggregateUDFImpl for Min { | DataType::Float64 | DataType::Date32 | DataType::Date64 - | DataType::Time32(TimeUnit::Second) - | DataType::Time32(TimeUnit::Millisecond) - | DataType::Time64(TimeUnit::Microsecond) - | DataType::Time64(TimeUnit::Nanosecond) - | DataType::Timestamp(TimeUnit::Second, _) - | DataType::Timestamp(TimeUnit::Millisecond, _) - | DataType::Timestamp(TimeUnit::Microsecond, _) - | DataType::Timestamp(TimeUnit::Nanosecond, _) + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) ) @@ -1096,68 +891,52 @@ impl AggregateUDFImpl for Min { use DataType::*; use TimeUnit::*; let data_type = args.data_type; - macro_rules! 
helper { - ($NATIVE:ident, $PRIMTYPE:ident) => {{ - Ok(Box::new( - PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new( - data_type, - |cur, new| { - if *cur > new { - *cur = new - } - }, - ) - .with_starting_value($NATIVE::MAX), - )) - }}; - } - - match args.data_type { - Int8 => helper!(i8, Int8Type), - Int16 => helper!(i16, Int16Type), - Int32 => helper!(i32, Int32Type), - Int64 => helper!(i64, Int64Type), - UInt8 => helper!(u8, UInt8Type), - UInt16 => helper!(u16, UInt16Type), - UInt32 => helper!(u32, UInt32Type), - UInt64 => helper!(u64, UInt64Type), + match data_type { + Int8 => instantiate_min_accumulator!(data_type, i8, Int8Type), + Int16 => instantiate_min_accumulator!(data_type, i16, Int16Type), + Int32 => instantiate_min_accumulator!(data_type, i32, Int32Type), + Int64 => instantiate_min_accumulator!(data_type, i64, Int64Type), + UInt8 => instantiate_min_accumulator!(data_type, u8, UInt8Type), + UInt16 => instantiate_min_accumulator!(data_type, u16, UInt16Type), + UInt32 => instantiate_min_accumulator!(data_type, u32, UInt32Type), + UInt64 => instantiate_min_accumulator!(data_type, u64, UInt64Type), Float32 => { - helper!(f32, Float32Type) + instantiate_min_accumulator!(data_type, f32, Float32Type) } Float64 => { - helper!(f64, Float64Type) + instantiate_min_accumulator!(data_type, f64, Float64Type) } - Date32 => helper!(i32, Date32Type), - Date64 => helper!(i64, Date64Type), + Date32 => instantiate_min_accumulator!(data_type, i32, Date32Type), + Date64 => instantiate_min_accumulator!(data_type, i64, Date64Type), Time32(Second) => { - helper!(i32, Time32SecondType) + instantiate_min_accumulator!(data_type, i32, Time32SecondType) } Time32(Millisecond) => { - helper!(i32, Time32MillisecondType) + instantiate_min_accumulator!(data_type, i32, Time32MillisecondType) } Time64(Microsecond) => { - helper!(i64, Time64MicrosecondType) + instantiate_min_accumulator!(data_type, i64, Time64MicrosecondType) } Time64(Nanosecond) => { - helper!(i64, Time64NanosecondType) + 
instantiate_min_accumulator!(data_type, i64, Time64NanosecondType) } Timestamp(Second, _) => { - helper!(i64, TimestampSecondType) + instantiate_min_accumulator!(data_type, i64, TimestampSecondType) } Timestamp(Millisecond, _) => { - helper!(i64, TimestampMillisecondType) + instantiate_min_accumulator!(data_type, i64, TimestampMillisecondType) } Timestamp(Microsecond, _) => { - helper!(i64, TimestampMicrosecondType) + instantiate_min_accumulator!(data_type, i64, TimestampMicrosecondType) } Timestamp(Nanosecond, _) => { - helper!(i64, TimestampNanosecondType) + instantiate_min_accumulator!(data_type, i64, TimestampNanosecondType) } Decimal128(_, _) => { - helper!(i128, Decimal128Type) + instantiate_min_accumulator!(data_type, i128, Decimal128Type) } Decimal256(_, _) => { - helper!(i256, Decimal256Type) + instantiate_min_accumulator!(data_type, i256, Decimal256Type) } // It would be nice to have a fast implementation for Strings as well @@ -1172,7 +951,11 @@ impl AggregateUDFImpl for Min { &self, args: AccumulatorArgs, ) -> Result> { - Ok(Box::new(SlidingMinAccumulator::try_new(args.data_type)?)) + Ok(Box::new(SlidingMinAccumulator::try_new(args.input_type)?)) + } + + fn get_minmax_desc(&self) -> Option { + Some(false) } } /// An accumulator to compute the minimum value @@ -1237,108 +1020,335 @@ impl Accumulator for SlidingMinAccumulator { Ok(vec![self.min.clone()]) } - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - for idx in 0..values[0].len() { - let val = ScalarValue::try_from_array(&values[0], idx)?; - if !val.is_null() { - self.moving_min.push(val); - } - } - if let Some(res) = self.moving_min.min() { - self.min = res.clone(); + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + for idx in 0..values[0].len() { + let val = ScalarValue::try_from_array(&values[0], idx)?; + if !val.is_null() { + self.moving_min.push(val); + } + } + if let Some(res) = self.moving_min.min() { + self.min = res.clone(); + } + Ok(()) + } + + fn 
retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + for idx in 0..values[0].len() { + let val = ScalarValue::try_from_array(&values[0], idx)?; + if !val.is_null() { + (self.moving_min).pop(); + } + } + if let Some(res) = self.moving_min.min() { + self.min = res.clone(); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.update_batch(states) + } + + fn evaluate(&mut self) -> Result { + Ok(self.min.clone()) + } + + fn supports_retract_batch(&self) -> bool { + true + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size() + } +} +// +// Moving min and moving max +// The implementation is taken from https://github.com/spebern/moving_min_max/blob/master/src/lib.rs. + +// Keep track of the minimum or maximum value in a sliding window. +// +// `moving min max` provides one data structure for keeping track of the +// minimum value and one for keeping track of the maximum value in a sliding +// window. +// +// Each element is stored with the current min/max. One stack to push and another one for pop. If pop stack is empty, +// push to this stack all elements popped from first stack while updating their current min/max. Now pop from +// the second stack (MovingMin/Max struct works as a queue). To find the minimum element of the queue, +// look at the smallest/largest two elements of the individual stacks, then take the minimum of those two values. 
+// +// The complexity of the operations are +// - O(1) for getting the minimum/maximum +// - O(1) for push +// - amortized O(1) for pop + +/// ``` +/// # use datafusion_physical_expr::aggregate::moving_min_max::MovingMin; +/// let mut moving_min = MovingMin::::new(); +/// moving_min.push(2); +/// moving_min.push(1); +/// moving_min.push(3); +/// +/// assert_eq!(moving_min.min(), Some(&1)); +/// assert_eq!(moving_min.pop(), Some(2)); +/// +/// assert_eq!(moving_min.min(), Some(&1)); +/// assert_eq!(moving_min.pop(), Some(1)); +/// +/// assert_eq!(moving_min.min(), Some(&3)); +/// assert_eq!(moving_min.pop(), Some(3)); +/// +/// assert_eq!(moving_min.min(), None); +/// assert_eq!(moving_min.pop(), None); +/// ``` +#[derive(Debug)] +pub struct MovingMin { + push_stack: Vec<(T, T)>, + pop_stack: Vec<(T, T)>, +} + +impl Default for MovingMin { + fn default() -> Self { + Self { + push_stack: Vec::new(), + pop_stack: Vec::new(), + } + } +} + +impl MovingMin { + /// Creates a new `MovingMin` to keep track of the minimum in a sliding + /// window. + #[inline] + pub fn new() -> Self { + Self::default() + } + + /// Creates a new `MovingMin` to keep track of the minimum in a sliding + /// window with `capacity` allocated slots. + #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { + push_stack: Vec::with_capacity(capacity), + pop_stack: Vec::with_capacity(capacity), } - Ok(()) } - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - for idx in 0..values[0].len() { - let val = ScalarValue::try_from_array(&values[0], idx)?; - if !val.is_null() { - (self.moving_min).pop(); - } - } - if let Some(res) = self.moving_min.min() { - self.min = res.clone(); + /// Returns the minimum of the sliding window or `None` if the window is + /// empty. 
+ #[inline] + pub fn min(&self) -> Option<&T> { + match (self.push_stack.last(), self.pop_stack.last()) { + (None, None) => None, + (Some((_, min)), None) => Some(min), + (None, Some((_, min))) => Some(min), + (Some((_, a)), Some((_, b))) => Some(if a < b { a } else { b }), } - Ok(()) } - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) + /// Pushes a new element into the sliding window. + #[inline] + pub fn push(&mut self, val: T) { + self.push_stack.push(match self.push_stack.last() { + Some((_, min)) => { + if val > *min { + (val, min.clone()) + } else { + (val.clone(), val) + } + } + None => (val.clone(), val), + }); } - fn evaluate(&mut self) -> Result { - Ok(self.min.clone()) + /// Removes and returns the last value of the sliding window. + #[inline] + pub fn pop(&mut self) -> Option { + if self.pop_stack.is_empty() { + match self.push_stack.pop() { + Some((val, _)) => { + let mut last = (val.clone(), val); + self.pop_stack.push(last.clone()); + while let Some((val, _)) = self.push_stack.pop() { + let min = if last.1 < val { + last.1.clone() + } else { + val.clone() + }; + last = (val.clone(), min); + self.pop_stack.push(last.clone()); + } + } + None => return None, + } + } + self.pop_stack.pop().map(|(val, _)| val) } - fn supports_retract_batch(&self) -> bool { - true + /// Returns the number of elements stored in the sliding window. + #[inline] + pub fn len(&self) -> usize { + self.push_stack.len() + self.pop_stack.len() } - fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.min) + self.min.size() + /// Returns `true` if the moving window contains no elements. 
+ #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 } } - +/// ``` +/// # use datafusion_physical_expr::aggregate::moving_min_max::MovingMax; +/// let mut moving_max = MovingMax::::new(); +/// moving_max.push(2); +/// moving_max.push(3); +/// moving_max.push(1); +/// +/// assert_eq!(moving_max.max(), Some(&3)); +/// assert_eq!(moving_max.pop(), Some(2)); +/// +/// assert_eq!(moving_max.max(), Some(&3)); +/// assert_eq!(moving_max.pop(), Some(3)); +/// +/// assert_eq!(moving_max.max(), Some(&1)); +/// assert_eq!(moving_max.pop(), Some(1)); +/// +/// assert_eq!(moving_max.max(), None); +/// assert_eq!(moving_max.pop(), None); +/// ``` #[derive(Debug)] -pub struct SlidingMaxAccumulator { - max: ScalarValue, - moving_max: MovingMax, +pub struct MovingMax { + push_stack: Vec<(T, T)>, + pop_stack: Vec<(T, T)>, } -impl SlidingMaxAccumulator { - /// new max accumulator - pub fn try_new(datatype: &DataType) -> Result { - Ok(Self { - max: ScalarValue::try_from(datatype)?, - moving_max: MovingMax::::new(), - }) +impl Default for MovingMax { + fn default() -> Self { + Self { + push_stack: Vec::new(), + pop_stack: Vec::new(), + } } } -impl Accumulator for SlidingMaxAccumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - for idx in 0..values[0].len() { - let val = ScalarValue::try_from_array(&values[0], idx)?; - self.moving_max.push(val); - } - if let Some(res) = self.moving_max.max() { - self.max = res.clone(); - } - Ok(()) +impl MovingMax { + /// Creates a new `MovingMax` to keep track of the maximum in a sliding window. + #[inline] + pub fn new() -> Self { + Self::default() } - fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - for _idx in 0..values[0].len() { - (self.moving_max).pop(); - } - if let Some(res) = self.moving_max.max() { - self.max = res.clone(); + /// Creates a new `MovingMax` to keep track of the maximum in a sliding window with + /// `capacity` allocated slots. 
+ #[inline] + pub fn with_capacity(capacity: usize) -> Self { + Self { + push_stack: Vec::with_capacity(capacity), + pop_stack: Vec::with_capacity(capacity), } - Ok(()) } - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - self.update_batch(states) + /// Returns the maximum of the sliding window or `None` if the window is empty. + #[inline] + pub fn max(&self) -> Option<&T> { + match (self.push_stack.last(), self.pop_stack.last()) { + (None, None) => None, + (Some((_, max)), None) => Some(max), + (None, Some((_, max))) => Some(max), + (Some((_, a)), Some((_, b))) => Some(if a > b { a } else { b }), + } } - fn state(&mut self) -> Result> { - Ok(vec![self.max.clone()]) + /// Pushes a new element into the sliding window. + #[inline] + pub fn push(&mut self, val: T) { + self.push_stack.push(match self.push_stack.last() { + Some((_, max)) => { + if val < *max { + (val, max.clone()) + } else { + (val.clone(), val) + } + } + None => (val.clone(), val), + }); } - fn evaluate(&mut self) -> Result { - Ok(self.max.clone()) + /// Removes and returns the last value of the sliding window. + #[inline] + pub fn pop(&mut self) -> Option { + if self.pop_stack.is_empty() { + match self.push_stack.pop() { + Some((val, _)) => { + let mut last = (val.clone(), val); + self.pop_stack.push(last.clone()); + while let Some((val, _)) = self.push_stack.pop() { + let max = if last.1 > val { + last.1.clone() + } else { + val.clone() + }; + last = (val.clone(), max); + self.pop_stack.push(last.clone()); + } + } + None => return None, + } + } + self.pop_stack.pop().map(|(val, _)| val) } - fn supports_retract_batch(&self) -> bool { - true + /// Returns the number of elements stored in the sliding window. + #[inline] + pub fn len(&self) -> usize { + self.push_stack.len() + self.pop_stack.len() } - fn size(&self) -> usize { - std::mem::size_of_val(self) - std::mem::size_of_val(&self.max) + self.max.size() + /// Returns `true` if the moving window contains no elements. 
+ #[inline] + pub fn is_empty(&self) -> bool { + self.len() == 0 } } +make_udaf_expr_and_func!( + Max, + max, + expression, + "Returns the maximum of a group of values.", + max_udaf +); + +make_udaf_expr_and_func!( + Min, + min, + expression, + "Returns the minimum of a group of values.", + min_udaf +); + +pub fn max_distinct(expr: Expr) -> Expr { + Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf( + max_udaf(), + vec![expr], + true, + None, + None, + None, + )) +} + +pub fn min_distinct(expr: Expr) -> Expr { + Expr::AggregateFunction(datafusion_expr::expr::AggregateFunction::new_udf( + min_udaf(), + vec![expr], + true, + None, + None, + None, + )) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs index 4eede6567504..de4355dab642 100644 --- a/datafusion/physical-expr-common/src/aggregate/mod.rs +++ b/datafusion/physical-expr-common/src/aggregate/mod.rs @@ -730,6 +730,12 @@ impl AggregateExpr for AggregateFunctionExpr { } } } + + fn get_minmax_desc(&self) -> Option<(Field, bool)> { + self.fun + .get_minmax_desc() + .and_then(|flag| self.field().ok().map(|f| (f, flag))) + } } impl PartialEq for AggregateFunctionExpr { diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 959de23cf9d3..6c4c07428bd3 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -24,11 +24,11 @@ use datafusion_common::{ }; use datafusion_expr::expr::Unnest; use datafusion_expr::expr::{Alias, Placeholder}; +use datafusion_expr::ExprFunctionExt; use datafusion_expr::{ expr::{self, InList, Sort, WindowFunction}, logical_plan::{PlanType, StringifiedPlan}, - Between, BinaryExpr, BuiltInWindowFunction, Case, Cast, Expr, ExprFunctionExt, - GroupingSet, + Between, BinaryExpr, BuiltInWindowFunction, Case, Cast, Expr, GroupingSet, 
GroupingSet::GroupingSets, JoinConstraint, JoinType, Like, Operator, TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 69971997aa3e..a4334fce52cd 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -25,6 +25,7 @@ use std::vec; use arrow::array::RecordBatch; use arrow::csv::WriterBuilder; use datafusion::physical_expr_common::aggregate::AggregateExprBuilder; +use datafusion::physical_plan::udaf::create_aggregate_expr; use datafusion_functions_aggregate::min_max::max_udaf; use prost::Message; @@ -919,7 +920,6 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> { "max", false, false, - false, )?; let window = Arc::new(WindowAggExec::try_new(