From 2050406f7eca93fe025d7aa5a943d1aa4a64e96b Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Sat, 11 Nov 2023 03:58:22 +0000 Subject: [PATCH] Extended Replication system's framework The new design of the Qserv managememt control plain now relies upon the HTTP protocol to communicate with workers. --- src/replica/CMakeLists.txt | 5 + src/replica/GetConfigQservHttpMgtRequest.cc | 60 +++ src/replica/GetConfigQservHttpMgtRequest.h | 86 ++++ src/replica/GetDbStatusQservHttpMgtRequest.cc | 60 +++ src/replica/GetDbStatusQservHttpMgtRequest.h | 86 ++++ src/replica/GetStatusQservHttpMgtRequest.cc | 63 +++ src/replica/GetStatusQservHttpMgtRequest.h | 91 ++++ src/replica/QservHttpMgtRequest.cc | 345 ++++++++++++++++ src/replica/QservHttpMgtRequest.h | 391 ++++++++++++++++++ src/replica/QservMgtServices.cc | 44 +- src/replica/QservMgtServices.h | 70 +++- src/replica/TestEchoQservHttpMgtRequest.cc | 83 ++++ src/replica/TestEchoQservHttpMgtRequest.h | 114 +++++ src/wbase/TaskState.h | 39 ++ 14 files changed, 1516 insertions(+), 21 deletions(-) create mode 100644 src/replica/GetConfigQservHttpMgtRequest.cc create mode 100644 src/replica/GetConfigQservHttpMgtRequest.h create mode 100644 src/replica/GetDbStatusQservHttpMgtRequest.cc create mode 100644 src/replica/GetDbStatusQservHttpMgtRequest.h create mode 100644 src/replica/GetStatusQservHttpMgtRequest.cc create mode 100644 src/replica/GetStatusQservHttpMgtRequest.h create mode 100644 src/replica/QservHttpMgtRequest.cc create mode 100644 src/replica/QservHttpMgtRequest.h create mode 100644 src/replica/TestEchoQservHttpMgtRequest.cc create mode 100644 src/replica/TestEchoQservHttpMgtRequest.h diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index 991dc2015..75d0c7a8d 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -74,8 +74,11 @@ target_sources(replica PRIVATE FixUpApp.cc FixUpJob.cc GetReplicasQservMgtRequest.cc + GetDbStatusQservHttpMgtRequest.cc GetDbStatusQservMgtRequest.cc + GetConfigQservHttpMgtRequest.cc GetConfigQservMgtRequest.cc + GetStatusQservHttpMgtRequest.cc GetStatusQservMgtRequest.cc HealthMonitorTask.cc HttpAsyncReqApp.cc @@ -130,6 +133,7 @@ target_sources(replica PRIVATE PurgeJob.cc QhttpTestApp.cc QservGetReplicasJob.cc + QservHttpMgtRequest.cc QservMgtRequest.cc QservMgtServices.cc QservStatusJob.cc @@ -202,6 +206,7 @@ target_sources(replica PRIVATE SuccessRateGenerator.cc SyncApp.cc Task.cc + TestEchoQservHttpMgtRequest.cc TestEchoQservMgtRequest.cc TransactionContrib.cc TransactionsApp.cc diff --git a/src/replica/GetConfigQservHttpMgtRequest.cc b/src/replica/GetConfigQservHttpMgtRequest.cc new file mode 100644 index 000000000..6f4c5140e --- /dev/null +++ b/src/replica/GetConfigQservHttpMgtRequest.cc @@ -0,0 +1,60 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/GetConfigQservHttpMgtRequest.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetConfigQservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +shared_ptr GetConfigQservHttpMgtRequest::create( + shared_ptr const& serviceProvider, string const& worker, + GetConfigQservHttpMgtRequest::CallbackType const& onFinish) { + return shared_ptr( + new GetConfigQservHttpMgtRequest(serviceProvider, worker, onFinish)); +} + +GetConfigQservHttpMgtRequest::GetConfigQservHttpMgtRequest( + shared_ptr const& serviceProvider, string const& worker, + GetConfigQservHttpMgtRequest::CallbackType const& onFinish) + : QservHttpMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {} + +void GetConfigQservHttpMgtRequest::createHttpReqImpl(replica::Lock const& lock) { + string const service = "/config"; + createHttpReq(lock, service); +} + +void GetConfigQservHttpMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/GetConfigQservHttpMgtRequest.h b/src/replica/GetConfigQservHttpMgtRequest.h new file mode 100644 index 000000000..94c2bec91 --- /dev/null +++ b/src/replica/GetConfigQservHttpMgtRequest.h @@ -0,0 +1,86 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_GETCONFIGQSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_GETCONFIGQSERVHTTPMGTREQUEST_H + +// System headers +#include +#include + +// Qserv headers +#include "replica/QservHttpMgtRequest.h" + +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class GetConfigQservHttpMgtRequest is a request for obtaining various info + * on the database service of the Qserv worker. + */ +class GetConfigQservHttpMgtRequest : public QservHttpMgtRequest { +public: + /// The function type for notifications on the completion of the request + typedef std::function const&)> CallbackType; + + GetConfigQservHttpMgtRequest() = delete; + GetConfigQservHttpMgtRequest(GetConfigQservHttpMgtRequest const&) = delete; + GetConfigQservHttpMgtRequest& operator=(GetConfigQservHttpMgtRequest const&) = delete; + + ~GetConfigQservHttpMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static std::shared_ptr create( + std::shared_ptr const& serviceProvider, std::string const& worker, + CallbackType const& onFinish = nullptr); + +protected: + /// @see QservHttpMgtRequest::createHttpReq() + virtual void createHttpReqImpl(replica::Lock const& lock) final; + + /// @see QservHttpMgtRequest::notify() + virtual void notify(replica::Lock const& lock) final; + +private: + /// @see GetConfigQservHttpMgtRequest::create() + GetConfigQservHttpMgtRequest(std::shared_ptr const& serviceProvider, + std::string const& worker, CallbackType const& onFinish); + + // Input parameters + + CallbackType _onFinish; ///< this object is reset after finishing the request +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_GETCONFIGQSERVHTTPMGTREQUEST_H diff --git a/src/replica/GetDbStatusQservHttpMgtRequest.cc b/src/replica/GetDbStatusQservHttpMgtRequest.cc new file mode 100644 index 000000000..075de27fd --- /dev/null +++ b/src/replica/GetDbStatusQservHttpMgtRequest.cc @@ -0,0 +1,60 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/GetDbStatusQservHttpMgtRequest.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetDbStatusQservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +shared_ptr GetDbStatusQservHttpMgtRequest::create( + shared_ptr const& serviceProvider, string const& worker, + GetDbStatusQservHttpMgtRequest::CallbackType const& onFinish) { + return shared_ptr( + new GetDbStatusQservHttpMgtRequest(serviceProvider, worker, onFinish)); +} + +GetDbStatusQservHttpMgtRequest::GetDbStatusQservHttpMgtRequest( + shared_ptr const& serviceProvider, string const& worker, + GetDbStatusQservHttpMgtRequest::CallbackType const& onFinish) + : QservHttpMgtRequest(serviceProvider, "QSERV_GET_DATABASE_STATUS", worker), _onFinish(onFinish) {} + +void GetDbStatusQservHttpMgtRequest::createHttpReqImpl(replica::Lock const& lock) { + string const service = "/mysql"; + createHttpReq(lock, service); +} + +void GetDbStatusQservHttpMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/GetDbStatusQservHttpMgtRequest.h b/src/replica/GetDbStatusQservHttpMgtRequest.h new file mode 100644 index 000000000..b1da22a9b --- /dev/null +++ b/src/replica/GetDbStatusQservHttpMgtRequest.h @@ -0,0 +1,86 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_GETDBSTATUSQSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_GETDBSTATUSQSERVHTTPMGTREQUEST_H + +// System headers +#include +#include + +// Qserv headers +#include "replica/QservHttpMgtRequest.h" + +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class GetDbStatusQservHttpMgtRequest is a request for obtaining various info + * on the database service of the Qserv worker. + */ +class GetDbStatusQservHttpMgtRequest : public QservHttpMgtRequest { +public: + /// The function type for notifications on the completion of the request + typedef std::function const&)> CallbackType; + + GetDbStatusQservHttpMgtRequest() = delete; + GetDbStatusQservHttpMgtRequest(GetDbStatusQservHttpMgtRequest const&) = delete; + GetDbStatusQservHttpMgtRequest& operator=(GetDbStatusQservHttpMgtRequest const&) = delete; + + ~GetDbStatusQservHttpMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static std::shared_ptr create( + std::shared_ptr const& serviceProvider, std::string const& worker, + CallbackType const& onFinish = nullptr); + +protected: + /// @see QservHttpMgtRequest::createHttpReq() + virtual void createHttpReqImpl(replica::Lock const& lock) final; + + /// @see QservHttpMgtRequest::notify() + virtual void notify(replica::Lock const& lock) final; + +private: + /// @see GetDbStatusQservHttpMgtRequest::create() + GetDbStatusQservHttpMgtRequest(std::shared_ptr const& serviceProvider, + std::string const& worker, CallbackType const& onFinish); + + // Input parameters + + CallbackType _onFinish; ///< this object is reset after finishing the request +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_GETDBSTATUSQSERVHTTPMGTREQUEST_H diff --git a/src/replica/GetStatusQservHttpMgtRequest.cc b/src/replica/GetStatusQservHttpMgtRequest.cc new file mode 100644 index 000000000..9d83c184f --- /dev/null +++ b/src/replica/GetStatusQservHttpMgtRequest.cc @@ -0,0 +1,63 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/GetStatusQservHttpMgtRequest.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetStatusQservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +shared_ptr GetStatusQservHttpMgtRequest::create( + shared_ptr const& serviceProvider, string const& worker, + wbase::TaskSelector const& taskSelector, GetStatusQservHttpMgtRequest::CallbackType const& onFinish) { + return shared_ptr( + new GetStatusQservHttpMgtRequest(serviceProvider, worker, taskSelector, onFinish)); +} + +GetStatusQservHttpMgtRequest::GetStatusQservHttpMgtRequest( + shared_ptr const& serviceProvider, string const& worker, + wbase::TaskSelector const& taskSelector, GetStatusQservHttpMgtRequest::CallbackType const& onFinish) + : QservHttpMgtRequest(serviceProvider, "QSERV_GET_STATUS", worker), + _taskSelector(taskSelector), + _onFinish(onFinish) {} + +void GetStatusQservHttpMgtRequest::createHttpReqImpl(replica::Lock const& lock) { + string const service = "/status"; + string const query = wbase::taskSelectorToHttpQuery(_taskSelector); + createHttpReq(lock, service, query); +} + +void GetStatusQservHttpMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/GetStatusQservHttpMgtRequest.h b/src/replica/GetStatusQservHttpMgtRequest.h new file mode 100644 index 000000000..2deb66ead --- /dev/null +++ b/src/replica/GetStatusQservHttpMgtRequest.h @@ -0,0 +1,91 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_GETSTATUSQSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_GETSTATUSQSERVHTTPMGTREQUEST_H + +// System headers +#include +#include + +// Qserv headers +#include "replica/QservHttpMgtRequest.h" +#include "wbase/TaskState.h" + +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class GetStatusQservHttpMgtRequest is a request for obtaining various info + * (status, counters, monitoring) reported the Qserv workers. + */ +class GetStatusQservHttpMgtRequest : public QservHttpMgtRequest { +public: + /// The function type for notifications on the completion of the request + typedef std::function const&)> CallbackType; + + GetStatusQservHttpMgtRequest() = delete; + GetStatusQservHttpMgtRequest(GetStatusQservHttpMgtRequest const&) = delete; + GetStatusQservHttpMgtRequest& operator=(GetStatusQservHttpMgtRequest const&) = delete; + + ~GetStatusQservHttpMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param taskSelector (optional) task selection criterias + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static std::shared_ptr create( + std::shared_ptr const& serviceProvider, std::string const& worker, + wbase::TaskSelector const& taskSelector = wbase::TaskSelector(), + CallbackType const& onFinish = nullptr); + +protected: + /// @see QservHttpMgtRequest::createHttpReq() + virtual void createHttpReqImpl(replica::Lock const& lock) final; + + /// @see QservHttpMgtRequest::notify() + virtual void notify(replica::Lock const& lock) final; + +private: + /// @see GetStatusQservHttpMgtRequest::create() + GetStatusQservHttpMgtRequest(std::shared_ptr const& serviceProvider, + std::string const& worker, wbase::TaskSelector const& taskSelector, + CallbackType const& onFinish); + + // Input parameters + + wbase::TaskSelector const _taskSelector; + CallbackType _onFinish; ///< this object is reset after finishing the request +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_GETSTATUSQSERVHTTPMGTREQUEST_H diff --git a/src/replica/QservHttpMgtRequest.cc b/src/replica/QservHttpMgtRequest.cc new file mode 100644 index 000000000..1f15bd0b2 --- /dev/null +++ b/src/replica/QservHttpMgtRequest.cc @@ -0,0 +1,345 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/QservHttpMgtRequest.h" + +// System headers +#include +#include + +// Qserv headers +#include "http/MetaModule.h" +#include "replica/Configuration.h" +#include "replica/Common.h" +#include "replica/DatabaseServices.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace nlohmann; +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.QservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +atomic QservHttpMgtRequest::_numClassInstances(0); + +string QservHttpMgtRequest::state2string(State state) { + switch (state) { + case State::CREATED: + return "CREATED"; + case State::IN_PROGRESS: + return "IN_PROGRESS"; + case State::FINISHED: + return "FINISHED"; + } + throw logic_error("QservHttpMgtRequest::" + string(__func__) + "(State) incomplete implementation"); +} + +string QservHttpMgtRequest::state2string(ExtendedState state) { + switch (state) { + case ExtendedState::NONE: + return "NONE"; + case ExtendedState::SUCCESS: + return "SUCCESS"; + case ExtendedState::CONFIG_ERROR: + return "CONFIG_ERROR"; + case ExtendedState::BODY_LIMIT_ERROR: + return "BODY_LIMIT_ERROR"; + case ExtendedState::SERVER_BAD: + return "SERVER_BAD"; + case ExtendedState::SERVER_CHUNK_IN_USE: + return "SERVER_CHUNK_IN_USE"; + case ExtendedState::SERVER_ERROR: + return "SERVER_ERROR"; + case ExtendedState::SERVER_BAD_RESPONSE: + return "SERVER_BAD_RESPONSE"; + case ExtendedState::TIMEOUT_EXPIRED: + return "TIMEOUT_EXPIRED"; + case ExtendedState::CANCELLED: + return "CANCELLED"; + } + throw logic_error("QservHttpMgtRequest::" + string(__func__) + + "(ExtendedState) incomplete implementation"); +} + +QservHttpMgtRequest::QservHttpMgtRequest(ServiceProvider::Ptr const& serviceProvider, string const& type, + string const& worker) + : _serviceProvider(serviceProvider), + _type(type), + _id(Generators::uniqueId()), + _worker(worker), + _state(State::CREATED), + _extendedState(ExtendedState::NONE) { + // This report is used solely for debugging purposes to allow tracking + // potential memory leaks within applications. + ++_numClassInstances; + LOGS(_log, LOG_LVL_TRACE, context() << "constructed instances: " << _numClassInstances); +} + +QservHttpMgtRequest::~QservHttpMgtRequest() { + --_numClassInstances; + LOGS(_log, LOG_LVL_TRACE, context() << "destructed instances: " << _numClassInstances); +} + +string QservHttpMgtRequest::state2string() const { + replica::Lock const lock(_mtx, context() + __func__); + return state2string(state(), extendedState()) + "::" + serverError(lock); +} + +string QservHttpMgtRequest::serverError() const { + replica::Lock const lock(_mtx, context() + __func__); + return serverError(lock); +} + +string QservHttpMgtRequest::serverError(replica::Lock const& lock) const { return _serverError; } + +string QservHttpMgtRequest::context() const { + return id() + " " + type() + " " + state2string(state(), extendedState()) + " "; +} + +Performance QservHttpMgtRequest::performance() const { + replica::Lock const lock(_mtx, context() + __func__); + return performance(lock); +} + +Performance QservHttpMgtRequest::performance(replica::Lock const& lock) const { return _performance; } + +void QservHttpMgtRequest::start(string const& jobId, unsigned int requestExpirationIvalSec) { + string const context_ = context() + __func__; + LOGS(_log, LOG_LVL_TRACE, context_); + + replica::Lock const lock(_mtx, context_); + _assertNotStarted(__func__); + + // This needs to be updated to override the default value of the counter + // which was created upon the object construction. + _performance.setUpdateStart(); + + // Check if configuration parameters are valid + auto const config = _serviceProvider->config(); + if (!config->isKnownWorker(_worker)) { + LOGS(_log, LOG_LVL_ERROR, context_ << " ** MISCONFIGURED ** unknown worker: '" << _worker << "'."); + _setState(lock, State::FINISHED, ExtendedState::CONFIG_ERROR); + notify(lock); + return; + } + + // Build an association with the corresponding parent job (optional). + _jobId = jobId; + + // Adjust the default value of the expiration ival (if requested) before + // creating and starting the request. + unsigned int actualExpirationIvalSec = requestExpirationIvalSec; + if (0 == actualExpirationIvalSec) { + actualExpirationIvalSec = config->get("xrootd", "request-timeout-sec"); + } + + createHttpReqImpl(lock); + + _httpRequest->setExpirationIval(actualExpirationIvalSec); + _httpRequest->start(); + + _setState(lock, State::IN_PROGRESS); +} + +void QservHttpMgtRequest::wait() { + LOGS(_log, LOG_LVL_DEBUG, context() << __func__); + if (_state == State::FINISHED) return; + unique_lock onFinishLock(_onFinishMtx); + _onFinishCv.wait(onFinishLock, [&] { return _finished; }); +} + +string const& QservHttpMgtRequest::jobId() const { + _assertStarted(__func__); + return _jobId; +} + +json const& QservHttpMgtRequest::info() const { + if (!((_state == State::FINISHED) && (_extendedState == ExtendedState::SUCCESS))) { + throw logic_error("QservHttpMgtRequest::" + string(__func__) + + " no info available in state: " + state2string(_state, _extendedState)); + } + return _info; +} + +void QservHttpMgtRequest::cancel() { + _assertStarted(__func__); + // No HTTP request would be sent if the request creation failed for some + // reason (like misconfiguration, etc.). Hence, there is nothing to cancel. + { + replica::Lock const lock(_mtx, context() + __func__); + if (_httpRequest == nullptr) return; + } + _httpRequest->cancel(); +} + +void QservHttpMgtRequest::createHttpReq(replica::Lock const& lock, string const& service, + string const& query) { + _assertNotStarted(__func__); + string const target = service + query + string(query.empty() ? "?" : "&") + "id" + _id + + "&instance_id=" + _serviceProvider->instanceId() + "&worker=" + _worker + + "&version=" + to_string(http::MetaModule::version); + _httpRequest = http::AsyncReq::create( + _serviceProvider->io_service(), + [self = shared_from_this()](shared_ptr const&) { self->_processResponse(); }, + "GET", _getHostPortTracker(), target); +} + +void QservHttpMgtRequest::createHttpReq(replica::Lock const& lock, string const& method, string const& target, + json const& body) { + _assertNotStarted(__func__); + json data = body; + data["id"] = _id; + data["instance_id"] = _serviceProvider->instanceId(); + data["worker"] = _worker; + data["auth_key"] = _serviceProvider->authKey(); + data["admin_auth_key"] = _serviceProvider->adminAuthKey(); + data["version"] = http::MetaModule::version; + unordered_map const headers = {{"Content-Type", "application/json"}}; + _httpRequest = http::AsyncReq::create( + _serviceProvider->io_service(), + [self = shared_from_this()](shared_ptr const&) { self->_processResponse(); }, + method, _getHostPortTracker(), target, data.dump(), headers); +} + +void QservHttpMgtRequest::finish(replica::Lock const& lock, ExtendedState extendedState, + string const& serverError) { + string const context_ = context() + __func__; + LOGS(_log, LOG_LVL_DEBUG, context_ << " serverError:" << serverError); + + // Set the optional server error state as well. + // IMPORTANT: this needs to be done before performing the state transition to insure + // clients will get a consistent view onto the object state. + _serverError = serverError; + + // Set new state to make sure all event handlers will recognize + // this scenario and avoid making any modifications to the request's state. + _setState(lock, State::FINISHED, extendedState); + + // We have to update the timestamp before invoking a user provided + // callback on the completion of the operation. + _performance.setUpdateFinish(); + + // TODO: temporarily disabled while this class is not supported by + // the persistent backend. + // + // _serviceProvider->databaseServices()->saveState(*this, _performance, _serverError); + + notify(lock); + + // Unblock threads (if any) waiting on the synchronization call to the method wait(). + _finished = true; + _onFinishCv.notify_all(); +} + +http::AsyncReq::GetHostPort QservHttpMgtRequest::_getHostPortTracker() const { + return [config = _serviceProvider->config(), + worker = _worker](http::AsyncReq::HostPort const&) -> http::AsyncReq::HostPort { + auto const info = config->workerInfo(worker); + return http::AsyncReq::HostPort{info.qservWorker.host.addr, info.qservWorker.port}; + }; +} + +void QservHttpMgtRequest::_processResponse() { + string const context_ = context() + string(__func__) + " "; + if (_state == State::FINISHED) return; + replica::Lock const lock(_mtx, context_); + if (_state == State::FINISHED) return; + + switch (_httpRequest->state()) { + case http::AsyncReq::State::FINISHED: + try { + _info = json::parse(_httpRequest->responseBody()); + if (_info.at("success").get() == 0) { + string const msg = "worker reported error: " + _info.at("error").get(); + finish(lock, ExtendedState::SERVER_BAD, msg); + } else { + // Let a subclass do the optional result validation and post processing. + // Note that the subclass has the final say on the completion status + // of the request. + finish(lock, dataReady(lock, _info)); + } + } catch (exception const& ex) { + string const msg = "failed to parse/process worker response, ex: " + string(ex.what()); + finish(lock, ExtendedState::SERVER_BAD_RESPONSE, msg); + } + break; + case http::AsyncReq::State::FAILED: + finish(lock, ExtendedState::SERVER_ERROR, + _httpRequest->errorMessage() + ", code: " + to_string(_httpRequest->responseCode())); + break; + case http::AsyncReq::State::BODY_LIMIT_ERROR: + finish(lock, ExtendedState::BODY_LIMIT_ERROR, + _httpRequest->errorMessage() + ", code: " + to_string(_httpRequest->responseCode())); + break; + case http::AsyncReq::State::CANCELLED: + finish(lock, ExtendedState::CANCELLED); + break; + case http::AsyncReq::State::EXPIRED: + finish(lock, ExtendedState::TIMEOUT_EXPIRED); + break; + default: + throw runtime_error(context_ + "unsupported state of the HTTP _httpRequest: " + + http::AsyncReq::state2str(_httpRequest->state())); + } +} + +void QservHttpMgtRequest::_assertStarted(string const& func, bool mustBeStarted) const { + string const context_ = context() + func; + LOGS(_log, LOG_LVL_TRACE, context_); + if (mustBeStarted) { + if (State::CREATED == _state) { + throw logic_error(context_ + " the request was not started."); + } + } else { + if (State::CREATED != _state) { + throw logic_error(context_ + " the request was already started."); + } + } +} + +void QservHttpMgtRequest::_setState(replica::Lock const& lock, State newState, + ExtendedState newExtendedState) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__ << " " << state2string(newState, newExtendedState)); + + // IMPORTANT: the top-level state is the last to be set when performing + // the state transition to insure clients will get a consistent view onto + // the object state. + { + unique_lock onFinishLock(_onFinishMtx); + _extendedState = newExtendedState; + _state = newState; + } + + // TODO: temporarily disabled while this class is not supported by + // the persistent backend. + // + // _serviceProvider->databaseServices()->saveState(*this, _performance, _serverError); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/QservHttpMgtRequest.h b/src/replica/QservHttpMgtRequest.h new file mode 100644 index 000000000..5cc1370e8 --- /dev/null +++ b/src/replica/QservHttpMgtRequest.h @@ -0,0 +1,391 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_QSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_QSERVHTTPMGTREQUEST_H + +// System headers +#include +#include +#include +#include +#include +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "http/AsyncReq.h" +#include "replica/Mutex.h" +#include "replica/Performance.h" +#include "replica/ServiceProvider.h" + +// This header declarations +namespace lsst::qserv::replica { + +/** + * @brief QservHttpMgtRequest is a base class for a family of the Qserv worker + * management requests within the master server. + */ +class QservHttpMgtRequest : public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + /// The lock type used by the implementations + typedef std::lock_guard LockType; + + /// The type which represents the primary public state of the request + enum State { + CREATED, ///< The request has been constructed, and no attempt to execute + /// it has been made. + IN_PROGRESS, ///< The request is in a progress. + FINISHED ///< The request is finished. See extended status for more details + /// (the completion status, etc.) + }; + + /// @return the string representation of the primary state + static std::string state2string(State state); + + /// Type ExtendedState represents the refined public sub-state of the request + /// once it's FINISHED as per the above defined primary state. + enum ExtendedState { + NONE, ///< No extended state exists at this time. + SUCCESS, ///< The request has been fully implemented. + CONFIG_ERROR, ///< Problems with request configuration were detected. + BODY_LIMIT_ERROR, ///< Response's body is larger than requested. + SERVER_BAD, ///< Server reports that the request can not be implemented because + /// of configuration or request's parameters problems. + SERVER_CHUNK_IN_USE, ///< Server reports that the request can not be implemented because + /// some of the required remote resources (chunks, etc.) are in use. + SERVER_ERROR, ///< The request could not be implemented due to an unrecoverable + /// server-side error. + SERVER_BAD_RESPONSE, ///< Data received from a server can't be correctly interpreted. + TIMEOUT_EXPIRED, ///< Expired due to a timeout. + CANCELLED ///< Explicitly cancelled on the client-side. + }; + + /// @return the string representation of the extended state + static std::string state2string(ExtendedState state); + + /// @return the string representation of the combined state + static std::string state2string(State state, ExtendedState extendedState) { + return state2string(state) + "::" + state2string(extendedState); + } + + QservHttpMgtRequest() = delete; + QservHttpMgtRequest(QservHttpMgtRequest const&) = delete; + QservHttpMgtRequest& operator=(QservHttpMgtRequest const&) = delete; + + virtual ~QservHttpMgtRequest(); + + /// @return reference to a provider of services + std::shared_ptr const& serviceProvider() { return _serviceProvider; } + + /// @return string representing of the request type. + std::string const& type() const { return _type; } + + /// @return unique identifier of the request + std::string const& id() const { return _id; } + + /// @return name of a worker + std::string const& worker() const { return _worker; } + + /// @return primary status of the request + State state() const { return _state; } + + /// @return extended status of the request + ExtendedState extendedState() const { return _extendedState; } + + /// @return string representation of the combined state of the object + std::string state2string() const; + + /// @return error message (if any) reported by the remote service. + std::string serverError() const; + + /// @return performance info + Performance performance() const; + + /** + * @return An identifier if the owning job (if the request has started). + * @throws std::logic_error If the request hasn't started. + */ + std::string const& jobId() const; + + /** + * @return The info object returned by the worker. + * @throw std::logic_error if called before the request finishes or if it failed. + */ + nlohmann::json const& info() const; + + /** + * @brief Begin processing the request. + * @param jobId (Optional) identifier of a job specifying a context of the request. + * @param requestExpirationIvalSec (Optional) parameter (if differs from 0) allowing + * to override the default value of the corresponding parameter from the Configuration. + */ + void start(std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); + + /// Wait for the completion of the request + void wait(); + + /** + * @brief Explicitly cancel any asynchronous operation(s) and put the object into + * the FINISHED::CANCELLED state. This operation is very similar to the + * timeout-based request expiration, except it's requested explicitly. + */ + void cancel(); + + /// @return The context string for debugging and diagnostic printouts. + std::string context() const; + + /// @return A dictionary of parameters and the corresponding values to be stored + /// in a database for a request. + virtual std::list> extendedPersistentState() const { + return std::list>(); + } + +protected: + /** + * @brief Construct the request with the pointer to the services provider. + * @param serviceProvider Is required to access configuration services. + * @param type The type name of he request (used for debugging and error reporting). + * @param worker The name of a worker. + */ + QservHttpMgtRequest(std::shared_ptr const& serviceProvider, std::string const& type, + std::string const& worker); + + /// @return A shared pointer of the desired subclass (no dynamic type checking). + template + std::shared_ptr shared_from_base() { + return std::static_pointer_cast(shared_from_this()); + } + + /** + * @brief Create an HTTP request. + * + * The methods is required to be provided by subclasses for creating + * subclass-specific requests using the coresponding helper methods. + * + * @see QservHttpMgtRequest::createHttpReq + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + */ + virtual void createHttpReqImpl(replica::Lock const& lock) = 0; + + /** + * @brief Create an HTTP "GET" request, but do not start it yet. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param service The REST service (w/o the query part) to be called. + * @param (optional) HTTP query for the request. + * @throw std::logic_error If the method is called while the curent state + * is not State::CREATED, or if the HTTP request was already created. + */ + void createHttpReq(replica::Lock const& lock, std::string const& service, + std::string const& query = std::string()); + + /** + * @brief Create an HTTP request ("POST", "PUT" or "DELETE") that has the JSON body, + * but do not start it yet. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param method An HTTP method ("POST", "PUT" or "DELETE") for the request, + * @param service The complete target (including the REST service and the query part) to be called. + * @param body A JSON object to be sent in the request's body. + * @throw std::logic_error If the method is called while the curent state + * is not State::CREATED, or if the HTTP request was already created. + */ + void createHttpReq(replica::Lock const& lock, std::string const& method, std::string const& target, + nlohmann::json const& body); + + /** + * @brief Notify a subclass that a data object was was succesfully retrieved + * from the worker. + * + * This method allows subclasses to implement the optional result validation and processing, + * including a translation of the JSON object into the subclas-specific result type. + * + * @note Any exceptions thrown by the method will result in setting the status + * ExtendedState::SERVER_BAD_RESPONSE to indicate a problem with interpreting the data. + * The method is also required to report its final verdican on the status of the object. + * Normally, it's going to be ExtendedState::SUCCESS. However, a subclass may set + * a different status, depending on its findings. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param data The JSON result to be processed by a subclass. + * @return The final verdict made by the subclass on the completion status. + */ + virtual ExtendedState dataReady(replica::Lock const& lock, nlohmann::json const& data) { + return ExtendedState::SUCCESS; + } + + /** + * @brief Finalize request processing (as reported by subclasses) + * @note This is supposed to be the last operation to be called by subclasses + * upon a completion of the request. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param extendedState The new extended state. + * @param serverError (optional) error message from a Qserv worker service. + */ + void finish(replica::Lock const& lock, ExtendedState extendedState, std::string const& serverError = ""); + + /** + * @brief Start user-notification protocol (in case if user-defined notifiers + * were provided to a subclass). The callback is expected to be made + * asynchronously in a separate thread to avoid blocking the current thread. + * + * This method has to be provided by subclasses to forward + * notification on request completion to a client which initiated + * the request, etc. + * + * The standard implementation of this method in a context of some + * subclass 'T' should looks like this: + * @code + * void T::notify(replica::Lock const& lock) { + * notifyDefaultImpl(lock, _onFinish); + * } + * @code + * @see QservHttpMgtRequest::notifyDefaultImpl + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + */ + virtual void notify(replica::Lock const& lock) = 0; + + /** + * @brief The helper function which pushes up-stream notifications on behalf of + * subclasses. Upon a completion of this method the callback function + * object will get reset to 'nullptr'. + * @note This default implementation works for callback functions which + * accept a single parameter - a smart reference onto an object of + * the corresponding subclass. Subclasses with more complex signatures of + * their callbacks should have their own implementations which may look + * similarly to this one. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param onFinish A callback function (if set) to be called. + */ + template + void notifyDefaultImpl(replica::Lock const& lock, typename T::CallbackType& onFinish) { + if (nullptr != onFinish) { + // Clearing the stored callback after finishing the up-stream notification + // has two purposes: + // 1. it guaranties (exactly) one time notification + // 2. it breaks the up-stream dependency on a caller object if a shared + // pointer to the object was mentioned as the lambda-function's closure + serviceProvider()->io_service().post(std::bind(std::move(onFinish), shared_from_base())); + onFinish = nullptr; + } + } + + /** + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @return A server error string (if any). + */ + std::string serverError(replica::Lock const& lock) const; + + /** + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @return The performance info. + */ + Performance performance(replica::Lock const& lock) const; + + /// Mutex guarding internal state (also used by subclasses) + mutable replica::Mutex _mtx; + +private: + /// @return The callback function for tracking connection parameters of the worker. + http::AsyncReq::GetHostPort _getHostPortTracker() const; + + /// Extract and process data of the completed request. Notify a subclass in case of success. + void _processResponse(); + + /** + * @brief Ensure the request is in the desired state. + * @note The method doesn't require a lock on the mutex _mtx to be acqured before + * being called neither it acquires the lock by itself since the implementation + * of the method relies on checking a value of the atomic variable representing + * the primary state of the request. + * @param func A context from which the state test is requested (for tracing + * and error reporting). + * @param mustBeStarted 'true' if the request is expected to be in any + * state past State::CREATED, or 'false' if it's expected to be exactly + * in State::CREATED. + * @throws std::logic_error If the request was not found in the desired state. + */ + void _assertStarted(std::string const& func, bool mustBeStarted) const; + + // Shortcut methods based on the above defined one. + + void _assertStarted(std::string const& func) const { _assertStarted(func, true); } + void _assertNotStarted(std::string const& func) const { _assertStarted(func, false); } + + /** + * @brief Set the desired primary and extended state. + * + * The change of the state is done via a method to allow extra actions + * at this step, such as: 1) reporting change state in a debug stream, + * or 2) verifying the correctness of the state transition. + * + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param state The primary state of the request. + * @param extendedState The extended state of the request. + */ + void _setState(replica::Lock const& lock, State state, ExtendedState extendedState = ExtendedState::NONE); + + /// The global counter for the number of instances of any subclass + static std::atomic _numClassInstances; + + // Input parameters + + std::shared_ptr const _serviceProvider; + + std::string const _type; + std::string const _id; + std::string const _worker; + + // Two-level state of a request + std::atomic _state; ///< The primary state. + std::atomic _extendedState; ///< The sub-state. + + /// Error message (if any) reported by the remote service + std::string _serverError; + + Performance _performance; ///< Performance counters. + std::string _jobId; ///< An identifier of the parent job which started the request. + + std::shared_ptr _httpRequest; ///< The actual request sent to the worker. + nlohmann::json _info; ///< The data object returned by the worker. + + // Synchronization primitives for implementing QservMgtRequest::wait() + + bool _finished = false; + std::mutex _onFinishMtx; + std::condition_variable _onFinishCv; +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_QSERVHTTPMGTREQUEST_H diff --git a/src/replica/QservMgtServices.cc b/src/replica/QservMgtServices.cc index 7c28b3299..a28659e8d 100644 --- a/src/replica/QservMgtServices.cc +++ b/src/replica/QservMgtServices.cc @@ -24,16 +24,19 @@ // System headers #include +#include // Third party headers #include "XrdSsi/XrdSsiProvider.hh" #include "XrdSsi/XrdSsiService.hh" // Qserv headers +#include "http/MetaModule.h" #include "replica/Configuration.h" #include "replica/ServiceProvider.h" #include "util/BlockPost.h" #include "util/TimeUtils.h" +#include "wbase/TaskState.h" // LSST headers #include "lsst/log/Log.h" @@ -41,6 +44,7 @@ /// This C++ symbol is provided by the SSI shared library extern XrdSsiProvider* XrdSsiProviderClient; +using namespace lsst::qserv; using namespace std; namespace { @@ -277,6 +281,14 @@ TestEchoQservMgtRequest::Ptr QservMgtServices::echo(string const& worker, string return request; } +TestEchoQservHttpMgtRequest::Ptr QservMgtServices::echoOverHttp( + string const& worker, string const& data, string const& jobId, + TestEchoQservHttpMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { + auto request = TestEchoQservHttpMgtRequest::create(serviceProvider(), worker, data, onFinish); + request->start(jobId, requestExpirationIvalSec); + return request; +} + GetStatusQservMgtRequest::Ptr QservMgtServices::status(std::string const& worker, std::string const& jobId, wbase::TaskSelector const& taskSelector, GetStatusQservMgtRequest::CallbackType const& onFinish, @@ -312,6 +324,14 @@ GetStatusQservMgtRequest::Ptr QservMgtServices::status(std::string const& worker return request; } +GetStatusQservHttpMgtRequest::Ptr QservMgtServices::statusOverHttp( + std::string const& worker, std::string const& jobId, wbase::TaskSelector const& taskSelector, + GetStatusQservHttpMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { + auto request = GetStatusQservHttpMgtRequest::create(serviceProvider(), worker, taskSelector, onFinish); + request->start(jobId, requestExpirationIvalSec); + return request; +} + GetDbStatusQservMgtRequest::Ptr QservMgtServices::databaseStatus( std::string const& worker, std::string const& jobId, GetDbStatusQservMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { @@ -346,6 +366,14 @@ GetDbStatusQservMgtRequest::Ptr QservMgtServices::databaseStatus( return request; } +GetDbStatusQservHttpMgtRequest::Ptr QservMgtServices::databaseStatusOverHttp( + string const& worker, string const& jobId, + GetDbStatusQservHttpMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { + auto request = GetDbStatusQservHttpMgtRequest::create(serviceProvider(), worker, onFinish); + request->start(jobId, requestExpirationIvalSec); + return request; +} + GetConfigQservMgtRequest::Ptr QservMgtServices::config(std::string const& worker, std::string const& jobId, GetConfigQservMgtRequest::CallbackType const& onFinish, unsigned int requestExpirationIvalSec) { @@ -378,6 +406,14 @@ GetConfigQservMgtRequest::Ptr QservMgtServices::config(std::string const& worker return request; } +GetConfigQservHttpMgtRequest::Ptr QservMgtServices::configOverHttp( + string const& worker, string const& jobId, GetConfigQservHttpMgtRequest::CallbackType const& onFinish, + unsigned int requestExpirationIvalSec) { + auto request = GetConfigQservHttpMgtRequest::create(serviceProvider(), worker, onFinish); + request->start(jobId, requestExpirationIvalSec); + return request; +} + void QservMgtServices::_finish(string const& id) { string const context = id + " QservMgtServices::" + string(__func__) + " "; @@ -415,8 +451,8 @@ XrdSsiService* QservMgtServices::_xrdSsiService() { if (_service != nullptr) return _service; string const serviceProviderLocation = - _serviceProvider->config()->get("xrootd", "host") + ":" + - to_string(_serviceProvider->config()->get("xrootd", "port")); + serviceProvider()->config()->get("xrootd", "host") + ":" + + to_string(serviceProvider()->config()->get("xrootd", "port")); XrdSsiErrInfo errInfo; unsigned int const intervalBetweenReconnectsMs = 1000; @@ -432,9 +468,9 @@ XrdSsiService* QservMgtServices::_xrdSsiService() { uint64_t const timeSinceStartedSec = util::TimeUtils::now() / 1000 - startedConnectionAttemptsSec; - if (_serviceProvider->config()->get("xrootd", "allow-reconnect") && + if (serviceProvider()->config()->get("xrootd", "allow-reconnect") && timeSinceStartedSec < - _serviceProvider->config()->get("xrootd", "reconnect-timeout")) { + serviceProvider()->config()->get("xrootd", "reconnect-timeout")) { LOGS(_log, LOG_LVL_WARN, "QservMgtServices::" << __func__ << " failed to contact service provider at: " << serviceProviderLocation << ", error: " << errInfo.Get() diff --git a/src/replica/QservMgtServices.h b/src/replica/QservMgtServices.h index 64a581ced..644fd6c0f 100644 --- a/src/replica/QservMgtServices.h +++ b/src/replica/QservMgtServices.h @@ -30,12 +30,16 @@ // Qserv headers #include "replica/AddReplicaQservMgtRequest.h" #include "replica/GetReplicasQservMgtRequest.h" +#include "replica/GetDbStatusQservHttpMgtRequest.h" #include "replica/GetDbStatusQservMgtRequest.h" +#include "replica/GetConfigQservHttpMgtRequest.h" #include "replica/GetConfigQservMgtRequest.h" +#include "replica/GetStatusQservHttpMgtRequest.h" #include "replica/GetStatusQservMgtRequest.h" #include "replica/RemoveReplicaQservMgtRequest.h" #include "replica/ServiceProvider.h" #include "replica/SetReplicasQservMgtRequest.h" +#include "replica/TestEchoQservHttpMgtRequest.h" #include "replica/TestEchoQservMgtRequest.h" #include "replica/Mutex.h" #include "wbase/TaskState.h" @@ -43,6 +47,10 @@ // Forward declarations class XrdSsiService; +namespace lsst::qserv::base { +class TaskSelector; +} // namespace lsst::qserv::base + // This header declarations namespace lsst::qserv::replica { @@ -111,8 +119,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param onFinish A callback function called on a completion of the operation. * @param jobId An optional identifier of a job specifying a context * in which a request will be executed. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -132,8 +141,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param onFinish A callback function called on a completion of the operation. * @param jobId An optional identifier of a job specifying a context * in which a request will be executed. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -151,8 +161,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param onFinish A callback function to be called upon request completion. * @param jobId An optional identifier of a job specifying a context in which * a request will be executed. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -172,9 +183,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param onFinish A callback function to be called upon request completion. * @param jobId An optional identifier of a job specifying a context in which * a request will be executed. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) - * allowing to override the default value of the corresponding parameter from - * the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -192,8 +203,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param onFinish A callback function to be called upon request completion. * @param jobId An optional identifier of a job specifying a context in which * a request will be executed. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -202,6 +214,11 @@ class QservMgtServices : public std::enable_shared_from_this { TestEchoQservMgtRequest::CallbackType const& onFinish = nullptr, unsigned int requestExpirationIvalSec = 0); + TestEchoQservHttpMgtRequest::Ptr echoOverHttp( + std::string const& worker, std::string const& data, std::string const& jobId = "", + TestEchoQservHttpMgtRequest::CallbackType const& onFinish = nullptr, + unsigned int requestExpirationIvalSec = 0); + /** * Request detailed status of a Qserv worker * @@ -210,8 +227,9 @@ class QservMgtServices : public std::enable_shared_from_this { * a request will be executed. * @param taskSelector An optional task selection criterias. * @param onFinish A callback function to be called upon request completion. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -220,6 +238,12 @@ class QservMgtServices : public std::enable_shared_from_this { GetStatusQservMgtRequest::CallbackType const& onFinish = nullptr, unsigned int requestExpirationIvalSec = 0); + GetStatusQservHttpMgtRequest::Ptr statusOverHttp( + std::string const& worker, std::string const& jobId = "", + wbase::TaskSelector const& taskSelector = wbase::TaskSelector(), + GetStatusQservHttpMgtRequest::CallbackType const& onFinish = nullptr, + unsigned int requestExpirationIvalSec = 0); + /** * Request detailed status on the database service of a Qserv worker * @@ -227,8 +251,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param jobId An optional identifier of a job specifying a context in which * a request will be executed. * @param onFinish A callback function to be called upon request completion. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing it - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -237,6 +262,11 @@ class QservMgtServices : public std::enable_shared_from_this { GetDbStatusQservMgtRequest::CallbackType const& onFinish = nullptr, unsigned int requestExpirationIvalSec = 0); + GetDbStatusQservHttpMgtRequest::Ptr databaseStatusOverHttp( + std::string const& worker, std::string const& jobId = "", + GetDbStatusQservHttpMgtRequest::CallbackType const& onFinish = nullptr, + unsigned int requestExpirationIvalSec = 0); + /** * Request configuration parameters of a Qserv worker * @@ -244,8 +274,9 @@ class QservMgtServices : public std::enable_shared_from_this { * @param jobId An optional identifier of a job specifying a context in which * a request will be executed. * @param onFinish A callback function to be called upon request completion. - * @param requestExpirationIvalSec An optional parameter (if differs from 0) allowing it - * to override the default value of the corresponding parameter from the Configuration. + * @param requestExpirationIvalSec The maximum amount of time to wait before + * completion of the request. If a value of the parameter is set to 0 then no + * limit will be enforced. * @return A pointer to the request object if the request was made. Return * nullptr otherwise. */ @@ -253,6 +284,11 @@ class QservMgtServices : public std::enable_shared_from_this { GetConfigQservMgtRequest::CallbackType const& onFinish = nullptr, unsigned int requestExpirationIvalSec = 0); + GetConfigQservHttpMgtRequest::Ptr configOverHttp( + std::string const& worker, std::string const& jobId = "", + GetConfigQservHttpMgtRequest::CallbackType const& onFinish = nullptr, + unsigned int requestExpirationIvalSec = 0); + private: /** * @param serviceProvider Is required for accessing configuration parameters. diff --git a/src/replica/TestEchoQservHttpMgtRequest.cc b/src/replica/TestEchoQservHttpMgtRequest.cc new file mode 100644 index 000000000..2b2593ae6 --- /dev/null +++ b/src/replica/TestEchoQservHttpMgtRequest.cc @@ -0,0 +1,83 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/TestEchoQservHttpMgtRequest.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace nlohmann; +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.TestEchoQservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +shared_ptr TestEchoQservHttpMgtRequest::create( + shared_ptr const& serviceProvider, string const& worker, string const& data, + TestEchoQservHttpMgtRequest::CallbackType const& onFinish) { + return shared_ptr( + new TestEchoQservHttpMgtRequest(serviceProvider, worker, data, onFinish)); +} + +TestEchoQservHttpMgtRequest::TestEchoQservHttpMgtRequest( + shared_ptr const& serviceProvider, string const& worker, string const& data, + TestEchoQservHttpMgtRequest::CallbackType const& onFinish) + : QservHttpMgtRequest(serviceProvider, "QSERV_TEST_ECHO", worker), _data(data), _onFinish(onFinish) {} + +string const& TestEchoQservHttpMgtRequest::dataEcho() const { + if (not((state() == State::FINISHED) and (extendedState() == ExtendedState::SUCCESS))) { + throw logic_error("TestEchoQservHttpMgtRequest::" + string(__func__) + + " no data available in state: " + state2string(state(), extendedState())); + } + return _dataEcho; +} + +list> TestEchoQservHttpMgtRequest::extendedPersistentState() const { + list> result; + result.emplace_back("data_length_bytes", to_string(data().size())); + return result; +} + +void TestEchoQservHttpMgtRequest::createHttpReqImpl(replica::Lock const& lock) { + string const method = "POST"; + string const target = "/echo"; + json const data = json::object({{"data", _data}}); + createHttpReq(lock, method, target, data); +} + +QservHttpMgtRequest::ExtendedState TestEchoQservHttpMgtRequest::dataReady(replica::Lock const& lock, + json const& data) { + _dataEcho = data.at("data"); + return QservHttpMgtRequest::ExtendedState::SUCCESS; +} + +void TestEchoQservHttpMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/TestEchoQservHttpMgtRequest.h b/src/replica/TestEchoQservHttpMgtRequest.h new file mode 100644 index 000000000..a3b005457 --- /dev/null +++ b/src/replica/TestEchoQservHttpMgtRequest.h @@ -0,0 +1,114 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_TESTECHOQSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_TESTECHOQSERVHTTPMGTREQUEST_H + +// System headers +#include +#include +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "replica/QservHttpMgtRequest.h" + +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class TestEchoQservHttpMgtRequest a special kind of requests + * for testing Qserv workers. + */ +class TestEchoQservHttpMgtRequest : public QservHttpMgtRequest { +public: + /// The function type for notifications on the completion of the request + typedef std::function const&)> CallbackType; + + TestEchoQservHttpMgtRequest() = delete; + TestEchoQservHttpMgtRequest(TestEchoQservHttpMgtRequest const&) = delete; + TestEchoQservHttpMgtRequest& operator=(TestEchoQservHttpMgtRequest const&) = delete; + + ~TestEchoQservHttpMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param data The data string to be echoed back by the worker (if successful). + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static std::shared_ptr create( + std::shared_ptr const& serviceProvider, std::string const& worker, + std::string const& data, CallbackType const& onFinish = nullptr); + + /// @return input data string sent to the worker + std::string const& data() const { return _data; } + + /** + * @return The data string echoed back by the worker. + * @note The method will throw exception std::logic_error if called before + * the request finishes or if it's finished with any status but SUCCESS. + */ + std::string const& dataEcho() const; + + /// @see QservHttpMgtRequest::extendedPersistentState() + virtual std::list> extendedPersistentState() const final; + +protected: + /// @see QservHttpMgtRequest::createHttpReq() + virtual void createHttpReqImpl(replica::Lock const& lock) final; + + /// @see QservHttpMgtRequest::dataReady() + virtual QservHttpMgtRequest::ExtendedState dataReady(replica::Lock const& lock, + nlohmann::json const& data) final; + + /// @see QservHttpMgtRequest::notify() + virtual void notify(replica::Lock const& lock) final; + +private: + /// @see TestEchoQservHttpMgtRequest::create() + TestEchoQservHttpMgtRequest(std::shared_ptr const& serviceProvider, + std::string const& worker, std::string const& data, + CallbackType const& onFinish); + + // Input parameters + + std::string const _data; + CallbackType _onFinish; ///< this object is reset after finishing the request + + /// The data string returned by the Qserv worker + std::string _dataEcho; +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_TESTECHOQSERVHTTPMGTREQUEST_H diff --git a/src/wbase/TaskState.h b/src/wbase/TaskState.h index e53e1ab98..70c8ce66b 100644 --- a/src/wbase/TaskState.h +++ b/src/wbase/TaskState.h @@ -22,7 +22,11 @@ #define LSST_QSERV_WBASE_TASKSTATE_H // System headers +#include #include +#include +#include +#include #include #include #include @@ -73,6 +77,11 @@ inline std::string taskState2str(TaskState state) { } } +inline std::ostream& operator<<(std::ostream& os, TaskState state) { + os << taskState2str(state); + return os; +} + /// @return The parsed state of the input string. /// @throw std::invalid_argument If the string can't be parsed into a valid state. inline TaskState str2taskState(std::string const& state) { @@ -102,6 +111,36 @@ struct TaskSelector { std::uint32_t maxTasks = 0U; }; +/** + * Stringify the task selector into a complete HTTP query string. + * The resulting query will have the following format: + * @code + * ?include_tasks=&max_tasks=[&query_ids=[,...][&task_states=[,...] + * @code + * @param taskSelector The selector to be processed. + * @return The corresponding query. + */ +inline std::string taskSelectorToHttpQuery(TaskSelector const& taskSelector) { + std::string query; + query += "?include_tasks=" + std::string(taskSelector.includeTasks ? "1" : "0"); + query += "&max_tasks=" + std::to_string(taskSelector.maxTasks); + if (!taskSelector.queryIds.empty()) { + std::ostringstream ss; + std::copy(taskSelector.queryIds.begin(), taskSelector.queryIds.end() - 1, + std::ostream_iterator(ss, ",")); + ss << taskSelector.queryIds.back(); + query += "&query_ids=" + ss.str(); + } + if (!taskSelector.taskStates.empty()) { + std::ostringstream ss; + std::copy(taskSelector.taskStates.begin(), taskSelector.taskStates.end() - 1, + std::ostream_iterator(ss, ",")); + ss << taskSelector.taskStates.back(); + query += "&task_states=" + ss.str(); + } + return query; +} + } // namespace lsst::qserv::wbase #endif // LSST_QSERV_WBASE_TASKSTATE_H