
Improve speed of median by implementing special GroupsAccumulator #13681

Open
Rachelint wants to merge 21 commits into main from impl-group-accumulator-for-median

Conversation

Contributor

@Rachelint Rachelint commented Dec 7, 2024

Which issue does this PR close?

Closes #13550

Rationale for this change

Support a specialized GroupsAccumulator for median for better performance.

What changes are included in this PR?

  • Implement MedianGroupsAccumulator
  • Add related tests.

Are these changes tested?

Yes, by existing tests and new e2e and fuzz tests.

Are there any user-facing changes?

No.

@Rachelint Rachelint force-pushed the impl-group-accumulator-for-median branch from ded4e5f to 311f82f Compare December 8, 2024 17:28
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Dec 21, 2024
@Rachelint Rachelint force-pushed the impl-group-accumulator-for-median branch from f80e890 to a53da8c Compare January 22, 2025 09:25
Contributor Author

Rachelint commented Jan 23, 2025

I am back and resumed work on this PR yesterday.
The remaining work is fixing some failing fuzz test cases and tidying up the code; I plan to finish it today or tomorrow.

Very sorry for the long delay, which was due to some personal reasons.

Thanks @Dandandan for helping!

@alamb alamb mentioned this pull request Jan 23, 2025
@github-actions github-actions bot added the core Core DataFusion crate label Jan 24, 2025
@Rachelint Rachelint force-pushed the impl-group-accumulator-for-median branch from 5047371 to 40284c8 Compare January 24, 2025 09:13
Contributor Author

Rachelint commented Jan 25, 2025

I think this PR is ready now, sorry again for the long delay.

Q6 in h2o medium:

  • result on main
Q6: SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
Query 6 iteration 1 took 9584.7 ms and returned 10000 rows
Query 6 iteration 2 took 9606.6 ms and returned 10000 rows
Query 6 iteration 3 took 9584.6 ms and returned 10000 rows
  • result with this PR
Q6: SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;
Query 6 iteration 1 took 6558.1 ms and returned 10000 rows
Query 6 iteration 2 took 6473.9 ms and returned 10000 rows
Query 6 iteration 3 took 6494.3 ms and returned 10000 rows

@Rachelint Rachelint marked this pull request as ready for review January 25, 2025 11:04
Contributor

@2010YOUY01 2010YOUY01 left a comment


Great! I have benchmarked locally and it's much faster. (For benchmarks running on CSVs, I think most of the time is spent reading the CSV, so the results are closer.)

h2o Q6 on parquet (10k groups):
main: 1500ms
pr: 300ms

query with 4 groups (from tpch sf10 lineitem table):
select median(l_orderkey) from lineitem group by l_returnflag, l_linestatus;
main: 0.7s
pr: 0.35s

I have a suggestion for testing: I noticed the existing null tests for median() won't take this GroupsAccumulator path; those test cases don't have a GROUP BY, so they are executed with the regular Accumulator (see https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/aggregates/no_grouping.rs). Could you include tests for null handling?

// Extend values to related groups
// TODO: avoid using the iterator of the `ListArray`; it leads to
// many calls to `slice` on its `inner array`, and `slice` is not
// so efficient (due to the calculation of `null_count` for each `slice`).
Contributor

I think it's safe to use the value directly without checking for null; null values should be ignored during accumulation

Contributor Author

@Rachelint Rachelint Jan 26, 2025

I think it's safe to use the value directly without checking for null; null values should be ignored during accumulation

🤔 The input list can actually be null, because some lists are generated from convert_to_state (skip partial aggregation).

And a batch like:

row0: 0
row1: 1
row2: null
...
rown: n

will be converted to a list like:

row0: [0]
row1: [1]
row2: null
...
rown: [n]

I think we can implement a simple version for correctness first.
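For illustration, here is a minimal sketch of such a null-aware merge (the helper name and signature are assumptions for the example, not the exact PR code): non-null list rows are appended to their target group, while null rows are simply skipped.

use arrow::array::{Array, AsArray, ListArray};
use arrow::datatypes::ArrowNumericType;

// Hypothetical helper: merge an intermediate `ListArray` state into per-group
// value buffers, ignoring rows whose list entry is null (e.g. rows produced by
// `convert_to_state` from null inputs). `group_values` is assumed to already be
// resized so that every index in `group_indices` is valid.
fn merge_list_state<T: ArrowNumericType>(
    list: &ListArray,
    group_indices: &[usize],
    group_values: &mut [Vec<T::Native>],
) {
    for (row, &group_idx) in group_indices.iter().enumerate() {
        if list.is_null(row) {
            continue; // null list entry: nothing to merge for this row
        }
        // NOTE: `value(row)` slices the inner array, which is the
        // inefficiency mentioned in the TODO quoted above.
        let row_values = list.value(row);
        let row_values = row_values.as_primitive::<T>();
        group_values[group_idx].extend(row_values.values().iter().copied());
    }
}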

@Rachelint
Contributor Author

@2010YOUY01 test cases with nulls have been added.

Contributor

@2010YOUY01 2010YOUY01 left a comment

The implementation looks good to me, thank you

@Rachelint
Contributor Author

I plan to merge this one tomorrow; would you like to review this PR again before merging?

@Dandandan @alamb

Contributor

@alamb alamb left a comment

Thanks @Rachelint for this really nice PR and @2010YOUY01 for the review.

I think this PR looks really, really nice (easy to understand and read). I am running the end-to-end benchmarks on my gcp machine now to get final numbers, but I suspect it will be much faster 🚀

I left various suggestions on ways to potentially make this PR faster, but they could all be done as follow-ons (or never).

Thanks again 🙏

let data_gen_config = baseline_config();

// Queries like SELECT median(a), median(distinct) FROM fuzz_table GROUP BY b
let query_builder = QueryBuilder::new()
Contributor

❤️

/// To calculate accurate medians for groups, we need to store all values
/// of each group before the final evaluation.
/// So the values in each group are stored in a `Vec<T>`, and the values for
/// all groups are organized as a `Vec<Vec<T>>`.
Contributor

Given it is important to track the median values for each group separately, I don't really see a way around Vec<Vec<T>> -- I think it is the simplest version and will have pretty reasonable performance.

Contributor Author

Yes, I tried to avoid Vec<Vec<T>> to save copying from Vec<Vec<T>> into the result Vec<T>, but it is hard to do.
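For illustration, a simplified sketch of the final evaluation over that Vec<Vec<T>> layout (the function name is made up for the example, and only the odd-length case is handled; a full median also averages the two middle values for even-length groups):

use arrow::array::ArrowNativeTypeOp;

// Simplified sketch: every group owns a `Vec` of its raw values, and the median
// is computed per group only at final evaluation time.
fn evaluate_medians<N: ArrowNativeTypeOp>(group_values: Vec<Vec<N>>) -> Vec<Option<N>> {
    group_values
        .into_iter()
        .map(|mut values| {
            if values.is_empty() {
                return None; // a group with no non-null values yields a NULL median
            }
            let mid = values.len() / 2;
            // selecting the n-th element avoids fully sorting each group
            values.select_nth_unstable_by(mid, |a, b| a.compare(*b));
            Some(values[mid])
        })
        .collect()
}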


// `offsets` in `ListArray`, each row as a list element
let offsets = (0..=input_array.len() as i32).collect::<Vec<_>>();
let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
Contributor

Likewise, OffsetBuffer::new_unchecked could be used here.

Contributor Author

Done.
It is easy to ensure all the checks in OffsetBuffer::new pass by adding

        assert!(input_array.len() <= i32::MAX as usize);
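As context for why this part is cheap, here is a rough sketch (names and the field nullability are assumptions, not the PR's exact code) of the general pattern being discussed: build the intermediate `ListArray` directly from one flattened primitive array plus an offsets buffer, rather than going through `ScalarValue`.

use std::sync::Arc;

use arrow::array::{ListArray, PrimitiveArray};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{ArrowNumericType, DataType, Field};

// Hypothetical helper: flatten all group values into one primitive array and
// describe the per-group boundaries with an offsets buffer (one list element
// per group).
fn groups_to_list_state<T: ArrowNumericType>(
    group_values: &[Vec<T::Native>],
    data_type: DataType,
) -> ListArray {
    let flattened: Vec<T::Native> = group_values.iter().flatten().copied().collect();
    let values = PrimitiveArray::<T>::new(ScalarBuffer::from(flattened), None)
        .with_data_type(data_type.clone());

    // offsets[i]..offsets[i + 1] covers group i
    let mut offsets = Vec::with_capacity(group_values.len() + 1);
    let mut end = 0usize;
    offsets.push(0i32);
    for group in group_values {
        end += group.len();
        offsets.push(i32::try_from(end).expect("more than i32::MAX values in total"));
    }
    let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));

    let field = Arc::new(Field::new("item", data_type, true));
    ListArray::new(field, offsets, Arc::new(values), None)
}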

@alamb alamb changed the title Support specific GroupsAccumulator for median Improve speed of median by implementing special GroupsAccumulator Jan 27, 2025
Contributor

@alamb alamb left a comment

FWIW, for completeness I also ran the sqlite suite against this PR:

andrewlamb@Andrews-MacBook-Pro-2:~/Software/datafusion2$ INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests
    Finished `release-nonlto` profile [optimized] target(s) in 0.34s
     Running bin/sqllogictests.rs (target/release-nonlto/deps/sqllogictests-6c6dc6221381c36b)
Completed in 8 minutes

And it all passed (thanks again to @Omega359 for making this happen)

I was trying to figure out how to run the extended test suite against this PR, but I couldn't figure out how to set up the workflow syntax. I filed this to track the idea:

Contributor

alamb commented Jan 27, 2025

BTW I am still trying to benchmark this branch to show how awesome it is. I am having trouble as the h2o large benchmark is being OOM-killed on my machine.

@Rachelint Rachelint force-pushed the impl-group-accumulator-for-median branch 3 times, most recently from d2d8ca9 to a975adb Compare January 27, 2025 21:55
#[derive(Debug)]
struct MedianGroupsAccumulator<T: ArrowNumericType + Send> {
    data_type: DataType,
    group_values: Vec<Vec<T::Native>>,
Contributor

@korowa korowa Jan 28, 2025

Just wondering -- using Vec<Vec<>> as state storage doesn't seem to differ much from what the regular accumulator does, but this PR still introduces a noticeable performance improvement. Are there any other optimizations that could be used in the regular accumulator?

P.S. I'm asking just because when I did roughly the same for count distinct (PR), the performance of the GroupsAccumulator with Vec<HashSet<>> states was not that much better compared to the regular accumulator with HashSet<> states.

Contributor

I think, among other things, the intermediate state management (creating ListArrays directly rather than from ScalarValue) probably helps a lot:

https://github.com/apache/datafusion/blob/6c9355d5be8b6045865fed67cb6d028b2dfc2e06/datafusion/functions-aggregate/src/median.rs#L200-L199

There is also an extra allocation per group when using the groups accumulator adapter thingie

That being said, it is a fair question how much better the existing MedianAccumulator could be if it built the ListArrays directly, as this PR does 🤔

Contributor Author

@Rachelint Rachelint Jan 29, 2025

@korowa I think what @alamb mentioned is an important point about the improvement.

Here are some other points from me:

  • in GroupsAccumulatorAdapter::update_batch, we need to reorder the input batch and then use slice to split the reordered batch. I think these two operations may not be cheap (a rough sketch of the contrast with a native update path follows this list).

    let values = take_arrays(values, &batch_indices, None)?;
    let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;

    // invoke each accumulator with the appropriate rows, first
    // pulling the input arguments for this group into their own
    // RecordBatch(es)
    let iter = groups_with_rows.iter().zip(offsets.windows(2));

    let mut sizes_pre = 0;
    let mut sizes_post = 0;
    for (&group_idx, offsets) in iter {
        let state = &mut self.states[group_idx];
        sizes_pre += state.size();

        let values_to_accumulate = slice_and_maybe_filter(
            &values,
            opt_filter.as_ref().map(|f| f.as_boolean()),
            offsets,
        )?;
        f(state.accumulator.as_mut(), &values_to_accumulate)?;

        // clear out the state so they are empty for next
        // iteration
        state.indices.clear();

        sizes_post += state.size();

  • in GroupsAccumulatorAdapter::merge_batch, a similar problem may be even more serious, because we need to reorder a ListArray

  • and in GroupsAccumulatorAdapter::state, extra allocations exist, as mentioned by @alamb.
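To make the contrast concrete, here is a simplified sketch (assumed names, no filter handling) of what a dedicated groups accumulator's update path can look like: one pass over the input guided by `group_indices`, with no reordering or per-group slicing.

use arrow::array::{Array, PrimitiveArray};
use arrow::datatypes::ArrowNumericType;

// Hypothetical sketch: append each non-null input value to its group's buffer
// in a single pass. `group_values` is assumed to already be resized so that
// every index in `group_indices` is valid.
fn update_groups<T: ArrowNumericType>(
    values: &PrimitiveArray<T>,
    group_indices: &[usize],
    group_values: &mut [Vec<T::Native>],
) {
    for (row, &group_idx) in group_indices.iter().enumerate() {
        if values.is_null(row) {
            continue; // nulls do not contribute to the median
        }
        group_values[group_idx].push(values.value(row));
    }
}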

Contributor Author

@korowa does that mean implementing a groups accumulator for distinct count did not give an obvious improvement?
That is really surprising to me; I am studying #8721

Contributor

@korowa korowa Jan 29, 2025

There were some improvements, but the overall results for ClickBench Q9 (I was mostly looking at this query) were roughly 2.63x for the GroupsAccumulator and 2.30x for the regular Accumulator -- so around a 13-15% overall difference, which is not as massive as this PR's results.

However, maybe things have changed in the GroupsAccumulator implementation, and now even a plain Vec<HashSet<>> would be way faster.

UPD: and yes, maybe producing the state, as pointed out by @alamb above, was (at least partially) the cause of the insignificant improvement -- in count distinct it was implemented via ListArray::from_iter_primitive (commit), instead of building it from a single flattened array and its offsets.

Contributor Author

Seems really worth digging into the reason more deeply.

Contributor

alamb commented Jan 28, 2025

Sorry for the delay @Rachelint. I was having trouble with the benchmark queries.

Here are my benchmark results -- not bad :bowtie:

Almost 7x faster for our extended ClickBench query:

SELECT "ClientIP", "WatchID", COUNT(*) c, MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, MAX("ResponseStartTiming") tmax FROM hits WHERE "JavaEnable" = 0 GROUP BY "ClientIP", "WatchID" HAVING c > 1 ORDER BY tmed DESC LIMIT 10;

And the actual h2o benchmark (which is dominated by CSV parsing) also shows a noticeable 1.6x improvement

SELECT id4, id5, MEDIAN(v3) AS median_v3, STDDEV(v3) AS sd_v3 FROM x GROUP BY id4, id5;

--------------------
Benchmark clickbench_extended.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃  main_base ┃ impl-group-accumulator-for-medi… ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │  2753.78ms │                        2717.63ms │     no change │
│ QQuery 1     │   828.33ms │                         736.61ms │ +1.12x faster │
│ QQuery 2     │  1621.34ms │                        1507.86ms │ +1.08x faster │
│ QQuery 3     │   734.73ms │                         739.59ms │     no change │
│ QQuery 4     │ 12552.79ms │                        1823.32ms │ +6.88x faster │
│ QQuery 5     │ 19545.52ms │                       19039.51ms │     no change │
└──────────────┴────────────┴──────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main_base)                           │ 38036.49ms │
│ Total Time (impl-group-accumulator-for-median)   │ 26564.53ms │
│ Average Time (main_base)                         │  6339.41ms │
│ Average Time (impl-group-accumulator-for-median) │  4427.42ms │
│ Queries Faster                                   │          3 │
│ Queries Slower                                   │          0 │
│ Queries with No Change                           │          3 │
└──────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark h2o.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃  main_base ┃ impl-group-accumulator-for-medi… ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  2207.34ms │                        2168.72ms │     no change │
│ QQuery 2     │  5739.73ms │                        5757.47ms │     no change │
│ QQuery 3     │  4330.60ms │                        4332.62ms │     no change │
│ QQuery 4     │  3008.24ms │                        2998.72ms │     no change │
│ QQuery 5     │  4108.77ms │                        4072.95ms │     no change │
│ QQuery 6     │  6834.05ms │                        4160.14ms │ +1.64x faster │
│ QQuery 7     │  4059.20ms │                        4019.71ms │     no change │
│ QQuery 8     │  8013.99ms │                        8108.63ms │     no change │
│ QQuery 9     │ 10774.38ms │                       10642.69ms │     no change │
│ QQuery 10    │  8018.83ms │                        7916.58ms │     no change │
└──────────────┴────────────┴──────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main_base)                           │ 57095.14ms │
│ Total Time (impl-group-accumulator-for-median)   │ 54178.23ms │
│ Average Time (main_base)                         │  5709.51ms │
│ Average Time (impl-group-accumulator-for-median) │  5417.82ms │
│ Queries Faster                                   │          1 │
│ Queries Slower                                   │          0 │
│ Queries with No Change                           │          9 │
└──────────────────────────────────────────────────┴────────────┘

@Rachelint
Contributor Author

@korowa would you like to review this one again before merging?

Contributor Author

Rachelint commented Jan 29, 2025

I plan to merge this one tomorrow if no one else would like time to review.

.with_data_type(self.data_type.clone());

// `offsets` in `ListArray`, each row as a list element
assert!(input_array.len() <= i32::MAX as usize);
Contributor

I wonder if we could use i32::try_from here instead of the assert + the following cast in the range creation? I cannot imagine a real-life use case where this assertion would fail, but it can still be avoided.

Contributor Author

Yes, i32::try_from is indeed better, fixed.

Contributor

korowa commented Jan 29, 2025

@korowa would you like to review this one again before merging?

@Rachelint, I've partially gone through it and haven't found any major or blocking issues, so it looks good to go.

@Rachelint Rachelint force-pushed the impl-group-accumulator-for-median branch from abc0068 to 85ed001 Compare January 29, 2025 06:01
@Rachelint Rachelint force-pushed the impl-group-accumulator-for-median branch from 85ed001 to e963d50 Compare January 29, 2025 06:16
.with_data_type(self.data_type.clone());

// `offsets` in `ListArray`, each row as a list element
let offset_end = i32::try_from(input_array.len()).unwrap();
Contributor

And another one here -- why unwrap() and not ?, since we can return an Error here?

Contributor Author

@Rachelint Rachelint Jan 29, 2025

Makes sense, it is better to just use it to replace the assert; fixed.
Thanks.
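For reference, a minimal sketch of the shape suggested above (the function name and error message are made up for the example): propagate the conversion failure as an error instead of panicking.

use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use datafusion_common::DataFusionError;

// Hypothetical helper: build per-row offsets ("each row as a list element"),
// returning an error instead of panicking if the row count does not fit in i32.
fn row_offsets(num_rows: usize) -> Result<OffsetBuffer<i32>, DataFusionError> {
    let offset_end = i32::try_from(num_rows).map_err(|e| {
        DataFusionError::Internal(format!("cannot convert array length to i32: {e}"))
    })?;
    let offsets = (0..=offset_end).collect::<Vec<_>>();
    Ok(OffsetBuffer::new(ScalarBuffer::from(offsets)))
}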

Labels
core (Core DataFusion crate), functions, sqllogictest (SQL Logic Tests (.slt))
Development

Successfully merging this pull request may close these issues.

Improve performance of median function
5 participants