Skip to content

Commit

Permalink
Refactoring and simplifications in the Qserv worker replication service
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Nov 18, 2024
1 parent 27d2637 commit 36dc72a
Show file tree
Hide file tree
Showing 22 changed files with 119 additions and 1,367 deletions.
13 changes: 3 additions & 10 deletions src/replica/apps/WorkerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
#include "replica/util/FileUtils.h"
#include "replica/worker/FileServer.h"
#include "replica/worker/WorkerProcessor.h"
#include "replica/worker/WorkerRequestFactory.h"
#include "replica/worker/WorkerServer.h"

// LSST headers
Expand Down Expand Up @@ -111,13 +110,7 @@ int WorkerApp::runImpl() {

_verifyCreateFolders();

// Configure the factory with a pool of persistent connectors
auto const config = serviceProvider()->config();
auto const connectionPool = ConnectionPool::create(Configuration::qservWorkerDbParams(),
config->get<size_t>("database", "services-pool-size"));
WorkerRequestFactory requestFactory(serviceProvider(), connectionPool);

auto const reqProcSvr = WorkerServer::create(serviceProvider(), requestFactory, worker);
auto const reqProcSvr = WorkerServer::create(serviceProvider(), worker);
thread reqProcSvrThread([reqProcSvr]() { reqProcSvr->run(); });

auto const fileSvr = FileServer::create(serviceProvider(), worker);
Expand Down Expand Up @@ -147,8 +140,8 @@ int WorkerApp::runImpl() {
<< " new, in-progress, finished: " << reqProcSvr->processor()->numNewRequests() << ", "
<< reqProcSvr->processor()->numInProgressRequests() << ", "
<< reqProcSvr->processor()->numFinishedRequests());
this_thread::sleep_for(
chrono::seconds(max(1U, config->get<unsigned int>("registry", "heartbeat-ival-sec"))));
this_thread::sleep_for(chrono::seconds(
max(1U, serviceProvider()->config()->get<unsigned int>("registry", "heartbeat-ival-sec"))));
}
reqProcSvrThread.join();
fileSvrThread.join();
Expand Down
1 change: 0 additions & 1 deletion src/replica/worker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ target_sources(replica_worker PRIVATE
WorkerProcessorThread.cc
WorkerReplicationRequest.cc
WorkerRequest.cc
WorkerRequestFactory.cc
WorkerServer.cc
WorkerServerConnection.cc
WorkerSqlRequest.cc
Expand Down
38 changes: 4 additions & 34 deletions src/replica/worker/WorkerDeleteRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,15 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.WorkerDeleteRequest");

namespace lsst::qserv::replica {

//////////////////////////////////////////////////////////////
///////////////////// WorkerDeleteRequest ////////////////////
//////////////////////////////////////////////////////////////

WorkerDeleteRequest::Ptr WorkerDeleteRequest::create(ServiceProvider::Ptr const& serviceProvider,
string const& worker, string const& id, int priority,
ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec,
ProtocolRequestDelete const& request) {
return WorkerDeleteRequest::Ptr(new WorkerDeleteRequest(serviceProvider, worker, id, priority, onExpired,
requestExpirationIvalSec, request));
auto ptr = WorkerDeleteRequest::Ptr(new WorkerDeleteRequest(
serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request));
ptr->init();
return ptr;
}

WorkerDeleteRequest::WorkerDeleteRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker,
Expand Down Expand Up @@ -85,34 +83,6 @@ void WorkerDeleteRequest::setInfo(ProtocolResponseDelete& response) const {
bool WorkerDeleteRequest::execute() {
LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " db: " << database() << " chunk: " << chunk());

return WorkerRequest::execute();
}

///////////////////////////////////////////////////////////////////
///////////////////// WorkerDeleteRequestPOSIX ////////////////////
///////////////////////////////////////////////////////////////////

WorkerDeleteRequestPOSIX::Ptr WorkerDeleteRequestPOSIX::create(ServiceProvider::Ptr const& serviceProvider,
string const& worker, string const& id,
int priority,
ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec,
ProtocolRequestDelete const& request) {
return WorkerDeleteRequestPOSIX::Ptr(new WorkerDeleteRequestPOSIX(
serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec, request));
}

WorkerDeleteRequestPOSIX::WorkerDeleteRequestPOSIX(ServiceProvider::Ptr const& serviceProvider,
string const& worker, string const& id, int priority,
ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec,
ProtocolRequestDelete const& request)
: WorkerDeleteRequest(serviceProvider, worker, id, priority, onExpired, requestExpirationIvalSec,
request) {}

bool WorkerDeleteRequestPOSIX::execute() {
LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " db: " << database() << " chunk: " << chunk());

replica::Lock lock(_mtx, context(__func__));

auto const config = _serviceProvider->config();
Expand Down
47 changes: 3 additions & 44 deletions src/replica/worker/WorkerDeleteRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@ namespace lsst::qserv::replica {

/**
* Class WorkerDeleteRequest represents a context and a state of replica deletion
* requests within the worker servers. It can also be used for testing the framework
* operation as its implementation won't make any changes to any files or databases.
*
* Real implementations of the request processing must derive from this class.
* requests within the worker servers.
*/
class WorkerDeleteRequest : public WorkerRequest {
public:
Expand Down Expand Up @@ -72,12 +69,9 @@ class WorkerDeleteRequest : public WorkerRequest {
WorkerDeleteRequest(WorkerDeleteRequest const&) = delete;
WorkerDeleteRequest& operator=(WorkerDeleteRequest const&) = delete;

~WorkerDeleteRequest() override = default;

// Trivial get methods
virtual ~WorkerDeleteRequest() = default;

std::string const& database() const { return _request.database(); }

unsigned int chunk() const { return _request.chunk(); }

/**
Expand All @@ -86,55 +80,20 @@ class WorkerDeleteRequest : public WorkerRequest {
*/
void setInfo(ProtocolResponseDelete& response) const;

bool execute() override;
virtual bool execute() override;

protected:
WorkerDeleteRequest(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
std::string const& id, int priority, ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request);

// Input parameters

ProtocolRequestDelete const _request;

/// Extended status of the replica deletion request
ReplicaInfo _replicaInfo;
};

/**
* Class WorkerDeleteRequestPOSIX provides an actual implementation for
* the replica deletion based on the direct manipulation of files on
* a POSIX file system.
*/
class WorkerDeleteRequestPOSIX : public WorkerDeleteRequest {
public:
typedef std::shared_ptr<WorkerDeleteRequestPOSIX> Ptr;

static Ptr create(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
std::string const& id, int priority, ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request);

WorkerDeleteRequestPOSIX() = delete;
WorkerDeleteRequestPOSIX(WorkerDeleteRequestPOSIX const&) = delete;
WorkerDeleteRequestPOSIX& operator=(WorkerDeleteRequestPOSIX const&) = delete;

~WorkerDeleteRequestPOSIX() final = default;

bool execute() final;

private:
WorkerDeleteRequestPOSIX(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
std::string const& id, int priority, ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec, ProtocolRequestDelete const& request);
};

/**
* Class WorkerDeleteRequestFS has the same implementation as the 'typedef'-ed
* class for the replica deletion based on the direct manipulation of files on
* a POSIX file system.
*/
typedef WorkerDeleteRequestPOSIX WorkerDeleteRequestFS;

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_WORKERDELETEREQUEST_H
13 changes: 5 additions & 8 deletions src/replica/worker/WorkerDirectorIndexRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ WorkerDirectorIndexRequest::Ptr WorkerDirectorIndexRequest::create(
ServiceProvider::Ptr const& serviceProvider, ConnectionPoolPtr const& connectionPool,
string const& worker, string const& id, int priority, ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec, ProtocolRequestDirectorIndex const& request) {
return WorkerDirectorIndexRequest::Ptr(new WorkerDirectorIndexRequest(serviceProvider, connectionPool,
worker, id, priority, onExpired,
requestExpirationIvalSec, request));
auto ptr = WorkerDirectorIndexRequest::Ptr(
new WorkerDirectorIndexRequest(serviceProvider, connectionPool, worker, id, priority, onExpired,
requestExpirationIvalSec, request));
ptr->init();
return ptr;
}

WorkerDirectorIndexRequest::WorkerDirectorIndexRequest(ServiceProvider::Ptr const& serviceProvider,
Expand All @@ -80,22 +82,17 @@ WorkerDirectorIndexRequest::WorkerDirectorIndexRequest(ServiceProvider::Ptr cons

void WorkerDirectorIndexRequest::setInfo(ProtocolResponseDirectorIndex& response) const {
LOGS(_log, LOG_LVL_DEBUG, context(__func__));

replica::Lock lock(_mtx, context(__func__));

response.set_allocated_target_performance(performance().info().release());
response.set_error(_error);
response.set_data(_data);
response.set_total_bytes(_fileSizeBytes);

*(response.mutable_request()) = _request;
}

bool WorkerDirectorIndexRequest::execute() {
LOGS(_log, LOG_LVL_DEBUG, context(__func__));

replica::Lock lock(_mtx, context(__func__));

switch (status()) {
case ProtocolStatus::IN_PROGRESS:
break;
Expand Down
11 changes: 2 additions & 9 deletions src/replica/worker/WorkerDirectorIndexRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,18 @@ class WorkerDirectorIndexRequest : public WorkerRequest {
WorkerDirectorIndexRequest(WorkerDirectorIndexRequest const&) = delete;
WorkerDirectorIndexRequest& operator=(WorkerDirectorIndexRequest const&) = delete;

~WorkerDirectorIndexRequest() override = default;
virtual ~WorkerDirectorIndexRequest() = default;

/// @return the original request
ProtocolRequestDirectorIndex const& request() const { return _request; }

/**
* Extract request status into the Protobuf response object.
*
* @param response Protobuf response to be initialized
*/
void setInfo(ProtocolResponseDirectorIndex& response) const;

bool execute() override;
virtual bool execute();

private:
WorkerDirectorIndexRequest(ServiceProvider::Ptr const& serviceProvider,
Expand Down Expand Up @@ -147,12 +146,6 @@ class WorkerDirectorIndexRequest : public WorkerRequest {
std::string _data;
};

/// Class WorkerDirectorIndexRequest provides an actual implementation
typedef WorkerDirectorIndexRequest WorkerDirectorIndexRequestFS;

/// Class WorkerDirectorIndexRequest provides an actual implementation
typedef WorkerDirectorIndexRequest WorkerDirectorIndexRequestPOSIX;

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_WORKERDIRECTORINDEXREQUEST_H
15 changes: 4 additions & 11 deletions src/replica/worker/WorkerEchoRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ WorkerEchoRequest::Ptr WorkerEchoRequest::create(ServiceProvider::Ptr const& ser
ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec,
ProtocolRequestEcho const& request) {
return WorkerEchoRequest::Ptr(new WorkerEchoRequest(serviceProvider, worker, id, priority, onExpired,
requestExpirationIvalSec, request));
auto ptr = WorkerEchoRequest::Ptr(new WorkerEchoRequest(serviceProvider, worker, id, priority, onExpired,
requestExpirationIvalSec, request));
ptr->init();
return ptr;
}

WorkerEchoRequest::WorkerEchoRequest(ServiceProvider::Ptr const& serviceProvider, string const& worker,
Expand All @@ -64,39 +66,30 @@ void WorkerEchoRequest::setInfo(ProtocolResponseEcho& response) const {
LOGS(_log, LOG_LVL_DEBUG, context(__func__));

replica::Lock lock(_mtx, context(__func__));

response.set_allocated_target_performance(performance().info().release());
response.set_data(data());

*(response.mutable_request()) = _request;
}

bool WorkerEchoRequest::execute() {
LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " delay:" << delay() << " _delayLeft:" << _delayLeft);

replica::Lock lock(_mtx, context(__func__));

switch (status()) {
case ProtocolStatus::IN_PROGRESS:
break;

case ProtocolStatus::IS_CANCELLING:

// Abort the operation right away

setStatus(lock, ProtocolStatus::CANCELLED);
throw WorkerRequestCancelled();

default:
throw logic_error(context(__func__) +
" not allowed while in state: " + WorkerRequest::status2string(status()));
}

// Block the thread for the random number of milliseconds in the interval
// below. Then update the amount of time which is still left.

util::BlockPost blockPost(1000, 2000);

uint64_t const span = blockPost.wait();
_delayLeft -= (span < _delayLeft) ? span : _delayLeft;

Expand Down
14 changes: 2 additions & 12 deletions src/replica/worker/WorkerEchoRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,9 @@ class WorkerEchoRequest : public WorkerRequest {
WorkerEchoRequest(WorkerEchoRequest const&) = delete;
WorkerEchoRequest& operator=(WorkerEchoRequest const&) = delete;

~WorkerEchoRequest() override = default;

// Trivial get methods
virtual ~WorkerEchoRequest() = default;

std::string const& data() const { return _request.data(); }

uint64_t delay() const { return _request.delay(); }

/**
Expand All @@ -83,27 +80,20 @@ class WorkerEchoRequest : public WorkerRequest {
*/
void setInfo(ProtocolResponseEcho& response) const;

bool execute() override;
virtual bool execute();

protected:
WorkerEchoRequest(ServiceProvider::Ptr const& serviceProvider, std::string const& worker,
std::string const& id, int priority, ExpirationCallbackType const& onExpired,
unsigned int requestExpirationIvalSec, ProtocolRequestEcho const& request);

// Input parameters

ProtocolRequestEcho const _request;

/// The amount of the initial delay which is still left
uint64_t _delayLeft;
};

/// Class WorkerEchoRequest provides an actual implementation
typedef WorkerEchoRequest WorkerEchoRequestFS;

/// Class WorkerEchoRequest provides an actual implementation
typedef WorkerEchoRequest WorkerEchoRequestPOSIX;

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_WORKERECHOREQUEST_H
Loading

0 comments on commit 36dc72a

Please sign in to comment.