Skip to content

Commit

Permalink
Merge branch 'master' into stmt-lable-like
Browse files Browse the repository at this point in the history
# Conflicts:
#	fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
  • Loading branch information
JNSimba committed Jan 23, 2024
2 parents 479c359 + 5fe0e6c commit aca9627
Show file tree
Hide file tree
Showing 157 changed files with 4,183 additions and 900 deletions.
6 changes: 4 additions & 2 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace doris {

void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::set<uint64_t> current_wg_ids;
bool is_set_cgroup_path = config::doris_cgroup_cpu_path != "";
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
Expand All @@ -52,7 +53,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 4 create and update task scheduler
Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info,
_exec_env);
if (!ret2.ok()) {
if (is_set_cgroup_path && !ret2.ok()) {
LOG(INFO) << "upsert task sche failed, tg_id=" << task_group_info.id
<< ", reason=" << ret2.to_string();
}
Expand All @@ -63,7 +64,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
<< ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false");
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
<< ", is set cgroup path=" << (is_set_cgroup_path ? "true" : "flase");
}

_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,7 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");

// cgroup
DEFINE_mString(doris_cgroup_cpu_path, "");
DEFINE_mBool(enable_cgroup_cpu_soft_limit, "false");
DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true");

DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");

Expand Down
2 changes: 2 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ namespace ErrorCode {
TStatusError(MEM_ALLOC_FAILED, true); \
TStatusError(BUFFER_ALLOCATION_FAILED, true); \
TStatusError(INVALID_ARGUMENT, false); \
TStatusError(INVALID_JSON_PATH, false); \
TStatusError(MINIMUM_RESERVATION_UNAVAILABLE, true); \
TStatusError(CORRUPTION, true); \
TStatusError(IO_ERROR, true); \
Expand Down Expand Up @@ -405,6 +406,7 @@ class [[nodiscard]] Status {
ERROR_CTOR(MemoryAllocFailed, MEM_ALLOC_FAILED)
ERROR_CTOR(BufferAllocFailed, BUFFER_ALLOCATION_FAILED)
ERROR_CTOR(InvalidArgument, INVALID_ARGUMENT)
ERROR_CTOR(InvalidJsonPath, INVALID_JSON_PATH)
ERROR_CTOR(MinimumReservationUnavailable, MINIMUM_RESERVATION_UNAVAILABLE)
ERROR_CTOR(Corruption, CORRUPTION)
ERROR_CTOR(IOError, IO_ERROR)
Expand Down
52 changes: 24 additions & 28 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
#include <thrift/protocol/TDebugProtocol.h>

#include <map>
#include <memory>
#include <sstream>
#include <typeinfo>
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
Expand Down Expand Up @@ -84,16 +84,10 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
_tuple_ids(tnode.row_tuples),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_resource_profile(tnode.resource_profile),
_limit(tnode.limit),
_num_rows_returned(0),
_rows_returned_counter(nullptr),
_rows_returned_rate(nullptr),
_memory_used_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_is_closed(false),
_ref(0) {
_limit(tnode.limit) {
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
_output_row_descriptor = std::make_unique<RowDescriptor>(
descs, std::vector {tnode.output_tuple_id}, std::vector {true});
}
_query_statistics = std::make_shared<QueryStatistics>();
}
Expand All @@ -108,7 +102,7 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, context));
_conjuncts.emplace_back(context);
} else if (tnode.__isset.conjuncts) {
for (auto& conjunct : tnode.conjuncts) {
for (const auto& conjunct : tnode.conjuncts) {
vectorized::VExprContextSPtr context;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, context));
_conjuncts.emplace_back(context);
Expand Down Expand Up @@ -136,8 +130,9 @@ Status ExecNode::prepare(RuntimeState* state) {
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
[this, capture0 = runtime_profile()->total_time_counter()] {
return RuntimeProfile::units_per_second(_rows_returned_counter, capture0);
},
"");
_memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
Expand All @@ -150,13 +145,13 @@ Status ExecNode::prepare(RuntimeState* state) {

RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, intermediate_row_desc()));

for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->prepare(state));
for (auto& i : _children) {
RETURN_IF_ERROR(i->prepare(state));
}
return Status::OK();
}

Status ExecNode::alloc_resource(doris::RuntimeState* state) {
Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
Expand All @@ -170,8 +165,8 @@ Status ExecNode::open(RuntimeState* state) {

Status ExecNode::reset(RuntimeState* state) {
_num_rows_returned = 0;
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->reset(state));
for (auto& i : _children) {
RETURN_IF_ERROR(i->reset(state));
}
return Status::OK();
}
Expand Down Expand Up @@ -199,8 +194,8 @@ Status ExecNode::close(RuntimeState* state) {
_is_closed = true;

Status result;
for (int i = 0; i < _children.size(); ++i) {
auto st = _children[i]->close(state);
for (auto& i : _children) {
auto st = i->close(state);
if (result.ok() && !st.ok()) {
result = st;
}
Expand All @@ -227,7 +222,7 @@ void ExecNode::add_runtime_exec_option(const std::string& str) {

Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const TPlan& plan,
const DescriptorTbl& descs, ExecNode** root) {
if (plan.nodes.size() == 0) {
if (plan.nodes.empty()) {
*root = nullptr;
return Status::OK();
}
Expand Down Expand Up @@ -305,6 +300,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool,
return Status::OK();
}

// NOLINTBEGIN(readability-function-size)
Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, ExecNode** node) {
VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);
Expand Down Expand Up @@ -428,8 +424,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
return Status::OK();

default:
std::map<int, const char*>::const_iterator i =
_TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
auto i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
const char* str = "unknown node type";

if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
Expand All @@ -443,6 +438,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN

return Status::OK();
}
// NOLINTEND(readability-function-size)

std::string ExecNode::debug_string() const {
std::stringstream out;
Expand All @@ -459,9 +455,9 @@ void ExecNode::debug_string(int indentation_level, std::stringstream* out) const
}
*out << "]";

for (int i = 0; i < _children.size(); ++i) {
for (auto* i : _children) {
*out << "\n";
_children[i]->debug_string(indentation_level + 1, out);
i->debug_string(indentation_level + 1, out);
}
}

Expand All @@ -470,8 +466,8 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, std::vector<ExecNode
nodes->push_back(this);
}

for (int i = 0; i < _children.size(); ++i) {
_children[i]->collect_nodes(node_type, nodes);
for (auto& i : _children) {
i->collect_nodes(node_type, nodes);
}
}

Expand All @@ -488,7 +484,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) {
void ExecNode::init_runtime_profile(const std::string& name) {
std::stringstream ss;
ss << name << " (id=" << _id << ")";
_runtime_profile.reset(new RuntimeProfile(ss.str()));
_runtime_profile = std::make_unique<RuntimeProfile>(ss.str());
_runtime_profile->set_metadata(_id);
}

Expand Down
19 changes: 5 additions & 14 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
#pragma once

#include <gen_cpp/PlanNodes_types.h>
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
Expand Down Expand Up @@ -267,7 +267,7 @@ class ExecNode {
const TBackendResourceProfile _resource_profile;

int64_t _limit; // -1: no limit
int64_t _num_rows_returned;
int64_t _num_rows_returned = 0;

std::unique_ptr<RuntimeProfile> _runtime_profile;

Expand Down Expand Up @@ -303,15 +303,6 @@ class ExecNode {

bool is_closed() const { return _is_closed; }

// TODO(zc)
/// Pointer to the containing SubplanNode or nullptr if not inside a subplan.
/// Set by SubplanNode::Init(). Not owned.
// SubplanNode* containing_subplan_;

/// Returns true if this node is inside the right-hand side plan tree of a SubplanNode.
/// Valid to call in or after Prepare().
bool is_in_subplan() const { return false; }

// Create a single exec node derived from thrift node; place exec node in 'pool'.
static Status create_node(RuntimeState* state, ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs, ExecNode** node);
Expand All @@ -334,9 +325,9 @@ class ExecNode {
ExecNode** root);

friend class pipeline::OperatorBase;
bool _is_closed;
bool _is_closed = false;
bool _is_resource_released = false;
std::atomic_int _ref; // used by pipeline operator to release resource.
std::atomic_int _ref = 0; // used by pipeline operator to release resource.
};

} // namespace doris
10 changes: 5 additions & 5 deletions be/src/olap/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,9 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) {
return 0;
}
} else {
//table_id is -1 meaning get all table wal size
for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) {
count += it->second.size();
// table_id is -1 meaning get all table wal size
for (auto& [_, table_wals] : _wal_queues) {
count += table_wals.size();
}
}
return count;
Expand Down Expand Up @@ -372,8 +372,8 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {

void WalManager::_stop_relay_wal() {
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
it->second->stop();
for (auto& [_, wal_table] : _table_map) {
wal_table->stop();
}
}

Expand Down
29 changes: 14 additions & 15 deletions be/src/olap/wal/wal_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,28 +58,27 @@ void WalTable::_pick_relay_wals() {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
std::vector<std::string> need_replay_wals;
std::vector<std::string> need_erase_wals;
for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) {
auto wal_info = it->second;
for (const auto& [wal_path, wal_info] : _replay_wal_map) {
if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
<< ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num();
auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id,
<< ", wal=" << wal_path << ", retry_num=" << wal_info->get_retry_num();
auto st = _exec_env->wal_mgr()->rename_to_tmp_path(wal_path, _table_id,
wal_info->get_wal_id());
if (!st.ok()) {
LOG(WARNING) << "rename " << it->first << " fail"
LOG(WARNING) << "rename " << wal_path << " fail"
<< ",st:" << st.to_string();
}
if (config::group_commit_wait_replay_wal_finish) {
auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(it->second->get_wal_id());
auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
if (!notify_st.ok()) {
LOG(WARNING) << "notify wal " << it->second->get_wal_id() << " fail";
LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
}
}
need_erase_wals.push_back(it->first);
need_erase_wals.push_back(wal_path);
continue;
}
if (_need_replay(wal_info)) {
need_replay_wals.push_back(it->first);
need_replay_wals.push_back(wal_path);
}
}
for (const auto& wal : need_erase_wals) {
Expand Down Expand Up @@ -168,13 +167,13 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
#endif
}

Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
TLoadTxnRollbackRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
// TODO should we use label, because the replay wal use the same label and different wal_id
request.__set_txnId(wal_id);
std::string reason = "relay wal " + std::to_string(wal_id);
request.__set_label(label);
std::string reason = "relay wal with label " + label;
request.__set_reason(reason);
TLoadTxnRollbackResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
Expand All @@ -185,7 +184,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
},
10000L);
auto result_status = Status::create(result.status);
LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:" << result_status;
LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status;
return result_status;
}

Expand All @@ -196,9 +195,9 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
#ifndef BE_TEST
if (!config::group_commit_wait_replay_wal_finish) {
auto st = _try_abort_txn(_db_id, wal_id);
auto st = _try_abort_txn(_db_id, label);
if (!st.ok()) {
LOG(WARNING) << "abort txn " << wal_id << " fail";
LOG(WARNING) << "failed to abort txn with label " << label;
}
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/wal/wal_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class WalTable {

Status _replay_wal_internal(const std::string& wal);
Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label);
Status _try_abort_txn(int64_t db_id, int64_t wal_id);
Status _try_abort_txn(int64_t db_id, std::string& label);
Status _get_column_info(int64_t db_id, int64_t tb_id,
std::map<int64_t, std::string>& column_info_map);

Expand Down
8 changes: 6 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,22 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSinkDepend
: PipelineXSinkLocalState<AnalyticSinkDependency>(parent, state) {}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status close(RuntimeState* state, Status exec_status) override {
_shared_state->release_sink_dep();
return PipelineXSinkLocalState<AnalyticSinkDependency>::close(state, exec_status);
}

private:
friend class AnalyticSinkOperatorX;

bool _refresh_need_more_input() {
auto need_more_input = _whether_need_next_partition(_shared_state->found_partition_end);
if (need_more_input) {
_shared_state->source_dep->block();
_dependency->set_block_to_read();
_dependency->set_ready();
} else {
_dependency->block();
_shared_state->source_dep->set_ready();
_dependency->set_ready_to_read();
}
return need_more_input;
}
Expand Down
Loading

0 comments on commit aca9627

Please sign in to comment.