Skip to content

Commit

Permalink
[Improve](cloud) Support caching SchemaCloudDictionary on the BE side
Browse files Browse the repository at this point in the history
/*
 * SchemaCloudDictionaryCache provides a local cache for SchemaCloudDictionary.
 *
 * Caching logic:
 *  - If the dictionary associated with a given key has not had any new columns added
 *    (determined by comparing the serialized data for consistency),
 *    the cached dictionary is directly used to update the dictionary list in the rowset meta
 *    (similar to the process_dictionary logic in write_schema_dict).
 *  - If new columns have been detected, the local cache is disregarded, and the updated
 *    dictionary should be fetched via the meta service.
 */

Use SchemaCloudDictionaryCache in the backend to reduce how often SchemaCloudDictionary is read and converted in MetaService.
  • Loading branch information
eldenmoon committed Feb 8, 2025
1 parent e3cee61 commit 0cf64d0
Show file tree
Hide file tree
Showing 16 changed files with 683 additions and 12 deletions.
73 changes: 67 additions & 6 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
#include "cloud/schema_cloud_dictionary_cache.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -342,6 +343,8 @@ static std::string debug_info(const Request& req) {
req.tablet_id(), req.lock_id());
} else if constexpr (is_any_v<Request, GetDeleteBitmapRequest>) {
return fmt::format(" tablet_id={}", req.tablet_id());
} else if constexpr (is_any_v<Request, GetSchemaDictRequest>) {
return fmt::format(" index_id={}", req.index_id());
} else {
static_assert(!sizeof(Request));
}
Expand Down Expand Up @@ -473,10 +476,10 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
// backend side use schema dict
if (config::variant_use_cloud_schema_dict) {
req.set_schema_op(GetRowsetRequest::RETURN_DICT);
}
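// When the cloud schema dict cache is enabled, the backend resolves schemas from its local
// cache, so ask the meta service not to return the dictionary; otherwise request it.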
req.set_schema_op(config::variant_use_cloud_schema_dict_cache
? GetRowsetRequest::NO_DICT
: GetRowsetRequest::RETURN_DICT);
VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();

stub->get_rowset(&cntl, &req, &resp, nullptr);
Expand Down Expand Up @@ -592,8 +595,23 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
existed_rowset->rowset_id().to_string() == cloud_rs_meta_pb.rowset_id_v2()) {
continue; // Same rowset, skip it
}
RowsetMetaPB meta_pb = cloud_rowset_meta_to_doris(
cloud_rs_meta_pb, resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
RowsetMetaPB meta_pb;
// Check if the rowset meta contains a schema dictionary key list.
if (cloud_rs_meta_pb.has_schema_dict_key_list() && !resp.has_schema_dict()) {
// Use the locally cached dictionary.
RowsetMetaCloudPB copied_cloud_rs_meta_pb = cloud_rs_meta_pb;
CloudStorageEngine& engine =
ExecEnv::GetInstance()->storage_engine().to_cloud();
RETURN_IF_ERROR(
engine.get_schema_cloud_dictionary_cache().replace_dict_keys_to_schema(
cloud_rs_meta_pb.index_id(), &copied_cloud_rs_meta_pb));
meta_pb = cloud_rowset_meta_to_doris(copied_cloud_rs_meta_pb);
} else {
// Otherwise, use the schema dictionary from the response (if available).
meta_pb = cloud_rowset_meta_to_doris(
cloud_rs_meta_pb,
resp.has_schema_dict() ? &resp.schema_dict() : nullptr);
}
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(meta_pb);
RowsetSharedPtr rowset;
Expand Down Expand Up @@ -835,6 +853,14 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,

RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(), std::move(rs_meta_pb));
// Replace schema dictionary keys based on the rowset's index ID to maintain schema consistency.
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
// If the dict cache is disabled, treat the replacement as successful so that no refresh is
// triggered below.
bool replaced =
config::variant_use_cloud_schema_dict_cache
? engine.get_schema_cloud_dictionary_cache().replace_schema_to_dict_keys(
rs_meta_pb.index_id(), req.mutable_rowset_meta())
: true;
Status st = retry_rpc("commit rowset", req, &resp, &MetaService_Stub::commit_rowset);
if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
Expand All @@ -845,6 +871,13 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
}
return Status::AlreadyExist("failed to commit rowset: {}", resp.status().msg());
}
// If dictionary replacement fails, it may indicate that the local schema dictionary is outdated.
// Refreshing the dictionary here ensures that the rowset metadata is updated with the latest schema definitions,
// which is critical for maintaining consistency between the rowset and its corresponding schema.
if (!replaced) {
RETURN_IF_ERROR(
engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
}
return st;
}

Expand Down Expand Up @@ -1354,5 +1387,33 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
return total_inverted_index_size;
}

// Fetches the SchemaCloudDictionary of `index_id` from the meta service.
//
// On success, `*schema_dict` is pointed at a newly allocated dictionary that takes over the
// contents carried by the RPC response, and Status::OK() is returned. If the RPC fails, its
// status is returned as-is and `*schema_dict` is left untouched.
//
// NOTE(review): the response's own status code is not inspected here; presumably retry_rpc
// already folds it into the returned Status — confirm.
Status CloudMetaMgr::get_schema_dict(int64_t index_id,
                                     std::shared_ptr<SchemaCloudDictionary>* schema_dict) {
    VLOG_DEBUG << "Sending GetSchemaDictRequest, index_id: " << index_id;

    // Build the request: identify this backend and the index whose dictionary is wanted.
    GetSchemaDictRequest request;
    request.set_cloud_unique_id(config::cloud_unique_id);
    request.set_index_id(index_id);

    // Issue MetaService_Stub::get_schema_dict through the shared retry helper.
    GetSchemaDictResponse response;
    if (Status rpc_status = retry_rpc("get schema dict", request, &response,
                                      &MetaService_Stub::get_schema_dict);
        !rpc_status.ok()) {
        return rpc_status;
    }

    // Swap the payload out of the response rather than deep-copying it.
    auto fetched = std::make_shared<SchemaCloudDictionary>();
    fetched->Swap(response.mutable_schema_dict());
    *schema_dict = std::move(fetched);

    VLOG_DEBUG << "Successfully obtained schema dict, index_id: " << index_id;
    return Status::OK();
}

#include "common/compile_check_end.h"
} // namespace doris::cloud
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.
#pragma once

#include <gen_cpp/olap_file.pb.h>

#include <memory>
#include <string>
#include <tuple>
Expand Down Expand Up @@ -58,6 +60,8 @@ class CloudMetaMgr {

Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta);

// Requests the SchemaCloudDictionary for `index_id` from the meta service.
// On success `*schema_dict` is set to a newly allocated dictionary holding the response
// payload; on RPC failure the error status is returned and `*schema_dict` is not modified.
Status get_schema_dict(int64_t index_id, std::shared_ptr<SchemaCloudDictionary>* schema_dict);

Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = false,
bool sync_delete_bitmap = true, bool full_sync = false);

Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/cloud.pb.h>
#include <gen_cpp/olap_file.pb.h>
#include <rapidjson/document.h>
#include <rapidjson/encodings.h>
#include <rapidjson/prettywriter.h>
Expand All @@ -37,6 +38,7 @@
#include "cloud/cloud_txn_delete_bitmap_cache.h"
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "common/config.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "io/cache/file_cache_common.h"
Expand Down Expand Up @@ -190,6 +192,9 @@ Status CloudStorageEngine::open() {

_tablet_hotspot = std::make_unique<TabletHotspot>();

_schema_cloud_dictionary_cache =
std::make_unique<SchemaCloudDictionaryCache>(config::schema_dict_cache_capacity);

RETURN_NOT_OK_STATUS_WITH_WARN(
init_stream_load_recorder(ExecEnv::GetInstance()->store_paths()[0].path),
"init StreamLoadRecorder failed");
Expand Down
5 changes: 5 additions & 0 deletions be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
//#include "cloud/cloud_full_compaction.h"
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
#include "cloud/schema_cloud_dictionary_cache.h"
#include "cloud_txn_delete_bitmap_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -69,6 +70,9 @@ class CloudStorageEngine final : public BaseStorageEngine {
CloudTabletMgr& tablet_mgr() const { return *_tablet_mgr; }

CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return *_txn_delete_bitmap_cache; }
// Returns the backend-local SchemaCloudDictionary cache.
// Const-qualified like the neighbouring accessors (tablet_mgr(), txn_delete_bitmap_cache())
// so it can also be reached through a const CloudStorageEngine; the returned reference is
// still mutable because the engine only holds the cache through a unique_ptr.
// The cache is created in CloudStorageEngine::open(), so this must not be called before
// open() has run.
SchemaCloudDictionaryCache& get_schema_cloud_dictionary_cache() const {
    return *_schema_cloud_dictionary_cache;
}
ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
return *_calc_tablet_delete_bitmap_task_thread_pool;
}
Expand Down Expand Up @@ -163,6 +167,7 @@ class CloudStorageEngine final : public BaseStorageEngine {
std::unique_ptr<CloudTabletMgr> _tablet_mgr;
std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
std::unique_ptr<SchemaCloudDictionaryCache> _schema_cloud_dictionary_cache;

// Components for cache warmup
std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
Expand Down
Loading

0 comments on commit 0cf64d0

Please sign in to comment.