Skip to content

Commit

Permalink
TEMPORARY COMMIT: work in progress on the HTTP-based backend of the R…
Browse files Browse the repository at this point in the history
…eplication worker service
  • Loading branch information
iagaponenko committed Dec 13, 2024
1 parent 7ef9789 commit 62953c7
Show file tree
Hide file tree
Showing 29 changed files with 4,832 additions and 28 deletions.
5 changes: 4 additions & 1 deletion src/replica/apps/WorkerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
#include "replica/services/ServiceProvider.h"
#include "replica/util/FileUtils.h"
#include "replica/worker/FileServer.h"
#include "replica/worker/WorkerProcessor.h"
#include "replica/worker/WorkerHttpSvc.h"
#include "replica/worker/WorkerServer.h"

// LSST headers
Expand Down Expand Up @@ -113,6 +113,9 @@ int WorkerApp::runImpl() {
auto const reqProcSvr = WorkerServer::create(serviceProvider(), worker);
thread reqProcSvrThread([reqProcSvr]() { reqProcSvr->run(); });

auto const reqProcHttpSvr = WorkerHttpSvc::create(serviceProvider(), worker);
thread reqProcHttpSvrThread([reqProcHttpSvr]() { reqProcHttpSvr->run(); });

auto const fileSvr = FileServer::create(serviceProvider(), worker);
thread fileSvrThread([fileSvr]() { fileSvr->run(); });

Expand Down
1 change: 1 addition & 0 deletions src/replica/proto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ add_library(replica_proto OBJECT)
target_sources(replica_proto PRIVATE
${REPLICA_PB_SRCS}
${REPLICA_PB_HDRS}
Protocol.cc
)
134 changes: 134 additions & 0 deletions src/replica/proto/Protocol.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/proto/Protocol.h"

// System headers
#include <stdexcept>

using namespace std;

namespace lsst::qserv::replica::protocol {

std::string toString(Status status) {
switch (status) {
case Status::CREATED:
return "CREATED";
case Status::SUCCESS:
return "SUCCESS";
case Status::QUEUED:
return "QUEUED";
case Status::IN_PROGRESS:
return "IN_PROGRESS";
case Status::IS_CANCELLING:
return "IS_CANCELLING";
case Status::BAD:
return "BAD";
case Status::FAILED:
return "FAILED";
case Status::CANCELLED:
return "CANCELLED";
default:
throw logic_error("Unhandled status: " + to_string(static_cast<int>(status)));
}
}

std::string toString(StatusExt extendedStatus) {
switch (extendedStatus) {
case StatusExt::NONE:
return "NONE";
case StatusExt::INVALID_PARAM:
return "INVALID_PARAM";
case StatusExt::INVALID_ID:
return "INVALID_ID";
case StatusExt::FOLDER_STAT:
return "FOLDER_STAT";
case StatusExt::FOLDER_CREATE:
return "FOLDER_CREATE";
case StatusExt::FILE_STAT:
return "FILE_STAT";
case StatusExt::FILE_SIZE:
return "FILE_SIZE";
case StatusExt::FOLDER_READ:
return "FOLDER_READ";
case StatusExt::FILE_READ:
return "FILE_READ";
case StatusExt::FILE_ROPEN:
return "FILE_ROPEN";
case StatusExt::FILE_CREATE:
return "FILE_CREATE";
case StatusExt::FILE_OPEN:
return "FILE_OPEN";
case StatusExt::FILE_RESIZE:
return "FILE_RESIZE";
case StatusExt::FILE_WRITE:
return "FILE_WRITE";
case StatusExt::FILE_COPY:
return "FILE_COPY";
case StatusExt::FILE_DELETE:
return "FILE_DELETE";
case StatusExt::FILE_RENAME:
return "FILE_RENAME";
case StatusExt::FILE_EXISTS:
return "FILE_EXISTS";
case StatusExt::SPACE_REQ:
return "SPACE_REQ";
case StatusExt::NO_FOLDER:
return "NO_FOLDER";
case StatusExt::NO_FILE:
return "NO_FILE";
case StatusExt::NO_ACCESS:
return "NO_ACCESS";
case StatusExt::NO_SPACE:
return "NO_SPACE";
case StatusExt::FILE_MTIME:
return "FILE_MTIME";
case StatusExt::MYSQL_ERROR:
return "MYSQL_ERROR";
case StatusExt::LARGE_RESULT:
return "LARGE_RESULT";
case StatusExt::NO_SUCH_TABLE:
return "NO_SUCH_TABLE";
case StatusExt::NOT_PARTITIONED_TABLE:
return "NOT_PARTITIONED_TABLE";
case StatusExt::NO_SUCH_PARTITION:
return "NO_SUCH_PARTITION";
case StatusExt::MULTIPLE:
return "MULTIPLE";
case StatusExt::OTHER_EXCEPTION:
return "OTHER_EXCEPTION";
case StatusExt::FOREIGN_INSTANCE:
return "FOREIGN_INSTANCE";
case StatusExt::DUPLICATE_KEY:
return "DUPLICATE_KEY";
case StatusExt::CANT_DROP_KEY:
return "CANT_DROP_KEY";
default:
throw logic_error("Unhandled extended status: " + to_string(static_cast<int>(extendedStatus)));
}
}

string toString(Status status, StatusExt extendedStatus) {
return toString(status) + "::" + toString(extendedStatus);
}

} // namespace lsst::qserv::replica::protocol
136 changes: 136 additions & 0 deletions src/replica/proto/Protocol.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_REPLICA_PROTOCOL_H
#define LSST_QSERV_REPLICA_PROTOCOL_H

// System headers
#include <string>

// Third party headers
#include "nlohmann/json.hpp"

// This header declarations
namespace lsst::qserv::replica::protocol {

/// Subtypes of the SQL requests.
enum class SqlRequestType : int {

QUERY = 0,
CREATE_DATABASE = 1,
DROP_DATABASE = 2,
ENABLE_DATABASE = 3, ///< in Qserv
DISABLE_DATABASE = 4, ///< in Qserv
GRANT_ACCESS = 5,
CREATE_TABLE = 6,
DROP_TABLE = 7,
REMOVE_TABLE_PARTITIONING = 8,
DROP_TABLE_PARTITION = 9,
GET_TABLE_INDEX = 10,
CREATE_TABLE_INDEX = 11,
DROP_TABLE_INDEX = 12,
ALTER_TABLE = 13,
TABLE_ROW_STATS = 14
};

/// Types of the table indexes specified in the index management requests requests.
enum class SqlIndexSpec : int { DEFAULT = 1, UNIQUE = 2, FULLTEXT = 3, SPATIAL = 4 };

/// Status values returned by all request related to operations with
/// replicas. Request management operations always return messages whose types
/// match the return types of the corresponding (original) replica-related requests.
/// Service management requests have their own set of status values.
///
enum class Status : int {
CREATED = 0,
SUCCESS = 1,
QUEUED = 2,
IN_PROGRESS = 3,
IS_CANCELLING = 4,
BAD = 5,
FAILED = 6,
CANCELLED = 7
};

enum class StatusExt : int {
NONE = 0, ///< Unspecified problem.
INVALID_PARAM = 1, ///< Invalid parameter(s) of a request.
INVALID_ID = 2, ///< An invalid request identifier.
FOLDER_STAT = 4, ///< Failed to obtain fstat() for a folder.
FOLDER_CREATE = 5, ///< Failed to create a folder.
FILE_STAT = 6, ///< Failed to obtain fstat() for a file.
FILE_SIZE = 7, ///< Failed to obtain a size of a file.
FOLDER_READ = 8, ///< Failed to read the contents of a folder.
FILE_READ = 9, ///< Failed to read the contents of a file.
FILE_ROPEN = 10, ///< Failed to open a remote file.
FILE_CREATE = 11, ///< Failed to create a file.
FILE_OPEN = 12, ///< Failed to open a file.
FILE_RESIZE = 13, ///< Failed to resize a file.
FILE_WRITE = 14, ///< Failed to write into a file.
FILE_COPY = 15, ///< Failed to copy a file.
FILE_DELETE = 16, ///< Failed to delete a file.
FILE_RENAME = 17, ///< Failed to rename a file.
FILE_EXISTS = 18, ///< File already exists.
SPACE_REQ = 19, ///< Space availability check failed.
NO_FOLDER = 20, ///< Folder doesn't exist.
NO_FILE = 21, ///< File doesn't exist.
NO_ACCESS = 22, ///< No access to a file or a folder.
NO_SPACE = 23, ///< No space left on a device as required by an operation.
FILE_MTIME = 24, ///< Get/set 'mtime' operation failed.
MYSQL_ERROR = 25, ///< General MySQL error (other than any specific ones listed here).
LARGE_RESULT = 26, ///< Result exceeds a limit set in a request.
NO_SUCH_TABLE = 27, ///< No table found while performing a MySQL operation.
NOT_PARTITIONED_TABLE = 28, ///< The table is not MySQL partitioned as it was expected.
NO_SUCH_PARTITION = 29, ///< No MySQL partition found in a table as it was expected.
MULTIPLE = 30, ///< Multiple unspecified errors encountered when processing a request.
OTHER_EXCEPTION = 31, ///< Other exception not listed here.
FOREIGN_INSTANCE = 32, ///< Detected a request from a Controller serving an unrelated Qserv.
DUPLICATE_KEY = 33, ///< Duplicate key found when creating an index or altering a table schema.
CANT_DROP_KEY = 34 ///< Can't drop a field or a key which doesn't exist.
};

/// @return the string representation of the status
std::string toString(Status status);

/// @return the string representation of the extended status
std::string toString(StatusExt extendedStatus);

/// @return the string representation of the full status
std::string toString(Status status, StatusExt extendedStatus);

/// Status of a replica.
enum class ReplicaStatus : int { NOT_FOUND = 0, CORRUPT = 1, INCOMPLETE = 2, COMPLETE = 3 };

/// Status of a service.
enum class ServiceState : int { SUSPEND_IN_PROGRESS = 0, SUSPENDED = 1, RUNNING = 2 };

/// The header to be sent with the requests processed through the worker's queueing system.
struct QueuedRequestHdr {
std::string id;
int priority;
unsigned int timeout;
QueuedRequestHdr(std::string const& id_, int priority_, unsigned int timeout_)
: id(id_), priority(priority_), timeout(timeout_) {}
nlohmann::json toJson() const { return {{"id", id}, {"priority", priority}, {"timeout", timeout}}; };
};

} // namespace lsst::qserv::replica::protocol

#endif // LSST_QSERV_REPLICA_PROTOCOL_H
1 change: 1 addition & 0 deletions src/replica/util/Performance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "lsst/log/Log.h"

using namespace std;
using json = nlohmann::json;

namespace {

Expand Down
40 changes: 13 additions & 27 deletions src/replica/util/Performance.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
#include <memory>
#include <ostream>

// Third party headers
#include "nlohmann/json.hpp"

// Forward declarations
namespace lsst::qserv::replica {
class ProtocolPerformance;
Expand All @@ -56,53 +59,35 @@ class Performance {
* All (but the request creation one) timestamps will be initialized with 0.
*/
Performance();

Performance(Performance const&) = default;
Performance& operator=(Performance const&) = default;

~Performance() = default;

/**
* Update object state with counters from the protocol buffer object
*
* @param workerPerformanceInfo
* counters to be carried over into an internal state
* @param workerPerformanceInfo counters to be carried over into an internal state
*/
void update(ProtocolPerformance const& workerPerformanceInfo);

/**
* Update the Controller's 'start' time
*
* @return
* the previous state of the counter
* @return the previous state of the counter
*/
uint64_t setUpdateStart();

/**
* Update the Controller's 'finish' time
*
* @return
* the previous state of the counter
* @return the previous state of the counter
*/
uint64_t setUpdateFinish();

/// Created by the Controller
uint64_t c_create_time;

/// Started by the Controller
uint64_t c_start_time;

/// Received by a worker service
uint64_t w_receive_time;

/// Execution started by a worker service
uint64_t w_start_time;

/// Execution finished by a worker service
uint64_t w_finish_time;

/// A subscriber notified by the Controller
uint64_t c_finish_time;
uint64_t c_create_time; ///< Created by the Controller
uint64_t c_start_time; ///< Started by the Controller
uint64_t w_receive_time; ///< Received by a worker service
uint64_t w_start_time; ///< Execution started by a worker service
uint64_t w_finish_time; ///< Execution finished by a worker service
uint64_t c_finish_time; ///< A subscriber notified by the Controller
};

/// Overloaded streaming operator for class Performance
Expand All @@ -127,6 +112,7 @@ class WorkerPerformance {
uint64_t setUpdateFinish();

std::unique_ptr<ProtocolPerformance> info() const;
nlohmann::json toJson() const;

std::atomic<uint64_t> receive_time; ///< Received by a worker service
std::atomic<uint64_t> start_time; ///< Execution started by a worker service
Expand Down
Loading

0 comments on commit 62953c7

Please sign in to comment.