Skip to content

Commit

Permalink
Added an http module for handling worker monitoring requests
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 10, 2023
1 parent 16d11b9 commit c7d2cea
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/xrdsvc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_dependencies(qserv_xrdsvc proto)

target_sources(qserv_xrdsvc PRIVATE
ChannelStream.cc
HttpMonitorModule.cc
HttpSvc.cc
SsiProvider.cc
SsiRequest.cc
Expand Down
147 changes: 147 additions & 0 deletions src/xrdsvc/HttpMonitorModule.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "xrdsvc/HttpMonitorModule.h"

// System headers
#include <set>
#include <stdexcept>

// Qserv headers
#include "http/Exceptions.h"
#include "http/RequestQuery.h"
#include "mysql/MySqlUtils.h"
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "wbase/FileChannelShared.h"
#include "wbase/TaskState.h"
#include "wconfig/WorkerConfig.h"
#include "wcontrol/Foreman.h"
#include "wcontrol/ResourceMonitor.h"
#include "wpublish/QueriesAndChunks.h"

using namespace std;
using json = nlohmann::json;

namespace {
// Default-constructed (empty) string that is passed to the http::ModuleBase constructor
// as its second key argument by HttpMonitorModule's constructor.
// NOTE(review): presumably this means "no admin authorization key is configured for
// this module" -- confirm against http::ModuleBase.
string const noAdminAuthKey;
} // namespace

namespace lsst::qserv::xrdsvc {

/**
 * Serve a single monitoring request.
 *
 * A module object bound to the given request/response pair is constructed on the stack
 * and the requested sub-module is run through the inherited execute() method.
 *
 * @param context The context string later reported by context().
 * @param foreman The worker's Foreman the module pulls its monitoring data from.
 * @param req The HTTP request to be processed.
 * @param resp The HTTP response object for the request.
 * @param subModuleName The name of the sub-module to run ('CONFIG', 'MYSQL' or 'STATUS').
 * @param authType The authorization type forwarded to execute().
 */
void HttpMonitorModule::process(string const& context, shared_ptr<wcontrol::Foreman> const& foreman,
                                shared_ptr<qhttp::Request> const& req,
                                shared_ptr<qhttp::Response> const& resp, string const& subModuleName,
                                http::AuthType const authType) {
    // The handler exists only for the duration of this call.
    HttpMonitorModule handler(context, foreman, req, resp);
    handler.execute(subModuleName, authType);
}

HttpMonitorModule::HttpMonitorModule(string const& context, shared_ptr<wcontrol::Foreman> const& foreman,
shared_ptr<qhttp::Request> const& req,
shared_ptr<qhttp::Response> const& resp)
: http::ModuleBase(wconfig::WorkerConfig::instance()->replicationAuthKey(), ::noAdminAuthKey, req,
resp),
_context(context),
_foreman(foreman) {}

/**
 * Dispatch the request to the handler of the named sub-module.
 *
 * The instance identifier of the request is enforced against the one found in
 * the worker configuration before any handler is called.
 *
 * @param subModuleName One of 'CONFIG', 'MYSQL' or 'STATUS'.
 * @return The JSON object produced by the selected handler.
 * @throws std::invalid_argument If the sub-module name is not one of the above.
 */
json HttpMonitorModule::executeImpl(string const& subModuleName) {
    string const scope = string(__func__) + "[sub-module='" + subModuleName + "']";
    debug(scope);

    auto const workerConfig = wconfig::WorkerConfig::instance();
    enforceInstanceId(scope, workerConfig->replicationInstanceId());

    if (subModuleName == "CONFIG") {
        return _config();
    }
    if (subModuleName == "MYSQL") {
        return _mysql();
    }
    if (subModuleName == "STATUS") {
        return _status();
    }
    throw invalid_argument(context() + scope + " unsupported sub-module");
}

/// @return A copy of the context string that was passed into the constructor.
string HttpMonitorModule::context() const {
    return _context;
}

/// @return The worker configuration serialized by wconfig::WorkerConfig::toJson().
json HttpMonitorModule::_config() {
    debug(__func__);
    auto const workerConfig = wconfig::WorkerConfig::instance();
    return workerConfig->toJson();
}

/**
 * Report the process list of the worker's MySQL service, extended with a map
 * from MySQL thread identifiers to worker tasks.
 *
 * @return The JSON object returned by mysql::MySqlUtils::processList() with the additional
 *   key 'mysql_thread_to_task'.
 * @throws http::Error If the process list query fails with mysql::MySqlQueryError
 *   (the error is logged before being re-thrown).
 */
json HttpMonitorModule::_mysql() {
    debug(__func__);

    json report;
    try {
        auto const workerConfig = wconfig::WorkerConfig::instance();
        bool const full = true;
        report = mysql::MySqlUtils::processList(workerConfig->getMySqlConfig(), full);
    } catch (mysql::MySqlQueryError const& ex) {
        error(__func__, ex.what());
        throw http::Error(__func__, ex.what());
    }

    // Collect identifiers of the MySQL threads found in the process list. Only the tasks
    // associated with these threads are requested below. This prevents the monitoring
    // system from pulling old tasks that may still keep records of the closed threads.
    set<unsigned long> liveThreadIds;
    json const& rows = report["queries"]["rows"];
    for (json const& row : rows) {
        // The thread identifier is stored as a string at the very first element
        // of the array. See mysql::MySqlUtils::processList for details.
        string const threadId = row[0].get<string>();
        liveThreadIds.insert(stoul(threadId));
    }

    // Link the known MySQL threads to the tasks that are being (or have been) processed
    // by the worker.
    report["mysql_thread_to_task"] = _foreman->queriesAndChunks()->mySqlThread2task(liveThreadIds);
    return report;
}

/**
 * Collect the worker status info.
 *
 * @return A JSON object with three keys: 'processor' (the Foreman's status for the tasks
 *   selected by the request's query), 'resources' (the resource monitor's status) and
 *   'filesystem' (the status reported by wbase::FileChannelShared).
 */
json HttpMonitorModule::_status() {
    debug(__func__);

    // The task selector is parsed first, so a malformed query fails before any status is pulled.
    auto const selector = _translateTaskSelector(__func__);

    json report;
    report["processor"] = _foreman->statusToJson(selector);
    report["resources"] = _foreman->resourceMonitor()->statusToJson();
    report["filesystem"] = wbase::FileChannelShared::statusToJson();
    return report;
}

/**
 * Build the task selector from the optional parameters of the request's query:
 * 'include_tasks', 'query_ids', 'task_states' and 'max_tasks'.
 *
 * @param func The calling context (used as a prefix in the log messages).
 * @return The populated selector.
 * @throws std::invalid_argument If a value of 'task_states' can't be translated
 *   into a task state.
 */
wbase::TaskSelector HttpMonitorModule::_translateTaskSelector(string const& func) const {
    wbase::TaskSelector taskSelector;

    // Any non-zero value of the flag enables tasks in the report.
    taskSelector.includeTasks = query().optionalUInt("include_tasks", 0) != 0;
    taskSelector.queryIds = query().optionalVectorUInt64("query_ids");

    string const paramName = "task_states";
    for (auto const& stateName : query().optionalVectorStr(paramName)) {
        try {
            auto const parsedState = wbase::str2taskState(stateName);
            taskSelector.taskStates.push_back(parsedState);
            debug(func, "str='" + stateName + "', task state=" + wbase::taskState2str(parsedState));
        } catch (exception const& ex) {
            // Any failure on a state name is logged and reported as a bad argument.
            string const problem =
                    "failed to parse query parameter '" + paramName + "', ex: " + string(ex.what());
            error(func, problem);
            throw invalid_argument(problem);
        }
    }

    taskSelector.maxTasks = query().optionalUInt("max_tasks", 0);

    // Log a summary of the selector.
    debug(func, "include_tasks=" + string(taskSelector.includeTasks ? "1" : "0"));
    debug(func, "queryIds.size()=" + to_string(taskSelector.queryIds.size()));
    debug(func, "taskStates.size()=" + to_string(taskSelector.taskStates.size()));
    debug(func, "max_tasks=" + to_string(taskSelector.maxTasks));
    return taskSelector;
}

} // namespace lsst::qserv::xrdsvc
108 changes: 108 additions & 0 deletions src/xrdsvc/HttpMonitorModule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_XRDSVC_HTTPMONITORMODULE_H
#define LSST_QSERV_XRDSVC_HTTPMONITORMODULE_H

// System headers
#include <memory>
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "http/ModuleBase.h"

namespace lsst::qserv::qhttp {
class Request;
class Response;
} // namespace lsst::qserv::qhttp

// Forward declarations
namespace lsst::qserv::wbase {
struct TaskSelector;
} // namespace lsst::qserv::wbase

namespace lsst::qserv::wcontrol {
class Foreman;
} // namespace lsst::qserv::wcontrol

// This header declarations
namespace lsst::qserv::xrdsvc {

/**
* Class HttpMonitorModule implements a handler for reporting various run-time monitoring
* metrics and statistics collected from an instance of a Qserv worker.
*/
class HttpMonitorModule : public http::ModuleBase {
public:
/**
* Process a request by constructing a module object for the request/response
* pair and invoking its execute() method for the specified sub-module.
*
* @note supported values for parameter 'subModuleName' are:
* 'CONFIG' - get configuration parameters
* 'MYSQL' - get the status (running queries) of the worker's MySQL service
* 'STATUS' - get the status info (tasks, schedulers, etc.)
*
* @param context The context string to be reported by method context().
* @param foreman The worker's Foreman from which the monitoring info is pulled.
* @param req The HTTP request.
* @param resp The HTTP response.
* @param subModuleName The name of a sub-module to be run (see the note above).
* @param authType The authorization type forwarded to execute().
* @throws std::invalid_argument for unknown values of parameter 'subModuleName'
*/
static void process(std::string const& context, std::shared_ptr<wcontrol::Foreman> const& foreman,
std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp, std::string const& subModuleName,
http::AuthType const authType = http::AuthType::NONE);

// Objects of the class are created only by process() via the private constructor;
// default construction and copying are disabled.
HttpMonitorModule() = delete;
HttpMonitorModule(HttpMonitorModule const&) = delete;
HttpMonitorModule& operator=(HttpMonitorModule const&) = delete;

~HttpMonitorModule() final = default;

protected:
/// Enforce the instance identifier and dispatch the request to the handler of
/// the specified sub-module ('CONFIG', 'MYSQL' or 'STATUS').
/// @throws std::invalid_argument for unknown values of parameter 'subModuleName'
virtual nlohmann::json executeImpl(std::string const& subModuleName) final;

/// @return The context string that was passed into the constructor.
virtual std::string context() const final;

private:
/// The constructor is private; objects are created by the static method process().
/// See process() for the meaning of the parameters.
HttpMonitorModule(std::string const& context, std::shared_ptr<wcontrol::Foreman> const& foreman,
std::shared_ptr<qhttp::Request> const& req,
std::shared_ptr<qhttp::Response> const& resp);

/// @return Configuration parameters.
nlohmann::json _config();

/// @return The status (running queries) of the worker's MySQL service. The result is
/// amended with key 'mysql_thread_to_task' that links the MySQL thread identifiers
/// found in the process list to the corresponding worker tasks.
/// @throws http::Error If the process list query fails.
nlohmann::json _mysql();

/// @return The worker status info (tasks, schedulers, etc.). The result has keys
/// 'processor', 'resources' and 'filesystem'.
nlohmann::json _status();

/**
* Extract and parse values of the worker task selector from the request's query.
* The optional query parameters read by the method are: 'include_tasks', 'query_ids',
* 'task_states' and 'max_tasks'.
* @param func The calling context (for error reporting).
* @return wbase::TaskSelector The translated selector.
* @throws std::invalid_argument For not well formed request query or unsupported values in it.
*/
wbase::TaskSelector _translateTaskSelector(std::string const& func) const;

/// The context string returned by method context().
std::string const _context;

/// The Foreman from which the task, resource and MySQL thread info is pulled.
std::shared_ptr<wcontrol::Foreman> const _foreman;
};

} // namespace lsst::qserv::xrdsvc

#endif // LSST_QSERV_XRDSVC_HTTPMONITORMODULE_H
45 changes: 30 additions & 15 deletions src/xrdsvc/HttpSvc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@

// Qserv headers
#include "http/MetaModule.h"
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "qhttp/Server.h"
#include "qhttp/Status.h"
#include "wconfig/WorkerConfig.h"
#include "xrdsvc/HttpMonitorModule.h"

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -48,11 +46,13 @@ string const serviceName = "WORKER-MANAGEMENT";

namespace lsst::qserv::xrdsvc {

shared_ptr<HttpSvc> HttpSvc::create(uint16_t port, unsigned int numThreads) {
return shared_ptr<HttpSvc>(new HttpSvc(port, numThreads));
shared_ptr<HttpSvc> HttpSvc::create(shared_ptr<wcontrol::Foreman> const& foreman, uint16_t port,
unsigned int numThreads) {
return shared_ptr<HttpSvc>(new HttpSvc(foreman, port, numThreads));
}

HttpSvc::HttpSvc(uint16_t port, unsigned int numThreads) : _port(port), _numThreads(numThreads) {}
HttpSvc::HttpSvc(shared_ptr<wcontrol::Foreman> const& foreman, uint16_t port, unsigned int numThreads)
: _foreman(foreman), _port(port), _numThreads(numThreads) {}

uint16_t HttpSvc::start() {
string const context = "xrdsvc::HttpSvc::" + string(__func__) + " ";
Expand All @@ -67,15 +67,30 @@ uint16_t HttpSvc::start() {
// Make sure the handlers are registered and the server is started before
// launching any BOOST ASIO threads. This will prevent threads from finishing
// due to a lack of work to be done.
_httpServerPtr->addHandlers({{"GET", "/meta/version",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
auto const workerConfig = wconfig::WorkerConfig::instance();
string const noAdminAuthKey;
http::MetaModule::process(::serviceName,
workerConfig->replicationInstanceId(),
workerConfig->replicationAuthKey(),
noAdminAuthKey, req, resp, "VERSION");
}}});
_httpServerPtr->addHandlers(
{{"GET", "/meta/version",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
auto const workerConfig = wconfig::WorkerConfig::instance();
string const noAdminAuthKey;
http::MetaModule::process(::serviceName, workerConfig->replicationInstanceId(),
workerConfig->replicationAuthKey(), noAdminAuthKey, req, resp,
"VERSION");
}}});
_httpServerPtr->addHandlers(
{{"GET", "/config",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpMonitorModule::process(::serviceName, self->_foreman, req, resp, "CONFIG");
}}});
_httpServerPtr->addHandlers(
{{"GET", "/mysql",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpMonitorModule::process(::serviceName, self->_foreman, req, resp, "MYSQL");
}}});
_httpServerPtr->addHandlers(
{{"GET", "/status",
[self](shared_ptr<qhttp::Request> const& req, shared_ptr<qhttp::Response> const& resp) {
HttpMonitorModule::process(::serviceName, self->_foreman, req, resp, "STATUS");
}}});
_httpServerPtr->start();

// Initialize the I/O context and start the service threads. At this point
Expand Down
12 changes: 10 additions & 2 deletions src/xrdsvc/HttpSvc.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ namespace lsst::qserv::qhttp {
class Server;
} // namespace lsst::qserv::qhttp

namespace lsst::qserv::wcontrol {
class Foreman;
} // namespace lsst::qserv::wcontrol

// This header declarations
namespace lsst::qserv::xrdsvc {

Expand Down Expand Up @@ -72,7 +76,8 @@ class HttpSvc : public std::enable_shared_from_this<HttpSvc> {
* @param numThreads The number of BOOST ASIO threads.
* @return The shared pointer to the running server.
*/
static std::shared_ptr<HttpSvc> create(uint16_t port, unsigned int numThreads);
static std::shared_ptr<HttpSvc> create(std::shared_ptr<wcontrol::Foreman> const& foreman, uint16_t port,
unsigned int numThreads);

HttpSvc() = delete;
HttpSvc(HttpSvc const&) = delete;
Expand Down Expand Up @@ -108,9 +113,12 @@ class HttpSvc : public std::enable_shared_from_this<HttpSvc> {
* @param port The number of a port to bind to.
* @param numThreads The number of BOOST ASIO threads.
*/
HttpSvc(uint16_t port, unsigned int numThreads);
HttpSvc(std::shared_ptr<wcontrol::Foreman> const& foreman, uint16_t port, unsigned int numThreads);

// Input parameters

std::shared_ptr<wcontrol::Foreman> const _foreman;

uint16_t const _port; ///< The input port number (could be 0 to allow autoallocation).
unsigned int const _numThreads; ///< The number of the BOOST ASIO service threads.

Expand Down
4 changes: 2 additions & 2 deletions src/xrdsvc/SsiService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ SsiService::SsiService(XrdSsiLogger* log)
// Start the control server for processing worker management requests sent
// by the Replication System. Update the port number in the configuration
// in case the server is run on a dynamically allocated port.
_controlHttpSvc =
HttpSvc::create(workerConfig->replicationHttpPort(), workerConfig->replicationNumHttpThreads());
_controlHttpSvc = HttpSvc::create(_foreman, workerConfig->replicationHttpPort(),
workerConfig->replicationNumHttpThreads());
auto const port = _controlHttpSvc->start();
workerConfig->setReplicationHttpPort(port);

Expand Down

0 comments on commit c7d2cea

Please sign in to comment.