diff --git a/src/replica/apps/WorkerApp.cc b/src/replica/apps/WorkerApp.cc
index 31c023640..f1362b91a 100644
--- a/src/replica/apps/WorkerApp.cc
+++ b/src/replica/apps/WorkerApp.cc
@@ -39,7 +39,7 @@
#include "replica/services/ServiceProvider.h"
#include "replica/util/FileUtils.h"
#include "replica/worker/FileServer.h"
-#include "replica/worker/WorkerProcessor.h"
+#include "replica/worker/WorkerHttpSvc.h"
#include "replica/worker/WorkerServer.h"
// LSST headers
@@ -113,6 +113,9 @@ int WorkerApp::runImpl() {
auto const reqProcSvr = WorkerServer::create(serviceProvider(), worker);
thread reqProcSvrThread([reqProcSvr]() { reqProcSvr->run(); });
+ auto const reqProcHttpSvr = WorkerHttpSvc::create(serviceProvider(), worker);
+ thread reqProcHttpSvrThread([reqProcHttpSvr]() { reqProcHttpSvr->run(); });
+
auto const fileSvr = FileServer::create(serviceProvider(), worker);
thread fileSvrThread([fileSvr]() { fileSvr->run(); });
diff --git a/src/replica/proto/CMakeLists.txt b/src/replica/proto/CMakeLists.txt
index b61599d8c..7eb8d830d 100644
--- a/src/replica/proto/CMakeLists.txt
+++ b/src/replica/proto/CMakeLists.txt
@@ -4,4 +4,5 @@ add_library(replica_proto OBJECT)
target_sources(replica_proto PRIVATE
${REPLICA_PB_SRCS}
${REPLICA_PB_HDRS}
+ Protocol.cc
)
diff --git a/src/replica/proto/Protocol.cc b/src/replica/proto/Protocol.cc
new file mode 100644
index 000000000..3c4b878c3
--- /dev/null
+++ b/src/replica/proto/Protocol.cc
@@ -0,0 +1,134 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+
+// Class header
+#include "replica/proto/Protocol.h"
+
+// System headers
+#include
+
+using namespace std;
+
+namespace lsst::qserv::replica::protocol {
+
+std::string toString(Status status) {
+ switch (status) {
+ case Status::CREATED:
+ return "CREATED";
+ case Status::SUCCESS:
+ return "SUCCESS";
+ case Status::QUEUED:
+ return "QUEUED";
+ case Status::IN_PROGRESS:
+ return "IN_PROGRESS";
+ case Status::IS_CANCELLING:
+ return "IS_CANCELLING";
+ case Status::BAD:
+ return "BAD";
+ case Status::FAILED:
+ return "FAILED";
+ case Status::CANCELLED:
+ return "CANCELLED";
+ default:
+ throw logic_error("Unhandled status: " + to_string(static_cast(status)));
+ }
+}
+
+std::string toString(StatusExt extendedStatus) {
+ switch (extendedStatus) {
+ case StatusExt::NONE:
+ return "NONE";
+ case StatusExt::INVALID_PARAM:
+ return "INVALID_PARAM";
+ case StatusExt::INVALID_ID:
+ return "INVALID_ID";
+ case StatusExt::FOLDER_STAT:
+ return "FOLDER_STAT";
+ case StatusExt::FOLDER_CREATE:
+ return "FOLDER_CREATE";
+ case StatusExt::FILE_STAT:
+ return "FILE_STAT";
+ case StatusExt::FILE_SIZE:
+ return "FILE_SIZE";
+ case StatusExt::FOLDER_READ:
+ return "FOLDER_READ";
+ case StatusExt::FILE_READ:
+ return "FILE_READ";
+ case StatusExt::FILE_ROPEN:
+ return "FILE_ROPEN";
+ case StatusExt::FILE_CREATE:
+ return "FILE_CREATE";
+ case StatusExt::FILE_OPEN:
+ return "FILE_OPEN";
+ case StatusExt::FILE_RESIZE:
+ return "FILE_RESIZE";
+ case StatusExt::FILE_WRITE:
+ return "FILE_WRITE";
+ case StatusExt::FILE_COPY:
+ return "FILE_COPY";
+ case StatusExt::FILE_DELETE:
+ return "FILE_DELETE";
+ case StatusExt::FILE_RENAME:
+ return "FILE_RENAME";
+ case StatusExt::FILE_EXISTS:
+ return "FILE_EXISTS";
+ case StatusExt::SPACE_REQ:
+ return "SPACE_REQ";
+ case StatusExt::NO_FOLDER:
+ return "NO_FOLDER";
+ case StatusExt::NO_FILE:
+ return "NO_FILE";
+ case StatusExt::NO_ACCESS:
+ return "NO_ACCESS";
+ case StatusExt::NO_SPACE:
+ return "NO_SPACE";
+ case StatusExt::FILE_MTIME:
+ return "FILE_MTIME";
+ case StatusExt::MYSQL_ERROR:
+ return "MYSQL_ERROR";
+ case StatusExt::LARGE_RESULT:
+ return "LARGE_RESULT";
+ case StatusExt::NO_SUCH_TABLE:
+ return "NO_SUCH_TABLE";
+ case StatusExt::NOT_PARTITIONED_TABLE:
+ return "NOT_PARTITIONED_TABLE";
+ case StatusExt::NO_SUCH_PARTITION:
+ return "NO_SUCH_PARTITION";
+ case StatusExt::MULTIPLE:
+ return "MULTIPLE";
+ case StatusExt::OTHER_EXCEPTION:
+ return "OTHER_EXCEPTION";
+ case StatusExt::FOREIGN_INSTANCE:
+ return "FOREIGN_INSTANCE";
+ case StatusExt::DUPLICATE_KEY:
+ return "DUPLICATE_KEY";
+ case StatusExt::CANT_DROP_KEY:
+ return "CANT_DROP_KEY";
+ default:
+ throw logic_error("Unhandled extended status: " + to_string(static_cast(extendedStatus)));
+ }
+}
+
+string toString(Status status, StatusExt extendedStatus) {
+ return toString(status) + "::" + toString(extendedStatus);
+}
+
+} // namespace lsst::qserv::replica::protocol
diff --git a/src/replica/proto/Protocol.h b/src/replica/proto/Protocol.h
new file mode 100644
index 000000000..42ddf12e2
--- /dev/null
+++ b/src/replica/proto/Protocol.h
@@ -0,0 +1,136 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+#ifndef LSST_QSERV_REPLICA_PROTOCOL_H
+#define LSST_QSERV_REPLICA_PROTOCOL_H
+
+// System headers
+#include
+
+// Third party headers
+#include "nlohmann/json.hpp"
+
+// This header declarations
+namespace lsst::qserv::replica::protocol {
+
+/// Subtypes of the SQL requests.
+enum class SqlRequestType : int {
+
+ QUERY = 0,
+ CREATE_DATABASE = 1,
+ DROP_DATABASE = 2,
+ ENABLE_DATABASE = 3, ///< in Qserv
+ DISABLE_DATABASE = 4, ///< in Qserv
+ GRANT_ACCESS = 5,
+ CREATE_TABLE = 6,
+ DROP_TABLE = 7,
+ REMOVE_TABLE_PARTITIONING = 8,
+ DROP_TABLE_PARTITION = 9,
+ GET_TABLE_INDEX = 10,
+ CREATE_TABLE_INDEX = 11,
+ DROP_TABLE_INDEX = 12,
+ ALTER_TABLE = 13,
+ TABLE_ROW_STATS = 14
+};
+
+/// Types of the table indexes specified in the index management requests requests.
+enum class SqlIndexSpec : int { DEFAULT = 1, UNIQUE = 2, FULLTEXT = 3, SPATIAL = 4 };
+
+/// Status values returned by all request related to operations with
+/// replicas. Request management operations always return messages whose types
+/// match the return types of the corresponding (original) replica-related requests.
+/// Service management requests have their own set of status values.
+///
+enum class Status : int {
+ CREATED = 0,
+ SUCCESS = 1,
+ QUEUED = 2,
+ IN_PROGRESS = 3,
+ IS_CANCELLING = 4,
+ BAD = 5,
+ FAILED = 6,
+ CANCELLED = 7
+};
+
+enum class StatusExt : int {
+ NONE = 0, ///< Unspecified problem.
+ INVALID_PARAM = 1, ///< Invalid parameter(s) of a request.
+ INVALID_ID = 2, ///< An invalid request identifier.
+ FOLDER_STAT = 4, ///< Failed to obtain fstat() for a folder.
+ FOLDER_CREATE = 5, ///< Failed to create a folder.
+ FILE_STAT = 6, ///< Failed to obtain fstat() for a file.
+ FILE_SIZE = 7, ///< Failed to obtain a size of a file.
+ FOLDER_READ = 8, ///< Failed to read the contents of a folder.
+ FILE_READ = 9, ///< Failed to read the contents of a file.
+ FILE_ROPEN = 10, ///< Failed to open a remote file.
+ FILE_CREATE = 11, ///< Failed to create a file.
+ FILE_OPEN = 12, ///< Failed to open a file.
+ FILE_RESIZE = 13, ///< Failed to resize a file.
+ FILE_WRITE = 14, ///< Failed to write into a file.
+ FILE_COPY = 15, ///< Failed to copy a file.
+ FILE_DELETE = 16, ///< Failed to delete a file.
+ FILE_RENAME = 17, ///< Failed to rename a file.
+ FILE_EXISTS = 18, ///< File already exists.
+ SPACE_REQ = 19, ///< Space availability check failed.
+ NO_FOLDER = 20, ///< Folder doesn't exist.
+ NO_FILE = 21, ///< File doesn't exist.
+ NO_ACCESS = 22, ///< No access to a file or a folder.
+ NO_SPACE = 23, ///< No space left on a device as required by an operation.
+ FILE_MTIME = 24, ///< Get/set 'mtime' operation failed.
+ MYSQL_ERROR = 25, ///< General MySQL error (other than any specific ones listed here).
+ LARGE_RESULT = 26, ///< Result exceeds a limit set in a request.
+ NO_SUCH_TABLE = 27, ///< No table found while performing a MySQL operation.
+ NOT_PARTITIONED_TABLE = 28, ///< The table is not MySQL partitioned as it was expected.
+ NO_SUCH_PARTITION = 29, ///< No MySQL partition found in a table as it was expected.
+ MULTIPLE = 30, ///< Multiple unspecified errors encountered when processing a request.
+ OTHER_EXCEPTION = 31, ///< Other exception not listed here.
+ FOREIGN_INSTANCE = 32, ///< Detected a request from a Controller serving an unrelated Qserv.
+ DUPLICATE_KEY = 33, ///< Duplicate key found when creating an index or altering a table schema.
+ CANT_DROP_KEY = 34 ///< Can't drop a field or a key which doesn't exist.
+};
+
+/// @return the string representation of the status
+std::string toString(Status status);
+
+/// @return the string representation of the extended status
+std::string toString(StatusExt extendedStatus);
+
+/// @return the string representation of the full status
+std::string toString(Status status, StatusExt extendedStatus);
+
+/// Status of a replica.
+enum class ReplicaStatus : int { NOT_FOUND = 0, CORRUPT = 1, INCOMPLETE = 2, COMPLETE = 3 };
+
+/// Status of a service.
+enum class ServiceState : int { SUSPEND_IN_PROGRESS = 0, SUSPENDED = 1, RUNNING = 2 };
+
+/// The header to be sent with the requests processed through the worker's queueing system.
+struct QueuedRequestHdr {
+ std::string id;
+ int priority;
+ unsigned int timeout;
+ QueuedRequestHdr(std::string const& id_, int priority_, unsigned int timeout_)
+ : id(id_), priority(priority_), timeout(timeout_) {}
+ nlohmann::json toJson() const { return {{"id", id}, {"priority", priority}, {"timeout", timeout}}; };
+};
+
+} // namespace lsst::qserv::replica::protocol
+
+#endif // LSST_QSERV_REPLICA_PROTOCOL_H
diff --git a/src/replica/util/Performance.cc b/src/replica/util/Performance.cc
index 8e3292d68..ae30b0ac3 100644
--- a/src/replica/util/Performance.cc
+++ b/src/replica/util/Performance.cc
@@ -30,6 +30,7 @@
#include "lsst/log/Log.h"
using namespace std;
+using json = nlohmann::json;
namespace {
diff --git a/src/replica/util/Performance.h b/src/replica/util/Performance.h
index fcbfd394a..15320d08b 100644
--- a/src/replica/util/Performance.h
+++ b/src/replica/util/Performance.h
@@ -33,6 +33,9 @@
#include
#include
+// Third party headers
+#include "nlohmann/json.hpp"
+
// Forward declarations
namespace lsst::qserv::replica {
class ProtocolPerformance;
@@ -56,7 +59,6 @@ class Performance {
* All (but the request creation one) timestamps will be initialized with 0.
*/
Performance();
-
Performance(Performance const&) = default;
Performance& operator=(Performance const&) = default;
@@ -64,45 +66,28 @@ class Performance {
/**
* Update object state with counters from the protocol buffer object
- *
- * @param workerPerformanceInfo
- * counters to be carried over into an internal state
+ * @param workerPerformanceInfo counters to be carried over into an internal state
*/
void update(ProtocolPerformance const& workerPerformanceInfo);
/**
* Update the Controller's 'start' time
- *
- * @return
- * the previous state of the counter
+ * @return the previous state of the counter
*/
uint64_t setUpdateStart();
/**
* Update the Controller's 'finish' time
- *
- * @return
- * the previous state of the counter
+ * @return the previous state of the counter
*/
uint64_t setUpdateFinish();
- /// Created by the Controller
- uint64_t c_create_time;
-
- /// Started by the Controller
- uint64_t c_start_time;
-
- /// Received by a worker service
- uint64_t w_receive_time;
-
- /// Execution started by a worker service
- uint64_t w_start_time;
-
- /// Execution finished by a worker service
- uint64_t w_finish_time;
-
- /// A subscriber notified by the Controller
- uint64_t c_finish_time;
+ uint64_t c_create_time; ///< Created by the Controller
+ uint64_t c_start_time; ///< Started by the Controller
+ uint64_t w_receive_time; ///< Received by a worker service
+ uint64_t w_start_time; ///< Execution started by a worker service
+ uint64_t w_finish_time; ///< Execution finished by a worker service
+ uint64_t c_finish_time; ///< A subscriber notified by the Controller
};
/// Overloaded streaming operator for class Performance
@@ -127,6 +112,7 @@ class WorkerPerformance {
uint64_t setUpdateFinish();
std::unique_ptr info() const;
+ nlohmann::json toJson() const;
std::atomic receive_time; ///< Received by a worker service
std::atomic start_time; ///< Execution started by a worker service
diff --git a/src/replica/worker/CMakeLists.txt b/src/replica/worker/CMakeLists.txt
index a37868d82..c1ae9a057 100644
--- a/src/replica/worker/CMakeLists.txt
+++ b/src/replica/worker/CMakeLists.txt
@@ -6,9 +6,15 @@ target_sources(replica_worker PRIVATE
FileServerConnection.cc
WorkerDeleteRequest.cc
WorkerDirectorIndexRequest.cc
+ WorkerEchoHttpRequest.cc
WorkerEchoRequest.cc
WorkerFindAllRequest.cc
WorkerFindRequest.cc
+ WorkerHttpProcessor.cc
+ WorkerHttpProcessorThread.cc
+ WorkerHttpRequest.cc
+ WorkerHttpSvc.cc
+ WorkerHttpSvcMod.cc
WorkerProcessor.cc
WorkerProcessorThread.cc
WorkerReplicationRequest.cc
diff --git a/src/replica/worker/WorkerEchoHttpRequest.cc b/src/replica/worker/WorkerEchoHttpRequest.cc
new file mode 100644
index 000000000..e2f224e12
--- /dev/null
+++ b/src/replica/worker/WorkerEchoHttpRequest.cc
@@ -0,0 +1,87 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+
+// Class header
+#include "replica/worker/WorkerEchoHttpRequest.h"
+
+// System headers
+#include
+
+// Qserv headers
+#include "util/BlockPost.h"
+
+// LSST headers
+#include "lsst/log/Log.h"
+
+using namespace std;
+using json = nlohmann::json;
+
+namespace {
+
+LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerEchoHttpRequest");
+
+} // namespace
+
+namespace lsst::qserv::replica {
+
+shared_ptr WorkerEchoHttpRequest::create(
+ shared_ptr const& serviceProvider, string const& worker,
+ protocol::QueuedRequestHdr const& hdr, json const& req, ExpirationCallbackType const& onExpired) {
+ auto ptr = shared_ptr(
+ new WorkerEchoHttpRequest(serviceProvider, worker, hdr, req, onExpired));
+ ptr->init();
+ return ptr;
+}
+
+WorkerEchoHttpRequest::WorkerEchoHttpRequest(shared_ptr const& serviceProvider,
+ string const& worker, protocol::QueuedRequestHdr const& hdr,
+ json const& req, ExpirationCallbackType const& onExpired)
+ : WorkerHttpRequest(serviceProvider, worker, "TEST_ECHO", hdr, req, onExpired),
+ _delay(req.at("delay")),
+ _delayLeft(req.at("delay")) {}
+
+void WorkerEchoHttpRequest::getResult(json& result) const {
+ LOGS(_log, LOG_LVL_DEBUG, context(__func__));
+ replica::Lock lock(_mtx, context(__func__));
+ result["data"] = req().at("data");
+}
+
+bool WorkerEchoHttpRequest::execute() {
+ LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " _delay:" << _delay << " _delayLeft:" << _delayLeft);
+
+ replica::Lock lock(_mtx, context(__func__));
+ checkIfCancelling(lock, __func__);
+
+ // Block the thread for the random number of milliseconds in the interval
+ // below. Then update the amount of time which is still left.
+ util::BlockPost blockPost(1000, 2000);
+ uint64_t const span = blockPost.wait();
+ _delayLeft -= (span < _delayLeft) ? span : _delayLeft;
+
+ // Done if have reached or exceeded the initial delay
+ if (0 == _delayLeft) {
+ setStatus(lock, protocol::Status::SUCCESS);
+ return true;
+ }
+ return false;
+}
+
+} // namespace lsst::qserv::replica
diff --git a/src/replica/worker/WorkerEchoHttpRequest.h b/src/replica/worker/WorkerEchoHttpRequest.h
new file mode 100644
index 000000000..94940cc68
--- /dev/null
+++ b/src/replica/worker/WorkerEchoHttpRequest.h
@@ -0,0 +1,94 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+#ifndef LSST_QSERV_REPLICA_WORKERECHOHTTPREQUEST_H
+#define LSST_QSERV_REPLICA_WORKERECHOHTTPREQUEST_H
+
+// System headers
+#include
+#include
+
+// Qserv headers
+#include "replica/worker/WorkerHttpRequest.h"
+
+// Third party headers
+#include "nlohmann/json.hpp"
+
+// Forward declarations
+
+namespace lsst::qserv::replica {
+class ServiceProvider;
+} // namespace lsst::qserv::replica
+
+namespace lsst::qserv::replica::protocol {
+struct QueuedRequestHdr;
+} // namespace lsst::qserv::replica::protocol
+
+// This header declarations
+namespace lsst::qserv::replica {
+
+/**
+ * Class WorkerEchoHttpRequest implements test requests within the worker servers.
+ * Requests of this type don't have any side effects (in terms of modifying
+ * any files or databases).
+ */
+class WorkerEchoHttpRequest : public WorkerHttpRequest {
+public:
+ /**
+ * Static factory method is needed to prevent issue with the lifespan
+ * and memory management of instances created otherwise (as values or via
+ * low-level pointers).
+ *
+ * @param serviceProvider provider is needed to access the Configuration
+ * of a setup and for validating the input parameters
+ * @param worker the name of a worker. The name must match the worker which
+ * is going to execute the request.
+ * @param hdr request header (common parameters of the queued request)
+ * @param req the request object received from a client (request-specific parameters)
+ * @param onExpired request expiration callback function
+ * @return pointer to the created object
+ */
+ static std::shared_ptr create(
+ std::shared_ptr const& serviceProvider, std::string const& worker,
+ protocol::QueuedRequestHdr const& hdr, nlohmann::json const& req,
+ ExpirationCallbackType const& onExpired);
+
+ WorkerEchoHttpRequest() = delete;
+ WorkerEchoHttpRequest(WorkerEchoHttpRequest const&) = delete;
+ WorkerEchoHttpRequest& operator=(WorkerEchoHttpRequest const&) = delete;
+
+ ~WorkerEchoHttpRequest() override = default;
+
+ bool execute() override;
+
+protected:
+ WorkerEchoHttpRequest(std::shared_ptr const& serviceProvider, std::string const& worker,
+ protocol::QueuedRequestHdr const& hdr, nlohmann::json const& req,
+ ExpirationCallbackType const& onExpired);
+
+ void getResult(nlohmann::json& result) const override;
+
+ uint64_t _delay; ///< The amount of the initial delay
+ uint64_t _delayLeft; ///< The amount of the initial delay which is still left
+};
+
+} // namespace lsst::qserv::replica
+
+#endif // LSST_QSERV_REPLICA_WORKERECHOHTTPREQUEST_H
diff --git a/src/replica/worker/WorkerHttpProcessor.cc b/src/replica/worker/WorkerHttpProcessor.cc
new file mode 100644
index 000000000..9b1918933
--- /dev/null
+++ b/src/replica/worker/WorkerHttpProcessor.cc
@@ -0,0 +1,547 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+
+// Class header
+#include "replica/worker/WorkerHttpProcessor.h"
+
+// System headers
+#include
+#include
+#include
+
+// Qserv headers
+#include "replica/config/Configuration.h"
+#include "replica/mysql/DatabaseMySQL.h"
+#include "replica/services/ServiceProvider.h"
+#include "replica/worker/WorkerHttpProcessorThread.h"
+#include "replica/worker/WorkerHttpRequest.h"
+// #include "replica/worker/WorkerCreateReplicaHttpRequest.h"
+// #include "replica/worker/WorkerDeleteReplicaHttpRequest.h"
+// #include "replica/worker/WorkerDirectorIndexHttpRequest.h"
+#include "replica/worker/WorkerEchoHttpRequest.h"
+// #include "replica/worker/WorkerFindReplicaHttpRequest.h"
+// #include "replica/worker/WorkerFindAllReplicasHttpRequest.h"
+// #include "replica/worker/WorkerSqlHttpRequest.h"
+#include "util/BlockPost.h"
+#include "util/TimeUtils.h"
+
+// LSST headers
+#include "lsst/log/Log.h"
+
+using namespace std;
+using namespace std::placeholders;
+using json = nlohmann::json;
+
+namespace {
+LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerHttpProcessor");
+} // namespace
+
+namespace lsst::qserv::replica {
+
+bool WorkerHttpProcessor::PriorityQueueType::remove(string const& id) {
+ auto itr = find_if(c.begin(), c.end(),
+ [&id](shared_ptr const& ptr) { return ptr->id() == id; });
+ if (itr != c.end()) {
+ c.erase(itr);
+ make_heap(c.begin(), c.end(), comp);
+ return true;
+ }
+ return false;
+}
+
+shared_ptr WorkerHttpProcessor::create(
+ shared_ptr const& serviceProvider, string const& worker) {
+ return shared_ptr(new WorkerHttpProcessor(serviceProvider, worker));
+}
+
+WorkerHttpProcessor::WorkerHttpProcessor(shared_ptr const& serviceProvider,
+ string const& worker)
+ : _serviceProvider(serviceProvider),
+ _worker(worker),
+ _connectionPool(database::mysql::ConnectionPool::create(
+ Configuration::qservWorkerDbParams(),
+ serviceProvider->config()->get("database", "services-pool-size"))),
+ _state(protocol::ServiceState::SUSPENDED),
+ _startTime(util::TimeUtils::now()) {}
+
+void WorkerHttpProcessor::run() {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__));
+ replica::Lock lock(_mtx, _context(__func__));
+
+ if (_state == protocol::ServiceState::SUSPENDED) {
+ size_t const numThreads =
+ _serviceProvider->config()->get("worker", "num-svc-processing-threads");
+ if (not numThreads) {
+ throw out_of_range(_classMethodContext(__func__) +
+ "invalid configuration parameter for the number of processing threads. "
+ "The value of the parameter must be greater than 0");
+ }
+
+ // Create threads if needed
+ if (_threads.empty()) {
+ auto const self = shared_from_this();
+ for (size_t i = 0; i < numThreads; ++i) {
+ _threads.push_back(WorkerHttpProcessorThread::create(self));
+ }
+ }
+
+ // Tell each thread to run
+ for (auto&& t : _threads) {
+ t->run();
+ }
+ _state = protocol::ServiceState::RUNNING;
+ }
+}
+
+void WorkerHttpProcessor::stop() {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__));
+ replica::Lock lock(_mtx, _context(__func__));
+
+ if (_state == protocol::ServiceState::RUNNING) {
+ // Tell each thread to stop.
+ for (auto&& t : _threads) {
+ t->stop();
+ }
+
+ // Begin transitioning to the final state via this intermediate one.
+ // The transition will finish asynchronous when all threads will report
+ // desired changes in their states.
+ _state = protocol::ServiceState::SUSPEND_IN_PROGRESS;
+ }
+}
+
+void WorkerHttpProcessor::drain() {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__));
+ replica::Lock lock(_mtx, _context(__func__));
+
+ // Collect identifiers of requests to be affected by the operation
+ list ids;
+ for (auto&& ptr : _newRequests) ids.push_back(ptr->id());
+ for (auto&& entry : _inProgressRequests) ids.push_back(entry.first);
+ for (auto&& id : ids) _stopRequestImpl(lock, id);
+}
+
+void WorkerHttpProcessor::reconfig() {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__));
+ replica::Lock lock(_mtx, _context(__func__));
+ _serviceProvider->config()->reload();
+}
+
+json WorkerHttpProcessor::createReplica(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ // return _submit(replica::Lock(_mtx, context), context, hdr, req);
+ return json::object();
+}
+
+json WorkerHttpProcessor::deleteReplica(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ // return _submit(replica::Lock(_mtx, context), context, hdr, req);
+ return json::object();
+}
+
+json WorkerHttpProcessor::findReplica(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ // return _submit(replica::Lock(_mtx, context), context, hdr, req);
+ return json::object();
+}
+
+json WorkerHttpProcessor::findAllReplicas(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ // return _submit(replica::Lock(_mtx, context), context, hdr, req);
+ return json::object();
+}
+
+json WorkerHttpProcessor::echo(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ string const context = _context(__func__);
+ return _submit(replica::Lock(_mtx, context), context, hdr, req);
+}
+
+json WorkerHttpProcessor::sql(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ // return _submit(replica::Lock(_mtx, context), context, hdr, req);
+ return json::object();
+}
+
+json WorkerHttpProcessor::index(protocol::QueuedRequestHdr const& hdr, json const& req) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << hdr.id);
+ // return _submit(replica::Lock(_mtx, context), context, hdr, req);
+ return json::object();
+}
+
+json WorkerHttpProcessor::requestStatus(string const& id) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << id);
+ replica::Lock lock(_mtx, _context(__func__));
+
+ // Still waiting in the queue?
+ shared_ptr targetRequestPtr;
+ for (auto ptr : _newRequests) {
+ if (ptr->id() == id) {
+ targetRequestPtr = ptr;
+ break;
+ }
+ }
+ if (targetRequestPtr == nullptr) {
+ // Is it already being processed?
+ auto itrInProgress = _inProgressRequests.find(id);
+ if (itrInProgress != _inProgressRequests.end()) {
+ targetRequestPtr = itrInProgress->second;
+ }
+ if (targetRequestPtr == nullptr) {
+ // Has it finished?
+ auto itrFinished = _finishedRequests.find(id);
+ if (itrFinished != _finishedRequests.end()) {
+ targetRequestPtr = itrFinished->second;
+ }
+ // No such request?
+ if (targetRequestPtr == nullptr) {
+ return json::object({
+ {"status", protocol::Status::BAD},
+ {"status_ext", protocol::StatusExt::INVALID_ID},
+ });
+ }
+ }
+ }
+ return targetRequestPtr->toJson();
+}
+
+json WorkerHttpProcessor::stopRequest(string const& id) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << id);
+ replica::Lock lock(_mtx, _context(__func__));
+ json response = json::object();
+ auto const request = _stopRequestImpl(lock, id);
+ if (request == nullptr) {
+ response["status"] = protocol::Status::BAD;
+ response["status_ext"] = protocol::StatusExt::INVALID_ID;
+ } else {
+ response = request->toJson();
+ }
+ return response;
+}
+
+json WorkerHttpProcessor::trackRequest(string const& id) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << id);
+ replica::Lock lock(_mtx, _context(__func__));
+ json response = json::object();
+ auto const request = _trackRequestImpl(lock, id);
+ if (request == nullptr) {
+ response["status"] = protocol::Status::BAD;
+ response["status_ext"] = protocol::StatusExt::INVALID_ID;
+ } else {
+ bool const includeResultIfFinished = true;
+ response = request->toJson(includeResultIfFinished);
+ }
+ return response;
+}
+
+bool WorkerHttpProcessor::disposeRequest(string const& id) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << id);
+ replica::Lock lock(_mtx, _context(__func__));
+
+ // Note that only the finished requests are allowed to be disposed.
+ if (auto itr = _finishedRequests.find(id); itr != _finishedRequests.end()) {
+ itr->second->dispose();
+ _finishedRequests.erase(itr);
+ return true;
+ }
+ return false;
+}
+
+size_t WorkerHttpProcessor::numNewRequests() const {
+ replica::Lock lock(_mtx, _context(__func__));
+ return _newRequests.size();
+}
+
+size_t WorkerHttpProcessor::numInProgressRequests() const {
+ replica::Lock lock(_mtx, _context(__func__));
+ return _inProgressRequests.size();
+}
+
+size_t WorkerHttpProcessor::numFinishedRequests() const {
+ replica::Lock lock(_mtx, _context(__func__));
+ return _finishedRequests.size();
+}
+
+json WorkerHttpProcessor::toJson(protocol::Status status, bool includeRequests) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__));
+ replica::Lock lock(_mtx, _context(__func__));
+
+ json response;
+ response["status"] = status;
+ response["status_ext"] = protocol::StatusExt::NONE;
+ response["service_state"] = state();
+ response["num_new_requests"] = _newRequests.size();
+ response["num_in_progress_requests"] = _inProgressRequests.size();
+ response["num_finished_requests"] = _finishedRequests.size();
+ response["new_requests"] = json::array();
+ response["in_progress_requests"] = json::array();
+ response["finished_requests"] = json::array();
+
+ if (includeRequests) {
+ for (auto const& request : _newRequests) {
+ response["new_requests"].push_back(request->toJson());
+ }
+ for (auto const& entry : _inProgressRequests) {
+ response["in_progress_requests"].push_back(entry.second->toJson());
+ }
+ for (auto const& entry : _finishedRequests) {
+ response["finished_requests"].push_back(entry.second->toJson());
+ }
+ }
+ return response;
+}
+
+string WorkerHttpProcessor::_classMethodContext(string const& func) { return "WorkerHttpProcessor::" + func; }
+
+void WorkerHttpProcessor::_logError(string const& context, string const& message) const {
+ LOGS(_log, LOG_LVL_ERROR, context << " " << message);
+}
+
+shared_ptr WorkerHttpProcessor::_stopRequestImpl(replica::Lock const& lock,
+ string const& id) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << id);
+
+ // Still waiting in the queue?
+ //
+ // ATTENTION: the loop variable is a copy of (not a reference to) a shared
+ // pointer to allow removing (if needed) the corresponding entry from the
+ // input collection while retaining a valid copy of the pointer to be placed
+ // into the next stage collection.
+
+ for (auto ptr : _newRequests) {
+ if (ptr->id() == id) {
+ // Cancel it and move it into the final queue in case if a client
+ // won't be able to receive the desired status of the request due to
+ // a protocol failure, etc.
+ ptr->cancel();
+ switch (ptr->status()) {
+ case protocol::Status::CANCELLED: {
+ _newRequests.remove(id);
+ _finishedRequests[ptr->id()] = ptr;
+ return ptr;
+ }
+ default:
+ throw logic_error(_classMethodContext(__func__) + " unexpected request status " +
+ protocol::toString(ptr->status()) + " in new requests");
+ }
+ }
+ }
+
+ // Is it already being processed?
+ auto itrInProgress = _inProgressRequests.find(id);
+ if (itrInProgress != _inProgressRequests.end()) {
+ auto ptr = itrInProgress->second;
+ // Tell the request to begin the cancelling protocol. The protocol
+ // will take care of moving the request into the final queue when
+ // the cancellation will finish.
+ //
+ // At the meant time we just notify the client about the cancellation status
+ // of the request and let it come back later to check the updated status.
+ ptr->cancel();
+ switch (ptr->status()) {
+ // These are the most typical states for request in this queue
+ case protocol::Status::CANCELLED:
+ case protocol::Status::IS_CANCELLING:
+
+ // The following two states are also allowed here because
+ // in-progress requests are still allowed to progress to the completed
+ // states before reporting their new state via method:
+ // WorkerHttpProcessor::_processingFinished()
+ // Sometimes, the request just can't finish this in time due to
+ // replica::Lock lock(_mtx) held by the current method. We shouldn't worry
+ // about this situation here. The request will be moved into the next
+ // queue as soon as replica::Lock lock(_mtx) will be released.
+ case protocol::Status::SUCCESS:
+ case protocol::Status::FAILED:
+ return ptr;
+ default:
+ throw logic_error(_classMethodContext(__func__) + " unexpected request status " +
+ protocol::toString(ptr->status()) + " in in-progress requests");
+ }
+ }
+
+ // Has it finished?
+ auto itrFinished = _finishedRequests.find(id);
+ if (itrFinished != _finishedRequests.end()) {
+ auto ptr = itrFinished->second;
+ // There is nothing else we can do here other than just
+ // reporting the completion status of the request. It's up to a client
+ // to figure out what to do about this situation.
+ switch (ptr->status()) {
+ case protocol::Status::CANCELLED:
+ case protocol::Status::SUCCESS:
+ case protocol::Status::FAILED:
+ return ptr;
+ default:
+ throw logic_error(_classMethodContext(__func__) + " unexpected request status " +
+ protocol::toString(ptr->status()) + " in finished requests");
+ }
+ }
+
+ // No request found!
+ return nullptr;
+}
+
+shared_ptr WorkerHttpProcessor::_trackRequestImpl(replica::Lock const& lock,
+ string const& id) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << id);
+
+ // Still waiting in the queue?
+ for (auto&& ptr : _newRequests) {
+ if (ptr->id() == id) {
+ switch (ptr->status()) {
+ // This state requirement is strict for the non-active requests
+ case protocol::Status::CREATED:
+ return ptr;
+ default:
+ throw logic_error(_classMethodContext(__func__) + " unexpected request status " +
+ protocol::toString(ptr->status()) + " in new requests");
+ }
+ }
+ }
+
+ // Is it already being processed?
+ auto itrInProgress = _inProgressRequests.find(id);
+ if (itrInProgress != _inProgressRequests.end()) {
+ auto ptr = itrInProgress->second;
+ switch (ptr->status()) {
+ // These are the most typical states for request in this queue
+ case protocol::Status::IS_CANCELLING:
+ case protocol::Status::IN_PROGRESS:
+
+ // The following three states are also allowed here because
+ // in-progress requests are still allowed to progress to the completed
+ // states before reporting their new state via method:
+ // WorkerHttpProcessor::_processingFinished()
+ // Sometimes, the request just can't finish this in time due to
+ // replica::Lock lock(_mtx) held by the current method. We shouldn't worry
+ // about this situation here. The request will be moved into the next
+ // queue as soon as replica::Lock lock(_mtx) will be released.
+ case protocol::Status::CANCELLED:
+ case protocol::Status::SUCCESS:
+ case protocol::Status::FAILED:
+ return ptr;
+ default:
+ throw logic_error(_classMethodContext(__func__) + " unexpected request status " +
+ protocol::toString(ptr->status()) + " in in-progress requests");
+ }
+ }
+
+ // Has it finished?
+ auto itrFinished = _finishedRequests.find(id);
+ if (itrFinished != _finishedRequests.end()) {
+ auto ptr = itrFinished->second;
+ switch (ptr->status()) {
+ // This state requirement is strict for the completed requests
+ case protocol::Status::CANCELLED:
+ case protocol::Status::SUCCESS:
+ case protocol::Status::FAILED:
+ return ptr;
+ default:
+ throw logic_error(_classMethodContext(__func__) + " unexpected request status " +
+ protocol::toString(ptr->status()) + " in finished requests");
+ }
+ }
+
+ // No request found!
+ return nullptr;
+}
+
+shared_ptr WorkerHttpProcessor::_fetchNextForProcessing(
+ shared_ptr const& processorThread, unsigned int timeoutMilliseconds) {
+ LOGS(_log, LOG_LVL_TRACE,
+ _context(__func__) << " thread: " << processorThread->id() << " timeout: " << timeoutMilliseconds);
+
+ // For generating random intervals within the maximum range of seconds
+ // requested by a client.
+ //
+ // TODO: Re-implement this loop to use a condition variable instead.
+ // This will improve the performance of the processor which is limited
+ // by the half-latency of the wait interval.
+ util::BlockPost blockPost(0, min(10U, timeoutMilliseconds));
+
+ unsigned int totalElapsedTime = 0;
+ while (totalElapsedTime < timeoutMilliseconds) {
+ // IMPORTANT: make sure no wait is happening within the same
+ // scope where the thread safe block is defined. Otherwise
+ // the queue will be locked for all threads for the duration of
+ // the wait.
+ {
+ replica::Lock lock(_mtx, _context(__func__));
+ if (!_newRequests.empty()) {
+ shared_ptr request = _newRequests.top();
+ _newRequests.pop();
+ request->start();
+ _inProgressRequests[request->id()] = request;
+ return request;
+ }
+ }
+ totalElapsedTime += blockPost.wait();
+ }
+
+ // Return null pointer since noting has been found within the specified
+ // timeout.
+ return nullptr;
+}
+
+void WorkerHttpProcessor::_processingRefused(shared_ptr const& request) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " id: " << request->id());
+ replica::Lock lock(_mtx, _context(__func__));
+
+ // Note that disposed requests won't be found in any queue.
+ auto itr = _inProgressRequests.find(request->id());
+ if (itr != _inProgressRequests.end()) {
+ // Update request's state before moving it back into
+ // the input queue.
+ itr->second->stop();
+ _newRequests.push(itr->second);
+ _inProgressRequests.erase(itr);
+ }
+}
+
+void WorkerHttpProcessor::_processingFinished(shared_ptr const& request) {
+ LOGS(_log, LOG_LVL_DEBUG,
+ _context(__func__) << " id: " << request->id()
+ << " status: " << protocol::toString(request->status()));
+ replica::Lock lock(_mtx, _context(__func__));
+
+ // Note that disposed requests won't be found in any queue.
+ auto itr = _inProgressRequests.find(request->id());
+ if (itr != _inProgressRequests.end()) {
+ _finishedRequests[itr->first] = itr->second;
+ _inProgressRequests.erase(itr);
+ }
+}
+
+void WorkerHttpProcessor::_processorThreadStopped(
+ shared_ptr const& processorThread) {
+ LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " thread: " << processorThread->id());
+ replica::Lock lock(_mtx, _context(__func__));
+ if (_state == protocol::ServiceState::SUSPEND_IN_PROGRESS) {
+ // Complete state transition if all threads are stopped
+ for (auto&& t : _threads) {
+ if (t->isRunning()) return;
+ }
+ _state = protocol::ServiceState::SUSPENDED;
+ }
+}
+
+} // namespace lsst::qserv::replica
diff --git a/src/replica/worker/WorkerHttpProcessor.h b/src/replica/worker/WorkerHttpProcessor.h
new file mode 100644
index 000000000..d05e495a4
--- /dev/null
+++ b/src/replica/worker/WorkerHttpProcessor.h
@@ -0,0 +1,365 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+#ifndef LSST_QSERV_REPLICA_WORKERHTTPPROCESSOR_H
+#define LSST_QSERV_REPLICA_WORKERHTTPPROCESSOR_H
+
+// System headers
+#include
+#include
+#include