Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(worker): Allow passing a socket FD to dup and listen on #2598

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/cli/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
#include <iomanip>
#include <ostream>

#include "config.h"
#include "daemon_util.h"
#include "io_util.h"
#include "pid_util.h"
Expand All @@ -40,9 +39,7 @@
#include "storage/storage.h"
#include "string_util.h"
#include "time_util.h"
#include "unique_fd.h"
#include "vendor/crc64.h"
#include "version.h"
#include "version_util.h"

Server *srv = nullptr;
Expand Down Expand Up @@ -136,14 +133,19 @@ int main(int argc, char *argv[]) {
std::cout << "Failed to load config. Error: " << s.Msg() << std::endl;
return 1;
}
const auto socket_fd_exit = MakeScopeExit([&config] {
if (config.socket_fd != -1) {
close(config.socket_fd);
}
});

crc64_init();
InitGoogleLog(&config);
LOG(INFO) << "kvrocks " << PrintVersion;
// Tricky: We don't expect that different instances running on the same port,
// but the server use REUSE_PORT to support the multi listeners. So we connect
// the listen port to check if the port has already listened or not.
if (!config.binds.empty()) {
if (config.socket_fd != -1 && !config.binds.empty()) {
uint32_t ports[] = {config.port, config.tls_port, 0};
for (uint32_t *port = ports; *port; ++port) {
if (util::IsPortInUse(*port)) {
Expand Down
1 change: 1 addition & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Config::Config() {
{"daemonize", true, new YesNoField(&daemonize, false)},
{"bind", true, new StringField(&binds_str_, "")},
{"port", true, new UInt32Field(&port, kDefaultPort, 1, PORT_LIMIT)},
{"socket-fd", true, new IntField(&socket_fd, -1, -1, 1 << 16)},
nathanlo-hrt marked this conversation as resolved.
Show resolved Hide resolved
#ifdef ENABLE_OPENSSL
{"tls-port", true, new UInt32Field(&tls_port, 0, 0, PORT_LIMIT)},
{"tls-cert-file", false, new StringField(&tls_cert_file, "")},
Expand Down
1 change: 1 addition & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct Config {
Config();
~Config() = default;
uint32_t port = 0;
int socket_fd = -1;

uint32_t tls_port = 0;
std::string tls_cert_file;
Expand Down
44 changes: 32 additions & 12 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <event2/util.h>
#include <glog/logging.h>
#include <unistd.h>

#include <stdexcept>
#include <string>
Expand All @@ -30,7 +31,6 @@
#include "io_util.h"
#include "scope_exit.h"
#include "thread_util.h"
#include "time_util.h"

#ifdef ENABLE_OPENSSL
#include <event2/bufferevent_ssl.h>
Expand All @@ -44,7 +44,6 @@
#include <sys/un.h>

#include <algorithm>
#include <list>
#include <utility>

#include "redis_connection.h"
Expand All @@ -59,17 +58,22 @@ Worker::Worker(Server *srv, Config *config) : srv(srv), base_(event_base_new())
timeval tm = {10, 0};
evtimer_add(timer_.get(), &tm);

uint32_t ports[3] = {config->port, config->tls_port, 0};
auto binds = config->binds;

for (uint32_t *port = ports; *port; ++port) {
for (const auto &bind : binds) {
Status s = listenTCP(bind, *port, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg();
exit(1);
if (config->socket_fd != -1) {
if (const Status s = listenFD(config->socket_fd, config->port, config->backlog); !s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen to socket with fd: " << config->socket_fd << ". Error: " << s.Msg();
exit(1);
}
} else {
const uint32_t ports[3] = {config->port, config->tls_port, 0};

for (const uint32_t *port = ports; *port; ++port) {
for (const auto &bind : config->binds) {
if (const Status s = listenTCP(bind, *port, config->backlog); !s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << *port << ". Error: " << s.Msg();
exit(1);
}
LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
}
LOG(INFO) << "[worker] Listening on: " << bind << ":" << *port;
}
}
lua_ = lua::CreateState(srv);
Expand Down Expand Up @@ -216,6 +220,22 @@ void Worker::newUnixSocketConnection(evconnlistener *listener, evutil_socket_t f
}
}

Status Worker::listenFD(int fd, uint32_t expected_port, int backlog) {
const uint32_t port = util::GetLocalPort(fd);
if (port != expected_port) {
return {Status::NotOK, "The port of the provided socket fd doesn't match the configured port"};
}
const int dup_fd = dup(fd);
if (dup_fd == -1) {
return {Status::NotOK, evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())};
}
evconnlistener *lev =
NewEvconnlistener<&Worker::newTCPConnection>(base_, LEV_OPT_THREADSAFE | LEV_OPT_CLOSE_ON_FREE, backlog, dup_fd);
listen_events_.emplace_back(lev);
LOG(INFO) << "Listening on dup'ed fd: " << dup_fd;
return Status::OK();
}

Status Worker::listenTCP(const std::string &host, uint32_t port, int backlog) {
bool ipv6_used = strchr(host.data(), ':');

Expand Down
4 changes: 2 additions & 2 deletions src/server/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include <cstdint>
#include <cstring>
#include <iostream>
#include <lua.hpp>
#include <map>
#include <memory>
Expand All @@ -36,9 +35,9 @@
#include <utility>
#include <vector>

#include "config/config.h"
#include "event_util.h"
#include "redis_connection.h"
#include "storage/storage.h"

class Server;

Expand Down Expand Up @@ -79,6 +78,7 @@ class Worker : EventCallbackBase<Worker>, EvconnlistenerBase<Worker> {
Server *srv;

private:
Status listenFD(int fd, uint32_t expected_port, int backlog);
Status listenTCP(const std::string &host, uint32_t port, int backlog);
void newTCPConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
void newUnixSocketConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen);
Expand Down