
Enabled chunkId replacement and added connection pools.
jgates108 committed Dec 12, 2024
1 parent d51fa7d commit e1fea4b
Showing 20 changed files with 279 additions and 234 deletions.
8 changes: 4 additions & 4 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
@@ -101,17 +101,17 @@ notifyWorkersOnCzarRestart = 1
# Please see util/QdispPool.h QdispPool::QdispPool for more information
[qdisppool]
#size of the pool
poolSize = 50
poolSize = 1000
# Low numbers are higher priority. largestPriority = 3 creates 4 priority queues: 0, 1, 2, 3.
# Must be greater than 0.
largestPriority = 3
# Maximum number of threads running for each queue. No spaces. Values separated by ':'
# Using largestPriority = 2 and vectRunSizes = 3:5:8
# queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8.
vectRunSizes = 50:50:50:50
vectRunSizes = 800:800:500:500
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
vectMinRunningSizes = 0:3:3:3
# Maximum number of QueryRequests allowed to be running at one time. &&& unused??
qReqPseudoFifoMaxRunning = 299
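
For orientation: the czar turns these colon-separated lists into per-queue thread limits (the Czar.cc hunk later in this commit calls util::String::parseToVectInt for this). A standalone sketch of that parsing, with parseRunSizes as a hypothetical stand-in for the real helper:

```cpp
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for util::String::parseToVectInt(): split
// "800:800:500:500" on ':' into {800, 800, 500, 500}.
std::vector<int> parseRunSizes(std::string const& s) {
    std::vector<int> out;
    std::stringstream ss(s);
    std::string tok;
    while (std::getline(ss, tok, ':')) out.push_back(std::stoi(tok));
    return out;
}

int main() {
    int const largestPriority = 3;  // creates queues 0..3, as described above
    auto runSizes = parseRunSizes("800:800:500:500");
    auto minRunning = parseRunSizes("0:3:3:3");
    for (int q = 0; q <= largestPriority; ++q) {
        std::cout << "queue " << q << ": maxThreads=" << runSizes[q]
                  << " minRunning=" << minRunning[q] << "\n";
    }
    return 0;
}
```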

[replication]
15 changes: 11 additions & 4 deletions src/cconfig/CzarConfig.h
@@ -215,6 +215,9 @@ class CzarConfig {
/// The maximum number of chunks (basically Jobs) allowed in a single UberJob.
int getUberJobMaxChunks() const { return _uberJobMaxChunks->getVal(); }

/// Return the maximum number of http connections to use for czar commands.
int getCommandMaxHttpConnections() const { return _commandMaxHttpConnections->getVal(); }

// Parameters of the Czar management service

std::string const& replicationInstanceId() const { return _replicationInstanceId->getVal(); }
@@ -310,7 +313,7 @@ class CzarConfig {
CVTIntPtr _resultMaxConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40);
CVTIntPtr _resultMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 8192);
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
CVTIntPtr _oldestResultKeptDays =
util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30);

@@ -361,9 +364,9 @@ class CzarConfig {
CVTIntPtr _qdispMaxPriority =
util::ConfigValTInt::create(_configValMap, "qdisppool", "largestPriority", notReq, 2);
CVTStrPtr _qdispVectRunSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "50:50:50:50");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "800:800:500:50");
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:1:3:3");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
@@ -413,7 +416,11 @@ class CzarConfig {

// UberJobs
CVTIntPtr _uberJobMaxChunks =
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 10);
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 1000);

/// This value may need balancing against `_resultMaxHttpConnections`, as too many
/// open connections can cause kernel memory issues.
CVTIntPtr _commandMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "uberjob", "commandMaxHttpConnections", notReq, 2000);
};

} // namespace lsst::qserv::cconfig
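For operators: each value above maps to a (section, key) pair taken straight from the create() calls. A sketch of a qserv-czar.cnf override using the new defaults from this commit (the surrounding file is not shown here):

```ini
[uberjob]
# Larger UberJobs: up to 1000 chunks each (previous default was 10).
maxChunks = 1000
# Cap on pooled http connections for czar commands; balance against
# [resultdb] maxhttpconnections to avoid kernel memory pressure.
commandMaxHttpConnections = 2000
```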
27 changes: 25 additions & 2 deletions src/ccontrol/UserQuerySelect.cc
@@ -105,7 +105,9 @@
#include "rproc/InfileMerger.h"
#include "sql/Schema.h"
#include "util/Bug.h"
#include "util/InstanceCount.h" //&&&
#include "util/IterableFormatter.h"
#include "util/Histogram.h" //&&&
#include "util/QdispPool.h"
#include "util/ThreadPriority.h"
#include "qdisp/UberJob.h"
@@ -276,7 +278,8 @@ void UserQuerySelect::submit() {
qproc::ChunkQuerySpec::Ptr cs;
{
std::lock_guard<std::mutex> lock(chunksMtx);
cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec);
bool fillInChunkIdTag = false; // do not fill in the chunkId
cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec, fillInChunkIdTag);
chunks.push_back(cs->chunkId);
}
std::string chunkResultName = _ttn->make(cs->chunkId);
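
The deferred substitution can be pictured with a toy template: with fillInChunkIdTag = false the chunk-id tag survives spec building and can be replaced later, per chunk. The {CHUNK} placeholder below is invented for illustration and is not qserv's actual tag syntax:

```cpp
#include <iostream>
#include <string>

// Toy version of deferred chunk-id tag replacement; {CHUNK} is a made-up tag.
std::string buildSpec(std::string tmpl, int chunkId, bool fillInChunkIdTag) {
    if (fillInChunkIdTag) {
        auto pos = tmpl.find("{CHUNK}");
        if (pos != std::string::npos) tmpl.replace(pos, 7, std::to_string(chunkId));
    }
    return tmpl;  // tag left in place when fillInChunkIdTag is false
}

int main() {
    std::string tmpl = "SELECT * FROM Object_{CHUNK}";
    std::cout << buildSpec(tmpl, 42, true) << "\n";   // SELECT * FROM Object_42
    std::cout << buildSpec(tmpl, 42, false) << "\n";  // SELECT * FROM Object_{CHUNK}
    return 0;
}
```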
@@ -321,10 +324,15 @@ void UserQuerySelect::submit() {
}
}

util::HistogramRolling histoBuildAndS("&&&uj histoBuildAndS", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);
util::HistogramRolling histoBuildAndS1("&&&uj histoBuildAndS1", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);

void UserQuerySelect::buildAndSendUberJobs() {
util::InstanceCount ic("UserQuerySelect::buildAndSendUberJobs&&&");
// TODO:UJ Is special handling needed for the dummy chunk, 1234567890 ?
string const funcN("UserQuerySelect::" + string(__func__) + " QID=" + to_string(_qMetaQueryId));
LOGS(_log, LOG_LVL_DEBUG, funcN << " start");
LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj start " << _uberJobMaxChunks);

// Ensure `_monitor()` doesn't do anything until everything is ready.
if (!_executive->isReadyToExecute()) {
@@ -333,14 +341,17 @@ void UserQuerySelect::buildAndSendUberJobs() {
}

// Only one thread should be generating UberJobs for this user query at any given time.
LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj lock before");
lock_guard fcLock(_buildUberJobMtx);
LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj lock after");
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << _executive->getTotalJobs());

vector<qdisp::UberJob::Ptr> uberJobs;

qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = _executive->unassignedChunksInQuery();
if (unassignedChunksInQuery.empty()) {
LOGS(_log, LOG_LVL_TRACE, funcN << " no unassigned Jobs");
LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj no unassigned Jobs");
return;
}

@@ -380,6 +391,7 @@ void UserQuerySelect::buildAndSendUberJobs() {
map<string, vector<qdisp::UberJob::Ptr>> workerJobMap;
vector<qdisp::Executive::ChunkIdType> missingChunks;

auto startassign = CLOCK::now(); //&&&
// unassignedChunksInQuery needs to be in numerical order so that UberJobs contain chunk numbers in
// numerical order. The workers run shared scans in numerical order of chunk id numbers.
// Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum,
@@ -441,10 +453,15 @@ void UserQuerySelect::buildAndSendUberJobs() {
}
auto& ujVectBack = ujVect.back();
ujVectBack->addJob(jqPtr);
LOGS(_log, LOG_LVL_DEBUG,
LOGS(_log, LOG_LVL_TRACE,
funcN << " ujVectBack{" << ujVectBack->getIdStr() << " jobCnt=" << ujVectBack->getJobCount()
<< "}");
}
auto endassign = CLOCK::now(); //&&&
std::chrono::duration<double> secsassign = endassign - startassign; // &&&
histoBuildAndS.addEntry(endassign, secsassign.count()); //&&&
LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoBuildAndS.getString(""));
auto startwcont = CLOCK::now(); //&&&

if (!missingChunks.empty()) {
string errStr = funcN + " a worker could not be found for these chunks ";
@@ -454,6 +471,7 @@ void UserQuerySelect::buildAndSendUberJobs() {
errStr += " they will be retried later.";
LOGS(_log, LOG_LVL_ERROR, errStr);
}
LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj waitForWorkerContactMap");

// Add worker contact info to UberJobs. The czar can't do anything without
// the contact map, so it will wait. This should only ever be an issue at startup.
@@ -475,7 +493,12 @@ void UserQuerySelect::buildAndSendUberJobs() {
_executive->queueUberJob(ujPtr);
}
}
auto endwcont = CLOCK::now(); //&&&
std::chrono::duration<double> secswcont = endwcont - startwcont; // &&&
histoBuildAndS1.addEntry(endwcont, secswcont.count()); //&&&
LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoBuildAndS1.getString(""));
LOGS(_log, LOG_LVL_DEBUG, funcN << " " << _executive->dumpUberJobCounts());
LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj " << _executive->dumpUberJobCounts());
}

/// Block until a submit()'ed query completes.
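The batching described in buildAndSendUberJobs() reduces to: walk chunks in ascending id order and start a new UberJob for a worker whenever the current one reaches uberjob.maxChunks. A simplified sketch with stand-in types (not the real qdisp classes):

```cpp
#include <iostream>
#include <map>
#include <string>
#include <vector>

int main() {
    // chunkId -> assigned worker; std::map keeps chunk ids in ascending order,
    // matching the shared-scan ordering requirement described above.
    std::map<int, std::string> chunkToWorker = {
            {12, "w1"}, {17, "w1"}, {23, "w2"}, {31, "w1"}, {44, "w2"}};
    int const maxChunks = 2;  // cf. uberjob.maxChunks (default 1000 in this commit)

    // worker -> list of UberJobs, each UberJob a list of chunk ids.
    std::map<std::string, std::vector<std::vector<int>>> workerJobMap;
    for (auto const& [chunkId, worker] : chunkToWorker) {
        auto& ujVect = workerJobMap[worker];
        // Start a new UberJob when none exists or the last one is full.
        if (ujVect.empty() || static_cast<int>(ujVect.back().size()) >= maxChunks) {
            ujVect.emplace_back();
        }
        ujVect.back().push_back(chunkId);  // chunk ids stay in ascending order
    }

    for (auto const& [worker, jobs] : workerJobMap) {
        std::cout << worker << " gets " << jobs.size() << " UberJob(s)\n";
    }
    return 0;
}
```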
25 changes: 18 additions & 7 deletions src/czar/Czar.cc
@@ -51,6 +51,7 @@
#include "czar/CzarRegistry.h"
#include "global/LogContext.h"
#include "http/Client.h"
#include "http/ClientConnPool.h"
#include "http/MetaModule.h"
#include "http/Method.h"
#include "proto/worker.pb.h"
@@ -89,13 +90,21 @@ Czar::Ptr Czar::createCzar(string const& configFilePath, string const& czarName)

void Czar::_monitor() {
string const funcN("Czar::_monitor");
uint16_t loopCount = 0;  // unsigned, so overflow wraps around safely
while (_monitorLoop) {
++loopCount;
this_thread::sleep_for(_monitorSleepTime);
LOGS(_log, LOG_LVL_DEBUG, funcN << " start0");

/// Check database for changes in worker chunk assignments and aliveness
try {
_czarFamilyMap->read();
// TODO:UJ The read() is incredibly expensive until the database has
// a "changed" field of some kind (preferably a timestamp) to
// indicate the last time it changed.
// For now, just do one read every few passes through this loop.
// NOTE: the `|| true` below disables that throttle, so read() still
// runs on every pass.
if (loopCount % 10 == 0 || true) {
_czarFamilyMap->read();
}
} catch (ChunkMapException const& cmex) {
// There are probably chunks that don't exist on any alive worker,
// continue on in hopes that workers will show up with the missing chunks
@@ -104,8 +113,7 @@ void Czar::_monitor() {
}

// Send appropriate messages to all ActiveWorkers. This will
// check if workers have died by timeout. The response
// from the worker include
// check if workers have died by timeout.
_czarRegistry->sendActiveWorkersMessages();

/// Create new UberJobs (if possible) for all jobs that are
@@ -193,10 +201,10 @@ Czar::Czar(string const& configFilePath, string const& czarName)
string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes();
vector<int> vectMinRunningSizes = util::String::parseToVectInt(vectMinRunningSizesStr, ":", 0);
LOGS(_log, LOG_LVL_INFO,
"INFO qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes="
<< vectRunSizesStr << " -> " << util::prettyCharList(vectRunSizes)
<< " vectMinRunningSizes=" << vectMinRunningSizesStr << " -> "
<< util::prettyCharList(vectMinRunningSizes));
" qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes="
<< vectRunSizesStr << " -> " << util::prettyCharList(vectRunSizes)
<< " vectMinRunningSizes=" << vectMinRunningSizesStr << " -> "
<< util::prettyCharList(vectMinRunningSizes));
_qdispPool = make_shared<util::QdispPool>(qPoolSize, maxPriority, vectRunSizes, vectMinRunningSizes);

qdisp::CzarStats::setup(_qdispPool);
@@ -208,6 +216,9 @@ Czar::Czar(string const& configFilePath, string const& czarName)
LOGS(_log, LOG_LVL_INFO, "config xrootdSpread=" << xrootdSpread);
_queryDistributionTestVer = _czarConfig->getQueryDistributionTestVer();

_commandHttpPool = shared_ptr<http::ClientConnPool>(
new http::ClientConnPool(_czarConfig->getCommandMaxHttpConnections()));

LOGS(_log, LOG_LVL_INFO, "Creating czar instance with name " << czarName);
LOGS(_log, LOG_LVL_INFO, "Czar config: " << *_czarConfig);

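A minimal sketch of the intended _monitor() throttle, with the temporary `|| true` override removed so the expensive read really runs only every tenth pass; names and the demo exit condition are illustrative:

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

int main() {
    std::atomic<bool> monitorLoop{true};
    std::chrono::milliseconds const sleepTime{1};  // 15'000 ms in the real code
    uint16_t loopCount = 0;  // unsigned, so wraparound is well defined
    int passes = 0;
    while (monitorLoop) {
        ++loopCount;
        std::this_thread::sleep_for(sleepTime);
        if (loopCount % 10 == 0) {
            // Stand-in for the expensive _czarFamilyMap->read().
            std::cout << "read() on pass " << passes + 1 << "\n";
        }
        if (++passes >= 25) monitorLoop = false;  // demo-only exit
    }
    return 0;
}
```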
14 changes: 12 additions & 2 deletions src/czar/Czar.h
@@ -56,6 +56,10 @@ class ActiveWorkerMap;
class HttpSvc;
} // namespace lsst::qserv::czar

namespace lsst::qserv::http {
class ClientConnPool;
} // namespace lsst::qserv::http

namespace lsst::qserv::util {
class FileMonitor;
} // namespace lsst::qserv::util
@@ -155,6 +159,8 @@ class Czar {

std::shared_ptr<util::QdispPool> getQdispPool() const { return _qdispPool; }

std::shared_ptr<http::ClientConnPool> getCommandHttpPool() const { return _commandHttpPool; }

/// Startup time of czar, sent to workers so they can detect that the czar
/// was restarted when this value changes.
static uint64_t const czarStartupTime;
@@ -228,22 +234,26 @@ class Czar {
std::atomic<bool> _monitorLoop{true};

/// Wait time between checks. TODO:UJ set from config
std::chrono::milliseconds _monitorSleepTime{15000};
std::chrono::milliseconds _monitorSleepTime{15'000}; // &&& config

/// Keeps track of all workers (alive or otherwise) that this czar
/// may communicate with. Once created, the pointer never changes.
std::shared_ptr<ActiveWorkerMap> _activeWorkerMap;

/// A combined priority queue and thread pool to regulate czar communications
/// with workers. Once created, the pointer never changes.
/// TODO:UJ - It would be better to have a pool for each worker as it
/// TODO:UJ - It may be better to have a pool for each worker as it
/// may be possible for a worker to have communications
/// problems in a way that would wedge the pool. This can
/// probably be done fairly easily by having pools
/// attached to ActiveWorker in _activeWorkerMap.
/// This was not possible in xrootd as the czar had
/// no reasonable way to know where Jobs were going.
std::shared_ptr<util::QdispPool> _qdispPool;

/// Pool of http client connections for sending commands (UberJobs
/// and worker status requests).
std::shared_ptr<http::ClientConnPool> _commandHttpPool;
};

} // namespace lsst::qserv::czar
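Conceptually, a connection pool like http::ClientConnPool bounds how many requests may hold a connection at once, with the rest waiting. The sketch below shows only that idea under simplified assumptions; it is not qserv's actual implementation:

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

// Minimal bounded-pool concept: at most `capacity` "connections" in flight.
class BoundedPool {
public:
    explicit BoundedPool(int capacity) : _capacity(capacity) {}
    void acquire() {
        std::unique_lock lk(_mtx);
        _cv.wait(lk, [this] { return _inUse < _capacity; });
        ++_inUse;
    }
    void release() {
        { std::lock_guard lk(_mtx); --_inUse; }
        _cv.notify_one();
    }
private:
    int const _capacity;
    int _inUse = 0;
    std::mutex _mtx;
    std::condition_variable _cv;
};

int main() {
    BoundedPool pool(2);  // cf. commandMaxHttpConnections (default 2000)
    std::vector<std::thread> threads;
    for (int i = 0; i < 4; ++i) {
        threads.emplace_back([&pool, i] {
            pool.acquire();  // blocks while 2 "connections" are already in use
            std::cout << "request " << i << " running\n";
            pool.release();
        });
    }
    for (auto& t : threads) t.join();
    return 0;
}
```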
10 changes: 10 additions & 0 deletions src/czar/CzarChunkMap.cc
@@ -35,6 +35,8 @@
#include "czar/CzarRegistry.h"
#include "qmeta/Exceptions.h"
#include "util/Bug.h"
#include "util/InstanceCount.h" //&&&
#include "util/Histogram.h" //&&&
#include "util/TimeUtils.h"

using namespace std;
@@ -357,9 +359,13 @@ bool CzarFamilyMap::_read() {
return true;
}

util::HistogramRolling histoMakeNewMaps("&&&uj histoMakeNewMaps", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);

std::shared_ptr<CzarFamilyMap::FamilyMapType> CzarFamilyMap::makeNewMaps(
qmeta::QMetaChunkMap const& qChunkMap) {
// Create new maps.
util::InstanceCount ic("CzarFamilyMap::makeNewMaps&&&");
auto startMakeMaps = CLOCK::now(); //&&&
std::shared_ptr<FamilyMapType> newFamilyMap = make_shared<FamilyMapType>();

// Workers -> Databases map
@@ -413,6 +419,10 @@ std::shared_ptr<CzarFamilyMap::FamilyMapType> CzarFamilyMap::makeNewMaps(
}
}

auto endMakeMaps = CLOCK::now(); //&&&
std::chrono::duration<double> secsMakeMaps = endMakeMaps - startMakeMaps; // &&&
histoMakeNewMaps.addEntry(endMakeMaps, secsMakeMaps.count()); //&&&
LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoMakeNewMaps.getString(""));
return newFamilyMap;
}

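The &&& instrumentation added throughout this commit follows one pattern: wrap a section in CLOCK::now() calls, convert the difference to seconds, and feed the sample to a rolling histogram that is periodically logged. A self-contained approximation of that pattern, with a plain rolling mean standing in for util::HistogramRolling (which additionally buckets entries into {0.1, 1, 10, 100, 1000} seconds and ages them out after 1h / 10000 entries):

```cpp
#include <chrono>
#include <cstddef>
#include <deque>
#include <iostream>
#include <numeric>

// Rolling window of recent timings; a stand-in, not util::HistogramRolling.
struct RollingMean {
    std::deque<double> window;
    std::size_t maxEntries = 10000;  // cf. HistogramRolling's entry cap above
    void addEntry(double secs) {
        window.push_back(secs);
        if (window.size() > maxEntries) window.pop_front();
    }
    double mean() const {
        return std::accumulate(window.begin(), window.end(), 0.0) / window.size();
    }
};

int main() {
    RollingMean histo;
    auto start = std::chrono::steady_clock::now();
    double sink = 0;  // stand-in for the makeNewMaps() work being timed
    for (int i = 0; i < 1000000; ++i) sink = sink + i;
    auto end = std::chrono::steady_clock::now();
    std::chrono::duration<double> secs = end - start;
    histo.addEntry(secs.count());
    std::cout << "mean section time: " << histo.mean() << " s (sink=" << sink << ")\n";
    return 0;
}
```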

