Skip to content

Commit

Permalink
[minor](context) duplicate query context in fragment ctx (#29364)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: yiguolei <[email protected]>
  • Loading branch information
yiguolei and Doris-Extras authored Jan 1, 2024
1 parent 216b530 commit 738abac
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 12 deletions.
4 changes: 1 addition & 3 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
return _runtime_state->runtime_filter_mgr();
}

QueryContext* get_query_ctx() { return _runtime_state->get_query_ctx(); }
QueryContext* get_query_ctx() { return _query_ctx.get(); }
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); }

Expand All @@ -108,8 +108,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

// TODO: Support pipeline runtime filter

QueryContext* get_query_context() { return _query_ctx.get(); }

TUniqueId get_query_id() const { return _query_id; }

[[nodiscard]] int get_fragment_id() const { return _fragment_id; }
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ Status PipelineTask::close(Status exec_status) {
}

QueryContext* PipelineTask::query_context() {
return _fragment_context->get_query_context();
return _fragment_context->get_query_ctx();
}

// The FSM see PipelineTaskState's comment
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ void TaskScheduler::_do_work(size_t index) {
if (state == PipelineTaskState::PENDING_FINISH) {
DCHECK(task->is_pipelineX() || !task->is_pending_finish())
<< "must not pending close " << task->debug_string();
Status exec_status = fragment_ctx->get_query_context()->exec_status();
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
_try_close_task(task,
canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED,
exec_status);
Expand All @@ -264,7 +264,7 @@ void TaskScheduler::_do_work(size_t index) {
// If pipeline is canceled, it will report after pipeline closed, and will propagate
// errors to downstream through exchange. So, here we needn't send_report.
// fragment_ctx->send_report(true);
Status cancel_status = fragment_ctx->get_query_context()->exec_status();
Status cancel_status = fragment_ctx->get_query_ctx()->exec_status();
_try_close_task(task, PipelineTaskState::CANCELED, cancel_status);
continue;
}
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -588,9 +588,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_scanner_scheduler);
// _storage_page_cache must be destoried before _cache_manager
SAFE_DELETE(_storage_page_cache);
// cache_manager must be destoried after _inverted_index_query_cache
// https://github.com/apache/doris/issues/24082#issuecomment-1712544039
SAFE_DELETE(_cache_manager);

SAFE_DELETE(_small_file_mgr);
SAFE_DELETE(_broker_mgr);
Expand Down Expand Up @@ -629,6 +626,10 @@ void ExecEnv::destroy() {
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_user_function_cache);

// cache_manager must be destoried after _inverted_index_query_cache
// https://github.com/apache/doris/issues/24082#issuecomment-1712544039
SAFE_DELETE(_cache_manager);

// _heartbeat_flags must be destoried after staroge engine
SAFE_DELETE(_heartbeat_flags);

Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ void FragmentMgr::remove_pipeline_context(
std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
std::lock_guard<std::mutex> lock(_lock);
auto query_id = f_context->get_query_id();
auto* q_context = f_context->get_query_context();
auto* q_context = f_context->get_query_ctx();
std::vector<TUniqueId> ins_ids;
f_context->instance_ids(ins_ids);
bool all_done = q_context->countdown(ins_ids.size());
Expand Down Expand Up @@ -1413,7 +1413,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,

DCHECK(pip_context != nullptr);
runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr();
pool = &pip_context->get_query_context()->obj_pool;
pool = &pip_context->get_query_ctx()->obj_pool;
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_instance_map.find(tfragment_instance_id);
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ Status NewOlapScanner::close(RuntimeState* state) {
// so that it will core
_tablet_reader_params.rs_splits.clear();
_tablet_reader.reset();
LOG(INFO) << "close_tablet_id" << _tablet_reader_params.tablet->tablet_id();
RETURN_IF_ERROR(VScanner::close(state));
return Status::OK();
}
Expand Down

0 comments on commit 738abac

Please sign in to comment.