Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support 'col IN (a, b, c)' type expressions #652

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

roeap
Copy link
Collaborator

@roeap roeap commented Jan 18, 2025

What changes are proposed in this pull request?

Currently, evaluation expressions of type col IN (a, b, c) is missing an implementation. While this might be the exact case @scovich cautioned us about, where the rhs might get significant in size and should really be handled as EngineData, I hope that we at least do not make things worse here. Unfortunately delta-rs already has support for these types of expressions, so the main intend right now is to retain feature parity over there while migrating.

How was this change tested?

Additional tests for specific expression flavor.

Copy link

codecov bot commented Jan 18, 2025

Codecov Report

Attention: Patch coverage is 77.12766% with 43 lines in your changes missing coverage. Please review.

Project coverage is 84.00%. Comparing base (6751838) to head (def21c1).

Files with missing lines Patch % Lines
kernel/src/engine/arrow_expression.rs 82.08% 21 Missing and 3 partials ⚠️
kernel/src/expressions/scalars.rs 66.03% 18 Missing ⚠️
kernel/src/engine/arrow_conversion.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #652      +/-   ##
==========================================
- Coverage   84.08%   84.00%   -0.08%     
==========================================
  Files          76       76              
  Lines       17526    17713     +187     
  Branches    17526    17713     +187     
==========================================
+ Hits        14736    14880     +144     
- Misses       2077     2115      +38     
- Partials      713      718       +5     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@roeap roeap force-pushed the feat/col-in-arr branch 2 times, most recently from 007b4e2 to 6977db9 Compare January 18, 2025 16:19
@@ -208,7 +208,7 @@ impl TryFrom<&ArrowDataType> for DataType {
ArrowDataType::Date64 => Ok(DataType::DATE),
ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => Ok(DataType::TIMESTAMP_NTZ),
ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz))
if tz.eq_ignore_ascii_case("utc") =>
if tz.eq_ignore_ascii_case("utc") || tz.eq_ignore_ascii_case("+00:00") =>
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The data in arrow arrays should always represent a timestamp in UTC, so is this check even necessary?

https://github.com/apache/arrow-rs/blob/af777cd53e56f8382382137b6e08af249c475397/arrow-schema/src/datatype.rs#L179-L182

})?;

fn op(
col: impl Iterator<Item = Option<impl Into<Scalar>>>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why not impl IntoIterator? Avoids having to call iter() at the call site?

}

// safety: as_* methods on arrow arrays can panic, but we checked the data type before applying.
let arr: BooleanArray = match (column.data_type(), data_type) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: with the strongly typed op, we shouldn't need a type annotation here?

Suggested change
let arr: BooleanArray = match (column.data_type(), data_type) {
let arr = match (column.data_type(), data_type) {

Comment on lines 298 to 305
fn op(
col: impl Iterator<Item = Option<impl Into<Scalar>>>,
ad: &ArrayData,
) -> BooleanArray {
#[allow(deprecated)]
let res = col.map(|val| val.map(|v| ad.array_elements().contains(&v.into())));
BooleanArray::from_iter(res)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might help both readability to factor out op a tad differently (playground):

#[allow(deprecated)]
let inlist = ad.array_elements();
fn op<T>(
    inlist: &[Scalar], 
    values: impl IntoIterator<Item = Option<T>>, 
    from: fn(T) -> Scalar,
) -> BooleanArray {
    values
        .into_iter()
        .map(|v| Some(inlist.contains(&from(v?))))
        .collect()
}

Then the primitive array cases simplify to e.g.:

(ArrowDataType::Float64, PrimitiveType::Double) => {
    op(inlist, column.as_primitive::<Float64Type>(), Scalar::from)
}

and e.g. the TimestampNtz case simplifies to:

let array = column.as_primitive::<TimestampMicrosecondType>();
op(inlist, array, Scalar::TimestampNtz)

(might need to tweak the type signature of values a bit to match whatever the primitive column iterator actually returns -- but the general approach seems to work)

We should also be able to make T: ArrowPrimitiveType and derive the expected primitive rust type from there:

fn op<T: ArrowPrimitiveType>(
    inlist: &[Scalar], 
    values: &dyn Array, 
    from: fn(T::Native) -> Scalar,
) -> BooleanArray {
    values
        .as_primitive::<T>
        .into_iter()
        .map(|v| Some(inlist.contains(&from(v?))))
        .collect()
}

which allows pulling the as_primitive call inside op:

(ArrowDataType::Float64, PrimitiveType::Double) => {
    op::<Float64Type>(inlist, column, Scalar::from)
}

and e.g. the TimestampNtz case simplifies to:

op::<TimestampMicrosecondType>(inlist, column, Scalar::TimestampNtz)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah - this turs out much nicer! Needed to make some adjustments, but hope kept the spirit!

kernel/src/engine/arrow_expression.rs Outdated Show resolved Hide resolved
ad: &ArrayData,
) -> BooleanArray {
#[allow(deprecated)]
let res = col.map(|val| val.map(|v| ad.array_elements().contains(&v.into())));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this handles NULL values correctly? See e.g. https://spark.apache.org/docs/3.5.1/sql-ref-null-semantics.html#innot-in-subquery-:

  • TRUE is returned when the non-NULL value in question is found in the list
  • FALSE is returned when the non-NULL value is not found in the list and the list does not contain NULL values
  • UNKNOWN is returned when the value is NULL, or the non-NULL value is not found in the list and the list contains at least one NULL value

I think, instead of calling contains, you could borrow the code from PredicateEvaluatorDefaults::finish_eval_variadic, with true as the "dominator" value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think you could just invoke that method directly, with a properly crafted iterator?

// `v IN (k1, ..., kN)` is logically equivalent to `v = k1 OR ... OR v = kN`, so evaluate
// it as such, ensuring correct handling of NULL inputs (including `Scalar::Null`).
col.map(|v| {
    PredicateEvaluatorDefaults::finish_eval_variadic(
        VariadicOperator::Or, 
        inlist.iter().map(Some(Scalar::partial_cmp(v?, k?)? == Ordering::Equal)),
        false,
    )
})

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was I correct in thinking that None - no dominant value, but found Null - should just be false in this case?

ad: &ArrayData,
) -> BooleanArray {
#[allow(deprecated)]
let res = col.map(|val| val.map(|v| ad.array_elements().contains(&v.into())));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: We actually have a lurking bug -- Scalar derives PartialEq which will allow two Scalar::Null to compare equal. But SQL semantics dictate that NULL doesn't compare equal to anything -- not even itself.

Our manual impl of PartialOrd for Scalar does this correctly, but it breaks the rules for PartialEq:

If PartialOrd or Ord are also implemented for Self and Rhs, their methods must also be consistent with PartialEq (see the documentation of those traits for the exact requirements). It’s easy to accidentally make them disagree by deriving some of the traits and manually implementing others.

Looks like we'll need to define a manual impl PartialEq for Scalar that follows the same approach.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is indeed not covered. Added an implementation for PartialEq that mirrors PartialOrd.

@roeap roeap requested a review from scovich January 24, 2025 23:55
Comment on lines +312 to +313
// None is returned when no dominant value (true) is found and there is at least one NULL
// In th case of IN, this is equivalent to false
Copy link
Collaborator

@scovich scovich Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rescuing #652 (comment):

https://spark.apache.org/docs/3.5.1/sql-ref-null-semantics.html#innot-in-subquery-:

  • TRUE is returned when the non-NULL value in question is found in the list
  • FALSE is returned when the non-NULL value is not found in the list and the list does not contain NULL values
  • UNKNOWN (NULL) is returned when the value is NULL, or the non-NULL value is not found in the list and the list contains at least one NULL value

If I understand the above correctly:

NULL IN (1, 2, NULL) -- NULL because the value to search for was NULL
10 IN (1, 2, NULL) -- NULL because no direct match and in-list contains a NULL
10 IN (1, 2) -- FALSE because no match and also no NULL anywhere
10 IN (10, 20, NULL) -- TRUE in spite of the NULL because of direct match 

So instead of Some(<expr>.unwrap_or(false)) we should just return <expr> directly and let the NULL propagate.

Some(
PredicateEvaluatorDefaults::finish_eval_variadic(
VariadicOperator::Or,
inlist.iter().map(|k| v.as_ref().map(|vv| vv == k)),
Copy link
Collaborator

@scovich scovich Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't correct -- we need comparisons against Scalar::Null to return None. That's why I had previously recommended using Scalar::partial_cmp instead of ==.

Also, can we not use ? to unwrap the various options here?

Suggested change
inlist.iter().map(|k| v.as_ref().map(|vv| vv == k)),
inlist.iter().map(Some(Scalar::partial_cmp(v?, k?)? == Ordering::Equal)),

Unpacking that -- if the value we search for is NULL, or if the inlist entry is NULL, or if the two values are incomparable, then return None for that pair. Otherwise, return Some boolean indicating whether the values compared equal or not. That automatically covers the various required cases, and also makes us robust to any type mismatches that might creep in.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: If we wanted to be a tad more efficient, we could also unpack v outside the inner loop:

values.map(|v| {
    let v = v?;
    PredicateEvaluatorDefaults::finish_eval_variadic(...)
})

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm -- empty in-lists pose a corner case with respect to unpacking v:

NULL IN ()

Operator OR with zero inputs normally produces FALSE (which is correct if you stop to think about it) -- but unpacking a NULL v first makes the operator return NULL instead (which is also correct if you squint, because NULL input always produces NULL output).

Unfortunately, the only clear docs I could find -- https://spark.apache.org/docs/3.5.1/sql-ref-null-semantics.html#innot-in-subquery- -- are also ambiguous:

Conceptually a IN expression is semantically equivalent to a set of equality condition separated by a disjunctive operator (OR).

... suggests FALSE while

UNKNOWN is returned when the value is NULL

... suggests NULL

The difference matters for NOT IN, because NULL NOT IN () would either return TRUE (keep rows) or NULL (drop row).

Copy link
Collaborator

@scovich scovich Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: SQL engines normally forbid statically empty in-list but do not forbid subqueries from producing an empty result.

I tried the following expression on three engines (sqlite, mysql, postgres):

SELECT 1 WHERE NULL NOT IN (SELECT 1 WHERE FALSE)

And all three returned 1. So OR semantics prevail, and we must NOT unpack v outside the loop.

}

fn str_op<'a>(
column: impl Iterator<Item = Option<&'a str>> + 'a,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
column: impl Iterator<Item = Option<&'a str>> + 'a,
column: impl IntoIterator<Item = Option<&'a str>> + 'a,

(avoids the need for callers to invoke iter() -- we can call into_iter() once here instead)

(the column has type e.g. &GenericByteArray, whose impl IntoIterator is equivalent to calling iter())

Comment on lines +336 to +345
(ArrowDataType::Utf8, PrimitiveType::String) => op_in(inlist, str_op(column.as_string::<i32>().iter())),
(ArrowDataType::LargeUtf8, PrimitiveType::String) => op_in(inlist, str_op(column.as_string::<i64>().iter())),
(ArrowDataType::Utf8View, PrimitiveType::String) => op_in(inlist, str_op(column.as_string_view().iter())),
(ArrowDataType::Int8, PrimitiveType::Byte) => op_in(inlist,op::<Int8Type>( column.as_ref(), Scalar::from)),
(ArrowDataType::Int16, PrimitiveType::Short) => op_in(inlist,op::<Int16Type>(column.as_ref(), Scalar::from)),
(ArrowDataType::Int32, PrimitiveType::Integer) => op_in(inlist,op::<Int32Type>(column.as_ref(), Scalar::from)),
(ArrowDataType::Int64, PrimitiveType::Long) => op_in(inlist,op::<Int64Type>(column.as_ref(), Scalar::from)),
(ArrowDataType::Float32, PrimitiveType::Float) => op_in(inlist,op::<Float32Type>(column.as_ref(), Scalar::from)),
(ArrowDataType::Float64, PrimitiveType::Double) => op_in(inlist,op::<Float64Type>(column.as_ref(), Scalar::from)),
(ArrowDataType::Date32, PrimitiveType::Date) => op_in(inlist,op::<Date32Type>(column.as_ref(), Scalar::Date)),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are all a lot longer than 100 chars... why doesn't the fmt check blow up??

@@ -280,6 +281,84 @@ fn evaluate_expression(
(ArrowDataType::Decimal256(_, _), Decimal256Type)
}
}
(Column(name), Literal(Scalar::Array(ad))) => {
fn op<T: ArrowPrimitiveType>(
values: &dyn Array,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
values: &dyn Array,
values: ArrayRef,

(avoids the need for .as_ref() at the call site)

Comment on lines +230 to +232
// NOTE: We intentionally do two match arms for each variant to avoid a catch-all, so
// that new variants trigger compilation failures instead of being silently ignored.
match (self, other) {
Copy link
Collaborator

@scovich scovich Jan 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the requirements for PartialEq and PartialOrd, I think it would be much safer (and more compact) to move the current PartialOrd::partial_cmp to Scalar::partial_cmp. Then PartialOrd::partial_cmp is just a thin wrapper around self.partial_cmp, and PartialEq::eq is self.partial_cmp(...) == Some(Ordering::Equal)

See e.g. https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=dc8e783e26f94c98da88258a4854dbd9

#[test]
fn test_partial_cmp() {
let a = Scalar::Integer(1);
let b = Scalar::Integer(2);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably needs a Scalar::Null as well, to exercise None result?

(ditto below, to ensure that comparing == against Scalar::null produces false result)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants