Skip to content

Commit

Permalink
Migrated Controller's REST services to monitor Qserv workers via HTTP
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 18, 2023
1 parent 2ca61cb commit dcfc8b8
Showing 1 changed file with 62 additions and 19 deletions.
81 changes: 62 additions & 19 deletions src/replica/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ using namespace lsst::qserv;
using namespace lsst::qserv::replica;
using namespace lsst::qserv::replica::database::mysql;

// #define QSERV_SSI_CONTROL_PLANE 1

namespace {

/**
Expand Down Expand Up @@ -213,22 +215,39 @@ json HttpQservMonitorModule::_worker() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->status(
worker, noParentJobId, taskSelector, onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#else
string const noParentJobId;
GetStatusQservHttpMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->statusOverHttp(
worker, noParentJobId, taskSelector, onFinish, timeoutSec);
request->wait();
if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) {
string const msg = "worker request failed, error: " +
QservHttpMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#endif
json result = json::object();
result["status"] = json::object();

map<string, set<int>> schedulers2chunks;
set<int> chunks;

bool const success = request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS;
json const& info = success ? request->info() : json();
_processWorkerInfo(worker, keepResources, info, result["status"], schedulers2chunks, chunks);

result["schedulers_to_chunks"] = _schedulers2chunks2json(schedulers2chunks);
result["chunks"] = _chunkInfo(chunks);
return result;
Expand All @@ -244,21 +263,33 @@ json HttpQservMonitorModule::_workerConfig() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetConfigQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId,
onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["config"] = request->info();
return result;
info = request->info();
#else
string const noParentJobId;
GetConfigQservHttpMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->configOverHttp(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();
if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) {
string const msg = "worker request failed, error: " +
QservHttpMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#endif
return json::object({{"config", info}});
}

json HttpQservMonitorModule::_workerDb() {
Expand All @@ -271,21 +302,33 @@ json HttpQservMonitorModule::_workerDb() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetDbStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["status"] = request->info();
return result;
info = request->info();
#else
string const noParentJobId;
GetDbStatusQservHttpMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatusOverHttp(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();
if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) {
string const msg = "worker request failed, error: " +
QservHttpMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#endif
return json::object({{"status", info}});
}

json HttpQservMonitorModule::_czar() {
Expand Down

0 comments on commit dcfc8b8

Please sign in to comment.