From 4aa6d8ef0a53c0f3987d718a6eb5fa6992160d5c Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Thu, 21 Nov 2024 14:45:44 -0800 Subject: [PATCH] Migrated: ReplicationRequest DeleteRequest FindRequest FindAllRequest EchoRequest DirectorIndexRequest Sql*Request --- src/replica/apps/ControllerApp.cc | 95 ++++------- src/replica/apps/MessengerTestApp.cc | 4 +- src/replica/contr/Controller.cc | 158 ------------------ src/replica/contr/Controller.h | 108 ------------ src/replica/contr/HttpQservSqlModule.cc | 2 +- src/replica/jobs/CreateReplicaJob.cc | 47 ++---- src/replica/jobs/DeleteReplicaJob.cc | 39 ++--- src/replica/jobs/DeleteWorkerJob.cc | 39 ++--- src/replica/jobs/DirectorIndexJob.cc | 5 +- src/replica/jobs/FindAllJob.cc | 15 +- src/replica/jobs/FixUpJob.cc | 38 +---- src/replica/jobs/SqlAlterTablesJob.cc | 15 +- src/replica/jobs/SqlCreateDbJob.cc | 15 +- src/replica/jobs/SqlCreateIndexesJob.cc | 15 +- src/replica/jobs/SqlDeleteDbJob.cc | 15 +- src/replica/jobs/SqlDisableDbJob.cc | 15 +- src/replica/jobs/SqlEnableDbJob.cc | 15 +- src/replica/jobs/SqlGrantAccessJob.cc | 14 +- src/replica/jobs/SqlQueryJob.cc | 14 +- src/replica/jobs/VerifyJob.cc | 39 ++--- src/replica/requests/DeleteRequest.cc | 30 ++-- src/replica/requests/DeleteRequest.h | 33 ++-- src/replica/requests/DirectorIndexRequest.cc | 71 ++++---- src/replica/requests/DirectorIndexRequest.h | 34 ++-- src/replica/requests/EchoRequest.cc | 38 ++--- src/replica/requests/EchoRequest.h | 23 +-- src/replica/requests/FindAllRequest.cc | 39 ++--- src/replica/requests/FindAllRequest.h | 33 ++-- src/replica/requests/FindRequest.cc | 41 ++--- src/replica/requests/FindRequest.h | 25 +-- src/replica/requests/ReplicationRequest.cc | 40 ++--- src/replica/requests/ReplicationRequest.h | 37 ++-- src/replica/requests/SqlAlterTablesRequest.cc | 30 ++-- src/replica/requests/SqlAlterTablesRequest.h | 38 +++-- src/replica/requests/SqlCreateDbRequest.cc | 29 ++-- src/replica/requests/SqlCreateDbRequest.h | 33 ++-- .../requests/SqlCreateIndexesRequest.cc | 36 ++-- .../requests/SqlCreateIndexesRequest.h | 42 ++--- src/replica/requests/SqlDeleteDbRequest.cc | 29 ++-- src/replica/requests/SqlDeleteDbRequest.h | 34 ++-- src/replica/requests/SqlDisableDbRequest.cc | 28 ++-- src/replica/requests/SqlDisableDbRequest.h | 33 ++-- src/replica/requests/SqlEnableDbRequest.cc | 29 ++-- src/replica/requests/SqlEnableDbRequest.h | 34 ++-- src/replica/requests/SqlGrantAccessRequest.cc | 33 ++-- src/replica/requests/SqlGrantAccessRequest.h | 37 ++-- src/replica/requests/SqlQueryRequest.cc | 29 ++-- src/replica/requests/SqlQueryRequest.h | 41 +++-- 48 files changed, 652 insertions(+), 1034 deletions(-) diff --git a/src/replica/apps/ControllerApp.cc b/src/replica/apps/ControllerApp.cc index 628621cf7..e4013511e 100644 --- a/src/replica/apps/ControllerApp.cc +++ b/src/replica/apps/ControllerApp.cc @@ -525,31 +525,29 @@ int ControllerApp::runImpl() { Request::Ptr request; if ("REPLICATE" == _requestType) { - request = controller->replicate( - _workerName, _sourceWorkerName, _databaseName, _chunkNumber, - [](Request::Ptr const& request_) { request_->print(); }, _priority, !_doNotTrackRequest, - _allowDuplicates); - + request = ReplicationRequest::create( + controller, _workerName, _sourceWorkerName, _databaseName, _chunkNumber, + [](ReplicationRequest::Ptr const& request_) { request_->print(); }, _priority, + !_doNotTrackRequest, _allowDuplicates); } else if ("DELETE" == _requestType) { - request = controller->deleteReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter, - _priority, !_doNotTrackRequest, _allowDuplicates); - + request = DeleteRequest::create(controller, _workerName, _databaseName, _chunkNumber, + Request::defaultPrinter, _priority, !_doNotTrackRequest, + _allowDuplicates); } else if ("FIND" == _requestType) { - request = controller->findReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter, - _priority, _computeCheckSum, !_doNotTrackRequest); - + request = FindRequest::create(controller, _workerName, _databaseName, _chunkNumber, + Request::defaultPrinter, _priority, _computeCheckSum, + !_doNotTrackRequest); } else if ("FIND_ALL" == _requestType) { - request = controller->findAllReplicas(_workerName, _databaseName, !_doNotSaveReplicaInfo, - Request::defaultPrinter, _priority, !_doNotTrackRequest); - + request = FindAllRequest::create(controller, _workerName, _databaseName, !_doNotSaveReplicaInfo, + Request::defaultPrinter, _priority, !_doNotTrackRequest); } else if ("ECHO" == _requestType) { - request = controller->echo(_workerName, _echoData, _echoDelayMilliseconds, Request::defaultPrinter, - _priority, !_doNotTrackRequest); - + request = EchoRequest::create(controller, _workerName, _echoData, _echoDelayMilliseconds, + Request::defaultPrinter, _priority, !_doNotTrackRequest); } else if ("INDEX" == _requestType) { bool const hasTransactions = _transactionId != numeric_limits::max(); - request = controller->directorIndex( - _workerName, _sqlDatabase, _sqlTable, _chunkNumber, hasTransactions, _transactionId, + request = DirectorIndexRequest::create( + controller, _workerName, _sqlDatabase, _sqlTable, _chunkNumber, hasTransactions, + _transactionId, [&](DirectorIndexRequest::Ptr const& request_) { Request::defaultPrinter(request_); auto const& responseData = request_->responseData(); @@ -570,85 +568,70 @@ int ControllerApp::runImpl() { } }, _priority, !_doNotTrackRequest); - } else if ("SQL_ALTER_TABLES" == _requestType) { vector const tables = {_sqlTable}; - request = controller->sqlAlterTables(_workerName, _sqlDatabase, tables, _sqlAlterSpec, - SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - + request = SqlAlterTablesRequest::create(controller, _workerName, _sqlDatabase, tables, _sqlAlterSpec, + SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); } else if ("SQL_QUERY" == _requestType) { - request = controller->sqlQuery(_workerName, _sqlQuery, _sqlUser, _sqlPassword, _sqlMaxRows, - SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - - } else if ("SQL_CREATE_DATABASE" == _requestType) { - request = controller->sqlCreateDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority, + request = SqlQueryRequest::create(controller, _workerName, _sqlQuery, _sqlUser, _sqlPassword, + _sqlMaxRows, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - + } else if ("SQL_CREATE_DATABASE" == _requestType) { + request = SqlCreateDbRequest::create(controller, _workerName, _sqlDatabase, + SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); } else if ("SQL_DELETE_DATABASE" == _requestType) { - request = controller->sqlDeleteDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority, - !_doNotTrackRequest); - + request = SqlDeleteDbRequest::create(controller, _workerName, _sqlDatabase, + SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); } else if ("SQL_ENABLE_DATABASE" == _requestType) { - request = controller->sqlEnableDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority, - !_doNotTrackRequest); - + request = SqlEnableDbRequest::create(controller, _workerName, _sqlDatabase, + SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); } else if ("SQL_DISABLE_DATABASE" == _requestType) { - request = controller->sqlDisableDb(_workerName, _sqlDatabase, SqlRequest::extendedPrinter, _priority, - !_doNotTrackRequest); - + request = SqlDisableDbRequest::create(controller, _workerName, _sqlDatabase, + SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); } else if ("SQL_GRANT_ACCESS" == _requestType) { - request = controller->sqlGrantAccess(_workerName, _sqlDatabase, _sqlUser, SqlRequest::extendedPrinter, - _priority, !_doNotTrackRequest); - + request = SqlGrantAccessRequest::create(controller, _workerName, _sqlDatabase, _sqlUser, + SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); } else if ("SQL_CREATE_TABLE" == _requestType) { request = controller->sqlCreateTable(_workerName, _sqlDatabase, _sqlTable, _sqlEngine, _sqlPartitionByColumn, SqlSchemaUtils::readFromTextFile(_sqlSchemaFile), SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_CREATE_TABLES" == _requestType) { vector const tables = {_sqlTable}; request = controller->sqlCreateTables(_workerName, _sqlDatabase, tables, _sqlEngine, _sqlPartitionByColumn, SqlSchemaUtils::readFromTextFile(_sqlSchemaFile), SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_DELETE_TABLE" == _requestType) { vector const tables = {_sqlTable}; request = controller->sqlDeleteTable(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_REMOVE_TABLE_PARTITIONS" == _requestType) { vector const tables = {_sqlTable}; request = controller->sqlRemoveTablePartitions(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_DELETE_TABLE_PARTITION" == _requestType) { vector const tables = {_sqlTable}; request = controller->sqlDeleteTablePartition(_workerName, _sqlDatabase, tables, _transactionId, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_CREATE_TABLE_INDEXES" == _requestType) { vector const tables = {_sqlTable}; - request = controller->sqlCreateTableIndexes( - _workerName, _sqlDatabase, tables, SqlRequestParams::IndexSpec(_sqlIndexSpecStr), + request = SqlCreateIndexesRequest::create( + controller, _workerName, _sqlDatabase, tables, SqlRequestParams::IndexSpec(_sqlIndexSpecStr), _sqlIndexName, _sqlIndexComment, SqlSchemaUtils::readIndexSpecFromTextFile(_sqlIndexColumnsFile), SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_DROP_TABLE_INDEXES" == _requestType) { vector const tables = {_sqlTable}; request = controller->sqlDropTableIndexes(_workerName, _sqlDatabase, tables, _sqlIndexName, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_GET_TABLE_INDEXES" == _requestType) { vector const tables = {_sqlTable}; request = controller->sqlGetTableIndexes(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("SQL_TABLE_ROW_STATS" == _requestType) { auto const databaseInfo = controller->serviceProvider()->config()->databaseInfo(_sqlDatabase); bool const isPartitioned = databaseInfo.findTable(_sqlTable).isPartitioned; @@ -656,39 +639,29 @@ int ControllerApp::runImpl() { isPartitioned ? ChunkedTable(_sqlTable, _chunkNumber, _isOverlap).name() : _sqlTable}; request = controller->sqlRowStats(_workerName, _sqlDatabase, tables, SqlRequest::extendedPrinter, _priority, !_doNotTrackRequest); - } else if ("STATUS" == _requestType) { request = _launchStatusRequest(controller); - } else if ("STOP" == _requestType) { request = _launchStatusRequest(controller); - } else if ("DISPOSE" == _requestType) { vector const targetIds = {_affectedRequestId}; request = controller->dispose(_workerName, targetIds, Request::defaultPrinter); - } else if ("SERVICE_SUSPEND" == _requestType) { request = controller->suspendWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter); - } else if ("SERVICE_RESUME" == _requestType) { request = controller->resumeWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter); - } else if ("SERVICE_STATUS" == _requestType) { request = controller->statusOfWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter); - } else if ("SERVICE_REQUESTS" == _requestType) { request = controller->requestsOfWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter); - } else if ("SERVICE_DRAIN" == _requestType) { request = controller->drainWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter); - } else if ("SERVICE_RECONFIG" == _requestType) { request = controller->reconfigWorkerService(_workerName, ServiceManagementRequestBase::extendedPrinter); - } else { throw logic_error(context + "unsupported request: " + _affectedRequest); } diff --git a/src/replica/apps/MessengerTestApp.cc b/src/replica/apps/MessengerTestApp.cc index 220471fa6..32ccee438 100644 --- a/src/replica/apps/MessengerTestApp.cc +++ b/src/replica/apps/MessengerTestApp.cc @@ -117,8 +117,8 @@ int MessengerTestApp::runImpl() { _onNumActiveCv.wait(lock, [&] { return _numActive < _maxActiveRequests; }); // Submit the next request. - auto const request = controller->echo( - _workerName, _data, _proccesingTimeSec, + auto const request = EchoRequest::create( + controller, _workerName, _data, _proccesingTimeSec, [&](EchoRequest::Ptr request) { { unique_lock lock(_mtx); diff --git a/src/replica/contr/Controller.cc b/src/replica/contr/Controller.cc index 5ff0a0871..4bc8a0fbf 100644 --- a/src/replica/contr/Controller.cc +++ b/src/replica/contr/Controller.cc @@ -35,29 +35,15 @@ #include "replica/config/Configuration.h" #include "replica/config/ConfigWorker.h" #include "replica/registry/Registry.h" -#include "replica/requests/DeleteRequest.h" #include "replica/requests/DisposeRequest.h" -#include "replica/requests/EchoRequest.h" -#include "replica/requests/FindRequest.h" -#include "replica/requests/FindAllRequest.h" -#include "replica/requests/DirectorIndexRequest.h" #include "replica/requests/Messenger.h" -#include "replica/requests/ReplicationRequest.h" #include "replica/requests/ServiceManagementRequest.h" -#include "replica/requests/SqlAlterTablesRequest.h" -#include "replica/requests/SqlQueryRequest.h" -#include "replica/requests/SqlCreateDbRequest.h" -#include "replica/requests/SqlCreateIndexesRequest.h" #include "replica/requests/SqlCreateTableRequest.h" #include "replica/requests/SqlCreateTablesRequest.h" -#include "replica/requests/SqlDeleteDbRequest.h" #include "replica/requests/SqlDeleteTablePartitionRequest.h" #include "replica/requests/SqlDeleteTableRequest.h" -#include "replica/requests/SqlDisableDbRequest.h" #include "replica/requests/SqlDropIndexesRequest.h" #include "replica/requests/SqlGetIndexesRequest.h" -#include "replica/requests/SqlEnableDbRequest.h" -#include "replica/requests/SqlGrantAccessRequest.h" #include "replica/requests/SqlRemoveTablePartitionsRequest.h" #include "replica/requests/SqlRowStatsRequest.h" #include "replica/requests/StatusRequest.h" @@ -205,150 +191,6 @@ void Controller::verifyFolders(bool createMissingFolders) const { FileUtils::verifyFolders("CONTROLLER", folders, createMissingFolders); } -ReplicationRequest::Ptr Controller::replicate(string const& workerName, string const& sourceWorkerName, - string const& database, unsigned int chunk, - ReplicationRequest::CallbackType const& onFinish, int priority, - bool keepTracking, bool allowDuplicate, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit(workerName, sourceWorkerName, database, chunk, allowDuplicate, - onFinish, priority, keepTracking, jobId, - requestExpirationIvalSec); -} - -DeleteRequest::Ptr Controller::deleteReplica(string const& workerName, string const& database, - unsigned int chunk, DeleteRequest::CallbackType const& onFinish, - int priority, bool keepTracking, bool allowDuplicate, - string const& jobId, unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, chunk, allowDuplicate, onFinish, priority, keepTracking, jobId, - requestExpirationIvalSec); -} - -FindRequest::Ptr Controller::findReplica(string const& workerName, string const& database, unsigned int chunk, - FindRequest::CallbackType const& onFinish, int priority, - bool computeCheckSum, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, chunk, computeCheckSum, onFinish, priority, keepTracking, jobId, - requestExpirationIvalSec); -} - -FindAllRequest::Ptr Controller::findAllReplicas(string const& workerName, string const& database, - bool saveReplicaInfo, - FindAllRequest::CallbackType const& onFinish, int priority, - bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, saveReplicaInfo, onFinish, priority, keepTracking, jobId, - requestExpirationIvalSec); -} - -EchoRequest::Ptr Controller::echo(string const& workerName, string const& data, uint64_t delay, - EchoRequest::CallbackType const& onFinish, int priority, bool keepTracking, - string const& jobId, unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, data, delay, onFinish, priority, keepTracking, jobId, requestExpirationIvalSec); -} - -DirectorIndexRequest::Ptr Controller::directorIndex(string const& workerName, string const& database, - string const& directorTable, unsigned int chunk, - bool hasTransactions, TransactionId transactionId, - DirectorIndexRequest::CallbackType const& onFinish, - int priority, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, directorTable, chunk, hasTransactions, transactionId, onFinish, priority, - keepTracking, jobId, requestExpirationIvalSec); -} - -SqlAlterTablesRequest::Ptr Controller::sqlAlterTables( - string const& workerName, string const& database, vector const& tables, - string const& alterSpec, function const& onFinish, int priority, - bool keepTracking, string const& jobId, unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, tables, alterSpec, onFinish, priority, keepTracking, jobId, - requestExpirationIvalSec); -} - -SqlQueryRequest::Ptr Controller::sqlQuery(string const& workerName, string const& query, string const& user, - string const& password, uint64_t maxRows, - SqlQueryRequest::CallbackType const& onFinish, int priority, - bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, query, user, password, maxRows, onFinish, priority, keepTracking, jobId, - requestExpirationIvalSec); -} - -SqlCreateDbRequest::Ptr Controller::sqlCreateDb(string const& workerName, string const& database, - SqlCreateDbRequest::CallbackType const& onFinish, - int priority, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit(workerName, database, onFinish, priority, - keepTracking, jobId, requestExpirationIvalSec); -} - -SqlDeleteDbRequest::Ptr Controller::sqlDeleteDb(string const& workerName, string const& database, - SqlDeleteDbRequest::CallbackType const& onFinish, - int priority, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit(workerName, database, onFinish, priority, - keepTracking, jobId, requestExpirationIvalSec); -} - -SqlEnableDbRequest::Ptr Controller::sqlEnableDb(string const& workerName, string const& database, - SqlEnableDbRequest::CallbackType const& onFinish, - int priority, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit(workerName, database, onFinish, priority, - keepTracking, jobId, requestExpirationIvalSec); -} - -SqlDisableDbRequest::Ptr Controller::sqlDisableDb(string const& workerName, string const& database, - SqlDisableDbRequest::CallbackType const& onFinish, - int priority, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit(workerName, database, onFinish, priority, - keepTracking, jobId, requestExpirationIvalSec); -} - -SqlGrantAccessRequest::Ptr Controller::sqlGrantAccess(string const& workerName, string const& database, - string const& user, - SqlGrantAccessRequest::CallbackType const& onFinish, - int priority, bool keepTracking, string const& jobId, - unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, user, onFinish, priority, keepTracking, jobId, requestExpirationIvalSec); -} - -SqlCreateIndexesRequest::Ptr Controller::sqlCreateTableIndexes( - string const& workerName, string const& database, vector const& tables, - SqlRequestParams::IndexSpec const& indexSpec, string const& indexName, string const& indexComment, - vector const& indexColumns, - function const& onFinish, int priority, bool keepTracking, - string const& jobId, unsigned int requestExpirationIvalSec) { - LOGS(_log, LOG_LVL_TRACE, _context(__func__)); - return _submit( - workerName, database, tables, indexSpec, indexName, indexComment, indexColumns, onFinish, - priority, keepTracking, jobId, requestExpirationIvalSec); -} - SqlCreateTableRequest::Ptr Controller::sqlCreateTable(string const& workerName, string const& database, string const& table, string const& engine, string const& partitionByColumn, diff --git a/src/replica/contr/Controller.h b/src/replica/contr/Controller.h index 9741eb8a1..8417a1d15 100644 --- a/src/replica/contr/Controller.h +++ b/src/replica/contr/Controller.h @@ -43,20 +43,6 @@ // Forward declarations namespace lsst::qserv::replica { -class ReplicationRequest; -class DeleteRequest; -class FindRequest; -class FindAllRequest; -class EchoRequest; -class DirectorIndexRequest; -class SqlAlterTablesRequest; -class SqlQueryRequest; -class SqlCreateDbRequest; -class SqlDeleteDbRequest; -class SqlEnableDbRequest; -class SqlDisableDbRequest; -class SqlGrantAccessRequest; -class SqlCreateIndexesRequest; class SqlCreateTableRequest; class SqlCreateTablesRequest; class SqlDeleteTableRequest; @@ -199,11 +185,8 @@ class Controller : public std::enable_shared_from_this { ~Controller() = default; ControllerIdentity const& identity() const { return _identity; } - uint64_t startTime() const { return _startTime; } - ServiceProvider::Ptr const& serviceProvider() const { return _serviceProvider; } - boost::asio::io_service& io_service() { return serviceProvider()->io_service(); } /** @@ -215,97 +198,6 @@ class Controller : public std::enable_shared_from_this { */ void verifyFolders(bool createMissingFolders = false) const; - std::shared_ptr replicate( - std::string const& workerName, std::string const& sourceWorkerName, std::string const& database, - unsigned int chunk, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, bool allowDuplicate = true, - std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr deleteReplica( - std::string const& workerName, std::string const& database, unsigned int chunk, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, bool allowDuplicate = true, - std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr findReplica( - std::string const& workerName, std::string const& database, unsigned int chunk, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool computeCheckSum = false, bool keepTracking = true, - std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr findAllReplicas( - std::string const& workerName, std::string const& database, bool saveReplicaInfo = true, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr echo( - std::string const& workerName, std::string const& data, uint64_t delay, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr directorIndex( - std::string const& workerName, std::string const& database, std::string const& directorTable, - unsigned int chunk, bool hasTransactions, TransactionId transactionId, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlAlterTables( - std::string const& workerName, std::string const& database, - std::vector const& tables, std::string const& alterSpec, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlQuery( - std::string const& workerName, std::string const& query, std::string const& user, - std::string const& password, uint64_t maxRows, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlCreateDb( - std::string const& workerName, std::string const& database, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlDeleteDb( - std::string const& workerName, std::string const& database, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlEnableDb( - std::string const& workerName, std::string const& database, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlDisableDb( - std::string const& workerName, std::string const& database, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlGrantAccess( - std::string const& workerName, std::string const& database, std::string const& user, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - - std::shared_ptr sqlCreateTableIndexes( - std::string const& workerName, std::string const& database, - std::vector const& tables, SqlRequestParams::IndexSpec const& indexSpec, - std::string const& indexName, std::string const& indexComment, - std::vector const& indexColumns, - std::function)> const& onFinish = nullptr, - int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", - unsigned int requestExpirationIvalSec = 0); - std::shared_ptr sqlCreateTable( std::string const& workerName, std::string const& database, std::string const& table, std::string const& engine, std::string const& partitionByColumn, diff --git a/src/replica/contr/HttpQservSqlModule.cc b/src/replica/contr/HttpQservSqlModule.cc index a58f66086..80fd0e5b1 100644 --- a/src/replica/contr/HttpQservSqlModule.cc +++ b/src/replica/contr/HttpQservSqlModule.cc @@ -68,7 +68,7 @@ json HttpQservSqlModule::_execute() { debug(__func__, "user=" + user); debug(__func__, "maxRows=" + to_string(maxRows)); - auto const request = controller()->sqlQuery(worker, query, user, password, maxRows); + auto const request = SqlQueryRequest::create(controller(), worker, query, user, password, maxRows); request->wait(); json result; diff --git a/src/replica/jobs/CreateReplicaJob.cc b/src/replica/jobs/CreateReplicaJob.cc index 410e97507..2b5e231b4 100644 --- a/src/replica/jobs/CreateReplicaJob.cc +++ b/src/replica/jobs/CreateReplicaJob.cc @@ -72,9 +72,7 @@ CreateReplicaJob::CreateReplicaJob(string const& databaseFamily, unsigned int ch CreateReplicaJobResult const& CreateReplicaJob::getReplicaData() const { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - if (state() == State::FINISHED) return _replicaData; - throw logic_error("CreateReplicaJob::" + string(__func__) + " the method can't be called while the job hasn't finished"); } @@ -90,7 +88,6 @@ list> CreateReplicaJob::extendedPersistentState() const { list> CreateReplicaJob::persistentLogData() const { list> result; - auto&& replicaData = getReplicaData(); // Per-worker counters for the following categories: @@ -98,16 +95,13 @@ list> CreateReplicaJob::persistentLogData() const { // created-chunks: // the total number of chunks created on the workers as a result // of the operation - map> workerCategoryCounter; - for (auto&& info : replicaData.replicas) { workerCategoryCounter[info.worker()]["created-chunks"]++; } for (auto&& workerItr : workerCategoryCounter) { auto&& worker = workerItr.first; string val = "worker=" + worker; - for (auto&& categoryItr : workerItr.second) { auto&& category = categoryItr.first; size_t const counter = categoryItr.second; @@ -122,9 +116,7 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << "startImpl"); // Check if configuration parameters are valid - auto const& config = controller()->serviceProvider()->config(); - if (not(config->isKnownDatabaseFamily(databaseFamily()) and config->isKnownWorker(sourceWorker()) and config->isKnownWorker(destinationWorker()) and (sourceWorker() != destinationWorker()))) { LOGS(_log, LOG_LVL_ERROR, @@ -132,32 +124,26 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { << " database family: '" << databaseFamily() << "'" << " source worker: '" << sourceWorker() << "'" << " destination worker: '" << destinationWorker() << "'"); - finish(lock, ExtendedState::CONFIG_ERROR); return; } // Make sure no such replicas exist yet at the destination - vector destinationReplicas; try { controller()->serviceProvider()->databaseServices()->findWorkerReplicas( destinationReplicas, chunk(), destinationWorker(), databaseFamily()); - } catch (invalid_argument const& ex) { LOGS(_log, LOG_LVL_ERROR, context() << string(__func__) << " ** MISCONFIGURED ** " << " chunk: " << chunk() << " destinationWorker: " << destinationWorker() << " databaseFamily: " << databaseFamily() << " exception: " << ex.what()); - throw; - } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context() << string(__func__) << " ** failed to find replicas ** " << " chunk: " << chunk() << " destinationWorker: " << destinationWorker() << " databaseFamily: " << databaseFamily() << " exception: " << ex.what()); - finish(lock, ExtendedState::FAILED); return; } @@ -167,7 +153,6 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { << destinationReplicas.size() << " replicas ** " << " chunk: " << chunk() << " destinationWorker: " << destinationWorker() << " databaseFamily: " << databaseFamily()); - finish(lock, ExtendedState::FAILED); return; } @@ -181,7 +166,6 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { // // 2. launching FindRequest for each member of the database family to // see if the chunk is available on a source node. - vector sourceReplicas; try { controller()->serviceProvider()->databaseServices()->findWorkerReplicas( @@ -194,7 +178,6 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { << " databaseFamily: " << databaseFamily() << " exception: " << ex.what()); throw; - } catch (exception const& ex) { LOGS(_log, LOG_LVL_ERROR, context() << string(__func__) << " ** failed to find replicas ** " @@ -219,17 +202,15 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) { // // VERY IMPORTANT: the requests are sent for participating databases // only because some catalogs may not have a full coverage - - auto self = shared_from_base(); - + bool const keepTracking = true; + bool const allowDuplicate = true; for (auto&& replica : sourceReplicas) { - _requests.push_back(controller()->replicate( - destinationWorker(), sourceWorker(), replica.database(), chunk(), - [self](ReplicationRequest::Ptr ptr) { self->_onRequestFinish(ptr); }, priority(), - true, /* keepTracking */ - true, /* allowDuplicate */ - id() /* jobId */ - )); + _requests.push_back(ReplicationRequest::create( + controller(), destinationWorker(), sourceWorker(), replica.database(), chunk(), + [self = shared_from_base()](ReplicationRequest::Ptr ptr) { + self->_onRequestFinish(ptr); + }, + priority(), keepTracking, allowDuplicate, id())); } } @@ -238,11 +219,10 @@ void CreateReplicaJob::cancelImpl(replica::Lock const& lock) { // The algorithm will also clear resources taken by various // locally created objects. - + // // To ensure no lingering "side effects" will be left after cancelling this // job the request cancellation should be also followed (where it makes a sense) // by stopping the request at corresponding worker service. - for (auto&& ptr : _requests) { ptr->cancel(); if (ptr->state() != Request::State::FINISHED) @@ -266,9 +246,7 @@ void CreateReplicaJob::_onRequestFinish(ReplicationRequest::Ptr const& request) << " sourceWorker=" << sourceWorker() << " chunk=" << chunk()); if (state() == State::FINISHED) return; - replica::Lock lock(_mtx, context() + string(__func__) + "(ReplicationeRequest)"); - if (state() == State::FINISHED) return; ++_numRequestsFinished; @@ -280,7 +258,6 @@ void CreateReplicaJob::_onRequestFinish(ReplicationRequest::Ptr const& request) // Evaluate the status of on-going operations to see if the replica creation // stage has finished. - if (_numRequestsFinished == _requests.size()) { if (_numRequestsSuccess == _requests.size()) { // Notify Qserv about the change in a disposition of replicas. @@ -290,12 +267,10 @@ void CreateReplicaJob::_onRequestFinish(ReplicationRequest::Ptr const& request) // NOTE: The current implementation will not be affected by a result // of the operation. Neither any upstream notifications will be // sent to a requester of this job. - vector databases; for (auto&& databaseEntry : _replicaData.chunks[chunk()]) { databases.push_back(databaseEntry.first); } - ServiceProvider::Ptr const serviceProvider = controller()->serviceProvider(); if (serviceProvider->config()->get("xrootd", "auto-notify") != 0) { _qservAddReplica(lock, chunk(), databases, destinationWorker()); @@ -314,11 +289,9 @@ void CreateReplicaJob::_qservAddReplica(replica::Lock const& lock, unsigned int context() << __func__ << " ** START ** Qserv notification on ADD replica:" << ", chunk=" << chunk << ", databases=" << util::String::toString(databases) << " worker=" << worker); - - auto self = shared_from_this(); controller()->serviceProvider()->qservMgtServices()->addReplica( chunk, databases, worker, - [self, onFinish](AddReplicaQservMgtRequest::Ptr const& request) { + [self = shared_from_this(), onFinish](AddReplicaQservMgtRequest::Ptr const& request) { LOGS(_log, LOG_LVL_DEBUG, self->context() << __func__ << " ** FINISH ** Qserv notification on ADD replica:" << " chunk=" << request->chunk() diff --git a/src/replica/jobs/DeleteReplicaJob.cc b/src/replica/jobs/DeleteReplicaJob.cc index e9ab9a94a..09babbf5b 100644 --- a/src/replica/jobs/DeleteReplicaJob.cc +++ b/src/replica/jobs/DeleteReplicaJob.cc @@ -70,9 +70,7 @@ DeleteReplicaJob::DeleteReplicaJob(string const& databaseFamily, unsigned int ch DeleteReplicaJobResult const& DeleteReplicaJob::getReplicaData() const { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - if (state() == State::FINISHED) return _replicaData; - throw logic_error("DeleteReplicaJob::" + string(__func__) + " the method can't be called while the job hasn't finished"); } @@ -87,7 +85,6 @@ list> DeleteReplicaJob::extendedPersistentState() const { list> DeleteReplicaJob::persistentLogData() const { list> result; - auto&& replicaData = getReplicaData(); // Per-worker counters for the following categories: @@ -95,16 +92,13 @@ list> DeleteReplicaJob::persistentLogData() const { // deleted-chunks: // the total number of chunks deleted from the workers as a result // of the operation - map> workerCategoryCounter; - for (auto&& info : replicaData.replicas) { workerCategoryCounter[info.worker()]["deleted-chunks"]++; } for (auto&& workerItr : workerCategoryCounter) { auto&& workerName = workerItr.first; string val = "worker=" + workerName; - for (auto&& categoryItr : workerItr.second) { auto&& category = categoryItr.first; size_t const counter = categoryItr.second; @@ -119,9 +113,7 @@ void DeleteReplicaJob::startImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); // Check if configuration parameters are valid - auto const& config = controller()->serviceProvider()->config(); - if (not(config->isKnownDatabaseFamily(databaseFamily()) and config->isKnownWorker(workerName()))) { LOGS(_log, LOG_LVL_ERROR, context() << __func__ << " ** MISCONFIGURED ** " @@ -141,17 +133,14 @@ void DeleteReplicaJob::startImpl(replica::Lock const& lock) { // // 2. launching FindRequest for each member of the database family to // see if the chunk is available on a source node. - try { controller()->serviceProvider()->databaseServices()->findWorkerReplicas( _replicas, chunk(), workerName(), databaseFamily()); - } catch (invalid_argument const& ex) { LOGS(_log, LOG_LVL_ERROR, context() << __func__ << " ** MISCONFIGURED ** " << " chunk: " << chunk() << " worker: " << workerName() << " databaseFamily: " << databaseFamily() << " exception: " << ex.what()); - finish(lock, ExtendedState::CONFIG_ERROR); return; @@ -160,7 +149,6 @@ void DeleteReplicaJob::startImpl(replica::Lock const& lock) { context() << __func__ << " ** failed to find replicas ** " << " chunk: " << chunk() << " worker: " << workerName() << " databaseFamily: " << databaseFamily() << " exception: " << ex.what()); - finish(lock, ExtendedState::FAILED); return; } @@ -169,7 +157,6 @@ void DeleteReplicaJob::startImpl(replica::Lock const& lock) { context() << __func__ << " ** worker has no replicas to be deleted ** " << " chunk: " << chunk() << " worker: " << workerName() << " databaseFamily: " << databaseFamily()); - finish(lock, ExtendedState::FAILED); return; } @@ -181,23 +168,20 @@ void DeleteReplicaJob::startImpl(replica::Lock const& lock) { if (serviceProvider->config()->get("xrootd", "auto-notify") != 0) { // Start right away _beginDeleteReplica(lock); - } else { // Notify Qserv first. Then start once a confirmation is received - vector databases; for (auto&& replica : _replicas) { databases.push_back(replica.database()); } - auto self = shared_from_base(); - // Force the removal regardless of the replica usage status. // See the implementation of the corresponding worker management service // for specific detail on what "remove" means in that service's context. bool const force = true; _qservRemoveReplica(lock, chunk(), databases, workerName(), force, - [self](RemoveReplicaQservMgtRequest::Ptr const& request) { + [self = shared_from_base()]( + RemoveReplicaQservMgtRequest::Ptr const& request) { replica::Lock lock(self->_mtx, self->context() + string(__func__) + "::qservRemoveReplica"); switch (request->extendedState()) { @@ -226,11 +210,10 @@ void DeleteReplicaJob::cancelImpl(replica::Lock const& lock) { // The algorithm will also clear resources taken by various // locally created objects. - + // // To ensure no lingering "side effects" will be left after cancelling this // job the request cancellation should be also followed (where it makes a sense) // by stopping the request at corresponding worker service. - auto const noCallbackOnFinish = nullptr; bool const keepTracking = true; for (auto&& ptr : _requests) { @@ -249,18 +232,17 @@ void DeleteReplicaJob::notify(replica::Lock const& lock) { } void DeleteReplicaJob::_beginDeleteReplica(replica::Lock const& lock) { - auto self = shared_from_base(); - // VERY IMPORTANT: the requests are sent for participating databases // only because some catalogs may not have a full coverage - bool const keepTracking = true; bool const allowDuplicate = true; for (auto&& replica : _replicas) { - _requests.push_back(controller()->deleteReplica( - workerName(), replica.database(), chunk(), - [self](DeleteRequest::Ptr ptr) { self->_onRequestFinish(ptr); }, priority(), keepTracking, - allowDuplicate, id())); + _requests.push_back(DeleteRequest::create( + controller(), workerName(), replica.database(), chunk(), + [self = shared_from_base()](DeleteRequest::Ptr ptr) { + self->_onRequestFinish(ptr); + }, + priority(), keepTracking, allowDuplicate, id())); } } @@ -298,10 +280,9 @@ void DeleteReplicaJob::_qservRemoveReplica(replica::Lock const& lock, unsigned i << " chunk=" << chunk << ", databases=" << util::String::toString(databases) << ", worker=" << workerName << ", force=" << (force ? "true" : "false")); - auto self = shared_from_this(); controller()->serviceProvider()->qservMgtServices()->removeReplica( chunk, databases, workerName, force, - [self, onFinish](RemoveReplicaQservMgtRequest::Ptr const& request) { + [self = shared_from_this(), onFinish](RemoveReplicaQservMgtRequest::Ptr const& request) { LOGS(_log, LOG_LVL_DEBUG, self->context() << __func__ << " ** FINISH ** Qserv notification on REMOVE replica:" << " chunk=" << request->chunk() diff --git a/src/replica/jobs/DeleteWorkerJob.cc b/src/replica/jobs/DeleteWorkerJob.cc index 2868cdce4..80e9f8a3f 100644 --- a/src/replica/jobs/DeleteWorkerJob.cc +++ b/src/replica/jobs/DeleteWorkerJob.cc @@ -67,9 +67,7 @@ DeleteWorkerJob::DeleteWorkerJob(string const& workerName, bool permanentDelete, DeleteWorkerJobResult const& DeleteWorkerJob::getReplicaData() const { LOGS(_log, LOG_LVL_DEBUG, context()); - if (state() == State::FINISHED) return _replicaData; - throw logic_error("DeleteWorkerJob::" + string(__func__) + " the method can't be called while the job hasn't finished"); } @@ -84,22 +82,17 @@ list> DeleteWorkerJob::extendedPersistentState() const { list> DeleteWorkerJob::persistentLogData() const { list> result; - auto&& replicaData = getReplicaData(); - // Encode new chunk replicas (if any) which had to be created to compensate // for lost ones. + auto&& replicaData = getReplicaData(); for (auto&& familyChunkDatabaseWorkerInfo : replicaData.chunks) { auto&& family = familyChunkDatabaseWorkerInfo.first; - for (auto&& chunkDatabaseWorkerInfo : familyChunkDatabaseWorkerInfo.second) { auto&& chunk = chunkDatabaseWorkerInfo.first; - for (auto&& databaseWorkerInfo : chunkDatabaseWorkerInfo.second) { auto&& database = databaseWorkerInfo.first; - for (auto&& workerInfo : databaseWorkerInfo.second) { auto&& workerName = workerInfo.first; - result.emplace_back("new-replica", "family=" + family + " chunk=" + to_string(chunk) + " database=" + database + " worker=" + workerName); @@ -111,10 +104,8 @@ list> DeleteWorkerJob::persistentLogData() const { // Encode orphan replicas (if any) which only existed on the evicted worker for (auto&& chunkDatabaseReplicaInfo : replicaData.orphanChunks) { auto&& chunk = chunkDatabaseReplicaInfo.first; - for (auto&& databaseReplicaInfo : chunkDatabaseReplicaInfo.second) { auto&& database = databaseReplicaInfo.first; - result.emplace_back("orphan-replica", "chunk=" + to_string(chunk) + " database=" + database); } } @@ -140,18 +131,17 @@ void DeleteWorkerJob::startImpl(replica::Lock const& lock) { auto const drainRequest = controller()->drainWorkerService( workerName(), noCallbackOnFinish, priority(), id(), requestExpirationIvalSec); drainRequest->wait(); - if (drainRequest->extendedState() == Request::ExtendedState::SUCCESS) { if (drainRequest->getServiceState().state == ServiceState::State::RUNNING) { // Try to get the most recent state the worker's replicas // for all known databases bool const saveReplicaInfo = true; // always save the replica info in a database because // the algorithm depends on it. - auto self = shared_from_base(); for (auto&& database : controller()->serviceProvider()->config()->databases()) { - auto const request = controller()->findAllReplicas( - workerName(), database, saveReplicaInfo, - [self](FindAllRequest::Ptr const& request) { + auto const request = FindAllRequest::create( + controller(), workerName(), database, saveReplicaInfo, + [self = shared_from_base()]( + FindAllRequest::Ptr const& request) { self->_onRequestFinish(request); }, priority()); @@ -174,12 +164,11 @@ void DeleteWorkerJob::startImpl(replica::Lock const& lock) { void DeleteWorkerJob::cancelImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - auto const noCallbackOnFinish = nullptr; - auto const keepTracking = true; - // To ensure no lingering "side effects" will be left after cancelling this // job the request cancellation should be also followed (where it makes a sense) // by stopping the request at corresponding worker service. + auto const noCallbackOnFinish = nullptr; + auto const keepTracking = true; for (auto&& ptr : _findAllRequests) { ptr->cancel(); if (ptr->state() != Request::State::FINISHED) { @@ -189,7 +178,6 @@ void DeleteWorkerJob::cancelImpl(replica::Lock const& lock) { } // Stop chained jobs (if any) as well - for (auto&& ptr : _replicateJobs) ptr->cancel(); } @@ -227,13 +215,14 @@ void DeleteWorkerJob::_disableWorker(replica::Lock const& lock) { _numLaunched = 0; _numFinished = 0; _numSuccess = 0; - - auto self = shared_from_base(); - for (auto&& databaseFamily : controller()->serviceProvider()->config()->databaseFamilies()) { ReplicateJob::Ptr const job = ReplicateJob::create( databaseFamily, 0, /* numReplicas -- pull from Configuration */ - controller(), id(), [self](ReplicateJob::Ptr job) { self->_onJobFinish(job); }, priority()); + controller(), id(), + [self = shared_from_base()](ReplicateJob::Ptr job) { + self->_onJobFinish(job); + }, + priority()); job->start(); _replicateJobs.push_back(job); _numLaunched++; @@ -251,7 +240,6 @@ void DeleteWorkerJob::_onJobFinish(ReplicateJob::Ptr const& job) { if (state() == State::FINISHED) return; _numFinished++; - if (job->extendedState() != ExtendedState::SUCCESS) { finish(lock, ExtendedState::FAILED); return; @@ -259,14 +247,12 @@ void DeleteWorkerJob::_onJobFinish(ReplicateJob::Ptr const& job) { // Process the normal completion of the child job _numSuccess++; - LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "(ReplicateJob) " << "job->getReplicaData().chunks.size(): " << job->getReplicaData().chunks.size()); // Merge results into the current job's result object _replicaData.chunks[job->databaseFamily()] = job->getReplicaData().chunks; - if (_numFinished == _numLaunched) { // Construct a collection of orphan replicas if possible ReplicaInfoCollection replicas; @@ -275,7 +261,6 @@ void DeleteWorkerJob::_onJobFinish(ReplicateJob::Ptr const& job) { for (ReplicaInfo const& replica : replicas) { unsigned int const chunk = replica.chunk(); string const& database = replica.database(); - bool replicated = false; for (auto&& databaseFamilyEntry : _replicaData.chunks) { auto const& chunks = databaseFamilyEntry.second; diff --git a/src/replica/jobs/DirectorIndexJob.cc b/src/replica/jobs/DirectorIndexJob.cc index eeb044a56..dd11eb724 100644 --- a/src/replica/jobs/DirectorIndexJob.cc +++ b/src/replica/jobs/DirectorIndexJob.cc @@ -515,8 +515,9 @@ list DirectorIndexJob::_launchRequests(replica::Lock auto const chunk = _chunks[workerName].front(); _chunks[workerName].pop(); - requests.push_back(controller()->directorIndex( - workerName, database(), directorTable(), chunk, hasTransactions(), transactionId(), + requests.push_back(DirectorIndexRequest::create( + controller(), workerName, database(), directorTable(), chunk, hasTransactions(), + transactionId(), [self](DirectorIndexRequest::Ptr const& request) { self->_onRequestFinish(request); }, priority(), keepTracking, id())); } diff --git a/src/replica/jobs/FindAllJob.cc b/src/replica/jobs/FindAllJob.cc index 2591fd9f1..52c1c1f97 100644 --- a/src/replica/jobs/FindAllJob.cc +++ b/src/replica/jobs/FindAllJob.cc @@ -144,20 +144,19 @@ list> FindAllJob::persistentLogData() const { void FindAllJob::startImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - auto const self = shared_from_base(); - auto const workerNames = allWorkers() ? controller()->serviceProvider()->config()->allWorkers() : controller()->serviceProvider()->config()->workers(); - for (auto&& workerName : workerNames) { _replicaData.workers[workerName] = false; for (auto&& database : _databases) { _workerDatabaseSuccess[workerName][database] = false; - _requests.push_back(controller()->findAllReplicas( - workerName, database, saveReplicaInfo(), - [self](FindAllRequest::Ptr request) { self->_onRequestFinish(request); }, priority(), - true, /* keepTracking*/ - id() /* jobId */ + _requests.push_back(FindAllRequest::create( + controller(), workerName, database, saveReplicaInfo(), + [self = shared_from_base()](FindAllRequest::Ptr request) { + self->_onRequestFinish(request); + }, + priority(), true, /* keepTracking*/ + id() /* jobId */ )); _numLaunched++; } diff --git a/src/replica/jobs/FixUpJob.cc b/src/replica/jobs/FixUpJob.cc index 6be4e1854..eba06faff 100644 --- a/src/replica/jobs/FixUpJob.cc +++ b/src/replica/jobs/FixUpJob.cc @@ -59,9 +59,7 @@ FixUpJob::FixUpJob(string const& databaseFamily, Controller::Ptr const& controll FixUpJobResult const& FixUpJob::getReplicaData() const { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - if (state() == State::FINISHED) return _replicaData; - throw logic_error(typeName() + "::" + string(__func__) + " the method can't be called while the job hasn't finished"); } @@ -74,11 +72,9 @@ list> FixUpJob::extendedPersistentState() const { list> FixUpJob::persistentLogData() const { list> result; - auto&& replicaData = getReplicaData(); // Report workers failed to respond to the requests - for (auto&& workerInfo : replicaData.workers) { auto&& workerName = workerInfo.first; auto const numFailedRequests = workerInfo.second; @@ -93,16 +89,13 @@ list> FixUpJob::persistentLogData() const { // created-chunks: // the total number of chunks created on the workers as a result // of the operation - map> workerCategoryCounter; - for (auto&& info : replicaData.replicas) { workerCategoryCounter[info.worker()]["created-chunks"]++; } for (auto&& workerItr : workerCategoryCounter) { auto&& workerName = workerItr.first; string val = "worker=" + workerName; - for (auto&& categoryItr : workerItr.second) { auto&& category = categoryItr.first; size_t const counter = categoryItr.second; @@ -117,14 +110,13 @@ void FixUpJob::startImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); // Launch the chained job to get chunk disposition - - auto self = shared_from_base(); bool const saveReplicInfo = true; // always save the replica info in a database because // the algorithm depends on it. bool const allWorkers = false; // only consider enabled workers _findAllJob = FindAllJob::create( databaseFamily(), saveReplicInfo, allWorkers, controller(), id(), - [self](FindAllJob::Ptr job) { self->_onPrecursorJobFinish(); }, priority()); + [self = shared_from_base()](FindAllJob::Ptr job) { self->_onPrecursorJobFinish(); }, + priority()); _findAllJob->start(); } @@ -133,7 +125,6 @@ void FixUpJob::cancelImpl(replica::Lock const& lock) { // The algorithm will also clear resources taken by various // locally created objects. - if (_findAllJob and (_findAllJob->state() != State::FINISHED)) { _findAllJob->cancel(); } @@ -142,10 +133,8 @@ void FixUpJob::cancelImpl(replica::Lock const& lock) { // To ensure no lingering "side effects" will be left after cancelling this // job the request cancellation should be also followed (where it makes a sense) // by stopping the request at corresponding worker service. - auto const noCallbackOnFinish = nullptr; bool const keepTracking = true; - for (auto&& ptr : _requests) { ptr->cancel(); if (ptr->state() != Request::State::FINISHED) @@ -165,14 +154,11 @@ void FixUpJob::_onPrecursorJobFinish() { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); if (state() == State::FINISHED) return; - replica::Lock lock(_mtx, context() + __func__); - if (state() == State::FINISHED) return; // Proceed with the replication effort only if the precursor job // has succeeded. - if (_findAllJob->extendedState() != ExtendedState::SUCCESS) { finish(lock, ExtendedState::FAILED); return; @@ -180,22 +166,18 @@ void FixUpJob::_onPrecursorJobFinish() { // Analyze results and prepare a replication plan to fix chunk // co-location for under-represented chunks - FindAllJobResult const& replicaData = _findAllJob->getReplicaData(); for (auto&& chunk2workers : replicaData.isColocated) { unsigned int chunk = chunk2workers.first; - for (auto&& worker2colocated : chunk2workers.second) { string const& destinationWorker = worker2colocated.first; bool const isColocated = worker2colocated.second; - if (isColocated) continue; // Iterate over all participating databases, find the ones which aren't // represented on the worker, find a suitable source worker which has // a complete chunk for the database and which (the worker) is not the same // as the current one and submit the replication request. - for (auto&& database : replicaData.databases.at(chunk)) { if (not replicaData.chunks.chunk(chunk).database(database).workerExists(destinationWorker)) { // Finding a source worker first @@ -250,9 +232,7 @@ void FixUpJob::_onRequestFinish(ReplicationRequest::Ptr const& request) { << " database=" << database << " worker=" << workerName << " chunk=" << chunk); if (state() == State::FINISHED) return; - replica::Lock lock(_mtx, context() + __func__); - if (state() == State::FINISHED) return; _numFinished++; @@ -275,8 +255,6 @@ void FixUpJob::_onRequestFinish(ReplicationRequest::Ptr const& request) { size_t FixUpJob::_launchNext(replica::Lock const& lock, string const& destinationWorker, size_t maxRequests) { if (maxRequests == 0) return 0; - - auto const self = shared_from_base(); auto&& tasks = _destinationWorker2tasks[destinationWorker]; bool const keepTracking = true; bool const allowDuplicate = true; @@ -286,13 +264,13 @@ size_t FixUpJob::_launchNext(replica::Lock const& lock, string const& destinatio // Launch the replication request and register it for further // tracking (or cancellation, should the one be requested) - ReplicationTask const& task = tasks.front(); - - _requests.push_back(controller()->replicate( - task.destinationWorker, task.sourceWorker, task.database, task.chunk, - [self](ReplicationRequest::Ptr const& ptr) { self->_onRequestFinish(ptr); }, 0, /* priority */ - keepTracking, allowDuplicate, id())); + _requests.push_back(ReplicationRequest::create( + controller(), task.destinationWorker, task.sourceWorker, task.database, task.chunk, + [self = shared_from_base()](ReplicationRequest::Ptr ptr) { + self->_onRequestFinish(ptr); + }, + priority(), keepTracking, allowDuplicate, id())); tasks.pop(); numLaunched++; } diff --git a/src/replica/jobs/SqlAlterTablesJob.cc b/src/replica/jobs/SqlAlterTablesJob.cc index c27a87768..8c3fd592e 100644 --- a/src/replica/jobs/SqlAlterTablesJob.cc +++ b/src/replica/jobs/SqlAlterTablesJob.cc @@ -73,7 +73,6 @@ list> SqlAlterTablesJob::extendedPersistentState() const { list SqlAlterTablesJob::launchRequests(replica::Lock const& lock, string const& worker, size_t maxRequestsPerWorker) { list requests; - if (maxRequestsPerWorker == 0) return requests; // Make sure this worker has already been served @@ -86,14 +85,14 @@ list SqlAlterTablesJob::launchRequests(replica::Lock const& loc // Divide tables into subsets allocated to the "batch" requests. Then launch // the requests for the current worker. - auto const self = shared_from_base(); for (auto&& tables : distributeTables(tables2process, maxRequestsPerWorker)) { - requests.push_back(controller()->sqlAlterTables( - worker, database(), tables, alterSpec(), - [self](SqlAlterTablesRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlAlterTablesRequest::create( + controller(), worker, database(), tables, alterSpec(), + [self = shared_from_base()](SqlAlterTablesRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), keepTracking, id())); } return requests; } diff --git a/src/replica/jobs/SqlCreateDbJob.cc b/src/replica/jobs/SqlCreateDbJob.cc index 61befd54d..86ca8ab5f 100644 --- a/src/replica/jobs/SqlCreateDbJob.cc +++ b/src/replica/jobs/SqlCreateDbJob.cc @@ -70,16 +70,15 @@ list SqlCreateDbJob::launchRequests(replica::Lock const& lock, size_t maxRequestsPerWorker) { // Launch exactly one request per worker unless it was already // launched earlier - list requests; if (not _workers.count(worker) and maxRequestsPerWorker != 0) { - auto const self = shared_from_base(); - requests.push_back(controller()->sqlCreateDb( - worker, database(), - [self](SqlCreateDbRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlCreateDbRequest::create( + controller(), worker, database(), + [self = shared_from_base()](SqlCreateDbRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), keepTracking, id())); _workers.insert(worker); } return requests; diff --git a/src/replica/jobs/SqlCreateIndexesJob.cc b/src/replica/jobs/SqlCreateIndexesJob.cc index 9eebb9df0..ce4f6a5f5 100644 --- a/src/replica/jobs/SqlCreateIndexesJob.cc +++ b/src/replica/jobs/SqlCreateIndexesJob.cc @@ -91,7 +91,6 @@ list> SqlCreateIndexesJob::extendedPersistentState() const list SqlCreateIndexesJob::launchRequests(replica::Lock const& lock, string const& worker, size_t maxRequestsPerWorker) { list requests; - if (maxRequestsPerWorker == 0) return requests; // Make sure this worker has already been served @@ -104,14 +103,14 @@ list SqlCreateIndexesJob::launchRequests(replica::Lock const& l // Divide tables into subsets allocated to the "batch" requests. Then launch // the requests for the current worker. - auto const self = shared_from_base(); for (auto&& tables : distributeTables(tables2process, maxRequestsPerWorker)) { - requests.push_back(controller()->sqlCreateTableIndexes( - worker, database(), tables, indexSpec(), indexName(), indexComment(), indexColumns(), - [self](SqlCreateIndexesRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlCreateIndexesRequest::create( + controller(), worker, database(), tables, indexSpec(), indexName(), indexComment(), + indexColumns(), + [self = shared_from_base()]( + SqlCreateIndexesRequest::Ptr const& request) { self->onRequestFinish(request); }, + priority(), keepTracking, id())); } return requests; } diff --git a/src/replica/jobs/SqlDeleteDbJob.cc b/src/replica/jobs/SqlDeleteDbJob.cc index 30f84c284..e9e0e8327 100644 --- a/src/replica/jobs/SqlDeleteDbJob.cc +++ b/src/replica/jobs/SqlDeleteDbJob.cc @@ -70,16 +70,15 @@ list SqlDeleteDbJob::launchRequests(replica::Lock const& lock, size_t maxRequestsPerWorker) { // Launch exactly one request per worker unless it was already // launched earlier - list requests; if (not _workers.count(worker) and maxRequestsPerWorker != 0) { - auto const self = shared_from_base(); - requests.push_back(controller()->sqlDeleteDb( - worker, database(), - [self](SqlDeleteDbRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlDeleteDbRequest::create( + controller(), worker, database(), + [self = shared_from_base()](SqlDeleteDbRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), keepTracking, id())); _workers.insert(worker); } return requests; diff --git a/src/replica/jobs/SqlDisableDbJob.cc b/src/replica/jobs/SqlDisableDbJob.cc index eb6712e6d..77e0fe1a8 100644 --- a/src/replica/jobs/SqlDisableDbJob.cc +++ b/src/replica/jobs/SqlDisableDbJob.cc @@ -70,16 +70,15 @@ list SqlDisableDbJob::launchRequests(replica::Lock const& lock, size_t maxRequestsPerWorker) { // Launch exactly one request per worker unless it was already // launched earlier - list requests; if (not _workers.count(worker) and maxRequestsPerWorker != 0) { - auto const self = shared_from_base(); - requests.push_back(controller()->sqlDisableDb( - worker, database(), - [self](SqlDisableDbRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlDisableDbRequest::create( + controller(), worker, database(), + [self = shared_from_base()](SqlDisableDbRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), keepTracking, id())); _workers.insert(worker); } return requests; diff --git a/src/replica/jobs/SqlEnableDbJob.cc b/src/replica/jobs/SqlEnableDbJob.cc index 06e548978..efdd458a6 100644 --- a/src/replica/jobs/SqlEnableDbJob.cc +++ b/src/replica/jobs/SqlEnableDbJob.cc @@ -70,16 +70,15 @@ list SqlEnableDbJob::launchRequests(replica::Lock const& lock, size_t maxRequestsPerWorker) { // Launch exactly one request per worker unless it was already // launched earlier - list requests; if (not _workers.count(worker) and maxRequestsPerWorker != 0) { - auto const self = shared_from_base(); - requests.push_back(controller()->sqlEnableDb( - worker, database(), - [self](SqlEnableDbRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlEnableDbRequest::create( + controller(), worker, database(), + [self = shared_from_base()](SqlEnableDbRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), keepTracking, id())); _workers.insert(worker); } return requests; diff --git a/src/replica/jobs/SqlGrantAccessJob.cc b/src/replica/jobs/SqlGrantAccessJob.cc index 220fed935..02804d282 100644 --- a/src/replica/jobs/SqlGrantAccessJob.cc +++ b/src/replica/jobs/SqlGrantAccessJob.cc @@ -77,13 +77,13 @@ list SqlGrantAccessJob::launchRequests(replica::Lock const& loc list requests; if (not _workers.count(worker) and maxRequestsPerWorker != 0) { - auto const self = shared_from_base(); - requests.push_back(controller()->sqlGrantAccess( - worker, database(), user(), - [self](SqlGrantAccessRequest::Ptr const& request) { self->onRequestFinish(request); }, - priority(), true, /* keepTracking*/ - id() /* jobId */ - )); + bool const keepTracking = true; + requests.push_back(SqlGrantAccessRequest::create( + controller(), worker, database(), user(), + [self = shared_from_base()](SqlGrantAccessRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), keepTracking, id())); _workers.insert(worker); } return requests; diff --git a/src/replica/jobs/SqlQueryJob.cc b/src/replica/jobs/SqlQueryJob.cc index dbbf8269d..2dee5f93c 100644 --- a/src/replica/jobs/SqlQueryJob.cc +++ b/src/replica/jobs/SqlQueryJob.cc @@ -76,15 +76,15 @@ list SqlQueryJob::launchRequests(replica::Lock const& lock, str size_t maxRequestsPerWorker) { // Launch exactly one request per worker unless it was already // launched earlier - list requests; if (not _workers.count(worker) and maxRequestsPerWorker != 0) { - auto const self = shared_from_base(); - requests.push_back(controller()->sqlQuery( - worker, query(), user(), password(), maxRows(), - [self](SqlQueryRequest::Ptr const& request) { self->onRequestFinish(request); }, priority(), - true, /* keepTracking*/ - id() /* jobId */ + requests.push_back(SqlQueryRequest::create( + controller(), worker, query(), user(), password(), maxRows(), + [self = shared_from_base()](SqlQueryRequest::Ptr const& request) { + self->onRequestFinish(request); + }, + priority(), true, /* keepTracking*/ + id() /* jobId */ )); _workers.insert(worker); } diff --git a/src/replica/jobs/VerifyJob.cc b/src/replica/jobs/VerifyJob.cc index c58c2abb7..ac7a57037 100644 --- a/src/replica/jobs/VerifyJob.cc +++ b/src/replica/jobs/VerifyJob.cc @@ -187,13 +187,9 @@ list> VerifyJob::extendedPersistentState() const { void VerifyJob::startImpl(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - auto self = shared_from_base(); - // Launch the first batch of requests - vector replicas; _nextReplicas(lock, replicas, maxReplicas()); - if (replicas.empty()) { // In theory this should never happen unless the installation // doesn't have a single chunk. @@ -206,10 +202,12 @@ void VerifyJob::startImpl(replica::Lock const& lock) { auto const currentJobPriority = priority(); bool const keepTracking = true; for (ReplicaInfo const& replica : replicas) { - auto request = controller()->findReplica( - replica.worker(), replica.database(), replica.chunk(), - [self](FindRequest::Ptr request) { self->_onRequestFinish(request); }, currentJobPriority, - computeCheckSum(), keepTracking, id()); + auto request = FindRequest::create( + controller(), replica.worker(), replica.database(), replica.chunk(), + [self = shared_from_base()](FindRequest::Ptr request) { + self->_onRequestFinish(request); + }, + currentJobPriority, computeCheckSum(), keepTracking, id()); _replicas[request->id()] = replica; _requests[request->id()] = request; } @@ -249,9 +247,7 @@ void VerifyJob::_onRequestFinish(FindRequest::Ptr const& request) { << " chunk=" << request->chunk()); if (state() == State::FINISHED) return; - replica::Lock lock(_mtx, context() + __func__); - if (state() == State::FINISHED) return; // The default version of the object won't have any difference @@ -259,15 +255,12 @@ void VerifyJob::_onRequestFinish(FindRequest::Ptr const& request) { ReplicaDiff selfReplicaDiff; // against the previous state of the current replica vector otherReplicaDiff; // against other known replicas - auto self = shared_from_base(); - if (request->extendedState() == Request::ExtendedState::SUCCESS) { // TODO: // - check if the replica still exists. It's fine if it's gone // because some jobs may choose either to purge extra replicas // or re-balance the cluster. So, no subscriber notification is needed // here. - ; // Compare new state of the replica against its older one which was @@ -280,17 +273,14 @@ void VerifyJob::_onRequestFinish(FindRequest::Ptr const& request) { // ATTENTIONS: Replica differences are reported into the log stream only // when no interest to be notified in the differences // expressed by a caller (no callback provided). - ReplicaInfo const& oldReplica = _replicas[request->id()]; selfReplicaDiff = ReplicaDiff(oldReplica, request->responseData()); if (selfReplicaDiff() and not _onReplicaDifference) { LOGS(_log, LOG_LVL_INFO, context() << "replica mismatch for self\n" << selfReplicaDiff); } - vector otherReplicas; controller()->serviceProvider()->databaseServices()->findReplicas(otherReplicas, oldReplica.chunk(), oldReplica.database()); - for (auto&& replica : otherReplicas) { ReplicaDiff diff(request->responseData(), replica); if (not diff.isSelf()) { @@ -302,33 +292,30 @@ void VerifyJob::_onRequestFinish(FindRequest::Ptr const& request) { } else { // Report the error and keep going - LOGS(_log, LOG_LVL_ERROR, context() << "failed request " << request->context() << " worker: " << request->workerName() << " database: " << request->database() << " chunk: " << request->chunk()); } // Remove the processed replica, fetch another one and begin processing it - _replicas.erase(request->id()); _requests.erase(request->id()); - vector replicas; _nextReplicas(lock, replicas, 1); - if (0 == replicas.size()) { LOGS(_log, LOG_LVL_ERROR, context() << __func__ << " ** no replicas found in the database **"); // In theory this should never happen unless all replicas are gone // from the installation. - finish(lock, ExtendedState::FAILED); return; } for (ReplicaInfo const& replica : replicas) { - auto request = controller()->findReplica( - replica.worker(), replica.database(), replica.chunk(), - [self](FindRequest::Ptr request) { self->_onRequestFinish(request); }, + auto request = FindRequest::create( + controller(), replica.worker(), replica.database(), replica.chunk(), + [self = shared_from_base()](FindRequest::Ptr request) { + self->_onRequestFinish(request); + }, priority(), /* inherited from the one of the current job */ computeCheckSum(), true, /* keepTracking*/ id() /* jobId */ @@ -339,10 +326,8 @@ void VerifyJob::_onRequestFinish(FindRequest::Ptr const& request) { // The callback is being made asynchronously in a separate thread // to avoid blocking the current thread. - if (_onReplicaDifference) { - auto self = shared_from_base(); - thread notifier([self, selfReplicaDiff, otherReplicaDiff]() { + thread notifier([self = shared_from_base(), selfReplicaDiff, otherReplicaDiff]() { self->_onReplicaDifference(self, selfReplicaDiff, otherReplicaDiff); }); notifier.detach(); diff --git a/src/replica/requests/DeleteRequest.cc b/src/replica/requests/DeleteRequest.cc index 33fffd88f..099ff6205 100644 --- a/src/replica/requests/DeleteRequest.cc +++ b/src/replica/requests/DeleteRequest.cc @@ -32,7 +32,6 @@ // Qserv headers #include "replica/config/Configuration.h" #include "replica/contr/Controller.h" -#include "replica/requests/Messenger.h" #include "replica/services/DatabaseServices.h" #include "replica/services/ServiceProvider.h" #include "replica/util/ProtocolBuffer.h" @@ -51,27 +50,28 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.DeleteRequest"); namespace lsst::qserv::replica { -DeleteRequest::Ptr DeleteRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - string const& database, unsigned int chunk, bool allowDuplicate, +DeleteRequest::Ptr DeleteRequest::create(shared_ptr const& controller, string const& workerName, + string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) { - return DeleteRequest::Ptr(new DeleteRequest(serviceProvider, io_service, worker, database, chunk, - allowDuplicate, onFinish, priority, keepTracking, messenger)); + bool allowDuplicate, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = DeleteRequest::Ptr(new DeleteRequest(controller, workerName, database, chunk, onFinish, + priority, keepTracking, allowDuplicate)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -DeleteRequest::DeleteRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - string const& worker, string const& database, unsigned int chunk, - bool allowDuplicate, CallbackType const& onFinish, int priority, - bool keepTracking, shared_ptr const& messenger) - : RequestMessenger(serviceProvider, io_service, "REPLICA_DELETE", worker, priority, keepTracking, - allowDuplicate, +DeleteRequest::DeleteRequest(shared_ptr const& controller, string const& workerName, + string const& database, unsigned int chunk, CallbackType const& onFinish, + int priority, bool keepTracking, bool allowDuplicate) + : RequestMessenger(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "REPLICA_DELETE", workerName, priority, keepTracking, allowDuplicate, true, // disposeRequired - messenger), + controller->serviceProvider()->messenger()), _database(database), _chunk(chunk), _onFinish(onFinish) { - Request::serviceProvider()->config()->assertDatabaseIsValid(database); + controller->serviceProvider()->config()->assertDatabaseIsValid(database); } void DeleteRequest::startImpl(replica::Lock const& lock) { diff --git a/src/replica/requests/DeleteRequest.h b/src/replica/requests/DeleteRequest.h index db92b43fa..5af264f79 100644 --- a/src/replica/requests/DeleteRequest.h +++ b/src/replica/requests/DeleteRequest.h @@ -34,7 +34,7 @@ // Forward declarations namespace lsst::qserv::replica { -class Messenger; +class Controller; } // namespace lsst::qserv::replica // This header declarations @@ -76,22 +76,24 @@ class DeleteRequest : public RequestMessenger { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider a host of services for various communications - * @param worker the identifier of a worker node (the one where the chunk is supposed + * @param controller the Controller associated with the request + * @param workerName the identifier of a worker node (the one where the chunk is supposed * to be located) at a destination of the chunk. * @param database the name of a database * @param chunk the number of a chunk to replicate (implies all relevant tables) - * @param allowDuplicate follow a previously made request if the current one duplicates it - * @param onFinish an optional callback function to be called upon a completion of the request. - * @param priority a priority level of the request - * @param keepTracking keep tracking the request before it finishes or fails - * @param messenger an interface for communicating with workers + * @param onFinish (optional) an optional callback function to be called upon a completion of the request. + * @param priority (optional) a priority level of the request + * @param keepTracking (optional) keep tracking the request before it finishes or fails + * @param allowDuplicate (optional) follow a previously made request if the current one duplicates it + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return pointer to the created object */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, unsigned int chunk, - bool allowDuplicate, CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, unsigned int chunk, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, bool allowDuplicate = true, + std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::startImpl() @@ -111,10 +113,9 @@ class DeleteRequest : public RequestMessenger { private: /// @see DeleteRequest::create() - DeleteRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, unsigned int chunk, - bool allowDuplicate, CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + DeleteRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, + bool keepTracking, bool allowDuplicate); /** * Send the serialized content of the buffer to a worker. diff --git a/src/replica/requests/DirectorIndexRequest.cc b/src/replica/requests/DirectorIndexRequest.cc index 4ec4a0760..bb1acfd5d 100644 --- a/src/replica/requests/DirectorIndexRequest.cc +++ b/src/replica/requests/DirectorIndexRequest.cc @@ -33,7 +33,6 @@ // Qserv headers #include "replica/contr/Controller.h" -#include "replica/requests/Messenger.h" #include "replica/services/DatabaseServices.h" #include "replica/services/ServiceProvider.h" #include "replica/util/ProtocolBuffer.h" @@ -59,37 +58,41 @@ ostream& operator<<(ostream& os, DirectorIndexRequestInfo const& info) { return os; } -DirectorIndexRequest::Ptr DirectorIndexRequest::create( - ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - string const& workerName, string const& database, string const& directorTable, unsigned int chunk, - bool hasTransactions, TransactionId transactionId, CallbackType const& onFinish, int priority, - bool keepTracking, shared_ptr const& messenger) { - return DirectorIndexRequest::Ptr(new DirectorIndexRequest( - serviceProvider, io_service, workerName, database, directorTable, chunk, hasTransactions, - transactionId, onFinish, priority, keepTracking, messenger)); +DirectorIndexRequest::Ptr DirectorIndexRequest::create(std::shared_ptr const& controller, + string const& workerName, string const& database, + string const& directorTable, unsigned int chunk, + bool hasTransactions, TransactionId transactionId, + CallbackType const& onFinish, int priority, + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = DirectorIndexRequest::Ptr( + new DirectorIndexRequest(controller, workerName, database, directorTable, chunk, hasTransactions, + transactionId, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -DirectorIndexRequest::DirectorIndexRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& workerName, - string const& database, string const& directorTable, - unsigned int chunk, bool hasTransactions, - TransactionId transactionId, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : RequestMessenger(serviceProvider, io_service, "INDEX", workerName, priority, keepTracking, +DirectorIndexRequest::DirectorIndexRequest(std::shared_ptr const& controller, + string const& workerName, string const& database, + string const& directorTable, unsigned int chunk, + bool hasTransactions, TransactionId transactionId, + CallbackType const& onFinish, int priority, bool keepTracking) + : RequestMessenger(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "INDEX", workerName, priority, keepTracking, false, // allowDuplicate true, // disposeRequired - messenger), + controller->serviceProvider()->messenger()), _database(database), _directorTable(directorTable), _chunk(chunk), _hasTransactions(hasTransactions), _transactionId(transactionId), _onFinish(onFinish) { - Request::serviceProvider()->config()->assertDatabaseIsValid(database); - _responseData.fileName = serviceProvider->config()->get("database", "qserv-master-tmp-dir") + - "/" + database + "_" + directorTable + "_" + to_string(chunk) + - (hasTransactions ? "_p" + to_string(transactionId) : ""); + controller->serviceProvider()->config()->assertDatabaseIsValid(database); + _responseData.fileName = + controller->serviceProvider()->config()->get("database", "qserv-master-tmp-dir") + "/" + + database + "_" + directorTable + "_" + to_string(chunk) + + (hasTransactions ? "_p" + to_string(transactionId) : ""); } DirectorIndexRequest::~DirectorIndexRequest() { @@ -121,7 +124,6 @@ void DirectorIndexRequest::_sendInitialRequest(replica::Lock const& lock) { // Serialize the Request message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -131,7 +133,6 @@ void DirectorIndexRequest::_sendInitialRequest(replica::Lock const& lock) { hdr.set_timeout(requestExpirationIvalSec()); hdr.set_priority(priority()); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestDirectorIndex message; @@ -150,20 +151,16 @@ void DirectorIndexRequest::_sendInitialRequest(replica::Lock const& lock) { void DirectorIndexRequest::awaken(boost::system::error_code const& ec) { string const context_ = context() + string(__func__) + " "; LOGS(_log, LOG_LVL_DEBUG, context_); - if (isAborted(ec)) return; - if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context_); if (state() == State::FINISHED) return; - _sendStatusRequest(lock); } void DirectorIndexRequest::_sendStatusRequest(replica::Lock const& lock) { // Serialize the Status message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -171,13 +168,11 @@ void DirectorIndexRequest::_sendStatusRequest(replica::Lock const& lock) { hdr.set_type(ProtocolRequestHeader::REQUEST); hdr.set_management_type(ProtocolManagementRequestType::REQUEST_STATUS); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestStatus message; message.set_id(id()); message.set_queued_type(ProtocolQueuedRequestType::INDEX); - buffer()->serialize(message); _send(lock); @@ -185,10 +180,10 @@ void DirectorIndexRequest::_sendStatusRequest(replica::Lock const& lock) { void DirectorIndexRequest::_send(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - auto self = shared_from_base(); messenger()->send( workerName(), id(), priority(), buffer(), - [self](string const& id, bool success, ProtocolResponseDirectorIndex const& response) { + [self = shared_from_base()](string const& id, bool success, + ProtocolResponseDirectorIndex const& response) { self->_analyze(success, response); }); } @@ -205,7 +200,6 @@ void DirectorIndexRequest::_analyze(bool success, ProtocolResponseDirectorIndex if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context_); if (state() == State::FINISHED) return; - if (!success) { finish(lock, CLIENT_ERROR); return; @@ -252,10 +246,11 @@ void DirectorIndexRequest::_analyze(bool success, ProtocolResponseDirectorIndex // Also note the elevated priority level for the request disposal operations. // This will guarantee (in most cases) that such requests will be fast-track delivered // to (and processed by) the worker. - auto self = shared_from_base(); - dispose(lock, PRIORITY_VERY_HIGH, [self](auto id, auto success, auto message) { - self->_disposed(success, message); - }); + dispose(lock, PRIORITY_VERY_HIGH, + [self = shared_from_base()](auto id, auto success, + auto message) { + self->_disposed(success, message); + }); } } catch (exception const& ex) { _responseData.error = ex.what(); @@ -294,11 +289,9 @@ void DirectorIndexRequest::_analyze(bool success, ProtocolResponseDirectorIndex void DirectorIndexRequest::_disposed(bool success, ProtocolResponseDispose const& message) { string const context_ = context() + string(__func__) + " success=" + bool2str(success) + " "; LOGS(_log, LOG_LVL_DEBUG, context_); - if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context_); if (state() == State::FINISHED) return; - if (!success) { finish(lock, CLIENT_ERROR); return; diff --git a/src/replica/requests/DirectorIndexRequest.h b/src/replica/requests/DirectorIndexRequest.h index 9b0909f41..dddd065d5 100644 --- a/src/replica/requests/DirectorIndexRequest.h +++ b/src/replica/requests/DirectorIndexRequest.h @@ -35,7 +35,7 @@ // Forward declarations namespace lsst::qserv::replica { -class Messenger; +class Controller; } // namespace lsst::qserv::replica // This header declarations @@ -96,7 +96,7 @@ class DirectorIndexRequest : public RequestMessenger { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider a host of services for various communications + * @param controller the Controller associated with the request * @param workerName the identifier of a worker node (the one where the chunks * expected to be located) * @param database the name of a database @@ -106,18 +106,21 @@ class DirectorIndexRequest : public RequestMessenger { * stores a value of the corresponding super-transaction * @param transactionId (optional) identifier of a super-transaction. This parameter is used * only if the above defined flag 'hasTransactions' is set. - * @param onFinish an optional callback function to be called upon a completion of + * @param onFinish (optional) callback function to be called upon a completion of * the request - * @param priority a priority level of the request - * @param keepTracking keep tracking the request before it finishes or fails - * @param messenger an interface for communicating with workers + * @param priority (optional) a priority level of the request + * @param keepTracking (optional) keep tracking the request before it finishes or fails + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return pointer to the created object */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& database, - std::string const& directorTable, unsigned int chunk, bool hasTransactions, - TransactionId transactionId, CallbackType const& onFinish, int priority, - bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::string const& directorTable, unsigned int chunk, + bool hasTransactions, TransactionId transactionId, + CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, + bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); std::list> extendedPersistentState() const final; @@ -135,11 +138,10 @@ class DirectorIndexRequest : public RequestMessenger { void awaken(boost::system::error_code const& ec) final; private: - DirectorIndexRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& database, - std::string const& directorTable, unsigned int chunk, bool hasTransactions, - TransactionId transactionId, CallbackType const& onFinish, int priority, - bool keepTracking, std::shared_ptr const& messenger); + DirectorIndexRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::string const& directorTable, unsigned int chunk, + bool hasTransactions, TransactionId transactionId, CallbackType const& onFinish, + int priority, bool keepTracking); /** * Send the initial request for pulling data from the server. diff --git a/src/replica/requests/EchoRequest.cc b/src/replica/requests/EchoRequest.cc index b62bd8ba9..c0725b00a 100644 --- a/src/replica/requests/EchoRequest.cc +++ b/src/replica/requests/EchoRequest.cc @@ -31,7 +31,6 @@ // Qserv headers #include "replica/contr/Controller.h" -#include "replica/requests/Messenger.h" #include "replica/services/DatabaseServices.h" #include "replica/services/ServiceProvider.h" #include "replica/util/ProtocolBuffer.h" @@ -51,23 +50,24 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.EchoRequest"); namespace lsst::qserv::replica { -EchoRequest::Ptr EchoRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& workerName, +EchoRequest::Ptr EchoRequest::create(shared_ptr const& controller, string const& workerName, string const& data, uint64_t delay, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) { - return EchoRequest::Ptr(new EchoRequest(serviceProvider, io_service, workerName, data, delay, onFinish, - priority, keepTracking, messenger)); + int priority, bool keepTracking, std::string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = EchoRequest::Ptr( + new EchoRequest(controller, workerName, data, delay, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -EchoRequest::EchoRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - string const& workerName, string const& data, uint64_t delay, - CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) - : RequestMessenger(serviceProvider, io_service, "TEST_ECHO", workerName, priority, keepTracking, +EchoRequest::EchoRequest(shared_ptr const& controller, string const& workerName, + string const& data, uint64_t delay, CallbackType const& onFinish, int priority, + bool keepTracking) + : RequestMessenger(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "TEST_ECHO", workerName, priority, keepTracking, false, // allowDuplicate true, // disposeRequired - messenger), + controller->serviceProvider()->messenger()), _data(data), _delay(delay), _onFinish(onFinish) {} @@ -81,7 +81,6 @@ void EchoRequest::startImpl(replica::Lock const& lock) { // Serialize the Request message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -91,13 +90,11 @@ void EchoRequest::startImpl(replica::Lock const& lock) { hdr.set_timeout(requestExpirationIvalSec()); hdr.set_priority(priority()); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestEcho message; message.set_data(data()); message.set_delay(delay()); - buffer()->serialize(message); _send(lock); @@ -107,14 +104,12 @@ void EchoRequest::awaken(boost::system::error_code const& ec) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); if (isAborted(ec)) return; - if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context() + __func__); if (state() == State::FINISHED) return; // Serialize the Status message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -122,13 +117,11 @@ void EchoRequest::awaken(boost::system::error_code const& ec) { hdr.set_type(ProtocolRequestHeader::REQUEST); hdr.set_management_type(ProtocolManagementRequestType::REQUEST_STATUS); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestStatus message; message.set_id(id()); message.set_queued_type(ProtocolQueuedRequestType::TEST_ECHO); - buffer()->serialize(message); _send(lock); @@ -136,10 +129,10 @@ void EchoRequest::awaken(boost::system::error_code const& ec) { void EchoRequest::_send(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - auto self = shared_from_base(); messenger()->send( workerName(), id(), priority(), buffer(), - [self](string const& id, bool success, ProtocolResponseEcho const& response) { + [self = shared_from_base()](string const& id, bool success, + ProtocolResponseEcho const& response) { self->_analyze(success, response); }); } @@ -155,7 +148,6 @@ void EchoRequest::_analyze(bool success, ProtocolResponseEcho const& message) { if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context() + __func__); if (state() == State::FINISHED) return; - if (not success) { finish(lock, CLIENT_ERROR); return; diff --git a/src/replica/requests/EchoRequest.h b/src/replica/requests/EchoRequest.h index f155e76d5..824e973c0 100644 --- a/src/replica/requests/EchoRequest.h +++ b/src/replica/requests/EchoRequest.h @@ -34,7 +34,7 @@ // Forward declarations namespace lsst::qserv::replica { -class Messenger; +class Controller; } // namespace lsst::qserv::replica // This header declarations @@ -78,20 +78,22 @@ class EchoRequest : public RequestMessenger { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider provider of various services + * @param controller the Controller associated with the request * @param workerName identifier of a worker node * @param data data string to be echoed back by a worker * @param delay execution time (milliseconds) of the request at worker * @param onFinish (optional) callback function to call upon completion of the request * @param priority priority level of the request * @param keepTracking keep tracking the request before it finishes or fails - * @param messenger interface for communicating with workers + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return pointer to the created object */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& data, uint64_t delay, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& data, uint64_t delay, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); /// @see Request::extendedPersistentState() std::list> extendedPersistentState() const override; @@ -111,10 +113,9 @@ class EchoRequest : public RequestMessenger { private: /// @see EchoRequest::create() - EchoRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& data, uint64_t delay, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + EchoRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& data, uint64_t delay, CallbackType const& onFinish, int priority, + bool keepTracking); /** * Send the serialized content of the buffer to a worker diff --git a/src/replica/requests/FindAllRequest.cc b/src/replica/requests/FindAllRequest.cc index 46d799fe2..c6242711e 100644 --- a/src/replica/requests/FindAllRequest.cc +++ b/src/replica/requests/FindAllRequest.cc @@ -31,7 +31,6 @@ // Qserv headers #include "replica/contr/Controller.h" -#include "replica/requests/Messenger.h" #include "replica/services/DatabaseServices.h" #include "replica/services/ServiceProvider.h" #include "replica/util/ProtocolBuffer.h" @@ -50,29 +49,28 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.FindAllRequest"); namespace lsst::qserv::replica { -FindAllRequest::Ptr FindAllRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& workerName, +FindAllRequest::Ptr FindAllRequest::create(shared_ptr const& controller, string const& workerName, string const& database, bool saveReplicaInfo, CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) { - return FindAllRequest::Ptr(new FindAllRequest(serviceProvider, io_service, workerName, database, - saveReplicaInfo, onFinish, priority, keepTracking, - messenger)); + string const& jobId, unsigned int requestExpirationIvalSec) { + auto ptr = FindAllRequest::Ptr(new FindAllRequest(controller, workerName, database, saveReplicaInfo, + onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -FindAllRequest::FindAllRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& workerName, +FindAllRequest::FindAllRequest(shared_ptr const& controller, string const& workerName, string const& database, bool saveReplicaInfo, CallbackType const& onFinish, - int priority, bool keepTracking, shared_ptr const& messenger) - : RequestMessenger(serviceProvider, io_service, "REPLICA_FIND_ALL", workerName, priority, - keepTracking, + int priority, bool keepTracking) + : RequestMessenger(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "REPLICA_FIND_ALL", workerName, priority, keepTracking, false, // allowDuplicate true, // disposeRequired - messenger), + controller->serviceProvider()->messenger()), _database(database), _saveReplicaInfo(saveReplicaInfo), _onFinish(onFinish) { - Request::serviceProvider()->config()->assertDatabaseIsValid(database); + controller->serviceProvider()->config()->assertDatabaseIsValid(database); } const ReplicaInfoCollection& FindAllRequest::responseData() const { return _replicaInfoCollection; } @@ -82,7 +80,6 @@ void FindAllRequest::startImpl(replica::Lock const& lock) { // Serialize the Request message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -92,12 +89,10 @@ void FindAllRequest::startImpl(replica::Lock const& lock) { hdr.set_timeout(requestExpirationIvalSec()); hdr.set_priority(priority()); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestFindAll message; message.set_database(database()); - buffer()->serialize(message); _send(lock); @@ -107,14 +102,12 @@ void FindAllRequest::awaken(boost::system::error_code const& ec) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); if (isAborted(ec)) return; - if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context() + __func__); if (state() == State::FINISHED) return; // Serialize the Status message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -122,24 +115,21 @@ void FindAllRequest::awaken(boost::system::error_code const& ec) { hdr.set_type(ProtocolRequestHeader::REQUEST); hdr.set_management_type(ProtocolManagementRequestType::REQUEST_STATUS); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestStatus message; message.set_id(id()); message.set_queued_type(ProtocolQueuedRequestType::REPLICA_FIND_ALL); - buffer()->serialize(message); - // Send the message _send(lock); } void FindAllRequest::_send(replica::Lock const& lock) { - auto self = shared_from_base(); messenger()->send( workerName(), id(), priority(), buffer(), - [self](string const& id, bool success, ProtocolResponseFindAll const& response) { + [self = shared_from_base()](string const& id, bool success, + ProtocolResponseFindAll const& response) { self->_analyze(success, response); }); } @@ -155,7 +145,6 @@ void FindAllRequest::_analyze(bool success, ProtocolResponseFindAll const& messa if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context() + __func__); if (state() == State::FINISHED) return; - if (not success) { finish(lock, CLIENT_ERROR); return; diff --git a/src/replica/requests/FindAllRequest.h b/src/replica/requests/FindAllRequest.h index 9b25da410..0f85bdcdf 100644 --- a/src/replica/requests/FindAllRequest.h +++ b/src/replica/requests/FindAllRequest.h @@ -34,7 +34,7 @@ // Forward declarations namespace lsst::qserv::replica { -class Messenger; +class Controller; } // namespace lsst::qserv::replica // This header declarations @@ -58,7 +58,6 @@ class FindAllRequest : public RequestMessenger { ~FindAllRequest() final = default; std::string const& database() const { return _database; } - bool saveReplicaInfo() const { return _saveReplicaInfo; } /// @return target request specific parameters @@ -78,22 +77,25 @@ class FindAllRequest : public RequestMessenger { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider a host of services for various communications + * @param controller the Controller associated with the request * @param workerName the identifier of a worker node (the one where the chunks * expected to be located) * @param database the name of a database - * @param saveReplicaInfo save replica info in a database - * @param onFinish an optional callback function to be called upon a completion of + * @param saveReplicaInfo (optional) save replica info in a database + * @param onFinish (optional) callback function to be called upon a completion of * the request - * @param priority a priority level of the request - * @param keepTracking keep tracking the request before it finishes or fails - * @param messenger an interface for communicating with workers + * @param priority (optional) a priority level of the request + * @param keepTracking (optional) keep tracking the request before it finishes or fails + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return pointer to the created object */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& database, bool saveReplicaInfo, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, bool saveReplicaInfo = true, + CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, + bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); /// @see Request::extendedPersistentState() std::list> extendedPersistentState() const final; @@ -113,10 +115,9 @@ class FindAllRequest : public RequestMessenger { private: /// @see FindAllRequest::create() - FindAllRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& database, bool saveReplicaInfo, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + FindAllRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, bool saveReplicaInfo, CallbackType const& onFinish, + int priority, bool keepTracking); /** * Send the serialized content of the buffer to a worker diff --git a/src/replica/requests/FindRequest.cc b/src/replica/requests/FindRequest.cc index 708f7c316..e947b3cbc 100644 --- a/src/replica/requests/FindRequest.cc +++ b/src/replica/requests/FindRequest.cc @@ -51,28 +51,29 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.FindRequest"); namespace lsst::qserv::replica { -FindRequest::Ptr FindRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& workerName, - string const& database, unsigned int chunk, bool computeCheckSum, - CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) { - return FindRequest::Ptr(new FindRequest(serviceProvider, io_service, workerName, database, chunk, - computeCheckSum, onFinish, priority, keepTracking, messenger)); +FindRequest::Ptr FindRequest::create(shared_ptr const& controller, string const& workerName, + string const& database, unsigned int chunk, CallbackType const& onFinish, + int priority, bool computeCheckSum, bool keepTracking, + string const& jobId, unsigned int requestExpirationIvalSec) { + auto ptr = FindRequest::Ptr(new FindRequest(controller, workerName, database, chunk, onFinish, priority, + computeCheckSum, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -FindRequest::FindRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - string const& workerName, string const& database, unsigned int chunk, - bool computeCheckSum, CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) - : RequestMessenger(serviceProvider, io_service, "REPLICA_FIND", workerName, priority, keepTracking, +FindRequest::FindRequest(shared_ptr const& controller, string const& workerName, + string const& database, unsigned int chunk, CallbackType const& onFinish, + int priority, bool computeCheckSum, bool keepTracking) + : RequestMessenger(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "REPLICA_FIND", workerName, priority, keepTracking, false, // allowDuplicate true, // disposeRequired - messenger), + controller->serviceProvider()->messenger()), _database(database), _chunk(chunk), _computeCheckSum(computeCheckSum), _onFinish(onFinish) { - Request::serviceProvider()->config()->assertDatabaseIsValid(database); + controller->serviceProvider()->config()->assertDatabaseIsValid(database); } ReplicaInfo const& FindRequest::responseData() const { return _replicaInfo; } @@ -85,7 +86,6 @@ void FindRequest::startImpl(replica::Lock const& lock) { // Serialize the Request message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -95,14 +95,12 @@ void FindRequest::startImpl(replica::Lock const& lock) { hdr.set_timeout(requestExpirationIvalSec()); hdr.set_priority(priority()); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestFind message; message.set_database(database()); message.set_chunk(chunk()); message.set_compute_cs(computeCheckSum()); - buffer()->serialize(message); _send(lock); @@ -112,14 +110,12 @@ void FindRequest::awaken(boost::system::error_code const& ec) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); if (isAborted(ec)) return; - if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context() + __func__); if (state() == State::FINISHED) return; // Serialize the Status message header and the request itself into // the network buffer. - buffer()->resize(); ProtocolRequestHeader hdr; @@ -127,13 +123,11 @@ void FindRequest::awaken(boost::system::error_code const& ec) { hdr.set_type(ProtocolRequestHeader::REQUEST); hdr.set_management_type(ProtocolManagementRequestType::REQUEST_STATUS); hdr.set_instance_id(serviceProvider()->instanceId()); - buffer()->serialize(hdr); ProtocolRequestStatus message; message.set_id(id()); message.set_queued_type(ProtocolQueuedRequestType::REPLICA_FIND); - buffer()->serialize(message); _send(lock); @@ -141,10 +135,10 @@ void FindRequest::awaken(boost::system::error_code const& ec) { void FindRequest::_send(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__); - auto self = shared_from_base(); messenger()->send( workerName(), id(), priority(), buffer(), - [self](string const& id, bool success, ProtocolResponseFind const& response) { + [self = shared_from_base()](string const& id, bool success, + ProtocolResponseFind const& response) { self->_analyze(success, response); }); } @@ -160,7 +154,6 @@ void FindRequest::_analyze(bool success, ProtocolResponseFind const& message) { if (state() == State::FINISHED) return; replica::Lock lock(_mtx, context() + __func__); if (state() == State::FINISHED) return; - if (not success) { finish(lock, CLIENT_ERROR); return; diff --git a/src/replica/requests/FindRequest.h b/src/replica/requests/FindRequest.h index 9d6f0f946..b720320ae 100644 --- a/src/replica/requests/FindRequest.h +++ b/src/replica/requests/FindRequest.h @@ -34,7 +34,7 @@ // Forward declarations namespace lsst::qserv::replica { -class Messenger; +class Controller; } // namespace lsst::qserv::replica // This header declarations @@ -78,23 +78,25 @@ class FindRequest : public RequestMessenger { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider a host of services for various communications + * @param controller the Controller associated with the request * @param workerName the identifier of a worker node (the one where the chunk is * expected to be located) at a destination of the chunk * @param database the name of a database * @param chunk the number of a chunk to find (implies all relevant tables) - * @param computeCheckSum tell a worker server to compute check/control sum on each file * @param onFinish an optional callback function to be called upon a completion of * the request. * @param priority a priority level of the request + * @param computeCheckSum tell a worker server to compute check/control sum on each file * @param keepTracking keep tracking the request before it finishes or fails - * @param messenger an interface for communicating with workers + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return pointer to the created object */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& database, unsigned int chunk, - bool computeCheckSum, CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, unsigned int chunk, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool computeCheckSum = false, bool keepTracking = true, + std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0); /// @see Request::extendedPersistentState() std::list> extendedPersistentState() const final; @@ -113,10 +115,9 @@ class FindRequest : public RequestMessenger { void awaken(boost::system::error_code const& ec) final; private: - FindRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& database, unsigned int chunk, - bool computeCheckSum, CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + FindRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, + bool computeCheckSum, bool keepTracking); /** * Send the serialized content of the buffer to a worker diff --git a/src/replica/requests/ReplicationRequest.cc b/src/replica/requests/ReplicationRequest.cc index 9292e53e0..c07ad349a 100644 --- a/src/replica/requests/ReplicationRequest.cc +++ b/src/replica/requests/ReplicationRequest.cc @@ -32,7 +32,6 @@ // Qserv headers #include "replica/config/Configuration.h" #include "replica/contr/Controller.h" -#include "replica/requests/Messenger.h" #include "replica/services/DatabaseServices.h" #include "replica/services/ServiceProvider.h" #include "replica/util/ProtocolBuffer.h" @@ -51,36 +50,33 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.ReplicationRequest"); namespace lsst::qserv::replica { -ReplicationRequest::Ptr ReplicationRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, - string const& workerName, string const& sourceWorkerName, - string const& database, unsigned int chunk, - bool allowDuplicate, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) { - return ReplicationRequest::Ptr(new ReplicationRequest(serviceProvider, io_service, workerName, - sourceWorkerName, database, chunk, allowDuplicate, - onFinish, priority, keepTracking, messenger)); +ReplicationRequest::Ptr ReplicationRequest::create( + shared_ptr const& controller, string const& workerName, string const& sourceWorkerName, + string const& database, unsigned int chunk, CallbackType const& onFinish, int priority, + bool keepTracking, bool allowDuplicate, string const& jobId, unsigned int requestExpirationIvalSec) { + auto ptr = ReplicationRequest::Ptr(new ReplicationRequest(controller, workerName, sourceWorkerName, + database, chunk, onFinish, priority, + keepTracking, allowDuplicate)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -ReplicationRequest::ReplicationRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& workerName, +ReplicationRequest::ReplicationRequest(shared_ptr const& controller, string const& workerName, string const& sourceWorkerName, string const& database, - unsigned int chunk, bool allowDuplicate, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : RequestMessenger(serviceProvider, io_service, "REPLICA_CREATE", workerName, priority, keepTracking, - allowDuplicate, + unsigned int chunk, CallbackType const& onFinish, int priority, + bool keepTracking, bool allowDuplicate) + : RequestMessenger(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "REPLICA_CREATE", workerName, priority, keepTracking, allowDuplicate, true, // disposeRequired - messenger), + controller->serviceProvider()->messenger()), _database(database), _chunk(chunk), _sourceWorkerName(sourceWorkerName), _onFinish(onFinish), _replicaInfo() { - Request::serviceProvider()->config()->assertWorkerIsValid(sourceWorkerName); - Request::serviceProvider()->config()->assertWorkersAreDifferent(sourceWorkerName, workerName); - Request::serviceProvider()->config()->assertDatabaseIsValid(database); + controller->serviceProvider()->config()->assertWorkerIsValid(sourceWorkerName); + controller->serviceProvider()->config()->assertWorkersAreDifferent(sourceWorkerName, workerName); + controller->serviceProvider()->config()->assertDatabaseIsValid(database); } void ReplicationRequest::startImpl(replica::Lock const& lock) { diff --git a/src/replica/requests/ReplicationRequest.h b/src/replica/requests/ReplicationRequest.h index 3ae0631b6..30821f1c8 100644 --- a/src/replica/requests/ReplicationRequest.h +++ b/src/replica/requests/ReplicationRequest.h @@ -37,7 +37,7 @@ // Forward declarations namespace lsst::qserv::replica { -class Messenger; +class Controller; } // namespace lsst::qserv::replica // This header declarations @@ -74,31 +74,32 @@ class ReplicationRequest : public RequestMessenger { ReplicaInfo const& responseData() const { return _replicaInfo; } /** - * Create a new request with specified parameters. + * Create and start a new request with specified parameters. * * Static factory method is needed to prevent issue with the lifespan * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider a host of services for various communications - * @param io_service BOOST ASIO API + * @param controller the Controller associated with the request * @param workerName the identifier of a worker node (the one to be affected by the replication) * at a destination of the chunk * @param sourceWorkerName the identifier of a worker node at a source of the chunk * @param database the name of a database * @param chunk the number of a chunk to replicate (implies all relevant tables) - * @param allowDuplicate follow a previously made request if the current one duplicates it - * @param onFinish an optional callback function to be called upon a completion of the request. - * @param priority a priority level of the request - * @param keepTracking keep tracking the request before it finishes or fails - * @param messenger worker messaging service + * @param onFinish (optional) an optional callback function to be called upon a completion of the request. + * @param priority (optional) a priority level of the request + * @param keepTracking (optional) keep tracking the request before it finishes or fails + * @param allowDuplicate (optional) follow a previously made request if the current one duplicates it + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return pointer to the created object */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& sourceWorkerName, - std::string const& database, unsigned int chunk, bool allowDuplicate, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& sourceWorkerName, std::string const& database, unsigned int chunk, + CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, + bool keepTracking = true, bool allowDuplicate = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); /// @see Request::extendedPersistentState() std::list> extendedPersistentState() const override; @@ -118,11 +119,9 @@ class ReplicationRequest : public RequestMessenger { private: /// @see ReplicationRequest::create() - ReplicationRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& workerName, std::string const& sourceWorkerName, - std::string const& database, unsigned int chunk, bool allowDuplicate, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + ReplicationRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& sourceWorkerName, std::string const& database, unsigned int chunk, + CallbackType const& onFinish, int priority, bool keepTracking, bool allowDuplicate); /** * Send the serialized content of the buffer to a worker diff --git a/src/replica/requests/SqlAlterTablesRequest.cc b/src/replica/requests/SqlAlterTablesRequest.cc index cea86e1ba..8dc4541b8 100644 --- a/src/replica/requests/SqlAlterTablesRequest.cc +++ b/src/replica/requests/SqlAlterTablesRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlAlterTablesRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -39,22 +40,22 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlAlterTablesRequest"); namespace lsst::qserv::replica { SqlAlterTablesRequest::Ptr SqlAlterTablesRequest::create( - ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - string const& worker, string const& database, vector const& tables, string const& alterSpec, - CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlAlterTablesRequest(serviceProvider, io_service, worker, database, tables, alterSpec, - onFinish, priority, keepTracking, messenger)); + shared_ptr const& controller, string const& workerName, string const& database, + vector const& tables, string const& alterSpec, CallbackType const& onFinish, int priority, + bool keepTracking, string const& jobId, unsigned int requestExpirationIvalSec) { + auto ptr = Ptr(new SqlAlterTablesRequest(controller, workerName, database, tables, alterSpec, onFinish, + priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlAlterTablesRequest::SqlAlterTablesRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - string const& database, vector const& tables, - string const& alterSpec, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_ALTER_TABLES", worker, 0, /* maxRows */ - priority, keepTracking, messenger), +SqlAlterTablesRequest::SqlAlterTablesRequest(shared_ptr const& controller, + string const& workerName, string const& database, + vector const& tables, string const& alterSpec, + CallbackType const& onFinish, int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_ALTER_TABLES", workerName, 0, /* maxRows */ + priority, keepTracking, controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::ALTER_TABLE); @@ -70,7 +71,6 @@ SqlAlterTablesRequest::SqlAlterTablesRequest(ServiceProvider::Ptr const& service void SqlAlterTablesRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlAlterTablesRequest.h b/src/replica/requests/SqlAlterTablesRequest.h index 59a6ca7b1..94aeb7634 100644 --- a/src/replica/requests/SqlAlterTablesRequest.h +++ b/src/replica/requests/SqlAlterTablesRequest.h @@ -33,6 +33,11 @@ #include "replica/requests/SqlRequest.h" #include "replica/util/Common.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -55,35 +60,34 @@ class SqlAlterTablesRequest : public SqlRequest { /** * Create a new request with specified parameters. - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. - * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName An identifier of a worker node. * @param database The name of an existing database where the tables are residing. * @param tables The names of tables affected by the operation. * @param alterSpec A specification of what to change following 'ALTER TABLE '. * @param onFinish (optional) A callback function to call upon completion of * the request. - * @param priority A priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) A priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A smart pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, - std::vector const& tables, std::string const& alterSpec, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::vector const& tables, + std::string const& alterSpec, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: void notify(replica::Lock const& lock) final; private: - SqlAlterTablesRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, - std::vector const& tables, std::string const& alterSpec, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + SqlAlterTablesRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::vector const& tables, + std::string const& alterSpec, CallbackType const& onFinish, int priority, + bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlCreateDbRequest.cc b/src/replica/requests/SqlCreateDbRequest.cc index 7f349581e..a051a535b 100644 --- a/src/replica/requests/SqlCreateDbRequest.cc +++ b/src/replica/requests/SqlCreateDbRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlCreateDbRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -38,22 +39,23 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlCreateDbRequest"); namespace lsst::qserv::replica { -SqlCreateDbRequest::Ptr SqlCreateDbRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlCreateDbRequest(serviceProvider, io_service, worker, database, onFinish, priority, - keepTracking, messenger)); +SqlCreateDbRequest::Ptr SqlCreateDbRequest::create(shared_ptr const& controller, + string const& workerName, std::string const& database, + CallbackType const& onFinish, int priority, + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = + Ptr(new SqlCreateDbRequest(controller, workerName, database, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlCreateDbRequest::SqlCreateDbRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, +SqlCreateDbRequest::SqlCreateDbRequest(shared_ptr const& controller, string const& workerName, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_CREATE_DATABASE", worker, 0 /* maxRows */, priority, - keepTracking, messenger), + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_CREATE_DATABASE", workerName, 0 /* maxRows */, priority, keepTracking, + controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::CREATE_DATABASE); @@ -63,7 +65,6 @@ SqlCreateDbRequest::SqlCreateDbRequest(ServiceProvider::Ptr const& serviceProvid void SqlCreateDbRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlCreateDbRequest.h b/src/replica/requests/SqlCreateDbRequest.h index 287d899f3..2d4f46ff0 100644 --- a/src/replica/requests/SqlCreateDbRequest.h +++ b/src/replica/requests/SqlCreateDbRequest.h @@ -29,6 +29,11 @@ // Qserv headers #include "replica/requests/SqlRequest.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -58,19 +63,21 @@ class SqlCreateDbRequest : public SqlRequest { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider Services of the Replication framework. - * @param io_service Asynchronous communication services. - * @param worker A unique identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName A unique identifier of a worker node. * @param database The name of a database to be created. - * @param onFinish The (optional) callback function to call upon completion of the request. - * @param priority A priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An service for communications with workers. + * @param onFinish (optional) The callback function to call upon completion of the request. + * @param priority (optional)A priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::notify() @@ -78,9 +85,9 @@ class SqlCreateDbRequest : public SqlRequest { private: /// @see SqlCreateDbRequest::create() - SqlCreateDbRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + SqlCreateDbRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish, int priority, + bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlCreateIndexesRequest.cc b/src/replica/requests/SqlCreateIndexesRequest.cc index 467780dda..f13f6080b 100644 --- a/src/replica/requests/SqlCreateIndexesRequest.cc +++ b/src/replica/requests/SqlCreateIndexesRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlCreateIndexesRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -39,26 +40,24 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlCreateIndexesRequest"); namespace lsst::qserv::replica { SqlCreateIndexesRequest::Ptr SqlCreateIndexesRequest::create( - ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - string const& worker, string const& database, vector const& tables, - SqlRequestParams::IndexSpec const& indexSpec, string const& indexName, string const& indexComment, - vector const& indexColumns, CallbackType const& onFinish, int priority, - bool keepTracking, shared_ptr const& messenger) { - return Ptr(new SqlCreateIndexesRequest(serviceProvider, io_service, worker, database, tables, indexSpec, - indexName, indexComment, indexColumns, onFinish, priority, - keepTracking, messenger)); + shared_ptr const& controller, string const& workerName, string const& database, + vector const& tables, SqlRequestParams::IndexSpec const& indexSpec, string const& indexName, + string const& indexComment, vector const& indexColumns, CallbackType const& onFinish, + int priority, bool keepTracking, string const& jobId, unsigned int requestExpirationIvalSec) { + auto ptr = Ptr(new SqlCreateIndexesRequest(controller, workerName, database, tables, indexSpec, indexName, + indexComment, indexColumns, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlCreateIndexesRequest::SqlCreateIndexesRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - string const& database, vector const& tables, - SqlRequestParams::IndexSpec const& indexSpec, - string const& indexName, string const& indexComment, - vector const& indexColumns, - CallbackType const& onFinish, int priority, - bool keepTracking, shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_CREATE_TABLE_INDEXES", worker, 0, /* maxRows */ - priority, keepTracking, messenger), +SqlCreateIndexesRequest::SqlCreateIndexesRequest( + shared_ptr const& controller, string const& workerName, string const& database, + vector const& tables, SqlRequestParams::IndexSpec const& indexSpec, string const& indexName, + string const& indexComment, vector const& indexColumns, CallbackType const& onFinish, + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_CREATE_TABLE_INDEXES", workerName, 0, /* maxRows */ + priority, keepTracking, controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::CREATE_TABLE_INDEX); @@ -83,7 +82,6 @@ SqlCreateIndexesRequest::SqlCreateIndexesRequest(ServiceProvider::Ptr const& ser void SqlCreateIndexesRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlCreateIndexesRequest.h b/src/replica/requests/SqlCreateIndexesRequest.h index 4a958398c..03ed6a562 100644 --- a/src/replica/requests/SqlCreateIndexesRequest.h +++ b/src/replica/requests/SqlCreateIndexesRequest.h @@ -33,6 +33,11 @@ #include "replica/requests/SqlRequest.h" #include "replica/util/Common.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -62,42 +67,41 @@ class SqlCreateIndexesRequest : public SqlRequest { * and memory management of instances created otherwise (as values or via * low-level pointers). * - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. - * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName An identifier of a worker node. * @param database The name of an existing database where the tables are residing. * @param tables The names of tables affected by the operation. * @param indexSpec The type specification of the index. * @param indexName The name of the index. * @param indexComment An arbitrary comment string documenting the index. * @param indexColumns Column definitions (name,length,ordering) for the index. - * @param onFinish (optional) A callback function to call upon completion of + * @param onFinish (optional) The callback function to call upon completion of * the request. - * @param priority A priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) The priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, - std::vector const& tables, SqlRequestParams::IndexSpec const& indexSpec, - std::string const& indexName, std::string const& indexComment, - std::vector const& indexColumns, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::vector const& tables, + SqlRequestParams::IndexSpec const& indexSpec, std::string const& indexName, + std::string const& indexComment, std::vector const& indexColumns, + CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, + bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: void notify(replica::Lock const& lock) final; private: - SqlCreateIndexesRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, - std::vector const& tables, + SqlCreateIndexesRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::vector const& tables, SqlRequestParams::IndexSpec const& indexSpec, std::string const& indexName, std::string const& indexComment, std::vector const& indexColumns, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + CallbackType const& onFinish, int priority, bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlDeleteDbRequest.cc b/src/replica/requests/SqlDeleteDbRequest.cc index 3d465866b..5012598c1 100644 --- a/src/replica/requests/SqlDeleteDbRequest.cc +++ b/src/replica/requests/SqlDeleteDbRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlDeleteDbRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -38,22 +39,23 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlDeleteDbRequest"); namespace lsst::qserv::replica { -SqlDeleteDbRequest::Ptr SqlDeleteDbRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlDeleteDbRequest(serviceProvider, io_service, worker, database, onFinish, priority, - keepTracking, messenger)); +SqlDeleteDbRequest::Ptr SqlDeleteDbRequest::create(shared_ptr const& controller, + string const& workerName, std::string const& database, + CallbackType const& onFinish, int priority, + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = + Ptr(new SqlDeleteDbRequest(controller, workerName, database, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlDeleteDbRequest::SqlDeleteDbRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, +SqlDeleteDbRequest::SqlDeleteDbRequest(shared_ptr const& controller, string const& workerName, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_DROP_DATABASE", worker, 0 /* maxRows */, priority, - keepTracking, messenger), + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_DROP_DATABASE", workerName, 0 /* maxRows */, priority, keepTracking, + controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::DROP_DATABASE); @@ -63,7 +65,6 @@ SqlDeleteDbRequest::SqlDeleteDbRequest(ServiceProvider::Ptr const& serviceProvid void SqlDeleteDbRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlDeleteDbRequest.h b/src/replica/requests/SqlDeleteDbRequest.h index b3fb1e38c..c41c504b5 100644 --- a/src/replica/requests/SqlDeleteDbRequest.h +++ b/src/replica/requests/SqlDeleteDbRequest.h @@ -29,6 +29,11 @@ // Qserv headers #include "replica/requests/SqlRequest.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -57,21 +62,22 @@ class SqlDeleteDbRequest : public SqlRequest { * Static factory method is needed to prevent issue with the lifespan * and memory management of instances created otherwise (as values or via * low-level pointers). - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. - * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName An identifier of a worker node. * @param database The name of an existing database to be deleted. - * @param onFinish The (optional) callback function to call upon completion of + * @param onFinish (optional) The callback function to call upon completion of * the request. - * @param priority The priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) The priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::notify() @@ -79,9 +85,9 @@ class SqlDeleteDbRequest : public SqlRequest { private: /// @see SqlDeleteDbRequest::create() - SqlDeleteDbRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + SqlDeleteDbRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish, int priority, + bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlDisableDbRequest.cc b/src/replica/requests/SqlDisableDbRequest.cc index 152c8159d..3bb841820 100644 --- a/src/replica/requests/SqlDisableDbRequest.cc +++ b/src/replica/requests/SqlDisableDbRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlDisableDbRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -38,23 +39,23 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlDisableDbRequest"); namespace lsst::qserv::replica { -SqlDisableDbRequest::Ptr SqlDisableDbRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, - string const& worker, std::string const& database, +SqlDisableDbRequest::Ptr SqlDisableDbRequest::create(shared_ptr const& controller, + string const& workerName, std::string const& database, CallbackType const& onFinish, int priority, - bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlDisableDbRequest(serviceProvider, io_service, worker, database, onFinish, priority, - keepTracking, messenger)); + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = + Ptr(new SqlDisableDbRequest(controller, workerName, database, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlDisableDbRequest::SqlDisableDbRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, +SqlDisableDbRequest::SqlDisableDbRequest(shared_ptr const& controller, string const& workerName, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_DISABLE_DATABASE", worker, 0 /* maxRows */, priority, - keepTracking, messenger), + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_DISABLE_DATABASE", workerName, 0 /* maxRows */, priority, keepTracking, + controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::DISABLE_DATABASE); @@ -64,7 +65,6 @@ SqlDisableDbRequest::SqlDisableDbRequest(ServiceProvider::Ptr const& serviceProv void SqlDisableDbRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlDisableDbRequest.h b/src/replica/requests/SqlDisableDbRequest.h index c8e87b533..b03f0ffca 100644 --- a/src/replica/requests/SqlDisableDbRequest.h +++ b/src/replica/requests/SqlDisableDbRequest.h @@ -29,6 +29,11 @@ // Qserv headers #include "replica/requests/SqlRequest.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -57,21 +62,23 @@ class SqlDisableDbRequest : public SqlRequest { * Static factory method is needed to prevent issue with the lifespan * and memory management of instances created otherwise (as values or via * low-level pointers). - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. + * @param controller the Controller associated with the request * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param workerName An identifier of a worker node. * @param database The name of an existing database to be disabled. - * @param onFinish The (optional) callback function to call upon completion of + * @param onFinish (optional) The callback function to call upon completion of * the request. - * @param priority The priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) The priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::notify() @@ -79,9 +86,9 @@ class SqlDisableDbRequest : public SqlRequest { private: /// @see SqlDisableDbRequest::create() - SqlDisableDbRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + SqlDisableDbRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish, int priority, + bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlEnableDbRequest.cc b/src/replica/requests/SqlEnableDbRequest.cc index 061895e28..53a660223 100644 --- a/src/replica/requests/SqlEnableDbRequest.cc +++ b/src/replica/requests/SqlEnableDbRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlEnableDbRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -38,22 +39,23 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlEnableDbRequest"); namespace lsst::qserv::replica { -SqlEnableDbRequest::Ptr SqlEnableDbRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlEnableDbRequest(serviceProvider, io_service, worker, database, onFinish, priority, - keepTracking, messenger)); +SqlEnableDbRequest::Ptr SqlEnableDbRequest::create(shared_ptr const& controller, + string const& workerName, std::string const& database, + CallbackType const& onFinish, int priority, + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = + Ptr(new SqlEnableDbRequest(controller, workerName, database, onFinish, priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlEnableDbRequest::SqlEnableDbRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, +SqlEnableDbRequest::SqlEnableDbRequest(shared_ptr const& controller, string const& workerName, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, - shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_ENABLE_DATABASE", worker, 0 /* maxRows */, priority, - keepTracking, messenger), + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_ENABLE_DATABASE", workerName, 0 /* maxRows */, priority, keepTracking, + controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::ENABLE_DATABASE); @@ -63,7 +65,6 @@ SqlEnableDbRequest::SqlEnableDbRequest(ServiceProvider::Ptr const& serviceProvid void SqlEnableDbRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlEnableDbRequest.h b/src/replica/requests/SqlEnableDbRequest.h index 541c8833d..15a4dce16 100644 --- a/src/replica/requests/SqlEnableDbRequest.h +++ b/src/replica/requests/SqlEnableDbRequest.h @@ -29,6 +29,11 @@ // Qserv headers #include "replica/requests/SqlRequest.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -57,21 +62,22 @@ class SqlEnableDbRequest : public SqlRequest { * Static factory method is needed to prevent issue with the lifespan * and memory management of instances created otherwise (as values or via * low-level pointers). - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. - * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName An identifier of a worker node. * @param database The name of an existing database to be enabled. - * @param onFinish The (optional) callback function to call upon completion of + * @param onFinish (optional) The callback function to call upon completion of * the request. - * @param priority The priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) The priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::notify() @@ -79,9 +85,9 @@ class SqlEnableDbRequest : public SqlRequest { private: /// @see SqlEnableDbRequest::create() - SqlEnableDbRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + SqlEnableDbRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, CallbackType const& onFinish, int priority, + bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlGrantAccessRequest.cc b/src/replica/requests/SqlGrantAccessRequest.cc index a0e99c774..f454e8d7e 100644 --- a/src/replica/requests/SqlGrantAccessRequest.cc +++ b/src/replica/requests/SqlGrantAccessRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlGrantAccessRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -38,24 +39,25 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlGrantAccessRequest"); namespace lsst::qserv::replica { -SqlGrantAccessRequest::Ptr SqlGrantAccessRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, - string const& worker, std::string const& database, - std::string const& user, +SqlGrantAccessRequest::Ptr SqlGrantAccessRequest::create(shared_ptr const& controller, + string const& workerName, + std::string const& database, std::string const& user, CallbackType const& onFinish, int priority, - bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlGrantAccessRequest(serviceProvider, io_service, worker, database, user, onFinish, - priority, keepTracking, messenger)); + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = Ptr(new SqlGrantAccessRequest(controller, workerName, database, user, onFinish, priority, + keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlGrantAccessRequest::SqlGrantAccessRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - std::string const& database, std::string const& user, - CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_GRANT_ACCESS", worker, 0 /* maxRows */, priority, - keepTracking, messenger), +SqlGrantAccessRequest::SqlGrantAccessRequest(shared_ptr const& controller, + string const& workerName, std::string const& database, + std::string const& user, CallbackType const& onFinish, + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), + "SQL_GRANT_ACCESS", workerName, 0 /* maxRows */, priority, keepTracking, + controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::GRANT_ACCESS); @@ -66,7 +68,6 @@ SqlGrantAccessRequest::SqlGrantAccessRequest(ServiceProvider::Ptr const& service void SqlGrantAccessRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlGrantAccessRequest.h b/src/replica/requests/SqlGrantAccessRequest.h index 8a64d3b92..a2361033e 100644 --- a/src/replica/requests/SqlGrantAccessRequest.h +++ b/src/replica/requests/SqlGrantAccessRequest.h @@ -29,6 +29,11 @@ // Qserv headers #include "replica/requests/SqlRequest.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -58,23 +63,24 @@ class SqlGrantAccessRequest : public SqlRequest { * Static factory method is needed to prevent issue with the lifespan * and memory management of instances created otherwise (as values or via * low-level pointers). - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. - * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName An identifier of a worker node. * @param database The name of an existing database. * @param user The name of an existing database account to be affected by the operation. - * @param onFinish The (optional) callback function to call upon completion of + * @param onFinish (optional) The callback function to call upon completion of * the request. - * @param priority The priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) The priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, std::string const& user, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::string const& user, + CallbackType const& onFinish = nullptr, int priority = PRIORITY_NORMAL, + bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::notify() @@ -82,10 +88,9 @@ class SqlGrantAccessRequest : public SqlRequest { private: /// @see SqlGrantAccessRequest::create() - SqlGrantAccessRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& database, std::string const& user, - CallbackType const& onFinish, int priority, bool keepTracking, - std::shared_ptr const& messenger); + SqlGrantAccessRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& database, std::string const& user, CallbackType const& onFinish, + int priority, bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes }; diff --git a/src/replica/requests/SqlQueryRequest.cc b/src/replica/requests/SqlQueryRequest.cc index fabde8fe1..b7fb31075 100644 --- a/src/replica/requests/SqlQueryRequest.cc +++ b/src/replica/requests/SqlQueryRequest.cc @@ -23,6 +23,7 @@ #include "replica/requests/SqlQueryRequest.h" // Qserv headers +#include "replica/contr/Controller.h" #include "replica/services/ServiceProvider.h" // LSST headers @@ -38,23 +39,24 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.SqlQueryRequest"); namespace lsst::qserv::replica { -SqlQueryRequest::Ptr SqlQueryRequest::create(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, - std::string const& query, std::string const& user, - std::string const& password, uint64_t maxRows, - CallbackType const& onFinish, int priority, bool keepTracking, - shared_ptr const& messenger) { - return Ptr(new SqlQueryRequest(serviceProvider, io_service, worker, query, user, password, maxRows, - onFinish, priority, keepTracking, messenger)); +SqlQueryRequest::Ptr SqlQueryRequest::create(shared_ptr const& controller, + string const& workerName, std::string const& query, + std::string const& user, std::string const& password, + uint64_t maxRows, CallbackType const& onFinish, int priority, + bool keepTracking, string const& jobId, + unsigned int requestExpirationIvalSec) { + auto ptr = Ptr(new SqlQueryRequest(controller, workerName, query, user, password, maxRows, onFinish, + priority, keepTracking)); + ptr->start(controller, jobId, requestExpirationIvalSec); + return ptr; } -SqlQueryRequest::SqlQueryRequest(ServiceProvider::Ptr const& serviceProvider, - boost::asio::io_service& io_service, string const& worker, +SqlQueryRequest::SqlQueryRequest(shared_ptr const& controller, string const& workerName, std::string const& query, std::string const& user, std::string const& password, uint64_t maxRows, CallbackType const& onFinish, - int priority, bool keepTracking, shared_ptr const& messenger) - : SqlRequest(serviceProvider, io_service, "SQL_QUERY", worker, maxRows, priority, keepTracking, - messenger), + int priority, bool keepTracking) + : SqlRequest(controller->serviceProvider(), controller->serviceProvider()->io_service(), "SQL_QUERY", + workerName, maxRows, priority, keepTracking, controller->serviceProvider()->messenger()), _onFinish(onFinish) { // Finish initializing the request body's content requestBody.set_type(ProtocolRequestSql::QUERY); @@ -66,7 +68,6 @@ SqlQueryRequest::SqlQueryRequest(ServiceProvider::Ptr const& serviceProvider, void SqlQueryRequest::notify(replica::Lock const& lock) { LOGS(_log, LOG_LVL_DEBUG, context() << __func__ << "[" << ProtocolRequestSql_Type_Name(requestBody.type()) << "]"); - notifyDefaultImpl(lock, _onFinish); } diff --git a/src/replica/requests/SqlQueryRequest.h b/src/replica/requests/SqlQueryRequest.h index dfc34223e..7d1ea54e3 100644 --- a/src/replica/requests/SqlQueryRequest.h +++ b/src/replica/requests/SqlQueryRequest.h @@ -30,6 +30,11 @@ // Qserv headers #include "replica/requests/SqlRequest.h" +// Forward declarations +namespace lsst::qserv::replica { +class Controller; +} // namespace lsst::qserv::replica + // This header declarations namespace lsst::qserv::replica { @@ -60,29 +65,30 @@ class SqlQueryRequest : public SqlRequest { * Static factory method is needed to prevent issue with the lifespan * and memory management of instances created otherwise (as values or via * low-level pointers). - * @param serviceProvider Is needed to access the Configuration and - * the Controller for communicating with the worker. - * @param io_service The BOOST ASIO communication end-point. - * @param worker An identifier of a worker node. + * @param controller the Controller associated with the request + * @param workerName An identifier of a worker node. * @param query The query to be executed. * @param user The name of a database account for connecting to the database service. * @param password The database account password for connecting to the database service. - * @param maxRows The (optional) limit for the maximum number of rows to be returned with - * the request. Leaving the default value of the parameter to 0 will result in not imposing + * @param maxRows The limit for the maximum number of rows to be returned with + * the request. Setting a value of the parameter to 0 will result in not imposing * any explicit restrictions on a size of the result set. Note that other, resource-defined * restrictions will still apply. The later includes the maximum size of the Google Protobuf * objects, the amount of available memory, etc. - * @param onFinish The (optional) callback function to call upon completion of + * @param onFinish (optional) The callback function to call upon completion of * the request. - * @param priority The priority level of the request. - * @param keepTracking Keep tracking the request before it finishes or fails. - * @param messenger An interface for communicating with workers. + * @param priority (optional) The priority level of the request. + * @param keepTracking (optional) Keep tracking the request before it finishes or fails. + * @param jobId (optional) an identifier of a job to which the request belongs + * @param requestExpirationIvalSec (optional) the time in seconds after which the request will expire. + * The default value of '0' means an effective expiration time will be pull from the configuration. * @return A pointer to the created object. */ - static Ptr create(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& query, std::string const& user, - std::string const& password, uint64_t maxRows, CallbackType const& onFinish, - int priority, bool keepTracking, std::shared_ptr const& messenger); + static Ptr create(std::shared_ptr const& controller, std::string const& workerName, + std::string const& query, std::string const& user, std::string const& password, + uint64_t maxRows, CallbackType const& onFinish = nullptr, + int priority = PRIORITY_NORMAL, bool keepTracking = true, std::string const& jobId = "", + unsigned int requestExpirationIvalSec = 0); protected: /// @see Request::notify() @@ -90,10 +96,9 @@ class SqlQueryRequest : public SqlRequest { private: /// @see SqlQueryRequest::create() - SqlQueryRequest(ServiceProvider::Ptr const& serviceProvider, boost::asio::io_service& io_service, - std::string const& worker, std::string const& query, std::string const& user, - std::string const& password, uint64_t maxRows, CallbackType const& onFinish, int priority, - bool keepTracking, std::shared_ptr const& messenger); + SqlQueryRequest(std::shared_ptr const& controller, std::string const& workerName, + std::string const& query, std::string const& user, std::string const& password, + uint64_t maxRows, CallbackType const& onFinish, int priority, bool keepTracking); CallbackType _onFinish; ///< @note is reset when the request finishes };