Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactored MySQL connector class, fixed bugs
Browse files Browse the repository at this point in the history
Eliminated duplicate data members. Reinforced anf refined the public API
and the implementation.
iagaponenko committed Nov 7, 2023
1 parent 0d1e943 commit c423e02
Showing 2 changed files with 229 additions and 177 deletions.
258 changes: 128 additions & 130 deletions src/mysql/MySqlConnection.cc
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@

// System headers
#include <cstddef>
#include <stdexcept>
#include <sstream>

// Third-party headers
@@ -44,9 +45,6 @@ namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.mysql.MySqlConnection");

} // namespace

namespace {
// A class that calls mysql_thread_end when an instance is destroyed.
struct MySqlThreadJanitor {
~MySqlThreadJanitor() { mysql_thread_end(); }
@@ -67,130 +65,126 @@ struct InitializeMysqlLibrary {
janitor.reset(new MySqlThreadJanitor);
}
};
} // namespace

namespace lsst::qserv::mysql {
/**
* Establish a new MySQL connection.
* @param config Parameters of the connection.
* @return A pointer to the MySQL connection or nullptr.
*/
MYSQL* doConnect(std::shared_ptr<lsst::qserv::mysql::MySqlConfig> const& config) {
// We must call mysql_library_init() exactly once before calling mysql_init
// because it is not thread safe. Both mysql_library_init and mysql_init
// call mysql_thread_init, and so we must arrange to call mysql_thread_end
// when the calling thread exists. We do this by allocating a thread
// local object that calls mysql_thread_end from its destructor.
static std::once_flag initialized;
static thread_local std::unique_ptr<MySqlThreadJanitor> janitor;

MySqlConnection::MySqlConnection()
: _mysql(nullptr),
_mysql_res(nullptr),
_isConnected(false),
_isExecuting(false),
_interrupted(false) {}

MySqlConnection::MySqlConnection(MySqlConfig const& sqlConfig)
: _mysql(nullptr),
_mysql_res(nullptr),
_isConnected(false),
_sqlConfig(std::make_shared<MySqlConfig>(sqlConfig)),
_isExecuting(false),
_interrupted(false) {}

MySqlConnection::~MySqlConnection() {
if (_mysql) {
if (_mysql_res) {
MYSQL_ROW row;
while ((row = mysql_fetch_row(_mysql_res)))
; // Drain results.
_mysql_res = nullptr;
}
closeMySqlConn();
std::call_once(initialized, InitializeMysqlLibrary(janitor));
MYSQL* m = mysql_init(nullptr);
if (nullptr == m) return m;
if (nullptr == janitor) janitor.reset(new MySqlThreadJanitor);
unsigned long const clientFlag = CLIENT_MULTI_STATEMENTS;
mysql_options(m, MYSQL_OPT_LOCAL_INFILE, 0);
MYSQL* c = mysql_real_connect(m, config->socket.empty() ? config->hostname.c_str() : 0,
config->username.empty() ? 0 : config->username.c_str(),
config->password.empty() ? 0 : config->password.c_str(),
config->dbName.empty() ? 0 : config->dbName.c_str(), config->port,
config->socket.empty() ? 0 : config->socket.c_str(), clientFlag);
if (nullptr == c) {
// Failed to connect: free resources.
mysql_close(m);
return c;
}
return m;
}

bool MySqlConnection::checkConnection(mysql::MySqlConfig const& mysqlconfig) {
MySqlConnection conn(mysqlconfig);
} // namespace

namespace lsst::qserv::mysql {

bool MySqlConnection::checkConnection(mysql::MySqlConfig const& config) {
MySqlConnection conn(config);
if (conn.connect()) {
LOGS(_log, LOG_LVL_DEBUG, "Successful MySQL connection check: " << mysqlconfig);
LOGS(_log, LOG_LVL_DEBUG, "Successful MySQL connection check: " << config);
return true;
} else {
LOGS(_log, LOG_LVL_WARN, "Unsuccessful MySQL connection check: " << mysqlconfig);
LOGS(_log, LOG_LVL_WARN, "Unsuccessful MySQL connection check: " << config);
return false;
}
}

void MySqlConnection::closeMySqlConn() {
// Close mysql connection and set deallocated pointer to null
mysql_close(_mysql);
_mysql = nullptr;
}
MySqlConnection::MySqlConnection(MySqlConfig const& config)
: _config(std::make_shared<MySqlConfig>(config)), _mysql(nullptr), _mysql_res(nullptr) {}

MySqlConnection::~MySqlConnection() { _closeMySqlConnImpl(std::lock_guard<std::mutex>(_mtx)); }

void MySqlConnection::closeMySqlConn() { _closeMySqlConnImpl(std::lock_guard<std::mutex>(_mtx)); }

bool MySqlConnection::connect() {
// Cleanup garbage
if (_mysql != nullptr) {
closeMySqlConn();
std::lock_guard<std::mutex> const lock(_mtx);
_closeMySqlConnImpl(lock);
_mysql = ::doConnect(_config);
if (nullptr != _mysql) {
_threadId = mysql_thread_id(_mysql);
return true;
}
_isConnected = false;
// Make myself a thread
_mysql = _connectHelper();
_isConnected = (_mysql != nullptr);
return _isConnected;
return false;
}

bool MySqlConnection::queryUnbuffered(std::string const& query) {
// run query, store into list.
int rc;
{
std::lock_guard<std::mutex> lock(_interruptMutex);
_isExecuting = true;
_interrupted = false;
}
rc = mysql_real_query(_mysql, query.c_str(), query.length());
if (rc) {
return false;
}
std::lock_guard<std::mutex> lock(_mtx);
if (_mysql == nullptr) return false;
int const rc = mysql_real_query(_mysql, query.c_str(), query.length());
if (rc) return false;
_mysql_res = mysql_use_result(_mysql);
_isExecuting = false;
if (!_mysql_res) {
return false;
}
if (nullptr == _mysql_res) return false;
return true;
}

/// Cancel existing query
/// @return 0 on success.
/// 1 indicates error in connecting. (may try again)
/// 2 indicates error executing kill query. (do not try again)
/// -1 indicates NOP: No query in progress or query already interrupted.
int MySqlConnection::cancel() {
std::lock_guard<std::mutex> lock(_interruptMutex);
int rc;
if (_interrupted) {
// Should we log this?
return -1; // No further action needed.
}
_interrupted = true; // Prevent others from trying to interrupt
MYSQL* killMysql = _connectHelper();
if (!killMysql) {
_interrupted = false; // Didn't try
return 1;
// Handle broken connection
}
// KILL QUERY only, not KILL CONNECTION.
int threadId = mysql_thread_id(_mysql);
MySqlConnection::CancelStatus MySqlConnection::cancel() {
unsigned int const threadId = _threadId.load();
if (!(connected() && (0 != threadId))) return CancelStatus::CANCEL_NOP;
MYSQL* killMysql = ::doConnect(_config);
if (nullptr == killMysql) return CancelStatus::CANCEL_CONNECT_ERROR;
std::string const killSql = "KILL QUERY " + std::to_string(threadId);
rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size());
int const rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size());
mysql_close(killMysql);
if (rc) {
LOGS(_log, LOG_LVL_WARN,
"failed to kill MySQL thread: " << threadId << ", error: " << std::string(mysql_error(killMysql))
<< ", errno: " << std::to_string(mysql_errno(killMysql)));
return 2;
return CancelStatus::CANCEL_FAILED;
}
return 0;
return CancelStatus::CANCEL_SUCCESS;
}

bool MySqlConnection::selectDb(std::string const& dbName) {
if (!dbName.empty() && mysql_select_db(_mysql, dbName.c_str())) {
return false;
}
_sqlConfig->dbName = dbName;
return true;
MYSQL* MySqlConnection::getMySql() {
_throwIfNotConnected(__func__);
return _mysql;
}

MYSQL_RES* MySqlConnection::getResult() {
_throwIfNotConnected(__func__);
return _mysql_res;
}

void MySqlConnection::freeResult() {
std::lock_guard<std::mutex> lock(_mtx);
_throwIfNotInProcessingResult(__func__);
mysql_free_result(_mysql_res);
_mysql_res = nullptr;
}

int MySqlConnection::getResultFieldCount() {
std::lock_guard<std::mutex> lock(_mtx);
_throwIfNotInProcessingResult(__func__);
return mysql_field_count(_mysql);
}

std::vector<std::string> MySqlConnection::getColumnNames() const {
assert(_mysql);
assert(_mysql_res);
std::lock_guard<std::mutex> lock(_mtx);
_throwIfNotInProcessingResult(__func__);
std::vector<std::string> names;
if (0 != mysql_field_count(_mysql)) {
auto fields = mysql_fetch_fields(_mysql_res);
@@ -201,50 +195,54 @@ std::vector<std::string> MySqlConnection::getColumnNames() const {
return names;
}

////////////////////////////////////////////////////////////////////////
// MySqlConnection
// private:
////////////////////////////////////////////////////////////////////////

MYSQL* MySqlConnection::_connectHelper() {
// We must call mysql_library_init() exactly once before calling mysql_init
// because it is not thread safe. Both mysql_library_init and mysql_init
// call mysql_thread_init, and so we must arrange to call mysql_thread_end
// when the calling thread exists. We do this by allocating a thread
// local object that calls mysql_thread_end from its destructor.
static std::once_flag initialized;
static thread_local std::unique_ptr<MySqlThreadJanitor> janitor;
unsigned int MySqlConnection::getErrno() const {
_throwIfNotConnected(__func__);
return mysql_errno(_mysql);
}
const std::string MySqlConnection::getError() const {
_throwIfNotConnected(__func__);
return std::string(mysql_error(_mysql));
}

std::call_once(initialized, InitializeMysqlLibrary(janitor));
MYSQL* m = mysql_init(nullptr);
if (!m) {
return m;
}
if (!janitor.get()) {
janitor.reset(new MySqlThreadJanitor);
}
unsigned long clientFlag = CLIENT_MULTI_STATEMENTS;
mysql_options(m, MYSQL_OPT_LOCAL_INFILE, 0);
MYSQL* c =
mysql_real_connect(m, _sqlConfig->socket.empty() ? _sqlConfig->hostname.c_str() : 0,
_sqlConfig->username.empty() ? 0 : _sqlConfig->username.c_str(),
_sqlConfig->password.empty() ? 0 : _sqlConfig->password.c_str(),
_sqlConfig->dbName.empty() ? 0 : _sqlConfig->dbName.c_str(), _sqlConfig->port,
_sqlConfig->socket.empty() ? 0 : _sqlConfig->socket.c_str(), clientFlag);
if (!c) {
// Failed to connect: free resources.
mysql_close(m);
return c;
bool MySqlConnection::selectDb(std::string const& dbName) {
_throwIfNotConnected(__func__);
if (!dbName.empty() && (0 != mysql_select_db(_mysql, dbName.c_str()))) {
return false;
}
_threadId = mysql_thread_id(m);
return m;
_config->dbName = dbName;
return true;
}

std::string MySqlConnection::dump() {
std::ostringstream os;
os << "hostN=" << _sqlConfig->hostname << " sock=" << _sqlConfig->socket
<< " uname=" << _sqlConfig->username << " dbN=" << _sqlConfig->dbName << " port=" << _sqlConfig->port;
os << "hostN=" << _config->hostname << " sock=" << _config->socket << " uname=" << _config->username
<< " dbN=" << _config->dbName << " port=" << _config->port;
return os.str();
}

void MySqlConnection::_closeMySqlConnImpl(std::lock_guard<std::mutex> const& lock) {
if (nullptr != _mysql) {
mysql_close(_mysql);
_mysql = nullptr;
_threadId = 0;
if (nullptr != _mysql_res) {
mysql_free_result(_mysql_res);
_mysql_res = nullptr;
}
}
}

void MySqlConnection::_throwIfNotConnected(std::string const& func) const {
if (_mysql == nullptr) {
throw std::logic_error("MySqlConnection::" + func + " connection is not open.");
}
}

void MySqlConnection::_throwIfNotInProcessingResult(std::string const& func) const {
_throwIfNotConnected(func);
if (_mysql_res == nullptr) {
throw std::logic_error("MySqlConnection::" + func + " not in the result processing context.");
}
}

} // namespace lsst::qserv::mysql
148 changes: 101 additions & 47 deletions src/mysql/MySqlConnection.h
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@
#define LSST_QSERV_MYSQL_MYSQLCONNECTION_H

// System headers
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
@@ -36,81 +37,134 @@
#include <vector>

// Third-party headers
#include "boost/utility.hpp"
#include <mysql/mysql.h>

// Forward declarations
namespace lsst::qserv::mysql {

// Forward
class MySqlConfig;
} // namespace lsst::qserv::mysql

namespace lsst::qserv::mysql {

/// MySqlConnection is a thin wrapper around the MySQL C-API that partially
/// shields clients from the raw API, while still providing raw access for
/// clients that need it.
class MySqlConnection : boost::noncopyable {
class MySqlConnection {
public:
MySqlConnection();
MySqlConnection(MySqlConfig const& sqlConfig);
/// The completion status of the query cancelation operation.
enum CancelStatus {
CANCEL_SUCCESS = 0, ///< The operation was succesfull.
CANCEL_CONNECT_ERROR = 1, ///< Failed to establish a separate connection to MySQL.
CANCEL_FAILED = 2, ///< Failed to failure to kill the query.
CANCEL_NOP = -1 ///< Connection is not open.
};

/**
* Check if a MySQL connection could be established for the given configuration.
* @return 'true' if MySQL connection succeeded.
*/
static bool checkConnection(mysql::MySqlConfig const& config);

/**
* Construct the connector with specifid configuration.
* @param sqlConfig Parameters of the connection.
*/
explicit MySqlConnection(MySqlConfig const& config);

MySqlConnection() = delete;
MySqlConnection(MySqlConnection const&) = delete;
MySqlConnection& operator=(MySqlConnection const&) = delete;

/// Non-trivial destructor is needed to close the connection and release resources.
~MySqlConnection();

void closeMySqlConn();
MySqlConfig const& getConfig() const { return *_config; }

/// Close the current connection (if any) and open the new one.
/// @return 'true' if the operation was succesfull.
bool connect();

bool connected() const { return nullptr != _mysql; }

/// @note The identifier is set after making a connection, and it's reset
/// to 0 upon disconnects.
/// @return A thread identifier of the last succesfully established connection.
unsigned long threadId() const { return _threadId.load(); }

/// Close the current connection (if any).
void closeMySqlConn();

/**
* Check MySQL connection for a given configuration
*
* @return: true if MySQL connection succeeded, else false
* Execute a query.
* @param query The query to be executed.
* @return 'true' if the operation was successfull.
*/
static bool checkConnection(mysql::MySqlConfig const& mysqlconfig);
bool queryUnbuffered(std::string const& query);

bool connected() const { return _isConnected; }
unsigned long threadId() const { return _threadId; }
/**
* Cancel existing query (if any).
* @note The method will only attempt to cancel the ognoing query (if any).
* The connection (if any) will be left intact, and it could be used for
* submitting other queries.
* @return CancelStatus The completion status of the operation.
*/
CancelStatus cancel();

// instance destruction invalidates this return value
MYSQL* getMySql() { return _mysql; }
MySqlConfig const& getMySqlConfig() const { return *_sqlConfig; }
// The following methods require a valid connection.
// Otherwise std::logic_error will be thrown.

bool queryUnbuffered(std::string const& query);
int cancel();

MYSQL_RES* getResult() { return _mysql_res; }
void freeResult() {
mysql_free_result(_mysql_res);
_mysql_res = nullptr;
}
int getResultFieldCount() {
assert(_mysql);
return mysql_field_count(_mysql);
}
std::vector<std::string> getColumnNames() const;
unsigned int getErrno() const {
assert(_mysql);
return mysql_errno(_mysql);
}
const std::string getError() const {
assert(_mysql);
return std::string(mysql_error(_mysql));
}
MySqlConfig const& getConfig() const { return *_sqlConfig; }
MYSQL* getMySql();
unsigned int getErrno() const;
const std::string getError() const;
bool selectDb(std::string const& dbName);

/**
* The method requires a valid connection.
* @return A pointer to the result descriptor. The method returns nullptr
* if the last query failed, if no query submitted after establishing a connection,
* or if the result setof the last query was explicitly cleared by calling freeResult().
* @throws std::logic_error if the connection is not open.
*/
MYSQL_RES* getResult();

// The following methods require must be called within the query procesing
// context (assuming the connection is open and the last query succeeded).
// Otherwise std::logic_error will be thrown.

void freeResult();
int getResultFieldCount();
std::vector<std::string> getColumnNames() const;

/// @return a string suitable for logging.
std::string dump();

private:
MYSQL* _connectHelper();
static std::mutex _mysqlShared;
static bool _mysqlReady;
/// Close the current connection (if open).
/// @param lock An exclusive lock on the _mtx must be acquired before calling the method.
void _closeMySqlConnImpl(std::lock_guard<std::mutex> const& lock);

/// Ensure a connection is establushed.
/// @param func A context the method was called from (for error reporting).
/// @throw std::logic_error If not in the desired state.
void _throwIfNotConnected(std::string const& func) const;

/// Ensure the object in the result processing state (a connection is established and
/// the last submitted query succeeded).
/// @param func A context the method was called from (for error reporting).
/// @throw std::logic_error If not in the desired state.
void _throwIfNotInProcessingResult(std::string const& func) const;

std::shared_ptr<MySqlConfig> _config; ///< Input parameters of the connections.

mutable std::mutex _mtx; ///< Guards state transitions.

// The current state of the connection. Values of the data members
// are modified after establishing a connection, query completion, or
// uppon disconnects.

MYSQL* _mysql;
MYSQL_RES* _mysql_res;
bool _isConnected;
unsigned long _threadId = 0; ///< 0 if not connected
std::shared_ptr<MySqlConfig> _sqlConfig;
bool _isExecuting; ///< true during mysql_real_query and mysql_use_result
bool _interrupted; ///< true if cancellation requested
std::mutex _interruptMutex;
std::atomic<unsigned long> _threadId{0};
};

} // namespace lsst::qserv::mysql

0 comments on commit c423e02

Please sign in to comment.