diff --git a/src/core/bptree_set.h b/src/core/bptree_set.h
index 930187cabbb4..f48a9697d3f2 100644
--- a/src/core/bptree_set.h
+++ b/src/core/bptree_set.h
@@ -108,6 +108,10 @@ template > class BPTree {
   /// Replaces old with new_obj.
   void ForceUpdate(KeyT old, KeyT new_obj);
 
+  bool LocateDebug(KeyT key, BPTreePath* path) const {
+    return Locate(key, path);
+  }
+
  private:
   BPTreeNode* CreateNode(bool leaf);
diff --git a/src/core/sorted_map.cc b/src/core/sorted_map.cc
index 60cd43e5c75c..20cde63dadc2 100644
--- a/src/core/sorted_map.cc
+++ b/src/core/sorted_map.cc
@@ -314,8 +314,8 @@ optional SortedMap::GetRank(sds ele, bool reverse) const {
   return *rank;
 }
 
-SortedMap::ScoredArray SortedMap::GetRange(const zrangespec& range, unsigned offset, unsigned limit,
-                                           bool reverse) const {
+SortedMap::ScoredArray SortedMap::GetRangeInternal(const zrangespec& range, unsigned offset,
+                                                   unsigned limit, bool reverse) const {
   ScoredArray arr;
   if (score_tree->Size() <= offset || limit == 0)
     return arr;
@@ -385,6 +385,29 @@ SortedMap::ScoredArray SortedMap::GetRange(const zrangespec& range, unsigned off
   return arr;
 }
 
+SortedMap::ScoredArray SortedMap::GetRange(const zrangespec& range, unsigned offset, unsigned limit,
+                                           bool reverse) const {
+  ScoredArray res = GetRangeInternal(range, offset, limit, reverse);
+  if (!res.empty()) {
+    return res;
+  }
+  has_bug = false;
+
+  if (range.max == HUGE_VAL && !reverse && offset == 0) {
+    size_t len = Size();
+    double score = 0;
+    score_tree->Iterate(len - 1, len - 1, [&](ScoreSds ele) {
+      score = GetObjScore(ele);
+      return true;
+    });
+
+    if ((range.min < score) || ((range.min == score) && !range.minex)) {
+      has_bug = true;
+    }
+  }
+  return {};
+}
+
 SortedMap::ScoredArray SortedMap::GetLexRange(const zlexrangespec& range, unsigned offset,
                                               unsigned limit, bool reverse) const {
   if (score_tree->Size() <= offset || limit == 0)
@@ -780,6 +803,10 @@ bool SortedMap::DefragIfNeeded(float ratio) {
   return reallocated;
 }
 
+SortedMap::ScoreSds SortedMap::BuildKey(double score, bool is_str_inf, char buf[]) {
+  return BuildScoredKey(score, is_str_inf, buf);
+}
+
 std::optional SortedMap::GetRankAndScore(sds ele, bool reverse) const {
   ScoreSds obj = score_map->FindObj(ele);
   if (obj == nullptr)
diff --git a/src/core/sorted_map.h b/src/core/sorted_map.h
index 34f4f6135db9..dd94f8a7d5c3 100644
--- a/src/core/sorted_map.h
+++ b/src/core/sorted_map.h
@@ -32,6 +32,7 @@ namespace detail {
  */
 class SortedMap {
  public:
+  mutable bool has_bug = false;
   using ScoredMember = std::pair<std::string, double>;
   using ScoredArray = std::vector<ScoredMember>;
   using ScoreSds = void*;
@@ -92,14 +93,20 @@ class SortedMap {
 
   bool DefragIfNeeded(float ratio);
 
+  static ScoreSds BuildKey(double score, bool is_str_inf, char buf[]);
+
  private:
   using ScoreTree = BPTree<ScoreSds, ScoreSdsPolicy>;
 
+  ScoredArray GetRangeInternal(const zrangespec& r, unsigned offs, unsigned len, bool rev) const;
+
   // hash map from fields to scores.
   ScoreMap* score_map = nullptr;
 
   // sorted tree of (score,field) items.
   ScoreTree* score_tree = nullptr;
+ public:
+  ScoreTree* get_score_tree() { return score_tree; }
 };
 
 // Used by CompactObject.
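A note on the new GetRange wrapper in the sorted_map.cc hunk above: it is purely diagnostic. It delegates to GetRangeInternal and, when that comes back empty, re-checks a condition under which an empty result should be impossible — a forward query with no offset whose range is unbounded above (range.max == HUGE_VAL) must match at least the element with the largest score whenever the lower bound admits that score; if it does, has_bug is raised. The standalone sketch below models the same invariant against a plain std::multimap instead of the score B+-tree; Range and ShouldMatchMax are illustrative names and are not part of the Dragonfly API.

// Minimal standalone sketch of the invariant the debug wrapper checks, using
// std::multimap<double, std::string> in place of the score B+-tree.
// `Range` and `ShouldMatchMax` are hypothetical names for illustration only.
#include <cmath>
#include <iostream>
#include <map>
#include <string>

struct Range {
  double min, max;
  bool minex, maxex;  // true -> exclusive bound, mirroring zrangespec
};

// True when a forward, offset-0 query over an upper-unbounded range must be
// non-empty: the lower bound admits the largest stored score.
bool ShouldMatchMax(const std::multimap<double, std::string>& scores, const Range& r) {
  if (scores.empty() || r.max != HUGE_VAL)
    return false;
  double top = scores.rbegin()->first;
  return (r.min < top) || (r.min == top && !r.minex);
}

int main() {
  std::multimap<double, std::string> scores{{1.0, "a"}, {2.5, "b"}, {7.0, "c"}};
  Range r{2.5, HUGE_VAL, false, false};
  // An empty GetRange result for such a query is exactly the "has_bug" case.
  std::cout << std::boolalpha << ShouldMatchMax(scores, r) << '\n';  // prints: true
}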
diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc
index 56b8a1777f07..ba1d8d4ef6a9 100644
--- a/src/server/zset_family.cc
+++ b/src/server/zset_family.cc
@@ -67,10 +67,10 @@ struct GeoPoint {
   double dist;
   double score;
   std::string member;
-  GeoPoint() : longitude(0.0), latitude(0.0), dist(0.0), score(0.0){};
+  GeoPoint() : longitude(0.0), latitude(0.0), dist(0.0), score(0.0) {};
   GeoPoint(double _longitude, double _latitude, double _dist, double _score,
            const std::string& _member)
-      : longitude(_longitude), latitude(_latitude), dist(_dist), score(_score), member(_member){};
+      : longitude(_longitude), latitude(_latitude), dist(_dist), score(_score), member(_member) {};
 };
 
 using GeoArray = std::vector<GeoPoint>;
@@ -302,6 +302,8 @@ class IntervalVisitor {
     return removed_;
   }
 
+  bool has_bug = false;
+
  private:
   void ExtractListPack(const zrangespec& range);
   void ExtractSkipList(const zrangespec& range);
@@ -554,6 +556,9 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) {
   unsigned limit = params_.limit;
 
   result_ = zs->GetRange(range, offset, limit, params_.reverse);
+  if (result_.empty() && zs->has_bug) {
+    has_bug = true;
+  }
 }
 
 void IntervalVisitor::ExtractListPack(const zlexrangespec& range) {
@@ -1337,6 +1342,88 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args,
   return iv.PopResult();
 }
 
+static double GetObjScore(const void* obj) {
+  constexpr uint64_t kSdsMask = (1ULL << 60) - 1;
+
+  sds s = (sds)(uint64_t(obj) & kSdsMask);
+  char* ptr = s + sdslen(s) + 1;
+  return absl::bit_cast<double>(absl::little_endian::Load64(ptr));
+}
+
+using ZsetNode = detail::BPTreeNode;
+
+static bool ValidateNode(const ZsetNode* node, detail::SortedMap::ScoreSds ubound) {
+  detail::SortedMap::ScoreSdsPolicy::KeyCompareTo cmp;
+
+  for (unsigned i = 1; i < node->NumItems(); ++i) {
+    if (cmp(node->Key(i - 1), node->Key(i)) >= 0) {
+      LOG(ERROR) << "Key " << i - 1 << " is GE than " << i << ":" << GetObjScore(node->Key(i - 1))
+                 << " vs " << GetObjScore(node->Key(i));
+      return false;
+    }
+  }
+
+  int res = cmp(node->Key(node->NumItems() - 1), ubound);
+  if (res != -1) {
+    LOG(ERROR) << "Last key is not smaller than ubound: "
+               << GetObjScore(node->Key(node->NumItems() - 1)) << " vs " << GetObjScore(ubound);
+    return false;
+  }
+  return true;
+}
+
+static bool ValidateTree(const ZsetNode* root) {
+  // node, upper bound
+  vector<pair<const ZsetNode*, detail::SortedMap::ScoreSds>> stack;
+  char buf[32];
+  detail::SortedMap::ScoreSds plus_inf = detail::SortedMap::BuildKey(HUGE_VAL, true, buf);
+
+  stack.emplace_back(root, plus_inf);
+
+  while (!stack.empty()) {
+    const ZsetNode* node = stack.back().first;
+    detail::SortedMap::ScoreSds ubound = stack.back().second;
+    stack.pop_back();
+
+    if (!ValidateNode(node, ubound))
+      return false;
+
+    if (!node->IsLeaf()) {
+      for (unsigned i = 0; i < node->NumItems(); ++i) {
+        stack.emplace_back(node->Child(i), node->Key(i));
+      }
+      stack.emplace_back(node->Child(node->NumItems()), ubound);
+    }
+  }
+  return true;
+}
+
+static void PrintPath(const detail::BPTreePath& path) {
+  string res;
+  char buf[64];
+  for (unsigned i = 0; i < path.Depth(); ++i) {
+    detail::BPTreeNode* node = path.Node(i);
+    unsigned pos = path.Position(i);
+    unsigned num_items = node->NumItems();
+    bool last = (i == path.Depth() - 1);
+    const char* type = last ? "=" : "L";
+    void* key_score = nullptr;
+    if (!last && pos == num_items) {
+      key_score = node->Key(pos - 1);
+      type = "R";
+    } else if (pos < num_items) {
+      key_score = node->Key(pos);
+    } else {
+      absl::StrAppend(&res, i, "(INF, ", pos, ") ");
+      continue;
+    }
+    double score = GetObjScore(key_score);
+    char* str = RedisReplyBuilder::FormatDouble(score, buf, sizeof(buf));
+    absl::StrAppend(&res, i, "(", type, " ", str, ", ", pos, ") ");
+  }
+  LOG(ERROR) << "Path@rank=" << path.Rank() << " depth=" << path.Depth() << " " << res;
+}
+
 auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
     -> OpResult<ScoredArray> {
   auto res_it = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_ZSET);
@@ -1348,6 +1435,46 @@ auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, st
   IntervalVisitor iv{Action::RANGE, range_spec.params, &pv};
   std::visit(iv, range_spec.interval);
 
+  if (iv.has_bug) {
+    static atomic_long last{0};
+    long now = time(nullptr);
+    long last_cached = last.load(memory_order_relaxed);
+    if (now - last_cached >= 10 &&
+        last.compare_exchange_strong(last_cached, now, memory_order_relaxed)) {
+      const auto& itv = std::get<ZSetFamily::ScoreInterval>(range_spec.interval);
+      auto* robj = pv.GetRobjWrapper();
+      detail::SortedMap* sm = (detail::SortedMap*)robj->inner_obj();
+      auto* st = sm->get_score_tree();
+      bool path_empty = false;
+      double skey_score;
+      char buf[64];
+
+      {
+        detail::SortedMap::ScoreSds skey =
+            detail::SortedMap::BuildKey(itv.first.val, itv.first.is_open, buf);
+        auto path = st->GEQ(skey);
+        path_empty = path.Empty();
+        skey_score = GetObjScore(skey);
+        if (path_empty) {
+          st->LocateDebug(skey, &path);
+          char buf2[64];
+          char* str = RedisReplyBuilder::FormatDouble(itv.first.val, buf2, sizeof(buf2));
+          LOG(ERROR) << "BUG: " << key << " range from " << str
+                     << " GEQ returned empty, locate path:";
+          PrintPath(path);
+        }
+      }
+
+      char* str = RedisReplyBuilder::FormatDouble(skey_score, buf, sizeof(buf));
+      LOG(ERROR) << "BUG: score_key is: " << str;
+      bool valid = ValidateTree(st->DEBUG_root());
+      LOG_IF(ERROR, !valid) << "BUG: " << key << " tree is invalid";
+      auto path = st->FromRank(0);
+      do {
+        PrintPath(path);
+      } while (path.Next());
+    }
+  }
   return iv.PopResult();
 }
diff --git a/tests/dragonfly/sentinel_test.py b/tests/dragonfly/sentinel_test.py
deleted file mode 100644
index 411b699f17d6..000000000000
--- a/tests/dragonfly/sentinel_test.py
+++ /dev/null
@@ -1,308 +0,0 @@
-import pathlib
-import subprocess
-from typing import Awaitable
-from redis import asyncio as aioredis
-import pytest
-import time
-import asyncio
-from datetime import datetime
-from sys import stderr
-import logging
-
-from .utility import assert_eventually, wait_available_async
-
-from .instance import DflyInstanceFactory
-from . import dfly_args
-
-
-# Helper function to parse some sentinel cli commands output as key value dictionaries.
-# Output is expected be of even number of lines where each pair of consecutive lines results in a single key value pair.
-# If new_dict_key is not empty, encountering it in the output will start a new dictionary, this let us return multiple
-# dictionaries, for example in the 'slaves' command, one dictionary for each slave.
-def stdout_as_list_of_dicts(cp: subprocess.CompletedProcess, new_dict_key=""):
-    lines = cp.stdout.splitlines()
-    res = []
-    d = None
-    if new_dict_key == "":
-        d = dict()
-        res.append(d)
-    for i in range(0, len(lines), 2):
-        if (lines[i]) == new_dict_key:  # assumes output never has '' as a key
-            d = dict()
-            res.append(d)
-        d[lines[i]] = lines[i + 1]
-    return res
-
-
-def wait_for(func, pred, timeout_sec, timeout_msg=""):
-    while not pred(func()):
-        assert timeout_sec > 0, timeout_msg
-        timeout_sec = timeout_sec - 1
-        time.sleep(1)
-
-
-async def await_for(func, pred, timeout_sec, timeout_msg=""):
-    done = False
-    while not done:
-        val = func()
-        if isinstance(val, Awaitable):
-            val = await val
-        done = pred(val)
-        assert timeout_sec > 0, timeout_msg
-        timeout_sec = timeout_sec - 1
-        await asyncio.sleep(1)
-
-
-@assert_eventually
-async def assert_master_became_replica(client):
-    repl_info = await client.info("replication")
-    assert repl_info["role"] == "slave"
-
-
-class Sentinel:
-    def __init__(self, port, master_port, config_dir) -> None:
-        self.config_file = pathlib.Path(config_dir).joinpath("sentinel.conf")
-        self.port = port
-        self.image = "bitnami/redis-sentinel:latest"
-        self.container_name = "sentinel_test_py_sentinel"
-        self.default_deployment = "my_deployment"
-        self.initial_master_port = master_port
-        self.proc = None
-
-    def start(self):
-        config = [
-            f"port {self.port}",
-            f"sentinel monitor {self.default_deployment} 127.0.0.1 {self.initial_master_port} 1",
-            f"sentinel down-after-milliseconds {self.default_deployment} 3000",
-            f"slave-priority 100",
-        ]
-        self.config_file.write_text("\n".join(config))
-
-        logging.info(self.config_file.read_text())
-
-        self.proc = subprocess.Popen(
-            ["redis-server", f"{self.config_file.absolute()}", "--sentinel"]
-        )
-
-    def stop(self):
-        self.proc.terminate()
-        self.proc.wait(timeout=10)
-
-    def run_cmd(
-        self, args, sentinel_cmd=True, capture_output=False, assert_ok=True
-    ) -> subprocess.CompletedProcess:
-        run_args = ["redis-cli", "-p", f"{self.port}"]
-        if sentinel_cmd:
-            run_args = run_args + ["sentinel"]
-        run_args = run_args + args
-        cp = subprocess.run(run_args, capture_output=capture_output, text=True)
-        if assert_ok:
-            assert cp.returncode == 0, f"Command failed: {run_args}"
-        return cp
-
-    def wait_ready(self):
-        wait_for(
-            lambda: self.run_cmd(["ping"], sentinel_cmd=False, assert_ok=False),
-            lambda cp: cp.returncode == 0,
-            timeout_sec=10,
-            timeout_msg="Timeout waiting for sentinel to become ready.",
-        )
-
-    def master(self, deployment="") -> dict:
-        if deployment == "":
-            deployment = self.default_deployment
-        cp = self.run_cmd(["master", deployment], capture_output=True)
-        return stdout_as_list_of_dicts(cp)[0]
-
-    def slaves(self, deployment="") -> dict:
-        if deployment == "":
-            deployment = self.default_deployment
-        cp = self.run_cmd(["slaves", deployment], capture_output=True)
-        return stdout_as_list_of_dicts(cp)
-
-    def live_master_port(self, deployment=""):
-        if deployment == "":
-            deployment = self.default_deployment
-        cp = self.run_cmd(["get-master-addr-by-name", deployment], capture_output=True)
-        return int(cp.stdout.splitlines()[1])
-
-    def failover(self, deployment=""):
-        if deployment == "":
-            deployment = self.default_deployment
-        self.run_cmd(
-            [
-                "failover",
-                deployment,
-            ]
-        )
-
-
-@pytest.fixture(
-    scope="function"
-)  # Sentinel has state which we don't want carried over form test to test.
-def sentinel(tmp_dir, port_picker) -> Sentinel:
-    s = Sentinel(port_picker.get_available_port(), port_picker.get_available_port(), tmp_dir)
-    s.start()
-    s.wait_ready()
-    yield s
-    s.stop()
-
-
-@pytest.mark.asyncio
-@pytest.mark.slow
-async def test_failover(df_factory: DflyInstanceFactory, sentinel, port_picker):
-    master = df_factory.create(port=sentinel.initial_master_port)
-    replica = df_factory.create(port=port_picker.get_available_port())
-
-    master.start()
-    replica.start()
-
-    master_client = aioredis.Redis(port=master.port)
-    replica_client = aioredis.Redis(port=replica.port)
-    logging.info("master: " + str(master.port) + " replica: " + str(replica.port))
-
-    await replica_client.execute_command("REPLICAOF localhost " + str(master.port))
-
-    assert sentinel.live_master_port() == master.port
-
-    # Verify sentinel picked up replica.
-    await await_for(
-        lambda: sentinel.master(),
-        lambda m: m["num-slaves"] == "1",
-        timeout_sec=15,
-        timeout_msg="Timeout waiting for sentinel to pick up replica.",
-    )
-    sentinel.failover()
-
-    # Verify sentinel switched.
-    await await_for(
-        lambda: sentinel.live_master_port(),
-        lambda p: p == replica.port,
-        timeout_sec=10,
-        timeout_msg="Timeout waiting for sentinel to report replica as master.",
-    )
-    assert sentinel.slaves()[0]["port"] == str(master.port)
-
-    # Verify we can now write to replica and read replicated value from master.
-    assert await replica_client.set("key", "value"), "Failed to set key on promoted replica."
-
-    logging.info("key was set on promoted replica, awaiting get on promoted replica. ")
-
-    await assert_master_became_replica(master_client)
-    await wait_available_async(master_client)
-
-    try:
-        await await_for(
-            lambda: master_client.get("key"),
-            lambda val: val == b"value",
-            10,
-            "Timeout waiting for key to exist in replica.",
-        )
-    except AssertionError:
-        syncid, r_offset = await master_client.execute_command("DEBUG REPLICA OFFSET")
-        replicaoffset_cmd = "DFLY REPLICAOFFSET " + syncid.decode()
-        m_offset = await replica_client.execute_command(replicaoffset_cmd)
-        logging.info(f"{syncid.decode()} {r_offset} {m_offset}")
-        logging.info("replica client role:")
-        logging.info(await replica_client.execute_command("role"))
-        logging.info("master client role:")
-        logging.info(await master_client.execute_command("role"))
-        logging.info("replica client info:")
-        logging.info(await replica_client.info())
-        logging.info("master client info:")
-        logging.info(await master_client.info())
-        replica_val = await replica_client.get("key")
-        master_val = await master_client.get("key")
-        logging.info(f"replica val: {replica_val}")
-        logging.info(f"master val: {master_val}")
-        raise
-
-
-@pytest.mark.asyncio
-@pytest.mark.slow
-async def test_master_failure(df_factory, sentinel, port_picker):
-    master = df_factory.create(port=sentinel.initial_master_port)
-    replica = df_factory.create(port=port_picker.get_available_port())
-
-    master.start()
-    replica.start()
-
-    replica_client = aioredis.Redis(port=replica.port)
-
-    await replica_client.execute_command("REPLICAOF localhost " + str(master.port))
-
-    assert sentinel.live_master_port() == master.port
-
-    # Verify sentinel picked up replica.
-    await await_for(
-        lambda: sentinel.master(),
-        lambda m: m["num-slaves"] == "1",
-        timeout_sec=15,
-        timeout_msg="Timeout waiting for sentinel to pick up replica.",
-    )
-
-    # Simulate master failure.
-    master.stop()
-
-    # Verify replica promoted.
-    await await_for(
-        lambda: sentinel.live_master_port(),
-        lambda p: p == replica.port,
-        timeout_sec=300,
-        timeout_msg="Timeout waiting for sentinel to report replica as master.",
-    )
-
-    # Verify we can now write to replica.
-    await replica_client.set("key", "value")
-    assert await replica_client.get("key") == b"value"
-
-
-@dfly_args({"info_replication_valkey_compatible": True})
-@pytest.mark.asyncio
-async def test_priority_on_failover(df_factory, sentinel, port_picker):
-    master = df_factory.create(port=sentinel.initial_master_port)
-    # lower priority is the best candidate for sentinel
-    low_priority_repl = df_factory.create(
-        port=port_picker.get_available_port(), replica_priority=20
-    )
-    mid_priority_repl = df_factory.create(
-        port=port_picker.get_available_port(), replica_priority=60
-    )
-    high_priority_repl = df_factory.create(
-        port=port_picker.get_available_port(), replica_priority=80
-    )
-
-    master.start()
-    low_priority_repl.start()
-    mid_priority_repl.start()
-    high_priority_repl.start()
-
-    high_client = aioredis.Redis(port=high_priority_repl.port)
-    await high_client.execute_command("REPLICAOF localhost " + str(master.port))
-
-    mid_client = aioredis.Redis(port=mid_priority_repl.port)
-    await mid_client.execute_command("REPLICAOF localhost " + str(master.port))
-
-    low_client = aioredis.Redis(port=low_priority_repl.port)
-    await low_client.execute_command("REPLICAOF localhost " + str(master.port))
-
-    assert sentinel.live_master_port() == master.port
-
-    # Verify sentinel picked up replica.
-    await await_for(
-        lambda: sentinel.master(),
-        lambda m: m["num-slaves"] == "3",
-        timeout_sec=15,
-        timeout_msg="Timeout waiting for sentinel to pick up replica.",
-    )
-
-    # Simulate master failure.
-    master.stop()
-
-    # Verify replica promoted.
-    await await_for(
-        lambda: sentinel.live_master_port(),
-        lambda p: p == low_priority_repl.port,
-        timeout_sec=30,
-        timeout_msg="Timeout waiting for sentinel to report replica as master.",
-    )
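One more pattern worth noting from the OpRange hunk earlier in this diff: the diagnostics are throttled so the expensive tree dump runs at most once every ten seconds, using an atomic timestamp and compare_exchange_strong so that only one caller wins the right to log. Below is a minimal standalone sketch of that throttle, under the assumption that wall-clock seconds are an adequate resolution; ShouldLogNow is an illustrative name, not a Dragonfly helper.

// Standalone sketch of the once-per-interval throttle used around the
// OpRange diagnostics above. `ShouldLogNow` is a hypothetical helper name.
#include <atomic>
#include <ctime>
#include <iostream>

// Returns true at most once per `interval_sec`, even under concurrent calls:
// only the caller that wins the compare_exchange advances the timestamp.
bool ShouldLogNow(long interval_sec = 10) {
  static std::atomic<long> last{0};
  long now = static_cast<long>(std::time(nullptr));
  long last_cached = last.load(std::memory_order_relaxed);
  return (now - last_cached >= interval_sec) &&
         last.compare_exchange_strong(last_cached, now, std::memory_order_relaxed);
}

int main() {
  for (int i = 0; i < 3; ++i)
    std::cout << ShouldLogNow() << '\n';  // prints 1, then 0, 0 within the same window
}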