Skip to content

Commit

Permalink
Merge branch 'unstable' into unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanlo99 authored Oct 18, 2024
2 parents 916ce73 + 4709734 commit 2edb82b
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 20 deletions.
4 changes: 2 additions & 2 deletions cmake/rocksdb.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ endif()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(rocksdb
facebook/rocksdb v9.6.1
MD5=ce31144a7e65d8f4f3f9d98986509eb1
facebook/rocksdb v9.7.2
MD5=1d6d569285b6942cf37b5e8cbf396f65
)

FetchContent_GetProperties(jemalloc)
Expand Down
20 changes: 20 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ bind 127.0.0.1
# unixsocket /tmp/kvrocks.sock
# unixsocketperm 777

# Allows a parent process to open a socket and pass its FD down to kvrocks as a child
# process. Useful to reserve a port and prevent race conditions.
#
# PLEASE NOTE:
# If this is overridden to a value other than -1, the bind and tls* directives will be
# ignored.
#
# Default: -1 (not overridden, defer to creating a connection to the specified port)
socket-fd -1

# Accept connections on the specified port, default is 6666.
port 6666

Expand Down Expand Up @@ -708,6 +718,16 @@ rocksdb.max_background_flushes -1
# Default: 2
rocksdb.max_subcompactions 2

# If enabled WAL records will be compressed before they are written. Only
# ZSTD (= kZSTD) is supported (until streaming support is adapted for other
# compression types). Compressed WAL records will be read in supported
# versions (>= RocksDB 7.4.0 for ZSTD) regardless of this setting when
# the WAL is read.
#
# Accept value: "no", "zstd"
# Default is no
rocksdb.wal_compression no

# In order to limit the size of WALs, RocksDB uses DBOptions::max_total_wal_size
# as the trigger of column family flush. Once WALs exceed this size, RocksDB
# will start forcing the flush of column families to allow deletion of some
Expand Down
10 changes: 6 additions & 4 deletions src/cli/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <iomanip>
#include <ostream>

#include "config.h"
#include "daemon_util.h"
#include "io_util.h"
#include "pid_util.h"
Expand All @@ -42,9 +41,7 @@
#include "storage/storage.h"
#include "string_util.h"
#include "time_util.h"
#include "unique_fd.h"
#include "vendor/crc64.h"
#include "version.h"
#include "version_util.h"

Server *srv = nullptr;
Expand Down Expand Up @@ -163,14 +160,19 @@ int main(int argc, char *argv[]) {
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
return 1;
}
const auto socket_fd_exit = MakeScopeExit([&config] {
if (config.socket_fd != -1) {
close(config.socket_fd);
}
});

crc64_init();
InitGoogleLog(&config);
LOG(INFO) << "kvrocks " << PrintVersion;
// Tricky: We don't expect that different instances running on the same port,
// but the server use REUSE_PORT to support the multi listeners. So we connect
// the listen port to check if the port has already listened or not.
if (!config.binds.empty()) {
if (config.socket_fd == -1 && !config.binds.empty()) {
uint32_t ports[] = {config.port, config.tls_port, 0};
for (uint32_t *port = ports; *port; ++port) {
if (util::IsPortInUse(*port)) {
Expand Down
13 changes: 13 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ const std::vector<ConfigEnum<rocksdb::CompressionType>> compression_types{[] {
return res;
}()};

const std::vector<ConfigEnum<rocksdb::CompressionType>> wal_compression_types{[] {
std::vector<ConfigEnum<rocksdb::CompressionType>> res;
res.reserve(engine::WalCompressionOptions.size());
for (const auto &e : engine::WalCompressionOptions) {
res.push_back({e.name, e.type});
}
return res;
}()};

const std::vector<ConfigEnum<BlockCacheType>> cache_types{[] {
std::vector<ConfigEnum<BlockCacheType>> res;
res.reserve(engine::CacheOptions.size());
Expand Down Expand Up @@ -110,6 +119,7 @@ Config::Config() {
{"daemonize", true, new YesNoField(&daemonize, false)},
{"bind", true, new StringField(&binds_str_, "")},
{"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)},
{"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)},
#ifdef ENABLE_OPENSSL
{"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)},
{"tls-cert-file", false, new StringField(&tls_cert_file, "")},
Expand Down Expand Up @@ -212,6 +222,9 @@ Config::Config() {
{"rocksdb.max_background_flushes", true, new IntField(&rocks_db.max_background_flushes, 2, -1, 32)},
{"rocksdb.max_subcompactions", false, new IntField(&rocks_db.max_subcompactions, 2, 0, 16)},
{"rocksdb.delayed_write_rate", false, new Int64Field(&rocks_db.delayed_write_rate, 0, 0, INT64_MAX)},
{"rocksdb.wal_compression", true,
new EnumField<rocksdb::CompressionType>(&rocks_db.wal_compression, wal_compression_types,
rocksdb::CompressionType::kNoCompression)},
{"rocksdb.wal_ttl_seconds", true, new IntField(&rocks_db.wal_ttl_seconds, 3 * 3600, 0, INT_MAX)},
{"rocksdb.wal_size_limit_mb", true, new IntField(&rocks_db.wal_size_limit_mb, 16384, 0, INT_MAX)},
{"rocksdb.max_total_wal_size", false, new IntField(&rocks_db.max_total_wal_size, 64 * 4 * 2, 0, INT_MAX)},
Expand Down
2 changes: 2 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct Config {
Config();
~Config() = default;
uint32_t port = 0;
int socket_fd = -1;

uint32_t tls_port = 0;
std::string tls_cert_file;
Expand Down Expand Up @@ -196,6 +197,7 @@ struct Config {
int64_t delayed_write_rate;
int compaction_readahead_size;
int target_file_size_base;
rocksdb::CompressionType wal_compression;
int wal_ttl_seconds;
int wal_size_limit_mb;
int max_total_wal_size;
Expand Down
44 changes: 32 additions & 12 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <event2/util.h>
#include <glog/logging.h>
#include <unistd.h>

#include <stdexcept>
#include <string>
Expand All @@ -30,7 +31,6 @@
#include "io_util.h"
#include "scope_exit.h"
#include "thread_util.h"
#include "time_util.h"

#ifdef ENABLE_OPENSSL
#include <event2/bufferevent_ssl.h>
Expand All @@ -44,7 +44,6 @@
#include <sys/un.h>

#include <algorithm>
#include <list>
#include <utility>

#include "redis_connection.h"
Expand All @@ -59,17 +58,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new())
timeval tm = {10, 0};
evtimer_add(timer_.get(), &tm);

uint32_t ports[3] = {config->port, config->tls_port, 0};
auto binds = config->binds;

for (uint32_t *port = ports; *port; ++port) {
for (const auto &bind : binds) {
Status s = listenTCP(bind, *port, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg();
exit(1);
if (config->socket_fd != -1) {
if (const Status s = listenFD(config->socket_fd, config->port, config->backlog); !s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen to socket with fd: " << config->socket_fd << ". Error: " << s.Msg();
exit(1);
}
} else {
const uint32_t ports[3] = {config->port, config->tls_port, 0};

for (const uint32_t *port = ports; *port; ++port) {
for (const auto &bind : config->binds) {
if (const Status s = listenTCP(bind, *port, config->backlog); !s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg();
exit(1);
}
LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
}
LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
}
}
lua_ = lua::CreateState(srv);
Expand Down Expand Up @@ -216,6 +220,22 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
}
}

Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) {
const uint32_t port = util::GetLocalPort(fd);
if (port != expected_port) {
return {Status::NotOK, "The port of the provided socket fd doesn't match the configured port"};
}
const int dup_fd = dup(fd);
if (dup_fd == -1) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
evconnlistener *lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
listen_events_.emplace_back(lev);
LOG(INFO) << "Listening on dup'ed fd: " << dup_fd;
return Status::OK();
}

Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
bool ipv6_used = strchr(host.data(), ':');

Expand Down
4 changes: 2 additions & 2 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include <cstdint>
#include <cstring>
#include <iostream>
#include <lua.hpp>
#include <map>
#include <memory>
Expand All @@ -36,9 +35,9 @@
#include <utility>
#include <vector>

#include "config/config.h"
#include "event_util.h"
#include "redis_connection.h"
#include "storage/storage.h"

class Server;

Expand Down Expand Up @@ -79,6 +78,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
Server *srv;

private:
Status listenFD(int fd, uint32_t expected_port, int backlog);
Status listenTCP(const std::string &host, uint32_t port, int backlog);
void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
Expand Down
1 change: 1 addition & 0 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ rocksdb::Options Storage::InitRocksDBOptions() {
options.num_levels = 7;
options.compression_opts.level = config_->rocks_db.compression_level;
options.compression_per_level.resize(options.num_levels);
options.wal_compression = config_->rocks_db.wal_compression;
// only compress levels >= 2
for (int i = 0; i < options.num_levels; ++i) {
if (i < 2) {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ inline const std::vector<CompressionOption> CompressionOptions = {
{rocksdb::kZSTD, "zstd", "kZSTD"},
};

inline const std::vector<CompressionOption> WalCompressionOptions = {
{rocksdb::kNoCompression, "no", "kNoCompression"},
{rocksdb::kZSTD, "zstd", "kZSTD"},
};

struct CacheOption {
BlockCacheType type;
const std::string name;
Expand Down
1 change: 1 addition & 0 deletions tests/cppunit/config_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ TEST(Config, GetAndSet) {
{"rocksdb.row_cache_size", "100"},
{"rocksdb.rate_limiter_auto_tuned", "yes"},
{"rocksdb.compression_level", "32767"},
{"rocksdb.wal_compression", "no"},
};
for (const auto &iter : immutable_cases) {
s = config.Set(nullptr, iter.first, iter.second);
Expand Down

0 comments on commit 2edb82b

Please sign in to comment.