diff --git a/src/xrdsvc/CMakeLists.txt b/src/xrdsvc/CMakeLists.txt index b6f1bff6c..a11764703 100644 --- a/src/xrdsvc/CMakeLists.txt +++ b/src/xrdsvc/CMakeLists.txt @@ -3,6 +3,7 @@ add_dependencies(qserv_xrdsvc proto) target_sources(qserv_xrdsvc PRIVATE ChannelStream.cc + HttpSvc.cc SsiProvider.cc SsiRequest.cc SsiService.cc diff --git a/src/xrdsvc/HttpSvc.cc b/src/xrdsvc/HttpSvc.cc new file mode 100644 index 000000000..023ed418b --- /dev/null +++ b/src/xrdsvc/HttpSvc.cc @@ -0,0 +1,110 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "xrdsvc/HttpSvc.h" + +// System headers +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "qhttp/Request.h" +#include "qhttp/Response.h" +#include "qhttp/Server.h" +#include "qhttp/Status.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace nlohmann; +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.xrdsvc.HttpSvc"); + +} // namespace + +namespace lsst::qserv::xrdsvc { + +shared_ptr HttpSvc::create(uint16_t port, unsigned int numThreads) { + return shared_ptr(new HttpSvc(port, numThreads)); +} + +HttpSvc::HttpSvc(uint16_t port, unsigned int numThreads) : _port(port), _numThreads(numThreads) {} + +uint16_t HttpSvc::start() { + string const context = "xrdsvc::HttpSvc::" + string(__func__) + " "; + lock_guard const lock(_mtx); + if (_httpServerPtr != nullptr) { + throw logic_error(context + "the service is already running."); + } + _httpServerPtr = qhttp::Server::create(_io_service, _port); + + auto const self = shared_from_this(); + + // Make sure the handlers are registered and the server is started before + // launching any BOOST ASIO threads. This will prevent threads from finishing + // due to a lack of work to be done. + _httpServerPtr->addHandlers({{"GET", "/meta/version", + [self](qhttp::Request::Ptr const& req, qhttp::Response::Ptr const& resp) { + json result = json::object({{"success", 1}, + {"error", string()}, + {"error_ext", json::object()}, + {"warning", string()}, + {"version", 1}}); + resp->send(result.dump(), "application/json"); + }}}); + _httpServerPtr->start(); + + // Initialize the I/O context and start the service threads. At this point + // the server will be ready to service incoming requests. + for (unsigned int i = 0; i < _numThreads; ++i) { + _threads.push_back(make_unique([self]() { self->_io_service.run(); })); + } + auto const actualPort = _httpServerPtr->getPort(); + LOGS(_log, LOG_LVL_INFO, context + "started on port " + to_string(actualPort)); + return actualPort; +} + +void HttpSvc::stop() { + string const context = "xrdsvc::HttpSvc::" + string(__func__) + " "; + lock_guard const lock(_mtx); + if (_httpServerPtr == nullptr) { + throw logic_error(context + "the service is not running."); + } + + // Stopping the server and resetting the I/O context will abort the ongoing + // requests and unblock the service threads. + _httpServerPtr->stop(); + _httpServerPtr = nullptr; + _io_service.reset(); + for (auto&& t : _threads) { + t->join(); + } + _threads.clear(); + LOGS(_log, LOG_LVL_INFO, context + "stopped"); +} + +} // namespace lsst::qserv::xrdsvc diff --git a/src/xrdsvc/HttpSvc.h b/src/xrdsvc/HttpSvc.h new file mode 100644 index 000000000..dfe3da47a --- /dev/null +++ b/src/xrdsvc/HttpSvc.h @@ -0,0 +1,132 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_XRDSVC_HTTPSVC_H +#define LSST_QSERV_XRDSVC_HTTPSVC_H + +// System headers +#include +#include +#include +#include +#include + +// Third party headers +#include "boost/asio.hpp" + +namespace lsst::qserv::qhttp { +class Server; +} // namespace lsst::qserv::qhttp + +// This header declarations +namespace lsst::qserv::xrdsvc { + +/** + * Class HttpSvc is the HTTP server for processing worker management requests. + * + * The server creates and manages its own collection of BOOST ASIO service threads. + * The number of threads is specified via the corresponding parameter of the class's + * constructor. + * + * Typical usage of the class: + * @code + * // Create the server. Note, it won't run yet until explicitly started. + * uint16_t const port = 0; // The port will be dynamically allocated at start + * unsigned int const numThreads = 2; // The number of BOOST ASIO threads + * auto const svc = xrdsvc::HttpSvc::create(port, numThreads); + * + * // Start the server and get the actual port number. + * uint16_t const actualPort = svc->start(); + * std::cout << "HTTP server is running on port " << actualPort << std::endl; + * + * // Stop the server to release resources. + * svc->stop(); + * svc.reset(); + * @code + * @note The class implementation is thread safe. + */ +class HttpSvc : public std::enable_shared_from_this { +public: + /** + * The factory will not initialize ASIO context and threads, or start + * the server. This has to be done by calling method HttpSvc::start() + * + * @param port The number of a port to bind to. + * @param numThreads The number of BOOST ASIO threads. + * @return The shared pointer to the running server. + */ + static std::shared_ptr create(uint16_t port, unsigned int numThreads); + + HttpSvc() = delete; + HttpSvc(HttpSvc const&) = delete; + HttpSvc& operator=(HttpSvc const&) = delete; + + ~HttpSvc() = default; + + /** + * Initialize ASIO context and threads, and start the server. + * + * @note Once the server is started it has to be explicitly stopped + * using the counterpart method stop() to allow releasing allocated + * resources and letting the destructor to be executed. Note that + * service threads started by the curent method and the HTTP server + * incerement the reference counter on the shared pointer that is + * returned by the class's factory method. + * + * @return The actual port number on which the server is run. + * @throws std::logic_error If the server is already running. + */ + uint16_t start(); + + /** + * Stop the server and threads, and release the relevant resources. + * @throws std::logic_error If the server is not running. + */ + void stop(); + +private: + /** + * The constructor will not initialize ASIO context and threads, or start + * the server. This has to be done by calling method HttpSvc::start() + * @param port The number of a port to bind to. + * @param numThreads The number of BOOST ASIO threads. + */ + HttpSvc(uint16_t port, unsigned int numThreads); + + // Input parameters + uint16_t const _port; ///< The input port number (could be 0 to allow autoallocation). + unsigned int const _numThreads; ///< The number of the BOOST ASIO service threads. + + /// This mutex protects the object state. + mutable std::mutex _mtx; + + /// Worker management requests are processed by this server. + std::shared_ptr _httpServerPtr; + + /// The BOOST ASIO I/O services. + boost::asio::io_service _io_service; + + /// The thread pool for running ASIO services. + std::vector> _threads; +}; + +} // namespace lsst::qserv::xrdsvc + +#endif // LSST_QSERV_XRDSVC_HTTPSVC_H diff --git a/src/xrdsvc/SsiService.cc b/src/xrdsvc/SsiService.cc index 30e194433..944f2c7c5 100644 --- a/src/xrdsvc/SsiService.cc +++ b/src/xrdsvc/SsiService.cc @@ -48,6 +48,7 @@ #include "memman/MemMan.h" #include "memman/MemManNone.h" #include "mysql/MySqlConnection.h" +#include "qhttp/Server.h" #include "sql/SqlConnection.h" #include "sql/SqlConnectionFactory.h" #include "util/common.h" @@ -65,6 +66,7 @@ #include "wsched/FifoScheduler.h" #include "wsched/GroupScheduler.h" #include "wsched/ScanScheduler.h" +#include "xrdsvc/HttpSvc.h" #include "xrdsvc/SsiRequest.h" #include "xrdsvc/XrdName.h" @@ -240,13 +242,24 @@ SsiService::SsiService(XrdSsiLogger* log) } } + // Start the control server for processing worker management requests sent + // by the Replication System. Update the port number in the configuration + // in case if the server is run on the dynamically allocated port. + _controlHttpSvc = + HttpSvc::create(workerConfig->replicationHttpPort(), workerConfig->replicationNumHttpThreads()); + auto const port = _controlHttpSvc->start(); + workerConfig->setReplicationHttpPort(port); + // Begin periodically updating worker's status in the Replication System's registry // in the detached thread. This will continue before the application gets terminated. thread registryUpdateThread(::registryUpdateLoop, _chunkInventory->id()); registryUpdateThread.detach(); } -SsiService::~SsiService() { LOGS(_log, LOG_LVL_DEBUG, "SsiService dying."); } +SsiService::~SsiService() { + LOGS(_log, LOG_LVL_DEBUG, "SsiService dying."); + _controlHttpSvc->stop(); +} void SsiService::ProcessRequest(XrdSsiRequest& reqRef, XrdSsiResource& resRef) { LOGS(_log, LOG_LVL_DEBUG, "Got request call where rName is: " << resRef.rName); diff --git a/src/xrdsvc/SsiService.h b/src/xrdsvc/SsiService.h index 8b1568f0d..0fc9f7dd3 100644 --- a/src/xrdsvc/SsiService.h +++ b/src/xrdsvc/SsiService.h @@ -37,17 +37,21 @@ // Forward declarations class XrdSsiLogger; -namespace lsst::qserv { -namespace util { +namespace lsst::qserv::util { class FileMonitor; -} -namespace wcontrol { +} // namespace lsst::qserv::util + +namespace lsst::qserv::wcontrol { class Foreman; -} // namespace wcontrol -namespace wpublish { +} // namespace lsst::qserv::wcontrol + +namespace lsst::qserv::wpublish { class ChunkInventory; -} -} // namespace lsst::qserv +} // namespace lsst::qserv::wpublish + +namespace lsst::qserv::xrdsvc { +class HttpSvc; +} // namespace lsst::qserv::xrdsvc namespace lsst::qserv::xrdsvc { @@ -81,7 +85,9 @@ class SsiService : public XrdSsiService { /// Reloads the log configuration file on log config file change. std::shared_ptr _logFileMonitor; -}; // class SsiService + /// The HTTP server processing worker management requests. + std::shared_ptr _controlHttpSvc; +}; } // namespace lsst::qserv::xrdsvc