Skip to content

Commit

Permalink
fix: do not check-fail in OpRestore (#4332)
Browse files Browse the repository at this point in the history
fix: do not check-fail OpRestore

In some rare cases we reach inconsistent state inside OpRestore where a key already exists, though it should not.
In that case log the error instead of crashing the server. In addition, we update the existing entry to the latest restored value.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange committed Dec 18, 2024
1 parent ad3e037 commit f4d3faa
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 33 deletions.
4 changes: 4 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ class DbSlice {
Iterator it;
ExpIterator exp_it;
AutoUpdater post_updater;

bool IsValid() const {
return !it.is_done();
}
};

ItAndUpdater FindMutable(const Context& cntx, std::string_view key);
Expand Down
81 changes: 48 additions & 33 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,28 @@ class RestoreArgs {
: expiration_(expiration), abs_time_(abs_time), replace_(replace) {
}

constexpr bool Replace() const {
bool Replace() const {
return replace_;
}

constexpr bool Sticky() const {
bool Sticky() const {
return sticky_;
}

void SetSticky(bool sticky) {
sticky_ = sticky;
}

uint64_t ExpirationTime() const {
DCHECK_GE(expiration_, 0);
return expiration_;
}

[[nodiscard]] constexpr bool Expired() const {
bool Expired() const {
return expiration_ < 0;
}

[[nodiscard]] constexpr bool HasExpiration() const {
bool HasExpiration() const {
return expiration_ != NO_EXPIRATION;
}

Expand All @@ -152,9 +156,12 @@ class RdbRestoreValue : protected RdbLoaderBase {
rdb_version_ = rdb_version;
}

std::optional<DbSlice::ItAndUpdater> Add(std::string_view payload, std::string_view key,
DbSlice& db_slice, const DbContext& cntx,
const RestoreArgs& args, EngineShard* shard);
// Returns default ItAndUpdater if Add failed.
// In case a valid ItAndUpdater is returned, then second is true in case a new key is added,
// false if the existing key is updated (should not happen unless we have a bug).
pair<DbSlice::ItAndUpdater, bool> Add(string_view key, string_view payload, const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice,
EngineShard* shard);

private:
std::optional<OpaqueObj> Parse(io::Source* source);
Expand Down Expand Up @@ -185,18 +192,17 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* sourc
return std::optional<OpaqueObj>(std::move(obj));
}

std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
std::string_view key, DbSlice& db_slice,
const DbContext& cntx,
const RestoreArgs& args,
EngineShard* shard) {
pair<DbSlice::ItAndUpdater, bool> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice,
EngineShard* shard) {
InMemSource data_src(data);
PrimeValue pv;
bool first_parse = true;
do {
auto opaque_res = Parse(&data_src);
if (!opaque_res) {
return std::nullopt;
return {};
}

LoadConfig config;
Expand All @@ -212,16 +218,18 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to read data: " << ec;
return std::nullopt;
return {};
}
} while (pending_read_.remaining > 0);

if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) {
if (auto res = db_slice->AddOrUpdate(cntx, key, std::move(pv), args.ExpirationTime()); res) {
res->it->first.SetSticky(args.Sticky());
shard->search_indices()->AddDoc(key, cntx, res->it->second);
return std::move(res.value());
return {DbSlice::ItAndUpdater{
.it = res->it, .exp_it = res->exp_it, .post_updater = std::move(res->post_updater)},
res->is_new};
}
return std::nullopt;
return {};
}

[[nodiscard]] bool RestoreArgs::UpdateExpiration(int64_t now_msec) {
Expand Down Expand Up @@ -467,14 +475,14 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
}

RdbRestoreValue loader(serialized_value_.version.value());
auto restored_dest_it = loader.Add(serialized_value_.value, dest_key_, db_slice, op_args.db_cntx,
restore_args, shard);
restore_args.SetSticky(serialized_value_.sticky);

if (restored_dest_it) {
auto& dest_it = restored_dest_it->it;
dest_it->first.SetSticky(serialized_value_.sticky);
RdbRestoreValue loader(serialized_value_.version.value());
auto [restored_dest_it, is_new] = loader.Add(dest_key_, serialized_value_.value, op_args.db_cntx,
restore_args, &db_slice, shard);

if (restored_dest_it.IsValid()) {
LOG_IF(DFATAL, !is_new) << "Unexpected override for key " << dest_key_ << " " << dest_found_;
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (bc) {
bc->AwakeWatched(t->GetDbIndex(), dest_key_);
Expand Down Expand Up @@ -527,27 +535,28 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
return OpStatus::KEY_NOTFOUND;
}

OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::string_view payload,
OpResult<bool> OpRestore(const OpArgs& op_args, std::string_view key, std::string_view payload,
RestoreArgs restore_args, RdbVersion rdb_version) {
if (!restore_args.UpdateExpiration(op_args.db_cntx.time_now_ms)) {
return OpStatus::OUT_OF_RANGE;
}

auto& db_slice = op_args.GetDbSlice();
bool found_prev = false;

// The redis impl (see cluster.c function restoreCommand), remove the old key if
// the replace option is set, so lets do the same here
{
auto res = db_slice.FindMutable(op_args.db_cntx, key);
if (restore_args.Replace()) {
if (IsValid(res.it)) {
if (IsValid(res.it)) {
found_prev = true;
if (restore_args.Replace()) {
VLOG(1) << "restore command is running with replace, found old key '" << key
<< "' and removing it";
res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, res.it));
}
} else {
// we are not allowed to replace it, so make sure it doesn't exist
if (IsValid(res.it)) {
} else {
// we are not allowed to replace it.
return OpStatus::KEY_EXISTS;
}
}
Expand All @@ -559,8 +568,14 @@ OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::strin
}

RdbRestoreValue loader(rdb_version);
auto res = loader.Add(payload, key, db_slice, op_args.db_cntx, restore_args, op_args.shard);
return res.has_value();
auto [res_it, is_new] =
loader.Add(key, payload, op_args.db_cntx, restore_args, &db_slice, op_args.shard);
LOG_IF(DFATAL, res_it.IsValid() && !is_new)
<< "Unexpected override for key " << key << ", found previous " << found_prev
<< " override: " << restore_args.Replace()
<< ", type: " << ObjTypeToString(res_it.it->second.ObjType());

return res_it.IsValid();
}

bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch,
Expand Down Expand Up @@ -1473,7 +1488,7 @@ void GenericFamily::Restore(CmdArgList args, Transaction* tx, SinkReplyBuilder*
}

auto cb = [&](Transaction* t, EngineShard* shard) {
return OnRestore(t->GetOpArgs(shard), key, serialized_value, restore_args.value(),
return OpRestore(t->GetOpArgs(shard), key, serialized_value, restore_args.value(),
rdb_version.value());
};

Expand Down

0 comments on commit f4d3faa

Please sign in to comment.