From de847fe360421932ed7f1e18c34e6d49e2f5d9d4 Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Mon, 14 Oct 2024 11:14:22 -0400 Subject: [PATCH 1/4] Allow passing a socket FD to dup and listen on --- src/cli/main.cc | 10 ++++++---- src/config/config.cc | 1 + src/config/config.h | 1 + src/server/worker.cc | 43 +++++++++++++++++++++++++++++++------------ src/server/worker.h | 4 ++-- 5 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/cli/main.cc b/src/cli/main.cc index a6ed2025873..468905eeb97 100644 --- a/src/cli/main.cc +++ b/src/cli/main.cc @@ -30,7 +30,6 @@ #include #include -#include "config.h" #include "daemon_util.h" #include "io_util.h" #include "pid_util.h" @@ -40,9 +39,7 @@ #include "storage/storage.h" #include "string_util.h" #include "time_util.h" -#include "unique_fd.h" #include "vendor/crc64.h" -#include "version.h" #include "version_util.h" Server *srv = nullptr; @@ -136,6 +133,11 @@ int main(int argc, char *argv[]) { std::cout << "Failed to load config. Error: " << s.Msg() << std::endl; return 1; } + const auto socket_fd_exit = MakeScopeExit([&config] { + if (config.socket_fd != -1) { + close(config.socket_fd); + } + }); crc64_init(); InitGoogleLog(&config); @@ -143,7 +145,7 @@ int main(int argc, char *argv[]) { // Tricky: We don't expect that different instances running on the same port, // but the server use REUSE_PORT to support the multi listeners. So we connect // the listen port to check if the port has already listened or not. - if (!config.binds.empty()) { + if (config.socket_fd != -1 && !config.binds.empty()) { uint32_t ports[] = {config.port, config.tls_port, 0}; for (uint32_t *port = ports; *port; ++port) { if (util::IsPortInUse(*port)) { diff --git a/src/config/config.cc b/src/config/config.cc index 2ced3d084ab..38b8fbaca54 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -110,6 +110,7 @@ Config::Config() { {"daemonize", true, new YesNoField(&daemonize, false)}, {"bind", true, new StringField(&binds_str_, "")}, {"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)}, + {"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)}, #ifdef ENABLE_OPENSSL {"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)}, {"tls-cert-file", false, new StringField(&tls_cert_file, "")}, diff --git a/src/config/config.h b/src/config/config.h index 8a2ebfc4786..1b7311e4b18 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -70,6 +70,7 @@ struct Config { Config(); ~Config() = default; uint32_t port = 0; + int socket_fd = -1; uint32_t tls_port = 0; std::string tls_cert_file; diff --git a/src/server/worker.cc b/src/server/worker.cc index d5a751e1578..e55fd15978e 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -30,7 +31,6 @@ #include "io_util.h" #include "scope_exit.h" #include "thread_util.h" -#include "time_util.h" #ifdef ENABLE_OPENSSL #include @@ -44,7 +44,6 @@ #include #include -#include #include #include "redis_connection.h" @@ -59,17 +58,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new()) timeval tm = {10, 0}; evtimer_add(timer_.get(), &tm); - uint32_t ports[3] = {config->port, config->tls_port, 0}; - auto binds = config->binds; - - for (uint32_t *port = ports; *port; ++port) { - for (const auto &bind : binds) { - Status s = listenTCP(bind, *port, config->backlog); - if (!s.IsOK()) { - LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg(); - exit(1); + if (config->socket_fd != -1) { + if (const Status s = listenFD(config->socket_fd, config->port, config->backlog); !s.IsOK()) { + LOG(ERROR) << "[worker] Failed to listen to socket with fd: " << config->socket_fd << ". Error: " << s.Msg(); + exit(1); + } + } else { + const uint32_t ports[3] = {config->port, config->tls_port, 0}; + + for (const uint32_t *port = ports; *port; ++port) { + for (const auto &bind : config->binds) { + if (const Status s = listenTCP(bind, *port, config->backlog); !s.IsOK()) { + LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg(); + exit(1); + } + LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port; } - LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port; } } lua_ = lua::CreateState(srv); @@ -216,6 +220,21 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f } } +Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) { + const uint32_t port = util::GetLocalPort(fd); + if (port != expected_port) { + return {Status::NotOK, "The port of the provided socket fd doesn't match the configured port"}; + } + const int dup_fd = dup(fd); + if (dup_fd == -1) { + return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())}; + } + evconnlistener* lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd); + listen_events_.emplace_back(lev); + LOG(INFO) << "Listening on dup'ed fd: " << dup_fd; + return Status::OK(); +} + Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) { bool ipv6_used = strchr(host.data(), ':'); diff --git a/src/server/worker.h b/src/server/worker.h index b6918ba9296..ac88ec8a8b5 100644 --- a/src/server/worker.h +++ b/src/server/worker.h @@ -27,7 +27,6 @@ #include #include -#include #include #include #include @@ -36,9 +35,9 @@ #include #include +#include "config/config.h" #include "event_util.h" #include "redis_connection.h" -#include "storage/storage.h" class Server; @@ -79,6 +78,7 @@ class Worker : EventCallbackBase, EvconnlistenerBase { Server *srv; private: + Status listenFD(int fd, uint32_t expected_port, int backlog); Status listenTCP(const std::string &host, uint32_t port, int backlog); void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen); From 75dd4e6c25b09d37835dde56adc82d60201953ba Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Mon, 14 Oct 2024 11:34:11 -0400 Subject: [PATCH 2/4] Run format --- src/server/worker.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server/worker.cc b/src/server/worker.cc index e55fd15978e..4ddf31add0b 100644 --- a/src/server/worker.cc +++ b/src/server/worker.cc @@ -229,7 +229,8 @@ Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) { if (dup_fd == -1) { return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())}; } - evconnlistener* lev = NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd); + evconnlistener *lev = + NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd); listen_events_.emplace_back(lev); LOG(INFO) << "Listening on dup'ed fd: " << dup_fd; return Status::OK(); From 238a1544135aab50a381674f0760a9dbbd295e23 Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Tue, 15 Oct 2024 14:16:27 -0400 Subject: [PATCH 3/4] Fix typo: we want to check ports only if we're not passing a socket FD --- src/cli/main.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli/main.cc b/src/cli/main.cc index 468905eeb97..3eaf4b13b6b 100644 --- a/src/cli/main.cc +++ b/src/cli/main.cc @@ -145,7 +145,7 @@ int main(int argc, char *argv[]) { // Tricky: We don't expect that different instances running on the same port, // but the server use REUSE_PORT to support the multi listeners. So we connect // the listen port to check if the port has already listened or not. - if (config.socket_fd != -1 && !config.binds.empty()) { + if (config.socket_fd == -1 && !config.binds.empty()) { uint32_t ports[] = {config.port, config.tls_port, 0}; for (uint32_t *port = ports; *port; ++port) { if (util::IsPortInUse(*port)) { From f60300f72836c216d1227ca036bf88de9ac2eeaf Mon Sep 17 00:00:00 2001 From: Nathan Lo Date: Tue, 15 Oct 2024 14:30:35 -0400 Subject: [PATCH 4/4] Add socket-fd to kvrocks.conf and document it --- kvrocks.conf | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kvrocks.conf b/kvrocks.conf index f6b6b31c2d4..b6a2e0279f9 100644 --- a/kvrocks.conf +++ b/kvrocks.conf @@ -20,6 +20,16 @@ bind 127.0.0.1 # unixsocket /tmp/kvrocks.sock # unixsocketperm 777 +# Allows a parent process to open a socket and pass its FD down to kvrocks as a child +# process. Useful to reserve a port and prevent race conditions. +# +# PLEASE NOTE: +# If this is overridden to a value other than -1, the bind and tls* directives will be +# ignored. +# +# Default: -1 (not overridden, defer to creating a connection to the specified port) +socket-fd -1 + # Accept connections on the specified port, default is 6666. port 6666