Skip to content

Commit

Permalink
persist cluster-enabled status in RocksDB
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisxu333 committed Jan 15, 2024
1 parent 9d656f1 commit 74f245b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 2 deletions.
7 changes: 5 additions & 2 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ daemonize no
# If you enable cluster, kvrocks will encode key with its slot id calculated by
# CRC16 and modulo 16384, encoding key with its slot id makes it efficient to
# migrate keys based on the slot. So if you enabled at first time, cluster mode must
# not be disabled after restarting, and vice versa. That is to say, data is not
# compatible between standalone mode with cluster mode, you must migrate data
# not be disabled after restarting, and vice versa. Currently, kvrocks will keep
# using the cluster-enabled status that is persisted at first time, regardless of
# what cluster-enabled status is provided afterwards.
# Note that even if kvrocks has such protection, you should also be aware that data
# is not compatible between standalone mode with cluster mode, you must migrate data
# if you want to change mode, otherwise, kvrocks will make data corrupt.
#
# Default: no
Expand Down
25 changes: 25 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ Status Server::Start() {
}
}

s = forceClusterMode();
if (!s.IsOK()) {
return s;
}

if (config_->cluster_enabled) {
if (config_->persist_cluster_nodes_enabled) {
auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
Expand Down Expand Up @@ -1839,6 +1844,26 @@ void Server::cleanupExitedWorkerThreads(bool force) {
}
}

Status Server::forceClusterMode() {
std::string value;
auto cf = storage->GetCFHandle(engine::kPropagateColumnFamilyName);
rocksdb::Status check_cluster_enabled =
storage->Get(rocksdb::ReadOptions(), cf, rocksdb::Slice(engine::kClusterEnabledKey), &value);

if (check_cluster_enabled.IsNotFound()) {
Status s = storage->WriteToPropagateCF(engine::kClusterEnabledKey, std::to_string(config_->cluster_enabled));
if (!s.IsOK()) return s;
} else {
if (check_cluster_enabled.ok()) {
LOG(WARNING) << "cluster_enable status is inconsistent. Using the previously persisted one.";
config_->cluster_enabled = std::stoi(value);
} else {
return {Status::NotOK, "failed to load cluster_enable from storage: " + check_cluster_enabled.ToString()};
}
}
return Status::OK();
}

std::string ServerLogData::Encode() const {
if (type_ == kReplIdLog) {
return std::string(1, kReplIdTag) + " " + content_;
Expand Down
1 change: 1 addition & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ class Server {
void increaseWorkerThreads(size_t delta);
void decreaseWorkerThreads(size_t delta);
void cleanupExitedWorkerThreads(bool force);
Status forceClusterMode();

std::atomic<bool> stop_ = false;
std::atomic<bool> is_loading_ = false;
Expand Down
2 changes: 2 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ constexpr const char *kLuaFuncSHAPrefix = "lua_f_";
constexpr const char *kLuaFuncLibPrefix = "lua_func_lib_";
constexpr const char *kLuaLibCodePrefix = "lua_lib_code_";

const std::string kClusterEnabledKey = "config_cluster_enabled";

struct CompressionOption {
rocksdb::CompressionType type;
const std::string name;
Expand Down
8 changes: 8 additions & 0 deletions tests/gocase/integration/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ func TestClusterNodes(t *testing.T) {
require.EqualValues(t, []redis.ClusterNode{{ID: nodeID, Addr: srv.HostPort()}}, slots[0].Nodes)
})

t.Run("enable/disable cluster-enabled option", func(t *testing.T) {
// force change cluster-enabled status in kvrocks.conf file
srv.ForceChangeClusterMode(false)
defer srv.ForceChangeClusterMode(true)
srv.Restart()
require.NoError(t, rdb.Do(ctx, "clusterx", "version").Err())
})

t.Run("enable/disable the persist cluster nodes", func(t *testing.T) {
require.NoError(t, rdb.ConfigSet(ctx, "persist-cluster-nodes-enabled", "yes").Err())
srv.Restart()
Expand Down
22 changes: 22 additions & 0 deletions tests/gocase/util/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os/exec"
"path/filepath"
"regexp"
"strings"
"sync"
"syscall"
"testing"
Expand Down Expand Up @@ -134,6 +135,27 @@ func (s *KvrocksServer) close(keepDir bool) {
s.clean(keepDir)
}

func (s *KvrocksServer) ForceChangeClusterMode(enable bool) {
dir := s.configs["dir"]
f, err := os.OpenFile(filepath.Join(dir, "kvrocks.conf"), os.O_RDWR, 0666)
require.NoError(s.t, err)
defer func() { require.NoError(s.t, f.Close()) }()

// change the line containing cluster-enabled to no
data, err := os.ReadFile(filepath.Join(dir, "kvrocks.conf"))
require.NoError(s.t, err)

content := string(data)
var new_content string
if !enable {
new_content = strings.ReplaceAll(content, "cluster-enabled yes", "cluster-enabled no")
} else {
new_content = strings.ReplaceAll(content, "cluster-enabled no", "cluster-enabled yes")
}
err = os.WriteFile(filepath.Join(dir, "kvrocks.conf"), []byte(new_content), 0666)
require.NoError(s.t, err)
}

func (s *KvrocksServer) Restart() {
s.close(true)

Expand Down

0 comments on commit 74f245b

Please sign in to comment.