From 30f6adf67a3fd64bb0cb418bc7ea3e270f16cd93 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Thu, 20 Feb 2025 16:54:32 +0800 Subject: [PATCH 1/9] Reduce swiss join build contention by pre-partition the batches --- cpp/src/arrow/acero/swiss_join.cc | 189 ++++++++++++---------- cpp/src/arrow/acero/swiss_join_internal.h | 43 +++-- 2 files changed, 135 insertions(+), 97 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index a1faef4679729..410c464e48219 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1102,7 +1102,8 @@ uint32_t SwissTableForJoin::payload_id_to_key_id(uint32_t payload_id) const { } Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t num_rows, - bool reject_duplicate_keys, bool no_payload, + int64_t num_batches, bool reject_duplicate_keys, + bool no_payload, const std::vector& key_types, const std::vector& payload_types, MemoryPool* pool, int64_t hardware_flags) { @@ -1112,7 +1113,7 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t // Make sure that we do not use many partitions if there are not enough rows. 
// - constexpr int64_t min_num_rows_per_prtn = 1 << 18; + constexpr int64_t min_num_rows_per_prtn = 1 << 12; log_num_prtns_ = std::min(bit_util::Log2(dop_), bit_util::Log2(bit_util::CeilDiv(num_rows, min_num_rows_per_prtn))); @@ -1123,9 +1124,9 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t pool_ = pool; hardware_flags_ = hardware_flags; + batch_states_.resize(num_batches); prtn_states_.resize(num_prtns_); thread_states_.resize(dop_); - prtn_locks_.Init(dop_, num_prtns_); RowTableMetadata key_row_metadata; key_row_metadata.FromColumnMetadataVector(key_types, @@ -1154,91 +1155,73 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t return Status::OK(); } -Status SwissTableForJoinBuild::PushNextBatch(int64_t thread_id, - const ExecBatch& key_batch, - const ExecBatch* payload_batch_maybe_null, - arrow::util::TempVectorStack* temp_stack) { - ARROW_DCHECK(thread_id < dop_); +Status SwissTableForJoinBuild::PartitionBatch(size_t thread_id, int64_t batch_id, + const ExecBatch& key_batch, + arrow::util::TempVectorStack* temp_stack) { + DCHECK_LE(static_cast(thread_id), dop_); + DCHECK_LE(batch_id, static_cast(batch_states_.size())); ThreadState& locals = thread_states_[thread_id]; + BatchState& batch_state = batch_states_[batch_id]; + uint16_t num_rows = static_cast(key_batch.length); // Compute hash // - locals.batch_hashes.resize(key_batch.length); - RETURN_NOT_OK(Hashing32::HashBatch( - key_batch, locals.batch_hashes.data(), locals.temp_column_arrays, hardware_flags_, - temp_stack, /*start_row=*/0, static_cast(key_batch.length))); + batch_state.hashes.resize(num_rows); + RETURN_NOT_OK(Hashing32::HashBatch(key_batch, batch_state.hashes.data(), + locals.temp_column_arrays, hardware_flags_, + temp_stack, /*start_row=*/0, num_rows)); // Partition on hash // - locals.batch_prtn_row_ids.resize(locals.batch_hashes.size()); - locals.batch_prtn_ranges.resize(num_prtns_ + 1); - int num_rows = 
static_cast(locals.batch_hashes.size()); + batch_state.prtn_ranges.resize(num_prtns_ + 1); + batch_state.prtn_row_ids.resize(num_rows); if (num_prtns_ == 1) { // We treat single partition case separately to avoid extra checks in row // partitioning implementation for general case. // - locals.batch_prtn_ranges[0] = 0; - locals.batch_prtn_ranges[1] = num_rows; - for (int i = 0; i < num_rows; ++i) { - locals.batch_prtn_row_ids[i] = i; + batch_state.prtn_ranges[0] = 0; + batch_state.prtn_ranges[1] = num_rows; + for (uint16_t i = 0; i < num_rows; ++i) { + batch_state.prtn_row_ids[i] = i; } } else { PartitionSort::Eval( - static_cast(locals.batch_hashes.size()), num_prtns_, - locals.batch_prtn_ranges.data(), - [this, &locals](int64_t i) { + num_rows, num_prtns_, batch_state.prtn_ranges.data(), + [this, &batch_state](int64_t i) { // SwissTable uses the highest bits of the hash for block index. // We want each partition to correspond to a range of block indices, // so we also partition on the highest bits of the hash. // - return locals.batch_hashes[i] >> (SwissTable::bits_hash_ - log_num_prtns_); + return batch_state.hashes[i] >> (SwissTable::bits_hash_ - log_num_prtns_); }, - [&locals](int64_t i, int pos) { - locals.batch_prtn_row_ids[pos] = static_cast(i); + [&batch_state](int64_t i, int pos) { + batch_state.prtn_row_ids[pos] = static_cast(i); }); } // Update hashes, shifting left to get rid of the bits that were already used // for partitioning. 
// - for (size_t i = 0; i < locals.batch_hashes.size(); ++i) { - locals.batch_hashes[i] <<= log_num_prtns_; + for (size_t i = 0; i < batch_state.hashes.size(); ++i) { + batch_state.hashes[i] <<= log_num_prtns_; } - // For each partition: - // - map keys to unique integers using (this partition's) hash table - // - append payloads (if present) to (this partition's) row array - // - locals.temp_prtn_ids.resize(num_prtns_); - - RETURN_NOT_OK(prtn_locks_.ForEachPartition( - thread_id, locals.temp_prtn_ids.data(), - /*is_prtn_empty_fn=*/ - [&](int prtn_id) { - return locals.batch_prtn_ranges[prtn_id + 1] == locals.batch_prtn_ranges[prtn_id]; - }, - /*process_prtn_fn=*/ - [&](int prtn_id) { - return ProcessPartition(thread_id, key_batch, payload_batch_maybe_null, - temp_stack, prtn_id); - })); - return Status::OK(); } -Status SwissTableForJoinBuild::ProcessPartition(int64_t thread_id, - const ExecBatch& key_batch, - const ExecBatch* payload_batch_maybe_null, - arrow::util::TempVectorStack* temp_stack, - int prtn_id) { - ARROW_DCHECK(thread_id < dop_); +Status SwissTableForJoinBuild::ProcessPartition( + size_t thread_id, int64_t batch_id, int prtn_id, const ExecBatch& key_batch, + const ExecBatch* payload_batch_maybe_null, arrow::util::TempVectorStack* temp_stack) { + DCHECK_LE(static_cast(thread_id), dop_); + DCHECK_LE(batch_id, static_cast(batch_states_.size())); ThreadState& locals = thread_states_[thread_id]; + BatchState& batch_state = batch_states_[batch_id]; + PartitionState& prtn_state = prtn_states_[prtn_id]; int num_rows_new = - locals.batch_prtn_ranges[prtn_id + 1] - locals.batch_prtn_ranges[prtn_id]; + batch_state.prtn_ranges[prtn_id + 1] - batch_state.prtn_ranges[prtn_id]; const uint16_t* row_ids = - locals.batch_prtn_row_ids.data() + locals.batch_prtn_ranges[prtn_id]; - PartitionState& prtn_state = prtn_states_[prtn_id]; + batch_state.prtn_row_ids.data() + batch_state.prtn_ranges[prtn_id]; size_t num_rows_before = prtn_state.key_ids.size(); // Insert new 
keys into hash table associated with the current partition // and map existing keys to integer ids. @@ -1247,7 +1230,7 @@ Status SwissTableForJoinBuild::ProcessPartition(int64_t thread_id, SwissTableWithKeys::Input input(&key_batch, num_rows_new, row_ids, temp_stack, &locals.temp_column_arrays, &locals.temp_group_ids); RETURN_NOT_OK(prtn_state.keys.MapWithInserts( - &input, locals.batch_hashes.data(), prtn_state.key_ids.data() + num_rows_before)); + &input, batch_state.hashes.data(), prtn_state.key_ids.data() + num_rows_before)); // Append input batch rows from current partition to an array of payload // rows for this partition. // @@ -2504,6 +2487,13 @@ class SwissJoin : public HashJoinImpl { } void InitTaskGroups() { + task_group_partition_ = register_task_group_callback_( + [this](size_t thread_index, int64_t task_id) -> Status { + return PartitionTask(thread_index, task_id); + }, + [this](size_t thread_index) -> Status { + return PartitionFinished(thread_index); + }); task_group_build_ = register_task_group_callback_( [this](size_t thread_index, int64_t task_id) -> Status { return BuildTask(thread_index, task_id); @@ -2593,16 +2583,16 @@ class SwissJoin : public HashJoinImpl { hash_table_build_ = std::make_unique(); RETURN_NOT_OK(CancelIfNotOK(hash_table_build_->Init( &hash_table_, num_threads_, build_side_batches_.row_count(), - reject_duplicate_keys, no_payload, key_types, payload_types, pool_, - hardware_flags_))); + build_side_batches_.batch_count(), reject_duplicate_keys, no_payload, key_types, + payload_types, pool_, hardware_flags_))); // Process all input batches // - return CancelIfNotOK( - start_task_group_callback_(task_group_build_, build_side_batches_.batch_count())); + return CancelIfNotOK(start_task_group_callback_(task_group_partition_, + build_side_batches_.batch_count())); } - Status BuildTask(size_t thread_id, int64_t batch_id) { + Status PartitionTask(size_t thread_id, int64_t batch_id) { if (IsCancelled()) { return Status::OK(); } @@ 
-2610,41 +2600,77 @@ class SwissJoin : public HashJoinImpl { DCHECK_GT(build_side_batches_[batch_id].length, 0); const HashJoinProjectionMaps* schema = schema_[1]; - DCHECK_NE(hash_table_build_, nullptr); - bool no_payload = hash_table_build_->no_payload(); - ExecBatch input_batch; ARROW_ASSIGN_OR_RAISE( input_batch, KeyPayloadFromInput(/*side=*/1, &build_side_batches_[batch_id])); - // Split batch into key batch and optional payload batch - // - // Input batch is key-payload batch (key columns followed by payload - // columns). We split it into two separate batches. - // - // TODO: Change SwissTableForJoinBuild interface to use key-payload - // batch instead to avoid this operation, which involves increasing - // shared pointer ref counts. - // ExecBatch key_batch({}, input_batch.length); key_batch.values.resize(schema->num_cols(HashJoinProjection::KEY)); for (size_t icol = 0; icol < key_batch.values.size(); ++icol) { key_batch.values[icol] = input_batch.values[icol]; } - ExecBatch payload_batch({}, input_batch.length); + arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack; + + DCHECK_NE(hash_table_build_, nullptr); + return hash_table_build_->PartitionBatch(static_cast(thread_id), batch_id, + key_batch, temp_stack); + } + + Status PartitionFinished(size_t thread_id) { + RETURN_NOT_OK(status()); + + DCHECK_NE(hash_table_build_, nullptr); + return CancelIfNotOK( + start_task_group_callback_(task_group_build_, hash_table_build_->num_prtns())); + } + + Status BuildTask(size_t thread_id, int64_t prtn_id) { + if (IsCancelled()) { + return Status::OK(); + } + const HashJoinProjectionMaps* schema = schema_[1]; + DCHECK_NE(hash_table_build_, nullptr); + bool no_payload = hash_table_build_->no_payload(); + ExecBatch key_batch, payload_batch; + key_batch.values.resize(schema->num_cols(HashJoinProjection::KEY)); if (!no_payload) { payload_batch.values.resize(schema->num_cols(HashJoinProjection::PAYLOAD)); - for (size_t icol = 0; icol < 
payload_batch.values.size(); ++icol) { - payload_batch.values[icol] = - input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol]; - } } arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack; - DCHECK_NE(hash_table_build_, nullptr); - RETURN_NOT_OK(CancelIfNotOK(hash_table_build_->PushNextBatch( - static_cast(thread_id), key_batch, no_payload ? nullptr : &payload_batch, - temp_stack))); + + for (int64_t batch_id = 0; + batch_id < static_cast(build_side_batches_.batch_count()); ++batch_id) { + ExecBatch input_batch; + ARROW_ASSIGN_OR_RAISE( + input_batch, KeyPayloadFromInput(/*side=*/1, &build_side_batches_[batch_id])); + + // Split batch into key batch and optional payload batch + // + // Input batch is key-payload batch (key columns followed by payload + // columns). We split it into two separate batches. + // + // TODO: Change SwissTableForJoinBuild interface to use key-payload + // batch instead to avoid this operation, which involves increasing + // shared pointer ref counts. + // + key_batch.length = input_batch.length; + for (size_t icol = 0; icol < key_batch.values.size(); ++icol) { + key_batch.values[icol] = input_batch.values[icol]; + } + + if (!no_payload) { + payload_batch.length = input_batch.length; + for (size_t icol = 0; icol < payload_batch.values.size(); ++icol) { + payload_batch.values[icol] = + input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol]; + } + } + + RETURN_NOT_OK(CancelIfNotOK(hash_table_build_->ProcessPartition( + thread_id, batch_id, static_cast(prtn_id), key_batch, + no_payload ? 
nullptr : &payload_batch, temp_stack))); + } return Status::OK(); } @@ -2897,6 +2923,7 @@ class SwissJoin : public HashJoinImpl { const HashJoinProjectionMaps* schema_[2]; // Task scheduling + int task_group_partition_; int task_group_build_; int task_group_merge_; int task_group_scan_; diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index c7af6517d7c25..7799c3a8349f0 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -523,19 +523,27 @@ class SwissTableForJoin { // class SwissTableForJoinBuild { public: - Status Init(SwissTableForJoin* target, int dop, int64_t num_rows, + Status Init(SwissTableForJoin* target, int dop, int64_t num_rows, int64_t num_batches, bool reject_duplicate_keys, bool no_payload, const std::vector& key_types, const std::vector& payload_types, MemoryPool* pool, int64_t hardware_flags); - // In the first phase of parallel hash table build, threads pick unprocessed - // exec batches, partition the rows based on hash, and update all of the - // partitions with information related to that batch of rows. + // In the first phase of parallel hash table build, each thread picks unprocessed exec + // batches, hashes the batches and preserves the hashes, then partitions the rows based on + // hashes. // - Status PushNextBatch(int64_t thread_id, const ExecBatch& key_batch, - const ExecBatch* payload_batch_maybe_null, - arrow::util::TempVectorStack* temp_stack); + Status PartitionBatch(size_t thread_id, int64_t batch_id, const ExecBatch& key_batch, + arrow::util::TempVectorStack* temp_stack); + + // In the second phase of parallel hash table build, each thread picks the given + // partition of all batches, and updates that particular partition with information + // related to that batch of rows. 
+ // + Status ProcessPartition(size_t thread_id, int64_t batch_id, int prtn_id, + const ExecBatch& key_batch, + const ExecBatch* payload_batch_maybe_null, + arrow::util::TempVectorStack* temp_stack); // Allocate memory and initialize counters required for parallel merging of // hash table partitions. @@ -543,7 +551,7 @@ class SwissTableForJoinBuild { // Status PreparePrtnMerge(); - // Second phase of parallel hash table build. + // Third phase of parallel hash table build. // Each partition can be processed by a different thread. // Parallel step. // @@ -564,9 +572,6 @@ class SwissTableForJoinBuild { private: void InitRowArray(); - Status ProcessPartition(int64_t thread_id, const ExecBatch& key_batch, - const ExecBatch* payload_batch_maybe_null, - arrow::util::TempVectorStack* temp_stack, int prtn_id); SwissTableForJoin* target_; // DOP stands for Degree Of Parallelism - the maximum number of participating @@ -604,6 +609,16 @@ class SwissTableForJoinBuild { MemoryPool* pool_; int64_t hardware_flags_; + // One per batch. + // + // Informations like hashes and partitions of each batch. + // + struct BatchState { + std::vector hashes; + std::vector prtn_ranges; + std::vector prtn_row_ids; + }; + // One per partition. // struct PartitionState { @@ -620,17 +635,13 @@ class SwissTableForJoinBuild { // batches. 
// struct ThreadState { - std::vector batch_hashes; - std::vector batch_prtn_ranges; - std::vector batch_prtn_row_ids; - std::vector temp_prtn_ids; std::vector temp_group_ids; std::vector temp_column_arrays; }; + std::vector batch_states_; std::vector prtn_states_; std::vector thread_states_; - PartitionLocks prtn_locks_; std::vector partition_keys_first_row_id_; std::vector partition_payloads_first_row_id_; From b2b67e406cbe0439bc17ebdec4e3884bbc650864 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Fri, 28 Feb 2025 14:47:28 +0800 Subject: [PATCH 2/9] Address comment about adding description for the cardinality of each BatchState member --- cpp/src/arrow/acero/swiss_join_internal.h | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 7799c3a8349f0..365f2917d8eff 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -611,11 +611,17 @@ class SwissTableForJoinBuild { // One per batch. // - // Informations like hashes and partitions of each batch. + // Informations like hashes and partitions of each batch gathered in the partition phase + // and used in the build phase. // struct BatchState { + // Hashes for the batch, preserved in the partition phase to avoid recomputation in + // the build phase. One element per row in the batch. std::vector hashes; + // Accumulative number of rows in each partition for the batch. `num_prtns_` + 1 + // elements. std::vector prtn_ranges; + // Row ids after partition sorting the batch. One element per row in the batch. 
std::vector prtn_row_ids; }; From 00f90f8f03e2d6c5f62f85735bf3387f00ba57f3 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 3 Mar 2025 12:42:53 +0800 Subject: [PATCH 3/9] Update cpp/src/arrow/acero/swiss_join.cc Co-authored-by: Sutou Kouhei --- cpp/src/arrow/acero/swiss_join.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 410c464e48219..c6854f46dfc6a 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1158,7 +1158,7 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t Status SwissTableForJoinBuild::PartitionBatch(size_t thread_id, int64_t batch_id, const ExecBatch& key_batch, arrow::util::TempVectorStack* temp_stack) { - DCHECK_LE(static_cast(thread_id), dop_); + DCHECK_LT(static_cast(thread_id), dop_); DCHECK_LE(batch_id, static_cast(batch_states_.size())); ThreadState& locals = thread_states_[thread_id]; BatchState& batch_state = batch_states_[batch_id]; From c156d7d256913573f8277642bdc50d1cf031d94b Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 3 Mar 2025 12:43:09 +0800 Subject: [PATCH 4/9] Update cpp/src/arrow/acero/swiss_join.cc Co-authored-by: Sutou Kouhei --- cpp/src/arrow/acero/swiss_join.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index c6854f46dfc6a..054aa85a36250 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1159,7 +1159,7 @@ Status SwissTableForJoinBuild::PartitionBatch(size_t thread_id, int64_t batch_id const ExecBatch& key_batch, arrow::util::TempVectorStack* temp_stack) { DCHECK_LT(static_cast(thread_id), dop_); - DCHECK_LE(batch_id, static_cast(batch_states_.size())); + DCHECK_LT(batch_id, static_cast(batch_states_.size())); ThreadState& locals = thread_states_[thread_id]; BatchState& batch_state = batch_states_[batch_id]; uint16_t num_rows 
= static_cast(key_batch.length); From a47203916c1dedb347eebd558c2082960396944e Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 3 Mar 2025 12:38:22 +0800 Subject: [PATCH 5/9] Address comment: Avoid duplicate call to schema->num_cols --- cpp/src/arrow/acero/swiss_join.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 054aa85a36250..f49d85c95abb5 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -2633,9 +2633,11 @@ class SwissJoin : public HashJoinImpl { DCHECK_NE(hash_table_build_, nullptr); bool no_payload = hash_table_build_->no_payload(); ExecBatch key_batch, payload_batch; - key_batch.values.resize(schema->num_cols(HashJoinProjection::KEY)); + auto num_keys = schema->num_cols(HashJoinProjection::KEY); + auto num_payloads = schema->num_cols(HashJoinProjection::PAYLOAD); + key_batch.values.resize(num_keys); if (!no_payload) { - payload_batch.values.resize(schema->num_cols(HashJoinProjection::PAYLOAD)); + payload_batch.values.resize(num_payloads); } arrow::util::TempVectorStack* temp_stack = &local_states_[thread_id].stack; @@ -2662,8 +2664,7 @@ class SwissJoin : public HashJoinImpl { if (!no_payload) { payload_batch.length = input_batch.length; for (size_t icol = 0; icol < payload_batch.values.size(); ++icol) { - payload_batch.values[icol] = - input_batch.values[schema->num_cols(HashJoinProjection::KEY) + icol]; + payload_batch.values[icol] = input_batch.values[num_keys + icol]; } } From 905f5fb580fa0483b555c502a4789c81899967fb Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 3 Mar 2025 12:42:16 +0800 Subject: [PATCH 6/9] Address comment: Avoid update hash for num_prtn == 1 --- cpp/src/arrow/acero/swiss_join.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index f49d85c95abb5..4758eae7329a7 100644 --- 
a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1197,13 +1197,13 @@ Status SwissTableForJoinBuild::PartitionBatch(size_t thread_id, int64_t batch_id [&batch_state](int64_t i, int pos) { batch_state.prtn_row_ids[pos] = static_cast(i); }); - } - // Update hashes, shifting left to get rid of the bits that were already used - // for partitioning. - // - for (size_t i = 0; i < batch_state.hashes.size(); ++i) { - batch_state.hashes[i] <<= log_num_prtns_; + // Update hashes, shifting left to get rid of the bits that were already used + // for partitioning. + // + for (size_t i = 0; i < batch_state.hashes.size(); ++i) { + batch_state.hashes[i] <<= log_num_prtns_; + } } return Status::OK(); From 513a1f6d8d5c0790379b81a20aa281eefe71a2f0 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 3 Mar 2025 12:48:10 +0800 Subject: [PATCH 7/9] Address comment: Refine DCHECK_LE --- cpp/src/arrow/acero/swiss_join.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 4758eae7329a7..5649ed466dc3c 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1158,7 +1158,7 @@ Status SwissTableForJoinBuild::Init(SwissTableForJoin* target, int dop, int64_t Status SwissTableForJoinBuild::PartitionBatch(size_t thread_id, int64_t batch_id, const ExecBatch& key_batch, arrow::util::TempVectorStack* temp_stack) { - DCHECK_LT(static_cast(thread_id), dop_); + DCHECK_LT(thread_id, thread_states_.size()); DCHECK_LT(batch_id, static_cast(batch_states_.size())); ThreadState& locals = thread_states_[thread_id]; BatchState& batch_state = batch_states_[batch_id]; @@ -1212,8 +1212,9 @@ Status SwissTableForJoinBuild::PartitionBatch(size_t thread_id, int64_t batch_id Status SwissTableForJoinBuild::ProcessPartition( size_t thread_id, int64_t batch_id, int prtn_id, const ExecBatch& key_batch, const ExecBatch* payload_batch_maybe_null, 
arrow::util::TempVectorStack* temp_stack) { - DCHECK_LE(static_cast(thread_id), dop_); - DCHECK_LE(batch_id, static_cast(batch_states_.size())); + DCHECK_LT(thread_id, thread_states_.size()); + DCHECK_LT(batch_id, static_cast(batch_states_.size())); + DCHECK_LT(prtn_id, prtn_states_.size()); ThreadState& locals = thread_states_[thread_id]; BatchState& batch_state = batch_states_[batch_id]; PartitionState& prtn_state = prtn_states_[prtn_id]; From 50e3cc7867e20dfc18af025ef3b834aadfe871de Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Mon, 3 Mar 2025 15:58:22 +0800 Subject: [PATCH 8/9] Fix CI warning --- cpp/src/arrow/acero/swiss_join.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 5649ed466dc3c..b4d89df290214 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -1214,7 +1214,7 @@ Status SwissTableForJoinBuild::ProcessPartition( const ExecBatch* payload_batch_maybe_null, arrow::util::TempVectorStack* temp_stack) { DCHECK_LT(thread_id, thread_states_.size()); DCHECK_LT(batch_id, static_cast(batch_states_.size())); - DCHECK_LT(prtn_id, prtn_states_.size()); + DCHECK_LT(static_cast(prtn_id), prtn_states_.size()); ThreadState& locals = thread_states_[thread_id]; BatchState& batch_state = batch_states_[batch_id]; PartitionState& prtn_state = prtn_states_[prtn_id]; From 10098256578cecc37d1e3d704db28854ceb50c95 Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Tue, 4 Mar 2025 10:38:33 +0800 Subject: [PATCH 9/9] Empty commit just to update commit hash