Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Dec 31, 2024
1 parent d02dbcc commit c2155d6
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 33 deletions.
15 changes: 15 additions & 0 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,21 @@ Status AnalyticSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}

Status AnalyticSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
if (_closed) {
return Status::OK();
}

_destroy_agg_status();
_agg_arena_pool = nullptr;

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
return PipelineXSinkLocalState<AnalyticSharedState>::close(state, exec_status);
}

Status AnalyticSinkLocalState::_get_next_for_sliding_rows() {
do {
auto batch_size = _input_blocks[_output_block_index].rows();
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;

private:
friend class AnalyticSinkOperatorX;
Expand Down Expand Up @@ -72,7 +73,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
void _get_partition_by_end();
BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, BlockRowPos end,
bool need_check_first = false);
bool _has_input_data() {return _output_block_index < _input_blocks.size();}
bool _has_input_data() { return _output_block_index < _input_blocks.size(); }
bool _check_need_block_task();
void _refresh_buffer_and_dependency_state(vectorized::Block* block);
void _reset_state_for_next_partition();
Expand Down
28 changes: 1 addition & 27 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ namespace doris::pipeline {
#include "common/compile_check_begin.h"

AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<AnalyticSharedState>(state, parent)
{}
: PipelineXLocalState<AnalyticSharedState>(state, parent) {}

Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::init(state, info));
Expand All @@ -38,22 +37,12 @@ Status AnalyticLocalState::init(RuntimeState* state, LocalStateInfo& info) {
return Status::OK();
}

Status AnalyticLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
return Status::OK();
}


AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
int operator_id, const DescriptorTbl& descs)
: OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs) {
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;

}


Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block,
bool* eos) {
RETURN_IF_CANCELLED(state);
Expand Down Expand Up @@ -91,21 +80,6 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
return Status::OK();
}

Status AnalyticLocalState::close(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
if (_closed) {
return Status::OK();
}

// _destroy_agg_status();
// _agg_arena_pool = nullptr;

// std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
// _result_window_columns.swap(tmp_result_window_columns);
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
Expand Down
6 changes: 1 addition & 5 deletions be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,13 @@ class RuntimeState;
namespace pipeline {
#include "common/compile_check_begin.h"


class AnalyticSourceOperatorX;
class AnalyticLocalState final : public PipelineXLocalState<AnalyticSharedState> {
public:
ENABLE_FACTORY_CREATOR(AnalyticLocalState);
AnalyticLocalState(RuntimeState* state, OperatorXBase* parent);

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;

private:
friend class AnalyticSourceOperatorX;
RuntimeProfile::Counter* _get_next_timer = nullptr;
Expand All @@ -52,7 +49,6 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {

bool is_source() const override { return true; }

// Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
Expand Down

0 comments on commit c2155d6

Please sign in to comment.