Skip to content

Commit

Permalink
only encode ttl on new hash object
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed Jul 29, 2024
1 parent 6a92b32 commit 6a84054
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 36 deletions.
6 changes: 3 additions & 3 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ rocksdb::Status Database::MDel(const std::vector<Slice> &keys, uint64_t *deleted

batch->Delete(metadata_cf_handle_, lock_keys[i]);

// if delete a hash object that all of fields expired,
// if delete a hash object that all of fields expired,
// so this hash object should be treated as empty and should not affect the deleted_cnt.
if (metadata.Type() == kRedisHash) {
HashMetadata hash_metadata(false);
Expand Down Expand Up @@ -314,7 +314,7 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
if (stats) stats->n_expired++;
continue;
}
// if a hash object that all of fields was expired,
// if a hash object that all of fields was expired,
// so the key should not be returned.
if (metadata.Type() == kRedisHash) {
HashMetadata hash_metadata(false);
Expand Down Expand Up @@ -409,7 +409,7 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const

if (metadata.Expired()) continue;

// if a hash object that all of fields was expired,
// if a hash object that all of fields was expired,
// so the key should not be returned.
if (metadata.Type() == kRedisHash) {
HashMetadata hash_metadata(false);
Expand Down
53 changes: 21 additions & 32 deletions src/types/redis_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "redis_hash.h"

#include <rocksdb/status.h>
#include <sys/wait.h>

#include <algorithm>
#include <cctype>
Expand All @@ -46,15 +47,15 @@ rocksdb::Status Hash::Size(const Slice &user_key, uint64_t *size) {
HashMetadata metadata(false);
rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
if (!s.ok()) return s;
// if field expiration is disabled,
// if field expiration is disabled,
// the size field in metadata is the length of hash
if (!metadata.IsFieldExpirationEnabled()) {
*size = metadata.size;
return rocksdb::Status::OK();
}

// otherwise, we have to check each field to calc the length
LatestSnapShot ss(storage_);
LatestSnapShot ss(storage_);
std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
Expand Down Expand Up @@ -101,6 +102,9 @@ rocksdb::Status Hash::IncrBy(const Slice &user_key, const Slice &field, int64_t
HashMetadata metadata;
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.IsNotFound()) {
metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;
}

uint64_t expire = 0;
std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode();
Expand Down Expand Up @@ -156,6 +160,9 @@ rocksdb::Status Hash::IncrByFloat(const Slice &user_key, const Slice &field, dou
HashMetadata metadata;
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
if (s.IsNotFound()) {
metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;
}

uint64_t expire = 0;
std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode();
Expand Down Expand Up @@ -294,6 +301,12 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue>
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;

// For avoid affect existing data,
// we only encode ttl of field on new hash object.
if (s.IsNotFound()) {
metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;
}

int added = 0;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisHash);
Expand Down Expand Up @@ -502,6 +515,11 @@ rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms, co
return rocksdb::Status::OK();
}

// we don't support encode ttl on existing hash object
if (!metadata.IsFieldExpirationEnabled()) {
return rocksdb::Status::NotSupported("can't expire fields on existing hash object");
}

rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
read_options.snapshot = ss.GetSnapShot();

Expand Down Expand Up @@ -565,35 +583,6 @@ rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms, co
}
}

// convert rest field encoding
if (!metadata.IsFieldExpirationEnabled()) {
metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;

std::unordered_set<std::string_view> field_set;
for (auto field : fields) {
if (!field_set.emplace(field.ToStringView()).second) {
continue;
}
}

std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();

rocksdb::Slice upper_bound(next_version_prefix_key);
read_options.iterate_upper_bound = &upper_bound;

auto iter = util::UniqueIterator(storage_, read_options);
for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) {
InternalKey sub_ikey(iter->key(), storage_->IsSlotIdEncoded());
auto value = iter->value().ToString();
if (field_set.find(sub_ikey.GetSubKey().ToStringView()) == field_set.end()) {
encodeExpireToValue(&value, 0);
batch->Put(sub_ikey.Encode(), value);
}
}
}

std::string bytes;
metadata.Encode(&bytes);
batch->Put(metadata_cf_handle_, ns_key, bytes);
Expand Down Expand Up @@ -747,7 +736,7 @@ bool Hash::ExistValidField(const Slice &ns_key, const HashMetadata &metadata) {
return true;
}

LatestSnapShot ss(storage_);
LatestSnapShot ss(storage_);
std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode();
std::string next_version_prefix_key =
InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode();
Expand Down
14 changes: 13 additions & 1 deletion tests/gocase/unit/type/hash/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,20 @@ var testHash = func(t *testing.T, enabledRESP3 string) {
rdb.Do(ctx, "HEXPIRE", "hfe-key", 1, "FIELDS", 2, "f1", "f2")
time.Sleep(1 * time.Second)

require.Equal(t, "none", rdb.Type(ctx, "hfe-key").Val())
// if a hash object than all of fields was expired,
// the hash object should be treated not exist.
require.Equal(t, int64(0), rdb.HLen(ctx, "hfe-key").Val())
require.Equal(t, "none", rdb.Type(ctx, "hfe-key").Val())
require.Equal(t, 0, rdb.Exists(ctx, "hfe-key").Val())
require.Equal(t, -2, rdb.TTL(ctx, "hfe-key").Val())
require.Equal(t, -2, rdb.PTTL(ctx, "hfe-key").Val())
require.Equal(t, -2, rdb.ExpireTime(ctx, "hfe-key").Val())
require.Equal(t, -2, rdb.PExpireTime(ctx, "hfe-key").Val())
require.Equal(t, 0, rdb.ExpireAt(ctx, "hfe-key", time.Now().Add(1*time.Second)).Val())
require.Equal(t, 0, rdb.PExpireAt(ctx, "hfe-key", time.Unix(time.Now().Unix()+1, 0)).Val())
require.Equal(t, 0, rdb.Copy(ctx, "hfe-key", "dst", 0, true).Val())
require.Equal(t, "", rdb.Dump(ctx, "hfe-key").Val())
require.Equal(t, 0, rdb.Del(ctx, "hfe-key").Val())
})

t.Run("HFE expected 0 if delete a expired field", func(t *testing.T) {
Expand Down

0 comments on commit 6a84054

Please sign in to comment.