diff --git a/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja b/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja index 7991d0ab0..26e13346f 100644 --- a/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja +++ b/src/admin/templates/proxy/etc/qserv-czar.cnf.jinja @@ -101,17 +101,17 @@ notifyWorkersOnCzarRestart = 1 # Please see util/QdispPool.h QdispPool::QdispPool for more information [qdisppool] #size of the pool -poolSize = 50 +poolSize = 1000 # Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3 # Must be greater than 0. largestPriority = 3 # Maximum number of threads running for each queue. No spaces. Values separated by ':' # Using largestPriority = 2 and vectRunsizes = 3:5:8 # queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8. -vectRunSizes = 50:50:50:50 +vectRunSizes = 800:800:500:500 # Minimum number of threads running for each queue. No spaces. Values separated by ':' -vectMinRunningSizes = 0:1:3:3 -# Maximum number of QueryRequests allowed to be running at one time. +vectMinRunningSizes = 0:3:3:3 +# Maximum number of QueryRequests allowed to be running at one time. &&& unused?? qReqPseudoFifoMaxRunning = 299 [replication] diff --git a/src/cconfig/CzarConfig.h b/src/cconfig/CzarConfig.h index e77878e18..9b6096531 100644 --- a/src/cconfig/CzarConfig.h +++ b/src/cconfig/CzarConfig.h @@ -215,6 +215,9 @@ class CzarConfig { /// The maximum number of chunks (basically Jobs) allowed in a single UberJob. int getUberJobMaxChunks() const { return _uberJobMaxChunks->getVal(); } + /// Return the maximum number of http connections to use for czar commands. 
+ int getCommandMaxHttpConnections() const { return _commandMaxHttpConnections->getVal(); } + // Parameters of the Czar management service std::string const& replicationInstanceId() const { return _replicationInstanceId->getVal(); } @@ -310,7 +313,7 @@ class CzarConfig { CVTIntPtr _resultMaxConnections = util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40); CVTIntPtr _resultMaxHttpConnections = - util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 8192); + util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000); CVTIntPtr _oldestResultKeptDays = util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30); @@ -361,9 +364,9 @@ class CzarConfig { CVTIntPtr _qdispMaxPriority = util::ConfigValTInt::create(_configValMap, "qdisppool", "largestPriority", notReq, 2); CVTStrPtr _qdispVectRunSizes = - util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "50:50:50:50"); + util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "800:800:500:500"); CVTStrPtr _qdispVectMinRunningSizes = - util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:1:3:3"); + util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3"); CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4); CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create( @@ -413,7 +416,11 @@ class CzarConfig { // UberJobs CVTIntPtr _uberJobMaxChunks = - util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 10); + util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 1000); + + /// This may impact `_resultMaxHttpConnections` as too many connections may cause kernel memory issues. 
+ CVTIntPtr _commandMaxHttpConnections = + util::ConfigValTInt::create(_configValMap, "uberjob", "commandMaxHttpConnections", notReq, 2000); }; } // namespace lsst::qserv::cconfig diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 7f1da8353..c5e2aef35 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -105,7 +105,9 @@ #include "rproc/InfileMerger.h" #include "sql/Schema.h" #include "util/Bug.h" +#include "util/InstanceCount.h" //&&& #include "util/IterableFormatter.h" +#include "util/Histogram.h" //&&& #include "util/QdispPool.h" #include "util/ThreadPriority.h" #include "qdisp/UberJob.h" @@ -276,7 +278,8 @@ void UserQuerySelect::submit() { qproc::ChunkQuerySpec::Ptr cs; { std::lock_guard lock(chunksMtx); - cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec); + bool fillInChunkIdTag = false; // do not fill in the chunkId + cs = _qSession->buildChunkQuerySpec(queryTemplates, chunkSpec, fillInChunkIdTag); chunks.push_back(cs->chunkId); } std::string chunkResultName = _ttn->make(cs->chunkId); @@ -321,10 +324,15 @@ void UserQuerySelect::submit() { } } +util::HistogramRolling histoBuildAndS("&&&uj histoBuildAndS", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); +util::HistogramRolling histoBuildAndS1("&&&uj histoBuildAndS1", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); + void UserQuerySelect::buildAndSendUberJobs() { + util::InstanceCount ic("UserQuerySelect::buildAndSendUberJobs&&&"); // TODO:UJ Is special handling needed for the dummy chunk, 1234567890 ? string const funcN("UserQuerySelect::" + string(__func__) + " QID=" + to_string(_qMetaQueryId)); LOGS(_log, LOG_LVL_DEBUG, funcN << " start"); + LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj start " << _uberJobMaxChunks); // Ensure `_monitor()` doesn't do anything until everything is ready. 
if (!_executive->isReadyToExecute()) { @@ -333,7 +341,9 @@ void UserQuerySelect::buildAndSendUberJobs() { } // Only one thread should be generating UberJobs for this user query at any given time. + LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj lock before"); lock_guard fcLock(_buildUberJobMtx); + LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj lock after"); LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << _executive->getTotalJobs()); vector uberJobs; @@ -341,6 +351,7 @@ void UserQuerySelect::buildAndSendUberJobs() { qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = _executive->unassignedChunksInQuery(); if (unassignedChunksInQuery.empty()) { LOGS(_log, LOG_LVL_TRACE, funcN << " no unassigned Jobs"); + LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj no unassigned Jobs"); return; } @@ -380,6 +391,7 @@ void UserQuerySelect::buildAndSendUberJobs() { map> workerJobMap; vector missingChunks; + auto startassign = CLOCK::now(); //&&& // unassignedChunksInQuery needs to be in numerical order so that UberJobs contain chunk numbers in // numerical order. The workers run shared scans in numerical order of chunk id numbers. 
// Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum, @@ -441,10 +453,15 @@ void UserQuerySelect::buildAndSendUberJobs() { } auto& ujVectBack = ujVect.back(); ujVectBack->addJob(jqPtr); - LOGS(_log, LOG_LVL_DEBUG, + LOGS(_log, LOG_LVL_TRACE, funcN << " ujVectBack{" << ujVectBack->getIdStr() << " jobCnt=" << ujVectBack->getJobCount() << "}"); } + auto endassign = CLOCK::now(); //&&& + std::chrono::duration secsassign = endassign - startassign; // &&& + histoBuildAndS.addEntry(endassign, secsassign.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoBuildAndS.getString("")); + auto startwcont = CLOCK::now(); //&&& if (!missingChunks.empty()) { string errStr = funcN + " a worker could not be found for these chunks "; @@ -454,6 +471,7 @@ void UserQuerySelect::buildAndSendUberJobs() { errStr += " they will be retried later."; LOGS(_log, LOG_LVL_ERROR, errStr); } + LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj waitForWorkerContactMap"); // Add worker contact info to UberJobs. The czar can't do anything without // the contact map, so it will wait. This should only ever be an issue at startup. @@ -475,7 +493,12 @@ void UserQuerySelect::buildAndSendUberJobs() { _executive->queueUberJob(ujPtr); } } + auto endwcont = CLOCK::now(); //&&& + std::chrono::duration secswcont = endwcont - startwcont; // &&& + histoBuildAndS1.addEntry(endwcont, secswcont.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoBuildAndS1.getString("")); LOGS(_log, LOG_LVL_DEBUG, funcN << " " << _executive->dumpUberJobCounts()); + LOGS(_log, LOG_LVL_WARN, funcN << " &&&uj " << _executive->dumpUberJobCounts()); } /// Block until a submit()'ed query completes. 
diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index db70bcbfe..fe62f34e3 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -51,6 +51,7 @@ #include "czar/CzarRegistry.h" #include "global/LogContext.h" #include "http/Client.h" +#include "http/ClientConnPool.h" #include "http/MetaModule.h" #include "http/Method.h" #include "proto/worker.pb.h" @@ -89,13 +90,21 @@ Czar::Ptr Czar::createCzar(string const& configFilePath, string const& czarName) void Czar::_monitor() { string const funcN("Czar::_monitor"); + uint16_t loopCount = 0; // unsigned to wrap around while (_monitorLoop) { + ++loopCount; this_thread::sleep_for(_monitorSleepTime); LOGS(_log, LOG_LVL_DEBUG, funcN << " start0"); /// Check database for changes in worker chunk assignments and aliveness try { - _czarFamilyMap->read(); + // TODO:UJ The read() is incredibly expensive until the database has + // a "changed" field of some kind (preferably timestamp) to + // indicate the last time it changed. + // For Now, just do one read every few times through this loop. + if (loopCount % 10 == 0) { + _czarFamilyMap->read(); + } } catch (ChunkMapException const& cmex) { // There are probably chunks that don't exist on any alive worker, // continue on in hopes that workers will show up with the missing chunks @@ -104,8 +113,7 @@ void Czar::_monitor() { // Send appropriate messages to all ActiveWorkers. This will - // check if workers have died by timeout. The response - // from the worker include + // check if workers have died by timeout. 
_czarRegistry->sendActiveWorkersMessages(); /// Create new UberJobs (if possible) for all jobs that are @@ -193,10 +201,10 @@ Czar::Czar(string const& configFilePath, string const& czarName) string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes(); vector vectMinRunningSizes = util::String::parseToVectInt(vectMinRunningSizesStr, ":", 0); LOGS(_log, LOG_LVL_INFO, - "INFO qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes=" - << vectRunSizesStr << " -> " << util::prettyCharList(vectRunSizes) - << " vectMinRunningSizes=" << vectMinRunningSizesStr << " -> " - << util::prettyCharList(vectMinRunningSizes)); + " qdisp config qPoolSize=" << qPoolSize << " maxPriority=" << maxPriority << " vectRunSizes=" + << vectRunSizesStr << " -> " << util::prettyCharList(vectRunSizes) + << " vectMinRunningSizes=" << vectMinRunningSizesStr << " -> " + << util::prettyCharList(vectMinRunningSizes)); _qdispPool = make_shared(qPoolSize, maxPriority, vectRunSizes, vectMinRunningSizes); qdisp::CzarStats::setup(_qdispPool); @@ -208,6 +216,9 @@ Czar::Czar(string const& configFilePath, string const& czarName) LOGS(_log, LOG_LVL_INFO, "config xrootdSpread=" << xrootdSpread); _queryDistributionTestVer = _czarConfig->getQueryDistributionTestVer(); + _commandHttpPool = shared_ptr( + new http::ClientConnPool(_czarConfig->getCommandMaxHttpConnections())); + LOGS(_log, LOG_LVL_INFO, "Creating czar instance with name " << czarName); LOGS(_log, LOG_LVL_INFO, "Czar config: " << *_czarConfig); diff --git a/src/czar/Czar.h b/src/czar/Czar.h index 78b02237a..408df5b10 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -56,6 +56,10 @@ class ActiveWorkerMap; class HttpSvc; } // namespace lsst::qserv::czar +namespace lsst::qserv::http { +class ClientConnPool; +} // namespace lsst::qserv::http + namespace lsst::qserv::util { class FileMonitor; } // namespace lsst::qserv::util @@ -155,6 +159,8 @@ class Czar { std::shared_ptr getQdispPool() const { return 
_qdispPool; } + std::shared_ptr getCommandHttpPool() const { return _commandHttpPool; } + /// Startup time of czar, sent to workers so they can detect that the czar was /// was restarted when this value changes. static uint64_t const czarStartupTime; @@ -228,7 +234,7 @@ class Czar { std::atomic _monitorLoop{true}; /// Wait time between checks. TODO:UJ set from config - std::chrono::milliseconds _monitorSleepTime{15000}; + std::chrono::milliseconds _monitorSleepTime{15'000}; // &&& config /// Keeps track of all workers (alive or otherwise) that this czar /// may communicate with. Once created, the pointer never changes. @@ -236,7 +242,7 @@ class Czar { /// A combined priority queue and thread pool to regulate czar communications /// with workers. Once created, the pointer never changes. - /// TODO:UJ - It would be better to have a pool for each worker as it + /// TODO:UJ - It may be better to have a pool for each worker as it /// may be possible for a worker to have communications /// problems in a way that would wedge the pool. This can /// probably be done fairly easily by having pools @@ -244,6 +250,10 @@ class Czar { /// This was not possible in xrootd as the czar had /// no reasonable way to know where Jobs were going. std::shared_ptr _qdispPool; + + /// Pool of http client connections for sending commands (UberJobs + /// and worker status requests). 
+ std::shared_ptr _commandHttpPool; }; } // namespace lsst::qserv::czar diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index c064f60d1..17dc0c277 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -35,6 +35,8 @@ #include "czar/CzarRegistry.h" #include "qmeta/Exceptions.h" #include "util/Bug.h" +#include "util/InstanceCount.h" //&&& +#include "util/Histogram.h" //&&& #include "util/TimeUtils.h" using namespace std; @@ -357,9 +359,13 @@ bool CzarFamilyMap::_read() { return true; } +util::HistogramRolling histoMakeNewMaps("&&&uj histoMakeNewMaps", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); + std::shared_ptr CzarFamilyMap::makeNewMaps( qmeta::QMetaChunkMap const& qChunkMap) { // Create new maps. + util::InstanceCount ic("CzarFamilyMap::makeNewMaps&&&"); + auto startMakeMaps = CLOCK::now(); //&&& std::shared_ptr newFamilyMap = make_shared(); // Workers -> Databases map @@ -413,6 +419,10 @@ std::shared_ptr CzarFamilyMap::makeNewMaps( } } + auto endMakeMaps = CLOCK::now(); //&&& + std::chrono::duration secsMakeMaps = endMakeMaps - startMakeMaps; // &&& + histoMakeNewMaps.addEntry(endMakeMaps, secsMakeMaps.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoMakeNewMaps.getString("")); return newFamilyMap; } diff --git a/src/protojson/UberJobMsg.cc b/src/protojson/UberJobMsg.cc index e92631417..8f617103c 100644 --- a/src/protojson/UberJobMsg.cc +++ b/src/protojson/UberJobMsg.cc @@ -61,11 +61,9 @@ UberJobMsg::UberJobMsg(unsigned int metaVersion, std::string const& replicationI _rowLimit(rowLimit), _maxTableSizeMB(maxTableSizeMB), _scanInfo(scanInfo_) { - //&&&_jobs(jobs) { LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::UberJobMsg start"); for (auto& jobPtr : jobs) { - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::UberJobMsg loop"); // This creates the JobMsg objects for all relates jobs and their fragments. 
auto jobMsg = JobMsg::create(jobPtr, _jobSubQueryTempMap, _jobDbTablesMap); _jobMsgVect->push_back(jobMsg); @@ -89,24 +87,19 @@ json UberJobMsg::serializeJson() const { {"maxtablesizemb", _maxTableSizeMB}, {"scaninfo", _scanInfo->serializeJson()}, {"jobs", json::array()}}; - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::serializeJson b"); auto& jsJobs = ujmJson["jobs"]; - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::serializeJson c"); for (auto const& jbMsg : *_jobMsgVect) { - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::serializeJson c1"); - json jsJob = jbMsg->serializeJson(); - jsJobs.push_back(jsJob); + //&&&json jsJob = jbMsg->serializeJson(); + //&&&jsJobs.push_back(jsJob); + jsJobs.emplace_back(jbMsg->serializeJson()); } - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::serializeJson d"); - - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& ujmJson=" << ujmJson); + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " &&& ujmJson=" << ujmJson); return ujmJson; } UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) { - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson a"); LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson ujmJson=" << ujmJson); try { if (ujmJson["version"] != http::MetaModule::version) { @@ -114,14 +107,13 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) { return nullptr; } - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson b"); auto czInfo_ = CzarContactInfo::createFromJson(ujmJson["czarinfo"]); if (czInfo_ == nullptr) { LOGS(_log, LOG_LVL_ERROR, "UberJobMsg::createFromJson czar could not be parsed in " << ujmJson); return nullptr; } - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson b-b"); + LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson b"); auto scanInfo_ = ScanInfo::createFromJson(ujmJson["scaninfo"]); if (scanInfo_ == nullptr) { LOGS(_log, LOG_LVL_ERROR, @@ -129,28 +121,18 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) { return nullptr; } - 
LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson c"); auto metaVersion = http::RequestBodyJSON::required(ujmJson, "version"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson d"); auto replicationInstanceId = http::RequestBodyJSON::required(ujmJson, "instance_id"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson e"); auto replicationAuthKey = http::RequestBodyJSON::required(ujmJson, "auth_key"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson f"); auto workerId = http::RequestBodyJSON::required(ujmJson, "worker"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson g"); auto qId = http::RequestBodyJSON::required(ujmJson, "queryid"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson h"); auto ujId = http::RequestBodyJSON::required(ujmJson, "uberjobid"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson i"); auto rowLimit = http::RequestBodyJSON::required(ujmJson, "rowlimit"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson j"); auto maxTableSizeMB = http::RequestBodyJSON::required(ujmJson, "maxtablesizemb"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson k"); auto czInfo = CzarContactInfo::createFromJson(ujmJson["czarinfo"]); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson l"); auto jsUjJobs = http::RequestBodyJSON::required(ujmJson, "jobs"); - LOGS(_log, LOG_LVL_INFO, + LOGS(_log, LOG_LVL_TRACE, " &&& " << metaVersion << replicationInstanceId << replicationAuthKey << workerId << qId << ujId << rowLimit << jsUjJobs); @@ -161,17 +143,12 @@ UberJobMsg::Ptr UberJobMsg::createFromJson(nlohmann::json const& ujmJson) { LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson m"); auto const& jsSubQueriesMap = http::RequestBodyJSON::required(ujmJson, "subqueries_map"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson n"); ujmPtr->_jobSubQueryTempMap = JobSubQueryTempMap::createFromJson(jsSubQueriesMap); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson o"); 
auto jsDbTablesMap = http::RequestBodyJSON::required(ujmJson, "dbtables_map"); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson p"); ujmPtr->_jobDbTablesMap = JobDbTablesMap::createFromJson(jsDbTablesMap); - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson q"); for (auto const& jsUjJob : jsUjJobs) { - LOGS(_log, LOG_LVL_WARN, "&&& UberJobMsg::createFromJson q1"); JobMsg::Ptr jobMsgPtr = JobMsg::createFromJson(jsUjJob, ujmPtr->_jobSubQueryTempMap, ujmPtr->_jobDbTablesMap); ujmPtr->_jobMsgVect->push_back(jobMsgPtr); @@ -207,91 +184,51 @@ JobMsg::JobMsg(std::shared_ptr const& jobPtr, if (descr == nullptr) { throw util::Bug(ERR_LOC, cName(__func__) + " description=null for job=" + jobPtr->getIdStr()); } - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg a"); auto chunkQuerySpec = descr->getChunkQuerySpec(); _jobId = descr->id(); - //&&&{"attemptCount", attemptCount}, - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg b"); _attemptCount = descr->getAttemptCount(); // &&& may need to increment descr->AttemptCount at this time. 
- //&&&{"querySpecDb", chunkQuerySpec.db}, - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg c"); _chunkQuerySpecDb = chunkQuerySpec->db; - //&&&{"scanPriority", chunkQuerySpec.scanInfo.scanRating}, - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg d"); _scanRating = chunkQuerySpec->scanInfo->scanRating; - //&&&{"scanInteractive", chunkQuerySpec.scanInteractive}, - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg e"); _scanInteractive = chunkQuerySpec->scanInteractive; - //&&&{"maxTableSize", (cconfig::CzarConfig::instance()->getMaxTableSizeMB())}, - //_maxTableSizeMB; // &&& move up to UberJob - //&&&{"chunkScanTables", nlohmann::json::array()}, - //&&&{"chunkId", chunkQuerySpec.chunkId}, - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg f"); _chunkId = chunkQuerySpec->chunkId; - //&&&{"queryFragments", nlohmann::json::array()}})); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg g"); _chunkResultName = descr->getChunkResultName(); // Add scan tables (&&& not sure is this is the same for all jobs or not) - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg h"); for (auto const& sTbl : chunkQuerySpec->scanInfo->infoTables) { - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg h1"); - /* &&& - nlohmann::json cst = {{"db", sTbl.db}, - {"table", sTbl.table}, - {"lockInMemory", sTbl.lockInMemory}, - {"tblScanRating", sTbl.scanRating}}; - chunkScanTables.push_back(move(cst)); - */ int index = jobDbTablesMap->findDbTable(make_pair(sTbl.db, sTbl.table)); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg h2"); jobDbTablesMap->setScanRating(index, sTbl.scanRating, sTbl.lockInMemory); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg h3"); _chunkScanTableIndexes.push_back(index); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg h4"); } // Add fragments - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg i"); - _jobFragments = - JobFragment::createVect(*chunkQuerySpec, jobSubQueryTempMap, jobDbTablesMap, _chunkResultName); + _jobFragments = JobFragment::createVect(*chunkQuerySpec, jobSubQueryTempMap, 
jobDbTablesMap); LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::JobMsg end"); } nlohmann::json JobMsg::serializeJson() const { LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson a"); - auto jsJobMsg = - nlohmann::json({//&&&{"czarId", czarId}, - //&&&{"queryId", queryId}, - {"jobId", _jobId}, - {"attemptCount", _attemptCount}, - {"querySpecDb", _chunkQuerySpecDb}, - {"scanPriority", _scanRating}, - {"scanInteractive", _scanInteractive}, - //&&&{"maxTableSize", (cconfig::CzarConfig::instance()->getMaxTableSizeMB())}, - //&&&{"chunkScanTables", nlohmann::json::array()}, - {"chunkId", _chunkId}, - {"chunkresultname", _chunkResultName}, - {"chunkscantables_indexes", nlohmann::json::array()}, - {"queryFragments", json::array()}}); + auto jsJobMsg = nlohmann::json({{"jobId", _jobId}, + {"attemptCount", _attemptCount}, + {"querySpecDb", _chunkQuerySpecDb}, + {"scanPriority", _scanRating}, + {"scanInteractive", _scanInteractive}, + {"chunkId", _chunkId}, + {"chunkresultname", _chunkResultName}, + {"chunkscantables_indexes", nlohmann::json::array()}, + {"queryFragments", json::array()}}); // These are indexes into _jobDbTablesMap, which is shared between all JobMsg in this UberJobMsg. 
- LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson b"); + // &&& TODO:UJ queries appear to work even when "chunkscantables_indexes" is wrong auto& jsqCstIndexes = jsJobMsg["chunkscantables_indexes"]; - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson c"); for (auto const& index : _chunkScanTableIndexes) { - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson c1"); jsqCstIndexes.push_back(index); } - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson d"); auto& jsqFrags = jsJobMsg["queryFragments"]; - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson e"); for (auto& jFrag : *_jobFragments) { - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson e1"); - auto jsFrag = jFrag->serializeJson(); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson e2"); - jsqFrags.push_back(jsFrag); + //&&&auto jsFrag = jFrag->serializeJson(); + //&&&jsqFrags.push_back(jsFrag); + jsqFrags.emplace_back(jFrag->serializeJson()); } LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::serializeJson end"); @@ -314,31 +251,23 @@ JobMsg::JobMsg(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap JobMsg::Ptr JobMsg::createFromJson(nlohmann::json const& ujJson, JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& jobDbTablesMap) { - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson a"); LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson ujJson=" << ujJson); JobId jobId = http::RequestBodyJSON::required(ujJson, "jobId"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson b"); int attemptCount = http::RequestBodyJSON::required(ujJson, "attemptCount"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson c"); string chunkQuerySpecDb = http::RequestBodyJSON::required(ujJson, "querySpecDb"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson d"); int scanRating = http::RequestBodyJSON::required(ujJson, "scanPriority"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson e"); bool scanInteractive = http::RequestBodyJSON::required(ujJson, 
"scanInteractive"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson f"); int chunkId = http::RequestBodyJSON::required(ujJson, "chunkId"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson g"); string chunkResultName = http::RequestBodyJSON::required(ujJson, "chunkresultname"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson h"); json jsQFrags = http::RequestBodyJSON::required(ujJson, "queryFragments"); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson i"); Ptr jMsgPtr = Ptr(new JobMsg(jobSubQueryTempMap, jobDbTablesMap, jobId, attemptCount, chunkQuerySpecDb, scanRating, scanInteractive, chunkId, chunkResultName)); - LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson j"); - jMsgPtr->_jobFragments = JobFragment::createVectFromJson( - jsQFrags, jMsgPtr->_jobSubQueryTempMap, jMsgPtr->_jobDbTablesMap, jMsgPtr->_chunkResultName); + json jsChunkTblIndexes = http::RequestBodyJSON::required(ujJson, "chunkscantables_indexes"); + jMsgPtr->_chunkScanTableIndexes = jsChunkTblIndexes.get>(); + jMsgPtr->_jobFragments = + JobFragment::createVectFromJson(jsQFrags, jMsgPtr->_jobSubQueryTempMap, jMsgPtr->_jobDbTablesMap); LOGS(_log, LOG_LVL_WARN, "&&& JobMsg::createFromJson end"); return jMsgPtr; @@ -350,20 +279,15 @@ json JobSubQueryTempMap::serializeJson() const { // std::map _qTemplateMap; json jsSubQueryTemplateMap = {{"subquerytemplate_map", json::array()}}; - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::serializeJson b"); - LOGS(_log, LOG_LVL_WARN, + LOGS(_log, LOG_LVL_TRACE, "&&& JobSubQueryTempMap::serializeJson jsSubQueryTemplateMap=" << jsSubQueryTemplateMap); auto& jsSqtMap = jsSubQueryTemplateMap["subquerytemplate_map"]; - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::serializeJson c"); for (auto const& [key, templ] : _qTemplateMap) { - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::serializeJson c1"); json jsElem = {{"index", key}, {"template", templ}}; - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::serializeJson c2"); 
jsSqtMap.push_back(jsElem); } - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::serializeJson e"); - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& " << jsSqtMap); + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " &&& " << jsSqtMap); LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::serializeJson end"); return jsSubQueryTemplateMap; @@ -399,24 +323,19 @@ JobSubQueryTempMap::Ptr JobSubQueryTempMap::createFromJson(nlohmann::json const& } int JobSubQueryTempMap::findSubQueryTemp(string const& qTemp) { - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp start"); // The expected number of templates is expected to be small, less than 4, // so this shouldn't be horribly expensive. - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp qTemp=" << qTemp); for (auto const& [key, temp] : _qTemplateMap) { - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp key=" << key << " t=" << temp); if (temp == qTemp) { LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp end key=" << key); return key; } } - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp endloop"); // Need to insert int index = _qTemplateMap.size(); - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp index=" << index); _qTemplateMap[index] = qTemp; - LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp end"); + LOGS(_log, LOG_LVL_WARN, "&&& JobSubQueryTempMap::findSubQueryTemp end index=" << index); return index; } @@ -521,107 +440,75 @@ void JobDbTablesMap::setScanRating(int index, int scanRating, bool lockInMemory) } JobFragment::JobFragment(JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, - JobDbTablesMap::Ptr const& jobDbTablesMap, std::string const& resultTblName) - : _jobSubQueryTempMap(jobSubQueryTempMap), - _jobDbTablesMap(jobDbTablesMap), - _resultTblName(resultTblName) { - LOGS(_log, LOG_LVL_WARN, - "&&& JobFragment::JobFragment _jobSubQueryTempMap!=nullptr=" << (_jobSubQueryTempMap != 
nullptr)); - LOGS(_log, LOG_LVL_WARN, - "&&& JobFragment::JobFragment _jobDbTablesMap!=nullptr=" << (_jobDbTablesMap != nullptr)); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::JobFragment resultTblName=" << resultTblName); -} + JobDbTablesMap::Ptr const& jobDbTablesMap) + : _jobSubQueryTempMap(jobSubQueryTempMap), _jobDbTablesMap(jobDbTablesMap) {} JobFragment::VectPtr JobFragment::createVect(qproc::ChunkQuerySpec const& chunkQuerySpec, JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, - JobDbTablesMap::Ptr const& jobDbTablesMap, - string const& resultTable) { + JobDbTablesMap::Ptr const& jobDbTablesMap) { LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect start"); - VectPtr jFragments{new Vect()}; - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a"); if (chunkQuerySpec.nextFragment.get()) { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a1"); qproc::ChunkQuerySpec const* sPtr = &chunkQuerySpec; while (sPtr) { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a1a"); LOGS(_log, LOG_LVL_TRACE, "nextFragment"); for (unsigned int t = 0; t < (sPtr->queries).size(); t++) { // &&& del loop - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a1a1"); LOGS(_log, LOG_LVL_DEBUG, __func__ << " q=" << (sPtr->queries).at(t)); } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a2"); + /* &&& for (auto const& sbi : sPtr->subChunkIds) { // &&& del loop LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a2a"); LOGS(_log, LOG_LVL_DEBUG, __func__ << " sbi=" << sbi); } + */ // Linked fragments will not have valid subChunkTables vectors, // So, we reuse the root fragment's vector. 
- LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a3"); - _addFragment(*jFragments, resultTable, chunkQuerySpec.subChunkTables, sPtr->subChunkIds, - sPtr->queries, jobSubQueryTempMap, jobDbTablesMap); + _addFragment(*jFragments, chunkQuerySpec.subChunkTables, sPtr->subChunkIds, sPtr->queries, + jobSubQueryTempMap, jobDbTablesMap); sPtr = sPtr->nextFragment.get(); } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect a4"); } else { LOGS(_log, LOG_LVL_TRACE, "no nextFragment"); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect b1"); for (unsigned int t = 0; t < (chunkQuerySpec.queries).size(); t++) { // &&& del loop - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect b1a"); LOGS(_log, LOG_LVL_TRACE, (chunkQuerySpec.queries).at(t)); } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect b2"); - _addFragment(*jFragments, resultTable, chunkQuerySpec.subChunkTables, chunkQuerySpec.subChunkIds, + _addFragment(*jFragments, chunkQuerySpec.subChunkTables, chunkQuerySpec.subChunkIds, chunkQuerySpec.queries, jobSubQueryTempMap, jobDbTablesMap); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect b3"); } LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVect end"); return jFragments; } -void JobFragment::_addFragment(std::vector& jFragments, std::string const& resultTblName, - DbTableSet const& subChunkTables, std::vector const& subchunkIds, - std::vector const& queries, +//&&&void JobFragment::_addFragment(std::vector& jFragments, std::string const& resultTblName, +void JobFragment::_addFragment(std::vector& jFragments, DbTableSet const& subChunkTables, + std::vector const& subchunkIds, std::vector const& queries, JobSubQueryTempMap::Ptr const& subQueryTemplates, JobDbTablesMap::Ptr const& dbTablesMap) { LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment a"); - Ptr jFrag = Ptr(new JobFragment(subQueryTemplates, dbTablesMap, resultTblName)); + Ptr jFrag = Ptr(new JobFragment(subQueryTemplates, dbTablesMap)); // queries: The query string is stored 
in `_jobSubQueryTempMap` and the list of // integer indexes, `_subQueryTempIndexes`, points back to the specific template. - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment b"); for (auto& qry : queries) { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment b1"); int index = jFrag->_jobSubQueryTempMap->findSubQueryTemp(qry); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment b2"); jFrag->_jobSubQueryTempIndexes.push_back(index); - LOGS(_log, LOG_LVL_INFO, jFrag->cName(__func__) << "&&& added frag=" << qry << " index=" << index); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment b4"); + LOGS(_log, LOG_LVL_TRACE, jFrag->cName(__func__) << "&&& added frag=" << qry << " index=" << index); } // Add the db+table pairs to the subchunks for the fragment. - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment c"); for (auto& tbl : subChunkTables) { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment c1"); int index = jFrag->_jobDbTablesMap->findDbTable(make_pair(tbl.db, tbl.table)); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment c2"); jFrag->_jobDbTablesIndexes.push_back(index); - LOGS(_log, LOG_LVL_INFO, - jFrag->cName(__func__) << "&&& added dbtbl=" << tbl.db << "." << tbl.table - << " index=" << index); + LOGS(_log, LOG_LVL_TRACE, + jFrag->cName(__func__) << " added dbtbl=" << tbl.db << "." 
<< tbl.table << " index=" << index); } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment d"); // Add subchunk id numbers for (auto& subchunkId : subchunkIds) { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment d1"); jFrag->_subchunkIds.push_back(subchunkId); - LOGS(_log, LOG_LVL_INFO, jFrag->cName(__func__) << "&&& added subchunkId=" << subchunkId); + LOGS(_log, LOG_LVL_TRACE, jFrag->cName(__func__) << " added subchunkId=" << subchunkId); } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment e"); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment " << jFrag->dump()); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment ee"); jFragments.push_back(move(jFrag)); LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::_addFragment end"); @@ -629,7 +516,7 @@ void JobFragment::_addFragment(std::vector& jFragments, std::string const& string JobFragment::dump() const { stringstream os; - os << "JobFragment resultTbl=" << _resultTblName << " templateIndexes={"; + os << " templateIndexes={"; for (int j : _jobSubQueryTempIndexes) { os << j << ", "; } @@ -646,44 +533,24 @@ string JobFragment::dump() const { } nlohmann::json JobFragment::serializeJson() const { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::serializeJson a"); - - json jsFragment = {{"resulttblname", _resultTblName}, - {"subquerytemplate_indexes", _jobSubQueryTempIndexes}, + json jsFragment = {{"subquerytemplate_indexes", _jobSubQueryTempIndexes}, {"dbtables_indexes", _jobDbTablesIndexes}, {"subchunkids", _subchunkIds}}; - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::serializeJson b"); - - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& " << jsFragment); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::serializeJson end"); + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " " << jsFragment); return jsFragment; } JobFragment::VectPtr JobFragment::createVectFromJson(nlohmann::json const& jsFrags, JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, - JobDbTablesMap::Ptr const& dbTablesMap, - std::string 
const& resultTblName) { + JobDbTablesMap::Ptr const& dbTablesMap) { LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVectFromJson " << jsFrags); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVectFromJson a"); JobFragment::VectPtr jobFragments{new JobFragment::Vect()}; for (auto const& jsFrag : jsFrags) { - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVectFromJson b"); - Ptr jobFrag = Ptr(new JobFragment(jobSubQueryTempMap, dbTablesMap, resultTblName)); - - jobFrag->_resultTblName = http::RequestBodyJSON::required(jsFrag, "resulttblname"); - if (jobFrag->_resultTblName != resultTblName) { - // &&& hoping to remove _resultTblName from JobFragment. - LOGS(_log, LOG_LVL_ERROR, - jobFrag->cName(__func__) + " _resultTblName != resultTblName for " + to_string(jsFrag)); - throw util::Bug(ERR_LOC, jobFrag->cName(__func__) + " _resultTblName != resultTblName for " + - to_string(jsFrag)); - } + Ptr jobFrag = Ptr(new JobFragment(jobSubQueryTempMap, dbTablesMap)); - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVectFromJson c"); - //&&&std::vector _jobSubQueryTempIndexes; ///< &&& doc jobFrag->_jobSubQueryTempIndexes = jsFrag["subquerytemplate_indexes"].get>(); for (int j : jobFrag->_jobSubQueryTempIndexes) { try { @@ -698,7 +565,6 @@ JobFragment::VectPtr JobFragment::createVectFromJson(nlohmann::json const& jsFra } } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVectFromJson d"); jobFrag->_jobDbTablesIndexes = jsFrag["dbtables_indexes"].get>(); for (int j : jobFrag->_jobDbTablesIndexes) { try { @@ -715,7 +581,6 @@ JobFragment::VectPtr JobFragment::createVectFromJson(nlohmann::json const& jsFra } } - LOGS(_log, LOG_LVL_WARN, "&&& JobFragment::createVectFromJson e"); jobFrag->_subchunkIds = jsFrag["subchunkids"].get>(); jobFragments->push_back(jobFrag); } diff --git a/src/protojson/UberJobMsg.h b/src/protojson/UberJobMsg.h index 51dbc24c6..9203e74ae 100644 --- a/src/protojson/UberJobMsg.h +++ b/src/protojson/UberJobMsg.h @@ -143,6 +143,7 @@ class JobFragment { 
JobFragment() = delete; JobFragment(JobFragment const&) = delete; + /* &&& static VectPtr createVect(qproc::ChunkQuerySpec const& chunkQuerySpec, JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& dbTablesMap, std::string const& resultTblName); @@ -152,6 +153,16 @@ class JobFragment { JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, JobDbTablesMap::Ptr const& dbTablesMap, std::string const& resultTblName); + */ + + static VectPtr createVect(qproc::ChunkQuerySpec const& chunkQuerySpec, + JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, + JobDbTablesMap::Ptr const& dbTablesMap); + + /// &&& doc + static VectPtr createVectFromJson(nlohmann::json const& ujJson, + JobSubQueryTempMap::Ptr const& jobSubQueryTempMap, + JobDbTablesMap::Ptr const& dbTablesMap); /// Return a json version of the contents of this class. nlohmann::json serializeJson() const; @@ -159,20 +170,30 @@ class JobFragment { std::vector const& getJobSubQueryTempIndexes() const { return _jobSubQueryTempIndexes; } std::vector const& getJobDbTablesIndexes() const { return _jobDbTablesIndexes; } std::vector const& getSubchunkIds() const { return _subchunkIds; } - std::string const& getResultTblName() const { return _resultTblName; } + //&&&std::string const& getResultTblName() const { return _resultTblName; } std::string dump() const; private: + /* &&& JobFragment(JobSubQueryTempMap::Ptr const& subQueryTemplates, JobDbTablesMap::Ptr const& dbTablesMap, std::string const& resultTblName); + */ + JobFragment(JobSubQueryTempMap::Ptr const& subQueryTemplates, JobDbTablesMap::Ptr const& dbTablesMap); /// &&& doc + static void _addFragment(std::vector& jFragments, DbTableSet const& subChunkTables, + std::vector const& subchunkIds, std::vector const& queries, + JobSubQueryTempMap::Ptr const& subQueryTemplates, + JobDbTablesMap::Ptr const& dbTablesMap); + + /* &&& static void _addFragment(std::vector& jFragments, std::string const& resultTblName, DbTableSet const& subChunkTables, 
std::vector const& subchunkIds, std::vector const& queries, JobSubQueryTempMap::Ptr const& subQueryTemplates, JobDbTablesMap::Ptr const& dbTablesMap); + */ JobSubQueryTempMap::Ptr _jobSubQueryTempMap; ///< &&& doc std::vector _jobSubQueryTempIndexes; ///< &&& doc @@ -182,8 +203,10 @@ class JobFragment { std::vector _subchunkIds; ///< &&& doc + /* &&& std::string _resultTblName; ///< &&& doc &&& probably not needed here. Replace with ///< JobMsg::_chunkResultName field. + */ }; /// This class is used to store the information for a single Job (the queries and metadata diff --git a/src/protojson/testUberJobMsg.cc b/src/protojson/testUberJobMsg.cc index a56c77175..e0b056422 100644 --- a/src/protojson/testUberJobMsg.cc +++ b/src/protojson/testUberJobMsg.cc @@ -54,26 +54,65 @@ std::string testA() { } #endif // &&& -std::string testA() { - std::string ta = +string testA() { + string ta = R"({"maxtablesizemb":5432,"auth_key":"replauthkey","czarinfo":{"czar-startup-time":1732658208085,"id":1,"management-host-name":"3a8b68cf9b67","management-port":40865,"name":"proxy"},"dbtables_map":{"dbtable_map":[],"scanrating_map":[]},"scaninfo":{"infoscanrating":0,"infotables":[]},"instance_id":"qserv_proj","jobs":[{"attemptCount":0,"chunkId":1234567890,"chunkresultname":"r_1_a0d45001254932466b784acf90323565_1234567890_0","chunkscantables_indexes":[],"jobId":0,"queryFragments":[{"dbtables_indexes":[],"resulttblname":"r_1_a0d45001254932466b784acf90323565_1234567890_0","subchunkids":[],"subquerytemplate_indexes":[0]}],"querySpecDb":"qcase01","scanInteractive":true,"scanPriority":0}],"queryid":1,"rowlimit":0,"subqueries_map":{"subquerytemplate_map":[{"index":0,"template":"SELECT `qcase01.Filter`.`filterId` AS `filterId`,`qcase01.Filter`.`filterName` AS `filterName`,`qcase01.Filter`.`photClam` AS `photClam`,`qcase01.Filter`.`photBW` AS `photBW` FROM `qcase01`.`Filter`AS`qcase01.Filter` WHERE 
(`qcase01.Filter`.`filterId`<<1)=2"}]},"uberjobid":2,"version":39,"worker":"6c56ba9b-ac40-11ef-acb7-0242c0a8030a"})"; return ta; } +string testB() { + string tb = + R"({"auth_key":"slac6dev:kukara4a","czarinfo":{"czar-startup-time":1733499789161,"id":7,"management-host-name":"sdfqserv001.sdf.slac.stanford.edu","management-port":41923,"name":"proxy"},"dbtables_map":{"dbtable_map":[{"db":"dp02_dc2_catalogs","index":0,"table":"Object"}],"scanrating_map":[{"index":0,"lockinmem":true,"scanrating":1}]},"instance_id":"slac6dev","jobs":[{"attemptCount":0,"chunkId":79680,"chunkresultname":"r_280607_e6eac6bb53b0f8505ed36bf82a4d93f1_79680_0","chunkscantables_indexes":[0],"jobId":1398,"queryFragments":[{"dbtables_indexes":[],"resulttblname":"r_280607_e6eac6bb53b0f8505ed36bf82a4d93f1_79680_0","subchunkids":[],"subquerytemplate_indexes":[0]}],"querySpecDb":"dp02_dc2_catalogs","scanInteractive":false,"scanPriority":1},{"attemptCount":0,"chunkId":80358,"chunkresultname":"r_280607_e6eac6bb53b0f8505ed36bf82a4d93f1_80358_0","chunkscantables_indexes":[0],"jobId":1435,"queryFragments":[{"dbtables_indexes":[],"resulttblname":"r_280607_e6eac6bb53b0f8505ed36bf82a4d93f1_80358_0","subchunkids":[],"subquerytemplate_indexes":[1]}],"querySpecDb":"dp02_dc2_catalogs","scanInteractive":false,"scanPriority":1},{"attemptCount":0,"chunkId":81017,"chunkresultname":"r_280607_e6eac6bb53b0f8505ed36bf82a4d93f1_81017_0","chunkscantables_indexes":[0],"jobId":1452,"queryFragments":[{"dbtables_indexes":[],"resulttblname":"r_280607_e6eac6bb53b0f8505ed36bf82a4d93f1_81017_0","subchunkids":[],"subquerytemplate_indexes":[2]}],"querySpecDb":"dp02_dc2_catalogs","scanInteractive":false,"scanPriority":1}],"maxtablesizemb":5100,"queryid":280607,"rowlimit":0,"scaninfo":{"infoscanrating":1,"infotables":[{"sidb":"dp02_dc2_catalogs","silockinmem":true,"sirating":1,"sitable":"Object"}]},"subqueries_map":{"subquerytemplate_map":[{"index":0,"template":"SELECT COUNT(`obj`.`g_ap12Flux`) AS `QS1_COUNT`,SUM(`obj`.`g_ap12Flux`) 
AS `QS2_SUM`,MIN(`obj`.`g_ap12Flux`) AS `QS3_MIN`,MAX(`obj`.`g_ap12Flux`) AS `QS4_MAX`,COUNT(`obj`.`g_ap12FluxErr`) AS `QS5_COUNT`,SUM(`obj`.`g_ap12FluxErr`) AS `QS6_SUM`,MIN(`obj`.`g_ap12FluxErr`) AS `QS7_MIN`,MAX(`obj`.`g_ap12FluxErr`) AS `QS8_MAX`,COUNT(`obj`.`g_ap25Flux`) AS `QS9_COUNT`,SUM(`obj`.`g_ap25Flux`) AS `QS10_SUM`,MIN(`obj`.`g_ap25Flux`) AS `QS11_MIN`,MAX(`obj`.`g_ap25Flux`) AS `QS12_MAX`,COUNT(`obj`.`g_ap25FluxErr`) AS `QS13_COUNT`,SUM(`obj`.`g_ap25FluxErr`) AS `QS14_SUM`,MIN(`obj`.`g_ap25FluxErr`) AS `QS15_MIN`,MAX(`obj`.`g_ap25FluxErr`) AS `QS16_MAX` FROM `dp02_dc2_catalogs`.`Object_79680` AS `obj`"},{"index":1,"template":"SELECT COUNT(`obj`.`g_ap12Flux`) AS `QS1_COUNT`,SUM(`obj`.`g_ap12Flux`) AS `QS2_SUM`,MIN(`obj`.`g_ap12Flux`) AS `QS3_MIN`,MAX(`obj`.`g_ap12Flux`) AS `QS4_MAX`,COUNT(`obj`.`g_ap12FluxErr`) AS `QS5_COUNT`,SUM(`obj`.`g_ap12FluxErr`) AS `QS6_SUM`,MIN(`obj`.`g_ap12FluxErr`) AS `QS7_MIN`,MAX(`obj`.`g_ap12FluxErr`) AS `QS8_MAX`,COUNT(`obj`.`g_ap25Flux`) AS `QS9_COUNT`,SUM(`obj`.`g_ap25Flux`) AS `QS10_SUM`,MIN(`obj`.`g_ap25Flux`) AS `QS11_MIN`,MAX(`obj`.`g_ap25Flux`) AS `QS12_MAX`,COUNT(`obj`.`g_ap25FluxErr`) AS `QS13_COUNT`,SUM(`obj`.`g_ap25FluxErr`) AS `QS14_SUM`,MIN(`obj`.`g_ap25FluxErr`) AS `QS15_MIN`,MAX(`obj`.`g_ap25FluxErr`) AS `QS16_MAX` FROM `dp02_dc2_catalogs`.`Object_80358` AS `obj`"},{"index":2,"template":"SELECT COUNT(`obj`.`g_ap12Flux`) AS `QS1_COUNT`,SUM(`obj`.`g_ap12Flux`) AS `QS2_SUM`,MIN(`obj`.`g_ap12Flux`) AS `QS3_MIN`,MAX(`obj`.`g_ap12Flux`) AS `QS4_MAX`,COUNT(`obj`.`g_ap12FluxErr`) AS `QS5_COUNT`,SUM(`obj`.`g_ap12FluxErr`) AS `QS6_SUM`,MIN(`obj`.`g_ap12FluxErr`) AS `QS7_MIN`,MAX(`obj`.`g_ap12FluxErr`) AS `QS8_MAX`,COUNT(`obj`.`g_ap25Flux`) AS `QS9_COUNT`,SUM(`obj`.`g_ap25Flux`) AS `QS10_SUM`,MIN(`obj`.`g_ap25Flux`) AS `QS11_MIN`,MAX(`obj`.`g_ap25Flux`) AS `QS12_MAX`,COUNT(`obj`.`g_ap25FluxErr`) AS `QS13_COUNT`,SUM(`obj`.`g_ap25FluxErr`) AS `QS14_SUM`,MIN(`obj`.`g_ap25FluxErr`) AS `QS15_MIN`,MAX(`obj`.`g_ap25FluxErr`) 
AS `QS16_MAX` FROM `dp02_dc2_catalogs`.`Object_81017` AS `obj`"}]},"uberjobid":147,"version":39,"worker":"db04"})"; + return tb; +} + +bool parseSerializeReparseCheck(string const& jsStr, string const& note) { + string fName("parseSerialize "); + fName += note + " "; + LOGS(_log, LOG_LVL_INFO, fName << " start " << jsStr); + nlohmann::json js = nlohmann::json::parse(jsStr); + LOGS(_log, LOG_LVL_INFO, fName << " parse 1"); + + UberJobMsg::Ptr ujm = UberJobMsg::createFromJson(js); + BOOST_REQUIRE(ujm != nullptr); + + nlohmann::json jsUjm = ujm->serializeJson(); + LOGS(_log, LOG_LVL_INFO, fName << " serialized jsUjm=" << jsUjm); + + UberJobMsg::Ptr ujmCreated = UberJobMsg::createFromJson(jsUjm); + LOGS(_log, LOG_LVL_INFO, fName << " created"); + nlohmann::json jsUjmCreated = ujmCreated->serializeJson(); + LOGS(_log, LOG_LVL_INFO, fName << " created->serialized"); + + bool createdMatchesOriginal = jsUjm == jsUjmCreated; + if (createdMatchesOriginal) { + LOGS(_log, LOG_LVL_INFO, fName << "created matches original"); + } else { + LOGS(_log, LOG_LVL_ERROR, "jsUjm != jsUjmCreated"); + LOGS(_log, LOG_LVL_ERROR, "jsUjm=" << jsUjm); + LOGS(_log, LOG_LVL_ERROR, "jsUjmCreated=" << jsUjmCreated); + } + BOOST_REQUIRE(createdMatchesOriginal); + return createdMatchesOriginal; +} + BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { string const replicationInstanceId = "repliInstId"; string const replicationAuthKey = "repliIAuthKey"; - uint64_t cxrStartTime = lsst::qserv::millisecSinceEpoch(lsst::qserv::CLOCK::now() - 5s); + //&&&uint64_t cxrStartTime = lsst::qserv::millisecSinceEpoch(lsst::qserv::CLOCK::now() - 5s); //&&&uint64_t wkrStartTime = lsst::qserv::millisecSinceEpoch(lsst::qserv::CLOCK::now() - 10s); - string const czrName("czar_name"); - lsst::qserv::CzarIdType const czrId = 32; - int czrPort = 2022; - string const czrHost("cz_host"); - LOGS(_log, LOG_LVL_WARN, "&&& testUJM a"); - auto czarA = - lsst::qserv::protojson::CzarContactInfo::create(czrName, czrId, czrPort, czrHost, 
cxrStartTime); + /* &&& + string const czrName("czar_name"); + lsst::qserv::CzarIdType const czrId = 32; + int czrPort = 2022; + string const czrHost("cz_host"); + LOGS(_log, LOG_LVL_WARN, "&&& testUJM a"); + auto czarA = + lsst::qserv::protojson::CzarContactInfo::create(czrName, czrId, czrPort, czrHost, + cxrStartTime); + LOGS(_log, LOG_LVL_WARN, "&&& testUJM b"); string jsStr = testA(); @@ -99,6 +138,10 @@ BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { LOGS(_log, LOG_LVL_ERROR, "jsUjmCreated=" << jsUjmCreated); } BOOST_REQUIRE(createdMatchesOriginal); + */ + + BOOST_REQUIRE(parseSerializeReparseCheck(testA(), "A")); + BOOST_REQUIRE(parseSerializeReparseCheck(testB(), "B")); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/qana/QueryMapping.h b/src/qana/QueryMapping.h index 2e8dca319..585971f97 100644 --- a/src/qana/QueryMapping.h +++ b/src/qana/QueryMapping.h @@ -92,6 +92,8 @@ class QueryMapping { bool hasParameter(Parameter p) const; DbTableSet const& getSubChunkTables() const { return _subChunkTables; } + std::string dump() const { return std::string("&&& NEED CODE"); } + private: ParameterMap _subs; DbTableSet _subChunkTables; diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 9df2bbf08..3a709d5b2 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -236,6 +236,7 @@ void Executive::queueFileCollect(util::PriorityCommand::Ptr const& cmd) { } void Executive::queueUberJob(std::shared_ptr const& uberJob) { + LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&&uj queueUberJob"); auto runUberJobFunc = [uberJob](util::CmdData*) { uberJob->runUberJob(); }; auto cmd = util::PriorityCommand::Ptr(new util::PriorityCommand(runUberJobFunc)); diff --git a/src/qdisp/JobDescription.h b/src/qdisp/JobDescription.h index 10a9f13ba..635540297 100644 --- a/src/qdisp/JobDescription.h +++ b/src/qdisp/JobDescription.h @@ -59,8 +59,7 @@ namespace qdisp { class Executive; class ResponseHandler; -/** Description of a job managed by the executive - */ +/// 
Description of a job managed by the executive class JobDescription { public: using Ptr = std::shared_ptr; diff --git a/src/qdisp/UberJob.cc b/src/qdisp/UberJob.cc index 990154803..0765a14ce 100644 --- a/src/qdisp/UberJob.cc +++ b/src/qdisp/UberJob.cc @@ -43,6 +43,7 @@ #include "qproc/ChunkQuerySpec.h" #include "util/Bug.h" #include "util/common.h" +#include "util/Histogram.h" //&&& #include "util/QdispPool.h" // LSST headers @@ -98,9 +99,12 @@ bool UberJob::addJob(JobQuery::Ptr const& job) { return success; } -void UberJob::runUberJob() { +util::HistogramRolling histoRunUberJob("&&&uj histoRunUberJob", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); +util::HistogramRolling histoUJSerialize("&&&uj histoUJSerialize", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); + +void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelled LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " start"); - LOGS(_log, LOG_LVL_ERROR, "&&& jsonTESTrequest start"); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj start"); // Build the uberjob payload for each job. nlohmann::json uj; unique_lock jobsLock(_jobsMtx); @@ -149,8 +153,8 @@ void UberJob::runUberJob() { jsJobs.push_back(jsJob); jbPtr->getDescription()->resetJsForWorker(); // no longer needed. 
} -#else // &&& - LOGS(_log, LOG_LVL_ERROR, "&&& jsonTESTrequest a"); +#else // &&& + //&&&LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj a"); // Send the uberjob to the worker auto const method = http::Method::POST; auto [ciwId, ciwHost, ciwManagment, ciwPort] = _wContactInfo->getAll(); @@ -167,34 +171,51 @@ void UberJob::runUberJob() { auto uberJobMsg = protojson::UberJobMsg::create( http::MetaModule::version, czarConfig->replicationInstanceId(), czarConfig->replicationAuthKey(), czInfo, _wContactInfo, _queryId, _uberJobId, _rowLimit, maxTableSizeMB, scanInfoPtr, _jobs); + auto startserialize = CLOCK::now(); //&&& json request = uberJobMsg->serializeJson(); - - LOGS(_log, LOG_LVL_ERROR, "&&& jsonTESTrequest=" << request); + auto endserialize = CLOCK::now(); //&&& + std::chrono::duration secsserialize = endserialize - startserialize; // &&& + histoUJSerialize.addEntry(endserialize, secsserialize.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoUJSerialize.getString("")); +#endif // &&& + jobsLock.unlock(); // unlock so other _jobsMtx threads can advance while this waits for transmit + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj c"); + /* &&& { // &&& testing only, delete auto parsedReq = protojson::UberJobMsg::createFromJson(request); json jsParsedReq = parsedReq->serializeJson(); if (request == jsParsedReq) { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& YAY!!! "); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj YAY!!! 
"); } else { - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&& request != jsParsedReq"); - LOGS(_log, LOG_LVL_ERROR, "&&& request=" << request); - LOGS(_log, LOG_LVL_ERROR, "&&& jsParsedReq=" << jsParsedReq); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj noYAY request != jsParsedReq"); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj request=" << request); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj jsParsedReq=" << jsParsedReq); } } - -#endif // &&& - jobsLock.unlock(); // unlock so other _jobsMtx threads can advance while this waits for transmit + */ LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " REQ " << request); string const requestContext = "Czar: '" + http::method2string(method) + "' request to '" + url + "'"; LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " czarPost url=" << url << " request=" << request.dump() << " headers=" << headers[0]); - http::Client client(method, url, request.dump(), headers); + auto startclient = CLOCK::now(); //&&& + + auto commandHttpPool = czar::Czar::getCzar()->getCommandHttpPool(); + http::ClientConfig clientConfig; + clientConfig.httpVersion = CURL_HTTP_VERSION_1_1; // same as in qhttp + clientConfig.bufferSize = CURL_MAX_READ_SIZE; // 10 MB in the current version of libcurl + clientConfig.tcpKeepAlive = true; + clientConfig.tcpKeepIdle = 30; // the default is 60 sec + clientConfig.tcpKeepIntvl = 5; // the default is 60 sec + http::Client client(method, url, request.dump(), headers, clientConfig, commandHttpPool); bool transmitSuccess = false; string exceptionWhat; try { + //&&&util::InstanceCount ic{"runUberJob&&&"}; + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj d"); json const response = client.readAsJson(); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj d1"); if (0 != response.at("success").get()) { transmitSuccess = true; } else { @@ -204,6 +225,10 @@ void UberJob::runUberJob() { LOGS(_log, LOG_LVL_WARN, requestContext + " ujresponse failed, ex: " + ex.what()); 
exceptionWhat = ex.what(); } + auto endclient = CLOCK::now(); //&&& + std::chrono::duration secsclient = endclient - startclient; // &&& + histoRunUberJob.addEntry(endclient, secsclient.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoRunUberJob.getString("")); if (!transmitSuccess) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " transmit failure, try to send jobs elsewhere"); _unassignJobs(); // locks _jobsMtx @@ -213,6 +238,7 @@ void UberJob::runUberJob() { } else { setStatusIfOk(qmeta::JobStatus::REQUEST, cName(__func__) + " transmitSuccess"); // locks _jobsMtx } + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj runuj end"); return; } diff --git a/src/qproc/ChunkQuerySpec.h b/src/qproc/ChunkQuerySpec.h index d7ad75984..41582368f 100644 --- a/src/qproc/ChunkQuerySpec.h +++ b/src/qproc/ChunkQuerySpec.h @@ -67,7 +67,8 @@ class ChunkQuerySpec { bool scanInteractive{false}; DbTableSet subChunkTables; std::vector subChunkIds; - std::vector queries; + std::vector queries; // &&& remove if possible + std::vector queryTemplates; // Consider promoting the concept of container of ChunkQuerySpec // in the hopes of increased code cleanliness. std::shared_ptr nextFragment; ///< ad-hoc linked list (consider removal) diff --git a/src/qproc/ChunkSpec.cc b/src/qproc/ChunkSpec.cc index 1bd36261f..0d1d0dba5 100644 --- a/src/qproc/ChunkSpec.cc +++ b/src/qproc/ChunkSpec.cc @@ -121,7 +121,9 @@ void normalize(ChunkSpecVector& specs) { //////////////////////////////////////////////////////////////////////// // ChunkSpec //////////////////////////////////////////////////////////////////////// -bool ChunkSpec::shouldSplit() const { return subChunks.size() > (unsigned)GOOD_SUBCHUNK_COUNT; } +//&&&bool ChunkSpec::shouldSplit() const { return subChunks.size() > (unsigned)GOOD_SUBCHUNK_COUNT; } +//&&& subchunks are handled in their own tasks now, so there's no point in splitting anymore. 
+bool ChunkSpec::shouldSplit() const { return false; } ChunkSpec ChunkSpec::intersect(ChunkSpec const& cs) const { ChunkSpec output(*this); diff --git a/src/qproc/QuerySession.cc b/src/qproc/QuerySession.cc index 969409a4d..7099e2647 100644 --- a/src/qproc/QuerySession.cc +++ b/src/qproc/QuerySession.cc @@ -391,6 +391,7 @@ std::vector QuerySession::_buildChunkQueries(query::QueryTemplate:: } for (auto&& queryTemplate : queryTemplates) { + LOGS(_log, LOG_LVL_WARN, "&&&uj QuerySession::_buildChunkQueries qt=" << queryTemplate.dump()); std::string str = _context->queryMapping->apply(chunkSpec, queryTemplate); chunkQueries.push_back(std::move(str)); } @@ -417,7 +418,7 @@ ChunkQuerySpec::Ptr QuerySession::buildChunkQuerySpec(query::QueryTemplate::Vect if (!_context->hasSubChunks()) { cQSpec->queries = _buildChunkQueries(queryTemplates, chunkSpec); } else { - if (chunkSpec.shouldSplit()) { + if (chunkSpec.shouldSplit()) { //&&& remove case ChunkSpecFragmenter frag(chunkSpec); ChunkSpec s = frag.get(); cQSpec->queries = _buildChunkQueries(queryTemplates, s); diff --git a/src/query/QueryTemplate.cc b/src/query/QueryTemplate.cc index 699a6faab..32e628e90 100644 --- a/src/query/QueryTemplate.cc +++ b/src/query/QueryTemplate.cc @@ -43,6 +43,8 @@ #include "query/ColumnRef.h" #include "query/TableRef.h" +using namespace std; + namespace lsst::qserv::query { //////////////////////////////////////////////////////////////////////// @@ -204,4 +206,18 @@ QueryTemplate::GetAliasMode QueryTemplate::getTableAliasMode() const { return DONT_USE; // should never get here but to satisfy the compiler. 
} +string QueryTemplate::dump() const { + ostringstream os; + os << "QueryTemplate quoteIdents=" << _quoteIdentifiers; + os << " useColOnly=" << _useColumnOnly; + os << " aliasMode=" << _aliasMode; + os << " entries={"; + for (auto const& entry : _entries) { + os << "(dynamic=" << entry->isDynamic(); + os << ":val=" << entry->getValue() << ")"; + } + os << "}"; + return os.str(); +} + } // namespace lsst::qserv::query diff --git a/src/query/QueryTemplate.h b/src/query/QueryTemplate.h index 5be5e3ac0..b0ffad8ba 100644 --- a/src/query/QueryTemplate.h +++ b/src/query/QueryTemplate.h @@ -208,6 +208,8 @@ class QueryTemplate { return os << qt.sqlFragment(); } + std::string dump() const; + private: EntryPtrVector _entries; SetAliasMode _aliasMode{USE_ALIAS}; diff --git a/src/util/InstanceCount.cc b/src/util/InstanceCount.cc index 9940523f3..895698d63 100644 --- a/src/util/InstanceCount.cc +++ b/src/util/InstanceCount.cc @@ -31,7 +31,8 @@ void InstanceCount::_increment(std::string const& source) { auto ret = _instances.insert(entry); auto iter = ret.first; iter->second += 1; - LOGS(_log, LOG_LVL_DEBUG, "InstanceCount " << source << " " << iter->first << "=" << iter->second); + LOGS(_log, LOG_LVL_WARN, + "InstanceCount " << source << " " << iter->first << "=" << iter->second); //&&&DEBUG } InstanceCount::~InstanceCount() { @@ -39,7 +40,8 @@ InstanceCount::~InstanceCount() { auto iter = _instances.find(_className); if (iter != _instances.end()) { iter->second -= 1; - LOGS(_log, LOG_LVL_DEBUG, "~InstanceCount " << iter->first << "=" << iter->second << " : " << *this); + LOGS(_log, LOG_LVL_WARN, + "~InstanceCount " << iter->first << "=" << iter->second << " : " << *this); //&&&DEBUG if (iter->second == 0) { _instances.erase(_className); } diff --git a/src/wsched/BlendScheduler.cc b/src/wsched/BlendScheduler.cc index ccb335b97..b5b37346f 100644 --- a/src/wsched/BlendScheduler.cc +++ b/src/wsched/BlendScheduler.cc @@ -259,6 +259,7 @@ void 
BlendScheduler::commandStart(util::Command::Ptr const& cmd) { LOGS(_log, LOG_LVL_ERROR, "BlendScheduler::commandStart scheduler not found"); } _infoChanged = true; + LOGS(_log, LOG_LVL_DEBUG, "BlendScheduler::commandStart &&& end"); } void BlendScheduler::commandFinish(util::Command::Ptr const& cmd) {