Skip to content

Commit

Permalink
[opt](memory) All LRU Cache inherit from LRUCachePolicy (apache#28940)
Browse files Browse the repository at this point in the history
After all LRU Cache inherits from LRUCachePolicy, this will allow prune stale entry, eviction when memory exceeds limit, and define common properties. LRUCache constructor change to private, only allow LRUCachePolicy to construct it.

Impl DummyLRUCache, when LRU Cache capacity is 0, will no longer be meaningless insert and evict.
  • Loading branch information
xinyiZzz authored and HappenLee committed Jan 12, 2024
1 parent fd6bc60 commit e30c82a
Show file tree
Hide file tree
Showing 30 changed files with 367 additions and 207 deletions.
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ DEFINE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes, "1000000

DEFINE_mInt32(cache_prune_stale_interval, "10");
// the clean interval of tablet lookup cache
DEFINE_mInt32(tablet_lookup_cache_clean_interval, "30");
DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec, "30");
DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300");
DEFINE_mInt32(disk_stat_monitor_interval, "5");
DEFINE_mInt32(unused_rowset_monitor_interval, "30");
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
Expand Down Expand Up @@ -809,6 +810,9 @@ DEFINE_mInt32(external_table_connect_timeout_sec, "30");

// Global bitmap cache capacity for aggregation cache, size in bytes
DEFINE_Int64(delete_bitmap_agg_cache_capacity, "104857600");
DEFINE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec, "1800");

DEFINE_mInt32(common_obj_lru_cache_stale_sweep_time_sec, "900");

// s3 config
DEFINE_mInt32(max_remote_storage_count, "10");
Expand Down
7 changes: 6 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ DECLARE_mInt64(memory_limitation_per_thread_for_storage_migration_bytes);
// the prune stale interval of all cache
DECLARE_mInt32(cache_prune_stale_interval);
// the clean interval of tablet lookup cache
DECLARE_mInt32(tablet_lookup_cache_clean_interval);
DECLARE_mInt32(tablet_lookup_cache_stale_sweep_time_sec);
DECLARE_mInt32(point_query_row_cache_stale_sweep_time_sec);
DECLARE_mInt32(disk_stat_monitor_interval);
DECLARE_mInt32(unused_rowset_monitor_interval);
DECLARE_String(storage_root_path);
Expand Down Expand Up @@ -863,6 +864,10 @@ DECLARE_mInt32(external_table_connect_timeout_sec);

// Global bitmap cache capacity for aggregation cache, size in bytes
DECLARE_Int64(delete_bitmap_agg_cache_capacity);
DECLARE_mInt32(delete_bitmap_agg_cache_stale_sweep_time_sec);

// A common object cache depends on an Sharded LRU Cache.
DECLARE_mInt32(common_obj_lru_cache_stale_sweep_time_sec);

// s3 config
DECLARE_mInt32(max_remote_storage_count);
Expand Down
35 changes: 35 additions & 0 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,4 +667,39 @@ void ShardedLRUCache::update_cache_metrics() const {
total_lookup_count == 0 ? 0 : ((double)total_hit_count / total_lookup_count));
}

Cache::Handle* DummyLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority, size_t bytes) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
auto* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
e->deleter = deleter;
e->charge = charge;
e->key_length = 0;
e->total_size = 0;
e->bytes = 0;
e->hash = 0;
e->refs = 1; // only one for the returned handle
e->next = e->prev = nullptr;
e->in_cache = false;
return reinterpret_cast<Cache::Handle*>(e);
}

void DummyLRUCache::release(Cache::Handle* handle) {
if (handle == nullptr) {
return;
}
auto* e = reinterpret_cast<LRUHandle*>(handle);
e->free();
}

void* DummyLRUCache::value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}

Slice DummyLRUCache::value_slice(Handle* handle) {
auto* lru_handle = reinterpret_cast<LRUHandle*>(handle);
return Slice((char*)lru_handle->value, lru_handle->charge);
}

} // namespace doris
50 changes: 40 additions & 10 deletions be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,17 @@ namespace doris {
} while (0)

class Cache;
class LRUCachePolicy;

enum LRUCacheType {
SIZE, // The capacity of cache is based on the size of cache entry.
NUMBER // The capacity of cache is based on the number of cache entry.
};

static constexpr LRUCacheType DEFAULT_LRU_CACHE_TYPE = LRUCacheType::SIZE;
static constexpr uint32_t DEFAULT_LRU_CACHE_NUM_SHARDS = 16;
static constexpr size_t DEFAULT_LRU_CACHE_ELEMENT_COUNT_CAPACITY = 0;

class CacheKey {
public:
CacheKey() : _data(nullptr), _size(0) {}
Expand Down Expand Up @@ -263,8 +268,10 @@ struct LRUHandle {

void free() {
(*deleter)(key(), value);
THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker);
DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes);
if (bytes != 0) { // DummyLRUCache bytes always equal to 0
THREAD_MEM_TRACKER_TRANSFER_FROM(bytes, mem_tracker);
DorisMetrics::instance()->lru_cache_memory_bytes->increment(-bytes);
}
::free(this);
}
};
Expand Down Expand Up @@ -330,6 +337,7 @@ class LRUCache {
}

// Like Cache methods, but with an extra "hash" parameter.
// Must call release on the returned handle pointer.
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
MemTrackerLimiter* tracker,
Expand Down Expand Up @@ -389,14 +397,6 @@ class LRUCache {

class ShardedLRUCache : public Cache {
public:
explicit ShardedLRUCache(const std::string& name, size_t total_capacity,
LRUCacheType type = LRUCacheType::SIZE, uint32_t num_shards = 16,
uint32_t element_count_capacity = 0);
explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
uint32_t num_shards,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp, uint32_t element_count_capacity = 0);
// TODO(fdy): 析构时清除所有cache元素
virtual ~ShardedLRUCache();
virtual Handle* insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
Expand All @@ -415,6 +415,16 @@ class ShardedLRUCache : public Cache {
size_t get_total_capacity() override { return _total_capacity; };

private:
// LRUCache can only be created and managed with LRUCachePolicy.
friend class LRUCachePolicy;

explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
uint32_t num_shards, uint32_t element_count_capacity);
explicit ShardedLRUCache(const std::string& name, size_t total_capacity, LRUCacheType type,
uint32_t num_shards,
CacheValueTimeExtractor cache_value_time_extractor,
bool cache_value_check_timestamp, uint32_t element_count_capacity);

void update_cache_metrics() const;

static std::string lru_cache_type_string(LRUCacheType type) {
Expand Down Expand Up @@ -456,4 +466,24 @@ class ShardedLRUCache : public Cache {
std::unique_ptr<bvar::PerSecond<bvar::Adder<uint64_t>>> _lookup_count_per_second;
};

// Compatible with ShardedLRUCache usage, but will not actually cache.
class DummyLRUCache : public Cache {
public:
// Must call release on the returned handle pointer.
Handle* insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL, size_t bytes = -1) override;
Handle* lookup(const CacheKey& key) override { return nullptr; };
void release(Handle* handle) override;
void erase(const CacheKey& key) override {};
void* value(Handle* handle) override;
Slice value_slice(Handle* handle) override;
uint64_t new_id() override { return 0; };
int64_t prune() override { return 0; };
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override { return 0; };
int64_t mem_consumption() override { return 0; };
int64_t get_usage() override { return 0; };
size_t get_total_capacity() override { return 0; };
};

} // namespace doris
6 changes: 2 additions & 4 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta
} else {
CHECK(false) << "invalid index page cache percentage";
}
if (pk_index_cache_capacity > 0) {
_pk_index_page_cache =
std::make_unique<PKIndexPageCache>(pk_index_cache_capacity, num_shards);
}

_pk_index_page_cache = std::make_unique<PKIndexPageCache>(pk_index_cache_capacity, num_shards);
}

bool StoragePageCache::lookup(const CacheKey& key, PageCacheHandle* handle,
Expand Down
25 changes: 6 additions & 19 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ class StoragePageCache {
void insert(const CacheKey& key, DataPage* data, PageCacheHandle* handle,
segment_v2::PageTypePB page_type, bool in_memory = false);

// Page cache available check.
// When percentage is set to 0 or 100, the index or data cache will not be allocated.
bool is_cache_available(segment_v2::PageTypePB page_type) {
return _get_page_cache(page_type) != nullptr;
}

private:
StoragePageCache();

Expand All @@ -177,26 +171,19 @@ class StoragePageCache {
Cache* _get_page_cache(segment_v2::PageTypePB page_type) {
switch (page_type) {
case segment_v2::DATA_PAGE: {
if (_data_page_cache) {
return _data_page_cache->get();
}
return nullptr;
return _data_page_cache->cache();
}
case segment_v2::INDEX_PAGE: {
if (_index_page_cache) {
return _index_page_cache->get();
}
return nullptr;
return _index_page_cache->cache();
}
case segment_v2::PRIMARY_KEY_INDEX_PAGE: {
if (_pk_index_page_cache) {
return _pk_index_page_cache->get();
}
return nullptr;
return _pk_index_page_cache->cache();
}
default:
return nullptr;
LOG(FATAL) << "get error type page cache";
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
}
};

Expand Down
52 changes: 20 additions & 32 deletions be/src/olap/rowset/segment_v2/inverted_index_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ InvertedIndexSearcherCache* InvertedIndexSearcherCache::create_global_instance(
return new InvertedIndexSearcherCache(capacity, num_shards);
}

InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards)
: LRUCachePolicy(CachePolicy::CacheType::INVERTEDINDEX_SEARCHER_CACHE,
config::inverted_index_cache_stale_sweep_time_sec),
_mem_tracker(std::make_unique<MemTracker>("InvertedIndexSearcherCache")) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t num_shards) {
uint64_t fd_number = config::min_file_descriptor_number;
struct rlimit l;
int ret = getrlimit(RLIMIT_NOFILE, &l);
Expand All @@ -113,13 +109,11 @@ InvertedIndexSearcherCache::InvertedIndexSearcherCache(size_t capacity, uint32_t
auto* cache_value = (InvertedIndexSearcherCache::CacheValue*)value;
return cache_value->last_visit_time;
};
_cache = std::unique_ptr<Cache>(
new ShardedLRUCache("InvertedIndexSearcherCache", capacity, LRUCacheType::SIZE,
num_shards, get_last_visit_time, true, open_searcher_limit));
_policy = std::make_unique<InvertedIndexSearcherCachePolicy>(
capacity, num_shards, open_searcher_limit, get_last_visit_time, true);
} else {
_cache = std::unique_ptr<Cache>(new ShardedLRUCache("InvertedIndexSearcherCache", capacity,
LRUCacheType::SIZE, num_shards,
open_searcher_limit));
_policy = std::make_unique<InvertedIndexSearcherCachePolicy>(capacity, num_shards,
open_searcher_limit);
}
}

Expand Down Expand Up @@ -208,8 +202,8 @@ Status InvertedIndexSearcherCache::get_index_searcher(
IndexCacheValuePtr cache_value = std::make_unique<InvertedIndexSearcherCache::CacheValue>();
cache_value->index_searcher = std::move(index_searcher);
cache_value->size = mem_tracker->consumption();
*cache_handle =
InvertedIndexCacheHandle(_cache.get(), _insert(cache_key, cache_value.release()));
*cache_handle = InvertedIndexCacheHandle(_policy->cache(),
_insert(cache_key, cache_value.release()));
} else {
cache_handle->index_searcher = std::move(index_searcher);
}
Expand Down Expand Up @@ -280,30 +274,27 @@ Status InvertedIndexSearcherCache::insert(const io::FileSystemSPtr& fs,
cache_value->size = mem_tracker->consumption();
cache_value->last_visit_time = UnixMillis();
auto* lru_handle = _insert(cache_key, cache_value.release());
_cache->release(lru_handle);
_policy->cache()->release(lru_handle);
return Status::OK();
}

Status InvertedIndexSearcherCache::erase(const std::string& index_file_path) {
InvertedIndexSearcherCache::CacheKey cache_key(index_file_path);
_cache->erase(cache_key.index_file_path);
_policy->cache()->erase(cache_key.index_file_path);
return Status::OK();
}

int64_t InvertedIndexSearcherCache::mem_consumption() {
if (_cache) {
return _cache->mem_consumption();
}
return 0L;
return _policy->cache()->mem_consumption();
}

bool InvertedIndexSearcherCache::_lookup(const InvertedIndexSearcherCache::CacheKey& key,
InvertedIndexCacheHandle* handle) {
auto* lru_handle = _cache->lookup(key.index_file_path);
auto* lru_handle = _policy->cache()->lookup(key.index_file_path);
if (lru_handle == nullptr) {
return false;
}
*handle = InvertedIndexCacheHandle(_cache.get(), lru_handle);
*handle = InvertedIndexCacheHandle(_policy->cache(), lru_handle);
return true;
}

Expand All @@ -314,20 +305,20 @@ Cache::Handle* InvertedIndexSearcherCache::_insert(const InvertedIndexSearcherCa
delete cache_value;
};

Cache::Handle* lru_handle =
_cache->insert(key.index_file_path, value, value->size, deleter, CachePriority::NORMAL);
Cache::Handle* lru_handle = _policy->cache()->insert(key.index_file_path, value, value->size,
deleter, CachePriority::NORMAL);
return lru_handle;
}

bool InvertedIndexQueryCache::lookup(const CacheKey& key, InvertedIndexQueryCacheHandle* handle) {
if (key.encode().empty()) {
return false;
}
auto* lru_handle = _cache->lookup(key.encode());
auto* lru_handle = cache()->lookup(key.encode());
if (lru_handle == nullptr) {
return false;
}
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
*handle = InvertedIndexQueryCacheHandle(cache(), lru_handle);
return true;
}

Expand All @@ -346,16 +337,13 @@ void InvertedIndexQueryCache::insert(const CacheKey& key, std::shared_ptr<roarin
return;
}

auto* lru_handle = _cache->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(_cache.get(), lru_handle);
auto* lru_handle = cache()->insert(key.encode(), (void*)cache_value_ptr.release(),
bitmap->getSizeInBytes(), deleter, CachePriority::NORMAL);
*handle = InvertedIndexQueryCacheHandle(cache(), lru_handle);
}

int64_t InvertedIndexQueryCache::mem_consumption() {
if (_cache) {
return _cache->mem_consumption();
}
return 0L;
return cache()->mem_consumption();
}

} // namespace doris::segment_v2
Loading

0 comments on commit e30c82a

Please sign in to comment.