Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](memory) All LRU Cache inherit from LRUCachePolicy #28940

Merged
merged 11 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ DEFINE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "1000000

DEFINE_mInt32(cache_prune_stale_interval, "10");
// the clean interval of tablet lookup cache
DEFINE_mInt32(tablet_lookup_cache_clean_interval, "30");
DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec, "30");
DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300");
DEFINE_mInt32(disk_stat_monitor_interval, "5");
DEFINE_mInt32(unused_rowset_monitor_interval, "30");
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
Expand Down Expand Up @@ -809,6 +810,9 @@ DEFINE_mInt32(external_table_connect_timeout_sec, "30");

// Global bitmap cache capacity for aggregation cache, size in bytes
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800");

DEFINE_mInt32(common_obj_lru_cache_stale_sweep_time_sec, "900");

// s3 config
DEFINE_mInt32(max_remote_storage_count, "10");
Expand Down
7 changes: 6 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ DECLARE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes);
// the prune stale interval of all cache
DECLARE_mInt32(cache_prune_stale_interval);
// the clean interval of tablet lookup cache
DECLARE_mInt32(tablet_lookup_cache_clean_interval);
DECLARE_mInt32(tablet_lookup_cache_stale_sweep_time_sec);
DECLARE_mInt32(point_query_row_cache_stale_sweep_time_sec);
DECLARE_mInt32(disk_stat_monitor_interval);
DECLARE_mInt32(unused_rowset_monitor_interval);
DECLARE_String(storage_root_path);
Expand Down Expand Up @@ -863,6 +864,10 @@ DECLARE_mInt32(external_table_connect_timeout_sec);

// Global bitmap cache capacity for aggregation cache, size in bytes
DECLARE_Int64(delete_bitmap_agg_cache_capacity);
DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec);

// A common object cache depends on an Sharded LRU Cache.
DECLARE_mInt32(common_obj_lru_cache_stale_sweep_time_sec);

// s3 config
DECLARE_mInt32(max_remote_storage_count);
Expand Down
35 changes: 35 additions & 0 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,4 +667,39 @@ void ShardedLRUCache::update_cache_metrics() const {
total_lookup_count == 0 ? 0 : ((double)total_hit_count / total_lookup_count));
}

Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority, size_t bytes) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
auto* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = 0;
e->total_size = 0;
e->bytes = 0;
e->hash = 0;
e->refs = 1; // only one for the returned handle
e->next = e->prev = nullptr;
e->in_cache = false;
return reinterpret_cast<Cache::Handle*>(e);
}

void DummyLRUCache::release(Cache::Handle* handle) {
if (handle == nullptr) {
return;
}
auto* e = reinterpret_cast<LRUHandle*>(handle);
e->free();
}

void* DummyLRUCache::value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}

Slice DummyLRUCache::value_slice(Handle* handle) {
auto* lru_handle = reinterpret_cast<LRUHandle*>(handle);
return Slice((char*)lru_handle->value, lru_handle->charge);
xinyiZzz marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace doris
50 changes: 40 additions & 10 deletions be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ namespace doris {
} while (0)

class Cache;
class LRUCachePolicy;

enum LRUCacheType {
SIZE, // The capacity of cache is based on the size of cache entry.
NUMBER // The capacity of cache is based on the number of cache entry.
};

static constexpr LRUCacheType DEFAULT_LRU_CACHE_TYPE = LRUCacheType::SIZE;
static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 16;
static constexpr size_t DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY = 0;

class CacheKey {
public:
CacheKey() : _data(nullptr), _size(0) {}
Expand Down Expand Up @@ -263,8 +268,10 @@ struct LRUHandle {

void free() {
(*deleter)(key(), value);
THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker);
DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes);
if (bytes != 0) { // DummyLRUCache bytes always equal to 0
THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker);
DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes);
}
::free(this);
}
};
Expand Down Expand Up @@ -330,6 +337,7 @@ class LRUCache {
}

// Like Cache methods, but with an extra "hash" parameter.
// Must call release on the returned handle pointer.
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
MemTrackerLimiter* tracker,
Expand Down Expand Up @@ -389,14 +397,6 @@ class LRUCache {

class ShardedLRUCache : public Cache {
public:
explicit ShardedLRUCache(const std::string& name, size_t total_capacity,
LRUCacheType type = LRUCacheType::SIZE, uint32_t num_shards = 16,
uint32_t element_count_capacity = 0);
explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
uint32_t num_shards,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp, uint32_t element_count_capacity = 0);
// TODO(fdy): 析构时清除所有cache元素
virtual ~ShardedLRUCache();
virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
Expand All @@ -415,6 +415,16 @@ class ShardedLRUCache : public Cache {
size_t get_total_capacity() override { return _total_capacity; };

private:
// LRUCache can only be created and managed with LRUCachePolicy.
friend class LRUCachePolicy;

explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
uint32_t num_shards, uint32_t element_count_capacity);
explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
uint32_t num_shards,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp, uint32_t element_count_capacity);

void update_cache_metrics() const;

static std::string lru_cache_type_string(LRUCacheType type) {
Expand Down Expand Up @@ -456,4 +466,24 @@ class ShardedLRUCache : public Cache {
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _lookup_count_per_second;
};

// Compatible with ShardedLRUCache usage, but will not actually cache.
class DummyLRUCache : public Cache {
public:
// Must call release on the returned handle pointer.
Handle* insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) override;
Handle* lookup(const CacheKey& key) override { return nullptr; };
void release(Handle* handle) override;
void erase(const CacheKey& key) override {};
void* value(Handle* handle) override;
Slice value_slice(Handle* handle) override;
uint64_t new_id() override { return 0; };
int64_t prune() override { return 0; };
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override { return 0; };
xinyiZzz marked this conversation as resolved.
Show resolved Hide resolved
int64_t mem_consumption() override { return 0; };
int64_t get_usage() override { return 0; };
size_t get_total_capacity() override { return 0; };
};

} // namespace doris
6 changes: 2 additions & 4 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta
} else {
CHECK(false) << "invalid index page cache percentage";
}
if (pk_index_cache_capacity > 0) {
_pk_index_page_cache =
std::make_unique<PKIndexPageCache>(pk_index_cache_capacity, num_shards);
}

_pk_index_page_cache = std::make_unique<PKIndexPageCache>(pk_index_cache_capacity, num_shards);
}

bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle,
Expand Down
25 changes: 6 additions & 19 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ class StoragePageCache {
void insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle,
segment_v2::PageTypePB page_type, bool in_memory = false);

// Page cache available check.
// When percentage is set to 0 or 100, the index or data cache will not be allocated.
bool is_cache_available(segment_v2::PageTypePB page_type) {
return _get_page_cache(page_type) != nullptr;
}

private:
StoragePageCache();

Expand All @@ -177,26 +171,19 @@ class StoragePageCache {
Cache* _get_page_cache(segment_v2::PageTypePB page_type) {
switch (page_type) {
case segment_v2::DATA_PAGE: {
if (_data_page_cache) {
return _data_page_cache->get();
}
return nullptr;
return _data_page_cache->cache();
}
case segment_v2::INDEX_PAGE: {
if (_index_page_cache) {
return _index_page_cache->get();
}
return nullptr;
return _index_page_cache->cache();
}
case segment_v2::PRIMARY_KEY_INDEX_PAGE: {
if (_pk_index_page_cache) {
return _pk_index_page_cache->get();
}
return nullptr;
return _pk_index_page_cache->cache();
}
default:
return nullptr;
LOG(FATAL) << "get error type page cache";
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
};

Expand Down
52 changes: 20 additions & 32 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance(
return new InvertedIndexSearcherCache(capacity, num_shards);
}

InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE,
config::inverted_index_cache_stale_sweep_time_sec),
_mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) {
uint64_t fd_number = config::min_file_descriptor_number;
struct rlimit l;
int ret = getrlimit(RLIMIT_NOFILE, &l);
Expand All @@ -113,13 +109,11 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t
auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value;
return cache_value->last_visit_time;
};
_cache = std::unique_ptr<Cache>(
new ShardedLRUCache("InvertedIndexSearcherCache", capacity, LRUCacheType::SIZE,
num_shards, get_last_visit_time, true, open_searcher_limit));
_policy = std::make_unique<InvertedIndexSearcherCachePolicy>(
capacity, num_shards, open_searcher_limit, get_last_visit_time, true);
} else {
_cache = std::unique_ptr<Cache>(new ShardedLRUCache("InvertedIndexSearcherCache", capacity,
LRUCacheType::SIZE, num_shards,
open_searcher_limit));
_policy = std::make_unique<InvertedIndexSearcherCachePolicy>(capacity, num_shards,
open_searcher_limit);
}
}

Expand Down Expand Up @@ -208,8 +202,8 @@ Status InvertedIndexSearcherCache::get_index_searcher(
IndexCacheValuePtr cache_value = std::make_unique<InvertedIndexSearcherCache::CacheValue>();
cache_value->index_searcher = std::move(index_searcher);
cache_value->size = mem_tracker->consumption();
*cache_handle =
InvertedIndexCacheHandle(_cache.get(), _insert(cache_key, cache_value.release()));
*cache_handle = InvertedIndexCacheHandle(_policy->cache(),
_insert(cache_key, cache_value.release()));
} else {
cache_handle->index_searcher = std::move(index_searcher);
}
Expand Down Expand Up @@ -280,30 +274,27 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
cache_value->size = mem_tracker->consumption();
cache_value->last_visit_time = UnixMillis();
auto* lru_handle = _insert(cache_key, cache_value.release());
_cache->release(lru_handle);
_policy->cache()->release(lru_handle);
return Status::OK();
}

Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) {
InvertedIndexSearcherCache::CacheKey cache_key(index_file_path);
_cache->erase(cache_key.index_file_path);
_policy->cache()->erase(cache_key.index_file_path);
return Status::OK();
}

int64_t InvertedIndexSearcherCache::mem_consumption() {
if (_cache) {
return _cache->mem_consumption();
}
return 0L;
return _policy->cache()->mem_consumption();
}

bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key,
InvertedIndexCacheHandle* handle) {
auto* lru_handle = _cache->lookup(key.index_file_path);
auto* lru_handle = _policy->cache()->lookup(key.index_file_path);
if (lru_handle == nullptr) {
return false;
}
*handle = InvertedIndexCacheHandle(_cache.get(), lru_handle);
*handle = InvertedIndexCacheHandle(_policy->cache(), lru_handle);
return true;
}

Expand All @@ -314,20 +305,20 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCa
delete cache_value;
};

Cache::Handle* lru_handle =
_cache->insert(key.index_file_path, value, value->size, deleter, CachePriority::NORMAL);
Cache::Handle* lru_handle = _policy->cache()->insert(key.index_file_path, value, value->size,
deleter, CachePriority::NORMAL);
return lru_handle;
}

bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle) {
if (key.encode().empty()) {
return false;
}
auto* lru_handle = _cache->lookup(key.encode());
auto* lru_handle = cache()->lookup(key.encode());
if (lru_handle == nullptr) {
return false;
}
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
*handle = InvertedIndexQueryCacheHandle(cache(), lru_handle);
return true;
}

Expand All @@ -346,16 +337,13 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
return;
}

auto* lru_handle = _cache->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
auto* lru_handle = cache()->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(cache(), lru_handle);
}

int64_t InvertedIndexQueryCache::mem_consumption() {
if (_cache) {
return _cache->mem_consumption();
}
return 0L;
return cache()->mem_consumption();
}

} // namespace doris::segment_v2
Loading
Loading