feat: Support Spark ArraySort with lambda function #10138
Conversation
  return rewritten;
}

VELOX_USER_FAIL(kNotSupported, lambda->toString())
Do we need to throw an error if the rewrite is not possible for Spark? I followed Presto's logic, but I'm not sure it's necessary for Spark.
Since Spark has a different comparison implementation than Presto, could we add a block to the PR description describing the semantic difference?
Thanks!
@rui-mo Done, addressed the comments, please have a look again.
@boneanxs I wonder if we are tackling the difference in NaN semantics in this PR. There is a plan in Velox to adjust its semantics, and some PRs have been merged. Perhaps we can fix the Presto function directly; see #7237.
@rui-mo We might still need special Spark rewrite arraySort logic even after the NaN semantics difference is fixed, e.g. for the expression.
Do we have a plan to fix all Spark comparison functions after the NaN semantics is unified? Also, could other comparison functions have semantic differences beyond NaN? I'm thinking it might be a long-term effort to address.
Thanks.
Force-pushed from b209dd1 to 901aae4.
Thanks. Added several comments.
Thanks. Added several questions.
    prefix + "array_sort", arraySortSignatures(), makeArraySort);
    prefix + "array_sort", arraySortSignatures(true), makeArraySortAsc);
exec::registerStatefulVectorFunction(
    prefix + "array_sort_desc", arraySortDescSignatures(), makeArraySortDesc);
Do we have a corresponding function for array_sort_desc in Spark?
We don't have array_sort_desc in Spark, but it is required since rewriteArraySort needs it:
velox/velox/functions/lib/ArraySort.cpp, line 559 in 0093ee9:
    : prefix + "array_sort_desc";
Force-pushed from 5c0e1f4 to 80fa6c3.
This pull request has been automatically marked as stale because it has not had recent activity. If you'd still like this PR merged, please comment on the PR, make sure you've addressed reviewer comments, and rebase on the latest main. Thank you for your contributions!
@boneanxs Would you like to update this PR? Thanks.
Oh, forgot it. Sure, will update it soon.
Some comments on the documentation.
velox/docs/functions/spark/array.rst (Outdated)

:noindex:

Returns the array sorted by values computed using specified lambda in ascending
order. ``U`` must be an orderable type. If the value from the lambda function is NULL, the element will be placed at the end. ::
NULL or NaN for floating type?
Currently, NaN is not handled for values returned by the lambda function. Do we need to handle NaN, given that it shouldn't be returned by lambda functions?
> Do we need to handle NaN since it shouldn't be returned by lambda functions

Hi @boneanxs, could you provide more details on why NaN shouldn't be returned?
Oh, sorry, I overlooked this before. For array_sort with lambda functions, sorting with NaN is supported in SimpleVector.comparePrimitiveAsc and follows the logic that NaN is placed before NULL. I also added a test to cover this.
Let me explain more here. After looking into the Presto/Spark implementations, they both say:

> It returns -1, 0, or 1 as the first nullable element is less than, equal to, or greater than the second nullable element. If the comparator function returns other values (including NULL), the query will fail and raise an error.

See Presto and Spark (though Spark says it doesn't support returning null values, it doesn't throw an error for a query like SELECT array_sort(ARRAY ('bc', 'ab', 'dc'), (x, y) -> IF(x < y, 1, IF(x = y, 0, null))) in Spark 3.2, which might be a bug).

So null values and NaN shouldn't be returned by the lambda function function(T,T, int), and in SimpleComparisonMatcher we require that the return value is int. SimpleComparisonMatcher could optimize function(T,T, int) to function(T, U) where U is orderable (not limited to int), so it's possible that it creates float values. For example, function(float, float, int): IF(x > y, 1, IF(x < y, -1, 0)) will be optimized to function(float, float): x -> x. At that point the behavior should still be the same, since both go into SimpleVector.compare to do the comparison (except that NULLs are filtered out in ArraySort.sortElements in advance to respect the nullsFirst flag). And inside SimpleVector.compare, NaN is smaller than Null.
I tried SELECT array_sort(ARRAY ('bc', 'ab', 'dc'), (x, y) -> IF(x < y, 1, IF(x = y, 0, null))) in Spark 3.5 and got the exception below. Would you like to add a unit test for this case to make sure the exception is thrown?

Caused by: org.apache.spark.SparkException: [COMPARATOR_RETURNS_NULL] The comparator has returned a NULL for a comparison between dc and dc. It should return a positive integer for "greater than", 0 for "equal" and a negative integer for "less than". To revert to deprecated behavior where NULL is treated as 0 (equal), you must set "spark.sql.legacy.allowNullComparisonResultInArraySort" to "true".

I also noticed Spark requires the function to return integer type; would you like to confirm?
https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala#L412-L421
Sorry to miss this comment. Yes, for the integer type handling, Spark itself will throw the exception if the return type is not integer (it handles the return type in the analyze stage). Also, Velox can't rewrite the lambda function if it doesn't match; I added a test in ArraySortTest.unsupporteLambda to ensure it.
As for null returned by the comparator, it was fixed by apache/spark#36812 since Spark 3.2.2 (I used 3.2.1, so it could pass). Spark 3.5 doesn't have this issue.
Thanks for iterating. Some minors and the others look good!
@boneanxs Thanks for iterating. Would you also rebase this PR?
velox/docs/functions/spark/array.rst (Outdated)

:noindex:

Returns the array sorted by values computed using specified lambda in ascending order. ``U`` must be an orderable type.
Null/NaN elements returned by the lambda function will be placed at the end of the returned array, with NaN elements appearing before Null elements. This function is not supported in Spark and is only used inside Velox. ::
Perhaps clarify the purpose in the document:
used inside velox for rewriting :spark:func:`xxx` as :spark:func:`xxx`.
@@ -35,6 +36,24 @@ class ArraySortTest : public SparkFunctionBaseTest {
    assertEqualVectors(expected, result);
  }

  void testArraySort(
      const std::string& lamdaExpr,
      const bool asc,
nit: drop const when passing by value.
velox/docs/functions/spark/array.rst (Outdated)

Returns the array sorted by values computed using specified lambda in ascending
order. ``U`` must be an orderable type. If the value from the lambda function is NULL, the element will be placed at the end.
The function attempts to analyze the lambda function and rewrite it into a simpler call that
specifies the sort-by expression (like :spark:func:`array_sort(array(T), function(T,U)) -> array(T)`). For example, ``(left, right) -> if(length(left) > length(right), 1, if(length(left) < length(right), -1, 0))`` will be rewritten to ``x -> length(x)``. ::
Perhaps clarify the behavior when rewrite is not possible.
@@ -140,5 +163,50 @@ TEST_F(ArraySortTest, constant) {
  expected = makeConstantArray<int64_t>(size, {6, 6, 6, 6});
  assertEqualVectors(expected, result);
}

TEST_F(ArraySortTest, lambda) {
Would you like to add test for the case when rewriting is not possible?
@rui-mo See apache/incubator-gluten#8526; the added tests pass. Also, it will fall back if the lambda can't be rewritten:
2025-01-14T04:03:49.5216547Z - array_sort with lambda functions
2025-01-14T04:03:49.5478095Z 04:03:49.547 WARN org.apache.spark.sql.execution.GlutenFallbackReporter: Validation failed for plan: Project, due to: Native validation failed:
2025-01-14T04:03:49.5481453Z Validation failed due to exception caught at file:SubstraitToVeloxPlanValidator.cc line:1380 function:validate, thrown from file:ArraySort.cpp line:573 function:rewriteArraySortCall, reason:array_sort with comparator lambda that cannot be rewritten into a transform is not supported: lambda ROW<x_12:MAP<VARCHAR,INTEGER>,y_13:MAP<VARCHAR,INTEGER>> -> subtract(size("x_12",true),size("y_13",true)).
2025-01-14T04:03:49.5542142Z 04:03:49.553 WARN org.apache.spark.sql.execution.GlutenFallbackReporter: Validation failed for plan: Project, due to: Native validation failed:
2025-01-14T04:03:49.5545407Z Validation failed due to exception caught at file:SubstraitToVeloxPlanValidator.cc line:1380 function:validate, thrown from file:ArraySort.cpp line:573 function:rewriteArraySortCall, reason:array_sort with comparator lambda that cannot be rewritten into a transform is not supported: lambda ROW<x_12:MAP<VARCHAR,INTEGER>,y_13:MAP<VARCHAR,INTEGER>> -> subtract(size("x_12",true),size("y_13",true)).
2025-01-14T04:03:49.6574296Z 04:03:49.656 WARN org.apache.spark.sql.execution.GlutenFallbackReporter: Validation failed for plan: Project, due to: Native validation failed:
2025-01-14T04:03:49.6577446Z Validation failed due to exception caught at file:SubstraitToVeloxPlanValidator.cc line:1380 function:validate, thrown from file:ArraySort.cpp line:573 function:rewriteArraySortCall, reason:array_sort with comparator lambda that cannot be rewritten into a transform is not supported: lambda ROW<x:MAP<VARCHAR,INTEGER>,y:MAP<VARCHAR,INTEGER>> -> subtract(size("x",true),size("y",true)).
2025-01-14T04:03:49.6660157Z 04:03:49.665 WARN org.apache.spark.sql.execution.GlutenFallbackReporter: Validation failed for plan: Project, due to: Native validation failed:
2025-01-14T04:03:49.6663312Z Validation failed due to exception caught at file:SubstraitToVeloxPlanValidator.cc line:1380 function:validate, thrown from file:ArraySort.cpp line:573 function:rewriteArraySortCall, reason:array_sort with comparator lambda that cannot be rewritten into a transform is not supported: lambda ROW<x:MAP<VARCHAR,INTEGER>,y:MAP<VARCHAR,INTEGER>> -> subtract(size("x",true),size("y",true)).
Force-pushed from d950987 to f800969.
Hey @rui-mo, any more comments for this?
Just reviewed the added doc. Could you compile the rst file to check that the added content (including the hyperlink) displays well in the generated doc? Thanks!
velox/docs/functions/spark/array.rst (Outdated)

:noindex:

Returns the array sorted by values computed using specified lambda in ascending
order. ``U`` must be an orderable type. If the value from the lambda function is NULL, the element will be placed at the end.
Suggestion (if my understanding is right):
If the value from the lambda function is NULL, the element will be placed at the end.
->
If the lambda function returns NULL, the corresponding element will be placed at the end.
Oh, it's not quite the same: it means the value returned from the rewritten function, such as x -> length(x) in the example. I updated the text here to make it clearer.
Hi @boneanxs, I added some nits. And do we have feedback for #10138 (comment)?
@boneanxs, could you file a PR in Gluten to enable this function and also to see its CI feedback? That way we can make sure the Spark UTs pass. In the Gluten PR, your personal Velox branch with this patch should be referenced.
Hey @PHILO-HE, thanks for mentioning it. Yes, it displays well.
There's a PR that tested it before, apache/incubator-gluten#8526, and it all passes.
@rui-mo Oh, sorry I missed that before; added a comment now. Could you help review it again? :)
Support Spark array_sort to allow a lambda function to sort elements. Since Spark has a different comparison implementation than Presto (see #5569), we can't directly reuse the Presto array_sort logic to rewrite the lambda function to a simple comparator where possible. This PR tries to:

- Move array_sort to velox/functions/lib so that both Presto and Spark can use it, adding nullsFirst to support nulls being placed at the start of the array (to support the Spark function sort_array).
- Abstract SimpleComparisonMatcher and move it to velox/functions/lib, and create a different SimpleComparisonChecker for Spark and Presto to do the comparison match (e.g., = is eq in Presto, but equalto in Spark).