Skip to content

Commit

Permalink
Migrated Replication applications to monitor Qserv workers via HTTP
Browse files Browse the repository at this point in the history
These include the Controller's REST service as well as other applications.
  • Loading branch information
iagaponenko committed Nov 18, 2023
1 parent 59a7bea commit 0911c9f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 33 deletions.
8 changes: 4 additions & 4 deletions src/replica/ClusterHealthJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ void ClusterHealthJob::startImpl(replica::Lock const& lock) {
_requests[replicationRequest->id()] = replicationRequest;
++_numStarted;

auto const qservRequest = controller()->serviceProvider()->qservMgtServices()->echo(
auto const qservRequest = controller()->serviceProvider()->qservMgtServices()->echoOverHttp(
worker, testData, id(), /* jobId */
[self](TestEchoQservMgtRequest::Ptr request) { self->_onRequestFinish(request); },
[self](TestEchoQservHttpMgtRequest::Ptr request) { self->_onRequestFinish(request); },
timeoutSec());
_qservRequests[replicationRequest->id()] = qservRequest;
++_numStarted;
Expand Down Expand Up @@ -217,7 +217,7 @@ void ClusterHealthJob::_onRequestFinish(ServiceStatusRequest::Ptr const& request
if (++_numFinished == _numStarted) finish(lock, ExtendedState::SUCCESS);
}

void ClusterHealthJob::_onRequestFinish(TestEchoQservMgtRequest::Ptr const& request) {
void ClusterHealthJob::_onRequestFinish(TestEchoQservHttpMgtRequest::Ptr const& request) {
LOGS(_log, LOG_LVL_DEBUG,
context() << __func__ << "[qserv]"
<< " worker=" << request->worker());
Expand All @@ -229,7 +229,7 @@ void ClusterHealthJob::_onRequestFinish(TestEchoQservMgtRequest::Ptr const& requ
if (state() == State::FINISHED) return;

_health.updateQservState(request->worker(),
request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS);
request->extendedState() == QservHttpMgtRequest::ExtendedState::SUCCESS);

if (++_numFinished == _numStarted) finish(lock, ExtendedState::SUCCESS);
}
Expand Down
6 changes: 3 additions & 3 deletions src/replica/ClusterHealthJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
// Qserv headers
#include "replica/Job.h"
#include "replica/ServiceManagementRequest.h"
#include "replica/TestEchoQservMgtRequest.h"
#include "replica/TestEchoQservHttpMgtRequest.h"

// This header declarations
namespace lsst::qserv::replica {
Expand Down Expand Up @@ -180,7 +180,7 @@ class ClusterHealthJob : public Job {
* The callback function to be invoked on a completion of the Qserv worker probes.
* @param request A pointer to a request.
*/
void _onRequestFinish(TestEchoQservMgtRequest::Ptr const& request);
void _onRequestFinish(TestEchoQservHttpMgtRequest::Ptr const& request);

// Input parameters

Expand All @@ -192,7 +192,7 @@ class ClusterHealthJob : public Job {
std::map<std::string, ServiceStatusRequest::Ptr> _requests;

/// Requests sent to the Qserv workers registered by their identifiers
std::map<std::string, TestEchoQservMgtRequest::Ptr> _qservRequests;
std::map<std::string, TestEchoQservHttpMgtRequest::Ptr> _qservRequests;

/// Result to be returned
ClusterHealth _health;
Expand Down
81 changes: 62 additions & 19 deletions src/replica/HttpQservMonitorModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ using namespace lsst::qserv;
using namespace lsst::qserv::replica;
using namespace lsst::qserv::replica::database::mysql;

// #define QSERV_SSI_CONTROL_PLANE 1

namespace {

/**
Expand Down Expand Up @@ -213,22 +215,39 @@ json HttpQservMonitorModule::_worker() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->status(
worker, noParentJobId, taskSelector, onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#else
string const noParentJobId;
GetStatusQservHttpMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->statusOverHttp(
worker, noParentJobId, taskSelector, onFinish, timeoutSec);
request->wait();
if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) {
string const msg = "worker request failed, error: " +
QservHttpMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#endif
json result = json::object();
result["status"] = json::object();

map<string, set<int>> schedulers2chunks;
set<int> chunks;

bool const success = request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS;
json const& info = success ? request->info() : json();
_processWorkerInfo(worker, keepResources, info, result["status"], schedulers2chunks, chunks);

result["schedulers_to_chunks"] = _schedulers2chunks2json(schedulers2chunks);
result["chunks"] = _chunkInfo(chunks);
return result;
Expand All @@ -244,21 +263,33 @@ json HttpQservMonitorModule::_workerConfig() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetConfigQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId,
onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["config"] = request->info();
return result;
info = request->info();
#else
string const noParentJobId;
GetConfigQservHttpMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->configOverHttp(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();
if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) {
string const msg = "worker request failed, error: " +
QservHttpMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#endif
return json::object({{"config", info}});
}

json HttpQservMonitorModule::_workerDb() {
Expand All @@ -271,21 +302,33 @@ json HttpQservMonitorModule::_workerDb() {
debug(__func__, "worker=" + worker);
debug(__func__, "timeout_sec=" + to_string(timeoutSec));

json info;
#ifdef QSERV_SSI_CONTROL_PLANE
string const noParentJobId;
GetDbStatusQservMgtRequest::CallbackType const onFinish = nullptr;

auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();

if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) {
string const msg = "database operation failed, error: " +
QservMgtRequest::state2string(request->extendedState());
string const msg =
"worker request failed, error: " + QservMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
json result = json::object();
result["status"] = request->info();
return result;
info = request->info();
#else
string const noParentJobId;
GetDbStatusQservHttpMgtRequest::CallbackType const onFinish = nullptr;
auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatusOverHttp(
worker, noParentJobId, onFinish, timeoutSec);
request->wait();
if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) {
string const msg = "worker request failed, error: " +
QservHttpMgtRequest::state2string(request->extendedState());
throw http::Error(__func__, msg);
}
info = request->info();
#endif
return json::object({{"status", info}});
}

json HttpQservMonitorModule::_czar() {
Expand Down
15 changes: 8 additions & 7 deletions src/replica/QservWorkerPingApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
// Qserv headers
#include "replica/QservMgtServices.h"
#include "replica/ServiceProvider.h"
#include "replica/TestEchoQservMgtRequest.h"
#include "replica/TestEchoQservHttpMgtRequest.h"
#include "util/BlockPost.h"

using namespace std;
Expand Down Expand Up @@ -95,24 +95,25 @@ int QservWorkerPingApp::runImpl() {
mutex mtx;
condition_variable cv;

auto const logEvent = [&](unique_lock<mutex> const& lock, TestEchoQservMgtRequest::Ptr const& request,
auto const logEvent = [&](unique_lock<mutex> const& lock, TestEchoQservHttpMgtRequest::Ptr const& request,
string const& event) {
if (!_verbose) return;
cout << "active: " << setw(6) << numActive << "success: " << setw(6) << numSuccess
<< "failed: " << setw(6) << numFailed << " id=" << request->id()
<< " state=" << request->state2string() << " " << event << endl;
};

auto const onStart = [&](unique_lock<mutex> const& lock, TestEchoQservMgtRequest::Ptr const& request) {
auto const onStart = [&](unique_lock<mutex> const& lock,
TestEchoQservHttpMgtRequest::Ptr const& request) {
numActive++;
logEvent(lock, request, "started");
};

auto const onFinish = [&](TestEchoQservMgtRequest::Ptr const& request) {
auto const onFinish = [&](TestEchoQservHttpMgtRequest::Ptr const& request) {
{
unique_lock<mutex> lock(mtx);
numActive--;
if (request->extendedState() == QservMgtRequest::SUCCESS) {
if (request->extendedState() == QservHttpMgtRequest::SUCCESS) {
numSuccess++;
} else {
numFailed++;
Expand All @@ -123,8 +124,8 @@ int QservWorkerPingApp::runImpl() {
};

for (size_t i = 0; i < _numRequests; ++i) {
auto const request = serviceProvider()->qservMgtServices()->echo(_worker, _data, noParentJobId,
onFinish, _requestExpirationIvalSec);
auto const request = serviceProvider()->qservMgtServices()->echoOverHttp(
_worker, _data, noParentJobId, onFinish, _requestExpirationIvalSec);
unique_lock<mutex> lock(mtx);
onStart(lock, request);
cv.wait(lock, [&] { return numActive < _maxRequests; });
Expand Down

0 comments on commit 0911c9f

Please sign in to comment.