diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index 991dc2015..60b291cab 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -76,6 +76,7 @@ target_sources(replica PRIVATE GetReplicasQservMgtRequest.cc GetDbStatusQservMgtRequest.cc GetConfigQservMgtRequest.cc + GetStatusQservHttpMgtRequest.cc GetStatusQservMgtRequest.cc HealthMonitorTask.cc HttpAsyncReqApp.cc @@ -130,6 +131,7 @@ target_sources(replica PRIVATE PurgeJob.cc QhttpTestApp.cc QservGetReplicasJob.cc + QservHttpMgtRequest.cc QservMgtRequest.cc QservMgtServices.cc QservStatusJob.cc diff --git a/src/replica/GetStatusQservHttpMgtRequest.cc b/src/replica/GetStatusQservHttpMgtRequest.cc new file mode 100644 index 000000000..cbd83a080 --- /dev/null +++ b/src/replica/GetStatusQservHttpMgtRequest.cc @@ -0,0 +1,123 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/GetStatusQservHttpMgtRequest.h" + +// System headers +#include +#include + +// Qserv headers +#include "http/AsyncReq.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace nlohmann; +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetStatusQservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +shared_ptr GetStatusQservHttpMgtRequest::create( + shared_ptr const& serviceProvider, string const& worker, + wbase::TaskSelector const& taskSelector, GetStatusQservHttpMgtRequest::CallbackType const& onFinish) { + return shared_ptr( + new GetStatusQservHttpMgtRequest(serviceProvider, worker, taskSelector, onFinish)); +} + +GetStatusQservHttpMgtRequest::GetStatusQservHttpMgtRequest( + shared_ptr const& serviceProvider, string const& worker, + wbase::TaskSelector const& taskSelector, GetStatusQservHttpMgtRequest::CallbackType const& onFinish) + : QservHttpMgtRequest(serviceProvider, "QSERV_GET_STATUS", worker), + _taskSelector(taskSelector), + _onFinish(onFinish) {} + +json const& GetStatusQservHttpMgtRequest::info() const { + if (!((state() == State::FINISHED) && (extendedState() == ExtendedState::SUCCESS))) { + throw logic_error("GetStatusQservHttpMgtRequest::" + string(__func__) + + " no info available in state: " + state2string(state(), extendedState())); + } + return _info; +} + +void GetStatusQservHttpMgtRequest::createHttpReqImpl(replica::Lock const& lock) { + auto const onHttpReqFinish = [self = shared_from_base()]( + shared_ptr const& request) { + self->_processResponse(request); + }; + string const service = "/status"; + string const query = wbase::taskSelectorToHttpQuery(_taskSelector); + createHttpReq(lock, onHttpReqFinish, service, query); +} + +void GetStatusQservHttpMgtRequest::notify(replica::Lock const& lock) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__); + notifyDefaultImpl(lock, _onFinish); +} + +void GetStatusQservHttpMgtRequest::_processResponse(shared_ptr const& request) { + string const context_ = context() + string(__func__) + " "; + if (state() == State::FINISHED) return; + replica::Lock const lock(_mtx, context_); + if (state() == State::FINISHED) return; + + switch (request->state()) { + case http::AsyncReq::State::FINISHED: + try { + _info = json::parse(request->responseBody()); + finish(lock, QservHttpMgtRequest::ExtendedState::SUCCESS); + } catch (exception const& ex) { + string const msg = "failed to parse worker response, ex: " + string(ex.what()); + finish(lock, QservHttpMgtRequest::ExtendedState::SERVER_BAD_RESPONSE, msg); + } + if (_info.at("success").get() == 0) { + string const msg = "worker reported error: " + _info.at("error").get(); + finish(lock, QservHttpMgtRequest::ExtendedState::SERVER_BAD, msg); + } + break; + case http::AsyncReq::State::FAILED: + finish(lock, QservHttpMgtRequest::ExtendedState::SERVER_ERROR, + request->errorMessage() + ", code: " + to_string(request->responseCode())); + break; + case http::AsyncReq::State::BODY_LIMIT_ERROR: + finish(lock, QservHttpMgtRequest::ExtendedState::BODY_LIMIT_ERROR, + request->errorMessage() + ", code: " + to_string(request->responseCode())); + break; + case http::AsyncReq::State::CANCELLED: + finish(lock, QservHttpMgtRequest::ExtendedState::CANCELLED); + break; + case http::AsyncReq::State::EXPIRED: + finish(lock, QservHttpMgtRequest::ExtendedState::TIMEOUT_EXPIRED); + break; + default: + throw runtime_error(context_ + "unsupported state of the HTTP request: " + + http::AsyncReq::state2str(request->state())); + } +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/GetStatusQservHttpMgtRequest.h b/src/replica/GetStatusQservHttpMgtRequest.h new file mode 100644 index 000000000..b5f5ecd55 --- /dev/null +++ b/src/replica/GetStatusQservHttpMgtRequest.h @@ -0,0 +1,109 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_GETSTATUSQSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_GETSTATUSQSERVHTTPMGTREQUEST_H + +// System headers +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "replica/QservHttpMgtRequest.h" +#include "wbase/TaskState.h" + +namespace lsst::qserv::replica { +class ServiceProvider; +} // namespace lsst::qserv::replica + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class GetStatusQservHttpMgtRequest is a request for obtaining various info + * (status, counters, monitoring) reported the Qserv workers. + */ +class GetStatusQservHttpMgtRequest : public QservHttpMgtRequest { +public: + /// The function type for notifications on the completion of the request + typedef std::function const&)> CallbackType; + + GetStatusQservHttpMgtRequest() = delete; + GetStatusQservHttpMgtRequest(GetStatusQservHttpMgtRequest const&) = delete; + GetStatusQservHttpMgtRequest& operator=(GetStatusQservHttpMgtRequest const&) = delete; + + ~GetStatusQservHttpMgtRequest() final = default; + + /** + * Static factory method is needed to prevent issues with the lifespan + * and memory management of instances created otherwise (as values or via + * low-level pointers). + * @param serviceProvider A reference to a provider of services for accessing + * Configuration, saving the request's persistent state to the database. + * @param worker The name of a worker to send the request to. + * @param taskSelector (optional) task selection criterias + * @param onFinish (optional) callback function to be called upon request completion. + * @return A pointer to the created object. + */ + static std::shared_ptr create( + std::shared_ptr const& serviceProvider, std::string const& worker, + wbase::TaskSelector const& taskSelector = wbase::TaskSelector(), + CallbackType const& onFinish = nullptr); + + /** + * @return The info object returned by the worker. + * @throw std::logic_error if called before the request finishes or if it failed. + */ + nlohmann::json const& info() const; + +protected: + /// @see QservHttpMgtRequest::createHttpReq() + void createHttpReqImpl(replica::Lock const& lock) final; + + /// @see QservHttpMgtRequest::notify() + void notify(replica::Lock const& lock) final; + +private: + /// @see GetStatusQservHttpMgtRequest::create() + GetStatusQservHttpMgtRequest(std::shared_ptr const& serviceProvider, + std::string const& worker, wbase::TaskSelector const& taskSelector, + CallbackType const& onFinish); + + /** + * Extract and process data of the completed request. + * @param The HTTP request to be analyzed. + */ + void _processResponse(std::shared_ptr const& request); + + // Input parameters + + wbase::TaskSelector const _taskSelector; + CallbackType _onFinish; ///< this object is reset after finishing the request + + /// The info object returned by the Qserv worker + nlohmann::json _info; +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_GETSTATUSQSERVHTTPMGTREQUEST_H diff --git a/src/replica/QservHttpMgtRequest.cc b/src/replica/QservHttpMgtRequest.cc new file mode 100644 index 000000000..9668943b0 --- /dev/null +++ b/src/replica/QservHttpMgtRequest.cc @@ -0,0 +1,262 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "replica/QservHttpMgtRequest.h" + +// System headers +#include + +// Qserv headers +#include "http/MetaModule.h" +#include "replica/Configuration.h" +#include "replica/Common.h" +#include "replica/DatabaseServices.h" + +// LSST headers +#include "lsst/log/Log.h" + +using namespace nlohmann; +using namespace std; + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.QservHttpMgtRequest"); + +} // namespace + +namespace lsst::qserv::replica { + +atomic QservHttpMgtRequest::_numClassInstances(0); + +string QservHttpMgtRequest::state2string(State state) { + switch (state) { + case State::CREATED: + return "CREATED"; + case State::IN_PROGRESS: + return "IN_PROGRESS"; + case State::FINISHED: + return "FINISHED"; + } + throw logic_error("QservHttpMgtRequest::" + string(__func__) + "(State) incomplete implementation"); +} + +string QservHttpMgtRequest::state2string(ExtendedState state) { + switch (state) { + case ExtendedState::NONE: + return "NONE"; + case ExtendedState::SUCCESS: + return "SUCCESS"; + case ExtendedState::CONFIG_ERROR: + return "CONFIG_ERROR"; + case ExtendedState::BODY_LIMIT_ERROR: + return "BODY_LIMIT_ERROR"; + case ExtendedState::SERVER_BAD: + return "SERVER_BAD"; + case ExtendedState::SERVER_CHUNK_IN_USE: + return "SERVER_CHUNK_IN_USE"; + case ExtendedState::SERVER_ERROR: + return "SERVER_ERROR"; + case ExtendedState::SERVER_BAD_RESPONSE: + return "SERVER_BAD_RESPONSE"; + case ExtendedState::TIMEOUT_EXPIRED: + return "TIMEOUT_EXPIRED"; + case ExtendedState::CANCELLED: + return "CANCELLED"; + } + throw logic_error("QservHttpMgtRequest::" + string(__func__) + + "(ExtendedState) incomplete implementation"); +} + +QservHttpMgtRequest::QservHttpMgtRequest(ServiceProvider::Ptr const& serviceProvider, string const& type, + string const& worker) + : _serviceProvider(serviceProvider), + _type(type), + _id(Generators::uniqueId()), + _worker(worker), + _state(State::CREATED), + _extendedState(ExtendedState::NONE) { + // This report is used solely for debugging purposes to allow tracking + // potential memory leaks within applications. + ++_numClassInstances; + LOGS(_log, LOG_LVL_TRACE, context() << "constructed instances: " << _numClassInstances); +} + +QservHttpMgtRequest::~QservHttpMgtRequest() { + --_numClassInstances; + LOGS(_log, LOG_LVL_TRACE, context() << "destructed instances: " << _numClassInstances); +} + +string QservHttpMgtRequest::state2string() const { + replica::Lock lock(_mtx, context() + __func__); + return state2string(state(), extendedState()) + "::" + serverError(lock); +} + +string QservHttpMgtRequest::serverError() const { + replica::Lock lock(_mtx, context() + __func__); + return serverError(lock); +} + +string QservHttpMgtRequest::serverError(replica::Lock const& lock) const { return _serverError; } + +string QservHttpMgtRequest::context() const { + return id() + " " + type() + " " + state2string(state(), extendedState()) + " "; +} + +Performance QservHttpMgtRequest::performance() const { + replica::Lock lock(_mtx, context() + __func__); + return performance(lock); +} + +Performance QservHttpMgtRequest::performance(replica::Lock const& lock) const { return _performance; } + +void QservHttpMgtRequest::start(string const& jobId, unsigned int requestExpirationIvalSec) { + string const context_ = context() + __func__; + LOGS(_log, LOG_LVL_TRACE, context_); + + replica::Lock lock(_mtx, context_); + _assertNotStarted(__func__); + + // This needs to be updated to override the default value of the counter + // which was created upon the object construction. + _performance.setUpdateStart(); + + // Check if configuration parameters are valid + auto const config = _serviceProvider->config(); + if (!config->isKnownWorker(_worker)) { + LOGS(_log, LOG_LVL_ERROR, context_ << " ** MISCONFIGURED ** unknown worker: '" << _worker << "'."); + _setState(lock, State::FINISHED, ExtendedState::CONFIG_ERROR); + notify(lock); + return; + } + + // Build an association with the corresponding parent job (optional). + _jobId = jobId; + + // Adjust the default value of the expiration ival (if requested) before + // creating and starting the request. + unsigned int actualExpirationIvalSec = requestExpirationIvalSec; + if (0 == actualExpirationIvalSec) { + actualExpirationIvalSec = config->get("xrootd", "request-timeout-sec"); + } + + createHttpReqImpl(lock); + + _httpRequest->setExpirationIval(actualExpirationIvalSec); + _httpRequest->start(); + + _setState(lock, State::IN_PROGRESS); +} + +void QservHttpMgtRequest::wait() { + _assertStarted(__func__); + _httpRequest->wait(); +} + +string const& QservHttpMgtRequest::jobId() const { + _assertStarted(__func__); + return _jobId; +} + +void QservHttpMgtRequest::cancel() { + _assertStarted(__func__); + _httpRequest->cancel(); +} + +void QservHttpMgtRequest::createHttpReq(replica::Lock const& lock, + http::AsyncReq::CallbackType const& onHttpReqFinish, + string const& service, string const& query) { + _assertNotStarted(__func__); + auto const getHostPort = [config = _serviceProvider->config(), + worker = _worker](http::AsyncReq::HostPort const&) -> http::AsyncReq::HostPort { + auto const info = config->workerInfo(worker); + return http::AsyncReq::HostPort{info.qservWorker.host.addr, info.qservWorker.port}; + }; + string const target = service + query + string(query.empty() ? "?" : "&") + + "instance_id=" + _serviceProvider->instanceId() + "&worker=" + _worker + + "&version=" + to_string(http::MetaModule::version); + _httpRequest = http::AsyncReq::create(_serviceProvider->io_service(), onHttpReqFinish, "GET", getHostPort, + target); +} + +void QservHttpMgtRequest::createHttpReq(replica::Lock const& lock, + http::AsyncReq::CallbackType const& onHttpReqFinish, + string const& method, string const& service, json const& body) { + _assertNotStarted(__func__); +} + +void QservHttpMgtRequest::finish(replica::Lock const& lock, ExtendedState extendedState, + string const& serverError) { + string const context_ = context() + __func__; + LOGS(_log, LOG_LVL_DEBUG, context_ << " serverError:" << serverError); + + // Set the optional server error state as well. + // IMPORTANT: this needs to be done before performing the state transition to insure + // clients will get a consistent view onto the object state. + _serverError = serverError; + + // Set new state to make sure all event handlers will recognize + // this scenario and avoid making any modifications to the request's state. + _setState(lock, State::FINISHED, extendedState); + + // We have to update the timestamp before invoking a user provided + // callback on the completion of the operation. + _performance.setUpdateFinish(); + + // TODO: temporarily disabled while this class is not supported by + // the persistent backend. + // + // _serviceProvider->databaseServices()->saveState(*this, _performance, _serverError); + + notify(lock); +} + +void QservHttpMgtRequest::_assertStarted(string const& func, bool mustBeStarted) const { + string const context_ = context() + func; + LOGS(_log, LOG_LVL_TRACE, context_); + if (mustBeStarted) { + if (State::CREATED == _state) { + throw logic_error(context_ + " the request was not started."); + } + } else { + if (State::CREATED != _state) { + throw logic_error(context_ + " the request was already started."); + } + } +} + +void QservHttpMgtRequest::_setState(replica::Lock const& lock, State newState, + ExtendedState newExtendedState) { + LOGS(_log, LOG_LVL_TRACE, context() << __func__ << " " << state2string(newState, newExtendedState)); + + // IMPORTANT: the top-level state is the last to be set when performing + // the state transition to insure clients will get a consistent view onto + // the object state. + _extendedState = newExtendedState; + _state = newState; + + // TODO: temporarily disabled while this class is not supported by + // the persistent backend. + // + // _serviceProvider->databaseServices()->saveState(*this, _performance, _serverError); +} + +} // namespace lsst::qserv::replica diff --git a/src/replica/QservHttpMgtRequest.h b/src/replica/QservHttpMgtRequest.h new file mode 100644 index 000000000..109b8d0d5 --- /dev/null +++ b/src/replica/QservHttpMgtRequest.h @@ -0,0 +1,349 @@ +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +#ifndef LSST_QSERV_REPLICA_QSERVHTTPMGTREQUEST_H +#define LSST_QSERV_REPLICA_QSERVHTTPMGTREQUEST_H + +// System headers +#include +#include +#include +#include +#include +#include + +// Third party headers +#include "nlohmann/json.hpp" + +// Qserv headers +#include "http/AsyncReq.h" +#include "replica/Mutex.h" +#include "replica/Performance.h" +#include "replica/ServiceProvider.h" + +// This header declarations +namespace lsst::qserv::replica { + +/** + * Class QservHttpMgtRequest is a base class for a family of the Qserv worker + * management requests within the master server. + */ +class QservHttpMgtRequest : public std::enable_shared_from_this { +public: + typedef std::shared_ptr Ptr; + + /// The lock type used by the implementations + typedef std::lock_guard LockType; + + /// The type which represents the primary public state of the request + enum State { + CREATED, ///< The request has been constructed, and no attempt to execute + /// it has been made. + IN_PROGRESS, ///< The request is in a progress. + FINISHED ///< The request is finished. See extended status for more details + /// (the completion status, etc.) + }; + + /// @return the string representation of the primary state + static std::string state2string(State state); + + /// Type ExtendedState represents the refined public sub-state of the request + /// once it's FINISHED as per the above defined primary state. + enum ExtendedState { + NONE, ///< No extended state exists at this time. + SUCCESS, ///< The request has been fully implemented. + CONFIG_ERROR, ///< Problems with request configuration were detected. + BODY_LIMIT_ERROR, ///< Response's body is larger than requested. + SERVER_BAD, ///< Server reports that the request can not be implemented because + /// of configuration or request's parameters problems. + SERVER_CHUNK_IN_USE, ///< Server reports that the request can not be implemented because + /// some of the required remote resources (chunks, etc.) are in use. + SERVER_ERROR, ///< The request could not be implemented due to an unrecoverable + /// server-side error. + SERVER_BAD_RESPONSE, ///< Data received from a server can't be correctly interpreted. + TIMEOUT_EXPIRED, ///< Expired due to a timeout. + CANCELLED ///< Explicitly cancelled on the client-side. + }; + + /// @return the string representation of the extended state + static std::string state2string(ExtendedState state); + + /// @return the string representation of the combined state + static std::string state2string(State state, ExtendedState extendedState) { + return state2string(state) + "::" + state2string(extendedState); + } + + QservHttpMgtRequest() = delete; + QservHttpMgtRequest(QservHttpMgtRequest const&) = delete; + QservHttpMgtRequest& operator=(QservHttpMgtRequest const&) = delete; + + virtual ~QservHttpMgtRequest(); + + /// @return reference to a provider of services + std::shared_ptr const& serviceProvider() { return _serviceProvider; } + + /// @return string representing of the request type. + std::string const& type() const { return _type; } + + /// @return unique identifier of the request + std::string const& id() const { return _id; } + + /// @return name of a worker + std::string const& worker() const { return _worker; } + + /// @return primary status of the request + State state() const { return _state; } + + /// @return extended status of the request + ExtendedState extendedState() const { return _extendedState; } + + /// @return string representation of the combined state of the object + std::string state2string() const; + + /// @return error message (if any) reported by the remote service. + std::string serverError() const; + + /// @return performance info + Performance performance() const; + + /** + * Return an identifier if the owning job (if the request has started) + * @throws std::logic_error If the request hasn't started. + */ + std::string const& jobId() const; + + /** + * Begin processing the request. + * @param jobId (Optional) identifier of a job specifying a context of the request. + * @param requestExpirationIvalSec (Optional) parameter (if differs from 0) allowing + * to override the default value of the corresponding parameter from the Configuration. + */ + void start(std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); + + /// Wait for the completion of the request + void wait(); + + /** + * Explicitly cancel any asynchronous operation(s) and put the object into + * the FINISHED::CANCELLED state. This operation is very similar to the + * timeout-based request expiration, except it's requested explicitly. + */ + void cancel(); + + /// @return The context string for debugging and diagnostic printouts. + std::string context() const; + + /** + * @return A dictionary of parameters and the corresponding values to + * be stored in a database for a request. + */ + virtual std::list> extendedPersistentState() const { + return std::list>(); + } + +protected: + /** + * Construct the request with the pointer to the services provider. + * @param serviceProvider Is required to access configuration services. + * @param type The type name of he request (used for debugging and error reporting). + * @param worker The name of a worker. + */ + QservHttpMgtRequest(std::shared_ptr const& serviceProvider, std::string const& type, + std::string const& worker); + + /// @return A shared pointer of the desired subclass (no dynamic type checking). + template + std::shared_ptr shared_from_base() { + return std::static_pointer_cast(shared_from_this()); + } + + /** + * Create an HTTP request. + * + * The methods is required to be provided by subclasses for creating + * subclass-specific requests using the coresponding helper methods. + * + * @see QservHttpMgtRequest::createHttpReq + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + */ + virtual void createHttpReqImpl(replica::Lock const& lock) = 0; + + /** + * Create an HTTP "GET" request, but do not start it yet. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @throw std::logic_error If the method is called while the curent state + * is not State::CREATED, or if the HTTP request was already created. + */ + void createHttpReq(replica::Lock const& lock, http::AsyncReq::CallbackType const& onHttpReqFinish, + std::string const& service, std::string const& query); + + /** + * Create an HTTP request ("POST", "PUT" or "DELETE") that has the JSON body, + * but do not start it yet. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @throw std::logic_error If the method is called while the curent state + * is not State::CREATED, or if the HTTP request was already created. + */ + void createHttpReq(replica::Lock const& lock, http::AsyncReq::CallbackType const& onHttpReqFinish, + std::string const& method, std::string const& service, nlohmann::json const& body); + + /** + * Finalize request processing (as reported by subclasses) + * + * This is supposed to be the last operation to be called by subclasses + * upon a completion of the request. + * + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param extendedState The new extended state. + * @param serverError (optional) error message from a Qserv worker service. + */ + void finish(replica::Lock const& lock, ExtendedState extendedState, std::string const& serverError = ""); + + /** + * Start user-notification protocol (in case if user-defined notifiers + * were provided to a subclass). The callback is expected to be made + * asynchronously in a separate thread to avoid blocking the current thread. + * + * This method has to be provided by subclasses to forward + * notification on request completion to a client which initiated + * the request, etc. + * + * The standard implementation of this method in a context of some + * subclass 'T' should looks like this: + * @code + * void T::notify(replica::Lock const& lock) { + * notifyDefaultImpl(lock, _onFinish); + * } + * @code + * @see QservHttpMgtRequest::notifyDefaultImpl + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + */ + virtual void notify(replica::Lock const& lock) = 0; + + /** + * The helper function which pushes up-stream notifications on behalf of + * subclasses. Upon a completion of this method the callback function + * object will get reset to 'nullptr'. + * @note This default implementation works for callback functions which + * accept a single parameter - a smart reference onto an object of + * the corresponding subclass. Subclasses with more complex signatures of + * their callbacks should have their own implementations which may look + * similarly to this one. + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param onFinish A callback function (if set) to be called. + */ + template + void notifyDefaultImpl(replica::Lock const& lock, typename T::CallbackType& onFinish) { + if (nullptr != onFinish) { + // Clearing the stored callback after finishing the up-stream notification + // has two purposes: + // 1. it guaranties (exactly) one time notification + // 2. it breaks the up-stream dependency on a caller object if a shared + // pointer to the object was mentioned as the lambda-function's closure + serviceProvider()->io_service().post(std::bind(std::move(onFinish), shared_from_base())); + onFinish = nullptr; + } + } + + /** + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @return A server error string (if any). + */ + std::string serverError(replica::Lock const& lock) const; + + /** + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @return The performance info. + */ + Performance performance(replica::Lock const& lock) const; + + /// Mutex guarding internal state (also used by subclasses) + mutable replica::Mutex _mtx; + +private: + /** + * Ensure the request is in the desired state. + * @note The method doesn't require a lock on the mutex _mtx to be acqured before + * being called neither it acquires the lock by itself since the implementation + * of the method relies on checking a value of the atomic variable representing + * the primary state of the request. + * @param func A context from which the state test is requested (for tracing + * and error reporting). + * @param mustBeStarted 'true' if the request is expected to be in any + * state past State::CREATED, or 'false' if it's expected to be exactly + * in State::CREATED. + * @throws std::logic_error If the request was not found in the desired state. + */ + void _assertStarted(std::string const& func, bool mustBeStarted) const; + + // Shortcut methods based on the above defined one. + + void _assertStarted(std::string const& func) const { _assertStarted(func, true); } + void _assertNotStarted(std::string const& func) const { _assertStarted(func, false); } + + /** + * Set the desired primary and extended state. + * + * The change of the state is done via a method to allow extra actions + * at this step, such as: 1) reporting change state in a debug stream, + * or 2) verifying the correctness of the state transition. + * + * @param lock A lock on QservHttpMgtRequest::_mtx must be acquired before + * calling this method. + * @param state The primary state of the request. + * @param extendedState The extended state of the request. + */ + void _setState(replica::Lock const& lock, State state, ExtendedState extendedState = ExtendedState::NONE); + + /// The global counter for the number of instances of any subclass + static std::atomic _numClassInstances; + + // Input parameters + + std::shared_ptr const _serviceProvider; + + std::string const _type; + std::string const _id; + std::string const _worker; + + // Two-level state of a request + std::atomic _state; ///< The primary state. + std::atomic _extendedState; ///< The sub-state. + + /// Error message (if any) reported by the remote service + std::string _serverError; + + Performance _performance; ///< Performance counters. + std::string _jobId; ///< An identifier of the parent job which started the request. + + std::shared_ptr _httpRequest; ///< The actual request sent to the worker. +}; + +} // namespace lsst::qserv::replica + +#endif // LSST_QSERV_REPLICA_QSERVHTTPMGTREQUEST_H