Skip to content

Commit

Permalink
[fix](move-memtable) set idle timeout equal to load timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Jan 12, 2024
1 parent 88f3950 commit 4f0edf1
Show file tree
Hide file tree
Showing 867 changed files with 5,834 additions and 5,834 deletions.
8 changes: 3 additions & 5 deletions be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

#include "cloud/config.h"

namespace doris {
namespace config {
namespace doris::config {

// TODO
DEFINE_String(cloud_unique_id, "");

} // namespace config
} // namespace doris
} // namespace doris::config
12 changes: 7 additions & 5 deletions be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

#include "common/config.h"

namespace doris {
namespace config {
namespace doris::config {

// TODO
DECLARE_String(cloud_unique_id);

} // namespace config
} // namespace doris
// Reports whether a cloud unique id has been configured: returns true when
// cloud_unique_id is non-empty and false when it is empty.
static inline bool is_cloud_mode() {
    const bool unique_id_missing = cloud_unique_id.empty();
    return !unique_id_missing;
}

} // namespace doris::config
4 changes: 2 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -776,8 +776,6 @@ DEFINE_Int64(open_load_stream_timeout_ms, "60000"); // 60s
// timeout for load stream close wait in ms
DEFINE_Int64(close_load_stream_timeout_ms, "600000"); // 10 min

// idle timeout for load stream in ms
DEFINE_mInt64(load_stream_idle_timeout_ms, "600000");
// brpc streaming max_buf_size in bytes
DEFINE_Int64(load_stream_max_buf_size, "20971520"); // 20MB
// brpc streaming messages_in_batch
Expand Down Expand Up @@ -1161,6 +1159,8 @@ DEFINE_mInt64(enable_debug_log_timeout_secs, "0");
// Tolerance for the number of partition id 0 in rowset, default 0
DEFINE_Int32(ignore_invalid_partition_id_rowset_num, "0");

DEFINE_mInt32(report_query_statistics_interval_ms, "3000");

// clang-format off
#ifdef BE_TEST
// test s3
Expand Down
4 changes: 2 additions & 2 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -829,8 +829,6 @@ DECLARE_Int64(open_load_stream_timeout_ms);
// timeout for load stream close wait in ms
DECLARE_Int64(close_load_stream_timeout_ms);

// idle timeout for load stream in ms
DECLARE_Int64(load_stream_idle_timeout_ms);
// brpc streaming max_buf_size in bytes
DECLARE_Int64(load_stream_max_buf_size);
// brpc streaming messages_in_batch
Expand Down Expand Up @@ -1237,6 +1235,8 @@ DECLARE_mBool(enable_column_type_check);
// Tolerance for the number of partition id 0 in rowset, default 0
DECLARE_Int32(ignore_invalid_partition_id_rowset_num);

DECLARE_mInt32(report_query_statistics_interval_ms);

#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
Expand Down
13 changes: 13 additions & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/block_spill_manager.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/task_group/task_group_manager.h"
#include "util/cpu_info.h"
#include "util/debug_util.h"
Expand Down Expand Up @@ -352,6 +355,13 @@ void Daemon::block_spill_gc_thread() {
}
}

// Background loop that repeatedly calls report_runtime_query_statistics() on
// the manager returned by ExecEnv::runtime_query_statistics_mgr().
//
// Before each report the thread waits on _stop_background_threads_latch for
// config::report_query_statistics_interval_ms milliseconds; the config value is
// re-read on every iteration. The loop ends as soon as that wait returns true
// (presumably because the daemon is shutting down -- confirm against
// CountDownLatch::wait_for).
void Daemon::report_runtime_query_statistics_thread() {
    for (;;) {
        const auto interval =
                std::chrono::milliseconds(config::report_query_statistics_interval_ms);
        if (_stop_background_threads_latch.wait_for(interval)) {
            break;
        }
        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->report_runtime_query_statistics();
    }
}

void Daemon::je_purge_dirty_pages_thread() const {
do {
std::unique_lock<std::mutex> l(doris::MemInfo::je_purge_dirty_pages_lock);
Expand Down Expand Up @@ -399,6 +409,9 @@ void Daemon::start() {
st = Thread::create(
"Daemon", "je_purge_dirty_pages_thread",
[this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back());
st = Thread::create(
"Daemon", "query_runtime_statistics_thread",
[this]() { this->report_runtime_query_statistics_thread(); }, &_threads.emplace_back());
CHECK(st.ok()) << st;
}

Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Daemon {
void calculate_metrics_thread();
void block_spill_gc_thread();
void je_purge_dirty_pages_thread() const;
void report_runtime_query_statistics_thread();

CountDownLatch _stop_background_threads_latch;
std::vector<scoped_refptr<Thread>> _threads;
Expand Down
32 changes: 7 additions & 25 deletions be/src/exec/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.stream_sink) {
return Status::InternalError("Missing data stream sink.");
}
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
sink->reset(new vectorized::VDataStreamSender(state, pool, params.sender_id, row_desc,
thrift_sink.stream_sink, params.destinations,
send_query_statistics_with_every_batch));
thrift_sink.stream_sink,
params.destinations));
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
break;
}
Expand All @@ -82,16 +78,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
params.destinations, send_query_statistics_with_every_batch, output_exprs,
desc_tbl));
params.destinations, output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(row_desc, output_exprs));
}
Expand Down Expand Up @@ -201,14 +192,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.stream_sink) {
return Status::InternalError("Missing data stream sink.");
}
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
// TODO: figure out good buffer size based on size of output row
*sink = std::make_unique<vectorized::VDataStreamSender>(
state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink,
params.destinations, send_query_statistics_with_every_batch);
*sink = std::make_unique<vectorized::VDataStreamSender>(state, pool, local_params.sender_id,
row_desc, thrift_sink.stream_sink,
params.destinations);
// RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
break;
}
Expand All @@ -229,16 +216,11 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
}

// TODO: figure out good buffer size based on size of output row
bool send_query_statistics_with_every_batch =
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
params.destinations, send_query_statistics_with_every_batch, output_exprs,
desc_tbl));
params.destinations, output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(row_desc, output_exprs));
}
Expand Down
8 changes: 0 additions & 8 deletions be/src/exec/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class ObjectPool;
class RuntimeState;
class TPlanFragmentExecParams;
class DescriptorTbl;
class QueryStatistics;
class TDataSink;
class TExpr;
class TPipelineFragmentParams;
Expand Down Expand Up @@ -104,10 +103,6 @@ class DataSink {
// Returns the runtime profile for the sink.
RuntimeProfile* profile() { return _profile; }

virtual void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
_query_statistics = statistics;
}

const RowDescriptor& row_desc() { return _row_desc; }

virtual bool can_write() { return true; }
Expand All @@ -124,9 +119,6 @@ class DataSink {

RuntimeProfile* _profile = nullptr; // Allocated from _pool

// Maybe this will be transferred to BufferControlBlock.
std::shared_ptr<QueryStatistics> _query_statistics;

RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
RuntimeProfile::Counter* _output_rows_counter = nullptr;
Expand Down
21 changes: 4 additions & 17 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
#include "vec/utils/util.hpp"

namespace doris {
class QueryStatistics;

const std::string ExecNode::ROW_THROUGHPUT_COUNTER = "RowsProducedRate";

Expand All @@ -96,6 +95,7 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
if (tnode.__isset.output_tuple_id) {
_output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true}));
}
_query_statistics = std::make_shared<QueryStatistics>();
}

ExecNode::~ExecNode() = default;
Expand Down Expand Up @@ -176,22 +176,6 @@ Status ExecNode::reset(RuntimeState* state) {
return Status::OK();
}

// Forwards `statistics` to collect_query_statistics() of every node in
// _children, in order. This base implementation records nothing itself.
//
// `statistics` must not be null (checked with DCHECK). Each child call is
// wrapped in RETURN_IF_ERROR (expected to return early with the child's status
// on failure -- confirm against the macro); otherwise Status::OK() is returned.
Status ExecNode::collect_query_statistics(QueryStatistics* statistics) {
    DCHECK(statistics != nullptr);
    for (auto it = _children.begin(); it != _children.end(); ++it) {
        RETURN_IF_ERROR((*it)->collect_query_statistics(statistics));
    }
    return Status::OK();
}

// Sender-aware overload: forwards `statistics` together with `sender_id` to
// collect_query_statistics() of every node in _children, in order. This base
// implementation records nothing itself.
//
// `statistics` must not be null (checked with DCHECK). Each child call is
// wrapped in RETURN_IF_ERROR (expected to return early with the child's status
// on failure -- confirm against the macro); otherwise Status::OK() is returned.
Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) {
    DCHECK(statistics != nullptr);
    for (auto it = _children.begin(); it != _children.end(); ++it) {
        RETURN_IF_ERROR((*it)->collect_query_statistics(statistics, sender_id));
    }
    return Status::OK();
}

void ExecNode::release_resource(doris::RuntimeState* state) {
if (!_is_resource_released) {
if (_rows_returned_counter != nullptr) {
Expand Down Expand Up @@ -276,6 +260,9 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool,
// Step 1 Create current ExecNode according to current thrift plan node.
ExecNode* cur_exec_node = nullptr;
RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node));
if (cur_exec_node != nullptr) {
state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics());
}

// Step 1.1
// Record current node if we have parent or record myself as root node.
Expand Down
11 changes: 4 additions & 7 deletions be/src/exec/exec_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,6 @@ class ExecNode {
// so should be fast.
[[nodiscard]] virtual Status reset(RuntimeState* state);

// This should be called before close() and after get_next(), it is responsible for
// collecting statistics sent with row batch, it can't be called when prepare() returns
// error.
[[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics);

[[nodiscard]] virtual Status collect_query_statistics(QueryStatistics* statistics,
int sender_id);
// close() will get called for every exec node, regardless of what else is called and
// the status of these calls (i.e. prepare() may never have been called, or
// prepare()/open()/get_next() returned with an error).
Expand Down Expand Up @@ -243,6 +236,8 @@ class ExecNode {
// such as send the last buffer to remote.
virtual Status try_close(RuntimeState* state) { return Status::OK(); }

std::shared_ptr<QueryStatistics> get_query_statistics() { return _query_statistics; }

protected:
friend class DataSink;

Expand Down Expand Up @@ -330,6 +325,8 @@ class ExecNode {

std::atomic<bool> _can_read = false;

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;

private:
static Status create_tree_helper(RuntimeState* state, ObjectPool* pool,
const std::vector<TPlanNode>& tnodes,
Expand Down
47 changes: 30 additions & 17 deletions be/src/exprs/bloom_filter_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,22 @@ struct CommonFindOp {

void find_batch(const BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
uint8_t* results) const {
const T* __restrict data = nullptr;
const uint8_t* __restrict nullmap = nullptr;
if (column->is_nullable()) {
const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
const auto& nullmap =
assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();
if (nullable->has_null()) {
nullmap =
assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data()
.data();
}
data = (T*)nullable->get_nested_column().get_raw_data().data;
} else {
data = (T*)column->get_raw_data().data;
}

const T* data = (T*)nullable->get_nested_column().get_raw_data().data;
if (nullmap) {
for (size_t i = 0; i < column->size(); i++) {
if (!nullmap[i]) {
results[i] = bloom_filter.test_element(data[i]);
Expand All @@ -327,7 +336,6 @@ struct CommonFindOp {
}
}
} else {
const T* data = (T*)column->get_raw_data().data;
for (size_t i = 0; i < column->size(); i++) {
results[i] = bloom_filter.test_element(data[i]);
}
Expand All @@ -340,8 +348,8 @@ struct CommonFindOp {
};

struct StringFindOp : CommonFindOp<StringRef> {
void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
size_t start) {
static void insert_batch(BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
size_t start) {
if (column->is_nullable()) {
const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
const auto& col =
Expand All @@ -363,21 +371,26 @@ struct StringFindOp : CommonFindOp<StringRef> {
}
}

void find_batch(const BloomFilterAdaptor& bloom_filter, const vectorized::ColumnPtr& column,
uint8_t* results) {
static void find_batch(const BloomFilterAdaptor& bloom_filter,
const vectorized::ColumnPtr& column, uint8_t* results) {
if (column->is_nullable()) {
const auto* nullable = assert_cast<const vectorized::ColumnNullable*>(column.get());
const auto& col =
assert_cast<const vectorized::ColumnString&>(nullable->get_nested_column());
const auto& nullmap =
assert_cast<const vectorized::ColumnUInt8&>(nullable->get_null_map_column())
.get_data();

for (size_t i = 0; i < column->size(); i++) {
if (!nullmap[i]) {
if (nullable->has_null()) {
for (size_t i = 0; i < column->size(); i++) {
if (!nullmap[i]) {
results[i] = bloom_filter.test_element(col.get_data_at(i));
} else {
results[i] = false;
}
}
} else {
for (size_t i = 0; i < column->size(); i++) {
results[i] = bloom_filter.test_element(col.get_data_at(i));
} else {
results[i] = false;
}
}
} else {
Expand All @@ -392,9 +405,9 @@ struct StringFindOp : CommonFindOp<StringRef> {
// We do not need to judge whether data is empty, because null will not appear
// when the filter is used by the storage engine
struct FixedStringFindOp : public StringFindOp {
uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data,
const uint8* nullmap, uint16_t* offsets, int number,
const bool is_parse_column) {
static uint16_t find_batch_olap_engine(const BloomFilterAdaptor& bloom_filter, const char* data,
const uint8* nullmap, uint16_t* offsets, int number,
const bool is_parse_column) {
return find_batch_olap<StringRef, true>(bloom_filter, data, nullmap, offsets, number,
is_parse_column);
}
Expand Down
Loading

0 comments on commit 4f0edf1

Please sign in to comment.