From daa8126e162dd62a4e4e0fe3985680207014836e Mon Sep 17 00:00:00 2001 From: QlQl <2458371920@qq.com> Date: Tue, 3 Sep 2024 11:28:41 +0800 Subject: [PATCH] fix: add a function for washing data after upgrading version from 4.0.0 to 4.0.1 (#2888) * add a function for washing data after upgrading version from 4.0.0 to 4.0.1 --- conf/pika.conf | 7 +++++- include/pika_conf.h | 7 ++++++ include/pika_db.h | 7 ++++++ src/pika.cc | 11 ++++++++++ src/pika_conf.cc | 2 ++ src/pika_db.cc | 27 ++++++++++++++++++++++++ src/storage/include/storage/storage.h | 5 +++++ src/storage/src/base_data_value_format.h | 4 +++- src/storage/src/redis.h | 2 ++ src/storage/src/storage.cc | 8 +++++++ 10 files changed, 78 insertions(+), 2 deletions(-) diff --git a/conf/pika.conf b/conf/pika.conf index bf6107bd18..28b95baee2 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -652,4 +652,9 @@ cache-lfu-decay-time: 1 # 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election' # which serves for the scenario of codis-pika cluster reelection # You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING] -internal-used-unfinished-full-sync : \ No newline at end of file +internal-used-unfinished-full-sync : + +# for wash data from 4.0.0 to 4.0.1 +# https://github.com/OpenAtomFoundation/pika/issues/2886 +# default value: true +wash-data: true diff --git a/include/pika_conf.h b/include/pika_conf.h index 6941738bcf..d147d67c40 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -332,6 +332,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return share_block_cache_; } + bool wash_data() { + std::shared_lock l(rwlock_); + return wash_data_; + } bool enable_partitioned_index_filters() { std::shared_lock l(rwlock_); return enable_partitioned_index_filters_; @@ -1069,6 +1073,9 @@ class PikaConf : public pstd::BaseConf { //Internal used metrics Persisted by pika.conf std::unordered_set internal_used_unfinished_full_sync_; + + // for wash data from 4.0.0 to 4.0.1 + bool wash_data_; }; #endif diff --git a/include/pika_db.h b/include/pika_db.h index c3d4fce211..5b77f78771 100644 --- a/include/pika_db.h +++ b/include/pika_db.h @@ -90,6 +90,13 @@ class DB : public std::enable_shared_from_this, public pstd::noncopyable { friend class PkClusterInfoCmd; friend class PikaServer; + /** + * When it is the first time for upgrading version from 4.0.0 to 4.0.1, you should call + * this function to wash data. true if successful, false otherwise. + * @see https://github.com/OpenAtomFoundation/pika/issues/2886 + */ + bool WashData(); + std::string GetDBName(); std::shared_ptr storage() const; void GetBgSaveMetaData(std::vector* fileNames, std::string* snapshot_uuid); diff --git a/src/pika.cc b/src/pika.cc index ca44ece971..a530e3fbda 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -232,6 +232,17 @@ int main(int argc, char* argv[]) { g_pika_conf.reset(); }; + // wash data if necessary + if (g_pika_conf->wash_data()) { + auto dbs = g_pika_server->GetDB(); + for (auto& kv : dbs) { + if (!kv.second->WashData()) { + LOG(FATAL) << "write batch error in WashData"; + return 1; + } + } + } + g_pika_rm->Start(); g_pika_server->Start(); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 98468835e8..327fcd3554 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -730,6 +730,8 @@ int PikaConf::Load() { rsync_timeout_ms_.store(tmp_rsync_timeout_ms); } + GetConfBool("wash-data", &wash_data_); + return ret; } diff --git a/src/pika_db.cc b/src/pika_db.cc index 884948d4c5..04ff0dc710 100644 --- a/src/pika_db.cc +++ b/src/pika_db.cc @@ -54,6 +54,33 @@ DB::~DB() { StopKeyScan(); } +bool DB::WashData() { + rocksdb::ReadOptions read_options; + rocksdb::Status s; + auto suffix_len = storage::ParsedBaseDataValue::GetkBaseDataValueSuffixLength(); + for (int i = 0; i < g_pika_conf->db_instance_num(); i++) { + rocksdb::WriteBatch batch; + auto handle = storage_->GetHashCFHandles(i)[1]; + auto db = storage_->GetDBByIndex(i); + auto it(db->NewIterator(read_options, handle)); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + std::string key = it->key().ToString(); + std::string value = it->value().ToString(); + if (value.size() < suffix_len) { + // need to wash + storage::BaseDataValue internal_value(value); + batch.Put(handle, key, internal_value.Encode()); + } + } + delete it; + s = db->Write(storage_->GetDefaultWriteOptions(i), &batch); + if (!s.ok()) { + return false; + } + } + return true; +} + std::string DB::GetDBName() { return db_name_; } void DB::BgSaveDB() { diff --git a/src/storage/include/storage/storage.h b/src/storage/include/storage/storage.h index c673b07030..f929fb2091 100644 --- a/src/storage/include/storage/storage.h +++ b/src/storage/include/storage/storage.h @@ -1096,6 +1096,11 @@ class Storage { const std::string& db_type, const std::unordered_map& options); void GetRocksDBInfo(std::string& info); + // get hash cf handle in insts_[idx] + std::vector GetHashCFHandles(const int idx); + // get DefaultWriteOptions in insts_[idx] + rocksdb::WriteOptions GetDefaultWriteOptions(const int idx) const; + private: std::vector> insts_; std::unique_ptr slot_indexer_; diff --git a/src/storage/src/base_data_value_format.h b/src/storage/src/base_data_value_format.h index ce118613a6..41648b11ef 100644 --- a/src/storage/src/base_data_value_format.h +++ b/src/storage/src/base_data_value_format.h @@ -98,11 +98,13 @@ class ParsedBaseDataValue : public ParsedInternalValue { } } + static size_t GetkBaseDataValueSuffixLength() { return kBaseDataValueSuffixLength; } + protected: virtual void SetVersionToValue() override {}; private: - const size_t kBaseDataValueSuffixLength = kSuffixReserveLength + kTimestampLength; + static const size_t kBaseDataValueSuffixLength = kSuffixReserveLength + kTimestampLength; }; } // namespace storage diff --git a/src/storage/src/redis.h b/src/storage/src/redis.h index 93a52e0406..8d23a43118 100644 --- a/src/storage/src/redis.h +++ b/src/storage/src/redis.h @@ -464,6 +464,8 @@ class Redis { inline Status SetFirstOrLastID(const rocksdb::Slice& key, StreamMetaValue& stream_meta, bool is_set_first, rocksdb::ReadOptions& read_options); +public: + inline rocksdb::WriteOptions GetDefaultWriteOptions() const { return default_write_options_; } private: int32_t index_ = 0; diff --git a/src/storage/src/storage.cc b/src/storage/src/storage.cc index 173ecfb976..006a6d00fb 100644 --- a/src/storage/src/storage.cc +++ b/src/storage/src/storage.cc @@ -89,6 +89,14 @@ static std::string AppendSubDirectory(const std::string& db_path, int index) { } } +std::vector Storage::GetHashCFHandles(const int idx) { + return insts_[idx]->GetHashCFHandles(); +} + +rocksdb::WriteOptions Storage::GetDefaultWriteOptions(const int idx) const { + return insts_[idx]->GetDefaultWriteOptions(); +} + Status Storage::Open(const StorageOptions& storage_options, const std::string& db_path) { mkpath(db_path.c_str(), 0755);