Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor](exec) Remove unless code and add comment #46503

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
std::vector<std::shared_ptr<Pipeline>> _children;

PipelineId _pipeline_id;
int _previous_schedule_id = -1;

// pipline id + operator names. init when:
// build_operators(), if pipeline;
Expand Down
20 changes: 6 additions & 14 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,14 @@ class PipelineTask {

QueryContext* query_context();

int get_previous_core_id() const {
return _previous_schedule_id != -1 ? _previous_schedule_id
: _pipeline->_previous_schedule_id;
}
int get_core_id() const { return _core_id; }

void set_previous_core_id(int id) {
if (id != _previous_schedule_id) {
if (_previous_schedule_id != -1) {
void set_core_id(int id) {
if (id != _core_id) {
if (_core_id != -1) {
COUNTER_UPDATE(_core_change_times, 1);
}
_previous_schedule_id = id;
_core_id = id;
}
}

Expand Down Expand Up @@ -175,10 +172,6 @@ class PipelineTask {
void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
int get_queue_level() const { return this->_queue_level; }

// 1.3 priority queue's core id
void set_core_id(int core_id) { this->_core_id = core_id; }
int get_core_id() const { return this->_core_id; }

/**
* Return true if:
* 1. `enable_force_spill` is true which forces this task to spill data.
Expand Down Expand Up @@ -254,7 +247,7 @@ class PipelineTask {
bool _has_exceed_timeout = false;
bool _opened;
RuntimeState* _state = nullptr;
int _previous_schedule_id = -1;
int _core_id = -1;
uint32_t _schedule_time = 0;
std::unique_ptr<doris::vectorized::Block> _block;
PipelineFragmentContext* _fragment_context = nullptr;
Expand All @@ -269,7 +262,6 @@ class PipelineTask {
// 2 exe task
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
int _core_id = 0;

RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
Expand Down
5 changes: 1 addition & 4 deletions be/src/pipeline/task_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
<< " _core_size: " << _core_size << " _next_core: " << _next_core.load();
task = _prio_task_queues[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
Expand All @@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) {
}
task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
}
}
Expand All @@ -183,15 +181,14 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) {
DCHECK(next_id < _core_size);
auto task = _prio_task_queues[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
}
}
return nullptr;
}

Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
int core_id = task->get_previous_core_id();
int core_id = task->get_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
}
Expand Down
22 changes: 8 additions & 14 deletions be/src/pipeline/task_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@

#include "common/logging.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/task_queue.h"
#include "pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
Expand Down Expand Up @@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) {
if (!task) {
continue;
}
// The task is already running, maybe block in now dependency wake up by other thread
// but the block thread still hold the task, so put it back to the queue, until the hold
// thread set task->set_running(false)
if (task->is_running()) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
Expand All @@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) {
// task exec
bool eos = false;
auto status = Status::OK();
task->set_core_id(index);

#ifdef __APPLE__
uint32_t core_id = 0;
#else
uint32_t core_id = sched_getcpu();
#endif
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
//TODO: use a better enclose to abstracting these
if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
Expand All @@ -149,12 +147,11 @@ void TaskScheduler::_do_work(int index) {

uint64_t end_time = MonotonicMicros();
ExecEnv::GetInstance()->pipeline_tracer_context()->record(
{query_id, task_name, core_id, thread_id, start_time, end_time});
{query_id, task_name, static_cast<uint32_t>(index), thread_id,
start_time, end_time});
} else { status = task->execute(&eos); },
status);

task->set_previous_core_id(index);

if (!status.ok()) {
// Print detail informations below when you debugging here.
//
Expand All @@ -173,14 +170,11 @@ void TaskScheduler::_do_work(int index) {
if (eos) {
// is pending finish will add the task to dependency's blocking queue, and then the task will be
// added to running queue when dependency is ready.
if (task->is_pending_finish()) {
// Only meet eos, should set task to PENDING_FINISH state
task->set_running(false);
} else {
if (!task->is_pending_finish()) {
Status exec_status = fragment_ctx->get_query_ctx()->exec_status();
_close_task(task, exec_status);
continue;
}
continue;
}

task->set_running(false);
Expand Down
54 changes: 0 additions & 54 deletions be/src/vec/runtime/vdatetime_value.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2845,40 +2845,6 @@ int date_day_offset_dict::daynr(int year, int month, int day) const {
return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1];
}

template <typename T>
uint32_t DateV2Value<T>::set_date_uint32(uint32_t int_val) {
union DateV2UInt32Union {
DateV2Value<T> dt;
uint32_t ui32;
~DateV2UInt32Union() {}
};
DateV2UInt32Union conv = {.ui32 = int_val};
if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0)) {
return 0;
}
this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0);

return int_val;
}

template <typename T>
uint64_t DateV2Value<T>::set_datetime_uint64(uint64_t int_val) {
union DateTimeV2UInt64Union {
DateV2Value<T> dt;
uint64_t ui64;
~DateTimeV2UInt64Union() {}
};
DateTimeV2UInt64Union conv = {.ui64 = int_val};
if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), conv.dt.minute(),
conv.dt.second(), conv.dt.microsecond())) {
return 0;
}
this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(),
conv.dt.minute(), conv.dt.second(), conv.dt.microsecond());

return int_val;
}

template <typename T>
uint8_t DateV2Value<T>::week(uint8_t mode) const {
uint16_t year = 0;
Expand Down Expand Up @@ -3707,26 +3673,6 @@ bool DateV2Value<T>::to_format_string_conservative(const char* format, size_t le
return true;
}

template <typename T>
bool DateV2Value<T>::from_date(uint32_t value) {
DCHECK(!is_datetime);
if (value < MIN_DATE_V2 || value > MAX_DATE_V2) {
return false;
}

return set_date_uint32(value);
}

template <typename T>
bool DateV2Value<T>::from_datetime(uint64_t value) {
DCHECK(is_datetime);
if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) {
return false;
}

return set_datetime_uint64(value);
}

template <typename T>
int64_t DateV2Value<T>::standardize_timevalue(int64_t value) {
if (value <= 0) {
Expand Down
5 changes: 0 additions & 5 deletions be/src/vec/runtime/vdatetime_value.h
Original file line number Diff line number Diff line change
Expand Up @@ -1169,12 +1169,7 @@ class DateV2Value {

underlying_value to_date_int_val() const { return int_val_; }

bool from_date(uint32_t value);
bool from_datetime(uint64_t value);

bool from_date_int64(int64_t value);
uint32_t set_date_uint32(uint32_t int_val);
uint64_t set_datetime_uint64(uint64_t int_val);

bool get_date_from_daynr(uint64_t);

Expand Down
Loading