diff --git a/src/replica/apps/WorkerApp.cc b/src/replica/apps/WorkerApp.cc
index 31c023640..f1362b91a 100644
--- a/src/replica/apps/WorkerApp.cc
+++ b/src/replica/apps/WorkerApp.cc
@@ -39,7 +39,7 @@
#include "replica/services/ServiceProvider.h"
#include "replica/util/FileUtils.h"
#include "replica/worker/FileServer.h"
-#include "replica/worker/WorkerProcessor.h"
+#include "replica/worker/WorkerHttpSvc.h"
#include "replica/worker/WorkerServer.h"
// LSST headers
@@ -113,6 +113,9 @@ int WorkerApp::runImpl() {
auto const reqProcSvr = WorkerServer::create(serviceProvider(), worker);
thread reqProcSvrThread([reqProcSvr]() { reqProcSvr->run(); });
+ auto const reqProcHttpSvr = WorkerHttpSvc::create(serviceProvider(), worker);
+ thread reqProcHttpSvrThread([reqProcHttpSvr]() { reqProcHttpSvr->run(); });
+
auto const fileSvr = FileServer::create(serviceProvider(), worker);
thread fileSvrThread([fileSvr]() { fileSvr->run(); });
diff --git a/src/replica/proto/CMakeLists.txt b/src/replica/proto/CMakeLists.txt
index b61599d8c..7eb8d830d 100644
--- a/src/replica/proto/CMakeLists.txt
+++ b/src/replica/proto/CMakeLists.txt
@@ -4,4 +4,5 @@ add_library(replica_proto OBJECT)
target_sources(replica_proto PRIVATE
${REPLICA_PB_SRCS}
${REPLICA_PB_HDRS}
+ Protocol.cc
)
diff --git a/src/replica/proto/Protocol.cc b/src/replica/proto/Protocol.cc
new file mode 100644
index 000000000..7d53155c5
--- /dev/null
+++ b/src/replica/proto/Protocol.cc
@@ -0,0 +1,171 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+
+// Class header
+#include "replica/proto/Protocol.h"
+
+// System headers
+#include
+
+using namespace std;
+
+namespace lsst::qserv::replica::protocol {
+
+string toString(SqlRequestType status) {
+ switch (status) {
+ case SqlRequestType::QUERY:
+ return "QUERY";
+ case SqlRequestType::CREATE_DATABASE:
+ return "CREATE_DATABASE";
+ case SqlRequestType::DROP_DATABASE:
+ return "DROP_DATABASE";
+ case SqlRequestType::ENABLE_DATABASE:
+ return "ENABLE_DATABASE";
+ case SqlRequestType::DISABLE_DATABASE:
+ return "DISABLE_DATABASE";
+ case SqlRequestType::GRANT_ACCESS:
+ return "GRANT_ACCESS";
+ case SqlRequestType::CREATE_TABLE:
+ return "CREATE_TABLE";
+ case SqlRequestType::DROP_TABLE:
+ return "DROP_TABLE";
+ case SqlRequestType::REMOVE_TABLE_PARTITIONING:
+ return "REMOVE_TABLE_PARTITIONING";
+ case SqlRequestType::DROP_TABLE_PARTITION:
+ return "DROP_TABLE_PARTITION";
+ case SqlRequestType::GET_TABLE_INDEX:
+ return "GET_TABLE_INDEX";
+ case SqlRequestType::CREATE_TABLE_INDEX:
+ return "CREATE_TABLE_INDEX";
+ case SqlRequestType::DROP_TABLE_INDEX:
+ return "DROP_TABLE_INDEX";
+ case SqlRequestType::ALTER_TABLE:
+ return "ALTER_TABLE";
+ case SqlRequestType::TABLE_ROW_STATS:
+ return "TABLE_ROW_STATS";
+ default:
+ throw logic_error("Unhandled SQL request type: " + to_string(static_cast(status)));
+ }
+}
+
+string toString(Status status) {
+ switch (status) {
+ case Status::CREATED:
+ return "CREATED";
+ case Status::SUCCESS:
+ return "SUCCESS";
+ case Status::QUEUED:
+ return "QUEUED";
+ case Status::IN_PROGRESS:
+ return "IN_PROGRESS";
+ case Status::IS_CANCELLING:
+ return "IS_CANCELLING";
+ case Status::BAD:
+ return "BAD";
+ case Status::FAILED:
+ return "FAILED";
+ case Status::CANCELLED:
+ return "CANCELLED";
+ default:
+ throw logic_error("Unhandled status: " + to_string(static_cast(status)));
+ }
+}
+
+string toString(StatusExt extendedStatus) {
+ switch (extendedStatus) {
+ case StatusExt::NONE:
+ return "NONE";
+ case StatusExt::INVALID_PARAM:
+ return "INVALID_PARAM";
+ case StatusExt::INVALID_ID:
+ return "INVALID_ID";
+ case StatusExt::FOLDER_STAT:
+ return "FOLDER_STAT";
+ case StatusExt::FOLDER_CREATE:
+ return "FOLDER_CREATE";
+ case StatusExt::FILE_STAT:
+ return "FILE_STAT";
+ case StatusExt::FILE_SIZE:
+ return "FILE_SIZE";
+ case StatusExt::FOLDER_READ:
+ return "FOLDER_READ";
+ case StatusExt::FILE_READ:
+ return "FILE_READ";
+ case StatusExt::FILE_ROPEN:
+ return "FILE_ROPEN";
+ case StatusExt::FILE_CREATE:
+ return "FILE_CREATE";
+ case StatusExt::FILE_OPEN:
+ return "FILE_OPEN";
+ case StatusExt::FILE_RESIZE:
+ return "FILE_RESIZE";
+ case StatusExt::FILE_WRITE:
+ return "FILE_WRITE";
+ case StatusExt::FILE_COPY:
+ return "FILE_COPY";
+ case StatusExt::FILE_DELETE:
+ return "FILE_DELETE";
+ case StatusExt::FILE_RENAME:
+ return "FILE_RENAME";
+ case StatusExt::FILE_EXISTS:
+ return "FILE_EXISTS";
+ case StatusExt::SPACE_REQ:
+ return "SPACE_REQ";
+ case StatusExt::NO_FOLDER:
+ return "NO_FOLDER";
+ case StatusExt::NO_FILE:
+ return "NO_FILE";
+ case StatusExt::NO_ACCESS:
+ return "NO_ACCESS";
+ case StatusExt::NO_SPACE:
+ return "NO_SPACE";
+ case StatusExt::FILE_MTIME:
+ return "FILE_MTIME";
+ case StatusExt::MYSQL_ERROR:
+ return "MYSQL_ERROR";
+ case StatusExt::LARGE_RESULT:
+ return "LARGE_RESULT";
+ case StatusExt::NO_SUCH_TABLE:
+ return "NO_SUCH_TABLE";
+ case StatusExt::NOT_PARTITIONED_TABLE:
+ return "NOT_PARTITIONED_TABLE";
+ case StatusExt::NO_SUCH_PARTITION:
+ return "NO_SUCH_PARTITION";
+ case StatusExt::MULTIPLE:
+ return "MULTIPLE";
+ case StatusExt::OTHER_EXCEPTION:
+ return "OTHER_EXCEPTION";
+ case StatusExt::FOREIGN_INSTANCE:
+ return "FOREIGN_INSTANCE";
+ case StatusExt::DUPLICATE_KEY:
+ return "DUPLICATE_KEY";
+ case StatusExt::CANT_DROP_KEY:
+ return "CANT_DROP_KEY";
+ default:
+ throw logic_error("Unhandled extended status: " + to_string(static_cast(extendedStatus)));
+ }
+}
+
+string toString(Status status, StatusExt extendedStatus) {
+ return toString(status) + "::" + toString(extendedStatus);
+}
+
+} // namespace lsst::qserv::replica::protocol
diff --git a/src/replica/proto/Protocol.h b/src/replica/proto/Protocol.h
new file mode 100644
index 000000000..b6ca3f916
--- /dev/null
+++ b/src/replica/proto/Protocol.h
@@ -0,0 +1,139 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+#ifndef LSST_QSERV_REPLICA_PROTOCOL_H
+#define LSST_QSERV_REPLICA_PROTOCOL_H
+
+// System headers
+#include
+
+// Third party headers
+#include "nlohmann/json.hpp"
+
+// This header declarations
+namespace lsst::qserv::replica::protocol {
+
+/// Subtypes of the SQL requests.
+enum class SqlRequestType : int {
+
+ QUERY = 0,
+ CREATE_DATABASE = 1,
+ DROP_DATABASE = 2,
+ ENABLE_DATABASE = 3, ///< in Qserv
+ DISABLE_DATABASE = 4, ///< in Qserv
+ GRANT_ACCESS = 5,
+ CREATE_TABLE = 6,
+ DROP_TABLE = 7,
+ REMOVE_TABLE_PARTITIONING = 8,
+ DROP_TABLE_PARTITION = 9,
+ GET_TABLE_INDEX = 10,
+ CREATE_TABLE_INDEX = 11,
+ DROP_TABLE_INDEX = 12,
+ ALTER_TABLE = 13,
+ TABLE_ROW_STATS = 14
+};
+
+/// @return the string representation of the SQL request type
+std::string toString(SqlRequestType status);
+
+/// Types of the table indexes specified in the index management requests requests.
+enum class SqlIndexSpec : int { DEFAULT = 1, UNIQUE = 2, FULLTEXT = 3, SPATIAL = 4 };
+
+/// Status values returned by all request related to operations with
+/// replicas. Request management operations always return messages whose types
+/// match the return types of the corresponding (original) replica-related requests.
+/// Service management requests have their own set of status values.
+///
+enum class Status : int {
+ CREATED = 0,
+ SUCCESS = 1,
+ QUEUED = 2,
+ IN_PROGRESS = 3,
+ IS_CANCELLING = 4,
+ BAD = 5,
+ FAILED = 6,
+ CANCELLED = 7
+};
+
+enum class StatusExt : int {
+ NONE = 0, ///< Unspecified problem.
+ INVALID_PARAM = 1, ///< Invalid parameter(s) of a request.
+ INVALID_ID = 2, ///< An invalid request identifier.
+ FOLDER_STAT = 4, ///< Failed to obtain fstat() for a folder.
+ FOLDER_CREATE = 5, ///< Failed to create a folder.
+ FILE_STAT = 6, ///< Failed to obtain fstat() for a file.
+ FILE_SIZE = 7, ///< Failed to obtain a size of a file.
+ FOLDER_READ = 8, ///< Failed to read the contents of a folder.
+ FILE_READ = 9, ///< Failed to read the contents of a file.
+ FILE_ROPEN = 10, ///< Failed to open a remote file.
+ FILE_CREATE = 11, ///< Failed to create a file.
+ FILE_OPEN = 12, ///< Failed to open a file.
+ FILE_RESIZE = 13, ///< Failed to resize a file.
+ FILE_WRITE = 14, ///< Failed to write into a file.
+ FILE_COPY = 15, ///< Failed to copy a file.
+ FILE_DELETE = 16, ///< Failed to delete a file.
+ FILE_RENAME = 17, ///< Failed to rename a file.
+ FILE_EXISTS = 18, ///< File already exists.
+ SPACE_REQ = 19, ///< Space availability check failed.
+ NO_FOLDER = 20, ///< Folder doesn't exist.
+ NO_FILE = 21, ///< File doesn't exist.
+ NO_ACCESS = 22, ///< No access to a file or a folder.
+ NO_SPACE = 23, ///< No space left on a device as required by an operation.
+ FILE_MTIME = 24, ///< Get/set 'mtime' operation failed.
+ MYSQL_ERROR = 25, ///< General MySQL error (other than any specific ones listed here).
+ LARGE_RESULT = 26, ///< Result exceeds a limit set in a request.
+ NO_SUCH_TABLE = 27, ///< No table found while performing a MySQL operation.
+ NOT_PARTITIONED_TABLE = 28, ///< The table is not MySQL partitioned as it was expected.
+ NO_SUCH_PARTITION = 29, ///< No MySQL partition found in a table as it was expected.
+ MULTIPLE = 30, ///< Multiple unspecified errors encountered when processing a request.
+ OTHER_EXCEPTION = 31, ///< Other exception not listed here.
+ FOREIGN_INSTANCE = 32, ///< Detected a request from a Controller serving an unrelated Qserv.
+ DUPLICATE_KEY = 33, ///< Duplicate key found when creating an index or altering a table schema.
+ CANT_DROP_KEY = 34 ///< Can't drop a field or a key which doesn't exist.
+};
+
+/// @return the string representation of the status
+std::string toString(Status status);
+
+/// @return the string representation of the extended status
+std::string toString(StatusExt extendedStatus);
+
+/// @return the string representation of the full status
+std::string toString(Status status, StatusExt extendedStatus);
+
+/// Status of a replica.
+enum class ReplicaStatus : int { NOT_FOUND = 0, CORRUPT = 1, INCOMPLETE = 2, COMPLETE = 3 };
+
+/// Status of a service.
+enum class ServiceState : int { SUSPEND_IN_PROGRESS = 0, SUSPENDED = 1, RUNNING = 2 };
+
+/// The header to be sent with the requests processed through the worker's queueing system.
+struct QueuedRequestHdr {
+ std::string id;
+ int priority;
+ unsigned int timeout;
+ QueuedRequestHdr(std::string const& id_, int priority_, unsigned int timeout_)
+ : id(id_), priority(priority_), timeout(timeout_) {}
+ nlohmann::json toJson() const { return {{"id", id}, {"priority", priority}, {"timeout", timeout}}; };
+};
+
+} // namespace lsst::qserv::replica::protocol
+
+#endif // LSST_QSERV_REPLICA_PROTOCOL_H
diff --git a/src/replica/util/Common.cc b/src/replica/util/Common.cc
index 11c08df7c..0c9830944 100644
--- a/src/replica/util/Common.cc
+++ b/src/replica/util/Common.cc
@@ -29,10 +29,9 @@
#include "boost/uuid/uuid.hpp"
#include "boost/uuid/uuid_generators.hpp"
#include "boost/uuid/uuid_io.hpp"
-#include "nlohmann/json.hpp"
using namespace std;
-using namespace nlohmann;
+using json = nlohmann::json;
namespace lsst::qserv::replica {
@@ -80,6 +79,43 @@ string Generators::uniqueId() {
return boost::uuids::to_string(id);
}
+///////////////////////////////////////////
+// SqlColDef //
+///////////////////////////////////////////
+
+list parseSqlColumns(json const& columnsJsonArray) {
+ if (!columnsJsonArray.is_array()) {
+ throw invalid_argument("lsst::qserv::replica::" + string(__func__) +
+ " columnsJsonArray is not an array");
+ }
+ list columns;
+ for (auto const& column : columnsJsonArray) {
+ columns.emplace_back(column.at("name"), column.at("type"));
+ }
+ return columns;
+}
+
+///////////////////////////////////////////
+// SqlIndexDef //
+///////////////////////////////////////////
+
+SqlIndexDef::SqlIndexDef(json const& indexSpecJson) {
+ if (!indexSpecJson.is_object()) {
+ throw invalid_argument("lsst::qserv::replica::" + string(__func__) +
+ " indexSpecJson is not an object");
+ }
+ spec = indexSpecJson.value("spec", "DEFAULT");
+ name = indexSpecJson.at("name");
+ comment = indexSpecJson.value("comment", "");
+ auto const keysJsonArray = indexSpecJson.at("keys");
+ if (!keysJsonArray.is_array()) {
+ throw invalid_argument("lsst::qserv::replica::" + string(__func__) + " keys is not an array");
+ }
+ for (auto const& key : keysJsonArray) {
+ keys.emplace_back(key.at("name"), key.at("length"), key.at("ascending"));
+ }
+}
+
////////////////////////////////////////////
// Parameters of requests //
////////////////////////////////////////////
diff --git a/src/replica/util/Common.h b/src/replica/util/Common.h
index 3970f771a..ee1e1fc2d 100644
--- a/src/replica/util/Common.h
+++ b/src/replica/util/Common.h
@@ -36,6 +36,9 @@
#include
#include
+// Third party headers
+#include "nlohmann/json.hpp"
+
// Qserv headers
#include "replica/proto/protocol.pb.h"
#include "replica/util/Mutex.h"
@@ -112,6 +115,13 @@ inline bool operator==(SqlColDef const& lhs, SqlColDef const& rhs) {
inline bool operator!=(SqlColDef const& lhs, SqlColDef const& rhs) { return !operator==(lhs, rhs); }
+/**
+ * @param columnsJsonArray The JSON array containing the column definitions.
+ * @return The list of column definitions.
+ * @throw std::invalid_argument If the input JSON array is not valid.
+ */
+std::list parseSqlColumns(nlohmann::json const& columnsJsonArray);
+
/**
* This class is an abstraction for columns within table index
* specifications.
@@ -130,6 +140,30 @@ class SqlIndexColumn {
bool ascending = true;
};
+/**
+ * This class is an abstraction for the index definitions.
+ */
+class SqlIndexDef {
+public:
+ SqlIndexDef() = default;
+
+ /**
+ * Parse the definition from then input JSON object.
+ * @param indexSpecJson The JSON object containing the index definitions.
+ * @throw std::invalid_argument If the input JSON object is not valid.
+ */
+ SqlIndexDef(nlohmann::json const& indexSpecJson);
+
+ SqlIndexDef(SqlIndexDef const&) = default;
+ SqlIndexDef& operator=(SqlIndexDef const&) = default;
+ ~SqlIndexDef() = default;
+
+ std::string spec;
+ std::string name;
+ std::string comment;
+ std::list> keys;
+};
+
/**
* Class ReplicationRequestParams encapsulates parameters of the replica
* creation requests.
diff --git a/src/replica/util/Performance.cc b/src/replica/util/Performance.cc
index 8e3292d68..ae30b0ac3 100644
--- a/src/replica/util/Performance.cc
+++ b/src/replica/util/Performance.cc
@@ -30,6 +30,7 @@
#include "lsst/log/Log.h"
using namespace std;
+using json = nlohmann::json;
namespace {
diff --git a/src/replica/util/Performance.h b/src/replica/util/Performance.h
index fcbfd394a..15320d08b 100644
--- a/src/replica/util/Performance.h
+++ b/src/replica/util/Performance.h
@@ -33,6 +33,9 @@
#include
#include
+// Third party headers
+#include "nlohmann/json.hpp"
+
// Forward declarations
namespace lsst::qserv::replica {
class ProtocolPerformance;
@@ -56,7 +59,6 @@ class Performance {
* All (but the request creation one) timestamps will be initialized with 0.
*/
Performance();
-
Performance(Performance const&) = default;
Performance& operator=(Performance const&) = default;
@@ -64,45 +66,28 @@ class Performance {
/**
* Update object state with counters from the protocol buffer object
- *
- * @param workerPerformanceInfo
- * counters to be carried over into an internal state
+ * @param workerPerformanceInfo counters to be carried over into an internal state
*/
void update(ProtocolPerformance const& workerPerformanceInfo);
/**
* Update the Controller's 'start' time
- *
- * @return
- * the previous state of the counter
+ * @return the previous state of the counter
*/
uint64_t setUpdateStart();
/**
* Update the Controller's 'finish' time
- *
- * @return
- * the previous state of the counter
+ * @return the previous state of the counter
*/
uint64_t setUpdateFinish();
- /// Created by the Controller
- uint64_t c_create_time;
-
- /// Started by the Controller
- uint64_t c_start_time;
-
- /// Received by a worker service
- uint64_t w_receive_time;
-
- /// Execution started by a worker service
- uint64_t w_start_time;
-
- /// Execution finished by a worker service
- uint64_t w_finish_time;
-
- /// A subscriber notified by the Controller
- uint64_t c_finish_time;
+ uint64_t c_create_time; ///< Created by the Controller
+ uint64_t c_start_time; ///< Started by the Controller
+ uint64_t w_receive_time; ///< Received by a worker service
+ uint64_t w_start_time; ///< Execution started by a worker service
+ uint64_t w_finish_time; ///< Execution finished by a worker service
+ uint64_t c_finish_time; ///< A subscriber notified by the Controller
};
/// Overloaded streaming operator for class Performance
@@ -127,6 +112,7 @@ class WorkerPerformance {
uint64_t setUpdateFinish();
std::unique_ptr info() const;
+ nlohmann::json toJson() const;
std::atomic receive_time; ///< Received by a worker service
std::atomic start_time; ///< Execution started by a worker service
diff --git a/src/replica/worker/CMakeLists.txt b/src/replica/worker/CMakeLists.txt
index a37868d82..7b4d8ff46 100644
--- a/src/replica/worker/CMakeLists.txt
+++ b/src/replica/worker/CMakeLists.txt
@@ -4,17 +4,29 @@ target_sources(replica_worker PRIVATE
FileClient.cc
FileServer.cc
FileServerConnection.cc
+ WorkerCreateReplicaHttpRequest.cc
+ WorkerDeleteReplicaHttpRequest.cc
WorkerDeleteRequest.cc
+ WorkerDirectorIndexHttpRequest.cc
WorkerDirectorIndexRequest.cc
+ WorkerEchoHttpRequest.cc
WorkerEchoRequest.cc
WorkerFindAllRequest.cc
+ WorkerFindAllReplicasHttpRequest.cc
+ WorkerFindReplicaHttpRequest.cc
WorkerFindRequest.cc
+ WorkerHttpProcessor.cc
+ WorkerHttpProcessorThread.cc
+ WorkerHttpRequest.cc
+ WorkerHttpSvc.cc
+ WorkerHttpSvcMod.cc
WorkerProcessor.cc
WorkerProcessorThread.cc
WorkerReplicationRequest.cc
WorkerRequest.cc
WorkerServer.cc
WorkerServerConnection.cc
+ WorkerSqlHttpRequest.cc
WorkerSqlRequest.cc
)
target_link_libraries(replica_worker PUBLIC
diff --git a/src/replica/worker/WorkerCreateReplicaHttpRequest.cc b/src/replica/worker/WorkerCreateReplicaHttpRequest.cc
new file mode 100644
index 000000000..582b7c959
--- /dev/null
+++ b/src/replica/worker/WorkerCreateReplicaHttpRequest.cc
@@ -0,0 +1,467 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+
+// Class header
+#include "replica/worker/WorkerCreateReplicaHttpRequest.h"
+
+// System headers
+#include
+#include
+#include
+#include
+
+// Qserv headers
+#include "replica/config/Configuration.h"
+#include "replica/proto/Protocol.h"
+#include "replica/services/ServiceProvider.h"
+#include "replica/util/FileUtils.h"
+#include "replica/worker/FileClient.h"
+#include "util/TimeUtils.h"
+
+// LSST headers
+#include "lsst/log/Log.h"
+
+#define CONTEXT context("WorkerCreateReplicaHttpRequest", __func__)
+
+using namespace std;
+namespace fs = boost::filesystem;
+using json = nlohmann::json;
+
+namespace {
+
+LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerCreateReplicaHttpRequest");
+
+} // namespace
+
+namespace lsst::qserv::replica {
+
+shared_ptr WorkerCreateReplicaHttpRequest::create(
+ shared_ptr const& serviceProvider, string const& worker,
+ protocol::QueuedRequestHdr const& hdr, json const& req, ExpirationCallbackType const& onExpired) {
+ auto ptr = shared_ptr(
+ new WorkerCreateReplicaHttpRequest(serviceProvider, worker, hdr, req, onExpired));
+ ptr->init();
+ return ptr;
+}
+
+WorkerCreateReplicaHttpRequest::WorkerCreateReplicaHttpRequest(
+ shared_ptr const& serviceProvider, string const& worker,
+ protocol::QueuedRequestHdr const& hdr, json const& req, ExpirationCallbackType const& onExpired)
+ : WorkerHttpRequest(serviceProvider, worker, "REPLICATE", hdr, req, onExpired),
+ _databaseInfo(serviceProvider->config()->databaseInfo(req.at("database"))),
+ _chunk(req.at("chunk")),
+ _sourceWorker(req.at("worker")),
+ _sourceWorkerHost(req.at("worker_host")),
+ _sourceWorkerPort(req.at("worker_port")),
+ _sourceWorkerHostPort(_sourceWorkerHost + ":" + to_string(_sourceWorkerPort)),
+ _initialized(false),
+ _files(FileUtils::partitionedFiles(_databaseInfo, _chunk)),
+ _tmpFilePtr(nullptr),
+ _buf(0),
+ _bufSize(serviceProvider->config()->get("worker", "fs-buf-size-bytes")) {
+ if (worker == _sourceWorker) {
+ throw invalid_argument(CONTEXT + " workers are the same in the request.");
+ }
+ if (_sourceWorkerHost.empty()) {
+ throw invalid_argument(CONTEXT + " the DNS name or an IP address of the worker not provided.");
+ }
+}
+
+WorkerCreateReplicaHttpRequest::~WorkerCreateReplicaHttpRequest() {
+ replica::Lock lock(_mtx, CONTEXT);
+ _releaseResources(lock);
+}
+
+void WorkerCreateReplicaHttpRequest::getResult(json& result) const {
+ // No locking is needed here since the method is called only after
+ // the request is completed.
+ result["replica_info"] = _replicaInfo.toJson();
+}
+
+bool WorkerCreateReplicaHttpRequest::execute() {
+ LOGS(_log, LOG_LVL_DEBUG,
+ CONTEXT << " sourceWorkerHostPort: " << _sourceWorkerHostPort << " database: " << _databaseInfo.name
+ << " chunk: " << _chunk);
+
+ replica::Lock lock(_mtx, CONTEXT);
+ checkIfCancelling(lock, CONTEXT);
+
+ // Obtain the list of files to be migrated
+ //
+ // IMPLEMENTATION NOTES:
+ //
+ // - Note using the overloaded operator '/' which is used to form
+ // folders and files path names below. The operator will concatenate
+ // names and also insert a file separator for an operating system
+ // on which this code will get compiled.
+ //
+ // - Temporary file names at a destination folders are prepended with
+ // prefix '_' to prevent colliding with the canonical names. They will
+ // be renamed in the last step.
+ //
+ // - All operations with the file system namespace (creating new non-temporary
+ // files, checking for folders and files, renaming files, creating folders, etc.)
+ // are guarded by acquiring replica::Lock lock(_mtxDataFolderOperations) where it's needed.
+
+ WorkerHttpRequest::ErrorContext errorContext;
+
+ ///////////////////////////////////////////////////////
+ // Initialization phase (runs only once) //
+ ///////////////////////////////////////////////////////
+
+ if (!_initialized) {
+ _initialized = true;
+
+ fs::path const outDir =
+ fs::path(serviceProvider()->config()->get("worker", "data-dir")) / _databaseInfo.name;
+
+ vector tmpFiles;
+ vector outFiles;
+ for (auto&& file : _files) {
+ fs::path const tmpFile = outDir / ("_" + file);
+ tmpFiles.push_back(tmpFile);
+
+ fs::path const outFile = outDir / file;
+ outFiles.push_back(outFile);
+
+ _file2descr[file].inSizeBytes = 0;
+ _file2descr[file].outSizeBytes = 0;
+ _file2descr[file].mtime = 0;
+ _file2descr[file].cs = 0;
+ _file2descr[file].tmpFile = tmpFile;
+ _file2descr[file].outFile = outFile;
+ _file2descr[file].beginTransferTime = 0;
+ _file2descr[file].endTransferTime = 0;
+ }
+
+ // Check input files, check and sanitize the destination folder
+
+ boost::system::error_code ec;
+ {
+ replica::Lock dataFolderLock(_mtxDataFolderOperations, CONTEXT);
+
+ // Check for a presence of input files and calculate space requirement
+
+ uintmax_t totalBytes = 0; // the total number of bytes in all input files to be moved
+ map file2size; // the number of bytes in each file
+
+ for (auto&& file : _files) {
+ // Open the file on the remote server in the no-content-read mode
+ auto const inFilePtr = FileClient::stat(_serviceProvider, _sourceWorkerHost,
+ _sourceWorkerPort, _databaseInfo.name, file);
+ errorContext =
+ errorContext or
+ reportErrorIf(inFilePtr == nullptr, protocol::StatusExt::FILE_ROPEN,
+ "failed to open input file on remote worker: " + _sourceWorker + " (" +
+ _sourceWorkerHostPort + "), database: " + _databaseInfo.name +
+ ", file: " + file);
+ if (errorContext.failed) {
+ setStatus(lock, protocol::Status::FAILED, errorContext.extendedStatus);
+ return true;
+ }
+ file2size[file] = inFilePtr->size();
+ totalBytes += inFilePtr->size();
+ _file2descr[file].inSizeBytes = inFilePtr->size();
+ _file2descr[file].mtime = inFilePtr->mtime();
+ }
+
+ // Check and sanitize the output directory
+
+ bool const outDirExists = fs::exists(outDir, ec);
+ errorContext =
+ errorContext or
+ reportErrorIf(ec.value() != 0, protocol::StatusExt::FOLDER_STAT,
+ "failed to check the status of output directory: " + outDir.string()) or
+ reportErrorIf(!outDirExists, protocol::StatusExt::NO_FOLDER,
+ "the output directory doesn't exist: " + outDir.string());
+
+ // The files with canonical(!) names should NOT exist at the destination
+ // folder.
+ for (auto&& file : outFiles) {
+ fs::file_status const stat = fs::status(file, ec);
+ errorContext = errorContext or
+ reportErrorIf(stat.type() == fs::status_error, protocol::StatusExt::FILE_STAT,
+ "failed to check the status of output file: " + file.string()) or
+ reportErrorIf(fs::exists(stat), protocol::StatusExt::FILE_EXISTS,
+ "the output file already exists: " + file.string());
+ }
+
+ // Check if there are any files with the temporary names at the destination
+ // folder and if so then get rid of them.
+ for (auto&& file : tmpFiles) {
+ fs::file_status const stat = fs::status(file, ec);
+ errorContext =
+ errorContext or
+ reportErrorIf(stat.type() == fs::status_error, protocol::StatusExt::FILE_STAT,
+ "failed to check the status of temporary file: " + file.string());
+ if (fs::exists(stat)) {
+ fs::remove(file, ec);
+ errorContext = errorContext or
+ reportErrorIf(ec.value() != 0, protocol::StatusExt::FILE_DELETE,
+ "failed to remove temporary file: " + file.string());
+ }
+ }
+
+ // Make sure a file system at the destination has enough space
+ // to accommodate new files
+ //
+ // NOTE: this operation runs after cleaning up temporary files
+ fs::space_info const space = fs::space(outDir, ec);
+ errorContext =
+ errorContext or
+ reportErrorIf(
+ ec.value() != 0, protocol::StatusExt::SPACE_REQ,
+ "failed to obtaine space information at output folder: " + outDir.string()) or
+ reportErrorIf(space.available < totalBytes, protocol::StatusExt::NO_SPACE,
+ "not enough free space availble at output folder: " + outDir.string());
+
+ // Pre-create temporary files with the final size to assert disk space
+ // availability before filling these files with the actual payload.
+ for (auto&& file : _files) {
+ fs::path const tmpFile = _file2descr[file].tmpFile;
+
+ // Create a file of size 0
+ FILE* tmpFilePtr = fopen(tmpFile.string().c_str(), "wb");
+ errorContext = errorContext or
+ reportErrorIf(tmpFilePtr == nullptr, protocol::StatusExt::FILE_CREATE,
+ "failed to open/create temporary file: " + tmpFile.string() +
+ ", error: " + strerror(errno));
+ if (tmpFilePtr) {
+ fflush(tmpFilePtr);
+ fclose(tmpFilePtr);
+ }
+
+ // Resize the file (will be filled with \0)
+ fs::resize_file(tmpFile, file2size[file], ec);
+ errorContext = errorContext or
+ reportErrorIf(ec.value() != 0, protocol::StatusExt::FILE_RESIZE,
+ "failed to resize the temporary file: " + tmpFile.string());
+ }
+ }
+ if (errorContext.failed) {
+ setStatus(lock, protocol::Status::FAILED, errorContext.extendedStatus);
+ return true;
+ }
+
+ // Allocate the record buffer
+ _buf = new uint8_t[_bufSize];
+ if (_buf == nullptr) throw runtime_error(CONTEXT + " buffer allocation failed");
+
+ // Setup the iterator for the name of the very first file to be copied
+ _fileItr = _files.begin();
+ if (!_openFiles(lock)) return true;
+ }
+
+ // Copy the next record from the currently open remote file
+ // into the corresponding temporary files at the destination folder
+ // w/o acquiring the directory lock.
+ //
+ // NOTE: the while loop below is meant to skip files which are empty
+ while (_files.end() != _fileItr) {
+ // Copy the next record if any is available
+ size_t num = 0;
+ try {
+ num = _inFilePtr->read(_buf, _bufSize);
+ if (num) {
+ if (num == fwrite(_buf, sizeof(uint8_t), num, _tmpFilePtr)) {
+ // Update the descriptor (the number of bytes copied so far
+ // and the control sum)
+ _file2descr[*_fileItr].outSizeBytes += num;
+ uint64_t& cs = _file2descr[*_fileItr].cs;
+ for (uint8_t *ptr = _buf, *end = _buf + num; ptr != end; ++ptr) {
+ cs += *ptr;
+ }
+
+ // Keep updating this stats while copying the files
+ _file2descr[*_fileItr].endTransferTime = util::TimeUtils::now();
+ _updateInfo(lock);
+
+ // Keep copying the same file
+ return false;
+ }
+ errorContext = errorContext or reportErrorIf(true, protocol::StatusExt::FILE_WRITE,
+ "failed to write into temporary file: " +
+ _file2descr[*_fileItr].tmpFile.string() +
+ ", error: " + strerror(errno));
+ }
+ } catch (FileClientError const& ex) {
+ errorContext =
+ errorContext or
+ reportErrorIf(true, protocol::StatusExt::FILE_READ,
+ "failed to read input file from remote worker: " + _sourceWorker + " (" +
+ _sourceWorkerHostPort + "), database: " + _databaseInfo.name +
+ ", file: " + *_fileItr);
+ }
+
+ // Make sure the number of bytes copied from the remote server
+ // matches expectations.
+ errorContext =
+ errorContext or
+ reportErrorIf(_file2descr[*_fileItr].inSizeBytes != _file2descr[*_fileItr].outSizeBytes,
+ protocol::StatusExt::FILE_READ,
+ "short read of the input file from remote worker: " + _sourceWorker + " (" +
+ _sourceWorkerHostPort + "), database: " + _databaseInfo.name +
+ ", file: " + *_fileItr);
+ if (errorContext.failed) {
+ setStatus(lock, protocol::Status::FAILED, errorContext.extendedStatus);
+ _releaseResources(lock);
+ return true;
+ }
+
+ // Flush and close the current file
+ fflush(_tmpFilePtr);
+ fclose(_tmpFilePtr);
+ _tmpFilePtr = 0;
+
+ // Keep updating this stats after finishing to copy each file
+ _file2descr[*_fileItr].endTransferTime = util::TimeUtils::now();
+ _updateInfo(lock);
+
+ // Move the iterator to the name of the next file to be copied
+ ++_fileItr;
+ if (_files.end() != _fileItr) {
+ if (!_openFiles(lock)) {
+ _releaseResources(lock);
+ return true;
+ }
+ }
+ }
+
+ // Finalize the operation, de-allocate resources, etc.
+ return _finalize(lock);
+}
+
+bool WorkerCreateReplicaHttpRequest::_openFiles(replica::Lock const& lock) {
+ LOGS(_log, LOG_LVL_DEBUG,
+ CONTEXT << " sourceWorkerHostPort: " << _sourceWorkerHostPort << " database: " << _databaseInfo.name
+ << " chunk: " << _chunk << " file: " << *_fileItr);
+
+ WorkerHttpRequest::ErrorContext errorContext;
+
+ // Open the input file on the remote server
+ _inFilePtr = FileClient::open(_serviceProvider, _sourceWorkerHost, _sourceWorkerPort, _databaseInfo.name,
+ *_fileItr);
+ errorContext = errorContext or
+ reportErrorIf(_inFilePtr == nullptr, protocol::StatusExt::FILE_ROPEN,
+ "failed to open input file on remote worker: " + _sourceWorker + " (" +
+ _sourceWorkerHostPort + "), database: " + _databaseInfo.name +
+ ", file: " + *_fileItr);
+ if (errorContext.failed) {
+ setStatus(lock, protocol::Status::FAILED, errorContext.extendedStatus);
+ return false;
+ }
+
+ // Reopen a temporary output file locally in the 'append binary mode'
+ // then 'rewind' to the beginning of the file before writing into it.
+ fs::path const tmpFile = _file2descr[*_fileItr].tmpFile;
+
+ _tmpFilePtr = fopen(tmpFile.string().c_str(), "wb");
+ errorContext = errorContext or reportErrorIf(_tmpFilePtr == nullptr, protocol::StatusExt::FILE_OPEN,
+ "failed to open temporary file: " + tmpFile.string() +
+ ", error: " + strerror(errno));
+ if (errorContext.failed) {
+ setStatus(lock, protocol::Status::FAILED, errorContext.extendedStatus);
+ return false;
+ }
+ rewind(_tmpFilePtr);
+ _file2descr[*_fileItr].beginTransferTime = util::TimeUtils::now();
+ return true;
+}
+
+bool WorkerCreateReplicaHttpRequest::_finalize(replica::Lock const& lock) {
+ LOGS(_log, LOG_LVL_DEBUG,
+ CONTEXT << " sourceWorkerHostPort: " << _sourceWorkerHostPort << " database: " << _databaseInfo.name
+ << " chunk: " << _chunk);
+
+ // Unconditionally regardless of the completion of the file renaming attempt
+ _releaseResources(lock);
+
+ // Rename temporary files into the canonical ones
+ // Note that this operation changes the directory namespace in a way
+ // which may affect other users (like replica lookup operations, etc.). Hence we're
+ // acquiring the directory lock to guarantee a consistent view onto the folder.
+ replica::Lock dataFolderLock(_mtxDataFolderOperations, CONTEXT);
+
+ // ATTENTION: as per ISO/IEC 9945 the file rename operation will
+ // remove empty files. Not sure if this should be treated
+ // in a special way?
+ WorkerHttpRequest::ErrorContext errorContext;
+ boost::system::error_code ec;
+ for (auto&& file : _files) {
+ fs::path const tmpFile = _file2descr[file].tmpFile;
+ fs::path const outFile = _file2descr[file].outFile;
+
+ fs::rename(tmpFile, outFile, ec);
+ errorContext = errorContext or reportErrorIf(ec.value() != 0, protocol::StatusExt::FILE_RENAME,
+ "failed to rename file: " + tmpFile.string());
+ fs::last_write_time(outFile, _file2descr[file].mtime, ec);
+ errorContext = errorContext or reportErrorIf(ec.value() != 0, protocol::StatusExt::FILE_MTIME,
+ "failed to change 'mtime' of file: " + tmpFile.string());
+ }
+ if (errorContext.failed) {
+ setStatus(lock, protocol::Status::FAILED, errorContext.extendedStatus);
+ return true;
+ }
+ setStatus(lock, protocol::Status::SUCCESS);
+ return true;
+}
+
+void WorkerCreateReplicaHttpRequest::_updateInfo(replica::Lock const& lock) {
+ size_t totalInSizeBytes = 0;
+ size_t totalOutSizeBytes = 0;
+ ReplicaInfo::FileInfoCollection fileInfoCollection;
+ for (auto&& file : _files) {
+ fileInfoCollection.emplace_back(
+ ReplicaInfo::FileInfo({file, _file2descr[file].outSizeBytes, _file2descr[file].mtime,
+ to_string(_file2descr[file].cs), _file2descr[file].beginTransferTime,
+ _file2descr[file].endTransferTime, _file2descr[file].inSizeBytes}));
+ totalInSizeBytes += _file2descr[file].inSizeBytes;
+ totalOutSizeBytes += _file2descr[file].outSizeBytes;
+ }
+ ReplicaInfo::Status const status =
+ (_files.size() == fileInfoCollection.size()) and (totalInSizeBytes == totalOutSizeBytes)
+ ? ReplicaInfo::Status::COMPLETE
+ : ReplicaInfo::Status::INCOMPLETE;
+
+ // Fill in the info on the chunk before finishing the operation
+ WorkerCreateReplicaHttpRequest::_replicaInfo = ReplicaInfo(status, worker(), _databaseInfo.name, _chunk,
+ util::TimeUtils::now(), fileInfoCollection);
+}
+
+void WorkerCreateReplicaHttpRequest::_releaseResources(replica::Lock const& lock) {
+ // Drop a connection to the remote server
+ _inFilePtr.reset();
+
+ // Close the output file
+ if (_tmpFilePtr) {
+ fflush(_tmpFilePtr);
+ fclose(_tmpFilePtr);
+ _tmpFilePtr = nullptr;
+ }
+
+ // Release the record buffer
+ if (_buf) {
+ delete[] _buf;
+ _buf = nullptr;
+ }
+}
+
+} // namespace lsst::qserv::replica
diff --git a/src/replica/worker/WorkerCreateReplicaHttpRequest.h b/src/replica/worker/WorkerCreateReplicaHttpRequest.h
new file mode 100644
index 000000000..364a92934
--- /dev/null
+++ b/src/replica/worker/WorkerCreateReplicaHttpRequest.h
@@ -0,0 +1,186 @@
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program. If not,
+ * see .
+ */
+#ifndef LSST_QSERV_REPLICA_WORKERCREATEREPLICAHTTPREQUEST_H
+#define LSST_QSERV_REPLICA_WORKERCREATEREPLICAHTTPREQUEST_H
+
+// System headers
+#include
+#include
+#include
+#include