diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index afbe6c77596432..7bde9323e94617 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -148,7 +148,6 @@ class Pipeline : public std::enable_shared_from_this { std::vector> _children; PipelineId _pipeline_id; - int _previous_schedule_id = -1; // pipline id + operator names. init when: // build_operators(), if pipeline; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 1a31e5954f479c..6944c5471306aa 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -67,17 +67,14 @@ class PipelineTask { QueryContext* query_context(); - int get_previous_core_id() const { - return _previous_schedule_id != -1 ? _previous_schedule_id - : _pipeline->_previous_schedule_id; - } + int get_core_id() const { return _core_id; } - void set_previous_core_id(int id) { - if (id != _previous_schedule_id) { - if (_previous_schedule_id != -1) { + void set_core_id(int id) { + if (id != _core_id) { + if (_core_id != -1) { COUNTER_UPDATE(_core_change_times, 1); } - _previous_schedule_id = id; + _core_id = id; } } @@ -175,10 +172,6 @@ class PipelineTask { void update_queue_level(int queue_level) { this->_queue_level = queue_level; } int get_queue_level() const { return this->_queue_level; } - // 1.3 priority queue's core id - void set_core_id(int core_id) { this->_core_id = core_id; } - int get_core_id() const { return this->_core_id; } - /** * Return true if: * 1. `enable_force_spill` is true which forces this task to spill data. @@ -254,7 +247,7 @@ class PipelineTask { bool _has_exceed_timeout = false; bool _opened; RuntimeState* _state = nullptr; - int _previous_schedule_id = -1; + int _core_id = -1; uint32_t _schedule_time = 0; std::unique_ptr _block; PipelineFragmentContext* _fragment_context = nullptr; @@ -269,7 +262,6 @@ class PipelineTask { // 2 exe task // 3 update task statistics(update _queue_level/_core_id) int _queue_level = 0; - int _core_id = 0; RuntimeProfile* _parent_profile = nullptr; std::unique_ptr _task_profile; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index ea812ca9b12dd6..390707766a4892 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -153,7 +153,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) { << " _core_size: " << _core_size << " _next_core: " << _next_core.load(); task = _prio_task_queues[core_id].try_take(false); if (task) { - task->set_core_id(core_id); break; } task = _steal_take(core_id); @@ -162,7 +161,6 @@ PipelineTask* MultiCoreTaskQueue::take(int core_id) { } task = _prio_task_queues[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); if (task) { - task->set_core_id(core_id); break; } } @@ -183,7 +181,6 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) { DCHECK(next_id < _core_size); auto task = _prio_task_queues[next_id].try_take(true); if (task) { - task->set_core_id(next_id); return task; } } @@ -191,7 +188,7 @@ PipelineTask* MultiCoreTaskQueue::_steal_take(int core_id) { } Status MultiCoreTaskQueue::push_back(PipelineTask* task) { - int core_id = task->get_previous_core_id(); + int core_id = task->get_core_id(); if (core_id < 0) { core_id = _next_core.fetch_add(1) % _core_size; } @@ -205,9 +202,12 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) { } void MultiCoreTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) { - task->inc_runtime_ns(time_spent); - _prio_task_queues[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(), - time_spent); + // if the task not execute but exception early close, core_id == -1 + // should not do update_statistics + if (auto core_id = task->get_core_id(); core_id >= 0) { + task->inc_runtime_ns(time_spent); + _prio_task_queues[core_id].inc_sub_queue_runtime(task->get_queue_level(), time_spent); + } } } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 45898e764175b2..60d9efa66ad2ea 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -33,7 +33,6 @@ #include "common/logging.h" #include "pipeline/pipeline_task.h" -#include "pipeline/task_queue.h" #include "pipeline_fragment_context.h" #include "runtime/exec_env.h" #include "runtime/query_context.h" @@ -103,6 +102,9 @@ void TaskScheduler::_do_work(int index) { if (!task) { continue; } + // The task is already running, maybe block in now dependency wake up by other thread + // but the block thread still hold the task, so put it back to the queue, until the hold + // thread set task->set_running(false) if (task->is_running()) { static_cast(_task_queue.push_back(task, index)); continue; @@ -129,12 +131,8 @@ void TaskScheduler::_do_work(int index) { // task exec bool eos = false; auto status = Status::OK(); + task->set_core_id(index); -#ifdef __APPLE__ - uint32_t core_id = 0; -#else - uint32_t core_id = sched_getcpu(); -#endif ASSIGN_STATUS_IF_CATCH_EXCEPTION( //TODO: use a better enclose to abstracting these if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) { @@ -149,12 +147,11 @@ void TaskScheduler::_do_work(int index) { uint64_t end_time = MonotonicMicros(); ExecEnv::GetInstance()->pipeline_tracer_context()->record( - {query_id, task_name, core_id, thread_id, start_time, end_time}); + {query_id, task_name, static_cast(index), thread_id, + start_time, end_time}); } else { status = task->execute(&eos); }, status); - task->set_previous_core_id(index); - if (!status.ok()) { // Print detail informations below when you debugging here. // @@ -173,14 +170,11 @@ void TaskScheduler::_do_work(int index) { if (eos) { // is pending finish will add the task to dependency's blocking queue, and then the task will be // added to running queue when dependency is ready. - if (task->is_pending_finish()) { - // Only meet eos, should set task to PENDING_FINISH state - task->set_running(false); - } else { + if (!task->is_pending_finish()) { Status exec_status = fragment_ctx->get_query_ctx()->exec_status(); _close_task(task, exec_status); + continue; } - continue; } task->set_running(false); diff --git a/be/src/vec/runtime/vdatetime_value.cpp b/be/src/vec/runtime/vdatetime_value.cpp index 026648319d4be4..e4306f7fa3ef11 100644 --- a/be/src/vec/runtime/vdatetime_value.cpp +++ b/be/src/vec/runtime/vdatetime_value.cpp @@ -2845,40 +2845,6 @@ int date_day_offset_dict::daynr(int year, int month, int day) const { return DATE_DAY_OFFSET_DICT[year - START_YEAR][month - 1][day - 1]; } -template -uint32_t DateV2Value::set_date_uint32(uint32_t int_val) { - union DateV2UInt32Union { - DateV2Value dt; - uint32_t ui32; - ~DateV2UInt32Union() {} - }; - DateV2UInt32Union conv = {.ui32 = int_val}; - if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0)) { - return 0; - } - this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), 0, 0, 0, 0); - - return int_val; -} - -template -uint64_t DateV2Value::set_datetime_uint64(uint64_t int_val) { - union DateTimeV2UInt64Union { - DateV2Value dt; - uint64_t ui64; - ~DateTimeV2UInt64Union() {} - }; - DateTimeV2UInt64Union conv = {.ui64 = int_val}; - if (is_invalid(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), conv.dt.minute(), - conv.dt.second(), conv.dt.microsecond())) { - return 0; - } - this->unchecked_set_time(conv.dt.year(), conv.dt.month(), conv.dt.day(), conv.dt.hour(), - conv.dt.minute(), conv.dt.second(), conv.dt.microsecond()); - - return int_val; -} - template uint8_t DateV2Value::week(uint8_t mode) const { uint16_t year = 0; @@ -3707,26 +3673,6 @@ bool DateV2Value::to_format_string_conservative(const char* format, size_t le return true; } -template -bool DateV2Value::from_date(uint32_t value) { - DCHECK(!is_datetime); - if (value < MIN_DATE_V2 || value > MAX_DATE_V2) { - return false; - } - - return set_date_uint32(value); -} - -template -bool DateV2Value::from_datetime(uint64_t value) { - DCHECK(is_datetime); - if (value < MIN_DATETIME_V2 || value > MAX_DATETIME_V2) { - return false; - } - - return set_datetime_uint64(value); -} - template int64_t DateV2Value::standardize_timevalue(int64_t value) { if (value <= 0) { diff --git a/be/src/vec/runtime/vdatetime_value.h b/be/src/vec/runtime/vdatetime_value.h index cfe9a368e83d4b..78b01d0ed00061 100644 --- a/be/src/vec/runtime/vdatetime_value.h +++ b/be/src/vec/runtime/vdatetime_value.h @@ -1169,12 +1169,7 @@ class DateV2Value { underlying_value to_date_int_val() const { return int_val_; } - bool from_date(uint32_t value); - bool from_datetime(uint64_t value); - bool from_date_int64(int64_t value); - uint32_t set_date_uint32(uint32_t int_val); - uint64_t set_datetime_uint64(uint64_t int_val); bool get_date_from_daynr(uint64_t); diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 134264f3e80364..7c876b00aa4952 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -771,7 +771,7 @@ TEST(BlockTest, dump_data) { auto& date_v2_data = column_vector_date_v2->get_data(); for (int i = 0; i < 1024; ++i) { DateV2Value value; - value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6)); + value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0); date_v2_data.push_back(*reinterpret_cast(&value)); } vectorized::DataTypePtr date_v2_type(std::make_shared()); diff --git a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp index f05919e4a8f477..3f37bf93c52909 100644 --- a/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_mysql_test.cpp @@ -204,7 +204,7 @@ void serialize_and_deserialize_mysql_test() { auto& date_v2_data = column_vector_date_v2->get_data(); for (int i = 0; i < row_num; ++i) { DateV2Value value; - value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6)); + value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0); date_v2_data.push_back(*reinterpret_cast(&value)); } vectorized::DataTypePtr date_v2_type( diff --git a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp index bf6ead9b21bd28..1414d6c78dc443 100644 --- a/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp +++ b/be/test/vec/data_types/serde/data_type_serde_pb_test.cpp @@ -668,12 +668,9 @@ TEST(DataTypeSerDePbTest, DataTypeScalaSerDeTestDateTime) { uint8_t minute = i; uint8_t second = 0; uint32_t microsecond = 123000; - auto value = ((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) | - ((uint64_t)day << 37) | ((uint64_t)hour << 32) | - ((uint64_t)minute << 26) | ((uint64_t)second << 20) | - (uint64_t)microsecond)); + DateV2Value datetime_v2; - datetime_v2.from_datetime(value); + datetime_v2.unchecked_set_time(year, month, day, hour, minute, second, microsecond); auto datetime_val = binary_cast, UInt64>(datetime_v2); data.push_back(datetime_val); } diff --git a/be/test/vec/jsonb/serialize_test.cpp b/be/test/vec/jsonb/serialize_test.cpp index f3bfc4448fa653..239d2a5f165d82 100644 --- a/be/test/vec/jsonb/serialize_test.cpp +++ b/be/test/vec/jsonb/serialize_test.cpp @@ -483,7 +483,7 @@ TEST(BlockSerializeTest, JsonbBlock) { auto& date_v2_data = column_vector_date_v2->get_data(); for (int i = 0; i < 1024; ++i) { DateV2Value value; - value.from_date((uint32_t)((2022 << 9) | (6 << 5) | 6)); + value.unchecked_set_time(2022, 6, 6, 0, 0, 0, 0); date_v2_data.push_back(*reinterpret_cast(&value)); } vectorized::DataTypePtr date_v2_type(std::make_shared()); diff --git a/be/test/vec/runtime/vdatetime_value_test.cpp b/be/test/vec/runtime/vdatetime_value_test.cpp index 6c0bfad6b56f24..fd0b3a1d2e5e74 100644 --- a/be/test/vec/runtime/vdatetime_value_test.cpp +++ b/be/test/vec/runtime/vdatetime_value_test.cpp @@ -75,7 +75,7 @@ TEST(VDateTimeValueTest, date_v2_from_uint32_test) { uint8_t day = 24; DateV2Value date_v2; - date_v2.from_date((uint32_t)((year << 9) | (month << 5) | day)); + date_v2.unchecked_set_time(year, month, day, 0, 0, 0, 0); EXPECT_TRUE(date_v2.year() == year); EXPECT_TRUE(date_v2.month() == month); @@ -114,10 +114,7 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) { uint32_t microsecond = 999999; DateV2Value datetime_v2; - datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) | - ((uint64_t)day << 37) | ((uint64_t)hour << 32) | - ((uint64_t)minute << 26) | ((uint64_t)second << 20) | - (uint64_t)microsecond)); + datetime_v2.unchecked_set_time(year, month, day, hour, minute, second, microsecond); EXPECT_TRUE(datetime_v2.year() == year); EXPECT_TRUE(datetime_v2.month() == month); @@ -142,10 +139,11 @@ TEST(VDateTimeValueTest, datetime_v2_from_uint64_test) { uint32_t microsecond = 123000; DateV2Value datetime_v2; - datetime_v2.from_datetime((uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) | - ((uint64_t)day << 37) | ((uint64_t)hour << 32) | - ((uint64_t)minute << 26) | ((uint64_t)second << 20) | - (uint64_t)microsecond)); + auto ui64 = (uint64_t)(((uint64_t)year << 46) | ((uint64_t)month << 42) | + ((uint64_t)day << 37) | ((uint64_t)hour << 32) | + ((uint64_t)minute << 26) | ((uint64_t)second << 20) | + (uint64_t)microsecond); + datetime_v2 = (DateV2Value&)ui64; EXPECT_TRUE(datetime_v2.year() == year); EXPECT_TRUE(datetime_v2.month() == month);