Skip to content

Commit

Permalink
Migrated: ReplicationRequest DeleteRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 22, 2024
1 parent 6e44e0d commit bc651da
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 214 deletions.
15 changes: 7 additions & 8 deletions src/replica/apps/ControllerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,15 +525,14 @@ int ControllerApp::runImpl() {
Request::Ptr request;

if ("REPLICATE" == _requestType) {
request = controller->replicate(
_workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](Request::Ptr const& request_) { request_->print(); }, _priority, !_doNotTrackRequest,
_allowDuplicates);

request = ReplicationRequest::create(
controller, _workerName, _sourceWorkerName, _databaseName, _chunkNumber,
[](ReplicationRequest::Ptr const& request_) { request_->print(); }, _priority,
!_doNotTrackRequest, _allowDuplicates);
} else if ("DELETE" == _requestType) {
request = controller->deleteReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter,
_priority, !_doNotTrackRequest, _allowDuplicates);

request = DeleteRequest::create(controller, _workerName, _databaseName, _chunkNumber,
Request::defaultPrinter, _priority, !_doNotTrackRequest,
_allowDuplicates);
} else if ("FIND" == _requestType) {
request = controller->findReplica(_workerName, _databaseName, _chunkNumber, Request::defaultPrinter,
_priority, _computeCheckSum, !_doNotTrackRequest);
Expand Down
22 changes: 0 additions & 22 deletions src/replica/contr/Controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,28 +205,6 @@ void Controller::verifyFolders(bool createMissingFolders) const {
FileUtils::verifyFolders("CONTROLLER", folders, createMissingFolders);
}

/**
 * Submit a ReplicationRequest by forwarding all arguments to the generic
 * _submit<> helper.
 *
 * The explicit template arguments of _submit<> are the request type followed
 * by the declared types of the request-specific parameters (sourceWorkerName,
 * database, chunk, allowDuplicate), each spelled via decltype of the
 * corresponding parameter.
 *
 * @return the pointer to the request object as returned by _submit<>
 */
ReplicationRequest::Ptr Controller::replicate(string const& workerName, string const& sourceWorkerName,
string const& database, unsigned int chunk,
ReplicationRequest::CallbackType const& onFinish, int priority,
bool keepTracking, bool allowDuplicate, string const& jobId,
unsigned int requestExpirationIvalSec) {
// Trace the call using the context string built for this method.
LOGS(_log, LOG_LVL_TRACE, _context(__func__));
// Note the argument order expected by _submit<>: the request-specific
// arguments (including allowDuplicate) go right after the worker name and
// before the common ones (onFinish, priority, keepTracking, jobId,
// requestExpirationIvalSec). This differs from the parameter order of this method.
return _submit<ReplicationRequest, decltype(sourceWorkerName), decltype(database), decltype(chunk),
decltype(allowDuplicate)>(workerName, sourceWorkerName, database, chunk, allowDuplicate,
onFinish, priority, keepTracking, jobId,
requestExpirationIvalSec);
}

/**
 * Submit a DeleteRequest by forwarding all arguments to the generic
 * _submit<> helper.
 *
 * The explicit template arguments of _submit<> are the request type followed
 * by the declared types of the request-specific parameters (database, chunk,
 * allowDuplicate), each spelled via decltype of the corresponding parameter.
 *
 * @return the pointer to the request object as returned by _submit<>
 */
DeleteRequest::Ptr Controller::deleteReplica(string const& workerName, string const& database,
unsigned int chunk, DeleteRequest::CallbackType const& onFinish,
int priority, bool keepTracking, bool allowDuplicate,
string const& jobId, unsigned int requestExpirationIvalSec) {
// Trace the call using the context string built for this method.
LOGS(_log, LOG_LVL_TRACE, _context(__func__));
// Request-specific arguments (database, chunk, allowDuplicate) are passed
// ahead of the common ones (onFinish, priority, keepTracking, jobId,
// requestExpirationIvalSec), which differs from this method's parameter order.
return _submit<DeleteRequest, decltype(database), decltype(chunk), decltype(allowDuplicate)>(
workerName, database, chunk, allowDuplicate, onFinish, priority, keepTracking, jobId,
requestExpirationIvalSec);
}

FindRequest::Ptr Controller::findReplica(string const& workerName, string const& database, unsigned int chunk,
FindRequest::CallbackType const& onFinish, int priority,
bool computeCheckSum, bool keepTracking, string const& jobId,
Expand Down
16 changes: 0 additions & 16 deletions src/replica/contr/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,8 @@ class Controller : public std::enable_shared_from_this<Controller> {
~Controller() = default;

/// @return a reference to the identity object stored in this controller (_identity)
ControllerIdentity const& identity() const { return _identity; }

/// @return the value of _startTime
/// NOTE(review): the units and epoch of this value are not visible here - confirm.
uint64_t startTime() const { return _startTime; }

/// @return a reference to the service provider pointer held by this controller
ServiceProvider::Ptr const& serviceProvider() const { return _serviceProvider; }

/// @return the Boost ASIO I/O service obtained from the service provider
boost::asio::io_service& io_service() { return serviceProvider()->io_service(); }

/**
Expand All @@ -215,19 +212,6 @@ class Controller : public std::enable_shared_from_this<Controller> {
*/
void verifyFolders(bool createMissingFolders = false) const;

/**
 * Create and submit a replication request.
 *
 * NOTE(review): the parameter descriptions below are inferred from the
 * parameter names only - confirm against the ReplicationRequest documentation.
 *
 * @param workerName  presumably the worker where the replica is to be created
 * @param sourceWorkerName  presumably the worker the replica is copied from
 * @param database  the name of a database
 * @param chunk  the chunk number
 * @param onFinish  an optional callback taking the request pointer (defaults to nullptr)
 * @param priority  the request priority (defaults to PRIORITY_NORMAL)
 * @param keepTracking  defaults to true
 * @param allowDuplicate  defaults to true
 * @param jobId  an optional job identifier (defaults to an empty string)
 * @param requestExpirationIvalSec  defaults to 0
 * @return a shared pointer to the request object
 */
std::shared_ptr<ReplicationRequest> replicate(
std::string const& workerName, std::string const& sourceWorkerName, std::string const& database,
unsigned int chunk,
std::function<void(std::shared_ptr<ReplicationRequest>)> const& onFinish = nullptr,
int priority = PRIORITY_NORMAL, bool keepTracking = true, bool allowDuplicate = true,
std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0);

/**
 * Create and submit a replica deletion request.
 *
 * NOTE(review): the parameter descriptions below are inferred from the
 * parameter names only - confirm against the DeleteRequest documentation.
 *
 * @param workerName  presumably the worker where the replica is to be deleted
 * @param database  the name of a database
 * @param chunk  the chunk number
 * @param onFinish  an optional callback taking the request pointer (defaults to nullptr)
 * @param priority  the request priority (defaults to PRIORITY_NORMAL)
 * @param keepTracking  defaults to true
 * @param allowDuplicate  defaults to true
 * @param jobId  an optional job identifier (defaults to an empty string)
 * @param requestExpirationIvalSec  defaults to 0
 * @return a shared pointer to the request object
 */
std::shared_ptr<DeleteRequest> deleteReplica(
std::string const& workerName, std::string const& database, unsigned int chunk,
std::function<void(std::shared_ptr<DeleteRequest>)> const& onFinish = nullptr,
int priority = PRIORITY_NORMAL, bool keepTracking = true, bool allowDuplicate = true,
std::string const& jobId = "", unsigned int requestExpirationIvalSec = 0);

std::shared_ptr<FindRequest> findReplica(
std::string const& workerName, std::string const& database, unsigned int chunk,
std::function<void(std::shared_ptr<FindRequest>)> const& onFinish = nullptr,
Expand Down
47 changes: 10 additions & 37 deletions src/replica/jobs/CreateReplicaJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ CreateReplicaJob::CreateReplicaJob(string const& databaseFamily, unsigned int ch

/**
 * Provide access to the result object of the job.
 *
 * @return a reference to the _replicaData member
 * @throws std::logic_error if the job is not in the State::FINISHED state
 */
CreateReplicaJobResult const& CreateReplicaJob::getReplicaData() const {
    LOGS(_log, LOG_LVL_DEBUG, context() << __func__);

    // Guard clause: the result may only be read once the job has finished.
    bool const finished = (state() == State::FINISHED);
    if (!finished) {
        string const method = __func__;
        throw logic_error("CreateReplicaJob::" + method +
                          " the method can't be called while the job hasn't finished");
    }
    return _replicaData;
}
Expand All @@ -90,24 +88,20 @@ list<pair<string, string>> CreateReplicaJob::extendedPersistentState() const {

list<pair<string, string>> CreateReplicaJob::persistentLogData() const {
list<pair<string, string>> result;

auto&& replicaData = getReplicaData();

// Per-worker counters for the following categories:
//
// created-chunks:
// the total number of chunks created on the workers as a result
// of the operation

map<string, map<string, size_t>> workerCategoryCounter;

for (auto&& info : replicaData.replicas) {
workerCategoryCounter[info.worker()]["created-chunks"]++;
}
for (auto&& workerItr : workerCategoryCounter) {
auto&& worker = workerItr.first;
string val = "worker=" + worker;

for (auto&& categoryItr : workerItr.second) {
auto&& category = categoryItr.first;
size_t const counter = categoryItr.second;
Expand All @@ -122,42 +116,34 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_DEBUG, context() << "startImpl");

// Check if configuration parameters are valid

auto const& config = controller()->serviceProvider()->config();

if (not(config->isKnownDatabaseFamily(databaseFamily()) and config->isKnownWorker(sourceWorker()) and
config->isKnownWorker(destinationWorker()) and (sourceWorker() != destinationWorker()))) {
LOGS(_log, LOG_LVL_ERROR,
context() << string(__func__) << " ** MISCONFIGURED ** "
<< " database family: '" << databaseFamily() << "'"
<< " source worker: '" << sourceWorker() << "'"
<< " destination worker: '" << destinationWorker() << "'");

finish(lock, ExtendedState::CONFIG_ERROR);
return;
}

// Make sure no such replicas exist yet at the destination

vector<ReplicaInfo> destinationReplicas;
try {
controller()->serviceProvider()->databaseServices()->findWorkerReplicas(
destinationReplicas, chunk(), destinationWorker(), databaseFamily());

} catch (invalid_argument const& ex) {
LOGS(_log, LOG_LVL_ERROR,
context() << string(__func__) << " ** MISCONFIGURED ** "
<< " chunk: " << chunk() << " destinationWorker: " << destinationWorker()
<< " databaseFamily: " << databaseFamily() << " exception: " << ex.what());

throw;

} catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR,
context() << string(__func__) << " ** failed to find replicas ** "
<< " chunk: " << chunk() << " destinationWorker: " << destinationWorker()
<< " databaseFamily: " << databaseFamily() << " exception: " << ex.what());

finish(lock, ExtendedState::FAILED);
return;
}
Expand All @@ -167,7 +153,6 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) {
<< destinationReplicas.size() << " replicas ** "
<< " chunk: " << chunk() << " destinationWorker: " << destinationWorker()
<< " databaseFamily: " << databaseFamily());

finish(lock, ExtendedState::FAILED);
return;
}
Expand All @@ -181,7 +166,6 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) {
//
// 2. launching FindRequest for each member of the database family to
// see if the chunk is available on a source node.

vector<ReplicaInfo> sourceReplicas;
try {
controller()->serviceProvider()->databaseServices()->findWorkerReplicas(
Expand All @@ -194,7 +178,6 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) {
<< " databaseFamily: " << databaseFamily() << " exception: " << ex.what());

throw;

} catch (exception const& ex) {
LOGS(_log, LOG_LVL_ERROR,
context() << string(__func__) << " ** failed to find replicas ** "
Expand All @@ -219,17 +202,15 @@ void CreateReplicaJob::startImpl(replica::Lock const& lock) {
//
// VERY IMPORTANT: the requests are sent for participating databases
// only, because some catalogs may not have full coverage.

auto self = shared_from_base<CreateReplicaJob>();

bool const keepTracking = true;
bool const allowDuplicate = true;
for (auto&& replica : sourceReplicas) {
_requests.push_back(controller()->replicate(
destinationWorker(), sourceWorker(), replica.database(), chunk(),
[self](ReplicationRequest::Ptr ptr) { self->_onRequestFinish(ptr); }, priority(),
true, /* keepTracking */
true, /* allowDuplicate */
id() /* jobId */
));
_requests.push_back(ReplicationRequest::create(
controller(), destinationWorker(), sourceWorker(), replica.database(), chunk(),
[self = shared_from_base<CreateReplicaJob>()](ReplicationRequest::Ptr ptr) {
self->_onRequestFinish(ptr);
},
priority(), keepTracking, allowDuplicate, id()));
}
}

Expand All @@ -238,11 +219,10 @@ void CreateReplicaJob::cancelImpl(replica::Lock const& lock) {

// The algorithm will also clear resources taken by various
// locally created objects.

//
// To ensure no lingering "side effects" are left after cancelling this
// job, the request cancellation should also be followed (where it makes sense)
// by stopping the request at the corresponding worker service.

for (auto&& ptr : _requests) {
ptr->cancel();
if (ptr->state() != Request::State::FINISHED)
Expand All @@ -266,9 +246,7 @@ void CreateReplicaJob::_onRequestFinish(ReplicationRequest::Ptr const& request)
<< " sourceWorker=" << sourceWorker() << " chunk=" << chunk());

if (state() == State::FINISHED) return;

replica::Lock lock(_mtx, context() + string(__func__) + "(ReplicationeRequest)");

if (state() == State::FINISHED) return;

++_numRequestsFinished;
Expand All @@ -280,7 +258,6 @@ void CreateReplicaJob::_onRequestFinish(ReplicationRequest::Ptr const& request)

// Evaluate the status of on-going operations to see if the replica creation
// stage has finished.

if (_numRequestsFinished == _requests.size()) {
if (_numRequestsSuccess == _requests.size()) {
// Notify Qserv about the change in a disposition of replicas.
Expand All @@ -290,12 +267,10 @@ void CreateReplicaJob::_onRequestFinish(ReplicationRequest::Ptr const& request)
// NOTE: The current implementation will not be affected by the result
// of the operation, nor will any upstream notifications be
// sent to the requester of this job.

vector<string> databases;
for (auto&& databaseEntry : _replicaData.chunks[chunk()]) {
databases.push_back(databaseEntry.first);
}

ServiceProvider::Ptr const serviceProvider = controller()->serviceProvider();
if (serviceProvider->config()->get<unsigned int>("xrootd", "auto-notify") != 0) {
_qservAddReplica(lock, chunk(), databases, destinationWorker());
Expand All @@ -314,11 +289,9 @@ void CreateReplicaJob::_qservAddReplica(replica::Lock const& lock, unsigned int
context() << __func__ << " ** START ** Qserv notification on ADD replica:"
<< ", chunk=" << chunk << ", databases=" << util::String::toString(databases)
<< " worker=" << worker);

auto self = shared_from_this();
controller()->serviceProvider()->qservMgtServices()->addReplica(
chunk, databases, worker,
[self, onFinish](AddReplicaQservMgtRequest::Ptr const& request) {
[self = shared_from_this(), onFinish](AddReplicaQservMgtRequest::Ptr const& request) {
LOGS(_log, LOG_LVL_DEBUG,
self->context() << __func__ << " ** FINISH ** Qserv notification on ADD replica:"
<< " chunk=" << request->chunk()
Expand Down
Loading

0 comments on commit bc651da

Please sign in to comment.