Skip to content

Commit

Permalink
Migrated Controller's REST services to monitor Qserv workers via HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 16, 2023
1 parent b863893 commit 96b11f6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 19 deletions.
79 changes: 60 additions & 19 deletions src/replica/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ using namespace lsst::qserv;
using namespace lsst::qserv::replica;
using namespace lsst::qserv::replica::database::mysql;

// #define QSERV_SSI_CONTROL_PLANE 1

namespace {

/**
Expand Down Expand Up @@ -213,22 +215,33 @@ json HttpQservMonitorModule::_worker() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->status(
worker, noParentJobId, taskSelector, onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#else
http::AsyncReq::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->status(worker, taskSelector,
onFinish, timeoutSec);
request->wait();
info = _extractWorkerResponse(__func__, request);
#endif
json result = json::object();
result["status"] = json::object();

map<string, set<int>> schedulers2chunks;
set<int> chunks;

bool const success = request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS;
json const& info = success ? request->info() : json();
_processWorkerInfo(worker, keepResources, info, result["status"], schedulers2chunks, chunks);

result["schedulers_to_chunks"] = _schedulers2chunks2json(schedulers2chunks);
result["chunks"] = _chunkInfo(chunks);
return result;
Expand All @@ -244,21 +257,27 @@ json HttpQservMonitorModule::_workerConfig() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetConfigQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId,
onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["config"] = request->info();
return result;
info = request->info();
#else
http::AsyncReq::CallbackType const onFinish = nullptr;
auto const request =
controller()->serviceProvider()->qservMgtServices()->config(worker, onFinish, timeoutSec);
request->wait();
info = _extractWorkerResponse(__func__, request);
#endif
return json::object({{"config", info}});
}

json HttpQservMonitorModule::_workerDb() {
Expand All @@ -271,21 +290,43 @@ json HttpQservMonitorModule::_workerDb() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetDbStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["status"] = request->info();
return result;
info = request->info();
#else
http::AsyncReq::CallbackType const onFinish = nullptr;
auto const request =
controller()->serviceProvider()->qservMgtServices()->databaseStatus(worker, onFinish, timeoutSec);
request->wait();
info = _extractWorkerResponse(__func__, request);
#endif
return json::object({{"status", info}});
}

/**
 * Verify that a worker request has finished and return the decoded response body.
 *
 * @param func The calling context (reported as the origin of any error).
 * @param request The request to be inspected.
 * @return The JSON object parsed from the body of the worker's response.
 * @throw http::Error If the request is not in the FINISHED state, if the response
 *   body is not valid JSON or lacks the expected "success"/"error" attributes, or
 *   if the worker reported a failure ("success" equal to 0).
 */
json HttpQservMonitorModule::_extractWorkerResponse(string const& func,
                                                    shared_ptr<http::AsyncReq> const& request) const {
    if (request->state() != http::AsyncReq::State::FINISHED) {
        string const msg = "worker request failed, state: " + http::AsyncReq::state2str(request->state()) +
                           ", error: " + request->errorMessage() +
                           ", code: " + to_string(request->responseCode());
        throw http::Error(func, msg);
    }
    try {
        json info = json::parse(request->responseBody());
        if (info.at("success").get<int>() == 0) {
            string const msg = "worker reported error: " + info.at("error").get<string>();
            throw http::Error(func, msg);
        }
        return info;
    } catch (json::exception const& ex) {
        // The JSON library reports malformed input, missing keys and type mismatches
        // through its own exception hierarchy. Translate those into http::Error so
        // that callers see the single exception type this method is documented to throw.
        string const msg = "worker response is not valid, error: " + string(ex.what());
        throw http::Error(func, msg);
    }
}

json HttpQservMonitorModule::_czar() {
Expand Down
16 changes: 16 additions & 0 deletions src/replica/HttpQservMonitorModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "replica/HttpModule.h"

// Forward declarations
namespace lsst::qserv::http {
struct AsyncReq;
} // namespace lsst::qserv::http

namespace lsst::qserv::wbase {
struct TaskSelector;
} // namespace lsst::qserv::wbase
Expand Down Expand Up @@ -113,6 +117,18 @@ class HttpQservMonitorModule : public HttpModule {
*/
nlohmann::json _workerDb();

/**
* Check the completion status of the request and extract a result reported by
* a worker service that was called.
* @param func The calling context (for error reporting).
* @param request The request to be inspected.
* @return The result object returned by the called service (the parsed JSON
* body of the response).
* @throw http::Error For any errors encountered during request processing (the
* request did not reach the FINISHED state) or reported by the worker service
* in the response payload (attribute "success" is 0).
*/
nlohmann::json _extractWorkerResponse(std::string const& func,
std::shared_ptr<http::AsyncReq> const& request) const;

/**
* Process a request for extracting various status info of Czar.
*/
Expand Down

0 comments on commit 96b11f6

Please sign in to comment.