From 956bdebe12841b1ca63a15da9b49f8cae311ee13 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Mon, 25 Dec 2023 19:28:15 +0800 Subject: [PATCH] 5 --- be/src/olap/wal_dirs_info.cpp | 179 ++++++++++++++++++++ be/src/olap/wal_dirs_info.h | 78 +++++++++ be/src/olap/wal_manager.cpp | 118 ++----------- be/src/olap/wal_manager.h | 27 +-- be/src/runtime/group_commit_mgr.cpp | 4 +- be/src/runtime/group_commit_mgr.h | 4 +- be/src/vec/sink/group_commit_block_sink.cpp | 2 +- 7 files changed, 284 insertions(+), 128 deletions(-) create mode 100644 be/src/olap/wal_dirs_info.cpp create mode 100644 be/src/olap/wal_dirs_info.h diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp new file mode 100644 index 000000000000000..baaa98aaa2a2d29 --- /dev/null +++ b/be/src/olap/wal_dirs_info.cpp @@ -0,0 +1,179 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_dirs_info.h" + +#include +#include +#include + +#include "common/status.h" +#include "io/fs/local_file_system.h" +#include "util/parse_util.h" + +namespace doris { + +std::string WalDirInfo::get_wal_dir() { + return _wal_dir; +} + +size_t WalDirInfo::get_limit() { + std::shared_lock rlock(_lock); + return _limit; +} + +size_t WalDirInfo::get_used() { + std::shared_lock rlock(_lock); + return _used; +} + +size_t WalDirInfo::get_pre_allocated() { + std::shared_lock rlock(_lock); + return _pre_allocated; +} + +Status WalDirInfo::set_limit(size_t limit) { + std::unique_lock wlock(_lock); + _limit = limit; + return Status::OK(); +} + +Status WalDirInfo::set_used(size_t used) { + std::unique_lock wlock(_lock); + _used = used; + return Status::OK(); +} + +Status WalDirInfo::set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated) { + std::unique_lock wlock(_lock); + if (is_add_pre_allocated) { + _pre_allocated += pre_allocated; + } else { + _pre_allocated -= pre_allocated; + } + return Status::OK(); +} + +size_t WalDirInfo::available() { + std::unique_lock wlock(_lock); + int64_t available = _limit - _used - _pre_allocated; + return available > 0 ? available : 0; +} + +Status WalDirInfo::update_wal_disk_info(size_t limit, size_t used, size_t pre_allocated, + bool is_add_pre_allocated) { + if (limit != static_cast(-1)) { + RETURN_IF_ERROR(set_limit(limit)); + } else { + size_t available_bytes; + size_t disk_capacity_bytes; + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( + _wal_dir, &disk_capacity_bytes, &available_bytes)); + bool is_percent = true; + int64_t wal_disk_limit = ParseUtil::parse_mem_spec(config::group_commit_wal_max_disk_limit, + -1, available_bytes, &is_percent); + if (wal_disk_limit <= 0) { + return Status::InternalError("Disk full! Please check your disk usage!"); + } + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_limit(wal_disk_limit)); + } + if (used != static_cast(-1)) { + RETURN_IF_ERROR(set_used(used)); + } else { + size_t wal_dir_size = 0; + RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(_wal_dir, &wal_dir_size)); + RETURN_IF_ERROR(set_used(wal_dir_size)); + } + if (pre_allocated != static_cast(-1)) { + RETURN_IF_ERROR(set_pre_allocated(pre_allocated, is_add_pre_allocated)); + } + return Status::OK(); +} + +Status WalDirsInfo::add(const std::string& wal_dir, size_t limit, size_t used, + size_t pre_allocated) { + for (const auto& it : _wal_dirs_info_vec) { + if (it->get_wal_dir() == wal_dir) { + return Status::InternalError("wal dir {} exists!", wal_dir); + } + } + std::unique_lock wlock(_lock); + _wal_dirs_info_vec.emplace_back( + std::make_shared(wal_dir, limit, used, pre_allocated)); + return Status::OK(); +} + +std::string WalDirsInfo::get_available_random_wal_dir() { + if (_wal_dirs_info_vec.size() == 1) { + return (*_wal_dirs_info_vec.begin())->get_wal_dir(); + } else { + std::vector available_wal_dirs; + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->available() > wal_dir_info->get_limit() * 0.2) { + available_wal_dirs.emplace_back(wal_dir_info->get_wal_dir()); + } + } + if (available_wal_dirs.empty()) { + return (*std::min_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->get_wal_dir(); + } else { + return (*std::next(_wal_dirs_info_vec.begin(), rand() % _wal_dirs_info_vec.size())) + ->get_wal_dir(); + } + } +} + +size_t WalDirsInfo::get_max_available_size() { + return _wal_dirs_info_vec.size() == 1 + ? (*_wal_dirs_info_vec.begin())->available() + : (*std::max_element(_wal_dirs_info_vec.begin(), _wal_dirs_info_vec.end(), + [](const auto& info1, const auto& info2) { + return info1->available() < info2->available(); + })) + ->available(); +} + +Status WalDirsInfo::update_wal_disk_info(std::string wal_dir, size_t limit, size_t used, + size_t pre_allocated, bool is_add_pre_allocated) { + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + return wal_dir_info->update_wal_disk_info(limit, used, pre_allocated, + is_add_pre_allocated); + } + } + return Status::InternalError("Can not find wal dir in wal disks info."); +} + +Status WalDirsInfo::get_wal_disk_available_size(const std::string& wal_dir, + size_t* available_bytes) { + RETURN_IF_ERROR(update_wal_disk_info(wal_dir)); + std::shared_lock l(_lock); + for (const auto& wal_dir_info : _wal_dirs_info_vec) { + if (wal_dir_info->get_wal_dir() == wal_dir) { + *available_bytes = wal_dir_info->available(); + return Status::OK(); + } + } + return Status::InternalError("can not find wal dir!"); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_dirs_info.h b/be/src/olap/wal_dirs_info.h new file mode 100644 index 000000000000000..a3368fb8cddeba7 --- /dev/null +++ b/be/src/olap/wal_dirs_info.h @@ -0,0 +1,78 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/factory_creator.h" + +namespace doris { +class WalDirInfo { + ENABLE_FACTORY_CREATOR(WalDirInfo); + +public: + WalDirInfo(std::string wal_dir, size_t limit, size_t used, size_t pre_allocated) + : _wal_dir(std::move(wal_dir)), + _limit(limit), + _used(used), + _pre_allocated(pre_allocated) {} + std::string get_wal_dir(); + size_t get_limit(); + size_t get_used(); + size_t get_pre_allocated(); + Status set_limit(size_t limit); + Status set_used(size_t used); + Status set_pre_allocated(size_t pre_allocated, bool is_add_pre_allocated); + size_t available(); + Status update_wal_disk_info(size_t limit = -1, size_t used = -1, size_t pre_allocated = -1, + bool is_add_pre_allocated = true); + +private: + std::string _wal_dir; + size_t _limit; + size_t _used; + size_t _pre_allocated; + std::shared_mutex _lock; +}; + +class WalDirsInfo { + ENABLE_FACTORY_CREATOR(WalDirsInfo); + +public: + WalDirsInfo() = default; + ~WalDirsInfo() = default; + Status add(const std::string& wal_dir, size_t limit, size_t used, size_t pre_allocated); + std::string get_available_random_wal_dir(); + size_t get_max_available_size(); + Status update_wal_disk_info(std::string wal_dir, size_t limit = -1, size_t used = -1, + size_t pre_allocated = -1, bool is_add_pre_allocated = true); + Status get_wal_disk_available_size(const std::string& wal_dir, size_t* available_bytes); + +private: + std::vector> _wal_dirs_info_vec; + std::shared_mutex _lock; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index 74cd6b1f722c888..f7f00518a8dd48c 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -29,10 +29,12 @@ #include #include #include +#include #include "common/config.h" #include "common/status.h" #include "io/fs/local_file_system.h" +#include "olap/wal_dirs_info.h" #include "olap/wal_writer.h" #include "runtime/client_cache.h" #include "runtime/exec_env.h" @@ -52,6 +54,7 @@ WalManager::WalManager(ExecEnv* exec_env, const std::string& wal_dir_list) .set_min_threads(1) .set_max_threads(config::group_commit_relay_wal_threads) .build(&_thread_pool)); + _wal_dirs_info = WalDirsInfo::create_unique(); } WalManager::~WalManager() { @@ -74,7 +77,7 @@ void WalManager::stop() { Status WalManager::init() { RETURN_IF_ERROR(_init_wal_dirs_conf()); RETURN_IF_ERROR(_init_wal_dirs()); - RETURN_IF_ERROR(_init_wal_disk_info()); + RETURN_IF_ERROR(_init_wal_dirs_info()); return Thread::create( "WalMgr", "replay_wal", [this]() { static_cast(this->replay()); }, &_replay_thread); @@ -120,7 +123,7 @@ Status WalManager::_init_wal_dirs() { return Status::OK(); } -Status WalManager::_init_wal_disk_info() { +Status WalManager::_init_wal_dirs_info() { for (const std::string& wal_dir : _wal_dirs) { size_t available_bytes; #ifndef BE_TEST @@ -143,11 +146,7 @@ Status WalManager::_init_wal_disk_info() { if (is_percent) { wal_disk_limit += wal_dir_size; } - { - std::unique_lock l(_wal_disk_info_lock); - _wal_disk_info_map.insert(std::make_pair( - wal_dir, std::make_shared(wal_disk_limit, wal_dir_size, 0))); - } + RETURN_IF_ERROR(_wal_dirs_info->add(wal_dir, wal_disk_limit, wal_dir_size, 0)); #ifdef BE_TEST wal_limit_test_bytes = wal_disk_limit; @@ -230,8 +229,7 @@ void WalManager::print_wal_status_queue() { Status WalManager::add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const std::string& label, std::string& base_path) { - // TODO: (Yukang) change it. - base_path = get_min_disk_usage_wal_dir(); + base_path = _wal_dirs_info->get_available_random_wal_dir(); std::stringstream ss; ss << base_path << "/" << std::to_string(db_id) << "/" << std::to_string(table_id) << "/" << std::to_string(wal_id) << "_" << label; @@ -404,7 +402,7 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, std::vector< table_ptr->add_wals(wals); #ifndef BE_TEST for (auto wal : wals) { - RETURN_IF_ERROR(update_wal_disk_info_map(_get_base_wal_path(wal))); + RETURN_IF_ERROR(update_wal_disk_info(_get_base_wal_path(wal))); } #endif return Status::OK(); @@ -432,8 +430,8 @@ Status WalManager::delete_wal(int64_t wal_id, size_t block_queue_pre_allocated) _wal_path_map.erase(wal_id); } } - RETURN_IF_ERROR(update_wal_disk_info_map(_get_base_wal_path(wal_path), -1, -1, - block_queue_pre_allocated, false)); + RETURN_IF_ERROR(update_wal_disk_info(_get_base_wal_path(wal_path), -1, -1, + block_queue_pre_allocated, false)); return Status::OK(); } @@ -474,103 +472,19 @@ Status WalManager::get_wal_column_index(int64_t wal_id, std::vector& col return Status::OK(); } -bool WalManager::is_wal_disk_space_enough() { - // if all disks space usage < 80% - std::shared_lock l(_wal_disk_info_lock); - for (const auto& it : _wal_disk_info_map) { - size_t limit = it.second->limit; - size_t available = it.second->available(); - if (available >= limit * 0.8) { - return true; - } - } - return false; -} - -const string& WalManager::get_min_disk_usage_wal_dir() { - std::shared_lock l(_wal_disk_info_lock); - return _wal_dirs.size() == 1 - ? _wal_dirs[0] - : *std::min_element(_wal_dirs.begin(), _wal_dirs.end(), - [this](const std::string& dir1, const std::string& dir2) { - return _wal_disk_info_map[dir1]->available() < - _wal_disk_info_map[dir2]->available(); - }); -} - -const string& WalManager::get_random_wal_dir() { - std::shared_lock l(_wal_disk_info_lock); - return _wal_disk_info_map.size() == 1 - ? _wal_disk_info_map.begin()->first - : std::next(_wal_disk_info_map.begin(), rand() % _wal_disk_info_map.size()) - ->first; -} - size_t WalManager::get_max_available_size() { - std::shared_lock l(_wal_disk_info_lock); - return _wal_disk_info_map.size() == 1 - ? _wal_disk_info_map.begin()->second->available() - : std::max_element(_wal_disk_info_map.begin(), _wal_disk_info_map.end(), - [](const auto& map1, const auto& map2) { - return map1.second->available() < - map2.second->available(); - }) - ->second->available(); + return _wal_dirs_info->get_max_available_size(); } -Status WalManager::update_wal_disk_info_map(std::string wal_dir, size_t limit, size_t used, - size_t pre_allocated, bool is_add_pre_allocated) { - if (_wal_disk_info_map.find(wal_dir) != _wal_disk_info_map.end()) { - std::unique_lock l(_wal_disk_info_lock); - if (limit != static_cast(-1)) { - _wal_disk_info_map[wal_dir]->limit = limit; - } else { - size_t available_bytes; - size_t disk_capacity_bytes; - RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info( - wal_dir, &disk_capacity_bytes, &available_bytes)); - bool is_percent = true; - int64_t wal_disk_limit = ParseUtil::parse_mem_spec( - config::group_commit_wal_max_disk_limit, -1, available_bytes, &is_percent); - if (wal_disk_limit <= 0) { - return Status::InternalError("Disk full! Please check your disk usage!"); - } - size_t wal_dir_size = 0; - RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, &wal_dir_size)); - _wal_disk_info_map[wal_dir]->limit = wal_disk_limit; - } - if (used != static_cast(-1)) { - _wal_disk_info_map[wal_dir]->used = used; - } else { - size_t wal_dir_size = 0; - RETURN_IF_ERROR(io::global_local_filesystem()->directory_size(wal_dir, &wal_dir_size)); - _wal_disk_info_map[wal_dir]->used = wal_dir_size; - } - if (pre_allocated != static_cast(-1)) { - if (is_add_pre_allocated) { - _wal_disk_info_map[wal_dir]->pre_allocated += pre_allocated; - } else { - _wal_disk_info_map[wal_dir]->pre_allocated -= pre_allocated; - } - } - } else { - return Status::InternalError("Can not find wal dir in wal disk info map."); - } - return Status::OK(); +Status WalManager::update_wal_disk_info(std::string wal_dir, size_t limit, size_t used, + size_t pre_allocated, bool is_add_pre_allocated) { + return _wal_dirs_info->update_wal_disk_info(wal_dir, limit, used, pre_allocated, + is_add_pre_allocated); } Status WalManager::get_wal_disk_available_size(const std::string& wal_dir, size_t* available_bytes) { - RETURN_IF_ERROR(update_wal_disk_info_map(wal_dir)); - std::shared_lock l(_wal_disk_info_lock); - { - if (auto it = _wal_disk_info_map.find(wal_dir); it != _wal_disk_info_map.end()) { - *available_bytes = _wal_disk_info_map[wal_dir]->available(); - return Status::OK(); - } else { - return Status::InternalError("can not find wal dir!"); - } - } + return _wal_dirs_info->get_wal_disk_available_size(wal_dir, available_bytes); } std::string WalManager::_get_base_wal_path(const std::string& wal_path_str) { diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h index 74adb8f16f0e9de..7bfb4a6453f3af2 100644 --- a/be/src/olap/wal_manager.h +++ b/be/src/olap/wal_manager.h @@ -31,6 +31,7 @@ #include "gen_cpp/FrontendService.h" #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/HeartbeatService_types.h" +#include "olap/wal_dirs_info.h" #include "olap/wal_reader.h" #include "olap/wal_table.h" #include "olap/wal_writer.h" @@ -44,18 +45,6 @@ class WalManager { ENABLE_FACTORY_CREATOR(WalManager); public: - struct WalDiskInfo { - WalDiskInfo(size_t limit, size_t used, size_t pre_allocated) - : limit(limit), used(used), pre_allocated(pre_allocated) {} - size_t available() const { - int64_t available = limit - used - pre_allocated; - return available > 0 ? available : 0; - } - size_t limit; - size_t used; - size_t pre_allocated; - }; - enum WAL_STATUS { PREPARE = 0, REPLAY, @@ -89,19 +78,17 @@ class WalManager { void erase_wal_column_index(int64_t wal_id); Status get_wal_column_index(int64_t wal_id, std::vector& column_index); - Status update_wal_disk_info_map(std::string wal_dir, size_t limit = -1, size_t used = -1, - size_t pre_allocated = -1, bool is_add_pre_allocated = true); + Status update_wal_disk_info(std::string wal_dir, size_t limit = -1, size_t used = -1, + size_t pre_allocated = -1, bool is_add_pre_allocated = true); Status get_wal_disk_available_size(const std::string& wal_dir, size_t* available_bytes); - bool is_wal_disk_space_enough(); - const std::string& get_min_disk_usage_wal_dir(); size_t get_max_available_size(); - const std::string& get_random_wal_dir(); private: Status _init_wal_dirs_conf(); Status _init_wal_dirs(); - Status _init_wal_disk_info(); + Status _init_wal_dirs_info(); std::string _get_base_wal_path(const std::string& wal_path_str); + const std::string& _get_available_random_wal_dir(); public: // used for be ut @@ -123,8 +110,6 @@ class WalManager { std::shared_mutex _wal_column_id_map_lock; std::unordered_map&> _wal_column_id_map; std::unique_ptr _thread_pool; - std::shared_mutex _wal_disk_info_lock; - // wal dir to wal disk info map - std::unordered_map> _wal_disk_info_map; + std::unique_ptr _wal_dirs_info; }; } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index 5c31abe5007238a..c710626f2caea8c 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -485,7 +485,7 @@ Status LoadBlockQueue::close_wal() { return Status::OK(); } -bool LoadBlockQueue::is_wal_disk_space_enough( +bool LoadBlockQueue::has_enough_wal_disk_space( const std::vector>& blocks, const TUniqueId& load_id, bool is_blocks_contain_all_load_data) { size_t blocks_size = 0; @@ -511,7 +511,7 @@ bool LoadBlockQueue::is_wal_disk_space_enough( } } if (pre_allocated < available_bytes) { - st = wal_mgr->update_wal_disk_info_map(wal_base_path, -1, -1, pre_allocated); + st = wal_mgr->update_wal_disk_info(wal_base_path, -1, -1, pre_allocated); if (!st.ok()) { LOG(WARNING) << "update wal disk info map failed, reason: " << st.to_string(); } diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h index a267d7d7c6c25ab..35afcc46249b7aa 100644 --- a/be/src/runtime/group_commit_mgr.h +++ b/be/src/runtime/group_commit_mgr.h @@ -65,8 +65,8 @@ class LoadBlockQueue { WalManager* wal_manager, std::vector& slot_desc, int be_exe_version); Status close_wal(); - bool is_wal_disk_space_enough(const std::vector>& blocks, - const TUniqueId& load_id, bool is_blocks_contain_all_load_data); + bool has_enough_wal_disk_space(const std::vector>& blocks, + const TUniqueId& load_id, bool is_blocks_contain_all_load_data); static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000; UniqueId load_instance_id; diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index bc8f8db0e21ac15..3e38f9c42fc456a 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -242,7 +242,7 @@ Status GroupCommitBlockSink::_add_blocks(bool write_wal, bool is_blocks_contain_ _db_id, _table_id, _base_schema_version, load_id, _load_block_queue, _state->be_exec_version())); if (write_wal) { - _group_commit_mode = _load_block_queue->is_wal_disk_space_enough( + _group_commit_mode = _load_block_queue->has_enough_wal_disk_space( _blocks, load_id, is_blocks_contain_all_load_data) ? TGroupCommitMode::ASYNC_MODE : TGroupCommitMode::SYNC_MODE;