diff --git a/cmake/rocksdb.cmake b/cmake/rocksdb.cmake index f571ebe7e2f..ffb3ca941a8 100644 --- a/cmake/rocksdb.cmake +++ b/cmake/rocksdb.cmake @@ -26,8 +26,8 @@ endif() include(cmake/utils.cmake) FetchContent_DeclareGitHubWithMirror(rocksdb - facebook/rocksdb v9.6.1 - MD5=ce31144a7e65d8f4f3f9d98986509eb1 + facebook/rocksdb v9.7.2 + MD5=1d6d569285b6942cf37b5e8cbf396f65 ) FetchContent_GetProperties(jemalloc) diff --git a/kvrocks.conf b/kvrocks.conf index f6b6b31c2d4..3c460c881e4 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -20,6 +20,16 @@ bind 127.0.0.1 # unixsocket /tmp/kvrocks.sock # unixsocketperm 777 +# Allows a parent process to open a socket and pass its FD down to kvrocks as a child +# process. Useful to reserve a port and prevent race conditions. +# +# PLEASE NOTE: +# If this is overridden to a value other than -1, the bind and tls* directives will be +# ignored. +# +# Default: -1 (not overridden, defer to creating a connection to the specified port) +socket-fd -1 + # Accept connections on the specified port, default is 6666. port 6666 @@ -708,6 +718,16 @@ rocksdb.max_background_flushes -1 # Default: 2 rocksdb.max_subcompactions 2 +# If enabled WAL records will be compressed before they are written. Only +# ZSTD (= kZSTD) is supported (until streaming support is adapted for other +# compression types). Compressed WAL records will be read in supported +# versions (>= RocksDB 7.4.0 for ZSTD) regardless of this setting when +# the WAL is read. +# +# Accept value: "no", "zstd" +# Default is no +rocksdb.wal_compression no + # In order to limit the size of WALs, RocksDB uses DBOptions::max_total_wal_size # as the trigger of column family flush. Once WALs exceed this size, RocksDB # will start forcing the flush of column families to allow deletion of some diff --git a/src/cli/main.cc b/src/cli/main.cc index 1dd52bc9266..4008d5cc18e 100644 --- a/src/cli/main.cc +++ b/src/cli/main.cc @@ -31,7 +31,6 @@ #include #include -#include "config.h" #include "daemon_util.h" #include "io_util.h" #include "pid_util.h" @@ -42,9 +41,7 @@ #include "storage/storage.h" #include "string_util.h" #include "time_util.h" -#include "unique_fd.h" #include "vendor/crc64.h" -#include "version.h" #include "version_util.h" Server *srv = nullptr; @@ -163,6 +160,11 @@ int main(int argc, char *argv[]) { std::cout << "Failed to load config. Error: " << s.Msg() << std::endl; return 1; } + const auto socket_fd_exit = MakeScopeExit([&config] { + if (config.socket_fd != -1) { + close(config.socket_fd); + } + }); crc64_init(); InitGoogleLog(&config); @@ -170,7 +172,7 @@ int main(int argc, char *argv[]) { // Tricky: We don't expect that different instances running on the same port, // but the server use REUSE_PORT to support the multi listeners. So we connect // the listen port to check if the port has already listened or not. - if (!config.binds.empty()) { + if (config.socket_fd == -1 && !config.binds.empty()) { uint32_t ports[] = {config.port, config.tls_port, 0}; for (uint32_t *port = ports; *port; ++port) { if (util::IsPortInUse(*port)) { diff --git a/src/config/config.cc b/src/config/config.cc index 4f3bebd6bbc..a1924f9eac3 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -79,6 +79,15 @@ const std::vector> compression_types{[] { return res; }()}; +const std::vector> wal_compression_types{[] { + std::vector> res; + res.reserve(engine::WalCompressionOptions.size()); + for (const auto &e : engine::WalCompressionOptions) { + res.push_back({e.name, e.type}); + } + return res; +}()}; + const std::vector> cache_types{[] { std::vector> res; res.reserve(engine::CacheOptions.size()); @@ -110,6 +119,7 @@ Config::Config() { {"daemonize", true, new YesNoField(&daemonize, false)}, {"bind", true, new StringField(&binds_str_, "")}, {"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)}, + {"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)}, #ifdef ENABLE_OPENSSL {"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)}, {"tls-cert-file", false, new StringField(&tls_cert_file, "")}, @@ -212,6 +222,9 @@ Config::Config() { {"rocksdb.max_background_flushes", true, new IntField(&rocks_db.max_background_flushes, 2, -1, 32)}, {"rocksdb.max_subcompactions", false, new IntField(&rocks_db.max_subcompactions, 2, 0, 16)}, {"rocksdb.delayed_write_rate", false, new Int64Field(&rocks_db.delayed_write_rate, 0, 0, INT64_MAX)}, + {"rocksdb.wal_compression", true, + new EnumField(&rocks_db.wal_compression, wal_compression_types, + rocksdb::CompressionType::kNoCompression)}, {"rocksdb.wal_ttl_seconds", true, new IntField(&rocks_db.wal_ttl_seconds, 3 * 3600, 0, INT_MAX)}, {"rocksdb.wal_size_limit_mb", true, new IntField(&rocks_db.wal_size_limit_mb, 16384, 0, INT_MAX)}, {"rocksdb.max_total_wal_size", false, new IntField(&rocks_db.max_total_wal_size, 64 * 4 * 2, 0, INT_MAX)}, diff --git a/src/config/config.h b/src/config/config.h index e80e8578776..391d9534f58 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -70,6 +70,7 @@ struct Config { Config(); ~Config() = default; uint32_t port = 0; + int socket_fd = -1; uint32_t tls_port = 0; std::string tls_cert_file; @@ -196,6 +197,7 @@ struct Config { int64_t delayed_write_rate; int compaction_readahead_size; int target_file_size_base; + rocksdb::CompressionType wal_compression; int wal_ttl_seconds; int wal_size_limit_mb; int max_total_wal_size; diff --git a/src/server/worker.cc b/src/server/worker.cc index d5a751e1578..4ddf31add0b 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -30,7 +31,6 @@ #include "io_util.h" #include "scope_exit.h" #include "thread_util.h" -#include "time_util.h" #ifdef ENABLE_OPENSSL #include @@ -44,7 +44,6 @@ #include #include -#include #include #include "redis_connection.h" @@ -59,17 +58,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) timeval tm = {10, 0}; evtimer_add(timer_.get(), &tm); - uint32_t ports[3] = {config->port, config->tls_port, 0}; - auto binds = config->binds; - - for (uint32_t *port = ports; *port; ++port) { - for (const auto &bind : binds) { - Status s = listenTCP(bind, *port, config->backlog); - if (!s.IsOK()) { - LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg(); - exit(1); + if (config->socket_fd != -1) { + if (const Status s = listenFD(config->socket_fd, config->port, config->backlog); !s.IsOK()) { + LOG(ERROR) << "[worker] Failed to listen to socket with fd: " << config->socket_fd << ". Error: " << s.Msg(); + exit(1); + } + } else { + const uint32_t ports[3] = {config->port, config->tls_port, 0}; + + for (const uint32_t *port = ports; *port; ++port) { + for (const auto &bind : config->binds) { + if (const Status s = listenTCP(bind, *port, config->backlog); !s.IsOK()) { + LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg(); + exit(1); + } + LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port; } - LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port; } } lua_ = lua::CreateState(srv); @@ -216,6 +220,22 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f } } +Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) { + const uint32_t port = util::GetLocalPort(fd); + if (port != expected_port) { + return {Status::NotOK, "The port of the provided socket fd doesn't match the configured port"}; + } + const int dup_fd = dup(fd); + if (dup_fd == -1) { + return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())}; + } + evconnlistener *lev = + NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd); + listen_events_.emplace_back(lev); + LOG(INFO) << "Listening on dup'ed fd: " << dup_fd; + return Status::OK(); +} + Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) { bool ipv6_used = strchr(host.data(), ':'); diff --git a/src/server/worker.h b/src/server/worker.h index b6918ba9296..ac88ec8a8b5 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -27,7 +27,6 @@ #include #include -#include #include #include #include @@ -36,9 +35,9 @@ #include #include +#include "config/config.h" #include "event_util.h" #include "redis_connection.h" -#include "storage/storage.h" class Server; @@ -79,6 +78,7 @@ class Worker : EventCallbackBase, EvconnlistenerBase { Server *srv; private: + Status listenFD(int fd, uint32_t expected_port, int backlog); Status listenTCP(const std::string &host, uint32_t port, int backlog); void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 116b452708e..9e1b315d073 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -168,6 +168,7 @@ rocksdb::Options Storage::InitRocksDBOptions() { options.num_levels = 7; options.compression_opts.level = config_->rocks_db.compression_level; options.compression_per_level.resize(options.num_levels); + options.wal_compression = config_->rocks_db.wal_compression; // only compress levels >= 2 for (int i = 0; i < options.num_levels; ++i) { if (i < 2) { diff --git a/src/storage/storage.h b/src/storage/storage.h index 3ce9b78a548..c29b4689de5 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -94,6 +94,11 @@ inline const std::vector CompressionOptions = { {rocksdb::kZSTD, "zstd", "kZSTD"}, }; +inline const std::vector WalCompressionOptions = { + {rocksdb::kNoCompression, "no", "kNoCompression"}, + {rocksdb::kZSTD, "zstd", "kZSTD"}, +}; + struct CacheOption { BlockCacheType type; const std::string name; diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc index 68919ed8415..dfe631189a9 100644 --- a/tests/cppunit/config_test.cc +++ b/tests/cppunit/config_test.cc @@ -128,6 +128,7 @@ TEST(Config, GetAndSet) { {"rocksdb.row_cache_size", "100"}, {"rocksdb.rate_limiter_auto_tuned", "yes"}, {"rocksdb.compression_level", "32767"}, + {"rocksdb.wal_compression", "no"}, }; for (const auto &iter : immutable_cases) { s = config.Set(nullptr, iter.first, iter.second);