Add scanbytes/scanrows condition, add move action in be
wangbo committed Dec 30, 2023
1 parent e7d67e9 commit ae64718
Showing 21 changed files with 276 additions and 44 deletions.
4 changes: 1 addition & 3 deletions be/src/agent/workload_sched_policy_listener.cpp
@@ -69,9 +69,7 @@ void WorkloadschedPolicyListener::handle_topic_info(const std::vector<TopicInfo>
policy_map.emplace(tpolicy.id, std::move(policy_ptr));
}
size_t new_policy_size = policy_map.size();
if (new_policy_size > 0) {
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
}
_exec_env->workload_sched_policy_mgr()->update_workload_sched_policy(std::move(policy_map));
LOG(INFO) << "[workload_schedule]finish update workload schedule policy, size="
<< new_policy_size;
}
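Dropping the `if (new_policy_size > 0)` guard means `update_workload_sched_policy` is now called even with an empty map, so an empty policy list from the FE can clear previously cached policies on the BE instead of being silently ignored. A minimal standalone sketch of that replace-the-whole-map behavior (simplified stand-in types, not the actual WorkloadSchedPolicyMgr API):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>

// Simplified stand-ins; not the actual Doris policy types.
struct Policy {
    int64_t id;
    int version;
};

class PolicyMgr {
public:
    // Replacing the whole map unconditionally means an empty update clears
    // any previously cached policies instead of being skipped.
    void update_policies(std::map<int64_t, std::shared_ptr<Policy>>&& new_map) {
        std::lock_guard<std::mutex> lock(_mu);
        _policies = std::move(new_map);
    }
    size_t size() {
        std::lock_guard<std::mutex> lock(_mu);
        return _policies.size();
    }

private:
    std::mutex _mu;
    std::map<int64_t, std::shared_ptr<Policy>> _policies;
};

int main() {
    PolicyMgr mgr;
    std::map<int64_t, std::shared_ptr<Policy>> m;
    m.emplace(1, std::make_shared<Policy>(Policy{1, 1}));
    mgr.update_policies(std::move(m));
    std::cout << "after first update: " << mgr.size() << "\n";  // 1

    mgr.update_policies({});  // an empty list now wipes stale policies
    std::cout << "after empty update: " << mgr.size() << "\n";  // 0
    return 0;
}
```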
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -397,7 +397,7 @@ std::string PipelineXTask::debug_string() {

void PipelineXTask::wake_up() {
// call by dependency
static_cast<void>(get_task_queue()->push_back(this));
static_cast<void>(query_context()->get_exec_task_queue()->push_back(this));
}

} // namespace doris::pipeline
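`wake_up()` now resolves the target queue through the query context at the moment of wake-up rather than through the queue pointer the task was created with, so a query that has been moved to another workload group resumes on the new group's queue. A small sketch of that resolve-at-use-time pattern (simplified stand-in types, not the Doris classes):

```cpp
#include <deque>
#include <iostream>

// Stand-ins; the real PipelineXTask/QueryContext/TaskQueue are far richer.
struct TaskQueue {
    std::deque<int> ids;
    void push_back(int task_id) { ids.push_back(task_id); }
};

struct QueryContext {
    TaskQueue* current_queue = nullptr;  // swapped when the query moves to another group
    TaskQueue* get_exec_task_queue() { return current_queue; }
};

struct Task {
    int id;
    QueryContext* ctx;
    // Resolving the queue here, not at construction, picks up the group the
    // query belongs to *now*.
    void wake_up() { ctx->get_exec_task_queue()->push_back(id); }
};

int main() {
    TaskQueue group_a, group_b;
    QueryContext ctx{&group_a};
    Task t{42, &ctx};
    t.wake_up();                   // enqueued on group_a
    ctx.current_queue = &group_b;  // query migrated to another workload group
    t.wake_up();                   // enqueued on group_b
    std::cout << group_a.ids.size() << " " << group_b.ids.size() << "\n";  // prints: 1 1
    return 0;
}
```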
15 changes: 13 additions & 2 deletions be/src/pipeline/task_scheduler.cpp
@@ -194,7 +194,7 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
auto task = *task_itr;
task->set_state(t_state);
local_tasks.erase(task_itr++);
static_cast<void>(task->get_task_queue()->push_back(task));
static_cast<void>(task->query_context()->get_exec_task_queue()->push_back(task));
}

TaskScheduler::~TaskScheduler() {
@@ -232,6 +232,17 @@ void TaskScheduler::_do_work(size_t index) {
if (!task) {
continue;
}

// the query may have been migrated to another scheduler
if (auto* query_ctx_ptr = task->query_context()) {
if (auto* sched_ptr = query_ctx_ptr->get_task_scheduler()) {
if (sched_ptr->get_wg_id() != this->get_wg_id()) {
static_cast<void>(query_ctx_ptr->get_exec_task_queue()->push_back(task, index));
continue;
}
}
}

if (task->is_pipelineX() && task->is_running()) {
static_cast<void>(_task_queue->push_back(task, index));
continue;
@@ -336,7 +347,7 @@
break;
case PipelineTaskState::RUNNABLE:
task->set_running(false);
static_cast<void>(_task_queue->push_back(task, index));
static_cast<void>(task->query_context()->get_exec_task_queue()->push_back(task, index));
break;
default:
DCHECK(false) << "error state after run task, " << get_state_name(pipeline_state)
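With the new `set_wg_id`/`get_wg_id` pair on the scheduler (added in task_scheduler.h below), the worker loop can detect that the query owning a popped task now belongs to a different workload group and hand the task to that group's queue instead of running it on the old group's threads. A standalone sketch of this check-and-re-enqueue step (hypothetical simplified types; the real `_do_work` also handles task states, closing, and cancellation):

```cpp
#include <cstdint>
#include <deque>
#include <iostream>

struct Task;

struct TaskQueue {
    std::deque<Task*> q;
    void push_back(Task* t, size_t /*index*/) { q.push_back(t); }
    Task* take() {
        if (q.empty()) return nullptr;
        Task* t = q.front();
        q.pop_front();
        return t;
    }
};

struct Scheduler {
    uint64_t wg_id;
    TaskQueue queue;
    uint64_t get_wg_id() const { return wg_id; }
};

struct QueryContext {
    Scheduler* scheduler = nullptr;  // scheduler of the group the query belongs to now
    Scheduler* get_task_scheduler() { return scheduler; }
    TaskQueue* get_exec_task_queue() { return &scheduler->queue; }
};

struct Task {
    QueryContext* ctx;
    QueryContext* query_context() { return ctx; }
};

// One iteration of a worker thread owned by `self`.
void do_work_once(Scheduler* self, size_t index) {
    Task* task = self->queue.take();
    if (!task) return;
    // The query may have migrated to another workload group's scheduler:
    // hand the task over to that scheduler's queue and skip running it here.
    if (auto* ctx = task->query_context()) {
        if (auto* sched = ctx->get_task_scheduler();
            sched && sched->get_wg_id() != self->get_wg_id()) {
            ctx->get_exec_task_queue()->push_back(task, index);
            return;
        }
    }
    std::cout << "run task on wg " << self->get_wg_id() << "\n";
}

int main() {
    Scheduler wg_a{1, {}}, wg_b{2, {}};
    QueryContext ctx{&wg_b};      // query now belongs to group 2
    Task t{&ctx};
    wg_a.queue.push_back(&t, 0);  // but the task was still queued on group 1
    do_work_once(&wg_a, 0);       // bounced to group 2's queue
    do_work_once(&wg_b, 0);       // prints: run task on wg 2
    return 0;
}
```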
5 changes: 5 additions & 0 deletions be/src/pipeline/task_scheduler.h
@@ -93,6 +93,10 @@ class TaskScheduler {

TaskQueue* task_queue() const { return _task_queue.get(); }

void set_wg_id(uint64_t wg_id) { this->_wg_id = wg_id; }

uint64_t get_wg_id() { return _wg_id; }

private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
std::shared_ptr<TaskQueue> _task_queue;
@@ -101,6 +105,7 @@
std::atomic<bool> _shutdown;
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
uint64_t _wg_id = -1;

void _do_work(size_t index);
// after _try_close_task, task maybe destructed.
15 changes: 11 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
@@ -1571,17 +1571,24 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TPipelineFrag
void FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_info_list) {
{
std::lock_guard<std::mutex> lock(_lock);
// todo: use monotonic time
VecDateTimeValue now = VecDateTimeValue::local_time();
for (const auto& q : _query_ctx_map) {
WorkloadQueryInfo workload_query_info;
workload_query_info.query_id = print_id(q.first);
workload_query_info.tquery_id = q.first;

uint64_t query_time_millisecond = q.second->query_time(now) * 1000;
uint64_t query_time_millisecond = q.second->query_time();
workload_query_info.metric_map.emplace(WorkloadMetricType::QUERY_TIME,
std::to_string(query_time_millisecond));
// todo, add scan rows, scan bytes

int64_t scan_rows = 0;
int64_t scan_bytes = 0;
workload_query_info.metric_map.emplace(WorkloadMetricType::SCAN_ROWS,
std::to_string(scan_rows));
workload_query_info.metric_map.emplace(WorkloadMetricType::SCAN_BYTES,
std::to_string(scan_bytes));

workload_query_info._query_ctx_share_ptr = q.second;

query_info_list->push_back(workload_query_info);
}
}
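`get_runtime_query_info` now reports elapsed query time from a monotonic start timestamp (set in the QueryContext constructor) instead of a wall-clock difference, and adds SCAN_ROWS/SCAN_BYTES metric entries that are still hard-coded to 0 in this commit. A rough standalone sketch of that metric-map construction (illustrative names, not the Doris API):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <thread>

// Monotonic milliseconds, immune to wall-clock adjustments.
static int64_t monotonic_millis() {
    using namespace std::chrono;
    return duration_cast<milliseconds>(steady_clock::now().time_since_epoch()).count();
}

struct QueryRuntimeInfo {
    int64_t monotonic_start_time_ms = monotonic_millis();
    int64_t query_time_ms() const { return monotonic_millis() - monotonic_start_time_ms; }
};

int main() {
    QueryRuntimeInfo q;
    std::this_thread::sleep_for(std::chrono::milliseconds(50));

    // Per-query metric map handed to the workload policy engine; scan rows and
    // scan bytes are placeholders here, mirroring this commit.
    std::map<std::string, std::string> metrics;
    metrics.emplace("QUERY_TIME", std::to_string(q.query_time_ms()));
    metrics.emplace("SCAN_ROWS", std::to_string(0));
    metrics.emplace("SCAN_BYTES", std::to_string(0));

    for (const auto& [k, v] : metrics) std::cout << k << "=" << v << "\n";
    return 0;
}
```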
15 changes: 15 additions & 0 deletions be/src/runtime/query_context.cpp
@@ -19,6 +19,7 @@

#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/task_scheduler.h"

namespace doris {

@@ -38,6 +39,7 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv*
_exec_env(exec_env),
_query_options(query_options) {
_start_time = VecDateTimeValue::local_time();
_monotonic_start_time_ms = MonotonicMillis();
_shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
_shared_scanner_controller.reset(new vectorized::SharedScannerController());
_execution_dependency =
@@ -116,4 +118,17 @@ bool QueryContext::cancel(bool v, std::string msg, Status new_status, int fragme
}
return true;
}

pipeline::TaskQueue* QueryContext::get_exec_task_queue() {
std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex);
if (_task_scheduler) {
return _task_scheduler->task_queue();
} else if (use_task_group_for_cpu_limit.load()) {
return _exec_env->pipeline_task_group_scheduler()->task_queue();
} else {
// no workload group task queue found, so fall back to the common scheduler
return _exec_env->pipeline_task_scheduler()->task_queue();
}
}

} // namespace doris
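`get_exec_task_queue()` selects the queue in three steps under a shared (read) lock: the workload group scheduler bound to the query if there is one, otherwise the task-group scheduler's queue when task-group CPU limiting is enabled, otherwise the common pipeline scheduler. A condensed sketch of that selection plus the read/write locking around the scheduler pointer (stand-in types and globals, not the real ExecEnv accessors):

```cpp
#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <shared_mutex>

struct TaskQueue {};
struct TaskScheduler {
    TaskQueue queue;
    TaskQueue* task_queue() { return &queue; }
};

// Stand-ins for config flag and ExecEnv-owned schedulers.
std::atomic<bool> use_task_group_for_cpu_limit{false};
TaskScheduler global_task_group_scheduler;  // ~ pipeline_task_group_scheduler()
TaskScheduler common_scheduler;             // ~ pipeline_task_scheduler()

class QueryContext {
public:
    void set_task_scheduler(std::shared_ptr<TaskScheduler> s) {
        std::lock_guard<std::shared_mutex> write_lock(_mu);  // writer: move action
        _task_scheduler = std::move(s);
    }
    TaskQueue* get_exec_task_queue() {
        std::shared_lock<std::shared_mutex> read_lock(_mu);  // readers: task wake-up etc.
        if (_task_scheduler) {
            return _task_scheduler->task_queue();  // workload group's own queue
        } else if (use_task_group_for_cpu_limit.load()) {
            return global_task_group_scheduler.task_queue();
        }
        return common_scheduler.task_queue();  // fallback: common scheduler
    }

private:
    std::shared_mutex _mu;
    std::shared_ptr<TaskScheduler> _task_scheduler;
};

int main() {
    QueryContext ctx;
    TaskQueue* before = ctx.get_exec_task_queue();  // common scheduler's queue
    ctx.set_task_scheduler(std::make_shared<TaskScheduler>());
    TaskQueue* after = ctx.get_exec_task_queue();   // now the workload group's queue
    std::cout << "queue changed: " << (before != after) << "\n";
    return 0;
}
```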
54 changes: 41 additions & 13 deletions be/src/runtime/query_context.h
@@ -43,6 +43,7 @@ namespace doris {

namespace pipeline {
class PipelineFragmentContext;
class TaskQueue;
} // namespace pipeline

struct ReportStatusRequest {
Expand Down Expand Up @@ -93,8 +94,6 @@ class QueryContext {
return false;
}

int64_t query_time(VecDateTimeValue& now) { return now.second_diff(_start_time); }

void set_thread_token(int concurrency, bool is_serial) {
_thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
is_serial ? ThreadPool::ExecutionMode::SERIAL
@@ -151,9 +150,20 @@

vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; }

void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
void set_task_group(taskgroup::TaskGroupPtr& tg) {
std::lock_guard<std::shared_mutex> write_lock(_task_group_lock);
_task_group = tg;
}

taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
taskgroup::TaskGroup* get_task_group() {
std::shared_lock<std::shared_mutex> read_lock(_task_group_lock);
return _task_group.get();
}

taskgroup::TaskGroupPtr get_task_group_shared_ptr() {
std::shared_lock<std::shared_mutex> read_lock(_task_group_lock);
return _task_group;
}

int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
@@ -194,20 +204,33 @@

TUniqueId query_id() const { return _query_id; }

void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
_task_scheduler = task_scheduler;
void set_task_scheduler(std::shared_ptr<pipeline::TaskScheduler>* task_scheduler) {
std::lock_guard<std::shared_mutex> write_lock(_exec_task_sched_mutex);
_task_scheduler = *task_scheduler;
}

pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
pipeline::TaskScheduler* get_task_scheduler() {
std::shared_lock<std::shared_mutex> read_lock(_exec_task_sched_mutex);
return _task_scheduler.get();
}

pipeline::TaskQueue* get_exec_task_queue();

void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) {
_scan_task_scheduler = scan_task_scheduler;
void set_scan_task_scheduler(
std::shared_ptr<vectorized::SimplifiedScanScheduler>* scan_task_scheduler) {
std::lock_guard<std::shared_mutex> write_lock(_scan_task_sched_mutex);
_scan_task_scheduler = *scan_task_scheduler;
}

vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
vectorized::SimplifiedScanScheduler* get_scan_scheduler() {
std::shared_lock<std::shared_mutex> read_lock(_scan_task_sched_mutex);
return _scan_task_scheduler.get();
}

pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }

int64_t query_time() { return MonotonicMillis() - _monotonic_start_time_ms; }

public:
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
@@ -243,6 +266,7 @@
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
VecDateTimeValue _start_time;
int64_t _monotonic_start_time_ms;

// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from exec env.
@@ -262,7 +286,8 @@
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
vectorized::RuntimePredicate _runtime_predicate;

taskgroup::TaskGroupPtr _task_group;
std::shared_mutex _task_group_lock;
taskgroup::TaskGroupPtr _task_group = nullptr;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;

@@ -271,8 +296,11 @@
// to report the real message if failed.
Status _exec_status = Status::OK();

pipeline::TaskScheduler* _task_scheduler = nullptr;
vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
std::shared_mutex _exec_task_sched_mutex;
std::shared_ptr<pipeline::TaskScheduler> _task_scheduler = nullptr;
std::shared_mutex _scan_task_sched_mutex;
std::shared_ptr<vectorized::SimplifiedScanScheduler> _scan_task_scheduler = nullptr;

std::unique_ptr<pipeline::Dependency> _execution_dependency;
};

26 changes: 21 additions & 5 deletions be/src/runtime/task_group/task_group_manager.cpp
@@ -68,15 +68,15 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
}

bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) {
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
query_ctx_ptr->set_task_scheduler(&(_tg_sche_map.at(tg_id)));
} else {
return false;
}

if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_map.at(tg_id).get());
query_ctx_ptr->set_scan_task_scheduler(&(_tg_scan_sche_map.at(tg_id)));
} else {
return false;
}
@@ -114,10 +114,11 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
}
auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);

auto pipeline_task_scheduler = std::make_unique<pipeline::TaskScheduler>(
auto pipeline_task_scheduler = std::make_shared<pipeline::TaskScheduler>(
exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
"Exec_" + tg_name, cg_cu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
pipeline_task_scheduler->set_wg_id(tg_id);
if (ret.ok()) {
_tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
} else {
@@ -128,7 +129,7 @@
// step 3: init scan scheduler
if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
auto scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name, cg_cu_ctl_ptr);
std::make_shared<vectorized::SimplifiedScanScheduler>(tg_name, cg_cu_ctl_ptr);
Status ret = scan_scheduler->start();
if (ret.ok()) {
_tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
@@ -237,6 +238,21 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
LOG(INFO) << "finish clear unused cgroup path";
}

bool TaskGroupManager::migrate_memory_tracker_to_group(
std::shared_ptr<MemTrackerLimiter> mem_tracker, uint64_t src_group_id,
uint64_t dst_group_id, std::shared_ptr<taskgroup::TaskGroup>* dst_group_ptr) {
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
if (_task_groups.find(src_group_id) == _task_groups.end() ||
_task_groups.find(dst_group_id) == _task_groups.end()) {
return false;
}
_task_groups[src_group_id]->remove_mem_tracker_limiter(mem_tracker);
*dst_group_ptr = _task_groups[dst_group_id];
(*dst_group_ptr)->add_mem_tracker_limiter(mem_tracker);

return true;
}

void TaskGroupManager::stop() {
for (auto& task_sche : _tg_sche_map) {
task_sche.second->stop();
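`migrate_memory_tracker_to_group` detaches the query's MemTrackerLimiter from the source group and attaches it to the destination group under the group-map write lock, handing the destination group back so the caller can rebind the query context. A simplified standalone sketch of that handover (hypothetical types, not the real TaskGroupManager API):

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>

struct MemTracker {
    int64_t consumption = 0;
};

struct TaskGroup {
    uint64_t id;
    std::unordered_set<std::shared_ptr<MemTracker>> trackers;
    void add_mem_tracker(const std::shared_ptr<MemTracker>& t) { trackers.insert(t); }
    void remove_mem_tracker(const std::shared_ptr<MemTracker>& t) { trackers.erase(t); }
};

class GroupManager {
public:
    void add_group(uint64_t id) {
        std::lock_guard<std::mutex> l(_mu);
        _groups.emplace(id, std::make_shared<TaskGroup>(TaskGroup{id, {}}));
    }
    // Move a query's tracker from src to dst; return the dst group so the
    // caller can re-bind the query context to it.
    bool migrate_tracker(const std::shared_ptr<MemTracker>& t, uint64_t src, uint64_t dst,
                         std::shared_ptr<TaskGroup>* dst_out) {
        std::lock_guard<std::mutex> l(_mu);
        if (!_groups.count(src) || !_groups.count(dst)) return false;  // unknown group: fail
        _groups[src]->remove_mem_tracker(t);
        *dst_out = _groups[dst];
        (*dst_out)->add_mem_tracker(t);
        return true;
    }

private:
    std::mutex _mu;
    std::unordered_map<uint64_t, std::shared_ptr<TaskGroup>> _groups;
};

int main() {
    GroupManager mgr;
    mgr.add_group(1);
    mgr.add_group(2);
    auto tracker = std::make_shared<MemTracker>();
    std::shared_ptr<TaskGroup> dst;
    bool ok = mgr.migrate_tracker(tracker, 1, 2, &dst);
    std::cout << "migrated=" << ok << " dst_group=" << (dst ? dst->id : 0) << "\n";
    return 0;
}
```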
8 changes: 6 additions & 2 deletions be/src/runtime/task_group/task_group_manager.h
@@ -71,15 +71,19 @@ class TaskGroupManager {
// doris task group only support cpu soft limit
bool enable_cgroup() { return enable_cpu_hard_limit() || config::enable_cgroup_cpu_soft_limit; }

bool migrate_memory_tracker_to_group(std::shared_ptr<MemTrackerLimiter> mem_tracker,
uint64_t src_group_id, uint64_t dst_group_id,
std::shared_ptr<taskgroup::TaskGroup>* dst_group_ptr);

private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;

// map for workload group id and task scheduler pool
// used for cpu hard limit
std::shared_mutex _task_scheduler_lock;
std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::shared_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::shared_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;

std::shared_mutex _init_cg_ctl_lock;
31 changes: 30 additions & 1 deletion be/src/runtime/workload_management/workload_action.cpp
@@ -18,6 +18,7 @@
#include "runtime/workload_management/workload_action.h"

#include "runtime/fragment_mgr.h"
#include "runtime/task_group/task_group_manager.h"

namespace doris {

@@ -29,7 +30,35 @@ void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
}

void WorkloadActionMoveQuery::exec(WorkloadQueryInfo* query_info) {
LOG(INFO) << "[workload_schedule]move query action run group=" << _wg_name;
QueryContext* query_ctx_ptr = query_info->_query_ctx_share_ptr.get();

taskgroup::TaskGroup* cur_task_group = query_ctx_ptr->get_task_group();
// todo(wb) maybe we can support moving a query with no group into a group
if (!cur_task_group) {
LOG(INFO) << "[workload_schedule] no workload group for current query "
<< query_info->query_id << " skip move action";
return;
}
uint64_t cur_group_id = cur_task_group->id();
std::shared_ptr<taskgroup::TaskGroup> dst_group = nullptr;
bool move_mem_ret =
ExecEnv::GetInstance()->task_group_manager()->migrate_memory_tracker_to_group(
query_ctx_ptr->query_mem_tracker, cur_group_id, _wg_id, &dst_group);

bool move_cpu_ret =
ExecEnv::GetInstance()->task_group_manager()->set_cg_task_sche_for_query_ctx(
_wg_id, query_ctx_ptr);

if (move_mem_ret && move_cpu_ret) {
query_ctx_ptr->set_task_group(dst_group);
LOG(INFO) << "[workload_schedule]move query " << query_info->query_id << " from "
<< cur_group_id << " to " << _wg_id << " success";
} else {
LOG(INFO) << "[workload_schedule]move query " << query_info->query_id << " from "
<< cur_group_id << " to " << _wg_id
<< " failed, move memory ret=" << ((int)move_mem_ret)
<< ", move cpu ret=" << ((int)move_cpu_ret);
}
};

} // namespace doris
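`WorkloadActionMoveQuery::exec` composes two migrations, the memory tracker to the destination group and the CPU scheduler rebinding, and only commits the query's task-group pointer when both succeed. A condensed sketch of that compose-then-commit flow (hypothetical helper functions standing in for the TaskGroupManager calls):

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>

struct TaskGroup { uint64_t id; };

struct QueryContext {
    std::shared_ptr<TaskGroup> group;
};

// Hypothetical hooks standing in for the TaskGroupManager calls.
bool migrate_memory(QueryContext&, uint64_t /*src*/, uint64_t dst,
                    std::shared_ptr<TaskGroup>* dst_group) {
    *dst_group = std::make_shared<TaskGroup>(TaskGroup{dst});
    return true;
}
bool rebind_cpu_schedulers(QueryContext&, uint64_t /*dst*/) { return true; }

void move_query(QueryContext& ctx, uint64_t dst_wg_id, const std::string& query_id) {
    if (!ctx.group) {
        std::cout << "[workload_schedule] query " << query_id
                  << " has no workload group, skip move\n";
        return;
    }
    uint64_t src = ctx.group->id;
    std::shared_ptr<TaskGroup> dst_group;
    bool mem_ok = migrate_memory(ctx, src, dst_wg_id, &dst_group);
    bool cpu_ok = rebind_cpu_schedulers(ctx, dst_wg_id);
    if (mem_ok && cpu_ok) {
        ctx.group = dst_group;  // commit: the query now belongs to the new group
        std::cout << "moved " << query_id << " from " << src << " to " << dst_wg_id << "\n";
    } else {
        std::cout << "move failed, mem=" << mem_ok << " cpu=" << cpu_ok << "\n";
    }
}

int main() {
    QueryContext ctx{std::make_shared<TaskGroup>(TaskGroup{1})};
    move_query(ctx, 2, "q1");
    return 0;
}
```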
(remaining changed files not loaded)
