Skip to content

Commit

Permalink
Eliminated the replication worker "technology" parameters
Browse files Browse the repository at this point in the history
The parameter is no longer needed after refactoring and simplifying
the design and implementation of the worker service.

Also cleaned empty lines in the worker code
  • Loading branch information
iagaponenko committed Nov 21, 2024
1 parent 96f6324 commit e42b22b
Show file tree
Hide file tree
Showing 17 changed files with 5 additions and 136 deletions.
1 change: 1 addition & 0 deletions src/replica/apps/WorkerApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ int WorkerApp::runImpl() {

// Keep sending periodic 'heartbeats' to the Registry service to report
// a configuration and a status of the current worker.
auto const config = serviceProvider()->config();
while (true) {
try {
serviceProvider()->registry()->addWorker(worker);
Expand Down
6 changes: 2 additions & 4 deletions src/replica/config/ConfigTestData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ map<string, set<string>> ConfigTestData::parameters() {
{"xrootd",
{"auto-notify", "request-timeout-sec", "host", "port", "allow-reconnect", "reconnect-timeout"}},
{"worker",
{"technology",
"num-threads",
{"num-threads",
"num-svc-processing-threads",
"num-fs-processing-threads",
"fs-buf-size-bytes",
Expand Down Expand Up @@ -113,8 +112,7 @@ json ConfigTestData::data() {
{"request-timeout-sec", 400},
{"allow-reconnect", 0},
{"reconnect-timeout", 500}});
generalObj["worker"] = json::object({{"technology", "POSIX"},
{"num-threads", 3},
generalObj["worker"] = json::object({{"num-threads", 3},
{"num-svc-processing-threads", 4},
{"num-fs-processing-threads", 5},
{"fs-buf-size-bytes", 1024},
Expand Down
7 changes: 1 addition & 6 deletions src/replica/config/ConfigurationSchema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,7 @@ json const ConfigurationSchema::_schemaJson = json::object(
" (if the server is not up, or if it's not reachable for some reason)"},
{"default", 3600}}}}},
{"worker",
{{"technology",
{{"description",
"The name of a technology for implementing replica management requests at workers."},
{"restricted", {{"type", "set"}, {"values", json::array({"FS", "POSIX", "TEST"})}}},
{"default", "FS"}}},
{"num-threads",
{{"num-threads",
{{"description",
"The number of threads managed by BOOST ASIO. Must be greater than 0."
" Note that setting too many threads may result in a significant memory footprint"
Expand Down
1 change: 0 additions & 1 deletion src/replica/proto/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,6 @@ message ProtocolServiceResponse {
RUNNING = 2;
}
required ServiceState service_state = 3;
required string technology = 4;

/// When the service started (milliseconds since UNIX Epoch)
required uint64 start_time = 5;
Expand Down
2 changes: 0 additions & 2 deletions src/replica/requests/ServiceManagementRequestBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ void ServiceState::set(ProtocolServiceResponse const& message) {
throw runtime_error("ServiceState::" + string(__func__) +
" service state found in protocol is unknown");
}
technology = message.technology();
startTime = message.start_time();

numNewRequests = message.num_new_requests();
Expand All @@ -121,7 +120,6 @@ ostream& operator<<(ostream& os, ServiceState const& ss) {
os << "ServiceState:\n"
<< "\n Summary:\n\n"
<< " service state: " << ss.state2string() << "\n"
<< " technology: " << ss.technology << "\n"
<< " start time [ms]: " << ss.startTime << " (" << secondsAgo << " seconds ago)\n"
<< " total new requests: " << ss.numNewRequests << "\n"
<< " total in-progress requests: " << ss.numInProgressRequests << "\n"
Expand Down
3 changes: 0 additions & 3 deletions src/replica/requests/ServiceManagementRequestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ struct ServiceState {
enum State { SUSPEND_IN_PROGRESS = 0, SUSPENDED = 1, RUNNING = 2 };
State state;

/// The back-end technology
std::string technology;

/// When the service started (milliseconds since UNIX Epoch)
uint64_t startTime;

Expand Down
5 changes: 0 additions & 5 deletions src/replica/tests/testConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ BOOST_AUTO_TEST_CASE(ConfigurationTestReadingGeneralParameters) {

BOOST_CHECK(config->get<size_t>("database", "services-pool-size") == 2);

BOOST_CHECK(config->get<string>("worker", "technology") == "POSIX");
BOOST_CHECK(config->get<size_t>("worker", "num-threads") == 3);
BOOST_CHECK(config->get<size_t>("worker", "num-svc-processing-threads") == 4);
BOOST_CHECK(config->get<size_t>("worker", "num-fs-processing-threads") == 5);
Expand Down Expand Up @@ -322,10 +321,6 @@ BOOST_AUTO_TEST_CASE(ConfigurationTestModifyingGeneralParameters) {
BOOST_REQUIRE_NO_THROW(config->set<size_t>("database", "services-pool-size", 3));
BOOST_CHECK(config->get<size_t>("database", "services-pool-size") == 3);

BOOST_CHECK_THROW(config->set<string>("worker", "technology", ""), std::invalid_argument);
BOOST_REQUIRE_NO_THROW(config->set<string>("worker", "technology", "FS"));
BOOST_CHECK(config->get<string>("worker", "technology") == "FS");

BOOST_CHECK_THROW(config->set<size_t>("worker", "num-threads", 0), std::invalid_argument);
BOOST_REQUIRE_NO_THROW(config->set<size_t>("worker", "num-threads", 4));
BOOST_CHECK(config->get<size_t>("worker", "num-threads") == 4);
Expand Down
9 changes: 0 additions & 9 deletions src/replica/worker/WorkerDeleteRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,9 @@ WorkerDeleteRequest::WorkerDeleteRequest(ServiceProvider::Ptr const& serviceProv

/// Fill in the delete-request response message from the current state of this request.
///
/// The following fields of the response are set: the target performance counters,
/// the replica info, and a copy of the original request.
///
/// @param response  The response message to be populated.
void WorkerDeleteRequest::setInfo(ProtocolResponseDelete& response) const {
LOGS(_log, LOG_LVL_DEBUG, context(__func__));

// Hold the request's mutex while its state is read into the response.
replica::Lock lock(_mtx, context(__func__));

// The objects returned by info() are released into the response.
// NOTE(review): set_allocated_*() presumably takes ownership of the released
// pointers (Protobuf convention) -- confirm against the generated message API.
response.set_allocated_target_performance(performance().info().release());
response.set_allocated_replica_info(_replicaInfo.info().release());

// Attach a copy of the original request to the response.
*(response.mutable_request()) = _request;
}

Expand All @@ -88,26 +85,21 @@ bool WorkerDeleteRequest::execute() {

auto const config = _serviceProvider->config();
DatabaseInfo const databaseInfo = config->databaseInfo(database());

vector<string> const files = FileUtils::partitionedFiles(databaseInfo, chunk());

// The data folder will be locked while performing the operation

int numFilesDeleted = 0;

WorkerRequest::ErrorContext errorContext;
boost::system::error_code ec;
{
replica::Lock dataFolderLock(_mtxDataFolderOperations, context(__func__));

fs::path const dataDir = fs::path(config->get<string>("worker", "data-dir")) / database();
fs::file_status const stat = fs::status(dataDir, ec);
errorContext = errorContext or
reportErrorIf(stat.type() == fs::status_error, ProtocolStatusExt::FOLDER_STAT,
"failed to check the status of directory: " + dataDir.string()) or
reportErrorIf(!fs::exists(stat), ProtocolStatusExt::NO_FOLDER,
"the directory does not exists: " + dataDir.string());

for (const auto& name : files) {
const fs::path file = dataDir / fs::path(name);
if (fs::remove(file, ec)) ++numFilesDeleted;
Expand All @@ -119,7 +111,6 @@ bool WorkerDeleteRequest::execute() {
setStatus(lock, ProtocolStatus::FAILED, errorContext.extendedStatus);
return true;
}

setStatus(lock, ProtocolStatus::SUCCESS);
return true;
}
Expand Down
1 change: 0 additions & 1 deletion src/replica/worker/WorkerEchoRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ WorkerEchoRequest::WorkerEchoRequest(ServiceProvider::Ptr const& serviceProvider

void WorkerEchoRequest::setInfo(ProtocolResponseEcho& response) const {
LOGS(_log, LOG_LVL_DEBUG, context(__func__));

replica::Lock lock(_mtx, context(__func__));
response.set_allocated_target_performance(performance().info().release());
response.set_data(data());
Expand Down
2 changes: 0 additions & 2 deletions src/replica/worker/WorkerFindAllRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ void WorkerFindAllRequest::setInfo(ProtocolResponseFindAll& response) const {

bool WorkerFindAllRequest::execute() {
LOGS(_log, LOG_LVL_DEBUG, context(__func__) << " database: " << database());

replica::Lock lock(_mtx, context(__func__));
checkIfCancelling(lock, __func__);

auto const config = _serviceProvider->config();
DatabaseInfo const databaseInfo = config->databaseInfo(database());

Expand Down
22 changes: 0 additions & 22 deletions src/replica/worker/WorkerFindRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ WorkerFindRequest::WorkerFindRequest(ServiceProvider::Ptr const& serviceProvider

void WorkerFindRequest::setInfo(ProtocolResponseFind& response) const {
LOGS(_log, LOG_LVL_DEBUG, context(__func__));

replica::Lock lock(_mtx, context(__func__));
response.set_allocated_target_performance(performance().info().release());
response.set_allocated_replica_info(_replicaInfo.info().release());
Expand All @@ -94,27 +93,22 @@ bool WorkerFindRequest::execute() {
//
// Both methods are combined within the same code block to avoid
// code duplication.

WorkerRequest::ErrorContext errorContext;
boost::system::error_code ec;

if (not computeCheckSum() or not _csComputeEnginePtr) {
auto const config = _serviceProvider->config();
DatabaseInfo const databaseInfo = config->databaseInfo(database());

// Check if the data directory exists and it can be read

replica::Lock dataFolderLock(_mtxDataFolderOperations, context(__func__));

fs::path const dataDir = fs::path(config->get<string>("worker", "data-dir")) / database();
fs::file_status const stat = fs::status(dataDir, ec);

errorContext = errorContext or
reportErrorIf(stat.type() == fs::status_error, ProtocolStatusExt::FOLDER_STAT,
"failed to check the status of directory: " + dataDir.string()) or
reportErrorIf(not fs::exists(stat), ProtocolStatusExt::NO_FOLDER,
"the directory does not exists: " + dataDir.string());

if (errorContext.failed) {
setStatus(lock, ProtocolStatus::FAILED, errorContext.extendedStatus);
return true;
Expand All @@ -138,11 +132,9 @@ bool WorkerFindRequest::execute() {
for (auto&& file : FileUtils::partitionedFiles(databaseInfo, chunk())) {
fs::path const path = dataDir / file;
fs::file_status const stat = fs::status(path, ec);

errorContext = errorContext or
reportErrorIf(stat.type() == fs::status_error, ProtocolStatusExt::FILE_STAT,
"failed to check the status of file: " + path.string());

if (fs::exists(stat)) {
if (not computeCheckSum()) {
// Get file size & mtime right away
Expand All @@ -151,12 +143,10 @@ bool WorkerFindRequest::execute() {
errorContext =
errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_SIZE,
"failed to read file size: " + path.string());

const time_t mtime = fs::last_write_time(path, ec);
errorContext =
errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_MTIME,
"failed to read file mtime: " + path.string());

fileInfoCollection.emplace_back(ReplicaInfo::FileInfo({
file, size, mtime, "", /* cs */
0, /* beginTransferTime */
Expand Down Expand Up @@ -186,9 +176,7 @@ bool WorkerFindRequest::execute() {
// Fill in the info on the chunk before finishing the operation
_replicaInfo = ReplicaInfo(status, worker(), database(), chunk(), util::TimeUtils::now(),
fileInfoCollection);

setStatus(lock, ProtocolStatus::SUCCESS);

return true;
}

Expand All @@ -203,17 +191,13 @@ bool WorkerFindRequest::execute() {
if (finished) {
// Extract statistics
ReplicaInfo::FileInfoCollection fileInfoCollection;

auto const fileNames = _csComputeEnginePtr->fileNames();
for (auto&& file : fileNames) {
const fs::path path(file);

uint64_t const size = _csComputeEnginePtr->bytes(file);

time_t const mtime = fs::last_write_time(path, ec);
errorContext = errorContext or reportErrorIf(ec.value() != 0, ProtocolStatusExt::FILE_MTIME,
"failed to read file mtime: " + path.string());

fileInfoCollection.emplace_back(ReplicaInfo::FileInfo({
path.filename().string(), size, mtime, to_string(_csComputeEnginePtr->cs(file)),
0, /* beginTransferTime */
Expand All @@ -227,9 +211,7 @@ bool WorkerFindRequest::execute() {
}

// Finalize the operation

DatabaseInfo const databaseInfo = _serviceProvider->config()->databaseInfo(database());

ReplicaInfo::Status status = ReplicaInfo::Status::NOT_FOUND;
if (fileInfoCollection.size())
status = FileUtils::partitionedFiles(databaseInfo, chunk()).size() == fileNames.size()
Expand All @@ -239,21 +221,17 @@ bool WorkerFindRequest::execute() {
// Fill in the info on the chunk before finishing the operation
_replicaInfo = ReplicaInfo(status, worker(), database(), chunk(), util::TimeUtils::now(),
fileInfoCollection);

setStatus(lock, ProtocolStatus::SUCCESS);
}

} catch (exception const& ex) {
WorkerRequest::ErrorContext errorContext;
errorContext = errorContext or reportErrorIf(true, ProtocolStatusExt::FILE_READ, ex.what());

setStatus(lock, ProtocolStatus::FAILED, errorContext.extendedStatus);
}

// If done (either way) then get rid of the engine right away because
// it may still have allocated buffers
if (finished) _csComputeEnginePtr.reset();

return finished;
}

Expand Down
7 changes: 0 additions & 7 deletions src/replica/worker/WorkerProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,6 @@ void WorkerProcessor::setServiceResponse(ProtocolServiceResponse& response, stri
replica::Lock lock(_mtx, _context(__func__));

response.set_status(status);
response.set_technology("FS");
response.set_start_time(_startTime);

switch (state()) {
Expand Down Expand Up @@ -667,14 +666,11 @@ WorkerRequest::Ptr WorkerProcessor::_fetchNextForProcessing(WorkerProcessorThrea
// the wait.
{
replica::Lock lock(_mtx, _context(__func__));

if (not _newRequests.empty()) {
WorkerRequest::Ptr request = _newRequests.top();
_newRequests.pop();

request->start();
_inProgressRequests[request->id()] = request;

return request;
}
}
Expand Down Expand Up @@ -706,7 +702,6 @@ void WorkerProcessor::_processingFinished(WorkerRequest::Ptr const& request) {
LOGS(_log, LOG_LVL_DEBUG,
_context(__func__) << " id: " << request->id()
<< " status: " << WorkerRequest::status2string(request->status()));

replica::Lock lock(_mtx, _context(__func__));

// Note that disposed requests won't be found in any queue.
Expand All @@ -720,7 +715,6 @@ void WorkerProcessor::_processingFinished(WorkerRequest::Ptr const& request) {
void WorkerProcessor::_processorThreadStopped(WorkerProcessorThread::Ptr const& processorThread) {
LOGS(_log, LOG_LVL_DEBUG, _context(__func__) << " thread: " << processorThread->id());
replica::Lock lock(_mtx, _context(__func__));

if (_state == STATE_IS_STOPPING) {
// Complete state transition if all threads are stopped
for (auto&& t : _threads) {
Expand All @@ -732,7 +726,6 @@ void WorkerProcessor::_processorThreadStopped(WorkerProcessorThread::Ptr const&

void WorkerProcessor::_setInfo(WorkerRequest::Ptr const& request, ProtocolResponseReplicate& response) {
if (nullptr == request) return;

auto ptr = dynamic_pointer_cast<WorkerReplicationRequest>(request);
if (not ptr) {
throw logic_error(_classMethodContext(__func__) +
Expand Down
14 changes: 1 addition & 13 deletions src/replica/worker/WorkerProcessorThread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,14 @@ bool WorkerProcessorThread::isRunning() const { return _thread != nullptr; }
void WorkerProcessorThread::run() {
if (isRunning()) return;

auto const self = shared_from_this();

_thread = make_unique<thread>([self]() {
_thread = make_unique<thread>([self = shared_from_this()]() {
LOGS(_log, LOG_LVL_DEBUG, self->context() << "start");

while (not self->_stop) {
// Get the next request to process if any. This operation will block
// until either the next request is available (returned a valid pointer)
// or the specified timeout expires. In either case this thread has a chance
// to re-evaluate the stopping condition.

auto const request = self->_processor->_fetchNextForProcessing(self, 1000);

if (self->_stop) {
if (request) self->_processor->_processingRefused(request);
continue;
Expand All @@ -76,36 +71,29 @@ void WorkerProcessorThread::run() {
LOGS(_log, LOG_LVL_DEBUG,
self->context() << "begin processing"
<< " id: " << request->id());

bool finished = false; // just to report the request completion

try {
while (not(finished = request->execute())) {
if (self->_stop) {
LOGS(_log, LOG_LVL_DEBUG,
self->context() << "rollback processing"
<< " id: " << request->id());

request->rollback();
self->_processor->_processingRefused(request);

break;
}
}

} catch (WorkerRequestCancelled const& ex) {
LOGS(_log, LOG_LVL_DEBUG,
self->context() << "cancel processing"
<< " id: " << request->id());

self->_processor->_processingFinished(request);
}
if (finished) {
LOGS(_log, LOG_LVL_DEBUG,
self->context() << "finish processing"
<< " id: " << request->id()
<< " status: " << WorkerRequest::status2string(request->status()));

self->_processor->_processingFinished(request);
}
}
Expand Down
Loading

0 comments on commit e42b22b

Please sign in to comment.