Skip to content

Commit

Permalink
Support hash field expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
jjz921024 committed Jul 11, 2024
1 parent 7dd0248 commit 7854e04
Show file tree
Hide file tree
Showing 6 changed files with 531 additions and 22 deletions.
243 changes: 242 additions & 1 deletion src/commands/cmd_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
#include "time_util.h"
#include "types/redis_hash.h"

namespace redis {
Expand Down Expand Up @@ -429,6 +430,237 @@ class CommandHRandField : public Commander {
bool no_parameters_ = true;
};



class CommandFiledExpireBase : public Commander {
protected:
Status commonParse(const std::vector<std::string> &args, int start_idx) {
int idx = start_idx;
CommandParser parser(args, idx);
std::string_view expire_flag, num_flag;
uint64_t fields_num = 0;
while (parser.Good()) {
if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
idx += 2;
break;
} else if (parser.EatEqICaseFlag("NX", expire_flag)) {
idx += 1;
field_expire_type_ = HashFieldExpireType::NX;
} else if (parser.EatEqICaseFlag("XX", expire_flag)) {
idx += 1;
field_expire_type_ = HashFieldExpireType::XX;
} else if (parser.EatEqICaseFlag("GT", expire_flag)) {
idx += 1;
field_expire_type_ = HashFieldExpireType::GT;
} else if (parser.EatEqICaseFlag("LT", expire_flag)) {
idx += 1;
field_expire_type_ = HashFieldExpireType::LT;
} else {
return parser.InvalidSyntax();
}
}

if (args.size() != idx + fields_num) {
return { Status::RedisParseErr, errWrongNumOfArguments };
}

for (size_t i = idx; i < args.size(); i++) {
fields_.emplace_back(args_[i]);
}

return Status::OK();
}

Status expireFieldExecute(Server *srv, Connection *conn, std::string *output) {
std::vector<int8_t> ret;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.ExpireFields(args_[1], expire_, fields_, field_expire_type_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::MultiLen(ret.size());
for (const auto i : ret) {
output->append(redis::Integer(i));
}

return Status::OK();
}

Status ttlExpireExecute(Server *srv, Connection *conn, std::vector<int64_t> &ret) {
redis::Hash hash_db(srv->storage, conn->GetNamespace());
auto s = hash_db.TTLFields(args_[1], fields_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
return Status::OK();
}

uint64_t expire_ = 0;
HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
std::vector<Slice> fields_;
};


class CommandHExpire : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };

expire_ = *parse_result * 1000 + util::GetTimeStampMS();
return CommandFiledExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHExpireAt : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };

expire_ = *parse_result * 1000;
return CommandFiledExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHPExpire : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };

expire_ = *parse_result + util::GetTimeStampMS();
return CommandFiledExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};

class CommandHPExpireAt : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
auto parse_result = ParseInt<uint64_t>(args[2], 10);
if (!parse_result) return { Status::RedisParseErr, errValueNotInteger };

expire_ = *parse_result;
return CommandFiledExpireBase::commonParse(args, 3);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
return expireFieldExecute(srv, conn, output);
}
};


class CommandHExpireTime : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
return CommandFiledExpireBase::commonParse(args, 2);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto ttl : ret) {
uint64_t expire = ttl;
if (ttl > 0) {
expire = (now + expire) / 1000;
}
output->append(redis::Integer(expire));
}
return Status::OK();
}
};

class CommandHPExpireTime : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
return CommandFiledExpireBase::commonParse(args, 2);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
auto now = util::GetTimeStampMS();
*output = redis::MultiLen(ret.size());
for (const auto ttl : ret) {
uint64_t expire = ttl;
if (ttl > 0) {
expire = now + expire;
}
output->append(redis::Integer(expire));
}
return Status::OK();
}
};

class CommandHTTL : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
return CommandFiledExpireBase::commonParse(args, 2);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto ttl : ret) {
output->append(redis::Integer(ttl > 0 ? ttl / 1000 : ttl));
}
return Status::OK();
}
};

class CommandHPTTL : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {
return CommandFiledExpireBase::commonParse(args, 2);
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<int64_t> ret;
auto s = ttlExpireExecute(srv, conn, ret);
if (!s.IsOK()) {
return {Status::RedisExecErr, s.Msg()};
}
*output = redis::MultiLen(ret.size());
for (const auto ttl : ret) {
output->append(redis::Integer(ttl));
}
return Status::OK();
}
};


class CommandHPersist : public CommandFiledExpireBase {
public:
Status Parse(const std::vector<std::string> &args) override {}
Status Execute(Server *srv, Connection *conn, std::string *output) override {}
};

REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
Expand All @@ -445,6 +677,15 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1
MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only", 1, 1, 1), )
MakeCmdAttr<CommandHRandField>("hrandfield", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireAt>("hexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHExpireTime>("hexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPExpire>("hpexpire", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireAt>("hpexpireat", -6, "write", 1, 1, 1),
MakeCmdAttr<CommandHPExpireTime>("hpexpiretime", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
MakeCmdAttr<CommandHTTL>("httl", -5, "read-only", 1, 1, 1),
MakeCmdAttr<CommandHPTTL>("hpttl", -5, "read-only", 1, 1, 1), )

} // namespace redis
14 changes: 13 additions & 1 deletion src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
} else {
start_key = match_prefix_key;
}
auto now = util::GetTimeStampMS();
for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
if (!cursor.empty() && iter->key() == start_key) {
// if cursor is not empty, then we need to skip start_key
Expand All @@ -578,9 +579,20 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
break;
}
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
auto value = iter->value().ToString();
// filter expired hash feild
if (type == kRedisHash && (static_cast<HashMetadata*>(&metadata))->IsEncodedFieldExpire()) {
uint64_t expire = 0;
rocksdb::Slice data(value.data(), value.size());
GetFixed64(&data, &expire);
value = data.ToString();
if (expire != 0 && expire <= now) {
continue;
}
}
keys->emplace_back(ikey.GetSubKey().ToString());
if (values != nullptr) {
values->emplace_back(iter->value().ToString());
values->emplace_back(value);
}
cnt++;
if (limit > 0 && cnt >= limit) {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/redis_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ bool Metadata::IsEmptyableType() const {

bool Metadata::Expired() const { return ExpireAt(util::GetTimeStampMS()); }

bool HashMetadata::IsEncodedFieldExpire() const {
return flags & METADATA_HASH_FIELD_EXPIRE_MASK;
}

ListMetadata::ListMetadata(bool generate_version)
: Metadata(kRedisList, generate_version), head(UINT64_MAX / 2), tail(head) {}

Expand Down
3 changes: 3 additions & 0 deletions src/storage/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class InternalKey {
};

constexpr uint8_t METADATA_64BIT_ENCODING_MASK = 0x80;
constexpr uint8_t METADATA_HASH_FIELD_EXPIRE_MASK = 0x40;
constexpr uint8_t METADATA_TYPE_MASK = 0x0f;

class Metadata {
Expand Down Expand Up @@ -203,6 +204,8 @@ class Metadata {
class HashMetadata : public Metadata {
public:
explicit HashMetadata(bool generate_version = true) : Metadata(kRedisHash, generate_version) {}

bool IsEncodedFieldExpire() const;
};

class SetMetadata : public Metadata {
Expand Down
Loading

0 comments on commit 7854e04

Please sign in to comment.