GH-45611: [C++][Acero] Improve Swiss join build performance by partitioning batches ahead to reduce contention #45612
Conversation
Force-pushed from 281ca6d to 30f6adf.
I'm pretty excited by this optimization. @pitrou @westonpace, would you mind taking a look? Thanks.
Does this also remove the spin locks?
```cpp
std::vector<uint32_t> hashes;
std::vector<uint16_t> prtn_ranges;
std::vector<uint16_t> prtn_row_ids;
```
Are these one value per row?
Yes, this is what I meant in the Overhead section of the PR description (quoted below).
> ... and worsen the memory profile by 6 bytes per row (4 bytes for hash and 2 bytes for row id in partition).
Some more details you may also want to know:
- The
prtn_ranges
is one element per partition. - This
BatchState
struct is per batch.
Both are less space complexity.
It might be worth adding comments on that (the cardinality of each BatchState member).
Sure. Comments added.
Besides the two comments above, I don't feel competent to review this, sorry.
This removes the spin lock usage in Swiss join build, but the spin lock itself, namely
No problem, I appreciate the comments anyway!
```diff
@@ -1112,7 +1113,7 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t
   // Make sure that we do not use many partitions if there are not enough rows.
   //
-  constexpr int64_t min_num_rows_per_prtn = 1 << 18;
+  constexpr int64_t min_num_rows_per_prtn = 1 << 12;
```
Since the contention is eliminated, we can be a little more aggressive on the parallelism.
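For illustration, a per-partition row minimum like `min_num_rows_per_prtn` can cap the partition count roughly as below. This is a hedged sketch, not the actual formula in `SwissTableForJoinBuild::Init` (the hypothetical `dop` parameter stands in for the degree of parallelism, and the real code may also round to a power of two):

```cpp
#include <algorithm>
#include <cstdint>

// Illustrative only: pick the number of partitions so that each partition
// gets at least min_rows_per_prtn rows, capped by the degree of parallelism.
// Lowering the threshold from 1 << 18 to 1 << 12 lets smaller builds use
// more partitions, i.e. more parallelism.
int NumPartitions(int64_t num_rows, int dop, int64_t min_rows_per_prtn) {
  int64_t by_rows = std::max<int64_t>(1, num_rows / min_rows_per_prtn);
  return static_cast<int>(std::min<int64_t>(dop, by_rows));
}
```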
Hi @westonpace, given the absence of Antoine's approval, I think I'll need your help reviewing this. Appreciate it.
(I'm reviewing... Please wait for a while...)
Sure, just take your time. Much appreciated, @kou!
+1
cpp/src/arrow/acero/swiss_join.cc (outdated)
```diff
   // Update hashes, shifting left to get rid of the bits that were already used
   // for partitioning.
   //
-  for (size_t i = 0; i < locals.batch_hashes.size(); ++i) {
-    locals.batch_hashes[i] <<= log_num_prtns_;
+  for (size_t i = 0; i < batch_state.hashes.size(); ++i) {
+    batch_state.hashes[i] <<= log_num_prtns_;
   }
```
This is not related to this PR, but we don't need to do this when `num_prtns_ == 1`.
Good point. Updated.
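The guard discussed here amounts to something like the following. Variable names follow the diff, but the free function is an illustrative sketch, not the actual patch:

```cpp
#include <cstdint>
#include <vector>

// Shift hashes left to discard the bits already consumed for partitioning.
// When there is only one partition (log_num_prtns == 0), no bits were
// consumed, so the whole pass can be skipped.
void ShiftHashes(std::vector<uint32_t>& hashes, int log_num_prtns) {
  if (log_num_prtns == 0) return;  // num_prtns == 1: nothing to do
  for (uint32_t& h : hashes) {
    h <<= log_num_prtns;
  }
}
```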
cpp/src/arrow/acero/swiss_join.cc (outdated)
```cpp
DCHECK_LE(static_cast<int64_t>(thread_id), dop_);
DCHECK_LE(batch_id, static_cast<int64_t>(batch_states_.size()));
```
Ditto. (We may want to add `DCHECK_LT()` for `prtn_id` too.)
Done.
cpp/src/arrow/acero/swiss_join.cc (outdated)
```cpp
payload_batch.length = input_batch.length;
for (size_t icol = 0; icol < payload_batch.values.size(); ++icol) {
  payload_batch.values[icol] =
      input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol];
}
```
This is not related to this PR, but can we avoid calling `schema->num_cols()` in this loop? (I'm not sure whether this code is performance-sensitive, but it seems we could use a pre-computed value.)
Done.
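The hoisting suggested here amounts to something like the following sketch. `FakeSchema` and `GatherPayload` are stand-ins for illustration; Arrow's real `HashJoinProjection` API differs:

```cpp
#include <cstddef>
#include <vector>

// Stand-in for the schema object: returns the number of key columns.
struct FakeSchema {
  int num_key_cols;
  int num_cols() const { return num_key_cols; }
};

// Copy payload columns from the input batch, computing the key-column
// offset once instead of calling num_cols() on every iteration.
void GatherPayload(const FakeSchema& schema, const std::vector<int>& input_values,
                   std::vector<int>& payload_values) {
  const int num_key_cols = schema.num_cols();  // hoisted out of the loop
  for (size_t icol = 0; icol < payload_values.size(); ++icol) {
    payload_values[icol] = input_values[num_key_cols + icol];
  }
}
```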
Co-authored-by: Sutou Kouhei <[email protected]>
@github-actions crossbow submit -g cpp -g r

Revision: 50e3cc7 Submitted crossbow builds: ursacomputing/crossbow @ actions-9a599a63ba

@ursabot please benchmark lang=C++ lang=R
Supported benchmark command examples: to run all benchmarks; to filter benchmarks by language; to filter Python and R benchmarks by name; to filter C++ benchmarks by archery `--suite-filter` and `--benchmark-filter`.
@ursabot please benchmark lang=C++

Benchmark runs are scheduled for commit 50e3cc7. Watch https://buildkite.com/apache-arrow and https://conbench.ursa.dev for updates. A comment will be posted here when the runs are complete.
@ursabot please benchmark lang=R

Commit 50e3cc7 already has scheduled benchmark runs.

@ursabot please benchmark lang=R

Commit 50e3cc7 already has scheduled benchmark runs.

@ursabot please benchmark

Commit 50e3cc7 already has scheduled benchmark runs.
Thanks for your patience. Conbench analyzed the 4 benchmarking runs that have been run so far on PR commit 50e3cc7. There was 1 benchmark result indicating a performance regression. The full Conbench report has more details.
@ursabot please benchmark

Commit 50e3cc7 already has scheduled benchmark runs.
@ursabot please benchmark

Benchmark runs are scheduled for commit 1009825. Watch https://buildkite.com/apache-arrow and https://conbench.ursa.dev for updates. A comment will be posted here when the runs are complete.
Thanks for your patience. Conbench analyzed the 4 benchmarking runs that have been run so far on PR commit 1009825. There were 25 benchmark results indicating a performance regression. The full Conbench report has more details.
The CI failures and regressions are not related (the relevant benchmarking is the one in my PR description). I'll merge this. And much appreciated, the reviews from @kou and @pitrou!
After merging your PR, Conbench analyzed the 4 benchmarking runs that have been run so far on merge-commit 30adc91. There were no benchmark performance regressions. 🎉 The full Conbench report has more details. It also includes information about 15 possible false positives for unstable benchmarks that are known to sometimes produce them.
Rationale for this change
High contention is observed in the Swiss join build phase, as shown in #45611.
A little background about the contention. To build the hash table in parallel, we first build `N` partitioned hash tables (the "build" stage), then merge them together into the final hash table (the "merge" stage, less interesting in this PR). In the build stage, each of the exec batches from the build side table is distributed to one of the `M` threads. Each such thread processes each of its assigned batches by:

1. partitioning the rows of the batch into `N` partitions;
2. inserting the rows of each of the `N` partitions into the corresponding one of the `N` partitioned hash tables.

Because each batch contains arbitrary data, all `M` threads will write to all `N` partitioned hash tables simultaneously. So we use (spin) locks on these partitioned hash tables, hence the contention.

What changes are included in this PR?
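The contended scheme described above can be sketched as follows. The types are illustrative, not Arrow's actual API, and `std::mutex` stands in for the spin locks the real code uses:

```cpp
#include <cstdint>
#include <mutex>
#include <vector>

// N partitioned "hash tables" (stand-ins), each guarded by its own lock.
struct PartitionedTables {
  std::vector<std::vector<uint32_t>> tables;
  std::vector<std::mutex> locks;
  explicit PartitionedTables(size_t n) : tables(n), locks(n) {}

  // Called by every one of the M worker threads for every partition of
  // every batch, so all M threads contend on all N locks.
  void Insert(size_t prtn_id, const std::vector<uint32_t>& rows) {
    std::lock_guard<std::mutex> guard(locks[prtn_id]);
    tables[prtn_id].insert(tables[prtn_id].end(), rows.begin(), rows.end());
  }
};
```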
Instead of all `M` threads writing to all `N` partitioned hash tables simultaneously, we can further split the build stage into two:

1. `M` threads, each of which only partitions the batches and preserves the partition info of each batch;
2. `N` threads, each of which builds one of the `N` partitioned hash tables. Every such thread iterates over all the batches and inserts only the belonging rows of each batch into its assigned hash table.

Performance
Take this benchmark, which is dedicated to the performance of the parallel build. The result shows that by eliminating the contention, we can achieve up to a 10x (on Arm) and 5x (on Intel) performance boost for the Swiss join build. I picked `krows=64` and `krows=512` and made a chart.

Note that single-thread performance is actually down a little bit (reasons detailed later), but IMO this is trivial compared to the total win in multi-threaded cases.
Detailed benchmark numbers (on Arm) follow.
Benchmark After (Click to expand)
Overhead
This change indeed introduces some overhead. First, in the old implementation, the partition info is used right away after partitioning the batch, whereas the new implementation preserves the partition info and uses it in the next stage (potentially on another thread). This may be less cache friendly. Second, preserving the partition info requires more memory: the increased allocation may hurt performance a bit, and it worsens the memory profile by 6 bytes per row (4 bytes for the hash and 2 bytes for the row id within the partition).
But as mentioned above, almost all multi-threaded cases are winning. Even nicer, the increased memory usage spans only a short period and doesn't really increase the peak memory: the peak always comes in the merge stage, and by that time the preserved partition info for all batches has already been released. This is verified by printing the memory pool stats when benchmarking on my local machine.
Are these changes tested?
Yes. Existing tests suffice.
Are there any user-facing changes?
None.