Skip to content

Commit

Permalink
Refactored worker XROOTD/SSI resource monitoring
Browse files Browse the repository at this point in the history
Class ResourceMonitor was moved from to module wpublish to wcontrol.
An object of the class is now owned by class Foreman which is
AKA "service provider" for the Qserv worker.
  • Loading branch information
iagaponenko committed Nov 10, 2023
1 parent fd61d71 commit ca79a7e
Show file tree
Hide file tree
Showing 16 changed files with 91 additions and 90 deletions.
1 change: 1 addition & 0 deletions src/wcontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_dependencies(wcontrol proto)

target_sources(wcontrol PRIVATE
Foreman.cc
ResourceMonitor.cc
SqlConnMgr.cc
TransmitMgr.cc
WorkerStats.cc
Expand Down
2 changes: 2 additions & 0 deletions src/wcontrol/Foreman.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "qhttp/Status.h"
#include "wbase/WorkerCommand.h"
#include "wconfig/WorkerConfig.h"
#include "wcontrol/ResourceMonitor.h"
#include "wcontrol/SqlConnMgr.h"
#include "wcontrol/WorkerStats.h"
#include "wdb/ChunkResource.h"
Expand Down Expand Up @@ -87,6 +88,7 @@ Foreman::Foreman(Scheduler::Ptr const& scheduler, unsigned int poolSize, unsigne
_queries(queries),
_sqlConnMgr(sqlConnMgr),
_transmitMgr(transmitMgr),
_resourceMonitor(make_shared<ResourceMonitor>()),
_io_service(),
_httpServer(qhttp::Server::create(_io_service, 0 /* grab the first available port */)) {
// Make the chunk resource mgr
Expand Down
28 changes: 19 additions & 9 deletions src/wcontrol/Foreman.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ namespace lsst::qserv::wbase {
struct TaskSelector;
} // namespace lsst::qserv::wbase

namespace lsst::qserv::wcontrol {
class ResourceMonitor;
class SqlConnMgr;
class TransmitMgr;
} // namespace lsst::qserv::wcontrol

namespace lsst::qserv::mysql {
class MySqlConfig;
} // namespace lsst::qserv::mysql
Expand All @@ -65,10 +71,9 @@ namespace lsst::qserv::wpublish {
class QueriesAndChunks;
} // namespace lsst::qserv::wpublish

namespace lsst::qserv::wcontrol {
// This header declarations

class SqlConnMgr;
class TransmitMgr;
namespace lsst::qserv::wcontrol {

/// An abstract scheduler interface. Foreman objects use Scheduler instances
/// to determine what tasks to launch upon triggering events.
Expand Down Expand Up @@ -107,8 +112,7 @@ class Foreman : public wbase::MsgProcessor {
*/
Foreman(Scheduler::Ptr const& scheduler, unsigned int poolSize, unsigned int maxPoolThreads,
mysql::MySqlConfig const& mySqlConfig, std::shared_ptr<wpublish::QueriesAndChunks> const& queries,
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr,
std::shared_ptr<wcontrol::TransmitMgr> const& transmitMgr);
std::shared_ptr<SqlConnMgr> const& sqlConnMgr, std::shared_ptr<TransmitMgr> const& transmitMgr);

virtual ~Foreman() override;

Expand All @@ -120,8 +124,10 @@ class Foreman : public wbase::MsgProcessor {
std::shared_ptr<wdb::ChunkResourceMgr> const& chunkResourceMgr() const { return _chunkResourceMgr; }
mysql::MySqlConfig const& mySqlConfig() const { return _mySqlConfig; }
std::shared_ptr<wpublish::QueriesAndChunks> const& queriesAndChunks() const { return _queries; }
std::shared_ptr<wcontrol::SqlConnMgr> const& sqlConnMgr() const { return _sqlConnMgr; }
std::shared_ptr<wcontrol::TransmitMgr> const& transmitMgr() const { return _transmitMgr; }
std::shared_ptr<SqlConnMgr> const& sqlConnMgr() const { return _sqlConnMgr; }
std::shared_ptr<TransmitMgr> const& transmitMgr() const { return _transmitMgr; }
std::shared_ptr<ResourceMonitor> const& resourceMonitor() const { return _resourceMonitor; }

uint16_t httpPort() const;

/// Process a group of query processing tasks.
Expand Down Expand Up @@ -149,12 +155,16 @@ class Foreman : public wbase::MsgProcessor {
std::shared_ptr<wpublish::QueriesAndChunks> const _queries;

/// For limiting the number of MySQL connections used for tasks.
std::shared_ptr<wcontrol::SqlConnMgr> const _sqlConnMgr;
std::shared_ptr<SqlConnMgr> const _sqlConnMgr;

util::HoldTrack::Mark::Ptr _mark;

/// Used to throttle outgoing massages to prevent czars from being overloaded.
std::shared_ptr<wcontrol::TransmitMgr> const _transmitMgr;
std::shared_ptr<TransmitMgr> const _transmitMgr;

/// A a counter of the XROOTD/SSI resources which are in use at any given moment
/// of time by the worker.
std::shared_ptr<ResourceMonitor> const _resourceMonitor;

/// BOOST ASIO services needed to run the HTTP server
boost::asio::io_service _io_service;
Expand Down
20 changes: 10 additions & 10 deletions src/wpublish/ResourceMonitor.cc → src/wcontrol/ResourceMonitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@
*/

// Class header
#include "wpublish/ResourceMonitor.h"
#include "wcontrol/ResourceMonitor.h"

// Qserv headers
#include "global/ResourceUnit.h"

using namespace std;
using namespace nlohmann;

namespace lsst::qserv::wpublish {
namespace lsst::qserv::wcontrol {

void ResourceMonitor::increment(string const& resource) {
lock_guard<mutex> lock(_mtx);
Expand All @@ -48,25 +48,25 @@ unsigned int ResourceMonitor::count(string const& resource) const {
return _resourceCounter.count(resource) ? _resourceCounter.at(resource) : 0;
}

unsigned int ResourceMonitor::count(int chunk, string const& db) const {
return count(ResourceUnit::makePath(chunk, db));
unsigned int ResourceMonitor::count(int chunk, string const& databaseName) const {
return count(ResourceUnit::makePath(chunk, databaseName));
}

unsigned int ResourceMonitor::count(int chunk, vector<string> const& dbs) const {
unsigned int ResourceMonitor::count(int chunk, vector<string> const& databaseNames) const {
unsigned int result = 0;
for (string const& db : dbs) {
result += count(chunk, db);
for (string const& database : databaseNames) {
result += count(chunk, database);
}
return result;
}

json ResourceMonitor::statusToJson() const {
lock_guard<mutex> lock(_mtx);
json result = json::array();
for (auto&& entry : _resourceCounter) {
result.push_back({entry.first, entry.second});
for (auto&& [resource, counter] : _resourceCounter) {
result.push_back({resource, counter});
}
return result;
}

} // namespace lsst::qserv::wpublish
} // namespace lsst::qserv::wcontrol
27 changes: 11 additions & 16 deletions src/wpublish/ResourceMonitor.h → src/wcontrol/ResourceMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_WPUBLISH_RESOURCE_MONITOR_H
#define LSST_QSERV_WPUBLISH_RESOURCE_MONITOR_H
#ifndef LSST_QSERV_WCONTROL_RESOURCEMONITOR_H
#define LSST_QSERV_WCONTROL_RESOURCEMONITOR_H

// System headers
#include <map>
Expand All @@ -32,7 +32,7 @@
// Third party headers
#include "nlohmann/json.hpp"

namespace lsst::qserv::wpublish {
namespace lsst::qserv::wcontrol {

/**
* Class ResourceMonitor is a thread-safe implementation for a counter of resources
Expand Down Expand Up @@ -68,34 +68,29 @@ class ResourceMonitor {
unsigned int count(std::string const& resource) const;

/**
*
* @param chunk The chunk number.
* @param db The name of a database.
* @param databaseName The name of a database.
* @return The counter of resource uses (by database name and chunk number).
*/
unsigned int count(int chunk, std::string const& db) const;
unsigned int count(int chunk, std::string const& databaseName) const;

/**
* The method will returns a sum of counters for all uses of the chunk
* across all databases.
&
* @param chunk The chunk number.
* @param dbs The names of databases.
* @param databaseNames The names of databases.
* @return The counter of a group of related resources uses.
*/
unsigned int count(int chunk, std::vector<std::string> const& dbs) const;
unsigned int count(int chunk, std::vector<std::string> const& databaseNames) const;

/// @return The JSON representation of the object's status for the monitoring.
nlohmann::json statusToJson() const;

private:
/// Number of uses for each resource
ResourceCounter _resourceCounter;

/// Mutex for thread safaty
mutable std::mutex _mtx;
ResourceCounter _resourceCounter; ///< Number of uses for each resource.
mutable std::mutex _mtx; ///< Mutex for thread safe implementation of the public API.
};

} // namespace lsst::qserv::wpublish
} // namespace lsst::qserv::wcontrol

#endif // LSST_QSERV_WPUBLISH_RESOURCE_MONITOR_H
#endif // LSST_QSERV_WCONTROL_RESOURCEMONITOR_H
1 change: 0 additions & 1 deletion src/wpublish/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ target_sources(wpublish PRIVATE
GetStatusCommand.cc
QueriesAndChunks.cc
RemoveChunkGroupCommand.cc
ResourceMonitor.cc
SetChunkListCommand.cc
TestEchoCommand.cc
)
Expand Down
4 changes: 2 additions & 2 deletions src/wpublish/GetChunkListCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
// Qserv headers
#include "proto/worker.pb.h"
#include "wbase/SendChannel.h"
#include "wcontrol/ResourceMonitor.h"
#include "wpublish/ChunkInventory.h"
#include "wpublish/ResourceMonitor.h"

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -45,7 +45,7 @@ namespace lsst::qserv::wpublish {

GetChunkListCommand::GetChunkListCommand(shared_ptr<wbase::SendChannel> const& sendChannel,
shared_ptr<ChunkInventory> const& chunkInventory,
shared_ptr<ResourceMonitor> const& resourceMonitor)
shared_ptr<wcontrol::ResourceMonitor> const& resourceMonitor)
: wbase::WorkerCommand(sendChannel),
_chunkInventory(chunkInventory),
_resourceMonitor(resourceMonitor) {}
Expand Down
9 changes: 6 additions & 3 deletions src/wpublish/GetChunkListCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
#include "wbase/WorkerCommand.h"

// Forward declarations
namespace lsst::qserv::wcontrol {
class ResourceMonitor;
} // namespace lsst::qserv::wcontrol

namespace lsst::qserv::wpublish {
class ChunkInventory;
class ResourceMonitor;
} // namespace lsst::qserv::wpublish

// This header declarations
Expand All @@ -57,7 +60,7 @@ class GetChunkListCommand : public wbase::WorkerCommand {
*/
GetChunkListCommand(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<ChunkInventory> const& chunkInventory,
std::shared_ptr<ResourceMonitor> const& resourceMonitor);
std::shared_ptr<wcontrol::ResourceMonitor> const& resourceMonitor);

protected:
void run() override;
Expand All @@ -66,7 +69,7 @@ class GetChunkListCommand : public wbase::WorkerCommand {
// Parameters of the object

std::shared_ptr<ChunkInventory> _chunkInventory;
std::shared_ptr<ResourceMonitor> _resourceMonitor;
std::shared_ptr<wcontrol::ResourceMonitor> _resourceMonitor;
};

} // namespace lsst::qserv::wpublish
Expand Down
4 changes: 2 additions & 2 deletions src/wpublish/GetStatusCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "wbase/FileChannelShared.h"
#include "wbase/MsgProcessor.h"
#include "wbase/SendChannel.h"
#include "wpublish/ResourceMonitor.h"
#include "wcontrol/ResourceMonitor.h"

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -46,7 +46,7 @@ namespace lsst::qserv::wpublish {

GetStatusCommand::GetStatusCommand(shared_ptr<wbase::SendChannel> const& sendChannel,
shared_ptr<wbase::MsgProcessor> const& processor,
shared_ptr<ResourceMonitor> const& resourceMonitor,
shared_ptr<wcontrol::ResourceMonitor> const& resourceMonitor,
wbase::TaskSelector const& taskSelector)
: wbase::WorkerCommand(sendChannel),
_processor(processor),
Expand Down
15 changes: 7 additions & 8 deletions src/wpublish/GetStatusCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@
#include "wbase/WorkerCommand.h"

// Forward declarations
namespace lsst::qserv {
namespace wbase {
namespace lsst::qserv::wbase {
class MsgProcessor;
class SendChannel;
} // namespace wbase
namespace wpublish {
} // namespace lsst::qserv::wbase

namespace lsst::qserv::wcontrol {
class ResourceMonitor;
}
} // namespace lsst::qserv
} // namespace lsst::qserv::wcontrol

// This header declarations
namespace lsst::qserv::wpublish {
Expand All @@ -58,7 +57,7 @@ class GetStatusCommand : public wbase::WorkerCommand {
*/
GetStatusCommand(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<wbase::MsgProcessor> const& processor,
std::shared_ptr<ResourceMonitor> const& resourceMonitor,
std::shared_ptr<wcontrol::ResourceMonitor> const& resourceMonitor,
wbase::TaskSelector const& taskSelector);

GetStatusCommand() = delete;
Expand All @@ -74,7 +73,7 @@ class GetStatusCommand : public wbase::WorkerCommand {
// Parameters of the object

std::shared_ptr<wbase::MsgProcessor> const _processor;
std::shared_ptr<ResourceMonitor> const _resourceMonitor;
std::shared_ptr<wcontrol::ResourceMonitor> const _resourceMonitor;
wbase::TaskSelector const _taskSelector;
};

Expand Down
4 changes: 2 additions & 2 deletions src/wpublish/RemoveChunkGroupCommand.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

// Qserv headers
#include "wbase/SendChannel.h"
#include "wcontrol/ResourceMonitor.h"
#include "wpublish/ChunkInventory.h"
#include "wpublish/ResourceMonitor.h"
#include "xrdsvc/SsiProvider.h"
#include "xrdsvc/XrdName.h"

Expand All @@ -51,7 +51,7 @@ namespace lsst::qserv::wpublish {

RemoveChunkGroupCommand::RemoveChunkGroupCommand(shared_ptr<wbase::SendChannel> const& sendChannel,
shared_ptr<ChunkInventory> const& chunkInventory,
shared_ptr<ResourceMonitor> const& resourceMonitor,
shared_ptr<wcontrol::ResourceMonitor> const& resourceMonitor,
mysql::MySqlConfig const& mySqlConfig, int chunk,
vector<string> const& dbs, bool force)
: wbase::WorkerCommand(sendChannel),
Expand Down
20 changes: 11 additions & 9 deletions src/wpublish/RemoveChunkGroupCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@
#include "wbase/WorkerCommand.h"

// Forward declarations
namespace lsst::qserv {
namespace wbase {
namespace lsst::qserv::wbase {
class SendChannel;
}
namespace wpublish {
class ChunkInventory;
} // namespace lsst::qserv::wbase

namespace lsst::qserv::wcontrol {
class ResourceMonitor;
} // namespace wpublish
} // namespace lsst::qserv
} // namespace lsst::qserv::wcontrol

namespace lsst::qserv::wpublish {
class ChunkInventory;
} // namespace lsst::qserv::wpublish

// This header declarations
namespace lsst::qserv::wpublish {
Expand All @@ -68,7 +70,7 @@ class RemoveChunkGroupCommand : public wbase::WorkerCommand {
*/
RemoveChunkGroupCommand(std::shared_ptr<wbase::SendChannel> const& sendChannel,
std::shared_ptr<ChunkInventory> const& chunkInventory,
std::shared_ptr<ResourceMonitor> const& resourceMonitor,
std::shared_ptr<wcontrol::ResourceMonitor> const& resourceMonitor,
mysql::MySqlConfig const& mySqlConfig, int chunk,
std::vector<std::string> const& dbs, bool force);

Expand All @@ -81,7 +83,7 @@ class RemoveChunkGroupCommand : public wbase::WorkerCommand {
// Parameters of the object

std::shared_ptr<ChunkInventory> _chunkInventory;
std::shared_ptr<ResourceMonitor> _resourceMonitor;
std::shared_ptr<wcontrol::ResourceMonitor> _resourceMonitor;
mysql::MySqlConfig _mySqlConfig;
int _chunk;
std::vector<std::string> _dbs;
Expand Down
Loading

0 comments on commit ca79a7e

Please sign in to comment.