From 0c81a99453bfe9de5ac717bd72dd978df7620bf4 Mon Sep 17 00:00:00 2001 From: zhengyu Date: Mon, 3 Feb 2025 00:40:28 +0800 Subject: [PATCH] [fix](cloud) shorten cache lock held time and add metrics (#47472) When updating bvar metrics, we acquired the block lock inside the critical section of the cache lock, which made the cache lock be held for too long and affected other cache logic. We now use an unsafe method to update the bvar to boost performance. Some key lock metrics and other meaningful metrics are also added for better monitoring of cache time costs. --- be/src/common/config.cpp | 3 +- be/src/common/config.h | 3 +- be/src/io/cache/block_file_cache.cpp | 133 ++++++++++++---------- be/src/io/cache/block_file_cache.h | 59 +++++----- be/src/io/cache/file_block.cpp | 10 +- be/src/io/cache/file_block.h | 1 + be/src/io/cache/fs_file_cache_storage.cpp | 2 +- build.sh | 2 +- 8 files changed, 123 insertions(+), 90 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 1e7f0d12954122..14b055f185acbe 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1076,7 +1076,8 @@ DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true"); DEFINE_mBool(enbale_dump_error_file, "false"); // limit the max size of error log on disk DEFINE_mInt64(file_cache_error_log_limit_bytes, "209715200"); // 200MB -DEFINE_mInt64(cache_lock_long_tail_threshold, "1000"); +DEFINE_mInt64(cache_lock_wait_long_tail_threshold_us, "30000000"); +DEFINE_mInt64(cache_lock_held_long_tail_threshold_us, "30000000"); DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false"); DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 852b2bb4aabf90..0517227f1e5577 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1112,7 +1112,8 @@ DECLARE_Bool(enable_ttl_cache_evict_using_lru); DECLARE_mBool(enbale_dump_error_file); // limit the max size of error log on disk DECLARE_mInt64(file_cache_error_log_limit_bytes); 
-DECLARE_mInt64(cache_lock_long_tail_threshold); +DECLARE_mInt64(cache_lock_wait_long_tail_threshold_us); +DECLARE_mInt64(cache_lock_held_long_tail_threshold_us); // Base compaction may retrieve and produce some less frequently accessed data, // potentially affecting the file cache hit rate. // This configuration determines whether to retain the output within the file cache. diff --git a/be/src/io/cache/block_file_cache.cpp b/be/src/io/cache/block_file_cache.cpp index 59bb1becb5d5da..693cd49bb80aed 100644 --- a/be/src/io/cache/block_file_cache.cpp +++ b/be/src/io/cache/block_file_cache.cpp @@ -202,10 +202,16 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path, _disk_limit_mode_metrics = std::make_shared>( _cache_base_path.c_str(), "file_cache_disk_limit_mode", 0); - _storage_sync_remove_latency = std::make_shared( - _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_ns"); - _storage_async_remove_latency = std::make_shared( - _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_ns"); + _cache_lock_wait_time_us = std::make_shared( + _cache_base_path.c_str(), "file_cache_cache_lock_wait_time_us"); + _get_or_set_latency_us = std::make_shared( + _cache_base_path.c_str(), "file_cache_get_or_set_latency_us"); + _storage_sync_remove_latency_us = std::make_shared( + _cache_base_path.c_str(), "file_cache_storage_sync_remove_latency_us"); + _storage_retry_sync_remove_latency_us = std::make_shared( + _cache_base_path.c_str(), "file_cache_storage_retry_sync_remove_latency_us"); + _storage_async_remove_latency_us = std::make_shared( + _cache_base_path.c_str(), "file_cache_storage_async_remove_latency_us"); _disposable_queue = LRUQueue(cache_settings.disposable_queue_size, cache_settings.disposable_queue_elements, 60 * 60); @@ -259,7 +265,7 @@ FileCacheType BlockFileCache::string_to_cache_type(const std::string& str) { BlockFileCache::QueryFileCacheContextHolderPtr BlockFileCache::get_query_context_holder( const TUniqueId& query_id) { 
- SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); if (!config::enable_file_cache_query_limit) { return {}; } @@ -277,7 +283,7 @@ BlockFileCache::QueryFileCacheContextPtr BlockFileCache::get_query_context( } void BlockFileCache::remove_query_context(const TUniqueId& query_id) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); const auto& query_iter = _query_map.find(query_id); if (query_iter != _query_map.end() && query_iter->second.use_count() <= 1) { @@ -322,7 +328,7 @@ void BlockFileCache::QueryFileCacheContext::reserve(const UInt128Wrapper& hash, } Status BlockFileCache::initialize() { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); return initialize_unlocked(cache_lock); } @@ -541,7 +547,7 @@ std::string BlockFileCache::clear_file_cache_async() { int64_t num_files_all = 0; TEST_SYNC_POINT_CALLBACK("BlockFileCache::clear_file_cache_async"); { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); std::vector deleting_cells; for (auto& [_, offset_to_cell] : _files) { @@ -699,35 +705,39 @@ FileBlocksHolder BlockFileCache::get_or_set(const UInt128Wrapper& hash, size_t o sw.start(); std::lock_guard cache_lock(_mutex); stats->lock_wait_timer += sw.elapsed_time(); - - if (auto iter = _key_to_time.find(hash); - context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) { - context.cache_type = FileCacheType::TTL; - context.expiration_time = iter->second; - } - - /// Get all blocks which intersect with the given range. 
FileBlocks file_blocks; + int64_t duration; { - SCOPED_RAW_TIMER(&stats->get_timer); - file_blocks = get_impl(hash, context, range, cache_lock); - } + SCOPED_RAW_TIMER(&duration); + if (auto iter = _key_to_time.find(hash); + context.cache_type == FileCacheType::INDEX && iter != _key_to_time.end()) { + context.cache_type = FileCacheType::TTL; + context.expiration_time = iter->second; + } - if (file_blocks.empty()) { - SCOPED_RAW_TIMER(&stats->set_timer); - file_blocks = split_range_into_cells(hash, context, offset, size, FileBlock::State::EMPTY, - cache_lock); - } else { - SCOPED_RAW_TIMER(&stats->set_timer); - fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock); - } - DCHECK(!file_blocks.empty()); - *_num_read_blocks << file_blocks.size(); - for (auto& block : file_blocks) { - if (block->state() == FileBlock::State::DOWNLOADED) { - *_num_hit_blocks << 1; + /// Get all blocks which intersect with the given range. + { + SCOPED_RAW_TIMER(&stats->get_timer); + file_blocks = get_impl(hash, context, range, cache_lock); + } + + if (file_blocks.empty()) { + SCOPED_RAW_TIMER(&stats->set_timer); + file_blocks = split_range_into_cells(hash, context, offset, size, + FileBlock::State::EMPTY, cache_lock); + } else { + SCOPED_RAW_TIMER(&stats->set_timer); + fill_holes_with_empty_file_blocks(file_blocks, hash, context, range, cache_lock); + } + DCHECK(!file_blocks.empty()); + *_num_read_blocks << file_blocks.size(); + for (auto& block : file_blocks) { + if (block->state_unsafe() == FileBlock::State::DOWNLOADED) { + *_num_hit_blocks << 1; + } } } + *_get_or_set_latency_us << (duration / 1000); return FileBlocksHolder(std::move(file_blocks)); } @@ -781,7 +791,7 @@ BlockFileCache::FileBlockCell* BlockFileCache::add_cell(const UInt128Wrapper& ha } size_t BlockFileCache::try_release() { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); std::vector trash; for (auto& [hash, blocks] : _files) { for (auto& [offset, cell] : blocks) { @@ -1075,7 
+1085,7 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, b // remove specific cache synchronously, for critical operations // if in use, cache meta will be deleted after use and the block file is then deleted asynchronously void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, true); if (!is_ttl_file) { auto iter = _files.find(file_key); @@ -1097,7 +1107,7 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) { // cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously // if in use, cache meta will be deleted after use and the block file is then deleted asynchronously void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, /*sync*/ false); if (!is_ttl_file) { auto iter = _files.find(file_key); @@ -1321,9 +1331,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo key.meta.expiration_time = expiration_time; if (sync) { int64_t duration_ns = 0; - SCOPED_RAW_TIMER(&duration_ns); - Status st = _storage->remove(key); - *_storage_sync_remove_latency << duration_ns; + Status st; + { + SCOPED_RAW_TIMER(&duration_ns); + st = _storage->remove(key); + } + *_storage_sync_remove_latency_us << (duration_ns / 1000); if (!st.ok()) { LOG_WARNING("").error(st); } @@ -1335,9 +1348,12 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo if (!ret) { LOG_WARNING("Failed to push recycle key to queue, do it synchronously"); int64_t duration_ns = 0; - SCOPED_RAW_TIMER(&duration_ns); - Status st = _storage->remove(key); - *_storage_sync_remove_latency << duration_ns; + Status st; + { + SCOPED_RAW_TIMER(&duration_ns); + st = 
_storage->remove(key); + } + *_storage_retry_sync_remove_latency_us << (duration_ns / 1000); if (!st.ok()) { LOG_WARNING("").error(st); } @@ -1360,7 +1376,7 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo } size_t BlockFileCache::get_used_cache_size(FileCacheType cache_type) const { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); return get_used_cache_size_unlocked(cache_type, cache_lock); } @@ -1370,7 +1386,7 @@ size_t BlockFileCache::get_used_cache_size_unlocked(FileCacheType cache_type, } size_t BlockFileCache::get_available_cache_size(FileCacheType cache_type) const { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); return get_available_cache_size_unlocked(cache_type, cache_lock); } @@ -1381,7 +1397,7 @@ size_t BlockFileCache::get_available_cache_size_unlocked( } size_t BlockFileCache::get_file_blocks_num(FileCacheType cache_type) const { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); return get_file_blocks_num_unlocked(cache_type, cache_lock); } @@ -1465,7 +1481,7 @@ std::string BlockFileCache::LRUQueue::to_string( } std::string BlockFileCache::dump_structure(const UInt128Wrapper& hash) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); return dump_structure_unlocked(hash, cache_lock); } @@ -1483,7 +1499,7 @@ std::string BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash, } std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); return dump_single_cache_type_unlocked(hash, offset, cache_lock); } @@ -1546,7 +1562,7 @@ std::string BlockFileCache::reset_capacity(size_t new_capacity) { ss << "finish reset_capacity, path=" << _cache_base_path; auto start_time = steady_clock::time_point(); { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); if (new_capacity < _capacity && new_capacity < _cur_cache_size) { int64_t need_remove_size = 
_cur_cache_size - new_capacity; auto remove_blocks = [&](LRUQueue& queue) -> int64_t { @@ -1662,7 +1678,7 @@ void BlockFileCache::run_background_monitor() { } // report { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); _cur_cache_size_metrics->set_value(_cur_cache_size); _cur_ttl_cache_size_metrics->set_value(_cur_cache_size - _index_queue.get_capacity(cache_lock) - @@ -1712,7 +1728,7 @@ void BlockFileCache::run_background_ttl_gc() { // TODO(zhengyu): fix! } { int64_t cur_time = UnixSeconds(); - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); while (!_time_to_key.empty()) { auto begin = _time_to_key.begin(); if (cur_time < begin->first) { @@ -1743,9 +1759,12 @@ void BlockFileCache::run_background_gc() { } int64_t duration_ns = 0; - SCOPED_RAW_TIMER(&duration_ns); - Status st = _storage->remove(key); - *_storage_async_remove_latency << duration_ns; + Status st; + { + SCOPED_RAW_TIMER(&duration_ns); + st = _storage->remove(key); + } + *_storage_async_remove_latency_us << (duration_ns / 1000); if (!st.ok()) { LOG_WARNING("").error(st); @@ -1758,7 +1777,7 @@ void BlockFileCache::run_background_gc() { void BlockFileCache::modify_expiration_time(const UInt128Wrapper& hash, uint64_t new_expiration_time) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); // 1. 
If new_expiration_time is equal to zero if (new_expiration_time == 0) { remove_if_ttl_file_blocks(hash, false, cache_lock, false); @@ -1818,7 +1837,7 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper& hash) const { int64_t cur_time = std::chrono::duration_cast( std::chrono::steady_clock::now().time_since_epoch()) .count(); - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); std::vector> blocks_meta; if (auto iter = _files.find(hash); iter != _files.end()) { for (auto& pair : _files.find(hash)->second) { @@ -1887,7 +1906,7 @@ std::string BlockFileCache::clear_file_cache_directly() { using namespace std::chrono; std::stringstream ss; auto start = steady_clock::now(); - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path); std::string clear_msg; @@ -1925,7 +1944,7 @@ std::string BlockFileCache::clear_file_cache_directly() { std::map BlockFileCache::get_blocks_by_key(const UInt128Wrapper& hash) { std::map offset_to_block; - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); if (_files.contains(hash)) { for (auto& [offset, cell] : _files[hash]) { if (cell.file_block->state() == FileBlock::State::DOWNLOADED) { @@ -1940,7 +1959,7 @@ std::map BlockFileCache::get_blocks_by_key(const UInt128W } void BlockFileCache::update_ttl_atime(const UInt128Wrapper& hash) { - SCOPED_CACHE_LOCK(_mutex); + SCOPED_CACHE_LOCK(_mutex, this); if (auto iter = _files.find(hash); iter != _files.end()) { for (auto& [_, cell] : iter->second) { cell.update_atime(); diff --git a/be/src/io/cache/block_file_cache.h b/be/src/io/cache/block_file_cache.h index f93a72cbc621f1..ce8d13d4a14e3b 100644 --- a/be/src/io/cache/block_file_cache.h +++ b/be/src/io/cache/block_file_cache.h @@ -29,26 +29,47 @@ #include "io/cache/file_block.h" #include "io/cache/file_cache_common.h" #include "io/cache/file_cache_storage.h" +#include "util/runtime_profile.h" #include "util/threadpool.h" namespace doris::io 
{ using RecycleFileCacheKeys = moodycamel::ConcurrentQueue; + +class LockScopedTimer { +public: + LockScopedTimer() : start_(std::chrono::steady_clock::now()) {} + ~LockScopedTimer() { + auto end = std::chrono::steady_clock::now(); + auto duration_us = + std::chrono::duration_cast(end - start_).count(); + if (duration_us > config::cache_lock_held_long_tail_threshold_us) { + LOG(WARNING) << "Lock held time " << std::to_string(duration_us) << "us. " + << get_stack_trace(); + } + } + +private: + std::chrono::time_point start_; +}; + // Note: the cache_lock is scoped, so do not add do...while(0) here. #ifdef ENABLE_CACHE_LOCK_DEBUG -#define SCOPED_CACHE_LOCK(MUTEX) \ +#define SCOPED_CACHE_LOCK(MUTEX, cache) \ std::chrono::time_point start_time = \ std::chrono::steady_clock::now(); \ std::lock_guard cache_lock(MUTEX); \ std::chrono::time_point acq_time = \ std::chrono::steady_clock::now(); \ - auto duration = \ - std::chrono::duration_cast(acq_time - start_time).count(); \ - if (duration > config::cache_lock_long_tail_threshold) \ - LOG(WARNING) << "Lock wait time " << std::to_string(duration) << "ms. " \ - << get_stack_trace_by_boost() << std::endl; \ + auto duration_us = \ + std::chrono::duration_cast(acq_time - start_time).count(); \ + *(cache->_cache_lock_wait_time_us) << duration_us; \ + if (duration_us > config::cache_lock_wait_long_tail_threshold_us) { \ + LOG(WARNING) << "Lock wait time " << std::to_string(duration_us) << "us. 
" \ + << get_stack_trace() << std::endl; \ + } \ LockScopedTimer cache_lock_timer; #else -#define SCOPED_CACHE_LOCK(MUTEX) std::lock_guard cache_lock(MUTEX); +#define SCOPED_CACHE_LOCK(MUTEX, cache) std::lock_guard cache_lock(MUTEX); #endif template @@ -57,23 +78,6 @@ concept IsXLock = std::same_as> || class FSFileCacheStorage; -class LockScopedTimer { -public: - LockScopedTimer() : start_(std::chrono::steady_clock::now()) {} - - ~LockScopedTimer() { - auto end = std::chrono::steady_clock::now(); - auto duration = std::chrono::duration_cast(end - start_).count(); - if (duration > 500) { - LOG(WARNING) << "Lock held time " << std::to_string(duration) << "ms. " - << get_stack_trace_by_boost(); - } - } - -private: - std::chrono::time_point start_; -}; - // The BlockFileCache is responsible for the management of the blocks // The current strategies are lru and ttl. class BlockFileCache { @@ -533,8 +537,11 @@ class BlockFileCache { std::shared_ptr> _hit_ratio_1h; std::shared_ptr> _disk_limit_mode_metrics; - std::shared_ptr _storage_sync_remove_latency; - std::shared_ptr _storage_async_remove_latency; + std::shared_ptr _cache_lock_wait_time_us; + std::shared_ptr _get_or_set_latency_us; + std::shared_ptr _storage_sync_remove_latency_us; + std::shared_ptr _storage_retry_sync_remove_latency_us; + std::shared_ptr _storage_async_remove_latency_us; }; } // namespace doris::io diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp index 8c911ab8f24204..06f58730296fed 100644 --- a/be/src/io/cache/file_block.cpp +++ b/be/src/io/cache/file_block.cpp @@ -59,6 +59,10 @@ FileBlock::State FileBlock::state() const { return _download_state; } +FileBlock::State FileBlock::state_unsafe() const { + return _download_state; +} + uint64_t FileBlock::get_caller_id() { uint64_t id; #if defined(__APPLE__) @@ -144,7 +148,7 @@ Status FileBlock::append(Slice data) { Status FileBlock::finalize() { if (_downloaded_size != 0 && _downloaded_size != _block_range.size()) { - 
SCOPED_CACHE_LOCK(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); size_t old_size = _block_range.size(); _block_range.right = _block_range.left + _downloaded_size - 1; size_t new_size = _block_range.size(); @@ -179,7 +183,7 @@ Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType new_typ } Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType new_type) { - SCOPED_CACHE_LOCK(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); std::lock_guard block_lock(_mutex); bool expr = (new_type != FileCacheType::TTL && _key.meta.type != FileCacheType::TTL); if (!expr) { @@ -293,7 +297,7 @@ FileBlocksHolder::~FileBlocksHolder() { } } if (should_remove) { - SCOPED_CACHE_LOCK(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); std::lock_guard block_lock(file_block->_mutex); if (file_block.use_count() == 2) { DCHECK(file_block->state_unlock(block_lock) != FileBlock::State::DOWNLOADING); diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h index 93c3841693d9ac..38c57ce9358b14 100644 --- a/be/src/io/cache/file_block.h +++ b/be/src/io/cache/file_block.h @@ -66,6 +66,7 @@ class FileBlock { ~FileBlock() = default; State state() const; + State state_unsafe() const; static std::string state_to_string(FileBlock::State state); diff --git a/be/src/io/cache/fs_file_cache_storage.cpp b/be/src/io/cache/fs_file_cache_storage.cpp index cf1cd41a537abc..ce02c2a101dd23 100644 --- a/be/src/io/cache/fs_file_cache_storage.cpp +++ b/be/src/io/cache/fs_file_cache_storage.cpp @@ -488,7 +488,7 @@ void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const std::vector batch_load_buffer; batch_load_buffer.reserve(scan_length); auto add_cell_batch_func = [&]() { - SCOPED_CACHE_LOCK(_mgr->_mutex); + SCOPED_CACHE_LOCK(_mgr->_mutex, _mgr); auto f = [&](const BatchLoadArgs& args) { // in async load mode, a cell may be added twice. 
diff --git a/build.sh b/build.sh index 514588ff795858..24ceed2baea726 100755 --- a/build.sh +++ b/build.sh @@ -443,7 +443,7 @@ if [[ -z "${ENABLE_INJECTION_POINT}" ]]; then fi if [[ -z "${ENABLE_CACHE_LOCK_DEBUG}" ]]; then - ENABLE_CACHE_LOCK_DEBUG='OFF' + ENABLE_CACHE_LOCK_DEBUG='ON' fi if [[ -z "${RECORD_COMPILER_SWITCHES}" ]]; then