Skip to content

Commit

Permalink
[Enhancement](wal) Add timout for wal memory back pressure (apache#29178
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Yukang-Lian authored and HappenLee committed Jan 12, 2024
1 parent 4519cac commit 69802da
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 12 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
DEFINE_Bool(wait_internal_group_commit_finish, "false");
// Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");
DEFINE_Int32(group_commit_queue_mem_limit, "67108864");
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ DECLARE_mInt32(group_commit_insert_threads);
DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
DECLARE_Bool(wait_internal_group_commit_finish);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);
DECLARE_Int32(group_commit_queue_mem_limit);
// Max size(bytes) or percentage(%) of wal disk usage, used for disk space back pressure, default 10% of the disk available space.
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% can be automatically identified.
DECLARE_mString(group_commit_wal_max_disk_limit);
Expand Down
22 changes: 19 additions & 3 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <glog/logging.h>

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
Expand All @@ -40,14 +41,29 @@

namespace doris {

Status LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool write_wal) {
Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block, bool write_wal) {
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
while (_all_block_queues_bytes->load(std::memory_order_relaxed) >
config::group_commit_max_queue_size) {
auto start = std::chrono::steady_clock::now();
while (!runtime_state->is_cancelled() && status.ok() &&
_all_block_queues_bytes->load(std::memory_order_relaxed) >
config::group_commit_queue_mem_limit) {
_put_cond.wait_for(
l, std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT) {
return Status::TimedOut(
"Wal memory back pressure wait too much time! Load block queue txn id: {}, "
"label: {}, instance id: {}",
txn_id, label, load_instance_id.to_string());
}
}
if (runtime_state->is_cancelled()) {
return Status::Cancelled(runtime_state->cancel_reason());
}
RETURN_IF_ERROR(status);
if (block->rows() > 0) {
_block_queue.push_back(block);
if (write_wal) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ class LoadBlockQueue {
_all_block_queues_bytes(all_block_queues_bytes),
_group_commit_interval_ms(group_commit_interval_ms) {};

Status add_block(std::shared_ptr<vectorized::Block> block, bool write_wal);
Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
bool write_wal);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos);
Status add_load_id(const UniqueId& load_id);
Expand All @@ -68,7 +69,10 @@ class LoadBlockQueue {
bool has_enough_wal_disk_space(const std::vector<std::shared_ptr<vectorized::Block>>& blocks,
const TUniqueId& load_id, bool is_blocks_contain_all_load_data);

// 1s
static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
// 120s
static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
UniqueId load_instance_id;
std::string label;
int64_t txn_id;
Expand Down
11 changes: 6 additions & 5 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
(double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
return Status::DataQualityError("too many filtered rows");
}
RETURN_IF_ERROR(_add_blocks(true));
RETURN_IF_ERROR(_add_blocks(state, true));
}
if (_load_block_queue) {
_load_block_queue->remove_load_id(_load_id);
Expand Down Expand Up @@ -220,15 +220,16 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
_blocks.emplace_back(output_block);
} else {
if (!_is_block_appended) {
RETURN_IF_ERROR(_add_blocks(false));
RETURN_IF_ERROR(_add_blocks(state, false));
}
RETURN_IF_ERROR(_load_block_queue->add_block(
output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
state, output_block, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
}
return Status::OK();
}

Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
Status GroupCommitBlockSink::_add_blocks(RuntimeState* state,
bool is_blocks_contain_all_load_data) {
DCHECK(_is_block_appended == false);
TUniqueId load_id;
load_id.__set_hi(_load_id.hi);
Expand Down Expand Up @@ -257,7 +258,7 @@ Status GroupCommitBlockSink::_add_blocks(bool is_blocks_contain_all_load_data) {
}
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
RETURN_IF_ERROR(_load_block_queue->add_block(
*it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
state, *it, _group_commit_mode == TGroupCommitMode::ASYNC_MODE));
}
_is_block_appended = true;
_blocks.clear();
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/group_commit_block_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class GroupCommitBlockSink : public DataSink {

private:
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
Status _add_blocks(bool is_blocks_contain_all_load_data);
Status _add_blocks(RuntimeState* state, bool is_blocks_contain_all_load_data);

vectorized::VExprContextSPtrs _output_vexpr_ctxs;

Expand Down

0 comments on commit 69802da

Please sign in to comment.