diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 5009bde9b20e48..2a3a11d59cc107 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -83,7 +83,7 @@ class PipelineFragmentContext : public TaskExecutionContext { return _runtime_state->runtime_filter_mgr(); } - QueryContext* get_query_ctx() { return _runtime_state->get_query_ctx(); } + QueryContext* get_query_ctx() { return _query_ctx.get(); } // should be protected by lock? [[nodiscard]] bool is_canceled() const { return _runtime_state->is_cancelled(); } @@ -108,8 +108,6 @@ class PipelineFragmentContext : public TaskExecutionContext { // TODO: Support pipeline runtime filter - QueryContext* get_query_context() { return _query_ctx.get(); } - TUniqueId get_query_id() const { return _query_id; } [[nodiscard]] int get_fragment_id() const { return _fragment_id; } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 75694373a4ce47..5f5ff56aa4bfa7 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -357,7 +357,7 @@ Status PipelineTask::close(Status exec_status) { } QueryContext* PipelineTask::query_context() { - return _fragment_context->get_query_context(); + return _fragment_context->get_query_ctx(); } // The FSM see PipelineTaskState's comment diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index e0e668acbd9455..e814e4cdf2d092 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -247,7 +247,7 @@ void TaskScheduler::_do_work(size_t index) { if (state == PipelineTaskState::PENDING_FINISH) { DCHECK(task->is_pipelineX() || !task->is_pending_finish()) << "must not pending close " << task->debug_string(); - Status exec_status = fragment_ctx->get_query_context()->exec_status(); + Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); _try_close_task(task, canceled ? PipelineTaskState::CANCELED : PipelineTaskState::FINISHED, exec_status); @@ -264,7 +264,7 @@ void TaskScheduler::_do_work(size_t index) { // If pipeline is canceled, it will report after pipeline closed, and will propagate // errors to downstream through exchange. So, here we needn't send_report. // fragment_ctx->send_report(true); - Status cancel_status = fragment_ctx->get_query_context()->exec_status(); + Status cancel_status = fragment_ctx->get_query_ctx()->exec_status(); _try_close_task(task, PipelineTaskState::CANCELED, cancel_status); continue; } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 508ea7c7921011..2a1233c9b5d6b1 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -588,9 +588,6 @@ void ExecEnv::destroy() { SAFE_DELETE(_scanner_scheduler); // _storage_page_cache must be destoried before _cache_manager SAFE_DELETE(_storage_page_cache); - // cache_manager must be destoried after _inverted_index_query_cache - // https://github.com/apache/doris/issues/24082#issuecomment-1712544039 - SAFE_DELETE(_cache_manager); SAFE_DELETE(_small_file_mgr); SAFE_DELETE(_broker_mgr); @@ -629,6 +626,10 @@ void ExecEnv::destroy() { SAFE_DELETE(_external_scan_context_mgr); SAFE_DELETE(_user_function_cache); + // cache_manager must be destoried after _inverted_index_query_cache + // https://github.com/apache/doris/issues/24082#issuecomment-1712544039 + SAFE_DELETE(_cache_manager); + // _heartbeat_flags must be destoried after staroge engine SAFE_DELETE(_heartbeat_flags); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9256c3bccfddef..d265b5e35b5203 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -576,7 +576,7 @@ void FragmentMgr::remove_pipeline_context( std::shared_ptr f_context) { std::lock_guard lock(_lock); auto query_id = f_context->get_query_id(); - auto* q_context = f_context->get_query_context(); + auto* q_context = f_context->get_query_ctx(); std::vector ins_ids; f_context->instance_ids(ins_ids); bool all_done = q_context->countdown(ins_ids.size()); @@ -1413,7 +1413,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, DCHECK(pip_context != nullptr); runtime_filter_mgr = pip_context->get_query_ctx()->runtime_filter_mgr(); - pool = &pip_context->get_query_context()->obj_pool; + pool = &pip_context->get_query_ctx()->obj_pool; } else { std::unique_lock lock(_lock); auto iter = _fragment_instance_map.find(tfragment_instance_id); diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp index 5566abec3ab383..bc15cf7207fb80 100644 --- a/be/src/vec/exec/scan/new_olap_scanner.cpp +++ b/be/src/vec/exec/scan/new_olap_scanner.cpp @@ -524,7 +524,6 @@ Status NewOlapScanner::close(RuntimeState* state) { // so that it will core _tablet_reader_params.rs_splits.clear(); _tablet_reader.reset(); - LOG(INFO) << "close_tablet_id" << _tablet_reader_params.tablet->tablet_id(); RETURN_IF_ERROR(VScanner::close(state)); return Status::OK(); }