From c16d8645f180047868083602b441274230ae8468 Mon Sep 17 00:00:00 2001 From: jjz921024 <470623352@qq.com> Date: Sun, 7 Jul 2024 23:05:50 +0800 Subject: [PATCH] Support hash field expiration --- src/commands/cmd_hash.cc | 244 ++++++++++++++++++++++++++++- src/storage/redis_db.cc | 14 +- src/storage/redis_metadata.cc | 4 + src/storage/redis_metadata.h | 3 + src/types/redis_hash.cc | 283 +++++++++++++++++++++++++++++++--- src/types/redis_hash.h | 8 + 6 files changed, 534 insertions(+), 22 deletions(-) diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc index 677f131eb98..be3b56259d1 100644 --- a/src/commands/cmd_hash.cc +++ b/src/commands/cmd_hash.cc @@ -23,6 +23,7 @@ #include "error_constants.h" #include "scan_base.h" #include "server/server.h" +#include "time_util.h" #include "types/redis_hash.h" namespace redis { @@ -429,6 +430,238 @@ class CommandHRandField : public Commander { bool no_parameters_ = true; }; + + +class CommandFiledExpireBase : public Commander { + protected: + Status commonParse(const std::vector &args, int start_idx) { + int idx = start_idx; + CommandParser parser(args, idx); + std::string_view expire_flag, num_flag; + uint64_t fields_num = 0; + while (parser.Good()) { + if (parser.EatEqICaseFlag("FIELDS", num_flag)) { + fields_num = GET_OR_RET(parser.template TakeInt()); + idx += 2; + break; + } else if (parser.EatEqICaseFlag("NX", expire_flag)) { + idx += 1; + field_expire_type_ = HashFieldExpireType::NX; + } else if (parser.EatEqICaseFlag("XX", expire_flag)) { + idx += 1; + field_expire_type_ = HashFieldExpireType::XX; + } else if (parser.EatEqICaseFlag("GT", expire_flag)) { + idx += 1; + field_expire_type_ = HashFieldExpireType::GT; + } else if (parser.EatEqICaseFlag("LT", expire_flag)) { + idx += 1; + field_expire_type_ = HashFieldExpireType::LT; + } else { + // TODO: 更明确的错误信息 + return parser.InvalidSyntax(); + } + } + + if (args.size() != idx + fields_num) { + return { Status::RedisParseErr, errWrongNumOfArguments }; + } + + for (size_t i = idx; i < args.size(); i++) { + fields_.emplace_back(args_[i]); + } + + return Status::OK(); + } + + Status expireFieldExecute(Server *srv, Connection *conn, std::string *output) { + std::vector ret; + redis::Hash hash_db(srv->storage, conn->GetNamespace()); + auto s = hash_db.ExpireFields(args_[1], expire_, fields_, field_expire_type_, &ret); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + *output = redis::MultiLen(ret.size()); + for (const auto i : ret) { + output->append(redis::Integer(i)); + } + + return Status::OK(); + } + + Status ttlExpireExecute(Server *srv, Connection *conn, std::vector &ret) { + redis::Hash hash_db(srv->storage, conn->GetNamespace()); + auto s = hash_db.TTLFields(args_[1], fields_, &ret); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + return Status::OK(); + } + + uint64_t expire_ = 0; + HashFieldExpireType field_expire_type_ = HashFieldExpireType::None; + std::vector fields_; +}; + + +class CommandHExpire : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return { Status::RedisParseErr, errValueNotInteger }; + + expire_ = *parse_result * 1000 + util::GetTimeStampMS(); + return CommandFiledExpireBase::commonParse(args, 3); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(srv, conn, output); + } +}; + +class CommandHExpireAt : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return { Status::RedisParseErr, errValueNotInteger }; + + expire_ = *parse_result * 1000; + return CommandFiledExpireBase::commonParse(args, 3); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(srv, conn, output); + } +}; + +class CommandHPExpire : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return { Status::RedisParseErr, errValueNotInteger }; + + expire_ = *parse_result + util::GetTimeStampMS(); + return CommandFiledExpireBase::commonParse(args, 3); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(srv, conn, output); + } +}; + +class CommandHPExpireAt : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + auto parse_result = ParseInt(args[2], 10); + if (!parse_result) return { Status::RedisParseErr, errValueNotInteger }; + + expire_ = *parse_result; + return CommandFiledExpireBase::commonParse(args, 3); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + return expireFieldExecute(srv, conn, output); + } +}; + + +class CommandHExpireTime : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + return CommandFiledExpireBase::commonParse(args, 2); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + auto now = util::GetTimeStampMS(); + *output = redis::MultiLen(ret.size()); + for (const auto ttl : ret) { + uint64_t expire = ttl; + if (ttl > 0) { + expire = (now + expire) / 1000; + } + output->append(redis::Integer(expire)); + } + return Status::OK(); + } +}; + +class CommandHPExpireTime : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + return CommandFiledExpireBase::commonParse(args, 2); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + auto now = util::GetTimeStampMS(); + *output = redis::MultiLen(ret.size()); + for (const auto ttl : ret) { + uint64_t expire = ttl; + if (ttl > 0) { + expire = now + expire; + } + output->append(redis::Integer(expire)); + } + return Status::OK(); + } +}; + +class CommandHTTL : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + return CommandFiledExpireBase::commonParse(args, 2); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + *output = redis::MultiLen(ret.size()); + for (const auto ttl : ret) { + output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl)); + } + return Status::OK(); + } +}; + +class CommandHPTTL : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override { + return CommandFiledExpireBase::commonParse(args, 2); + } + + Status Execute(Server *srv, Connection *conn, std::string *output) override { + std::vector ret; + auto s = ttlExpireExecute(srv, conn, ret); + if (!s.IsOK()) { + return {Status::RedisExecErr, s.Msg()}; + } + *output = redis::MultiLen(ret.size()); + for (const auto ttl : ret) { + output->append(redis::Integer(ttl)); + } + return Status::OK(); + } +}; + + +class CommandHPersist : public CommandFiledExpireBase { + public: + Status Parse(const std::vector &args) override {} + Status Execute(Server *srv, Connection *conn, std::string *output) override {} +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("hget", 3, "read-only", 1, 1, 1), MakeCmdAttr("hincrby", 4, "write", 1, 1, 1), MakeCmdAttr("hincrbyfloat", 4, "write", 1, 1, 1), @@ -445,6 +678,15 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr("hget", 3, "read-only", 1, 1, 1 MakeCmdAttr("hgetall", 2, "read-only", 1, 1, 1), MakeCmdAttr("hscan", -3, "read-only", 1, 1, 1), MakeCmdAttr("hrangebylex", -4, "read-only", 1, 1, 1), - MakeCmdAttr("hrandfield", -2, "read-only", 1, 1, 1), ) + MakeCmdAttr("hrandfield", -2, "read-only", 1, 1, 1), + MakeCmdAttr("hexpire", -6, "write", 1, 1, 1), + MakeCmdAttr("hexpireat", -6, "write", 1, 1, 1), + MakeCmdAttr("hexpiretime", -5, "read-only", 1, 1, 1), + MakeCmdAttr("hpexpire", -6, "write", 1, 1, 1), + MakeCmdAttr("hpexpireat", -6, "write", 1, 1, 1), + MakeCmdAttr("hpexpiretime", -5, "read-only", 1, 1, 1), + MakeCmdAttr("hpersist", -5, "write", 1, 1, 1), + MakeCmdAttr("httl", -5, "read-only", 1, 1, 1), + MakeCmdAttr("hpttl", -5, "read-only", 1, 1, 1), ) } // namespace redis diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 4f08490bd66..e504a3439a8 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -568,6 +568,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const } else { start_key = match_prefix_key; } + auto now = util::GetTimeStampMS(); for (iter->Seek(start_key); iter->Valid(); iter->Next()) { if (!cursor.empty() && iter->key() == start_key) { // if cursor is not empty, then we need to skip start_key @@ -578,9 +579,20 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const break; } InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); + auto value = iter->value().ToString(); + // filter expired hash feild + if (type == kRedisHash && (static_cast(&metadata))->IsEncodedFieldExpire()) { + uint64_t expire = 0; + rocksdb::Slice data(value.data(), value.size()); + GetFixed64(&data, &expire); + value = data.ToString(); + if (expire != 0 && expire <= now) { + continue; + } + } keys->emplace_back(ikey.GetSubKey().ToString()); if (values != nullptr) { - values->emplace_back(iter->value().ToString()); + values->emplace_back(value); } cnt++; if (limit > 0 && cnt >= limit) { diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index e44b39cad7c..8b368cba0ac 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -334,6 +334,10 @@ bool Metadata::IsEmptyableType() const { bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); } +bool HashMetadata::IsEncodedFieldExpire() const { + return flags & METADATA_HASH_FIELD_EXPIRE_MASK; +} + ListMetadata::ListMetadata(bool generate_version) : Metadata(kRedisList, generate_version), head(UINT64_MAX / 2), tail(head) {} diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 68f36b2c994..fa3da18cc55 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -134,6 +134,7 @@ class InternalKey { }; constexpr uint8_t METADATA_64BIT_ENCODING_MASK = 0x80; +constexpr uint8_t METADATA_HASH_FIELD_EXPIRE_MASK = 0x40; constexpr uint8_t METADATA_TYPE_MASK = 0x0f; class Metadata { @@ -203,6 +204,8 @@ class Metadata { class HashMetadata : public Metadata { public: explicit HashMetadata(bool generate_version = true) : Metadata(kRedisHash, generate_version) {} + + bool IsEncodedFieldExpire() const; }; class SetMetadata : public Metadata { diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc index c4d60685934..d424fee646a 100644 --- a/src/types/redis_hash.cc +++ b/src/types/redis_hash.cc @@ -31,6 +31,7 @@ #include "db_util.h" #include "parse_util.h" #include "sample_helper.h" +#include "time_util.h" namespace redis { @@ -45,7 +46,13 @@ rocksdb::Status Hash::Size(const Slice &user_key, uint64_t *size) { HashMetadata metadata(false); rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata); if (!s.ok()) return s; - *size = metadata.size; + if (!metadata.IsEncodedFieldExpire()) { + *size = metadata.size; + } else { + std::vector field_values; + GetAll(user_key, &field_values, HashFetchType::kOnlyKey); + *size = field_values.size(); + } return rocksdb::Status::OK(); } @@ -58,7 +65,10 @@ rocksdb::Status Hash::Get(const Slice &user_key, const Slice &field, std::string rocksdb::ReadOptions read_options; read_options.snapshot = ss.GetSnapShot(); std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); - return storage_->Get(read_options, sub_key, value); + s = storage_->Get(read_options, sub_key, value); + if (!s.ok()) return s; + uint64_t expire = 0; + return decodeFieldValue(metadata, value, expire); } rocksdb::Status Hash::IncrBy(const Slice &user_key, const Slice &field, int64_t increment, int64_t *new_value) { @@ -72,12 +82,13 @@ rocksdb::Status Hash::IncrBy(const Slice &user_key, const Slice &field, int64_t rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; + uint64_t expire = 0; std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); if (s.ok()) { std::string value_bytes; s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value_bytes); - if (!s.ok() && !s.IsNotFound()) return s; - if (s.ok()) { + if (!s.ok() && !s.IsNotFound()) return s; + if (s.ok() && decodeFieldValue(metadata, &value_bytes, expire).ok()) { auto parse_result = ParseInt(value_bytes, 10); if (!parse_result) { return rocksdb::Status::InvalidArgument(parse_result.Msg()); @@ -87,6 +98,9 @@ rocksdb::Status Hash::IncrBy(const Slice &user_key, const Slice &field, int64_t } old_value = *parse_result; exists = true; + } else { + // reset expiratime + expire = 0; } } if ((increment < 0 && old_value < 0 && increment < (LLONG_MIN - old_value)) || @@ -98,7 +112,11 @@ rocksdb::Status Hash::IncrBy(const Slice &user_key, const Slice &field, int64_t auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisHash); batch->PutLogData(log_data.Encode()); - batch->Put(sub_key, std::to_string(*new_value)); + auto value_str = std::to_string(*new_value); + if (metadata.IsEncodedFieldExpire()) { + encodeValueExpire(&value_str, expire); + } + batch->Put(sub_key, value_str); if (!exists) { metadata.size += 1; std::string bytes; @@ -119,18 +137,21 @@ rocksdb::Status Hash::IncrByFloat(const Slice &user_key, const Slice &field, dou rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata); if (!s.ok() && !s.IsNotFound()) return s; + uint64_t expire = 0; std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); if (s.ok()) { std::string value_bytes; s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value_bytes); - if (!s.ok() && !s.IsNotFound()) return s; - if (s.ok()) { + if (!s.ok() && !s.IsNotFound()) return s; + if (s.ok() && decodeFieldValue(metadata, &value_bytes, expire).ok()) { auto value_stat = ParseFloat(value_bytes); if (!value_stat || isspace(value_bytes[0])) { return rocksdb::Status::InvalidArgument("value is not a number"); } old_value = *value_stat; exists = true; + } else { + expire = 0; } } double n = old_value + increment; @@ -142,7 +163,11 @@ rocksdb::Status Hash::IncrByFloat(const Slice &user_key, const Slice &field, dou auto batch = storage_->GetWriteBatchBase(); WriteBatchLogData log_data(kRedisHash); batch->PutLogData(log_data.Encode()); - batch->Put(sub_key, std::to_string(*new_value)); + auto value_str = std::to_string(*new_value); + if (metadata.IsEncodedFieldExpire()) { + encodeValueExpire(&value_str, expire); + } + batch->Put(sub_key, value_str); if (!exists) { metadata.size += 1; std::string bytes; @@ -184,10 +209,18 @@ rocksdb::Status Hash::MGet(const Slice &user_key, const std::vector &fiel statuses_vector.resize(keys.size()); storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(), values_vector.data(), statuses_vector.data()); + for (size_t i = 0; i < keys.size(); i++) { if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return statuses_vector[i]; - values->emplace_back(values_vector[i].ToString()); - statuses->emplace_back(statuses_vector[i]); + auto value = values_vector[i].ToString(); + auto status = statuses_vector[i]; + // decode value and expire when field found + if (!status.IsNotFound()) { + uint64_t expire = 0; + status = decodeFieldValue(metadata, &value, expire); + } + values->emplace_back(value); + statuses->emplace_back(status); } return rocksdb::Status::OK(); } @@ -208,14 +241,19 @@ rocksdb::Status Hash::Delete(const Slice &user_key, const std::vector &fi rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata); if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s; - std::string value; std::unordered_set field_set; for (const auto &field : fields) { if (!field_set.emplace(field.ToStringView()).second) { continue; } std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + std::string value; s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + uint64_t expire = 0; + s = decodeFieldValue(metadata, &value, expire); if (s.ok()) { *deleted_cnt += 1; batch->Delete(sub_key); @@ -254,21 +292,28 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector bool exists = false; std::string sub_key = InternalKey(ns_key, it->field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + uint64_t expire = 0; if (metadata.size > 0) { std::string field_value; s = storage_->Get(rocksdb::ReadOptions(), sub_key, &field_value); if (!s.ok() && !s.IsNotFound()) return s; - + if (s.ok()) { if (nx || field_value == it->value) continue; - - exists = true; + exists = decodeFieldValue(metadata, &field_value, expire).ok(); } } - if (!exists) added++; + if (!exists) { + added++; + expire = 0; + } - batch->Put(sub_key, it->value); + auto value = it->value; + if (metadata.IsEncodedFieldExpire()) { + encodeValueExpire(&value, expire); + } + batch->Put(sub_key, value); } if (added > 0) { @@ -334,8 +379,13 @@ rocksdb::Status Hash::RangeByLex(const Slice &user_key, const RangeLexSpec &spec break; } if (spec.offset >= 0 && pos++ < spec.offset) continue; - - field_values->emplace_back(ikey.GetSubKey().ToString(), iter->value().ToString()); + // filte expired field + auto value = iter->value().ToString(); + uint64_t expire = 0; + if (!decodeFieldValue(metadata, &value, expire).ok()) { + continue; + } + field_values->emplace_back(ikey.GetSubKey().ToString(), value); if (spec.count > 0 && field_values->size() >= static_cast(spec.count)) break; } return rocksdb::Status::OK(); @@ -361,14 +411,20 @@ rocksdb::Status Hash::GetAll(const Slice &user_key, std::vector *fie auto iter = util::UniqueIterator(storage_, read_options); for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) { + // filte expired field + uint64_t expire = 0; + auto value = iter->value().ToString(); + if (!decodeFieldValue(metadata, &value, expire).ok()) { + continue; + } if (type == HashFetchType::kOnlyKey) { InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); field_values->emplace_back(ikey.GetSubKey().ToString(), ""); } else if (type == HashFetchType::kOnlyValue) { - field_values->emplace_back("", iter->value().ToString()); + field_values->emplace_back("", value); } else { InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded()); - field_values->emplace_back(ikey.GetSubKey().ToString(), iter->value().ToString()); + field_values->emplace_back(ikey.GetSubKey().ToString(), value); } } return rocksdb::Status::OK(); @@ -418,4 +474,191 @@ rocksdb::Status Hash::RandField(const Slice &user_key, int64_t command_count, st return rocksdb::Status::OK(); } +rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms, + const std::vector &fields, HashFieldExpireType type, + std::vector *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + HashMetadata metadata(false); + LatestSnapShot ss(storage_); + rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key, &metadata); + if (!s.ok()) { + ret->resize(fields.size(), -2); + return rocksdb::Status::OK(); + } + + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + read_options.snapshot = ss.GetSnapShot(); + + std::vector sub_keys; + sub_keys.reserve(fields.size()); + for (auto field : fields) { + sub_keys.emplace_back(InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode()); + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisHash); + batch->PutLogData(log_data.Encode()); + + // expire special field + std::vector values_vector; + values_vector.resize(sub_keys.size()); + std::vector statuses_vector; + statuses_vector.resize(sub_keys.size()); + storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), sub_keys.size(), sub_keys.data(), + values_vector.data(), statuses_vector.data()); + + auto now = util::GetTimeStampMS(); + for (size_t i = 0; i < sub_keys.size(); i++) { + if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return statuses_vector[i]; + auto status = statuses_vector[i]; + + // no such field exists + if (status.IsNotFound()) { + ret->emplace_back(-2); + continue; + } + + // TODO: 如何复用sub_key + InternalKey sub_ikey(ns_key, fields[i], metadata.version, storage_->IsSlotIdEncoded()); + + // expire with a pass time + if (expire_ms <= now) { + batch->Delete(sub_ikey.Encode()); + ret->emplace_back(2); + metadata.size -= 1; + continue; + } + + auto value = values_vector[i].ToString(); + uint64_t field_expire = 0; + decodeFieldValue(metadata, &value, field_expire); + if (isMeetCondition(type, expire_ms, field_expire)) { + encodeValueExpire(&value, expire_ms); + batch->Put(sub_ikey.Encode(), value); + ret->emplace_back(1); + } else { + ret->emplace_back(0); + } + } + + // convert rest field encoding + if (!metadata.IsEncodedFieldExpire()) { + metadata.flags |= METADATA_HASH_FIELD_EXPIRE_MASK; + + std::unordered_set field_set; + for (auto field : fields) { + if (!field_set.emplace(field.ToStringView()).second) { + continue; + } + } + + std::string prefix_key = InternalKey(ns_key, "", metadata.version, storage_->IsSlotIdEncoded()).Encode(); + std::string next_version_prefix_key = + InternalKey(ns_key, "", metadata.version + 1, storage_->IsSlotIdEncoded()).Encode(); + + rocksdb::Slice upper_bound(next_version_prefix_key); + read_options.iterate_upper_bound = &upper_bound; + + auto iter = util::UniqueIterator(storage_, read_options); + for (iter->Seek(prefix_key); iter->Valid() && iter->key().starts_with(prefix_key); iter->Next()) { + InternalKey sub_ikey(iter->key(), storage_->IsSlotIdEncoded()); + auto value = iter->value().ToString(); + if (field_set.find(sub_ikey.GetSubKey().ToStringView()) == field_set.end()) { + encodeValueExpire(&value, 0); + batch->Put(sub_ikey.Encode(), value); + } + } + } + + std::string bytes; + metadata.Encode(&bytes); + batch->Put(metadata_cf_handle_, ns_key, bytes); + + return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); +} + +rocksdb::Status Hash::TTLFields(const Slice &user_key, const std::vector &fields, std::vector *ret) { + std::string ns_key = AppendNamespacePrefix(user_key); + HashMetadata metadata(false); + LatestSnapShot ss(storage_); + rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key, &metadata); + if (!s.ok()) { + ret->resize(fields.size(), -2); + return rocksdb::Status::OK(); + } + + rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions(); + read_options.snapshot = ss.GetSnapShot(); + std::vector keys; + + keys.reserve(fields.size()); + std::vector sub_keys; + sub_keys.resize(fields.size()); + for (size_t i = 0; i < fields.size(); i++) { + auto &field = fields[i]; + sub_keys[i] = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + keys.emplace_back(sub_keys[i]); + } + + std::vector values_vector; + values_vector.resize(keys.size()); + std::vector statuses_vector; + statuses_vector.resize(keys.size()); + storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(), + values_vector.data(), statuses_vector.data()); + + ret->reserve(fields.size()); + auto now = util::GetTimeStampMS(); + for (size_t i = 0; i < keys.size(); i++) { + if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return statuses_vector[i]; + auto value = values_vector[i].ToString(); + auto status = statuses_vector[i]; + + if (status.IsNotFound()) { + ret->emplace_back(-2); + continue; + } + + uint64_t expire = 0; + status = decodeFieldValue(metadata, &value, expire); + if (status.IsNotFound()) { + ret->emplace_back(-2); + } else if (expire == 0) { + ret->emplace_back(-1); + } else { + ret->emplace_back(int64_t(expire - now)); + } + } + return rocksdb::Status::OK(); +} + +rocksdb::Status Hash::decodeFieldValue(const HashMetadata &metadata, std::string *value, uint64_t &expire) { + if (!metadata.IsEncodedFieldExpire()) { + return rocksdb::Status::OK(); + } + rocksdb::Slice data(value->data(), value->size()); + GetFixed64(&data, &expire); + *value = data.ToString(); + return (expire == 0 || expire > util::GetTimeStampMS()) ? rocksdb::Status::OK() : rocksdb::Status::NotFound(); +} + +rocksdb::Status Hash::encodeValueExpire(std::string *value, uint64_t expire) { + std::string buf; + PutFixed64(&buf, expire); + buf.append(*value); + value->assign(buf.data(), buf.size()); + return rocksdb::Status::OK(); +} + +bool Hash::isMeetCondition(HashFieldExpireType type, uint64_t new_expire, uint64_t old_expire) { + if (type == HashFieldExpireType::None) return true; + if (type == HashFieldExpireType::NX && old_expire == 0) return true; + if (type == HashFieldExpireType::XX && old_expire != 0) return true; + // if a filed has no associated expiration, we treated it expiration is infinite + auto expire = old_expire == 0 ? UINT64_MAX : old_expire; + if (type == HashFieldExpireType::GT && new_expire > expire) return true; + if (type == HashFieldExpireType::LT && new_expire < expire) return true; + return false; +} + } // namespace redis diff --git a/src/types/redis_hash.h b/src/types/redis_hash.h index 10fc7d54502..8bde10fb615 100644 --- a/src/types/redis_hash.h +++ b/src/types/redis_hash.h @@ -39,6 +39,8 @@ struct FieldValue { enum class HashFetchType { kAll = 0, kOnlyKey = 1, kOnlyValue = 2 }; +enum class HashFieldExpireType { None, NX, XX, GT, LT }; + namespace redis { class Hash : public SubKeyScanner { @@ -63,9 +65,15 @@ class Hash : public SubKeyScanner { std::vector *values = nullptr); rocksdb::Status RandField(const Slice &user_key, int64_t command_count, std::vector *field_values, HashFetchType type = HashFetchType::kOnlyKey); + rocksdb::Status ExpireFields(const Slice &user_key, uint64_t expire_ms, const std::vector &fields, + HashFieldExpireType type, std::vector *ret); + rocksdb::Status TTLFields(const Slice &user_key, const std::vector &fields, std::vector *ret); private: rocksdb::Status GetMetadata(Database::GetOptions get_options, const Slice &ns_key, HashMetadata *metadata); + static rocksdb::Status decodeFieldValue(const HashMetadata &metadata, std::string *value, uint64_t &expire); + static rocksdb::Status encodeValueExpire(std::string *value, uint64_t expire); + static bool isMeetCondition(HashFieldExpireType type, uint64_t new_expire, uint64_t old_expire); friend struct FieldValueRetriever; };