From 53510713134d94b412e0ccf4131f144fe90f9f5c Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 16 Nov 2023 04:06:14 +0000 Subject: [PATCH] Migrated Replication applications to monitor Qserv workers via HTTP These include the Controller's REST service as well as other applications. --- src/replica/ClusterHealthJob.cc | 8 +-- src/replica/ClusterHealthJob.h | 6 +- src/replica/HttpQservMonitorModule.cc | 81 ++++++++++++++++++++------- src/replica/QservWorkerPingApp.cc | 15 ++--- 4 files changed, 77 insertions(+), 33 deletions(-) diff --git a/src/replica/ClusterHealthJob.cc b/src/replica/ClusterHealthJob.cc index efe3c8375b..b0d97981e8 100644 --- a/src/replica/ClusterHealthJob.cc +++ b/src/replica/ClusterHealthJob.cc @@ -165,9 +165,9 @@ void ClusterHealthJob::startImpl(replica::Lock const& lock) { _requests[replicationRequest->id()] = replicationRequest; ++_numStarted; - auto const qservRequest = controller()->serviceProvider()->qservMgtServices()->echo( + auto const qservRequest = controller()->serviceProvider()->qservMgtServices()->echoOverHttp( worker, testData, id(), /* jobId */ - [self](TestEchoQservMgtRequest::Ptr request) { self->_onRequestFinish(request); }, + [self](TestEchoQservHttpMgtRequest::Ptr request) { self->_onRequestFinish(request); }, timeoutSec()); _qservRequests[replicationRequest->id()] = qservRequest; ++_numStarted; @@ -217,7 +217,7 @@ void ClusterHealthJob::_onRequestFinish(ServiceStatusRequest::Ptr const& request if (++_numFinished == _numStarted) finish(lock, ExtendedState::SUCCESS); } -void ClusterHealthJob::_onRequestFinish(TestEchoQservMgtRequest::Ptr const& request) { +void ClusterHealthJob::_onRequestFinish(TestEchoQservHttpMgtRequest::Ptr const& request) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[qserv]" << " worker=" << request->worker()); @@ -229,7 +229,7 @@ void ClusterHealthJob::_onRequestFinish(TestEchoQservMgtRequest::Ptr const& requ if (state() == State::FINISHED) return; _health.updateQservState(request->worker(), - request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS); + request->extendedState() == QservHttpMgtRequest::ExtendedState::SUCCESS); if (++_numFinished == _numStarted) finish(lock, ExtendedState::SUCCESS); } diff --git a/src/replica/ClusterHealthJob.h b/src/replica/ClusterHealthJob.h index b58eb61115..0747645a65 100644 --- a/src/replica/ClusterHealthJob.h +++ b/src/replica/ClusterHealthJob.h @@ -30,7 +30,7 @@ // Qserv headers #include "replica/Job.h" #include "replica/ServiceManagementRequest.h" -#include "replica/TestEchoQservMgtRequest.h" +#include "replica/TestEchoQservHttpMgtRequest.h" // This header declarations namespace lsst::qserv::replica { @@ -180,7 +180,7 @@ class ClusterHealthJob : public Job { * The callback function to be invoked on a completion of the Qserv worker probes. * @param request A pointer to a request. */ - void _onRequestFinish(TestEchoQservMgtRequest::Ptr const& request); + void _onRequestFinish(TestEchoQservHttpMgtRequest::Ptr const& request); // Input parameters @@ -192,7 +192,7 @@ class ClusterHealthJob : public Job { std::map _requests; /// Requests sent to the Qserv workers registered by their identifiers - std::map _qservRequests; + std::map _qservRequests; /// Result to be returned ClusterHealth _health; diff --git a/src/replica/HttpQservMonitorModule.cc b/src/replica/HttpQservMonitorModule.cc index 439ecf81f4..80bd044896 100644 --- a/src/replica/HttpQservMonitorModule.cc +++ b/src/replica/HttpQservMonitorModule.cc @@ -58,6 +58,8 @@ using namespace lsst::qserv; using namespace lsst::qserv::replica; using namespace lsst::qserv::replica::database::mysql; +// #define QSERV_SSI_CONTROL_PLANE 1 + namespace { /** @@ -213,22 +215,39 @@ json HttpQservMonitorModule::_worker() { debug(__func__, "worker=" + worker); debug(__func__, "timeout_sec=" + to_string(timeoutSec)); + json info; +#ifdef QSERV_SSI_CONTROL_PLANE string const noParentJobId; GetStatusQservMgtRequest::CallbackType const onFinish = nullptr; - auto const request = controller()->serviceProvider()->qservMgtServices()->status( worker, noParentJobId, taskSelector, onFinish, timeoutSec); request->wait(); - + if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) { + string const msg = + "worker request failed, error: " + QservMgtRequest::state2string(request->extendedState()); + throw http::Error(__func__, msg); + } + info = request->info(); +#else + string const noParentJobId; + GetStatusQservHttpMgtRequest::CallbackType const onFinish = nullptr; + auto const request = controller()->serviceProvider()->qservMgtServices()->statusOverHttp( + worker, noParentJobId, taskSelector, onFinish, timeoutSec); + request->wait(); + if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) { + string const msg = "worker request failed, error: " + + QservHttpMgtRequest::state2string(request->extendedState()); + throw http::Error(__func__, msg); + } + info = request->info(); +#endif json result = json::object(); result["status"] = json::object(); map> schedulers2chunks; set chunks; - - bool const success = request->extendedState() == QservMgtRequest::ExtendedState::SUCCESS; - json const& info = success ? request->info() : json(); _processWorkerInfo(worker, keepResources, info, result["status"], schedulers2chunks, chunks); + result["schedulers_to_chunks"] = _schedulers2chunks2json(schedulers2chunks); result["chunks"] = _chunkInfo(chunks); return result; @@ -244,21 +263,33 @@ json HttpQservMonitorModule::_workerConfig() { debug(__func__, "worker=" + worker); debug(__func__, "timeout_sec=" + to_string(timeoutSec)); + json info; +#ifdef QSERV_SSI_CONTROL_PLANE string const noParentJobId; GetConfigQservMgtRequest::CallbackType const onFinish = nullptr; - auto const request = controller()->serviceProvider()->qservMgtServices()->config(worker, noParentJobId, onFinish, timeoutSec); request->wait(); - if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) { - string const msg = "database operation failed, error: " + - QservMgtRequest::state2string(request->extendedState()); + string const msg = + "worker request failed, error: " + QservMgtRequest::state2string(request->extendedState()); throw http::Error(__func__, msg); } - json result = json::object(); - result["config"] = request->info(); - return result; + info = request->info(); +#else + string const noParentJobId; + GetConfigQservHttpMgtRequest::CallbackType const onFinish = nullptr; + auto const request = controller()->serviceProvider()->qservMgtServices()->configOverHttp( + worker, noParentJobId, onFinish, timeoutSec); + request->wait(); + if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) { + string const msg = "worker request failed, error: " + + QservHttpMgtRequest::state2string(request->extendedState()); + throw http::Error(__func__, msg); + } + info = request->info(); +#endif + return json::object({{"config", info}}); } json HttpQservMonitorModule::_workerDb() { @@ -271,21 +302,33 @@ json HttpQservMonitorModule::_workerDb() { debug(__func__, "worker=" + worker); debug(__func__, "timeout_sec=" + to_string(timeoutSec)); + json info; +#ifdef QSERV_SSI_CONTROL_PLANE string const noParentJobId; GetDbStatusQservMgtRequest::CallbackType const onFinish = nullptr; - auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatus( worker, noParentJobId, onFinish, timeoutSec); request->wait(); - if (request->extendedState() != QservMgtRequest::ExtendedState::SUCCESS) { - string const msg = "database operation failed, error: " + - QservMgtRequest::state2string(request->extendedState()); + string const msg = + "worker request failed, error: " + QservMgtRequest::state2string(request->extendedState()); throw http::Error(__func__, msg); } - json result = json::object(); - result["status"] = request->info(); - return result; + info = request->info(); +#else + string const noParentJobId; + GetDbStatusQservHttpMgtRequest::CallbackType const onFinish = nullptr; + auto const request = controller()->serviceProvider()->qservMgtServices()->databaseStatusOverHttp( + worker, noParentJobId, onFinish, timeoutSec); + request->wait(); + if (request->extendedState() != QservHttpMgtRequest::ExtendedState::SUCCESS) { + string const msg = "worker request failed, error: " + + QservHttpMgtRequest::state2string(request->extendedState()); + throw http::Error(__func__, msg); + } + info = request->info(); +#endif + return json::object({{"status", info}}); } json HttpQservMonitorModule::_czar() { diff --git a/src/replica/QservWorkerPingApp.cc b/src/replica/QservWorkerPingApp.cc index 1c7ed58b7c..ddefe202b9 100644 --- a/src/replica/QservWorkerPingApp.cc +++ b/src/replica/QservWorkerPingApp.cc @@ -32,7 +32,7 @@ // Qserv headers #include "replica/QservMgtServices.h" #include "replica/ServiceProvider.h" -#include "replica/TestEchoQservMgtRequest.h" +#include "replica/TestEchoQservHttpMgtRequest.h" #include "util/BlockPost.h" using namespace std; @@ -95,7 +95,7 @@ int QservWorkerPingApp::runImpl() { mutex mtx; condition_variable cv; - auto const logEvent = [&](unique_lock const& lock, TestEchoQservMgtRequest::Ptr const& request, + auto const logEvent = [&](unique_lock const& lock, TestEchoQservHttpMgtRequest::Ptr const& request, string const& event) { if (!_verbose) return; cout << "active: " << setw(6) << numActive << "success: " << setw(6) << numSuccess @@ -103,16 +103,17 @@ int QservWorkerPingApp::runImpl() { << " state=" << request->state2string() << " " << event << endl; }; - auto const onStart = [&](unique_lock const& lock, TestEchoQservMgtRequest::Ptr const& request) { + auto const onStart = [&](unique_lock const& lock, + TestEchoQservHttpMgtRequest::Ptr const& request) { numActive++; logEvent(lock, request, "started"); }; - auto const onFinish = [&](TestEchoQservMgtRequest::Ptr const& request) { + auto const onFinish = [&](TestEchoQservHttpMgtRequest::Ptr const& request) { { unique_lock lock(mtx); numActive--; - if (request->extendedState() == QservMgtRequest::SUCCESS) { + if (request->extendedState() == QservHttpMgtRequest::SUCCESS) { numSuccess++; } else { numFailed++; @@ -123,8 +124,8 @@ int QservWorkerPingApp::runImpl() { }; for (size_t i = 0; i < _numRequests; ++i) { - auto const request = serviceProvider()->qservMgtServices()->echo(_worker, _data, noParentJobId, - onFinish, _requestExpirationIvalSec); + auto const request = serviceProvider()->qservMgtServices()->echoOverHttp( + _worker, _data, noParentJobId, onFinish, _requestExpirationIvalSec); unique_lock lock(mtx); onStart(lock, request); cv.wait(lock, [&] { return numActive < _maxRequests; });