[Feature](executor) Add scanbytes/scanrows condition, add move action in BE #29337

Closed
wants to merge 2 commits
4 changes: 1 addition & 3 deletions be/src/agent/workload_sched_policy_listener.cpp
@@ -69,9 +69,7 @@ void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo>
policy_map.emplace(tpolicy.id, std::move(policy_ptr));
}
size_t new_policy_size = policy_map.size();
if (new_policy_size > 0) {
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
}
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
LOG(INFO) << "[workload_schedule]finish update workload schedule policy, size="
<< new_policy_size;
}
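Note: dropping the "if (new_policy_size > 0)" guard means the listener now pushes the rebuilt map to the policy manager even when it is empty, which, as far as this diff shows, is what lets policies deleted on the FE also disappear on the BE. A minimal standalone sketch of that replace-the-whole-map behaviour, with illustrative names rather than the real Doris classes:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>
#include <string>
#include <utility>

// Simplified stand-in for the policy manager: every update replaces the whole
// map, so pushing an empty map clears previously registered policies.
class PolicyMgr {
public:
    void update(std::map<int64_t, std::string> new_policies) {
        std::lock_guard<std::mutex> lock(_mutex);
        _policies = std::move(new_policies);
    }
    std::size_t size() {
        std::lock_guard<std::mutex> lock(_mutex);
        return _policies.size();
    }

private:
    std::mutex _mutex;
    std::map<int64_t, std::string> _policies;
};

int main() {
    PolicyMgr mgr;
    mgr.update({{1, "cancel_big_query"}});
    std::cout << mgr.size() << std::endl;  // 1
    // With the old size guard this call would have been skipped and the stale
    // policy would have survived on the BE.
    mgr.update({});
    std::cout << mgr.size() << std::endl;  // 0
}
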
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -397,7 +397,7 @@ std::string PipelineXTask::debug_string() {

void PipelineXTask::wake_up() {
// call by dependency
static_cast<void>(get_task_queue()->push_back(this));
static_cast<void>(query_context()->get_exec_task_queue()->push_back(this));
}

} // namespace doris::pipeline
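Note: wake_up() no longer pushes into the queue the task was created with; it asks the query context for the queue the query is currently bound to. A toy, self-contained model (not the Doris API) of why resolving the queue at wake-up time matters once a query can be moved between workload groups:

#include <deque>
#include <iostream>
#include <memory>

// A task does not cache its queue; it asks its query context at wake-up time,
// so a query that has been moved to another group wakes up in the new queue.
struct TaskQueue {
    std::deque<int> tasks;
    void push_back(int task_id) { tasks.push_back(task_id); }
};

struct QueryContext {
    std::shared_ptr<TaskQueue> current_queue;
    TaskQueue* get_exec_task_queue() { return current_queue.get(); }
};

struct PipelineTask {
    int id;
    QueryContext* ctx;
    // mirrors the idea of PipelineXTask::wake_up(): route through the context
    void wake_up() { ctx->get_exec_task_queue()->push_back(id); }
};

int main() {
    auto group_a = std::make_shared<TaskQueue>();
    auto group_b = std::make_shared<TaskQueue>();
    QueryContext ctx{group_a};
    PipelineTask task{42, &ctx};

    task.wake_up();               // lands in group A
    ctx.current_queue = group_b;  // the query is moved to another workload group
    task.wake_up();               // now lands in group B
    std::cout << group_a->tasks.size() << " " << group_b->tasks.size() << std::endl;  // 1 1
}
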
15 changes: 13 additions & 2 deletions be/src/pipeline/task_scheduler.cpp
@@ -194,7 +194,7 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
auto task = *task_itr;
task->set_state(t_state);
local_tasks.erase(task_itr++);
static_cast<void>(task->get_task_queue()->push_back(task));
static_cast<void>(task->query_context()->get_exec_task_queue()->push_back(task));
}

TaskScheduler::~TaskScheduler() {
@@ -232,6 +232,17 @@ void TaskScheduler::_do_work(size_t index) {
if (!task) {
continue;
}

// query may be migrated between schedulers
if (auto* query_ctx_ptr = task->query_context()) {
if (auto* sched_ptr = query_ctx_ptr->get_task_scheduler()) {
if (sched_ptr->get_wg_id() != this->get_wg_id()) {
static_cast<void>(query_ctx_ptr->get_exec_task_queue()->push_back(task, index));
continue;
}
}
}

if (task->is_pipelineX() && task->is_running()) {
static_cast<void>(_task_queue->push_back(task, index));
continue;
@@ -336,7 +347,7 @@ void TaskScheduler::_do_work(size_t index) {
break;
case PipelineTaskState::RUNNABLE:
task->set_running(false);
static_cast<void>(_task_queue->push_back(task, index));
static_cast<void>(task->query_context()->get_exec_task_queue()->push_back(task, index));
break;
default:
DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state)
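Note: the new block at the top of _do_work() appears to handle queries migrated between schedulers: if the task's query is now bound to a scheduler with a different workload-group id, the worker hands the task back to the query's own queue and moves on. A condensed, standalone model of that check (placeholder types, not the real pipeline classes):

#include <cstdint>
#include <iostream>

struct Scheduler {
    uint64_t wg_id;
};

struct Task {
    uint64_t id;
    Scheduler* query_scheduler;  // scheduler the query is currently bound to
};

// Returns true when the task belongs to a different workload group and was
// (conceptually) handed back to that group's queue, so this worker skips it.
bool rerouted(const Task& task, const Scheduler& this_scheduler) {
    if (task.query_scheduler != nullptr &&
        task.query_scheduler->wg_id != this_scheduler.wg_id) {
        // real code: query_ctx_ptr->get_exec_task_queue()->push_back(task, index);
        return true;
    }
    return false;
}

int main() {
    Scheduler group_a{1};
    Scheduler group_b{2};
    Task t{7, &group_b};  // the query was moved to group B
    std::cout << std::boolalpha << rerouted(t, group_a) << std::endl;  // true
    std::cout << std::boolalpha << rerouted(t, group_b) << std::endl;  // false
}
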
5 changes: 5 additions & 0 deletions be/src/pipeline/task_scheduler.h
@@ -93,6 +93,10 @@ class TaskScheduler {

TaskQueue* task_queue() const { return _task_queue.get(); }

void set_wg_id(uint64_t wg_id) { this->_wg_id = wg_id; }

uint64_t get_wg_id() const { return _wg_id; }

private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
@@ -101,6 +105,7 @@ class TaskScheduler {
std::atomic<bool> _shutdown;
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
uint64_t _wg_id = -1;

void _do_work(size_t index);
// after _try_close_task, task maybe destructed.
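Note: _wg_id is declared as uint64_t but default-initialized to -1, so an unconfigured scheduler ends up holding UINT64_MAX; the comparison in _do_work() still behaves because no real workload group uses that id. A two-line illustration:

#include <cstdint>
#include <iostream>

int main() {
    // -1 assigned to an unsigned 64-bit integer wraps to UINT64_MAX.
    uint64_t wg_id = -1;
    std::cout << wg_id << std::endl;                                    // 18446744073709551615
    std::cout << std::boolalpha << (wg_id == UINT64_MAX) << std::endl;  // true
}
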
15 changes: 11 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
@@ -1571,17 +1571,24 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
// todo: use monotonic time
VecDateTimeValue now = VecDateTimeValue::local_time();
for (const auto& q : _query_ctx_map) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
workload_query_info.tquery_id = q.first;

uint64_t query_time_millisecond = q.second->query_time(now) * 1000;
uint64_t query_time_millisecond = q.second->query_time();
workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME,
std::to_string(query_time_millisecond));
// todo, add scan rows, scan bytes

int64_t scan_rows = 0;
int64_t scan_bytes = 0;
workload_query_info.metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
std::to_string(scan_rows));
workload_query_info.metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
std::to_string(scan_bytes));

workload_query_info._query_ctx_share_ptr = q.second;

query_info_list->push_back(workload_query_info);
}
}
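Note: query time for the metric map now comes from a monotonic clock, while SCAN_ROWS and SCAN_BYTES are emitted as 0 placeholders in this change, so scan-based policy conditions will not fire until the real values are wired in. Purely as an illustration (the enum and helper below are assumptions made for this sketch, not the actual policy-condition code), this is the shape of a check that would consume such a metric map:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>

enum class Metric { QUERY_TIME, SCAN_ROWS, SCAN_BYTES };

// Returns true when the recorded metric exceeds the policy threshold.
bool exceeds(const std::map<Metric, std::string>& metrics, Metric key, int64_t threshold) {
    auto it = metrics.find(key);
    if (it == metrics.end()) {
        return false;
    }
    return std::stoll(it->second) > threshold;
}

int main() {
    std::map<Metric, std::string> metrics = {
            {Metric::QUERY_TIME, "15000"},  // milliseconds
            {Metric::SCAN_ROWS, "0"},       // still a placeholder in this commit
            {Metric::SCAN_BYTES, "0"},      // still a placeholder in this commit
    };
    std::cout << std::boolalpha
              << exceeds(metrics, Metric::QUERY_TIME, 10000) << std::endl   // true
              << exceeds(metrics, Metric::SCAN_ROWS, 1000000) << std::endl; // false
}
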
15 changes: 15 additions & 0 deletions be/src/runtime/query_context.cpp
@@ -19,6 +19,7 @@

#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/task_scheduler.h"

namespace doris {

@@ -38,6 +39,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_exec_env(exec_env),
_query_options(query_options) {
_start_time = VecDateTimeValue::local_time();
_monotonic_start_time_ms = MonotonicMillis();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency =
@@ -116,4 +118,17 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
}
return true;
}

pipeline::TaskQueue* QueryContext::get_exec_task_queue() {
std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex);
if (_task_scheduler) {
return _task_scheduler->task_queue();
} else if (use_task_group_for_cpu_limit.load()) {
return _exec_env->pipeline_task_group_scheduler()->task_queue();
} else {
// no workload group task queue found, so fall back to the common scheduler
return _exec_env->pipeline_task_scheduler()->task_queue();
}
}

} // namespace doris
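Note: get_exec_task_queue() picks a queue in three steps: the scheduler bound to the query's workload group, else the global task-group scheduler when use_task_group_for_cpu_limit is set, else the common pipeline scheduler. A self-contained sketch of that fallback chain under a shared lock (placeholder names, not the ExecEnv API):

#include <atomic>
#include <iostream>
#include <shared_mutex>

struct Queue {
    const char* name;
};

Queue wg_queue{"workload-group scheduler"};
Queue tg_queue{"task-group scheduler"};
Queue common_queue{"common scheduler"};

std::shared_mutex g_sched_mutex;
Queue* g_wg_queue = nullptr;  // set once the query is bound to a workload group
std::atomic<bool> g_use_task_group_for_cpu_limit{false};

Queue* get_exec_task_queue() {
    std::shared_lock<std::shared_mutex> read_lock(g_sched_mutex);
    if (g_wg_queue != nullptr) {
        return g_wg_queue;  // the query's workload-group scheduler
    } else if (g_use_task_group_for_cpu_limit.load()) {
        return &tg_queue;   // global task-group scheduler
    }
    return &common_queue;   // no group: fall back to the common scheduler
}

int main() {
    std::cout << get_exec_task_queue()->name << std::endl;  // common scheduler
    g_wg_queue = &wg_queue;
    std::cout << get_exec_task_queue()->name << std::endl;  // workload-group scheduler
}
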
54 changes: 41 additions & 13 deletions be/src/runtime/query_context.h
@@ -43,6 +43,7 @@ namespace doris {

namespace pipeline {
class PipelineFragmentContext;
class TaskQueue;
} // namespace pipeline

struct ReportStatusRequest {
@@ -93,8 +94,6 @@ class QueryContext {
return false;
}

int64_t query_time(VecDateTimeValue& now) { return now.second_diff(_start_time); }

void set_thread_token(int concurrency, bool is_serial) {
_thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
is_serial ? ThreadPool::ExecutionMode::SERIAL
@@ -151,9 +150,20 @@ class QueryContext {

vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }

void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
void set_task_group(taskgroup::TaskGroupPtr& tg) {
std::lock_guard<std::shared_mutex> write_lock(_task_group_lock);
_task_group = tg;
}

taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
taskgroup::TaskGroup* get_task_group() {
std::shared_lock<std::shared_mutex> read_lock(_task_group_lock);
return _task_group.get();
}

taskgroup::TaskGroupPtr get_task_group_shared_ptr() {
std::shared_lock<std::shared_mutex> read_lock(_task_group_lock);
return _task_group;
}

int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
@@ -194,20 +204,33 @@ class QueryContext {

TUniqueId query_id() const { return _query_id; }

void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
_task_scheduler = task_scheduler;
void set_task_scheduler(std::shared_ptr<pipeline::TaskScheduler>* task_scheduler) {
std::lock_guard<std::shared_mutex> write_lock(_exec_task_sched_mutex);
_task_scheduler = *task_scheduler;
}

pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
pipeline::TaskScheduler* get_task_scheduler() {
std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex);
return _task_scheduler.get();
}

pipeline::TaskQueue* get_exec_task_queue();

void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) {
_scan_task_scheduler = scan_task_scheduler;
void set_scan_task_scheduler(
std::shared_ptr<vectorized::SimplifiedScanScheduler>* scan_task_scheduler) {
std::lock_guard<std::shared_mutex> write_lock(_scan_task_sched_mutex);
_scan_task_scheduler = *scan_task_scheduler;
}

vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
vectorized::SimplifiedScanScheduler* get_scan_scheduler() {
std::shared_lock<std::shared_mutex> read_lock(_scan_task_sched_mutex);
return _scan_task_scheduler.get();
}

pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }

int64_t query_time() { return MonotonicMillis() - _monotonic_start_time_ms; }

public:
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
@@ -243,6 +266,7 @@ class QueryContext {
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
VecDateTimeValue _start_time;
int64_t _monotonic_start_time_ms;

// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from exec env.
@@ -262,7 +286,8 @@ class QueryContext {
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
vectorized::RuntimePredicate _runtime_predicate;

taskgroup::TaskGroupPtr _task_group;
std::shared_mutex _task_group_lock;
taskgroup::TaskGroupPtr _task_group = nullptr;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;

@@ -271,8 +296,11 @@ class QueryContext {
// to report the real message if failed.
Status _exec_status = Status::OK();

pipeline::TaskScheduler* _task_scheduler = nullptr;
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
std::shared_mutex _exec_task_sched_mutex;
std::shared_ptr<pipeline::TaskScheduler> _task_scheduler = nullptr;
std::shared_mutex _scan_task_sched_mutex;
std::shared_ptr<vectorized::SimplifiedScanScheduler> _scan_task_scheduler = nullptr;

std::unique_ptr<pipeline::Dependency> _execution_dependency;
};

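Note: the raw scheduler pointers in QueryContext become shared_ptrs guarded by shared_mutexes, so concurrent readers (wake_up, _do_work) take a shared lock while the move action takes an exclusive lock, and a scheduler cannot be destroyed while a query still holds it; query_time() likewise switches to MonotonicMillis() so wall-clock adjustments no longer skew the QUERY_TIME metric. A minimal sketch of that reader/writer pattern, with placeholder types:

#include <iostream>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>

struct Scheduler {
    int wg_id;
};

// Readers take a shared_lock, the (rare) writer takes an exclusive lock, and
// the shared_ptr keeps the scheduler alive for any query still holding it.
class SchedulerHolder {
public:
    void set(std::shared_ptr<Scheduler> sched) {
        std::lock_guard<std::shared_mutex> write_lock(_mutex);
        _sched = std::move(sched);
    }
    std::shared_ptr<Scheduler> get() {
        std::shared_lock<std::shared_mutex> read_lock(_mutex);
        return _sched;
    }

private:
    std::shared_mutex _mutex;
    std::shared_ptr<Scheduler> _sched;
};

int main() {
    SchedulerHolder holder;
    holder.set(std::make_shared<Scheduler>(Scheduler{7}));
    auto sched = holder.get();
    std::cout << (sched ? sched->wg_id : -1) << std::endl;  // 7
}
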
26 changes: 21 additions & 5 deletions be/src/runtime/task_group/task_group_manager.cpp
@@ -68,15 +68,15 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
}

bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) {
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
query_ctx_ptr->set_task_scheduler(&(_tg_sche_map.at(tg_id)));
} else {
return false;
}

if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_map.at(tg_id).get());
query_ctx_ptr->set_scan_task_scheduler(&(_tg_scan_sche_map.at(tg_id)));
} else {
return false;
}
@@ -114,10 +114,11 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
}
auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);

auto pipeline_task_scheduler = std::make_unique<pipeline::TaskScheduler>(
auto pipeline_task_scheduler = std::make_shared<pipeline::TaskScheduler>(
exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
"Exec_" + tg_name, cg_cu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
pipeline_task_scheduler->set_wg_id(tg_id);
if (ret.ok()) {
_tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
} else {
@@ -128,7 +129,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
// step 3: init scan scheduler
if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
auto scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name, cg_cu_ctl_ptr);
std::make_shared<vectorized::SimplifiedScanScheduler>(tg_name, cg_cu_ctl_ptr);
Status ret = scan_scheduler->start();
if (ret.ok()) {
_tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
Expand Down Expand Up @@ -237,6 +238,21 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
LOG(INFO) << "finish clear unused cgroup path";
}

bool TaskGroupManager::migrate_memory_tracker_to_group(
std::shared_ptr<MemTrackerLimiter> mem_tracker, uint64_t src_group_id,
uint64_t dst_group_id, std::shared_ptr<taskgroup::TaskGroup>* dst_group_ptr) {
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
if (_task_groups.find(src_group_id) == _task_groups.end() ||
_task_groups.find(dst_group_id) == _task_groups.end()) {
return false;
}
_task_groups[src_group_id]->remove_mem_tracker_limiter(mem_tracker);
*dst_group_ptr = _task_groups[dst_group_id];
(*dst_group_ptr)->add_mem_tracker_limiter(mem_tracker);

return true;
}

void TaskGroupManager::stop() {
for (auto& task_sche : _tg_sche_map) {
task_sche.second->stop();
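Note: migrate_memory_tracker_to_group() drops the query's MemTrackerLimiter from the source group, returns the destination group through the out-parameter and registers the tracker there, all under _group_mutex, and refuses to do anything if either group id is unknown. A toy, self-contained version of that flow (simplified types, locking omitted):

#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <set>

using Tracker = int;

struct Group {
    std::set<Tracker> trackers;
};

std::map<uint64_t, std::shared_ptr<Group>> g_groups;

// Move a tracker from src_id to dst_id; hand the destination group back so the
// caller can rebind the query to it.
bool migrate(Tracker tracker, uint64_t src_id, uint64_t dst_id,
             std::shared_ptr<Group>* dst_out) {
    if (g_groups.find(src_id) == g_groups.end() || g_groups.find(dst_id) == g_groups.end()) {
        return false;  // unknown group: do nothing, the caller keeps the old binding
    }
    g_groups[src_id]->trackers.erase(tracker);
    *dst_out = g_groups[dst_id];
    (*dst_out)->trackers.insert(tracker);
    return true;
}

int main() {
    g_groups[1] = std::make_shared<Group>();
    g_groups[2] = std::make_shared<Group>();
    g_groups[1]->trackers.insert(42);

    std::shared_ptr<Group> dst;
    std::cout << std::boolalpha << migrate(42, 1, 2, &dst) << std::endl;              // true
    std::cout << g_groups[1]->trackers.size() << " " << dst->trackers.size() << std::endl;  // 0 1
}
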
8 changes: 6 additions & 2 deletions be/src/runtime/task_group/task_group_manager.h
@@ -71,15 +71,19 @@ class TaskGroupManager {
// doris task group only support cpu soft limit
bool enable_cgroup() { return enable_cpu_hard_limit() || config::enable_cgroup_cpu_soft_limit; }

bool migrate_memory_tracker_to_group(std::shared_ptr<MemTrackerLimiter> mem_tracker,
uint64_t src_group_id, uint64_t dst_group_id,
std::shared_ptr<taskgroup::TaskGroup>* dst_group_ptr);

private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;

// map for workload group id and task scheduler pool
// used for cpu hard limit
std::shared_mutex _task_scheduler_lock;
std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::shared_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::shared_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;

std::shared_mutex _init_cg_ctl_lock;
31 changes: 30 additions & 1 deletion be/src/runtime/workload_management/workload_action.cpp
@@ -18,6 +18,7 @@
#include "runtime/workload_management/workload_action.h"

#include "runtime/fragment_mgr.h"
#include "runtime/task_group/task_group_manager.h"

namespace doris {

@@ -29,7 +30,35 @@ void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
}

void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name;
QueryContext* query_ctx_ptr = query_info->_query_ctx_share_ptr.get();
Review comment (Contributor):
What happens if the query finishes and the query context is destructed while this exec() is still running?

Reply (Contributor, Author):
The query context is stored as a shared_ptr inside WorkloadQueryInfo, and a WorkloadQueryInfo only goes away once the scheduling policy has run over all queries. That said, this does point at a problem: the query context may then be destructed rather late. It would probably be better to release each WorkloadQueryInfo as soon as policy matching for that query is done.


taskgroup::TaskGroup* cur_task_group = query_ctx_ptr->get_task_group();
// todo(wb) maybe we can support moving a query with no group into a group
if (!cur_task_group) {
LOG(INFO) << "[workload_schedule] no workload group for current query "
<< query_info->query_id << " skip move action";
return;
}
uint64_t cur_group_id = cur_task_group->id();
std::shared_ptr<taskgroup::TaskGroup> dst_group = nullptr;
bool move_mem_ret =
ExecEnv::GetInstance()->task_group_manager()->migrate_memory_tracker_to_group(
query_ctx_ptr->query_mem_tracker, cur_group_id, _wg_id, &dst_group);

bool move_cpu_ret =
ExecEnv::GetInstance()->task_group_manager()->set_cg_task_sche_for_query_ctx(
_wg_id, query_ctx_ptr);

if (move_mem_ret && move_cpu_ret) {
query_ctx_ptr->set_task_group(dst_group);
LOG(INFO) << "[workload_schedule]move query " << query_info->query_id << " from "
<< cur_group_id << " to " << _wg_id << " success";
} else {
LOG(INFO) << "[workload_schedule]move query " << query_info->query_id << " from "
<< cur_group_id << " to " << _wg_id
<< " failed, move memory ret=" << ((int)move_mem_ret)
<< ", move cpu ret=" << ((int)move_cpu_ret);
}
};

} // namespace doris
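Note: the move action runs in two independent steps, memory first and CPU second, and only rebinds the query's task group when both succeed; on a partial failure it only logs. A condensed, standalone rendering of that control flow, with stand-in helpers in place of the TaskGroupManager calls:

#include <cstdint>
#include <iostream>
#include <string>

// Stand-ins for migrate_memory_tracker_to_group() and set_cg_task_sche_for_query_ctx().
bool migrate_memory(uint64_t /*src*/, uint64_t /*dst*/) { return true; }
bool rebind_cpu_scheduler(uint64_t /*dst*/) { return true; }

void move_query(const std::string& query_id, uint64_t src_group, uint64_t dst_group) {
    bool move_mem_ret = migrate_memory(src_group, dst_group);
    bool move_cpu_ret = rebind_cpu_scheduler(dst_group);
    if (move_mem_ret && move_cpu_ret) {
        // real code: query_ctx_ptr->set_task_group(dst_group)
        std::cout << "move query " << query_id << " from " << src_group << " to "
                  << dst_group << " success" << std::endl;
    } else {
        std::cout << "move query " << query_id << " failed, move memory ret="
                  << move_mem_ret << ", move cpu ret=" << move_cpu_ret << std::endl;
    }
}

int main() {
    move_query("query-1", /*src_group=*/1, /*dst_group=*/2);
}
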