Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tickets/dm 45548 #884

Draft
wants to merge 25 commits into
base: tickets/DM-43715
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0d07782
Removed QueryRequest and XrdSsiMocks.
jgates108 Aug 1, 2024
85461dc
Removed unnecessary code.
jgates108 Aug 2, 2024
5bb2b08
Added ActiveWorker.
jgates108 Aug 6, 2024
74c3a57
Added unit test for query status message.
jgates108 Aug 30, 2024
ac9afad
Added cancellation code and for queries, uberjobs, and czar restart.
jgates108 Sep 3, 2024
68d591a
More cancellation code added.
jgates108 Sep 10, 2024
43a894e
Added query retries.
jgates108 Sep 19, 2024
61f1a9b
Added worker believed czar was dead handling.
jgates108 Oct 1, 2024
0cff54a
Added dead message handling.
jgates108 Oct 4, 2024
5e3642a
Fixed problems with rowlimit and WorkerCzarComIssue.
jgates108 Oct 15, 2024
662e5aa
Rebase.
jgates108 Oct 18, 2024
82811de
Added comments and removed dead code.
jgates108 Oct 21, 2024
769affb
Fixed dead worker check.
jgates108 Oct 23, 2024
fa16b4f
Created protojson namespace.
jgates108 Nov 18, 2024
831b5ec
clang-format
fritzm Nov 26, 2024
9c4c602
Added unit test.
jgates108 Nov 22, 2024
d51fa7d
Reworked the UberJob json message.
jgates108 Dec 5, 2024
e1fea4b
Enabled chunk Id replacement, and added connection pools.
jgates108 Dec 6, 2024
8ad73c1
Rearranged UberJob building and removed chunkResultName.
jgates108 Dec 13, 2024
816da25
Removed TaskMsgFactory.
jgates108 Dec 16, 2024
d4bf9e9
Changed Czar to catch 5GB limit.
jgates108 Dec 18, 2024
055702f
Improved Job creation performance.
jgates108 Jan 9, 2025
fd9df34
Contention testing.
jgates108 Jan 22, 2025
2277f63
The blocking version of the FQDN retrieval function
jgates108 Feb 3, 2025
ccab87c
Some cleanup.
jgates108 Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ add_subdirectory(mysql)
add_subdirectory(parser)
add_subdirectory(partition)
add_subdirectory(proto)
add_subdirectory(protojson)
add_subdirectory(proxy)
add_subdirectory(qana)
add_subdirectory(qdisp)
Expand All @@ -89,7 +90,6 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand All @@ -103,6 +103,7 @@ target_link_libraries(qserv_common PUBLIC
mysql
sql
util
protojson
)

install(
Expand Down Expand Up @@ -143,7 +144,6 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
2 changes: 0 additions & 2 deletions src/admin/templates/http/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ largestPriority = 3
vectRunSizes = 50:50:50:50
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299

[replication]

Expand Down
10 changes: 4 additions & 6 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,19 @@ notifyWorkersOnCzarRestart = 1
#[debug]
#chunkLimit = -1

# Please see qdisp/QdispPool.h QdispPool::QdispPool for more information
# Please see util/QdispPool.h QdispPool::QdispPool for more information
[qdisppool]
#size of the pool
poolSize = 50
poolSize = 1000
# Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3
# Must be greater than 0.
largestPriority = 3
# Maximum number of threads running for each queue. No spaces. Values separated by ':'
# Using largestPriority = 2 and vectRunsizes = 3:5:8
# queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8.
vectRunSizes = 50:50:50:50
vectRunSizes = 800:800:500:500
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299
vectMinRunningSizes = 0:3:3:3

[replication]

Expand Down
7 changes: 3 additions & 4 deletions src/cconfig/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,17 @@ namespace lsst::qserv::cconfig {

std::mutex CzarConfig::_mtxOnInstance;

std::shared_ptr<CzarConfig> CzarConfig::_instance;
CzarConfig::Ptr CzarConfig::_instance;

std::shared_ptr<CzarConfig> CzarConfig::create(std::string const& configFileName,
std::string const& czarName) {
CzarConfig::Ptr CzarConfig::create(std::string const& configFileName, std::string const& czarName) {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
_instance = std::shared_ptr<CzarConfig>(new CzarConfig(util::ConfigStore(configFileName), czarName));
}
return _instance;
}

std::shared_ptr<CzarConfig> CzarConfig::instance() {
CzarConfig::Ptr CzarConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created.");
Expand Down
54 changes: 48 additions & 6 deletions src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace lsst::qserv::cconfig {
*/
class CzarConfig {
public:
using Ptr = std::shared_ptr<CzarConfig>;
/**
* Create an instance of CzarConfig and load parameters from the specifid file.
* @note One has to call this method at least once before trying to obtain
Expand All @@ -63,15 +64,15 @@ class CzarConfig {
* @param czarName - the unique name of Czar.
* @return the shared pointer to the configuration object
*/
static std::shared_ptr<CzarConfig> create(std::string const& configFileName, std::string const& czarName);
static Ptr create(std::string const& configFileName, std::string const& czarName);

/**
* Get a pointer to an instance that was created by the last call to
* the method 'create'.
* @return the shared pointer to the configuration object
* @throws std::logic_error when attempting to call the bethod before creating an instance.
*/
static std::shared_ptr<CzarConfig> instance();
static Ptr instance();

CzarConfig() = delete;
CzarConfig(CzarConfig const&) = delete;
Expand Down Expand Up @@ -117,7 +118,7 @@ class CzarConfig {
*/
std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); }

/* Get the maximum number of threads for xrootd to use.
/* Get the maximum number of threads for xrootd to use. // TODO:UJ delete
*
* @return the maximum number of threads for xrootd to use.
*/
Expand Down Expand Up @@ -198,6 +199,28 @@ class CzarConfig {
/// the OOM situation.
unsigned int czarStatsRetainPeriodSec() const { return _czarStatsRetainPeriodSec->getVal(); }

/// A worker is considered fully ALIVE if the last update from the worker has been
/// heard in less than _activeWorkerTimeoutAliveSecs seconds.
int getActiveWorkerTimeoutAliveSecs() const { return _activeWorkerTimeoutAliveSecs->getVal(); }

/// A worker is considered DEAD if it hasn't been heard from in more than
/// _activeWorkerTimeoutDeadSecs.
int getActiveWorkerTimeoutDeadSecs() const { return _activeWorkerTimeoutDeadSecs->getVal(); }

/// Max lifetime of a message to be sent to an active worker. If the czar has been
/// trying to send a message to a worker and has failed for this many seconds,
/// it gives up at this point, removing elements of the message to save memory.
int getActiveWorkerMaxLifetimeSecs() const { return _activeWorkerMaxLifetimeSecs->getVal(); }

/// The maximum number of chunks (basically Jobs) allowed in a single UberJob.
int getUberJobMaxChunks() const { return _uberJobMaxChunks->getVal(); }

/// Return the maximum number of http connections to use for czar commands.
int getCommandMaxHttpConnections() const { return _commandMaxHttpConnections->getVal(); }

/// Return the sleep time (in milliseconds) between messages sent to active workers.
int getMonitorSleepTimeMilliSec() const { return _monitorSleepTimeMilliSec->getVal(); }

// Parameters of the Czar management service

std::string const& replicationInstanceId() const { return _replicationInstanceId->getVal(); }
Expand Down Expand Up @@ -293,7 +316,7 @@ class CzarConfig {
CVTIntPtr _resultMaxConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40);
CVTIntPtr _resultMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 8192);
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
CVTIntPtr _oldestResultKeptDays =
util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30);

Expand Down Expand Up @@ -344,10 +367,11 @@ class CzarConfig {
CVTIntPtr _qdispMaxPriority =
util::ConfigValTInt::create(_configValMap, "qdisppool", "largestPriority", notReq, 2);
CVTStrPtr _qdispVectRunSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "50:50:50:50");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "800:800:500:50");
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:1:3:3");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

// TODO:UJ delete xrootd specific entries.
CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
Expand Down Expand Up @@ -385,6 +409,24 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "replication", "http_port", notReq, 0);
CVTUIntPtr _replicationNumHttpThreads =
util::ConfigValTUInt::create(_configValMap, "replication", "num_http_threads", notReq, 2);

// Active Worker
CVTIntPtr _activeWorkerTimeoutAliveSecs = // 5min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutAliveSecs", notReq, 60 * 5);
CVTIntPtr _activeWorkerTimeoutDeadSecs = // 10min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10);
CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr
util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60);
CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create(
_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);

// UberJobs
CVTIntPtr _uberJobMaxChunks =
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 1000);

/// This may impact `_resultMaxHttpConnections` as too many connections may cause kernel memory issues.
CVTIntPtr _commandMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "uberjob", "commandMaxHttpConnections", notReq, 2000);
};

} // namespace lsst::qserv::cconfig
Expand Down
2 changes: 0 additions & 2 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ target_link_libraries(ccontrol PUBLIC
parser
replica
sphgeom
xrdreq
XrdCl
)

Expand All @@ -51,7 +50,6 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
Loading
Loading