Skip to content

Commit

Permalink
Merge pull request #122 from project-tsurugi/wip/i_573_dev_shm_cleanup
Browse files Browse the repository at this point in the history
Wip/i 573 dev shm cleanup
  • Loading branch information
t-horikawa authored Jan 18, 2024
2 parents 70cdcd0 + 512545d commit 22eeaa4
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 152 deletions.
2 changes: 1 addition & 1 deletion src/tateyama/monitor/monitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ enum class status : std::int64_t {
case status::boot_error: return "boot_error"sv;
case status::unknown: return "disconnected"sv;
}
std::abort();
return "illegal state"sv;
}

class monitor {
Expand Down
9 changes: 7 additions & 2 deletions src/tateyama/process/control.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,13 @@ tgctl::return_code tgctl_start(const std::string& argv0, bool need_check, tateya
}
} else {
if (!FLAGS_quiet) {
std::cout << "could not launch " << server_name_string
<< ", because launch is still in progres" << std::endl;
if (file_mutex->check() == proc_mutex::lock_state::locked) {
std::cout << "could not launch " << server_name_string
<< ", because launch is still in progres" << std::endl;
} else { // if the lock is not held by the tsurugidb process, this means that the tsurugidb boot has failed.
std::cout << "could not launch " << server_name_string << ", as "
<< server_name_string << " exited due to some error." << std::endl;
}
}
rtnv = tgctl::return_code::err;
}
Expand Down
139 changes: 93 additions & 46 deletions src/tateyama/process/proc_mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,62 +21,62 @@
#include <stdexcept> // std::runtime_error
#include <filesystem>
#include <fstream>
#include <functional>

namespace tateyama::process {

class proc_mutex {
class file_mutex {
public:
enum class lock_state : std::int32_t {
no_file = 0,
not_locked,
locked,
error,
};

explicit proc_mutex(std::filesystem::path lock_file, bool create_file = true, bool throw_exception = true)
: lock_file_(std::move(lock_file)), create_file_(create_file) {
if (!create_file_ && throw_exception && !std::filesystem::exists(lock_file_)) {
throw std::runtime_error("the lock file does not exist");

/**
 * @brief Opens the lock file and stores the resulting descriptor in fd_.
 * @param lock_file path of the lock file
 * @param create_file when true the file is opened with O_CREAT (owner read/write permission),
 *        when false it is opened only if it is already present
 * @param throw_exception when true a failing open() is reported with std::runtime_error,
 *        when false the failure is silent and fd_ stays negative
 * @throws std::runtime_error if open() fails and throw_exception is true
 */
file_mutex(std::filesystem::path lock_file, bool create_file, bool throw_exception) : lock_file_(std::move(lock_file)) {
    if (create_file) {
        // O_CREAT is given without O_EXCL, so a pre-existing file is not an error here;
        // a failure means the file could neither be created nor opened.
        if ((fd_ = open(lock_file_.generic_string().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR)) < 0) {  // NOLINT
            if (throw_exception) {
                throw std::runtime_error("cannot create or open the lock file");
            }
        }
    } else {
        if ((fd_ = open(lock_file_.generic_string().c_str(), O_RDWR)) < 0) {  // NOLINT
            if (throw_exception) {
                throw std::runtime_error("the lock file does not exist");
            }
        }
    }
}
~proc_mutex() {
if (fd_ != not_opened) {
~file_mutex() {
if (fd_ >= 0) {
close(fd_);
}
if (create_file_) {
if (owner_) {
unlink(lock_file_.generic_string().c_str());
}
}

proc_mutex(proc_mutex const& other) = delete;
proc_mutex& operator=(proc_mutex const& other) = delete;
proc_mutex(proc_mutex&& other) noexcept = delete;
proc_mutex& operator=(proc_mutex&& other) noexcept = delete;
file_mutex(file_mutex const& other) = delete;
file_mutex& operator=(file_mutex const& other) = delete;
file_mutex(file_mutex&& other) noexcept = delete;
file_mutex& operator=(file_mutex&& other) noexcept = delete;

void lock() {
if (create_file_) {
if ((fd_ = open(lock_file_.generic_string().c_str(), O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR)) < 0) { // NOLINT
throw std::runtime_error("open error");
}
}
/// @return the lock file path rendered in generic format
[[nodiscard]] inline std::string name() const {
    std::string path_text = lock_file_.generic_string();
    return path_text;
}
void lock(const std::function<void(void)>& fill_contents = []{}) {
if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { // NOLINT
if (ftruncate(fd_, 0) < 0) {
throw std::runtime_error("ftruncate error");
}
std::string pid = std::to_string(getpid());
if (write(fd_, pid.data(), pid.length()) < 0) {
throw std::runtime_error("write error");
}
fill_contents();
owner_ = true;
return;
}
throw std::runtime_error("lock error");
}
/// Issues flock(LOCK_UN) on fd_; the outcome of the call is not inspected.
void unlock() const {
    static_cast<void>(flock(fd_, LOCK_UN));
}
[[nodiscard]] inline std::string name() const {
return lock_file_.generic_string();
}
[[nodiscard]] lock_state check() {
std::error_code error;
const bool result = std::filesystem::exists(lock_file_, error);
Expand All @@ -86,7 +86,7 @@ class proc_mutex {
if (!std::filesystem::is_regular_file(lock_file_)) {
return lock_state::error;
}
if (fd_ = open(lock_file_.generic_string().c_str(), O_WRONLY); fd_ < 0) { // NOLINT
if (fd_ = open(lock_file_.generic_string().c_str(), O_RDWR); fd_ < 0) { // NOLINT
return lock_state::error;
}
if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { // NOLINT
Expand All @@ -95,6 +95,45 @@ class proc_mutex {
}
return lock_state::locked;
}
/**
 * @brief Gives the textual label of a lock_state value.
 * @param value the lock state to convert
 * @return "no_file", "not_locked", "locked" or "error" for the matching enumerator,
 *         "illegal lock state" for any other value
 */
[[nodiscard]] inline constexpr std::string_view to_string_view(lock_state value) noexcept {
    using namespace std::string_view_literals;
    if (value == lock_state::no_file) {
        return "no_file"sv;
    }
    if (value == lock_state::not_locked) {
        return "not_locked"sv;
    }
    if (value == lock_state::locked) {
        return "locked"sv;
    }
    if (value == lock_state::error) {
        return "error"sv;
    }
    return "illegal lock state"sv;
}

protected:
std::filesystem::path lock_file_; // NOLINT
int fd_{not_opened}; // NOLINT

private:
bool owner_{};
static constexpr int not_opened = -1;
};


class proc_mutex : public file_mutex {
public:
/**
 * @brief Forwards all arguments unchanged to the file_mutex base constructor.
 * @param lock_file path of the lock file
 * @param create_file passed through to file_mutex (defaults to true)
 * @param throw_exception passed through to file_mutex (defaults to true)
 */
explicit proc_mutex(std::filesystem::path lock_file, bool create_file = true, bool throw_exception = true)
    : file_mutex{std::move(lock_file), create_file, throw_exception} {}

/**
 * @brief Calls file_mutex::lock() with a callback that records this process's id in the lock file.
 * @details The callback truncates the file behind fd_ to zero length and then writes
 *          the decimal text of getpid() into it.
 * @throws std::runtime_error ("ftruncate error" / "write error") when the callback's system calls fail
 */
void lock() {
    const auto write_pid = [this] {
        if (ftruncate(fd_, 0) < 0) {
            throw std::runtime_error("ftruncate error");
        }
        const std::string pid_text = std::to_string(getpid());
        if (write(fd_, pid_text.data(), pid_text.length()) < 0) {
            throw std::runtime_error("write error");
        }
    };
    file_mutex::lock(write_pid);
}

[[nodiscard]] int pid(bool do_check = true) {
std::string str;
if (contents(str, do_check)) {
Expand All @@ -106,24 +145,8 @@ class proc_mutex {
}
return 0;
}
[[nodiscard]] inline constexpr std::string_view to_string_view(lock_state value) noexcept {
using namespace std::string_view_literals;
using state = lock_state;
switch (value) {
case state::no_file: return "no_file"sv;
case state::not_locked: return "not_locked"sv;
case state::locked: return "locked"sv;
case state::error: return "error"sv;
}
std::abort();
}

private:
std::filesystem::path lock_file_;
int fd_{not_opened};
const bool create_file_;
static constexpr int not_opened = -1;

[[nodiscard]] bool contents(std::string& str, bool do_check = true) {
if (do_check && check() != lock_state::locked) {
return false;
Expand All @@ -136,4 +159,28 @@ class proc_mutex {
}
};

class shm_mutex {
public:
explicit shm_mutex(std::filesystem::path lock_file) {
try {
shm_mutex_ = std::make_unique<file_mutex>(std::move(lock_file), true, true);
shm_mutex_->lock();
return;
} catch (std::runtime_error &ex) {
shm_mutex_ = std::make_unique<file_mutex>(std::move(lock_file), false, true);
shm_mutex_->lock();
return;
}
}
static std::filesystem::path lock_file_name(std::string_view dbname) {
std::string str = "tsurugi-";
str += dbname;
str += ".lock";
return {str};
}

private:
std::unique_ptr<file_mutex> shm_mutex_{};
};

} // tateyama::process
126 changes: 23 additions & 103 deletions src/tateyama/server/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "server.h"
#include "utils.h"
#include "logging.h"
#include "glog_helper.h"
#ifdef ALTIMETER
#include <altimeter/logger.h>
#include "tateyama/altimeter/altimeter_helper.h"
Expand Down Expand Up @@ -80,108 +81,6 @@ static void sighup_handler([[maybe_unused]] int sig) {
}
}

/**
 * @brief Sets the glog FLAGS_* values, initializes glog and logs the values in effect.
 * @details For each setting, the GLOG_<name> environment variable is consulted first;
 *          only when it is absent is the same-named entry of the "glog" configuration
 *          section read. For logtostderr the mere presence of the environment variable
 *          sets the flag to true.
 * @param conf the configuration whose "glog" section supplies the fallback values
 */
void setup_glog(tateyama::api::configuration::whole* conf) {
    auto* glog_section = conf->get_section("glog");

    // logtostderr
    if (const char* env_value = getenv("GLOG_logtostderr"); env_value != nullptr) {
        FLAGS_logtostderr = true;
    } else if (const auto configured = glog_section->get<bool>("logtostderr"); configured) {
        FLAGS_logtostderr = configured.value();
    }

    // stderrthreshold
    if (const char* env_value = getenv("GLOG_stderrthreshold"); env_value != nullptr) {
        FLAGS_stderrthreshold = static_cast<::google::int32>(strtol(env_value, nullptr, 10));
    } else if (const auto configured = glog_section->get<int>("stderrthreshold"); configured) {
        FLAGS_stderrthreshold = configured.value();
    }

    // minloglevel
    if (const char* env_value = getenv("GLOG_minloglevel"); env_value != nullptr) {
        FLAGS_minloglevel = static_cast<::google::int32>(strtol(env_value, nullptr, 10));
    } else if (const auto configured = glog_section->get<int>("minloglevel"); configured) {
        FLAGS_minloglevel = configured.value();
    }

    // log_dir
    if (const char* env_value = getenv("GLOG_log_dir"); env_value != nullptr) {
        FLAGS_log_dir = env_value;
    } else if (const auto configured = glog_section->get<std::filesystem::path>("log_dir"); configured) {
        FLAGS_log_dir = configured.value().string();
    }

    // max_log_size
    if (const char* env_value = getenv("GLOG_max_log_size"); env_value != nullptr) {
        FLAGS_max_log_size = static_cast<::google::int32>(strtol(env_value, nullptr, 10));
    } else if (const auto configured = glog_section->get<int>("max_log_size"); configured) {
        FLAGS_max_log_size = configured.value();
    }

    // v
    if (const char* env_value = getenv("GLOG_v"); env_value != nullptr) {
        FLAGS_v = static_cast<::google::int32>(strtol(env_value, nullptr, 10));
    } else if (const auto configured = glog_section->get<int>("v"); configured) {
        FLAGS_v = configured.value();
    }

    // logbuflevel
    if (const char* env_value = getenv("GLOG_logbuflevel"); env_value != nullptr) {
        FLAGS_logbuflevel = static_cast<::google::int32>(strtol(env_value, nullptr, 10));
    } else if (const auto configured = glog_section->get<int>("logbuflevel"); configured) {
        FLAGS_logbuflevel = configured.value();
    }

    google::InitGoogleLogging("tsurugidb");
    google::InstallFailureSignalHandler();
    conf->show_vlog_info_message();

    // report the configuration that is now in effect
    LOG(INFO) << glog_config_prefix << std::boolalpha
              << "logtostderr: " << FLAGS_logtostderr << ", "
              << "logtostderr for glog.";
    LOG(INFO) << glog_config_prefix
              << "stderrthreshold: " << FLAGS_stderrthreshold << ", "
              << "stderrthreshold for glog.";
    LOG(INFO) << glog_config_prefix
              << "minloglevel: " << FLAGS_minloglevel << ", "
              << "minloglevel for glog.";
    LOG(INFO) << glog_config_prefix
              << "log_dir: " << "\"" << FLAGS_log_dir << "\", "
              << "log_dir for glog.";
    LOG(INFO) << glog_config_prefix
              << "max_log_size: " << FLAGS_max_log_size << ", "
              << "max_log_size for glog.";
    LOG(INFO) << glog_config_prefix
              << "v: " << FLAGS_v << ", "
              << "v for glog.";
    LOG(INFO) << glog_config_prefix
              << "logbuflevel: " << FLAGS_logbuflevel << ", "
              << "logbuflevel for glog.";
}

int backend_main(int argc, char **argv) {
// command arguments
gflags::SetUsageMessage("tateyama database server");
Expand Down Expand Up @@ -211,9 +110,10 @@ int backend_main(int argc, char **argv) {
LOG(INFO) << "==== configuration end ====";
} catch (boost::property_tree::json_parser_error& e) {
LOG(ERROR) << e.what();
exit(1);
}

// mutex
// process mutex
auto mutex_file = bst_conf.lock_file();
// output configuration to be used
LOG(INFO) << system_config_prefix
Expand All @@ -227,6 +127,7 @@ int backend_main(int argc, char **argv) {
exit(1);
}

// obsolete
bool tpch_mode = false;
if (FLAGS_load) {
tpch_mode = false;
Expand Down Expand Up @@ -269,9 +170,27 @@ int backend_main(int argc, char **argv) {
LOG(ERROR) << "Starting server failed due to errors in setting up server application framework.";
exit(1);
}

// should do after setup()
status_info->mutex_file(mutex_file.string());

// shm mutex
std::unique_ptr<tateyama::process::shm_mutex> shm_mutex{};
if (auto database_name_opt = conf->get_section("ipc_endpoint")->get<std::string>("database_name"); database_name_opt) {
try {
shm_mutex = std::make_unique<tateyama::process::shm_mutex>(
// pid_directory in the system section is guaranteed to exist; see src/tateyama/configuration/bootstrap_configuration.cpp
conf->get_section("system")->get<std::filesystem::path>("pid_directory").value() /
tateyama::process::shm_mutex::lock_file_name(database_name_opt.value())
);
} catch (std::runtime_error &ex) {
status_info->whole(tateyama::status_info::state::boot_error);
LOG(ERROR) << "A tsurugidb process is already running using the same database name (" << database_name_opt.value() << ")";
tgsv.shutdown();
exit(1);
}
} // when database_name does not exist in the ipc_endpoint section, ipc_endpoint is inactive, so this check is unnecessary.

auto* tgdb = sqlsvc->database();
if (tgdb) {
if (tpch_mode) {
Expand All @@ -284,6 +203,7 @@ int backend_main(int argc, char **argv) {
status_info->whole(tateyama::status_info::state::boot_error);
// detailed message must have been logged in the components where start error occurs
LOG(ERROR) << "Starting server failed due to errors in starting server application framework.";
tgsv.shutdown();
exit(1);
}

Expand Down
Loading

0 comments on commit 22eeaa4

Please sign in to comment.