Skip to content

Commit

Permalink
Implemented and planted the qhttp-based server into Qserv worker
Browse files Browse the repository at this point in the history
The server is meant to process management and monitoring requests
sent by the Replication Controller and relevant applications.
  • Loading branch information
iagaponenko committed Nov 8, 2023
1 parent bf0319c commit e159dd4
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 10 deletions.
1 change: 1 addition & 0 deletions src/xrdsvc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_dependencies(qserv_xrdsvc proto)

target_sources(qserv_xrdsvc PRIVATE
ChannelStream.cc
HttpSvc.cc
SsiProvider.cc
SsiRequest.cc
SsiService.cc
Expand Down
110 changes: 110 additions & 0 deletions src/xrdsvc/HttpSvc.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "xrdsvc/HttpSvc.h"

// System headers
#include <stdexcept>

// Third party headers
#include "nlohmann/json.hpp"

// Qserv headers
#include "qhttp/Request.h"
#include "qhttp/Response.h"
#include "qhttp/Server.h"
#include "qhttp/Status.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace nlohmann;
using namespace std;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdsvc.HttpSvc");

} // namespace

namespace lsst::qserv::xrdsvc {

shared_ptr<HttpSvc> HttpSvc::create(uint16_t port, unsigned int numThreads) {
return shared_ptr<HttpSvc>(new HttpSvc(port, numThreads));
}

HttpSvc::HttpSvc(uint16_t port, unsigned int numThreads) : _port(port), _numThreads(numThreads) {}

uint16_t HttpSvc::start() {
string const context = "xrdsvc::HttpSvc::" + string(__func__) + " ";
lock_guard<mutex> const lock(_mtx);
if (_httpServerPtr != nullptr) {
throw logic_error(context + "the service is already running.");
}
_httpServerPtr = qhttp::Server::create(_io_service, _port);

auto const self = shared_from_this();

// Make sure the handlers are registered and the server is started before
// launching any BOOST ASIO threads. This will prevent threads from finishing
// due to a lack of work to be done.
_httpServerPtr->addHandlers({{"GET", "/meta/version",
[self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) {
json result = json::object({{"success", 1},
{"error", string()},
{"error_ext", json::object()},
{"warning", string()},
{"version", 1}});
resp->send(result.dump(), "application/json");
}}});
_httpServerPtr->start();

// Initialize the I/O context and start the service threads. At this point
// the server will be ready to service incoming requests.
for (unsigned int i = 0; i < _numThreads; ++i) {
_threads.push_back(make_unique<thread>([self]() { self->_io_service.run(); }));
}
auto const actualPort = _httpServerPtr->getPort();
LOGS(_log, LOG_LVL_INFO, context + "started on port " + to_string(actualPort));
return actualPort;
}

void HttpSvc::stop() {
string const context = "xrdsvc::HttpSvc::" + string(__func__) + " ";
lock_guard<mutex> const lock(_mtx);
if (_httpServerPtr == nullptr) {
throw logic_error(context + "the service is not running.");
}

// Stopping the server and resetting the I/O context will abort the ongoing
// requests and unblock the service threads.
_httpServerPtr->stop();
_httpServerPtr = nullptr;
_io_service.reset();
for (auto&& t : _threads) {
t->join();
}
_threads.clear();
LOGS(_log, LOG_LVL_INFO, context + "stopped");
}

} // namespace lsst::qserv::xrdsvc
132 changes: 132 additions & 0 deletions src/xrdsvc/HttpSvc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_XRDSVC_HTTPSVC_H
#define LSST_QSERV_XRDSVC_HTTPSVC_H

// System headers
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Third party headers
#include "boost/asio.hpp"

namespace lsst::qserv::qhttp {
class Server;
} // namespace lsst::qserv::qhttp

// This header declarations
namespace lsst::qserv::xrdsvc {

/**
* Class HttpSvc is the HTTP server for processing worker management requests.
*
* The server creates and manages its own collection of BOOST ASIO service threads.
* The number of threads is specified via the corresponding parameter of the class's
* constructor.
*
* Typical usage of the class:
* @code
* // Create the server. Note, it won't run yet until explicitly started.
* uint16_t const port = 0; // The port will be dynamically allocated at start
* unsigned int const numThreads = 2; // The number of BOOST ASIO threads
* auto const svc = xrdsvc::HttpSvc::create(port, numThreads);
*
* // Start the server and get the actual port number.
* uint16_t const actualPort = svc->start();
* std::cout << "HTTP server is running on port " << actualPort << std::endl;
*
* // Stop the server to release resources.
* svc->stop();
* svc.reset();
* @code
* @note The class implementation is thread safe.
*/
class HttpSvc : public std::enable_shared_from_this<HttpSvc> {
public:
/**
* The factory will not initialize ASIO context and threads, or start
* the server. This has to be done by calling method HttpSvc::start()
*
* @param port The number of a port to bind to.
* @param numThreads The number of BOOST ASIO threads.
* @return The shared pointer to the running server.
*/
static std::shared_ptr<HttpSvc> create(uint16_t port, unsigned int numThreads);

HttpSvc() = delete;
HttpSvc(HttpSvc const&) = delete;
HttpSvc& operator=(HttpSvc const&) = delete;

~HttpSvc() = default;

/**
* Initialize ASIO context and threads, and start the server.
*
* @note Once the server is started it has to be explicitly stopped
* using the counterpart method stop() to allow releasing allocated
* resources and letting the destructor to be executed. Note that
* service threads started by the curent method and the HTTP server
* incerement the reference counter on the shared pointer that is
* returned by the class's factory method.
*
* @return The actual port number on which the server is run.
* @throws std::logic_error If the server is already running.
*/
uint16_t start();

/**
* Stop the server and threads, and release the relevant resources.
* @throws std::logic_error If the server is not running.
*/
void stop();

private:
/**
* The constructor will not initialize ASIO context and threads, or start
* the server. This has to be done by calling method HttpSvc::start()
* @param port The number of a port to bind to.
* @param numThreads The number of BOOST ASIO threads.
*/
HttpSvc(uint16_t port, unsigned int numThreads);

// Input parameters
uint16_t const _port; ///< The input port number (could be 0 to allow autoallocation).
unsigned int const _numThreads; ///< The number of the BOOST ASIO service threads.

/// This mutex protects the object state.
mutable std::mutex _mtx;

/// Worker management requests are processed by this server.
std::shared_ptr<qhttp::Server> _httpServerPtr;

/// The BOOST ASIO I/O services.
boost::asio::io_service _io_service;

/// The thread pool for running ASIO services.
std::vector<std::unique_ptr<std::thread>> _threads;
};

} // namespace lsst::qserv::xrdsvc

#endif // LSST_QSERV_XRDSVC_HTTPSVC_H
15 changes: 14 additions & 1 deletion src/xrdsvc/SsiService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "memman/MemMan.h"
#include "memman/MemManNone.h"
#include "mysql/MySqlConnection.h"
#include "qhttp/Server.h"
#include "sql/SqlConnection.h"
#include "sql/SqlConnectionFactory.h"
#include "util/common.h"
Expand All @@ -65,6 +66,7 @@
#include "wsched/FifoScheduler.h"
#include "wsched/GroupScheduler.h"
#include "wsched/ScanScheduler.h"
#include "xrdsvc/HttpSvc.h"
#include "xrdsvc/SsiRequest.h"
#include "xrdsvc/XrdName.h"

Expand Down Expand Up @@ -240,13 +242,24 @@ SsiService::SsiService(XrdSsiLogger* log)
}
}

// Start the control server for processing worker management requests sent
// by the Replication System. Update the port number in the configuration
// in case if the server is run on the dynamically allocated port.
_controlHttpSvc =
HttpSvc::create(workerConfig->replicationHttpPort(), workerConfig->replicationNumHttpThreads());
auto const port = _controlHttpSvc->start();
workerConfig->setReplicationHttpPort(port);

// Begin periodically updating worker's status in the Replication System's registry
// in the detached thread. This will continue before the application gets terminated.
thread registryUpdateThread(::registryUpdateLoop, _chunkInventory->id());
registryUpdateThread.detach();
}

SsiService::~SsiService() { LOGS(_log, LOG_LVL_DEBUG, "SsiService dying."); }
SsiService::~SsiService() {
LOGS(_log, LOG_LVL_DEBUG, "SsiService dying.");
_controlHttpSvc->stop();
}

void SsiService::ProcessRequest(XrdSsiRequest& reqRef, XrdSsiResource& resRef) {
LOGS(_log, LOG_LVL_DEBUG, "Got request call where rName is: " << resRef.rName);
Expand Down
24 changes: 15 additions & 9 deletions src/xrdsvc/SsiService.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,21 @@
// Forward declarations
class XrdSsiLogger;

namespace lsst::qserv {
namespace util {
namespace lsst::qserv::util {
class FileMonitor;
}
namespace wcontrol {
} // namespace lsst::qserv::util

namespace lsst::qserv::wcontrol {
class Foreman;
} // namespace wcontrol
namespace wpublish {
} // namespace lsst::qserv::wcontrol

namespace lsst::qserv::wpublish {
class ChunkInventory;
}
} // namespace lsst::qserv
} // namespace lsst::qserv::wpublish

namespace lsst::qserv::xrdsvc {
class HttpSvc;
} // namespace lsst::qserv::xrdsvc

namespace lsst::qserv::xrdsvc {

Expand Down Expand Up @@ -81,7 +85,9 @@ class SsiService : public XrdSsiService {
/// Reloads the log configuration file on log config file change.
std::shared_ptr<util::FileMonitor> _logFileMonitor;

}; // class SsiService
/// The HTTP server processing worker management requests.
std::shared_ptr<HttpSvc> _controlHttpSvc;
};

} // namespace lsst::qserv::xrdsvc

Expand Down

0 comments on commit e159dd4

Please sign in to comment.