diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 1b811219c322e9..4c078094b98cd5 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -549,41 +549,15 @@ struct MultiCastSharedState : public BasicSharedState { std::unique_ptr multi_cast_data_streamer; }; -struct BlockRowPos { - int64_t block_num {}; //the pos at which block - int64_t row_num {}; //the pos at which row - int64_t pos {}; //pos = all blocks size + row_num - std::string debug_string() const { - std::string res = "\t block_num: "; - res += std::to_string(block_num); - res += "\t row_num: "; - res += std::to_string(row_num); - res += "\t pos: "; - res += std::to_string(pos); - return res; - } -}; - struct AnalyticSharedState : public BasicSharedState { ENABLE_FACTORY_CREATOR(AnalyticSharedState) public: AnalyticSharedState() = default; - - int64_t current_row_position = 0; - BlockRowPos partition_by_end; - int64_t input_total_rows = 0; - BlockRowPos all_block_end; - std::vector input_blocks; - bool input_eos = false; - BlockRowPos found_partition_end; - std::vector origin_cols; - std::vector input_block_first_row_positions; - std::vector> agg_input_columns; - - // TODO: maybe global? 
- std::vector partition_by_column_idxs; - std::vector ordey_by_column_idxs; + std::queue blocks_buffer; + std::mutex buffer_mutex; + bool sink_eos = false; + std::mutex sink_eos_lock; }; struct JoinSharedState : public BasicSharedState { diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 7cc25eef9446d6..72213e74406845 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -18,6 +18,10 @@ #include "analytic_sink_operator.h" +#include + +#include +#include #include #include "pipeline/exec/operator.h" @@ -30,10 +34,74 @@ Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); - _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime"); + _evaluation_timer = ADD_TIMER(profile(), "EvaluationTime"); _compute_agg_data_timer = ADD_TIMER(profile(), "ComputeAggDataTime"); _compute_partition_by_timer = ADD_TIMER(profile(), "ComputePartitionByTime"); _compute_order_by_timer = ADD_TIMER(profile(), "ComputeOrderByTime"); + _compute_range_between_function_timer = ADD_TIMER(profile(), "ComputeOrderByFunctionTime"); + _partition_search_timer = ADD_TIMER(profile(), "PartitionSearchTime"); + _order_search_timer = ADD_TIMER(profile(), "OrderSearchTime"); + _remove_rows_timer = ADD_TIMER(profile(), "RemoveRowsTime"); + _remove_rows = ADD_COUNTER(profile(), "RemoveRows", TUnit::UNIT); + _remove_count = ADD_COUNTER(profile(), "RemoveCount", TUnit::UNIT); + _blocks_memory_usage = + profile()->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage", 1); + _agg_arena_pool = std::make_unique(); + auto& p = _parent->cast(); + if (!p._has_window) { //haven't set window, Unbounded: [unbounded preceding,unbounded following] + // For window frame `ROWS|RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING` 
+ _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition; + } else if (p._has_range_window) { + if (!p._has_window_start && !p._has_window_end) { + _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition; + } else { + if (!p._has_window_start && + p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) { + // For window frame `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW` + _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_range; + } else { + _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_range_between; + } + } + } else { + // haven't set start and end, same as PARTITION + if (!p._has_window_start && !p._has_window_end) { + _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_partition; + } else { + if (!p._has_window_start && + p._window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) { + _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_unbounded_rows; + } else { + _executor.get_next_impl = &AnalyticSinkLocalState::_get_next_for_sliding_rows; + } + } + + if (p._has_window_start) { //calculate start boundary + TAnalyticWindowBoundary b = p._window.window_start; + if (b.__isset.rows_offset_value) { //[offset , ] + _rows_start_offset = b.rows_offset_value; + if (b.type == TAnalyticWindowBoundaryType::PRECEDING) { + _rows_start_offset *= -1; //preceding--> negative + } //current_row 0 + } else { //following positive + DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[current row, ] + _rows_start_offset = 0; + } + } + + if (p._has_window_end) { //calculate end boundary + TAnalyticWindowBoundary b = p._window.window_end; + if (b.__isset.rows_offset_value) { //[ , offset] + _rows_end_offset = b.rows_offset_value; + if (b.type == TAnalyticWindowBoundaryType::PRECEDING) { + _rows_end_offset *= -1; + } + } else { + DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[ ,current row] + _rows_end_offset = 0; + } 
+ } + } return Status::OK(); } @@ -42,159 +110,487 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); - _shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size()); - _shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size()); - - size_t agg_size = p._agg_expr_ctxs.size(); - _agg_expr_ctxs.resize(agg_size); - _shared_state->agg_input_columns.resize(agg_size); - for (int i = 0; i < agg_size; ++i) { - _shared_state->agg_input_columns[i].resize(p._num_agg_input[i]); + + _agg_functions_size = p._agg_functions_size; + _agg_expr_ctxs.resize(_agg_functions_size); + _agg_functions.resize(_agg_functions_size); + _agg_input_columns.resize(_agg_functions_size); + _offsets_of_aggregate_states.resize(_agg_functions_size); + _result_column_nullable_flags.resize(_agg_functions_size); + + for (int i = 0; i < _agg_functions_size; ++i) { + _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool()); + _agg_input_columns[i].resize(p._num_agg_input[i]); _agg_expr_ctxs[i].resize(p._agg_expr_ctxs[i].size()); for (int j = 0; j < p._agg_expr_ctxs[i].size(); ++j) { RETURN_IF_ERROR(p._agg_expr_ctxs[i][j]->clone(state, _agg_expr_ctxs[i][j])); + _agg_input_columns[i][j] = _agg_expr_ctxs[i][j]->root()->data_type()->create_column(); } - - for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) { - _shared_state->agg_input_columns[i][j] = - _agg_expr_ctxs[i][j]->root()->data_type()->create_column(); - } + _offsets_of_aggregate_states[i] = p._offsets_of_aggregate_states[i]; + _result_column_nullable_flags[i] = + !_agg_functions[i]->function()->get_return_type()->is_nullable() && + _agg_functions[i]->data_type()->is_nullable(); } - _partition_by_eq_expr_ctxs.resize(p._partition_by_eq_expr_ctxs.size()); - for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); i++) { + + _partition_exprs_size = p._partition_by_eq_expr_ctxs.size(); + 
_partition_by_eq_expr_ctxs.resize(_partition_exprs_size); + _partition_by_columns.resize(_partition_exprs_size); + for (size_t i = 0; i < _partition_exprs_size; i++) { RETURN_IF_ERROR( p._partition_by_eq_expr_ctxs[i]->clone(state, _partition_by_eq_expr_ctxs[i])); + _partition_by_columns[i] = + _partition_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); } - _order_by_eq_expr_ctxs.resize(p._order_by_eq_expr_ctxs.size()); - for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); i++) { + + _order_by_exprs_size = p._order_by_eq_expr_ctxs.size(); + _order_by_eq_expr_ctxs.resize(_order_by_exprs_size); + _order_by_columns.resize(_order_by_exprs_size); + for (size_t i = 0; i < _order_by_exprs_size; i++) { RETURN_IF_ERROR(p._order_by_eq_expr_ctxs[i]->clone(state, _order_by_eq_expr_ctxs[i])); + _order_by_columns[i] = _order_by_eq_expr_ctxs[i]->root()->data_type()->create_column(); + } + + // only support one order by column, so need two columns upper and lower bound; copy the parent's contexts first so the result columns are sized from the populated list + _range_between_expr_ctxs = p._range_between_expr_ctxs; + _range_result_columns.resize(_range_between_expr_ctxs.size()); + for (size_t i = 0; i < _range_between_expr_ctxs.size(); i++) { + RETURN_IF_ERROR(p._range_between_expr_ctxs[i]->clone(state, _range_between_expr_ctxs[i])); + _range_result_columns[i] = + _range_between_expr_ctxs[i]->root()->data_type()->create_column(); } + + _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states, + p._align_aggregate_states); + _create_agg_status(); return Status::OK(); } -bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& found_partition_end) { - auto& shared_state = *_shared_state; - if (shared_state.input_eos || - (shared_state.current_row_position < - shared_state.partition_by_end.pos)) { //now still have partition data - return false; +Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) { + SCOPED_TIMER(exec_time_counter()); + SCOPED_TIMER(_close_timer); + if (_closed) { + return 
Status::OK(); + } + + _destroy_agg_status(); + _agg_arena_pool = nullptr; + + _result_window_columns.clear(); + _agg_input_columns.clear(); + _partition_by_columns.clear(); + _order_by_columns.clear(); + _range_result_columns.clear(); + return PipelineXSinkLocalState::close(state, exec_status); +} + +bool AnalyticSinkLocalState::_get_next_for_sliding_rows(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + int64_t current_row_start = 0; + int64_t current_row_end = _current_row_position + _rows_end_offset + 1; + + _reset_agg_status(); + if (!_parent->cast()._window.__isset.window_start) { + current_row_start = _partition_by_pose.start; + } else { + current_row_start = _current_row_position + _rows_start_offset; + } + // Eg: rows between unbounded preceding and 10 preceding + // Make sure range_start <= range_end + current_row_start = std::min(current_row_start, current_row_end); + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, current_row_start, + current_row_end); + _insert_result_info(1); + _current_row_position++; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } + } + return false; +} + +bool AnalyticSinkLocalState::_get_next_for_unbounded_rows(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + // [preceding, current_row], [current_row, following] rewrite it's same + // as could reuse the previous calculate result, so don't call _reset_agg_status function + // going on calculate, add up data, no need to reset state + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _current_row_position, _current_row_position + 1); + _insert_result_info(1); + _current_row_position++; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } } - if ((_partition_by_eq_expr_ctxs.empty() && !shared_state.input_eos) || - 
(found_partition_end.pos == 0)) { //no partition, get until fetch to EOS - return true; + return false; +} + +bool AnalyticSinkLocalState::_get_next_for_partition(int64_t batch_rows, + int64_t current_block_base_pos) { + if (_current_row_position == _partition_by_pose.start) { + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _partition_by_pose.start, _partition_by_pose.end); } - if (!_partition_by_eq_expr_ctxs.empty() && - found_partition_end.pos == shared_state.all_block_end.pos && - !shared_state.input_eos) { //current partition data calculate done - return true; + + // the end pos maybe after multis blocks, but should output by batch size and should not exceed partition end + auto window_end_pos = _current_row_position + batch_rows; + window_end_pos = std::min(window_end_pos, _partition_by_pose.end); + + auto previous_window_frame_width = _current_row_position - current_block_base_pos; + auto current_window_frame_width = window_end_pos - current_block_base_pos; + // should not exceed block batch size + current_window_frame_width = std::min(current_window_frame_width, batch_rows); + auto real_deal_with_width = current_window_frame_width - previous_window_frame_width; + + _insert_result_info(real_deal_with_width); + _current_row_position += real_deal_with_width; + return _current_row_position - current_block_base_pos >= batch_rows; +} + +bool AnalyticSinkLocalState::_get_next_for_unbounded_range(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + _update_order_by_range(); + if (_current_row_position == _order_by_pose.start) { + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _order_by_pose.start, _order_by_pose.end); + } + auto previous_window_frame_width = _current_row_position - current_block_base_pos; + auto current_window_frame_width = _order_by_pose.end - current_block_base_pos; + current_window_frame_width = 
std::min(current_window_frame_width, batch_rows); + auto real_deal_with_width = current_window_frame_width - previous_window_frame_width; + + _insert_result_info(real_deal_with_width); + _current_row_position += real_deal_with_width; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } } return false; } -//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly -BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(int64_t idx, BlockRowPos start, - BlockRowPos end, - bool need_check_first) { - auto& shared_state = *_shared_state; - int64_t start_init_row_num = start.row_num; - vectorized::ColumnPtr start_column = - shared_state.input_blocks[start.block_num].get_by_position(idx).column; - vectorized::ColumnPtr start_next_block_column = start_column; - - DCHECK_LE(start.block_num, end.block_num); - DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1); - int64_t start_block_num = start.block_num; - int64_t end_block_num = end.block_num; - int64_t mid_blcok_num = end.block_num; - // To fix this problem: https://github.com/apache/doris/issues/15951 - // in this case, the partition by column is last row of block, so it's pointed to a new block at row = 0, range is: [left, right) - // From the perspective of order by column, the two values are exactly equal. 
- // so the range will be get wrong because it's compare_at == 0 with next block at row = 0 - if (need_check_first && end.block_num > 0 && end.row_num == 0) { - end.block_num--; - end_block_num--; - end.row_num = shared_state.input_blocks[end_block_num].rows(); - } - //binary search find in which block - while (start_block_num < end_block_num) { - mid_blcok_num = (start_block_num + end_block_num + 1) >> 1; - start_next_block_column = - shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column; - //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0] - if (start_column->compare_at(start_init_row_num, 0, *start_next_block_column, 1) == 0) { - start_block_num = mid_blcok_num; +bool AnalyticSinkLocalState::_get_next_for_range_between(int64_t batch_rows, + int64_t current_block_base_pos) { + while (_current_row_position < _partition_by_pose.end) { + _reset_agg_status(); + if (!_parent->cast()._window.__isset.window_start) { + _order_by_pose.start = _partition_by_pose.start; } else { - end_block_num = mid_blcok_num - 1; - } - } - - // have check the start.block_num: start_column[start_init_row_num] with mid_blcok_num start_next_block_column[0] - // now next block must not be result, so need check with end_block_num: start_next_block_column[last_row] - if (end_block_num == mid_blcok_num - 1) { - start_next_block_column = - shared_state.input_blocks[end_block_num].get_by_position(idx).column; - int64_t block_size = shared_state.input_blocks[end_block_num].rows(); - if ((start_column->compare_at(start_init_row_num, block_size - 1, *start_next_block_column, - 1) == 0)) { - start.block_num = end_block_num + 1; - start.row_num = 0; - return start; - } - } - - //check whether need get column again, maybe same as first init - // if the start_block_num have move to forword, so need update start block num and compare it from row_num=0 - if (start_block_num != start.block_num) { - start_init_row_num = 0; - start.block_num = start_block_num; - start_column = 
shared_state.input_blocks[start.block_num].get_by_position(idx).column; - } - //binary search, set start and end pos - int64_t start_pos = start_init_row_num; - int64_t end_pos = shared_state.input_blocks[start.block_num].rows(); - //if end_block_num haven't moved, only start_block_num go to the end block - //so could use the end.row_num for binary search - if (start.block_num == end.block_num) { - end_pos = end.row_num; - } - while (start_pos < end_pos) { - int64_t mid_pos = (start_pos + end_pos) >> 1; - if (start_column->compare_at(start_init_row_num, mid_pos, *start_column, 1)) { - end_pos = mid_pos; + _order_by_pose.start = find_first_not_equal( + _range_result_columns[0].get(), _order_by_columns[0].get(), + _current_row_position, _order_by_pose.start, _partition_by_pose.end); + } + + if (!_parent->cast()._window.__isset.window_end) { + _order_by_pose.end = _partition_by_pose.end; } else { - start_pos = mid_pos + 1; + _order_by_pose.end = find_first_not_equal( + _range_result_columns[1].get(), _order_by_columns[0].get(), + _current_row_position, _order_by_pose.end, _partition_by_pose.end); + } + _execute_for_function(_partition_by_pose.start, _partition_by_pose.end, + _order_by_pose.start, _order_by_pose.end); + _insert_result_info(1); + _current_row_position++; + if (_current_row_position - current_block_base_pos >= batch_rows) { + return true; + } + } + if (_current_row_position == _partition_by_pose.end) { + _order_by_pose.start = _partition_by_pose.end; // update to next partition pos + _order_by_pose.end = _partition_by_pose.end; + } + return false; +} + +Status AnalyticSinkLocalState::_execute_impl() { + while (_output_block_index < _input_blocks.size()) { + { + _get_partition_by_end(); + if (!_partition_by_pose.is_ended) { + break; + } + _init_result_columns(); + auto batch_rows = _input_blocks[_output_block_index].rows(); + auto current_block_base_pos = + _input_block_first_row_positions[_output_block_index] - _have_removed_rows; + bool should_output = 
false; + + { + SCOPED_TIMER(_evaluation_timer); + should_output = + (this->*_executor.get_next_impl)(batch_rows, current_block_base_pos); + } + + if (should_output) { + vectorized::Block block; + _output_current_block(&block); + _refresh_buffer_and_dependency_state(&block); + } + if (_current_row_position == _partition_by_pose.end) { + _reset_state_for_next_partition(); + } + } + } + return Status::OK(); +} + +void AnalyticSinkLocalState::_execute_for_function(int64_t partition_start, int64_t partition_end, + int64_t frame_start, int64_t frame_end) { + // here is the core function, should not add timer + // If the end is not greater than the start, the current window should be empty. + _current_window_empty = + std::min(frame_end, partition_end) <= std::max(frame_start, partition_start); + if (_current_window_empty) { + return; + } + for (size_t i = 0; i < _agg_functions_size; ++i) { + std::vector agg_columns; + for (int j = 0; j < _agg_input_columns[i].size(); ++j) { + agg_columns.push_back(_agg_input_columns[i][j].get()); + } + _agg_functions[i]->function()->add_range_single_place( + partition_start, partition_end, frame_start, frame_end, + _fn_place_ptr + _offsets_of_aggregate_states[i], agg_columns.data(), + _agg_arena_pool.get()); + } +} + +void AnalyticSinkLocalState::_insert_result_info(int64_t real_deal_with_width) { + // here is the core function, should not add timer + for (size_t i = 0; i < _agg_functions_size; ++i) { + for (size_t j = 0; j < real_deal_with_width; ++j) { + if (_result_column_nullable_flags[i]) { + if (_current_window_empty) { + _result_window_columns[i]->insert_default(); + } else { + auto* dst = assert_cast( + _result_window_columns[i].get()); + dst->get_null_map_data().push_back(0); + _agg_functions[i]->insert_result_info( + _fn_place_ptr + _offsets_of_aggregate_states[i], + &dst->get_nested_column()); + } + } else { + _agg_functions[i]->insert_result_info( + _fn_place_ptr + _offsets_of_aggregate_states[i], + 
_result_window_columns[i].get()); + } + } + } +} + +void AnalyticSinkLocalState::_output_current_block(vectorized::Block* block) { + block->swap(std::move(_input_blocks[_output_block_index])); + _blocks_memory_usage->add(-block->allocated_bytes()); + DCHECK(_parent->cast()._change_to_nullable_flags.size() == + _result_window_columns.size()); + for (size_t i = 0; i < _result_window_columns.size(); ++i) { + DCHECK(_result_window_columns[i]); + DCHECK(_agg_functions[i]); + if (_parent->cast()._change_to_nullable_flags[i]) { + block->insert({make_nullable(std::move(_result_window_columns[i])), + make_nullable(_agg_functions[i]->data_type()), ""}); + } else { + block->insert( + {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""}); + } + } + + _output_block_index++; +} + +void AnalyticSinkLocalState::_init_result_columns() { + if (_current_row_position + _have_removed_rows == + _input_block_first_row_positions[_output_block_index]) { + _result_window_columns.resize(_agg_functions_size); + // return type create result column + for (size_t i = 0; i < _agg_functions_size; ++i) { + _result_window_columns[i] = _agg_functions[i]->data_type()->create_column(); + } + } +} + +void AnalyticSinkLocalState::_refresh_buffer_and_dependency_state(vectorized::Block* block) { + size_t buffer_size = 0; + { + std::unique_lock lc(_shared_state->buffer_mutex); + _shared_state->blocks_buffer.push(std::move(*block)); + buffer_size = _shared_state->blocks_buffer.size(); + } + if (buffer_size > 128) { + // buffer have enough data, could block the sink + _dependency->block(); + } + // buffer have push data, could signal the source to read + _dependency->set_ready_to_read(); +} + +void AnalyticSinkLocalState::_reset_state_for_next_partition() { + _partition_column_statistics.update(_partition_by_pose.end - _partition_by_pose.start); + _order_by_column_statistics.reset(); + _partition_by_pose.start = _partition_by_pose.end; + _current_row_position = _partition_by_pose.start; 
+ _reset_agg_status(); +} + +void AnalyticSinkLocalState::_update_order_by_range() { + // still have more data + if (_order_by_pose.is_ended && _current_row_position < _order_by_pose.end) { + return; + } + SCOPED_TIMER(_order_search_timer); + while (!_next_order_by_ends.empty()) { + int64_t peek = _next_order_by_ends.front(); + _next_order_by_ends.pop(); + if (peek > _order_by_pose.end) { + _order_by_pose.start = _order_by_pose.end; + _order_by_pose.end = peek; + _order_by_pose.is_ended = true; + _order_by_column_statistics.update(_order_by_pose.end - _order_by_pose.start); + return; } } - start.row_num = start_pos; //update row num, return the find end - return start; + + if (_order_by_pose.is_ended) { + _order_by_pose.start = _order_by_pose.end; + } + _order_by_pose.end = _partition_by_pose.end; + + { + if (_order_by_pose.start < _order_by_pose.end) { + for (size_t i = 0; i < _order_by_exprs_size; ++i) { + _order_by_pose.end = find_first_not_equal( + _order_by_columns[i].get(), _order_by_columns[i].get(), + _order_by_pose.start, _order_by_pose.start, _order_by_pose.end); + } + } + } + + if (_order_by_pose.end < _partition_by_pose.end) { + _order_by_column_statistics.update(_order_by_pose.end - _order_by_pose.start); + _order_by_pose.is_ended = true; + _find_next_order_by_ends(); + return; + } + DCHECK_EQ(_partition_by_pose.end, _order_by_pose.end); + if (_partition_by_pose.is_ended) { + _order_by_pose.is_ended = true; + return; + } + _order_by_pose.is_ended = false; } -BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() { - auto& shared_state = *_shared_state; - if (shared_state.current_row_position < - shared_state.partition_by_end.pos) { //still have data, return partition_by_end directly - return shared_state.partition_by_end; +void AnalyticSinkLocalState::_get_partition_by_end() { + //still have data, return partition_by_end directly + if (_partition_by_pose.is_ended && _current_row_position < _partition_by_pose.end) { + return; + } + //no 
partition_by, the all block is end + if (_partition_by_eq_expr_ctxs.empty() || (_input_total_rows == 0)) { + _partition_by_pose.end = _input_total_rows - _have_removed_rows; + _partition_by_pose.is_ended = _input_eos; + return; + } + SCOPED_TIMER(_partition_search_timer); + while (!_next_partition_ends.empty()) { + int64_t peek = _next_partition_ends.front(); + _next_partition_ends.pop(); + if (peek > _partition_by_pose.end) { + _partition_by_pose.end = peek; + _partition_by_pose.is_ended = true; + return; + } } - if (_partition_by_eq_expr_ctxs.empty() || - (shared_state.input_total_rows == 0)) { //no partition_by, the all block is end - return shared_state.all_block_end; + const auto start = _partition_by_pose.end; + const auto target = (_partition_by_pose.is_ended || _partition_by_pose.end == 0) + ? _partition_by_pose.end + : _partition_by_pose.end - 1; + DCHECK(_partition_exprs_size > 0); + const auto partition_column_rows = _partition_by_columns[0]->size(); + _partition_by_pose.end = partition_column_rows; + + { + if (start < _partition_by_pose.end) { + for (size_t i = 0; i < _partition_exprs_size; ++i) { + _partition_by_pose.end = find_first_not_equal( + _partition_by_columns[i].get(), _partition_by_columns[i].get(), target, + start, _partition_by_pose.end); + } + } } - BlockRowPos cal_end = shared_state.all_block_end; - for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); - ++i) { //have partition_by, binary search the partiton end - cal_end = _compare_row_to_find_end(shared_state.partition_by_column_idxs[i], - shared_state.partition_by_end, cal_end); + if (_partition_by_pose.end < partition_column_rows) { + _partition_by_pose.is_ended = true; + _find_next_partition_ends(); + return; } - cal_end.pos = shared_state.input_block_first_row_positions[cal_end.block_num] + cal_end.row_num; - return cal_end; + + DCHECK_EQ(_partition_by_pose.end, partition_column_rows); + _partition_by_pose.is_ended = _input_eos; +} + +void 
AnalyticSinkLocalState::_find_next_partition_ends() { + if (!_partition_column_statistics.is_high_cardinality()) { + return; + } + + SCOPED_TIMER(_partition_search_timer); + for (size_t i = _partition_by_pose.end + 1; i < _partition_by_columns[0]->size(); ++i) { + for (auto& column : _partition_by_columns) { + auto cmp = column->compare_at(i - 1, i, *column, 1); + if (cmp != 0) { + _next_partition_ends.push(i); + break; + } + } + } +} + +void AnalyticSinkLocalState::_find_next_order_by_ends() { + if (!_order_by_column_statistics.is_high_cardinality()) { + return; + } + + SCOPED_TIMER(_order_search_timer); + for (size_t i = _order_by_pose.end + 1; i < _partition_by_pose.end; ++i) { + for (auto& column : _order_by_columns) { + auto cmp = column->compare_at(i - 1, i, *column, 1); + if (cmp != 0) { + _next_order_by_ends.push(i); + break; + } + } + } +} + +// Compares (*this)[n] and rhs[m] +int64_t AnalyticSinkLocalState::find_first_not_equal(vectorized::IColumn* reference_column, + vectorized::IColumn* compared_column, + int64_t target, int64_t start, int64_t end) { + while (start + 1 < end) { + int64_t mid = start + (end - start) / 2; + if (reference_column->compare_at(target, mid, *compared_column, 1) == 0) { + start = mid; + } else { + end = mid; + } + } + if (reference_column->compare_at(target, end - 1, *compared_column, 1) == 0) { + return end; + } + return end - 1; } AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), + _pool(pool), + _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id), + _output_tuple_id(tnode.analytic_node.output_tuple_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? 
tnode.analytic_node.buffered_tuple_id : 0), @@ -202,20 +598,33 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _require_bucket_distribution(require_bucket_distribution), _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution ? tnode.distribute_expr_lists[0] - : tnode.analytic_node.partition_exprs) { + : tnode.analytic_node.partition_exprs), + _window(tnode.analytic_node.window), + _has_window(tnode.analytic_node.__isset.window), + _has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE), + _has_window_start(tnode.analytic_node.window.__isset.window_start), + _has_window_end(tnode.analytic_node.window.__isset.window_end) { _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; } Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); const TAnalyticNode& analytic_node = tnode.analytic_node; - size_t agg_size = analytic_node.analytic_functions.size(); - _agg_expr_ctxs.resize(agg_size); - _num_agg_input.resize(agg_size); - for (int i = 0; i < agg_size; ++i) { + _agg_functions_size = analytic_node.analytic_functions.size(); + _agg_expr_ctxs.resize(_agg_functions_size); + _num_agg_input.resize(_agg_functions_size); + for (int i = 0; i < _agg_functions_size; ++i) { const TExpr& desc = analytic_node.analytic_functions[i]; - _num_agg_input[i] = desc.nodes[0].num_children; + vectorized::AggFnEvaluator* evaluator = nullptr; + // Window function treats all NullableAggregateFunction as AlwaysNullable. + // Its behavior is same with executed without group by key. 
+ // https://github.com/apache/doris/pull/40693 + RETURN_IF_ERROR(vectorized::AggFnEvaluator::create(_pool, desc, {}, /*without_key*/ true, + &evaluator)); + _agg_functions.emplace_back(evaluator); + int node_idx = 0; + _num_agg_input[i] = desc.nodes[0].num_children; for (int j = 0; j < desc.nodes[0].num_children; ++j) { ++node_idx; vectorized::VExprSPtr expr; @@ -230,7 +639,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) _partition_by_eq_expr_ctxs)); RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.order_by_exprs, _order_by_eq_expr_ctxs)); - _agg_functions_size = agg_size; + RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(analytic_node.range_between_offset_exprs, + _range_between_expr_ctxs)); return Status::OK(); } @@ -239,6 +649,18 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) { for (const auto& ctx : _agg_expr_ctxs) { RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc())); } + _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); + _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); + _change_to_nullable_flags.resize(_agg_functions_size); + for (size_t i = 0; i < _agg_functions_size; ++i) { + SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i]; + SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i]; + RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(), + intermediate_slot_desc, output_slot_desc)); + _agg_functions[i]->set_version(state->be_exec_version()); + _change_to_nullable_flags[i] = + output_slot_desc->is_nullable() && (!_agg_functions[i]->data_type()->is_nullable()); + } if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) { vector tuple_ids; tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id()); @@ -253,11 +675,39 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) { 
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc)); } } + if (!_range_between_expr_ctxs.empty()) { + DCHECK(_range_between_expr_ctxs.size() == 2); + RETURN_IF_ERROR( + vectorized::VExpr::prepare(_range_between_expr_ctxs, state, _child->row_desc())); + } + RETURN_IF_ERROR(vectorized::VExpr::open(_range_between_expr_ctxs, state)); RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state)); RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state)); for (size_t i = 0; i < _agg_functions_size; ++i) { + RETURN_IF_ERROR(_agg_functions[i]->open(state)); RETURN_IF_ERROR(vectorized::VExpr::open(_agg_expr_ctxs[i], state)); } + + _offsets_of_aggregate_states.resize(_agg_functions_size); + for (size_t i = 0; i < _agg_functions_size; ++i) { + _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states; + const auto& agg_function = _agg_functions[i]->function(); + // aggregate states are aligned based on maximum requirement + _align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data()); + _total_size_of_aggregate_states += agg_function->size_of_data(); + // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. + if (i + 1 < _agg_functions_size) { + size_t alignment_of_next_state = _agg_functions[i + 1]->function()->align_of_data(); + if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) { + return Status::RuntimeError("Logical error: align_of_data is not 2^N"); + } + /// Extend total_size to next alignment requirement + /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. 
+ _total_size_of_aggregate_states = + (_total_size_of_aggregate_states + alignment_of_next_state - 1) / + alignment_of_next_state * alignment_of_next_state; + } + } return Status::OK(); } @@ -266,73 +716,125 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows()); - local_state._shared_state->input_eos = eos; - if (local_state._shared_state->input_eos && input_block->rows() == 0) { - local_state._dependency->set_ready_to_read(); - local_state._dependency->block(); - return Status::OK(); + local_state._input_eos = eos; + local_state._remove_unused_rows(); + RETURN_IF_ERROR(_add_input_block(state, input_block)); + RETURN_IF_ERROR(local_state._execute_impl()); + if (local_state._input_eos) { + std::unique_lock lc(local_state._shared_state->sink_eos_lock); + local_state._shared_state->sink_eos = true; + local_state._dependency->set_ready_to_read(); // ready for source to read } + return Status::OK(); +} - local_state._shared_state->input_block_first_row_positions.emplace_back( - local_state._shared_state->input_total_rows); - size_t block_rows = input_block->rows(); - local_state._shared_state->input_total_rows += block_rows; - local_state._shared_state->all_block_end.block_num = - local_state._shared_state->input_blocks.size(); - local_state._shared_state->all_block_end.row_num = block_rows; - local_state._shared_state->all_block_end.pos = local_state._shared_state->input_total_rows; - - if (local_state._shared_state->origin_cols - .empty()) { //record origin columns, maybe be after this, could cast some column but no need to save - for (int c = 0; c < input_block->columns(); ++c) { - local_state._shared_state->origin_cols.emplace_back(c); - } +Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state, + vectorized::Block* input_block) { + if 
(input_block->rows() <= 0) { + return Status::OK(); } + auto& local_state = get_local_state(state); + local_state._input_block_first_row_positions.emplace_back(local_state._input_total_rows); + size_t block_rows = input_block->rows(); + local_state._input_total_rows += block_rows; + // record origin columns, maybe be after this, could cast some column but no need to output + auto column_to_keep = input_block->columns(); { SCOPED_TIMER(local_state._compute_agg_data_timer); - for (size_t i = 0; i < _agg_functions_size; - ++i) { //insert _agg_input_columns, execute calculate for its + //insert _agg_input_columns, execute calculate for its, and those columns maybe could remove have used data + for (size_t i = 0; i < _agg_functions_size; ++i) { for (size_t j = 0; j < local_state._agg_expr_ctxs[i].size(); ++j) { - RETURN_IF_ERROR(_insert_range_column( - input_block, local_state._agg_expr_ctxs[i][j], - local_state._shared_state->agg_input_columns[i][j].get(), block_rows)); + RETURN_IF_ERROR(_insert_range_column(input_block, local_state._agg_expr_ctxs[i][j], + local_state._agg_input_columns[i][j].get(), + block_rows)); } } } { SCOPED_TIMER(local_state._compute_partition_by_timer); for (size_t i = 0; i < local_state._partition_by_eq_expr_ctxs.size(); ++i) { - int result_col_id = -1; - RETURN_IF_ERROR(local_state._partition_by_eq_expr_ctxs[i]->execute(input_block, - &result_col_id)); - DCHECK_GE(result_col_id, 0); - local_state._shared_state->partition_by_column_idxs[i] = result_col_id; + RETURN_IF_ERROR( + _insert_range_column(input_block, local_state._partition_by_eq_expr_ctxs[i], + local_state._partition_by_columns[i].get(), block_rows)); } } - { SCOPED_TIMER(local_state._compute_order_by_timer); for (size_t i = 0; i < local_state._order_by_eq_expr_ctxs.size(); ++i) { - int result_col_id = -1; + RETURN_IF_ERROR(_insert_range_column(input_block, local_state._order_by_eq_expr_ctxs[i], + local_state._order_by_columns[i].get(), + block_rows)); + } + } + { + 
SCOPED_TIMER(local_state._compute_range_between_function_timer); + for (size_t i = 0; i < local_state._range_between_expr_ctxs.size(); ++i) { RETURN_IF_ERROR( - local_state._order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id)); - DCHECK_GE(result_col_id, 0); - local_state._shared_state->ordey_by_column_idxs[i] = result_col_id; + _insert_range_column(input_block, local_state._range_between_expr_ctxs[i], + local_state._range_result_columns[i].get(), block_rows)); } } - + vectorized::Block::erase_useless_column(input_block, column_to_keep); COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes()); + COUNTER_UPDATE(local_state._blocks_memory_usage, input_block->allocated_bytes()); + local_state._input_blocks.emplace_back(std::move(*input_block)); + return Status::OK(); +} - //TODO: if need improvement, the is a tips to maintain a free queue, - //so the memory could reuse, no need to new/delete again; - local_state._shared_state->input_blocks.emplace_back(std::move(*input_block)); +void AnalyticSinkLocalState::_remove_unused_rows() { + const size_t block_num = 256; + if (_removed_block_index + block_num + 1 >= _input_block_first_row_positions.size()) { + return; + } + const int64_t unused_rows_pos = + _input_block_first_row_positions[_removed_block_index + block_num]; + + if (_have_removed_rows + _partition_by_pose.start <= unused_rows_pos) { + return; + } + + const int64_t remove_rows = unused_rows_pos - _have_removed_rows; + auto left_rows = _input_total_rows - _have_removed_rows - remove_rows; { - SCOPED_TIMER(local_state._evaluation_timer); - local_state._shared_state->found_partition_end = local_state._get_partition_by_end(); + SCOPED_TIMER(_remove_rows_timer); + for (size_t i = 0; i < _agg_functions_size; i++) { + for (size_t j = 0; j < _agg_expr_ctxs[i].size(); j++) { + _agg_input_columns[i][j] = + _agg_input_columns[i][j]->cut(remove_rows, left_rows)->assume_mutable(); + } + } + for (size_t i = 0; i < _partition_exprs_size; 
i++) { + _partition_by_columns[i] = + _partition_by_columns[i]->cut(remove_rows, left_rows)->assume_mutable(); + } + for (size_t i = 0; i < _order_by_exprs_size; i++) { + _order_by_columns[i] = + _order_by_columns[i]->cut(remove_rows, left_rows)->assume_mutable(); + } } - local_state._refresh_need_more_input(); - return Status::OK(); + COUNTER_UPDATE(_remove_count, 1); + COUNTER_UPDATE(_remove_rows, remove_rows); + _current_row_position -= remove_rows; + _partition_by_pose.remove_unused_rows(remove_rows); + _order_by_pose.remove_unused_rows(remove_rows); + int64_t candidate_partition_end_size = _next_partition_ends.size(); + while (--candidate_partition_end_size >= 0) { + auto peek = _next_partition_ends.front(); + _next_partition_ends.pop(); + _next_partition_ends.push(peek - remove_rows); + } + int64_t candidate_peer_group_end_size = _next_order_by_ends.size(); + while (--candidate_peer_group_end_size >= 0) { + auto peek = _next_order_by_ends.front(); + _next_order_by_ends.pop(); + _next_order_by_ends.push(peek - remove_rows); + } + _removed_block_index += block_num; + _have_removed_rows += remove_rows; + + DCHECK_GE(_current_row_position, 0); + DCHECK_GE(_partition_by_pose.end, 0); } Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block, @@ -346,6 +848,35 @@ Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block, return Status::OK(); } +void AnalyticSinkLocalState::_reset_agg_status() { + for (size_t i = 0; i < _agg_functions_size; ++i) { + _agg_functions[i]->reset(_fn_place_ptr + _offsets_of_aggregate_states[i]); + } +} + +void AnalyticSinkLocalState::_create_agg_status() { + for (size_t i = 0; i < _agg_functions_size; ++i) { + try { + _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]); + } catch (...) 
{ + for (int j = 0; j < i; ++j) { + _agg_functions[j]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[j]); + } + throw; + } + } + _agg_functions_created = true; +} + +void AnalyticSinkLocalState::_destroy_agg_status() { + if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) { + return; + } + for (size_t i = 0; i < _agg_functions_size; ++i) { + _agg_functions[i]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[i]); + } +} + template class DataSinkOperatorX; } // namespace doris::pipeline diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 0ff7c4e4e047bd..5530a4f4de563e 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -28,6 +28,37 @@ namespace doris { namespace pipeline { class AnalyticSinkOperatorX; +struct BoundaryPose { + int64_t start = 0; + int64_t end = 0; + bool is_ended = false; + void remove_unused_rows(int64_t cnt) { + start -= cnt; + end -= cnt; + } +}; + +class PartitionStatistics { +public: + void update(int64_t size) { + _count++; + _cumulative_size += size; + _average_size = _cumulative_size / _count; + } + + void reset() { + _count = 0; + _cumulative_size = 0; + _average_size = 0; + } + + bool is_high_cardinality() const { return _count > 16 && _average_size < 8; } + + int64_t _count = 0; + int64_t _cumulative_size = 0; + int64_t _average_size = 0; +}; + class AnalyticSinkLocalState : public PipelineXSinkLocalState { ENABLE_FACTORY_CREATOR(AnalyticSinkLocalState); @@ -37,34 +68,99 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalStatefound_partition_end); - if (need_more_input) { - _dependency->set_block_to_read(); - _dependency->set_ready(); - } else { - _dependency->block(); - _dependency->set_ready_to_read(); - } - return need_more_input; - } - BlockRowPos _get_partition_by_end(); - BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, BlockRowPos end, - bool need_check_first 
= false); - bool _whether_need_next_partition(BlockRowPos& found_partition_end); + std::vector _agg_expr_ctxs; + vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs; + vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs; + vectorized::VExprContextSPtrs _range_between_expr_ctxs; + std::vector> _agg_input_columns; + std::vector _partition_by_columns; + std::vector _order_by_columns; + std::vector _range_result_columns; + size_t _partition_exprs_size = 0; + size_t _order_by_exprs_size = 0; + BoundaryPose _partition_by_pose; + BoundaryPose _order_by_pose; + PartitionStatistics _partition_column_statistics; + PartitionStatistics _order_by_column_statistics; + std::queue _next_partition_ends; + std::queue _next_order_by_ends; + + size_t _agg_functions_size = 0; + bool _agg_functions_created = false; + vectorized::AggregateDataPtr _fn_place_ptr = nullptr; + std::unique_ptr _agg_arena_pool = nullptr; + std::vector _agg_functions; + std::vector _offsets_of_aggregate_states; + std::vector _result_column_nullable_flags; + + using vectorized_get_next = bool (AnalyticSinkLocalState::*)(int64_t, int64_t); + struct executor { + vectorized_get_next get_next_impl; + }; + executor _executor; + + bool _current_window_empty = false; + int64_t _current_row_position = 0; + int64_t _output_block_index = 0; + std::vector _result_window_columns; + + int64_t _rows_start_offset = 0; + int64_t _rows_end_offset = 0; + int64_t _input_total_rows = 0; + bool _input_eos = false; + std::vector _input_blocks; + std::vector _input_block_first_row_positions; + int64_t _removed_block_index = 0; + int64_t _have_removed_rows = 0; RuntimeProfile::Counter* _evaluation_timer = nullptr; RuntimeProfile::Counter* _compute_agg_data_timer = nullptr; RuntimeProfile::Counter* _compute_partition_by_timer = nullptr; RuntimeProfile::Counter* _compute_order_by_timer = nullptr; - - std::vector _agg_expr_ctxs; - vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs; - vectorized::VExprContextSPtrs 
_order_by_eq_expr_ctxs; + RuntimeProfile::Counter* _compute_range_between_function_timer = nullptr; + RuntimeProfile::Counter* _partition_search_timer = nullptr; + RuntimeProfile::Counter* _order_search_timer = nullptr; + RuntimeProfile::Counter* _remove_rows_timer = nullptr; + RuntimeProfile::Counter* _remove_count = nullptr; + RuntimeProfile::Counter* _remove_rows = nullptr; + RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr; }; class AnalyticSinkOperatorX final : public DataSinkOperatorX { @@ -94,23 +190,44 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX _agg_expr_ctxs; vectorized::VExprContextSPtrs _partition_by_eq_expr_ctxs; vectorized::VExprContextSPtrs _order_by_eq_expr_ctxs; + vectorized::VExprContextSPtrs _range_between_expr_ctxs; size_t _agg_functions_size = 0; + std::vector _num_agg_input; + std::vector _agg_functions; + TupleId _intermediate_tuple_id; + TupleId _output_tuple_id; + TupleDescriptor* _intermediate_tuple_desc = nullptr; + TupleDescriptor* _output_tuple_desc = nullptr; const TTupleId _buffered_tuple_id; - std::vector _num_agg_input; const bool _is_colocate; const bool _require_bucket_distribution; const std::vector _partition_exprs; + + TAnalyticWindow _window; + bool _has_window; + bool _has_range_window; + bool _has_window_start; + bool _has_window_end; + + /// The offset of the n-th functions. + std::vector _offsets_of_aggregate_states; + /// The total size of the row from the functions. 
+ size_t _total_size_of_aggregate_states = 0; + /// The max align size for functions + size_t _align_aggregate_states = 1; + std::vector _change_to_nullable_flags; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index fe0ab0b148e55a..ce6f0d1d1074ae 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -17,6 +17,7 @@ #include "analytic_source_operator.h" +#include #include #include "pipeline/exec/operator.h" @@ -27,589 +28,68 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* parent) - : PipelineXLocalState(state, parent), - _output_block_index(0), - _window_end_position(0), - _next_partition(false), - _rows_start_offset(0), - _rows_end_offset(0), - _fn_place_ptr(nullptr), - _agg_functions_size(0), - _agg_functions_created(false), - _agg_arena_pool(std::make_unique()) {} - -//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly -BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int64_t idx, BlockRowPos start, - BlockRowPos end, bool need_check_first) { - auto& shared_state = *_shared_state; - int64_t start_init_row_num = start.row_num; - vectorized::ColumnPtr start_column = - shared_state.input_blocks[start.block_num].get_by_position(idx).column; - vectorized::ColumnPtr start_next_block_column = start_column; - - DCHECK_LE(start.block_num, end.block_num); - DCHECK_LE(start.block_num, shared_state.input_blocks.size() - 1); - int64_t start_block_num = start.block_num; - int64_t end_block_num = end.block_num; - int64_t mid_blcok_num = end.block_num; - // To fix this problem: https://github.com/apache/doris/issues/15951 - // in this case, the partition by column is last row of block, so it's pointed to a new block at row = 0, range is: 
[left, right) - // From the perspective of order by column, the two values are exactly equal. - // so the range will be get wrong because it's compare_at == 0 with next block at row = 0 - if (need_check_first && end.block_num > 0 && end.row_num == 0) { - end.block_num--; - end_block_num--; - end.row_num = shared_state.input_blocks[end_block_num].rows(); - } - //binary search find in which block - while (start_block_num < end_block_num) { - mid_blcok_num = (start_block_num + end_block_num + 1) >> 1; - start_next_block_column = - shared_state.input_blocks[mid_blcok_num].get_by_position(idx).column; - //Compares (*this)[n] and rhs[m], this: start[init_row] rhs: mid[0] - if (start_column->compare_at(start_init_row_num, 0, *start_next_block_column, 1) == 0) { - start_block_num = mid_blcok_num; - } else { - end_block_num = mid_blcok_num - 1; - } - } - - // have check the start.block_num: start_column[start_init_row_num] with mid_blcok_num start_next_block_column[0] - // now next block must not be result, so need check with end_block_num: start_next_block_column[last_row] - if (end_block_num == mid_blcok_num - 1) { - start_next_block_column = - shared_state.input_blocks[end_block_num].get_by_position(idx).column; - int64_t block_size = shared_state.input_blocks[end_block_num].rows(); - if ((start_column->compare_at(start_init_row_num, block_size - 1, *start_next_block_column, - 1) == 0)) { - start.block_num = end_block_num + 1; - start.row_num = 0; - return start; - } - } - - //check whether need get column again, maybe same as first init - // if the start_block_num have move to forword, so need update start block num and compare it from row_num=0 - if (start_block_num != start.block_num) { - start_init_row_num = 0; - start.block_num = start_block_num; - start_column = shared_state.input_blocks[start.block_num].get_by_position(idx).column; - } - //binary search, set start and end pos - int64_t start_pos = start_init_row_num; - int64_t end_pos = 
shared_state.input_blocks[start.block_num].rows(); - //if end_block_num haven't moved, only start_block_num go to the end block - //so could use the end.row_num for binary search - if (start.block_num == end.block_num) { - end_pos = end.row_num; - } - while (start_pos < end_pos) { - int64_t mid_pos = (start_pos + end_pos) >> 1; - if (start_column->compare_at(start_init_row_num, mid_pos, *start_column, 1)) { - end_pos = mid_pos; - } else { - start_pos = mid_pos + 1; - } - } - start.row_num = start_pos; //update row num, return the find end - return start; -} - -BlockRowPos AnalyticLocalState::_get_partition_by_end() { - auto& shared_state = *_shared_state; - if (shared_state.current_row_position < - shared_state.partition_by_end.pos) { //still have data, return partition_by_end directly - return shared_state.partition_by_end; - } - - const auto partition_exprs_size = - _parent->cast()._partition_exprs_size; - if (partition_exprs_size == 0 || - (shared_state.input_total_rows == 0)) { //no partition_by, the all block is end - return shared_state.all_block_end; - } - - BlockRowPos cal_end = shared_state.all_block_end; - for (size_t i = 0; i < partition_exprs_size; - ++i) { //have partition_by, binary search the partiton end - cal_end = _compare_row_to_find_end(shared_state.partition_by_column_idxs[i], - shared_state.partition_by_end, cal_end); - } - cal_end.pos = shared_state.input_block_first_row_positions[cal_end.block_num] + cal_end.row_num; - return cal_end; -} - -bool AnalyticLocalState::_whether_need_next_partition(BlockRowPos& found_partition_end) { - auto& shared_state = *_shared_state; - if (shared_state.input_eos || - (shared_state.current_row_position < - shared_state.partition_by_end.pos)) { //now still have partition data - return false; - } - const auto partition_exprs_size = - _parent->cast()._partition_exprs_size; - if ((partition_exprs_size == 0 && !shared_state.input_eos) || - (found_partition_end.pos == 0)) { //no partition, get until fetch to EOS - 
return true; - } - if (partition_exprs_size != 0 && found_partition_end.pos == shared_state.all_block_end.pos && - !shared_state.input_eos) { //current partition data calculate done - return true; - } - return false; -} + : PipelineXLocalState(state, parent) {} Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_init_timer); - _blocks_memory_usage = - profile()->AddHighWaterMarkCounter("MemoryUsageBlocks", TUnit::BYTES, "", 1); - _evaluation_timer = ADD_TIMER(profile(), "GetPartitionBoundTime"); - _execute_timer = ADD_TIMER(profile(), "ExecuteTime"); _get_next_timer = ADD_TIMER(profile(), "GetNextTime"); - _get_result_timer = ADD_TIMER(profile(), "GetResultsTime"); - return Status::OK(); -} - -Status AnalyticLocalState::open(RuntimeState* state) { - RETURN_IF_ERROR(PipelineXLocalState::open(state)); - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_open_timer); - - auto& p = _parent->cast(); - _agg_functions_size = p._agg_functions.size(); - - _agg_functions.resize(p._agg_functions.size()); - for (size_t i = 0; i < _agg_functions.size(); i++) { - _agg_functions[i] = p._agg_functions[i]->clone(state, state->obj_pool()); - } - - _fn_place_ptr = _agg_arena_pool->aligned_alloc(p._total_size_of_aggregate_states, - p._align_aggregate_states); - - if (!p._has_window) { //haven't set window, Unbounded: [unbounded preceding,unbounded following] - _executor.get_next = std::bind(&AnalyticLocalState::_get_next_for_partition, this, - std::placeholders::_1); - - } else if (p._has_range_window) { - if (!p._has_window_end) { //haven't set end, so same as PARTITION, [unbounded preceding, unbounded following] - _executor.get_next = std::bind(&AnalyticLocalState::_get_next_for_partition, - this, std::placeholders::_1); - - } else { - _executor.get_next = std::bind(&AnalyticLocalState::_get_next_for_range, this, - std::placeholders::_1); - } - - } else 
{ - if (!p._has_window_start && - !p._has_window_end) { //haven't set start and end, same as PARTITION - _executor.get_next = std::bind(&AnalyticLocalState::_get_next_for_partition, - this, std::placeholders::_1); - - } else { - if (p._has_window_start) { //calculate start boundary - TAnalyticWindowBoundary b = p._window.window_start; - if (b.__isset.rows_offset_value) { //[offset , ] - _rows_start_offset = b.rows_offset_value; - if (b.type == TAnalyticWindowBoundaryType::PRECEDING) { - _rows_start_offset *= -1; //preceding--> negative - } //current_row 0 - } else { //following positive - DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[current row, ] - _rows_start_offset = 0; - } - } - - if (p._has_window_end) { //calculate end boundary - TAnalyticWindowBoundary b = p._window.window_end; - if (b.__isset.rows_offset_value) { //[ , offset] - _rows_end_offset = b.rows_offset_value; - if (b.type == TAnalyticWindowBoundaryType::PRECEDING) { - _rows_end_offset *= -1; - } - } else { - DCHECK_EQ(b.type, TAnalyticWindowBoundaryType::CURRENT_ROW); //[ ,current row] - _rows_end_offset = 0; - } - } - - _executor.get_next = std::bind(&AnalyticLocalState::_get_next_for_rows, this, - std::placeholders::_1); - } - } - _create_agg_status(); - return Status::OK(); -} - -void AnalyticLocalState::_reset_agg_status() { - for (size_t i = 0; i < _agg_functions_size; ++i) { - _agg_functions[i]->reset( - _fn_place_ptr + - _parent->cast()._offsets_of_aggregate_states[i]); - } -} - -void AnalyticLocalState::_create_agg_status() { - for (size_t i = 0; i < _agg_functions_size; ++i) { - try { - _agg_functions[i]->create( - _fn_place_ptr + - _parent->cast()._offsets_of_aggregate_states[i]); - } catch (...) 
{ - for (int j = 0; j < i; ++j) { - _agg_functions[j]->destroy( - _fn_place_ptr + - _parent->cast()._offsets_of_aggregate_states[j]); - } - throw; - } - } - _agg_functions_created = true; -} - -void AnalyticLocalState::_destroy_agg_status() { - if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) { - return; - } - for (size_t i = 0; i < _agg_functions_size; ++i) { - _agg_functions[i]->destroy( - _fn_place_ptr + - _parent->cast()._offsets_of_aggregate_states[i]); - } -} - -void AnalyticLocalState::_execute_for_win_func(int64_t partition_start, int64_t partition_end, - int64_t frame_start, int64_t frame_end) { - SCOPED_TIMER(_execute_timer); - for (size_t i = 0; i < _agg_functions_size; ++i) { - std::vector agg_columns; - for (int j = 0; j < _shared_state->agg_input_columns[i].size(); ++j) { - agg_columns.push_back(_shared_state->agg_input_columns[i][j].get()); - } - _agg_functions[i]->function()->add_range_single_place( - partition_start, partition_end, frame_start, frame_end, - _fn_place_ptr + - _parent->cast()._offsets_of_aggregate_states[i], - agg_columns.data(), _agg_arena_pool.get()); - - // If the end is not greater than the start, the current window should be empty. 
- _current_window_empty = - std::min(frame_end, partition_end) <= std::max(frame_start, partition_start); - } -} - -void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) { - SCOPED_TIMER(_get_result_timer); - int64_t current_block_row_pos = - _shared_state->input_block_first_row_positions[_output_block_index]; - int64_t get_result_start = _shared_state->current_row_position - current_block_row_pos; - if (_parent->cast()._fn_scope == AnalyticFnScope::PARTITION) { - int64_t get_result_end = - std::min(_shared_state->current_row_position + current_block_rows, - _shared_state->partition_by_end.pos); - _window_end_position = - std::min(get_result_end - current_block_row_pos, current_block_rows); - _shared_state->current_row_position += (_window_end_position - get_result_start); - } else if (_parent->cast()._fn_scope == AnalyticFnScope::RANGE) { - _window_end_position = - std::min(_order_by_end.pos - current_block_row_pos, current_block_rows); - _shared_state->current_row_position += (_window_end_position - get_result_start); - } else { - _window_end_position++; - _shared_state->current_row_position++; - } - - const auto& offsets_of_aggregate_states = - _parent->cast()._offsets_of_aggregate_states; - for (size_t i = 0; i < _agg_functions_size; ++i) { - for (size_t j = get_result_start; j < _window_end_position; ++j) { - if (!_agg_functions[i]->function()->get_return_type()->is_nullable() && - _result_window_columns[i]->is_nullable()) { - if (_current_window_empty) { - _result_window_columns[i]->insert_default(); - } else { - auto* dst = assert_cast( - _result_window_columns[i].get()); - dst->get_null_map_data().push_back(0); - _agg_functions[i]->insert_result_info( - _fn_place_ptr + offsets_of_aggregate_states[i], - &dst->get_nested_column()); - } - continue; - } - _agg_functions[i]->insert_result_info(_fn_place_ptr + offsets_of_aggregate_states[i], - _result_window_columns[i].get()); - } - } -} - -Status AnalyticLocalState::_get_next_for_rows(size_t 
current_block_rows) { - SCOPED_TIMER(_get_next_timer); - while (_shared_state->current_row_position < _shared_state->partition_by_end.pos && - _window_end_position < current_block_rows) { - int64_t range_start, range_end; - if (!_parent->cast()._window.__isset.window_start && - _parent->cast()._window.window_end.type == - TAnalyticWindowBoundaryType::CURRENT_ROW) { - // [preceding, current_row], [current_row, following] rewrite it's same - // as could reuse the previous calculate result, so don't call _reset_agg_status function - // going on calculate, add up data, no need to reset state - range_start = _shared_state->current_row_position; - range_end = _shared_state->current_row_position + 1; - } else { - _reset_agg_status(); - range_end = _shared_state->current_row_position + _rows_end_offset + 1; - //[preceding, offset] --unbound: [preceding, following] - if (!_parent->cast()._window.__isset.window_start) { - range_start = _partition_by_start.pos; - } else { - range_start = _shared_state->current_row_position + _rows_start_offset; - } - // Make sure range_start <= range_end - range_start = std::min(range_start, range_end); - } - _execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos, - range_start, range_end); - _insert_result_info(current_block_rows); - } - return Status::OK(); -} - -Status AnalyticLocalState::_get_next_for_partition(size_t current_block_rows) { - SCOPED_TIMER(_get_next_timer); - if (_next_partition) { - _execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos, - _partition_by_start.pos, _shared_state->partition_by_end.pos); - } - _insert_result_info(current_block_rows); - return Status::OK(); -} - -Status AnalyticLocalState::_get_next_for_range(size_t current_block_rows) { - SCOPED_TIMER(_get_next_timer); - while (_shared_state->current_row_position < _shared_state->partition_by_end.pos && - _window_end_position < current_block_rows) { - if (_shared_state->current_row_position >= 
_order_by_end.pos) { - _update_order_by_range(); - _execute_for_win_func(_partition_by_start.pos, _shared_state->partition_by_end.pos, - _order_by_start.pos, _order_by_end.pos); - } - _insert_result_info(current_block_rows); - } - return Status::OK(); -} - -void AnalyticLocalState::_update_order_by_range() { - _order_by_start = _order_by_end; - _order_by_end = _shared_state->partition_by_end; - for (size_t i = 0; i < _parent->cast()._order_by_exprs_size; ++i) { - _order_by_end = _compare_row_to_find_end(_shared_state->ordey_by_column_idxs[i], - _order_by_start, _order_by_end, true); - } - _order_by_start.pos = - _shared_state->input_block_first_row_positions[_order_by_start.block_num] + - _order_by_start.row_num; - _order_by_end.pos = _shared_state->input_block_first_row_positions[_order_by_end.block_num] + - _order_by_end.row_num; - // `_order_by_end` will be assigned to `_order_by_start` next time, - // so make it a valid position. - if (_order_by_end.row_num == _shared_state->input_blocks[_order_by_end.block_num].rows()) { - _order_by_end.block_num++; - _order_by_end.row_num = 0; - } -} - -void AnalyticLocalState::init_result_columns() { - if (!_window_end_position) { - _result_window_columns.resize(_agg_functions_size); - for (size_t i = 0; i < _agg_functions_size; ++i) { - _result_window_columns[i] = - _agg_functions[i]->data_type()->create_column(); //return type - } - } -} - -//calculate pos have arrive partition end, so it's needed to init next partition, and update the boundary of partition -bool AnalyticLocalState::init_next_partition(BlockRowPos found_partition_end) { - if ((_shared_state->current_row_position >= _shared_state->partition_by_end.pos) && - ((_shared_state->partition_by_end.pos == 0) || - (_shared_state->partition_by_end.pos != found_partition_end.pos))) { - _partition_by_start = _shared_state->partition_by_end; - _shared_state->partition_by_end = found_partition_end; - _shared_state->current_row_position = _partition_by_start.pos; - 
_reset_agg_status(); - return true; - } - return false; -} - -Status AnalyticLocalState::output_current_block(vectorized::Block* block) { - block->swap(std::move(_shared_state->input_blocks[_output_block_index])); - _blocks_memory_usage->add(-block->allocated_bytes()); - if (_shared_state->origin_cols.size() < block->columns()) { - block->erase_not_in(_shared_state->origin_cols); - } - - DCHECK(_parent->cast()._change_to_nullable_flags.size() == - _result_window_columns.size()); - for (size_t i = 0; i < _result_window_columns.size(); ++i) { - if (_parent->cast()._change_to_nullable_flags[i]) { - block->insert({make_nullable(std::move(_result_window_columns[i])), - make_nullable(_agg_functions[i]->data_type()), ""}); - } else { - block->insert( - {std::move(_result_window_columns[i]), _agg_functions[i]->data_type(), ""}); - } - } - - _output_block_index++; - _window_end_position = 0; - + _filtered_rows_counter = ADD_COUNTER(profile(), "FilteredRows", TUnit::UNIT); return Status::OK(); } AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs) - : OperatorX(pool, tnode, operator_id, descs), - _window(tnode.analytic_node.window), - _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id), - _output_tuple_id(tnode.analytic_node.output_tuple_id), - _has_window(tnode.analytic_node.__isset.window), - _has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE), - _has_window_start(tnode.analytic_node.window.__isset.window_start), - _has_window_end(tnode.analytic_node.window.__isset.window_end), - _partition_exprs_size(tnode.analytic_node.partition_exprs.size()), - _order_by_exprs_size(tnode.analytic_node.order_by_exprs.size()) { + : OperatorX(pool, tnode, operator_id, descs) { _is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator; - _fn_scope = AnalyticFnScope::PARTITION; - if (tnode.analytic_node.__isset.window && - 
tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) { - DCHECK(!_window.__isset.window_start) << "RANGE windows must have UNBOUNDED PRECEDING"; - DCHECK(!_window.__isset.window_end || - _window.window_end.type == TAnalyticWindowBoundaryType::CURRENT_ROW) - << "RANGE window end bound must be CURRENT ROW or UNBOUNDED FOLLOWING"; - - if (_window.__isset - .window_end) { //haven't set end, so same as PARTITION, [unbounded preceding, unbounded following] - _fn_scope = AnalyticFnScope::RANGE; //range: [unbounded preceding,current row] - } - - } else if (tnode.analytic_node.__isset.window) { - if (_window.__isset.window_start || _window.__isset.window_end) { - _fn_scope = AnalyticFnScope::ROWS; - } - } } -Status AnalyticSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(OperatorX::init(tnode, state)); - const TAnalyticNode& analytic_node = tnode.analytic_node; - size_t agg_size = analytic_node.analytic_functions.size(); - for (int i = 0; i < agg_size; ++i) { - vectorized::AggFnEvaluator* evaluator = nullptr; - // Window function treats all NullableAggregateFunction as AlwaysNullable. - // Its behavior is same with executed without group by key. 
- // https://github.com/apache/doris/pull/40693 - RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( - _pool, analytic_node.analytic_functions[i], {}, /*wihout_key*/ true, &evaluator)); - _agg_functions.emplace_back(evaluator); - } - - return Status::OK(); -} - -Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, +Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block, bool* eos) { + RETURN_IF_CANCELLED(state); auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); - if (local_state._shared_state->input_eos && - (local_state._output_block_index == local_state._shared_state->input_blocks.size() || - local_state._shared_state->input_total_rows == 0)) { - *eos = true; - return Status::OK(); - } - - while (!local_state._shared_state->input_eos || - local_state._output_block_index < local_state._shared_state->input_blocks.size()) { - { - SCOPED_TIMER(local_state._evaluation_timer); - local_state._shared_state->found_partition_end = local_state._get_partition_by_end(); - } - if (local_state._refresh_need_more_input()) { - return Status::OK(); - } - local_state._next_partition = - local_state.init_next_partition(local_state._shared_state->found_partition_end); - local_state.init_result_columns(); - size_t current_block_rows = - local_state._shared_state->input_blocks[local_state._output_block_index].rows(); - RETURN_IF_ERROR(local_state._executor.get_next(current_block_rows)); - if (local_state._window_end_position == current_block_rows) { - break; + SCOPED_TIMER(local_state._get_next_timer); + output_block->clear_column_data(); + size_t output_rows = 0; + { + std::lock_guard lock(local_state._shared_state->buffer_mutex); + if (!local_state._shared_state->blocks_buffer.empty()) { + local_state._shared_state->blocks_buffer.front().swap(*output_block); + local_state._shared_state->blocks_buffer.pop(); + output_rows = output_block->rows(); + //if buffer have no 
data and sink not eos, block reading and wait for signal again + RETURN_IF_ERROR(vectorized::VExprContext::filter_block( + local_state._conjuncts, output_block, output_block->columns())); + if (local_state._shared_state->blocks_buffer.empty() && + !local_state._shared_state->sink_eos) { + // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. + // so have to hold mutex to set block(), avoid to sink have set eos and set ready, but here set block() by mistake + std::unique_lock lc(local_state._shared_state->sink_eos_lock); + if (!local_state._shared_state->sink_eos) { + local_state._dependency->block(); // block self source + local_state._dependency->set_ready_to_write(); // ready for sink write + } + } + } else { + //iff buffer have no data and sink eos, set eos + std::unique_lock lc(local_state._shared_state->sink_eos_lock); + *eos = local_state._shared_state->sink_eos; } } - RETURN_IF_ERROR(local_state.output_current_block(block)); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, block, - block->columns())); - local_state.reached_limit(block, eos); - return Status::OK(); -} - -Status AnalyticLocalState::close(RuntimeState* state) { - SCOPED_TIMER(exec_time_counter()); - SCOPED_TIMER(_close_timer); - if (_closed) { - return Status::OK(); + local_state.reached_limit(output_block, eos); + if (!output_block->empty()) { + auto return_rows = output_block->rows(); + local_state._num_rows_returned += return_rows; + COUNTER_UPDATE(local_state._filtered_rows_counter, output_rows - return_rows); } - - _destroy_agg_status(); - _agg_arena_pool = nullptr; - - std::vector tmp_result_window_columns; - _result_window_columns.swap(tmp_result_window_columns); - return PipelineXLocalState::close(state); + return Status::OK(); } Status AnalyticSourceOperatorX::open(RuntimeState* state) { RETURN_IF_ERROR(OperatorX::open(state)); DCHECK(_child->row_desc().is_prefix_of(_row_descriptor)); - 
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); - _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); - for (size_t i = 0; i < _agg_functions.size(); ++i) { - SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i]; - SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i]; - RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(), - intermediate_slot_desc, output_slot_desc)); - _agg_functions[i]->set_version(state->be_exec_version()); - _change_to_nullable_flags.push_back(output_slot_desc->is_nullable() && - !_agg_functions[i]->data_type()->is_nullable()); - } - - _offsets_of_aggregate_states.resize(_agg_functions.size()); - for (size_t i = 0; i < _agg_functions.size(); ++i) { - _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states; - const auto& agg_function = _agg_functions[i]->function(); - // aggregate states are aligned based on maximum requirement - _align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data()); - _total_size_of_aggregate_states += agg_function->size_of_data(); - // If not the last aggregate_state, we need pad it so that next aggregate_state will be aligned. - if (i + 1 < _agg_functions.size()) { - size_t alignment_of_next_state = _agg_functions[i + 1]->function()->align_of_data(); - if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) { - return Status::RuntimeError("Logical error: align_of_data is not 2^N"); - } - /// Extend total_size to next alignment requirement - /// Add padding by rounding up 'total_size_of_aggregate_states' to be a multiplier of alignment_of_next_state. 
- _total_size_of_aggregate_states = - (_total_size_of_aggregate_states + alignment_of_next_state - 1) / - alignment_of_next_state * alignment_of_next_state; - } - } - for (auto* agg_function : _agg_functions) { - RETURN_IF_ERROR(agg_function->open(state)); - } return Status::OK(); } diff --git a/be/src/pipeline/exec/analytic_source_operator.h b/be/src/pipeline/exec/analytic_source_operator.h index 56c664cec6193b..be1fdb2c9e5db5 100644 --- a/be/src/pipeline/exec/analytic_source_operator.h +++ b/be/src/pipeline/exec/analytic_source_operator.h @@ -27,87 +27,18 @@ class RuntimeState; namespace pipeline { #include "common/compile_check_begin.h" -enum AnalyticFnScope { PARTITION, RANGE, ROWS }; class AnalyticSourceOperatorX; class AnalyticLocalState final : public PipelineXLocalState { public: ENABLE_FACTORY_CREATOR(AnalyticLocalState); AnalyticLocalState(RuntimeState* state, OperatorXBase* parent); - Status init(RuntimeState* state, LocalStateInfo& info) override; - Status open(RuntimeState* state) override; - Status close(RuntimeState* state) override; - - void init_result_columns(); - - Status output_current_block(vectorized::Block* block); - - bool init_next_partition(BlockRowPos found_partition_end); private: - Status _get_next_for_rows(size_t rows); - Status _get_next_for_range(size_t rows); - Status _get_next_for_partition(size_t rows); - - void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start, - int64_t frame_end); - void _insert_result_info(int64_t current_block_rows); - - void _update_order_by_range(); - bool _refresh_need_more_input() { - auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end); - if (need_more_input) { - _dependency->block(); - _dependency->set_ready_to_write(); - } else { - _dependency->set_block_to_write(); - _dependency->set_ready(); - } - return need_more_input; - } - BlockRowPos _get_partition_by_end(); - BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos 
start, BlockRowPos end, - bool need_check_first = false); - bool _whether_need_next_partition(BlockRowPos& found_partition_end); - - void _reset_agg_status(); - void _create_agg_status(); - void _destroy_agg_status(); - friend class AnalyticSourceOperatorX; - - int64_t _output_block_index; - int64_t _window_end_position; - bool _next_partition; - std::vector _result_window_columns; - - int64_t _rows_start_offset; - int64_t _rows_end_offset; - vectorized::AggregateDataPtr _fn_place_ptr; - size_t _agg_functions_size; - bool _agg_functions_created; - bool _current_window_empty = false; - - BlockRowPos _order_by_start; - BlockRowPos _order_by_end; - BlockRowPos _partition_by_start; - std::unique_ptr _agg_arena_pool; - std::vector _agg_functions; - - RuntimeProfile::Counter* _evaluation_timer = nullptr; - RuntimeProfile::Counter* _execute_timer = nullptr; RuntimeProfile::Counter* _get_next_timer = nullptr; - RuntimeProfile::Counter* _get_result_timer = nullptr; - RuntimeProfile::HighWaterMarkCounter* _blocks_memory_usage = nullptr; - - using vectorized_get_next = std::function; - - struct executor { - vectorized_get_next get_next; - }; - - executor _executor; + RuntimeProfile::Counter* _filtered_rows_counter = nullptr; }; class AnalyticSourceOperatorX final : public OperatorX { @@ -119,39 +50,10 @@ class AnalyticSourceOperatorX final : public OperatorX { bool is_source() const override { return true; } - Status init(const TPlanNode& tnode, RuntimeState* state) override; Status open(RuntimeState* state) override; private: friend class AnalyticLocalState; - - TAnalyticWindow _window; - - TupleId _intermediate_tuple_id; - TupleId _output_tuple_id; - - bool _has_window; - bool _has_range_window; - bool _has_window_start; - bool _has_window_end; - - std::vector _agg_functions; - - AnalyticFnScope _fn_scope; - - TupleDescriptor* _intermediate_tuple_desc = nullptr; - TupleDescriptor* _output_tuple_desc = nullptr; - - /// The offset of the n-th functions. 
- std::vector _offsets_of_aggregate_states; - /// The total size of the row from the functions. - size_t _total_size_of_aggregate_states = 0; - /// The max align size for functions - size_t _align_aggregate_states = 1; - - std::vector _change_to_nullable_flags; - const size_t _partition_exprs_size; - const size_t _order_by_exprs_size; }; } // namespace pipeline diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h index 5d449318b7d2f5..ca5b3bb07652bb 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.h +++ b/be/src/vec/aggregate_functions/aggregate_function_window.h @@ -86,8 +86,8 @@ class WindowFunctionRowNumber final struct RankData { int64_t rank = 0; - int64_t count = 0; - int64_t peer_group_start = 0; + int64_t count = 1; + int64_t peer_group_start = -1; }; class WindowFunctionRank final : public IAggregateFunctionDataHelper { @@ -131,7 +131,7 @@ class WindowFunctionRank final : public IAggregateFunctionDataHelper range_between_offset_exprs } struct TMergeNode { diff --git a/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out b/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out new file mode 100644 index 00000000000000..ac07866762434b --- /dev/null +++ b/regression-test/data/query_p0/sql_functions/window_functions/test_column_boundary.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql_1 -- +512000000 + diff --git a/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy b/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy new file mode 100644 index 00000000000000..398d0a73e19e6a --- /dev/null +++ b/regression-test/suites/query_p0/sql_functions/window_functions/test_column_boundary.groovy @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_column_boundary") { + sql """ DROP TABLE IF EXISTS test_column_boundary """ + sql """ + CREATE TABLE IF NOT EXISTS test_column_boundary ( + u_id int NULL COMMENT "", + u_city varchar(20) NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`u_id`, `u_city`) + DISTRIBUTED BY HASH(`u_id`, `u_city`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ); + """ + + sql """ insert into test_column_boundary select number, number + random() from numbers("number" = "1000000"); """ + Integer count = 0; + Integer maxCount = 9; + while (count < maxCount) { + sql """ insert into test_column_boundary select * from test_column_boundary;""" + count++ + sleep(100); + } + sql """ set parallel_pipeline_task_num = 1; """ + + qt_sql_1 """ select count() from test_column_boundary; """ + test { + // column size is too large + sql """ select count() over(partition by u_city) from test_column_boundary; """ + exception "string column length is too large" + } + sql """ DROP TABLE IF EXISTS test_column_boundary """ +} + +