Skip to content

Commit

Permalink
fix: add a function for washing data after upgrading version from 4.0…
Browse files Browse the repository at this point in the history
….0 to 4.0.1 (OpenAtomFoundation#2888)

* add a function for washing data after upgrading version from 4.0.0 to 4.0.1
  • Loading branch information
QlQlqiqi authored Sep 3, 2024
1 parent 4bfb5e7 commit daa8126
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 2 deletions.
7 changes: 6 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -652,4 +652,9 @@ cache-lfu-decay-time: 1
# 'internal-used-unfinished-full-sync' is used to generate a metric 'is_eligible_for_master_election'
# which serves for the scenario of codis-pika cluster reelection
# You'd better [DO NOT MODIFY IT UNLESS YOU KNOW WHAT YOU ARE DOING]
internal-used-unfinished-full-sync :
internal-used-unfinished-full-sync :

# for wash data from 4.0.0 to 4.0.1
# https://github.com/OpenAtomFoundation/pika/issues/2886
# default value: true
wash-data: true
7 changes: 7 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return share_block_cache_;
}
bool wash_data() {
std::shared_lock l(rwlock_);
return wash_data_;
}
bool enable_partitioned_index_filters() {
std::shared_lock l(rwlock_);
return enable_partitioned_index_filters_;
Expand Down Expand Up @@ -1069,6 +1073,9 @@ class PikaConf : public pstd::BaseConf {

//Internal used metrics Persisted by pika.conf
std::unordered_set<std::string> internal_used_unfinished_full_sync_;

// for wash data from 4.0.0 to 4.0.1
bool wash_data_;
};

#endif
7 changes: 7 additions & 0 deletions include/pika_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ class DB : public std::enable_shared_from_this<DB>, public pstd::noncopyable {
friend class PkClusterInfoCmd;
friend class PikaServer;

/**
* When it is the first time for upgrading version from 4.0.0 to 4.0.1, you should call
* this function to wash data. true if successful, false otherwise.
* @see https://github.com/OpenAtomFoundation/pika/issues/2886
*/
bool WashData();

std::string GetDBName();
std::shared_ptr<storage::Storage> storage() const;
void GetBgSaveMetaData(std::vector<std::string>* fileNames, std::string* snapshot_uuid);
Expand Down
11 changes: 11 additions & 0 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ int main(int argc, char* argv[]) {
g_pika_conf.reset();
};

// wash data if necessary
if (g_pika_conf->wash_data()) {
auto dbs = g_pika_server->GetDB();
for (auto& kv : dbs) {
if (!kv.second->WashData()) {
LOG(FATAL) << "write batch error in WashData";
return 1;
}
}
}

g_pika_rm->Start();
g_pika_server->Start();

Expand Down
2 changes: 2 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,8 @@ int PikaConf::Load() {
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
}

GetConfBool("wash-data", &wash_data_);

return ret;
}

Expand Down
27 changes: 27 additions & 0 deletions src/pika_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,33 @@ DB::~DB() {
StopKeyScan();
}

bool DB::WashData() {
rocksdb::ReadOptions read_options;
rocksdb::Status s;
auto suffix_len = storage::ParsedBaseDataValue::GetkBaseDataValueSuffixLength();
for (int i = 0; i < g_pika_conf->db_instance_num(); i++) {
rocksdb::WriteBatch batch;
auto handle = storage_->GetHashCFHandles(i)[1];
auto db = storage_->GetDBByIndex(i);
auto it(db->NewIterator(read_options, handle));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
std::string key = it->key().ToString();
std::string value = it->value().ToString();
if (value.size() < suffix_len) {
// need to wash
storage::BaseDataValue internal_value(value);
batch.Put(handle, key, internal_value.Encode());
}
}
delete it;
s = db->Write(storage_->GetDefaultWriteOptions(i), &batch);
if (!s.ok()) {
return false;
}
}
return true;
}

std::string DB::GetDBName() { return db_name_; }

void DB::BgSaveDB() {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,11 @@ class Storage {
const std::string& db_type, const std::unordered_map<std::string, std::string>& options);
void GetRocksDBInfo(std::string& info);

// get hash cf handle in insts_[idx]
std::vector<rocksdb::ColumnFamilyHandle*> GetHashCFHandles(const int idx);
// get DefaultWriteOptions in insts_[idx]
rocksdb::WriteOptions GetDefaultWriteOptions(const int idx) const;

private:
std::vector<std::unique_ptr<Redis>> insts_;
std::unique_ptr<SlotIndexer> slot_indexer_;
Expand Down
4 changes: 3 additions & 1 deletion src/storage/src/base_data_value_format.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ class ParsedBaseDataValue : public ParsedInternalValue {
}
}

static size_t GetkBaseDataValueSuffixLength() { return kBaseDataValueSuffixLength; }

protected:
virtual void SetVersionToValue() override {};

private:
const size_t kBaseDataValueSuffixLength = kSuffixReserveLength + kTimestampLength;
static const size_t kBaseDataValueSuffixLength = kSuffixReserveLength + kTimestampLength;
};

} // namespace storage
Expand Down
2 changes: 2 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ class Redis {
inline Status SetFirstOrLastID(const rocksdb::Slice& key, StreamMetaValue& stream_meta, bool is_set_first,
rocksdb::ReadOptions& read_options);

public:
inline rocksdb::WriteOptions GetDefaultWriteOptions() const { return default_write_options_; }

private:
int32_t index_ = 0;
Expand Down
8 changes: 8 additions & 0 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ static std::string AppendSubDirectory(const std::string& db_path, int index) {
}
}

std::vector<rocksdb::ColumnFamilyHandle*> Storage::GetHashCFHandles(const int idx) {
return insts_[idx]->GetHashCFHandles();
}

rocksdb::WriteOptions Storage::GetDefaultWriteOptions(const int idx) const {
return insts_[idx]->GetDefaultWriteOptions();
}

Status Storage::Open(const StorageOptions& storage_options, const std::string& db_path) {
mkpath(db_path.c_str(), 0755);

Expand Down

0 comments on commit daa8126

Please sign in to comment.