diff --git a/src/cconfig/CzarConfig.h b/src/cconfig/CzarConfig.h index b4d51fc7c..4b0c1cde3 100644 --- a/src/cconfig/CzarConfig.h +++ b/src/cconfig/CzarConfig.h @@ -118,7 +118,7 @@ class CzarConfig { */ std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); } - /* Get the maximum number of threads for xrootd to use. + /* Get the maximum number of threads for xrootd to use. // TODO:UJ delete * * @return the maximum number of threads for xrootd to use. */ @@ -371,6 +371,7 @@ class CzarConfig { CVTStrPtr _qdispVectMinRunningSizes = util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3"); + // TODO:UJ delete xrootd specific entries. CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4); CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create( _configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60); @@ -416,8 +417,8 @@ class CzarConfig { util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10); CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60); - CVTIntPtr _monitorSleepTimeMilliSec = - util::ConfigValTInt::create(_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000); + CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create( + _configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000); // UberJobs CVTIntPtr _uberJobMaxChunks = diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index 9a6ee5b5c..aaa940049 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -123,6 +123,10 @@ std::tuple readHttpFileAndMergeHttp( int headerCount = 0; uint64_t totalBytesRead = 0; try { + auto exec = uberJob->getExecutive(); + if (exec == nullptr || exec->getCancelled()) { + throw runtime_error(context + " query was cancelled"); + } string const noClientData; vector const noClientHeaders; http::ClientConfig clientConfig; @@ -139,10 +143,12 @@ std::tuple readHttpFileAndMergeHttp( bool last = false; char const* next = inBuf; char const* const end = inBuf + inBufSize; + LOGS(_log, LOG_LVL_INFO, + context << " next=" << (uint64_t)next << " end=" << (uint64_t)end); // &&& DEBUG while ((next < end) && !last) { - LOGS(_log, LOG_LVL_WARN, - context << "TODO:UJ next=" << (uint64_t)next << " end=" << (uint64_t)end - << " last=" << last); + if (exec->getCancelled()) { + throw runtime_error(context + " query was cancelled"); + } if (msgSizeBytes == 0) { // Continue or finish reading the frame header. 
size_t const bytes2read = @@ -210,15 +216,15 @@ std::tuple readHttpFileAndMergeHttp( msgSizeBytes = 0; } else { LOGS(_log, LOG_LVL_WARN, - context << " headerCount=" << headerCount - << " incomplete read diff=" << (msgSizeBytes - msgBufNext)); + context << " headerCount=" << headerCount << " incomplete read diff=" + << (msgSizeBytes - msgBufNext)); // &&& DEBUG } } } }); - LOGS(_log, LOG_LVL_DEBUG, + LOGS(_log, LOG_LVL_WARN, context << " headerCount=" << headerCount << " msgSizeBytes=" << msgSizeBytes - << " totalBytesRead=" << totalBytesRead); + << " totalBytesRead=" << totalBytesRead); // &&& if (msgSizeBufNext != 0) { throw runtime_error("short read of the message header at offset " + to_string(offset - msgSizeBytes) + ", file: " + httpUrl); @@ -366,7 +372,7 @@ tuple MergingHandler::flushHttp(string const& fileUrl, uint64_t expe } if (success) { - _infileMerger->mergeCompleteFor(uberJob->getJobId()); + _infileMerger->mergeCompleteFor(uberJob->getUjId()); } return {success, shouldCancel}; } diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index aa4e06dd0..6868abb16 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -98,6 +98,7 @@ class MergingHandler : public qdisp::ResponseHandler { /// Prepare for first call to flush(). void _initState(); + // &&& delete bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData, std::shared_ptr const& jobQuery); diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 02ac3cebd..186359768 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -156,16 +156,16 @@ std::string UserQuerySelect::getError() const { /// Attempt to kill in progress. void UserQuerySelect::kill() { - LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect kill"); + LOGS(_log, LOG_LVL_INFO, "UserQuerySelect KILL"); std::lock_guard lock(_killMutex); if (!_killed) { _killed = true; - int64_t collectedRows = _executive->getTotalResultRows(); + auto exec = _executive; + int64_t collectedRows = (exec) ? exec->getTotalResultRows() : -1; size_t collectedBytes = _infileMerger->getTotalResultSize(); try { // make a copy of executive pointer to keep it alive and avoid race // with pointer being reset in discard() method - std::shared_ptr exec = _executive; if (exec != nullptr) { exec->squash(); } @@ -233,6 +233,11 @@ std::string UserQuerySelect::getResultQuery() const { /// Begin running on all chunks added so far. 
void UserQuerySelect::submit() { + auto exec = _executive; + if (exec == nullptr) { + LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::submit() executive is null at start"); + return; + } _qSession->finalize(); // Using the QuerySession, generate query specs (text, db, chunkId) and then @@ -259,13 +264,13 @@ void UserQuerySelect::submit() { LOGS(_log, LOG_LVL_WARN, "Failed queryStatsTmpRegister " << e.what()); } - _executive->setScanInteractive(_qSession->getScanInteractive()); - _executive->setScanInfo(_qSession->getScanInfo()); + exec->setScanInteractive(_qSession->getScanInteractive()); + exec->setScanInfo(_qSession->getScanInfo()); string dbName(""); bool dbNameSet = false; - for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !_executive->getCancelled(); + for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !exec->getCancelled(); ++i) { auto& chunkSpec = *i; @@ -297,9 +302,9 @@ void UserQuerySelect::submit() { ResourceUnit ru; ru.setAsDbChunk(cs->db, cs->chunkId); qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create( - _qMetaCzarId, _executive->getId(), sequence, ru, + _qMetaCzarId, exec->getId(), sequence, ru, std::make_shared(_infileMerger, chunkResultName), cs, chunkResultName); - auto job = _executive->add(jobDesc); + auto job = exec->add(jobDesc); ++sequence; } @@ -309,12 +314,12 @@ void UserQuerySelect::submit() { /// At this point the executive has a map of all jobs with the chunkIds as the key. // This is needed to prevent Czar::_monitor from starting things before they are ready. - _executive->setReadyToExecute(); + exec->setReadyToExecute(); buildAndSendUberJobs(); LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence); // TODO:UJ Waiting for all jobs to start may not be needed anymore? - _executive->waitForAllJobsToStart(); + exec->waitForAllJobsToStart(); // we only care about per-chunk info for ASYNC queries if (_async) { @@ -331,18 +336,23 @@ void UserQuerySelect::buildAndSendUberJobs() { LOGS(_log, LOG_LVL_DEBUG, funcN << " start " << _uberJobMaxChunks); // Ensure `_monitor()` doesn't do anything until everything is ready. - if (!_executive->isReadyToExecute()) { + auto exec = _executive; + if (exec == nullptr) { + LOGS(_log, LOG_LVL_ERROR, funcN << " called with null exec " << getQueryIdString()); + return; + } + if (!exec->isReadyToExecute()) { LOGS(_log, LOG_LVL_INFO, funcN << " executive isn't ready to generate UberJobs."); return; } // Only one thread should be generating UberJobs for this user query at any given time. lock_guard fcLock(_buildUberJobMtx); - LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << _executive->getTotalJobs()); + LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << exec->getTotalJobs()); vector uberJobs; - qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = _executive->unassignedChunksInQuery(); + qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = exec->unassignedChunksInQuery(); if (unassignedChunksInQuery.empty()) { LOGS(_log, LOG_LVL_DEBUG, funcN << " no unassigned Jobs"); return; @@ -397,9 +407,8 @@ void UserQuerySelect::buildAndSendUberJobs() { // Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum, // and should minimize the time for the first UberJob on the worker to complete. 
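The recurring change in UserQuerySelect above swaps direct `_executive` dereferences for a scope-local copy (`auto exec = _executive;`) that is null-checked once: the copied shared_ptr keeps the Executive alive even if discard() resets the member while the call is in flight. A minimal self-contained sketch of that guard idiom, with hypothetical stand-in types in place of qdisp::Executive and UserQuerySelect:

#include <iostream>
#include <memory>

// Hypothetical stand-ins for qdisp::Executive and UserQuerySelect.
struct Executive {
    void squash() { std::cout << "squash\n"; }
};

struct UserQuery {
    std::shared_ptr<Executive> _executive = std::make_shared<Executive>();

    void kill() {
        auto exec = _executive;  // local copy keeps the Executive alive
        if (exec == nullptr) {   // member may have been reset by discard()
            return;              // bail out instead of dereferencing null
        }
        exec->squash();          // safe for the remainder of this scope
    }

    void discard() { _executive.reset(); }
};

int main() {
    UserQuery q;
    q.kill();     // squashes via the live Executive
    q.discard();  // resets the member
    q.kill();     // no-op: the guard sees null and returns early
}

Note that a plain shared_ptr copy racing with reset() is not formally thread-safe; the sketch mirrors the pattern the diff uses rather than proposing std::atomic_load or a mutex.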
for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) { - bool const increaseAttemptCount = true; - jqPtr->getDescription()->incrAttemptCount(_executive, increaseAttemptCount); + jqPtr->getDescription()->incrAttemptCount(exec, increaseAttemptCount); // If too many workers are down, there will be a chunk that cannot be found. // Just continuing should leave jobs `unassigned` with their attempt count // attempt count will reach max and the query will be cancelled auto lambdaMissingChunk = [&](string const& msg) { missingChunks.push_back(chunkId); - //&&&bool const increaseAttemptCount = true; - //&&&jqPtr->getDescription()->incrAttemptCountScrubResultsJson(_executive, increaseAttemptCount); - LOGS(_log, LOG_LVL_ERROR, msg); + LOGS(_log, LOG_LVL_WARN, msg); }; auto iter = chunkMapPtr->find(chunkId); @@ -463,8 +470,8 @@ void UserQuerySelect::buildAndSendUberJobs() { auto ujId = _uberJobIdSeq++; // keep ujId consistent string uberResultName = _ttn->make(ujId); auto respHandler = make_shared(_infileMerger, uberResultName); - auto uJob = qdisp::UberJob::create(_executive, respHandler, _executive->getId(), ujId, - _qMetaCzarId, targetWorker); + auto uJob = qdisp::UberJob::create(exec, respHandler, exec->getId(), ujId, _qMetaCzarId, + targetWorker); uJob->setWorkerContactInfo(wInfUJ->wInf); wInfUJ->uberJobPtr = uJob; }; @@ -473,7 +480,7 @@ if (wInfUJ->uberJobPtr->getJobCount() >= _uberJobMaxChunks) { // Queue the UberJob to be sent to a worker - _executive->addAndQueueUberJob(wInfUJ->uberJobPtr); + exec->addAndQueueUberJob(wInfUJ->uberJobPtr); // Clear the pointer so a new UberJob is created later if needed. wInfUJ->uberJobPtr = nullptr; @@ -498,18 +505,23 @@ void UserQuerySelect::buildAndSendUberJobs() { if (winfUjPtr != nullptr) { auto& ujPtr = winfUjPtr->uberJobPtr; if (ujPtr != nullptr) { - _executive->addAndQueueUberJob(ujPtr); + exec->addAndQueueUberJob(ujPtr); } } } - LOGS(_log, LOG_LVL_DEBUG, funcN << " " << _executive->dumpUberJobCounts()); + LOGS(_log, LOG_LVL_DEBUG, funcN << " " << exec->dumpUberJobCounts()); } /// Block until a submit()'ed query completes. /// @return the QueryState indicating success or failure QueryState UserQuerySelect::join() { - bool successful = _executive->join(); // Wait for all data + auto exec = _executive; + if (exec == nullptr) { + LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::join() called with null exec " << getQueryIdString()); + return ERROR; + } + bool successful = exec->join(); // Wait for all data // Since all data are in, run final SQL commands like GROUP BY. size_t collectedBytes = 0; int64_t finalRows = 0; @@ -520,7 +532,7 @@ QueryState UserQuerySelect::join() { _messageStore->addMessage(-1, "MERGE", 1105, "Failure while merging result", MessageSeverity::MSG_ERROR); } - _executive->updateProxyMessages(); + exec->updateProxyMessages(); try { _discardMerger(); @@ -533,7 +545,7 @@ QueryState UserQuerySelect::join() { // Update the permanent message table. _qMetaUpdateMessages(); - int64_t collectedRows = _executive->getTotalResultRows(); + int64_t collectedRows = exec->getTotalResultRows(); // finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the // same. if (finalRows < 0) finalRows = collectedRows; @@ -555,7 +567,7 @@ QueryState UserQuerySelect::join() { // Notify workers on the query completion/cancellation to ensure // resources are properly cleaned over there as well.
- czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(_executive->getId()); + czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(exec->getId()); return state; } @@ -577,8 +589,14 @@ void UserQuerySelect::discard() { } } + auto exec = _executive; + if (exec == nullptr) { + LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::discard called with null exec " << getQueryIdString()); + return; + } + // Make sure resources are released. - if (_executive && _executive->getNumInflight() > 0) { + if (exec->getNumInflight() > 0) { throw UserQueryError(getQueryIdString() + " Executive unfinished, cannot discard"); } @@ -777,8 +795,9 @@ void UserQuerySelect::qMetaRegister(std::string const& resultLocation, std::stri throw UserQueryError(getQueryIdString() + _errorExtra); } - if (_executive != nullptr) { - _executive->setQueryId(_qMetaQueryId); + auto exec = _executive; + if (exec != nullptr) { + exec->setQueryId(_qMetaQueryId); } else { LOGS(_log, LOG_LVL_WARN, "No Executive, assuming invalid query"); } diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 0ec244993..3061b4f7e 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -159,7 +159,7 @@ Czar::Czar(string const& configFilePath, string const& czarName) _idCounter(), _uqFactory(), _clientToQuery(), - _monitorSleepTime (_czarConfig->getMonitorSleepTimeMilliSec()), + _monitorSleepTime(_czarConfig->getMonitorSleepTimeMilliSec()), _activeWorkerMap(new ActiveWorkerMap(_czarConfig)) { // set id counter to milliseconds since the epoch, mod 1 year. struct timeval tv; @@ -402,45 +402,45 @@ void Czar::killQuery(string const& query, string const& clientId) { int threadId; QueryId queryId; if (ccontrol::UserQueryType::isKill(query, threadId)) { - LOGS(_log, LOG_LVL_DEBUG, "thread ID: " << threadId); + LOGS(_log, LOG_LVL_INFO, "KILL thread ID: " << threadId); lock_guard lock(_mutex); // find it in the client map based on client/thread id ClientThreadId ctId(clientId, threadId); auto iter = _clientToQuery.find(ctId); if (iter == _clientToQuery.end()) { - LOGS(_log, LOG_LVL_INFO, "Cannot find client thread id: " << threadId); - throw std::runtime_error("Unknown thread ID: " + query); + LOGS(_log, LOG_LVL_INFO, "KILL Cannot find client thread id: " << threadId); + throw std::runtime_error("KILL Unknown thread ID: " + query); } uq = iter->second.lock(); } else if (ccontrol::UserQueryType::isCancel(query, queryId)) { - LOGS(_log, LOG_LVL_DEBUG, "query ID: " << queryId); + LOGS(_log, LOG_LVL_INFO, "KILL query ID: " << queryId); lock_guard lock(_mutex); // find it in the client map based on client/thread id auto iter = _idToQuery.find(queryId); if (iter == _idToQuery.end()) { - LOGS(_log, LOG_LVL_INFO, "Cannot find query id: " << queryId); - throw std::runtime_error("Unknown or finished query ID: " + query); + LOGS(_log, LOG_LVL_INFO, "KILL Cannot find query id: " << queryId); + throw std::runtime_error("KILL unknown or finished query ID: " + query); } uq = iter->second.lock(); } else { - throw std::runtime_error("Failed to parse query: " + query); + throw std::runtime_error("KILL failed to parse query: " + query); } // assume this cannot fail or throw if (uq) { - LOGS(_log, LOG_LVL_DEBUG, "Killing query: " << uq->getQueryId()); + LOGS(_log, LOG_LVL_INFO, "KILLing query: " << uq->getQueryId()); // query killing can potentially take very long and we do not want to block // proxy from serving other requests so run it in a detached thread thread killThread([uq]() { uq->kill(); - LOGS(_log, LOG_LVL_DEBUG, "Finished killing query: " <<
uq->getQueryId()); + LOGS(_log, LOG_LVL_INFO, "Finished KILLing query: " << uq->getQueryId()); }); killThread.detach(); } else { - LOGS(_log, LOG_LVL_DEBUG, "Query has expired/finished: " << query); - throw std::runtime_error("Query has already finished: " + query); + LOGS(_log, LOG_LVL_INFO, "KILL query has expired/finished: " << query); + throw std::runtime_error("KILL query has already finished: " + query); } } diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index 82d8fd1e8..6df9936c9 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -333,7 +333,7 @@ bool CzarFamilyMap::_read() { LOGS(_log, LOG_LVL_TRACE, "CzarFamilyMap::_read() start"); // If replacing the map, this may take a bit of time, but it's probably // better to wait for new maps if something changed. - std::lock_guard gLock(_familyMapMtx); // &&& check waiting is really needed + std::lock_guard gLock(_familyMapMtx); // &&& check waiting is really needed qmeta::QMetaChunkMap qChunkMap = _qmeta->getChunkMap(_lastUpdateTime); if (_lastUpdateTime == qChunkMap.updateTime) { LOGS(_log, LOG_LVL_DEBUG, diff --git a/src/czar/HttpCzarWorkerModule.cc b/src/czar/HttpCzarWorkerModule.cc index 266fdbdbe..f0a05388b 100644 --- a/src/czar/HttpCzarWorkerModule.cc +++ b/src/czar/HttpCzarWorkerModule.cc @@ -163,6 +163,7 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) { throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No executive for qid=") + to_string(queryId) + " czar=" + to_string(czarId)); } + qdisp::UberJob::Ptr uj = exec->findUberJob(uberJobId); if (uj == nullptr) { throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No UberJob for qid=") + @@ -170,6 +171,9 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) { " czar=" + to_string(czarId)); } + uj->setResultFileSize(fileSize); + exec->checkResultFileSize(fileSize); + auto importRes = uj->importResultFile(fileUrl, rowCount, fileSize); jsRet = importRes; diff --git a/src/protojson/UberJobMsg.cc b/src/protojson/UberJobMsg.cc index 65564cdf4..7ac1a89ad 100644 --- a/src/protojson/UberJobMsg.cc +++ b/src/protojson/UberJobMsg.cc @@ -60,8 +60,8 @@ UberJobMsg::UberJobMsg(unsigned int metaVersion, std::string const& replicationI _ujId(ujId), _rowLimit(rowLimit), _maxTableSizeMB(maxTableSizeMB), - _scanInfo(scanInfo_) { - + _scanInfo(scanInfo_), + _idStr("QID=" + to_string(_qId) + "_ujId=" + to_string(_ujId)) { for (auto& jobPtr : jobs) { // This creates the JobMsg objects for all related jobs and their fragments. auto jobMsg = JobMsg::create(jobPtr, _jobSubQueryTempMap, _jobDbTablesMap); @@ -192,7 +192,7 @@ nlohmann::json JobMsg::serializeJson() const { {"queryFragments", json::array()}}); // These are indexes into _jobDbTablesMap, which is shared between all JobMsg in this UberJobMsg. - // &&& TODO:UJ queries appear to work even when "chunkscantables_indexes" is wrong + // &&& TODO:UJ "chunkscantables_indexes" may be unused.
auto& jsqCstIndexes = jsJobMsg["chunkscantables_indexes"]; for (auto const& index : _chunkScanTableIndexes) { jsqCstIndexes.push_back(index); diff --git a/src/protojson/UberJobMsg.h b/src/protojson/UberJobMsg.h index d5f6ade9e..c06a3735d 100644 --- a/src/protojson/UberJobMsg.h +++ b/src/protojson/UberJobMsg.h @@ -222,15 +222,16 @@ class JobMsg { JobId _jobId; int _attemptCount; - std::string _chunkQuerySpecDb; - int _scanRating; - bool _scanInteractive; + std::string _chunkQuerySpecDb; // &&& remove, use value for UJ + int _scanRating; // &&& remove, use value for UJ + bool _scanInteractive; // &&& remove, use value for UJ int _chunkId; JobFragment::VectPtr _jobFragments{new JobFragment::Vect()}; JobSubQueryTempMap::Ptr _jobSubQueryTempMap; ///< Map of all query templates related to this UberJob. JobDbTablesMap::Ptr _jobDbTablesMap; ///< Map of all db.tables related to this UberJob. + // &&& remove, use value for UJ std::vector _chunkScanTableIndexes; ///< list of indexes into _jobDbTablesMap. }; @@ -277,6 +278,8 @@ class UberJobMsg : public std::enable_shared_from_this { ScanInfo::Ptr getScanInfo() const { return _scanInfo; } + std::string const& getIdStr() const { return _idStr; } + private: UberJobMsg(unsigned int metaVersion, std::string const& replicationInstanceId, std::string const& replicationAuthKey, CzarContactInfo::Ptr const& czInfo, @@ -304,6 +307,8 @@ class UberJobMsg : public std::enable_shared_from_this { JobMsg::VectPtr _jobMsgVect{new JobMsg::Vect()}; ScanInfo::Ptr _scanInfo{ScanInfo::create()}; ///< &&& doc + + std::string const _idStr; }; } // namespace lsst::qserv::protojson diff --git a/src/qana/QueryMapping.h b/src/qana/QueryMapping.h index 585971f97..2e8dca319 100644 --- a/src/qana/QueryMapping.h +++ b/src/qana/QueryMapping.h @@ -92,8 +92,6 @@ class QueryMapping { bool hasParameter(Parameter p) const; DbTableSet const& getSubChunkTables() const { return _subChunkTables; } - std::string dump() const { return std::string("&&& NEED CODE"); } - private: ParameterMap _subs; DbTableSet _subChunkTables; diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 983a0bf94..41754bc00 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -235,27 +235,11 @@ void Executive::queueFileCollect(util::PriorityCommand::Ptr const& cmd) { } } -/* &&& -void Executive::queueUberJob(std::shared_ptr const& uberJob) { - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&&uj queueUberJob"); - auto runUberJobFunc = [uberJob](util::CmdData*) { uberJob->runUberJob(); }; - - auto cmd = util::PriorityCommand::Ptr(new util::PriorityCommand(runUberJobFunc)); - _jobStartCmdList.push_back(cmd); - if (_scanInteractive) { - _qdispPool->queCmd(cmd, 0); - } else { - _qdispPool->queCmd(cmd, 1); - } -} -*/ - void Executive::addAndQueueUberJob(shared_ptr const& uj) { { lock_guard lck(_uberJobsMapMtx); - UberJobId ujId = uj->getJobId(); + UberJobId ujId = uj->getUjId(); _uberJobsMap[ujId] = uj; - //&&&uj->setAdded(); LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " ujId=" << ujId << " uj.sz=" << uj->getJobCount()); } @@ -427,7 +411,8 @@ void Executive::markCompleted(JobId jobId, bool success) { } _unTrack(jobId); if (!success && !isRowLimitComplete()) { - LOGS(_log, LOG_LVL_ERROR, + auto logLvl = (_cancelled) ? 
LOG_LVL_ERROR : LOG_LVL_TRACE; + LOGS(_log, logLvl, "Executive: requesting squash, cause: " << " failed (code=" << err.getCode() << " " << err.getMsg() << ")"); squash(); // ask to squash @@ -758,6 +743,39 @@ void Executive::checkLimitRowComplete() { _squashSuperfluous(); } +void Executive::checkResultFileSize(uint64_t fileSize) { + _totalResultFileSize += fileSize; + if (_cancelled) return; + + size_t const MB_SIZE_BYTES = 1024 * 1024; + uint64_t maxResultTableSizeBytes = cconfig::CzarConfig::instance()->getMaxTableSizeMB() * MB_SIZE_BYTES; + LOGS(_log, LOG_LVL_TRACE, + cName(__func__) << " sz=" << fileSize << " total=" << _totalResultFileSize + << " max=" << maxResultTableSizeBytes); + if (_totalResultFileSize > maxResultTableSizeBytes) { + LOGS(_log, LOG_LVL_WARN, + cName(__func__) << " total=" << _totalResultFileSize << " max=" << maxResultTableSizeBytes); + // _totalResultFileSize may include non zero values from dead UberJobs, + // so recalculate it to verify. + uint64_t total = 0; + { + lock_guard lck(_uberJobsMapMtx); + for (auto const& [ujId, ujPtr] : _uberJobsMap) { + total += ujPtr->getResultFileSize(); + } + _totalResultFileSize = total; + } + LOGS(_log, LOG_LVL_WARN, + cName(__func__) << "recheck total=" << total << " max=" << maxResultTableSizeBytes); + if (total > maxResultTableSizeBytes) { + LOGS(_log, LOG_LVL_ERROR, "Executive: requesting squash, result file size too large " << total); + ResponseHandler::Error err(0, string("Incomplete result already too large ") + to_string(total)); + _multiError.push_back(err); + squash(); + } + } +} + ostream& operator<<(ostream& os, Executive::JobMap::value_type const& v) { auto const& status = v.second->getStatus(); os << v.first << ": " << *status; diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index c2cef1a34..e72216474 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -232,6 +232,13 @@ class Executive : public std::enable_shared_from_this { /// Return a pointer to _scanInfo. protojson::ScanInfo::Ptr getScanInfo() { return _scanInfo; } + /// Add fileSize to `_totalResultFileSize` and check if it exceeds limits. + /// If it is too large, check the value against existing UberJob result + /// sizes as `_totalResultFileSize` may include failed UberJobs. + /// If the sum of all UberJob result files size is too large, + /// cancel this user query. + void checkResultFileSize(uint64_t fileSize = 0); + protected: Executive(ExecutiveConfig const& cfg, std::shared_ptr const& ms, std::shared_ptr const& sharedResources, @@ -343,6 +350,8 @@ class Executive : public std::enable_shared_from_this { std::atomic _readyToExecute{false}; protojson::ScanInfo::Ptr _scanInfo; ///< Scan rating and tables. + + std::atomic _totalResultFileSize{0}; ///< Total size of all UberJob result files. 
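The checkResultFileSize() declared above is implemented as a two-phase check: a cheap atomic accumulation into `_totalResultFileSize`, then, only when the running total crosses the limit, an exact recount of per-UberJob sizes under `_uberJobsMapMtx` before squashing, since the running total may still include bytes from dead UberJobs. A compact sketch of that pattern, using a hypothetical SizeGuard class and a made-up 5 MB limit standing in for CzarConfig::getMaxTableSizeMB():

#include <atomic>
#include <cstdint>
#include <iostream>
#include <map>
#include <mutex>

// Sketch of the two-phase size check: fast atomic accumulation, then an
// exact recount under the map mutex before taking the expensive
// cancellation path. Types and limits are simplified stand-ins.
class SizeGuard {
public:
    void add(uint64_t ujId, uint64_t bytes) {
        {
            std::lock_guard<std::mutex> lck(_mtx);
            _perUberJob[ujId] += bytes;  // authoritative per-UberJob record
        }
        _total += bytes;
        if (_total <= _maxBytes) return;  // fast path: still under the limit

        // The running total may include bytes from failed/retried UberJobs,
        // so recompute from the per-UberJob sizes before deciding to squash.
        uint64_t exact = 0;
        {
            std::lock_guard<std::mutex> lck(_mtx);
            for (auto const& [id, sz] : _perUberJob) exact += sz;
        }
        _total = exact;
        if (exact > _maxBytes) {
            std::cout << "result too large (" << exact << " bytes), squashing\n";
        }
    }

private:
    std::atomic<uint64_t> _total{0};
    uint64_t const _maxBytes = 5ULL * 1024 * 1024;  // hypothetical 5 MB limit
    std::mutex _mtx;
    std::map<uint64_t, uint64_t> _perUberJob;
};

int main() {
    SizeGuard g;
    g.add(1, 4 * 1024 * 1024);
    g.add(2, 2 * 1024 * 1024);  // pushes the total past the limit -> recheck fires
}

The recount-then-decide step keeps the hot path lock-free while still basing cancellation on an authoritative sum.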
}; } // namespace qdisp diff --git a/src/qdisp/JobDescription.h b/src/qdisp/JobDescription.h index 75ca4a33b..9ad0ffe62 100644 --- a/src/qdisp/JobDescription.h +++ b/src/qdisp/JobDescription.h @@ -63,8 +63,8 @@ class JobDescription { std::shared_ptr const& respHandler, std::shared_ptr const& chunkQuerySpec, std::string const& chunkResultName, bool mock = false) { - JobDescription::Ptr jd(new JobDescription(czarId, qId, jobId, resource, respHandler, - chunkQuerySpec, chunkResultName, mock)); + JobDescription::Ptr jd(new JobDescription(czarId, qId, jobId, resource, respHandler, chunkQuerySpec, + chunkResultName, mock)); return jd; } diff --git a/src/qdisp/UberJob.cc b/src/qdisp/UberJob.cc index 10f535ff1..07ccd6875 100644 --- a/src/qdisp/UberJob.cc +++ b/src/qdisp/UberJob.cc @@ -77,8 +77,10 @@ UberJob::UberJob(Executive::Ptr const& executive, std::shared_ptrsetUberJobId(getJobId())) { + if (job->setUberJobId(getUjId())) { lock_guard lck(_jobsMtx); _jobs.push_back(job); success = true; @@ -167,10 +169,9 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle bool transmitSuccess = false; string exceptionWhat; try { - //&&&util::InstanceCount ic{"runUberJob&&&"}; - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj d"); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj sending"); json const response = client.readAsJson(); - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj d1"); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj worker recv"); if (0 != response.at("success").get()) { transmitSuccess = true; } else { @@ -206,6 +207,7 @@ void UberJob::prepScrubResults() { } void UberJob::_unassignJobs() { + LOGS(_log, LOG_LVL_INFO, cName(__func__)); lock_guard lck(_jobsMtx); auto exec = _executive.lock(); if (exec == nullptr) { @@ -214,7 +216,7 @@ void UberJob::_unassignJobs() { } for (auto&& job : _jobs) { string jid = job->getIdStr(); - if (!job->unassignFromUberJob(getJobId())) { + if (!job->unassignFromUberJob(getUjId())) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " could not unassign job=" << jid << " cancelling"); exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "unable to re-assign " + jid, util::ErrorCode::INTERNAL); @@ -265,6 +267,7 @@ bool UberJob::_setStatusIfOk(qmeta::JobStatus::State newState, string const& msg void UberJob::callMarkCompleteFunc(bool success) { LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " success=" << success); + LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& success=" << success); lock_guard lck(_jobsMtx); // Need to set this uberJob's status, however exec->markCompleted will set @@ -287,11 +290,16 @@ void UberJob::callMarkCompleteFunc(bool success) { _jobs.clear(); } +util::HistogramRolling histoQueImp("&&&uj histoQueImp", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); + /// Retrieve and process a result file using the file-based protocol /// Uses a copy of JobQuery::Ptr instead of _jobQuery as a call to cancel() would reset _jobQuery. 
json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_t fileSize) { LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " fileUrl=" << fileUrl << " rowCount=" << rowCount << " fileSize=" << fileSize); + LOGS(_log, LOG_LVL_WARN, + cName(__func__) << "&&& fileUrl=" << fileUrl << " rowCount=" << rowCount + << " fileSize=" << fileSize); if (isQueryCancelled()) { LOGS(_log, LOG_LVL_WARN, cName(__func__) << " import job was cancelled."); @@ -313,7 +321,7 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_ return _importResultError(false, "rowLimited", "Enough rows already"); } - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " fileSize=" << fileSize); + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " fileSize=" << fileSize); bool const statusSet = setStatusIfOk(qmeta::JobStatus::RESPONSE_READY, getIdStr() + " " + fileUrl); if (!statusSet) { @@ -322,10 +330,15 @@ json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_ } weak_ptr ujThis = weak_from_this(); - // TODO:UJ lambda may not be the best way to do this, alsocheck synchronization - may need a mutex for - // merging. + auto startQImp = CLOCK::now(); // &&& + + // fileCollectFunc will be put on the queue to run later. string const idStr = _idStr; - auto fileCollectFunc = [ujThis, fileUrl, rowCount, idStr](util::CmdData*) { + auto fileCollectFunc = [ujThis, fileUrl, rowCount, idStr, startQImp](util::CmdData*) { + auto endQImp = CLOCK::now(); //&&& + std::chrono::duration secsQImp = endQImp - startQImp; // &&& + histoQueImp.addEntry(endQImp, secsQImp.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoQueImp.getString("")); auto ujPtr = ujThis.lock(); if (ujPtr == nullptr) { LOGS(_log, LOG_LVL_DEBUG, @@ -435,6 +448,7 @@ json UberJob::_importResultError(bool shouldCancel, string const& errorType, str void UberJob::_importResultFinish(uint64_t resultRows) { LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " start"); + LOGS(_log, LOG_LVL_INFO, cName(__func__) << " &&& start"); auto exec = _executive.lock(); if (exec == nullptr) { diff --git a/src/qdisp/UberJob.h b/src/qdisp/UberJob.h index ce719d50d..c1ead8b24 100644 --- a/src/qdisp/UberJob.h +++ b/src/qdisp/UberJob.h @@ -71,9 +71,7 @@ class UberJob : public std::enable_shared_from_this { void killUberJob(); QueryId getQueryId() const { return _queryId; } - UberJobId getJobId() const { - return _uberJobId; - } // &&& TODO:UJ change name when JobBase no longer needed. + UberJobId getUjId() const { return _uberJobId; } std::string const& getIdStr() const { return _idStr; } std::shared_ptr getRespHandler() { return _respHandler; } std::shared_ptr getStatus() { return _jobStatus; } @@ -107,12 +105,15 @@ class UberJob : public std::enable_shared_from_this { /// Get the data for the worker that should handle this UberJob. czar::CzarChunkMap::WorkerChunksData::Ptr getWorkerData() { return _workerData; } - /// Collect and merge the results from the worker. + /// Queue the lambda function to collect and merge the results from the worker. nlohmann::json importResultFile(std::string const& fileUrl, uint64_t rowCount, uint64_t fileSize); /// Handle an error from the worker. 
nlohmann::json workerError(int errorCode, std::string const& errorMsg); + void setResultFileSize(uint64_t fileSize) { _resultFileSize = fileSize; } + uint64_t getResultFileSize() { return _resultFileSize; } + std::ostream& dumpOS(std::ostream& os) const; std::string dump() const; friend std::ostream& operator<<(std::ostream& os, UberJob const& uj); @@ -160,6 +161,7 @@ class UberJob : public std::enable_shared_from_this { UberJobId const _uberJobId; qmeta::CzarId const _czarId; int const _rowLimit; + uint64_t _resultFileSize = 0; std::string const _idStr; diff --git a/src/qdisp/testQDisp.cc b/src/qdisp/testQDisp.cc index bda0a020f..59299d1c5 100644 --- a/src/qdisp/testQDisp.cc +++ b/src/qdisp/testQDisp.cc @@ -182,8 +182,8 @@ qdisp::JobDescription::Ptr makeMockJobDescription(qdisp::Executive::Ptr const& e auto cqs = std::make_shared(); // dummy, unused in this case. std::string chunkResultName = "dummyResultTableName"; qmeta::CzarId const czarId = 1; - auto job = qdisp::JobDescription::create(czarId, ex->getId(), sequence, ru, mHandler, - cqs, chunkResultName, true); + auto job = qdisp::JobDescription::create(czarId, ex->getId(), sequence, ru, mHandler, cqs, + chunkResultName, true); return job; } diff --git a/src/qproc/ChunkQuerySpec.h b/src/qproc/ChunkQuerySpec.h index 41582368f..d7ad75984 100644 --- a/src/qproc/ChunkQuerySpec.h +++ b/src/qproc/ChunkQuerySpec.h @@ -67,8 +67,7 @@ class ChunkQuerySpec { bool scanInteractive{false}; DbTableSet subChunkTables; std::vector subChunkIds; - std::vector queries; // &&& remove if possible - std::vector queryTemplates; + std::vector queries; // Consider promoting the concept of container of ChunkQuerySpec // in the hopes of increased code cleanliness. std::shared_ptr nextFragment; ///< ad-hoc linked list (consider removal) diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index bf0f88d7c..b192f6c0f 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -332,8 +332,13 @@ bool InfileMerger::merge(proto::ResponseSummary const& responseSummary, return ret; } +uint32_t histLimitCount = 0; +util::HistogramRolling histoInfileBuild("&&&uj histoInfileBuild", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); +util::HistogramRolling histoMergeSecs("&&&uj histoMergeSecs", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); +util::HistogramRolling histoMergeSzB("&&&uj histoMergeSzB", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); + bool InfileMerger::mergeHttp(qdisp::UberJob::Ptr const& uberJob, proto::ResponseData const& responseData) { - UberJobId const uJobId = uberJob->getJobId(); + UberJobId const uJobId = uberJob->getUjId(); std::string queryIdJobStr = uberJob->getIdStr(); if (!_queryIdStrSet) { _setQueryIdStr(QueryIdHelper::makeIdStr(uberJob->getQueryId())); @@ -372,13 +377,17 @@ bool InfileMerger::mergeHttp(qdisp::UberJob::Ptr const& uberJob, proto::Response // Add columns to rows in virtFile. 
util::Timer virtFileT; virtFileT.start(); + auto startInfileBuild = CLOCK::now(); //&&& // UberJobs only get one attempt - int resultJobId = makeJobIdAttempt(uberJob->getJobId(), 0); + int resultJobId = makeJobIdAttempt(uberJob->getUjId(), 0); ProtoRowBuffer::Ptr pRowBuffer = std::make_shared( responseData, resultJobId, _jobIdColName, _jobIdSqlType, _jobIdMysqlType); std::string const virtFile = _infileMgr.prepareSrc(pRowBuffer); std::string const infileStatement = sql::formLoadInfile(_mergeTable, virtFile); virtFileT.stop(); + auto endInfileBuild = CLOCK::now(); //&&& + std::chrono::duration secsInfileBuild = endInfileBuild - startInfileBuild; // &&& + histoInfileBuild.addEntry(endInfileBuild, secsInfileBuild.count()); //&&& // If the job attempt is invalid, exit without adding rows. // It will wait here if rows need to be deleted. @@ -416,7 +425,8 @@ bool InfileMerger::mergeHttp(qdisp::UberJob::Ptr const& uberJob, proto::Response return true; } - auto start = std::chrono::system_clock::now(); + //&&&auto start = std::chrono::system_clock::now(); + auto start = CLOCK::now(); switch (_dbEngine) { case MYISAM: ret = _applyMysqlMyIsam(infileStatement, resultSize); @@ -428,11 +438,20 @@ bool InfileMerger::mergeHttp(qdisp::UberJob::Ptr const& uberJob, proto::Response default: throw std::invalid_argument("InfileMerger::_dbEngine is unknown =" + engineToStr(_dbEngine)); } - auto end = std::chrono::system_clock::now(); + auto end = CLOCK::now(); auto mergeDur = std::chrono::duration_cast(end - start); LOGS(_log, LOG_LVL_DEBUG, "mergeDur=" << mergeDur.count() << " sema(total=" << _semaMgrConn->getTotalCount() << " used=" << _semaMgrConn->getUsedCount() << ")"); + std::chrono::duration secs = end - start; // &&& + histoMergeSecs.addEntry(end, secs.count()); //&&& + histoMergeSzB.addEntry(end, resultSize); // &&& + if ((++histLimitCount) % 1000 == 0) { + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoInfileBuild.getString("")); + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoMergeSecs.getString("")); + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoMergeSzB.getString("")); + } + if (not ret) { LOGS(_log, LOG_LVL_ERROR, "InfileMerger::merge mysql applyMysql failure"); } diff --git a/src/rproc/InfileMerger.h b/src/rproc/InfileMerger.h index d8e472c54..3091246ca 100644 --- a/src/rproc/InfileMerger.h +++ b/src/rproc/InfileMerger.h @@ -165,6 +165,7 @@ class InfileMerger { /// Merge a worker response, which contains a single ResponseData message /// Using job query info for early termination of the merge if needed. /// @return true if merge was successfully imported. 
+ // &&& delete bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData, std::shared_ptr const& jq); diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index 030163d60..f51052a1b 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -455,12 +455,15 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrcName(__func__) << " sending start"); //&&& TRACE if (!_sendResponse(tMtxLockA, task, cancelled, multiErr, rowLimitComplete)) { LOGS(_log, LOG_LVL_ERROR, "Could not transmit the request completion message to Czar."); erred = true; break; } - LOGS(_log, LOG_LVL_TRACE, __func__ << " " << task->getIdStr() << " sending done!!!"); + LOGS(_log, LOG_LVL_WARN, + "FileChannelShared " << task->cName(__func__) << " sending done!!!"); //&&& TRACE } } transmitT.stop(); diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index ca2dbf8e7..33b24f39e 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -144,7 +144,9 @@ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chun _scanInteractive(scanInteractive), _queryStats(queryStats_), _maxTableSize(maxTableSize * ::MB_SIZE_BYTES), - _rowLimit(ujData->getRowLimit()) { + _rowLimit(ujData->getRowLimit()), + _ujData(ujData), + _idStr(ujData->getIdStr() + " jId=" + to_string(_jId) + " sc=" + to_string(_subchunkId)) { // These attributes will be passed back to Czar in the Protobuf response // to advice which result delivery channel to use. auto const workerConfig = wconfig::WorkerConfig::instance(); @@ -191,118 +193,13 @@ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chun } _dbTblsAndSubchunks = make_unique(dbTbls_, subchunksVect_); + LOGS(_log, LOG_LVL_INFO, cName(__func__) << " created"); //&&& } Task::~Task() {} -/* &&& -std::vector Task::createTasksForChunk( - std::shared_ptr const& ujData, nlohmann::json const& jsJobs, - std::shared_ptr const& sendChannel, - protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, int maxTableSizeMb, - std::shared_ptr const& chunkResourceMgr, mysql::MySqlConfig const& mySqlConfig, - std::shared_ptr const& sqlConnMgr, - std::shared_ptr const& queriesAndChunks, uint16_t resultsHttpPort) { - QueryId qId = ujData->getQueryId(); - UberJobId ujId = ujData->getUberJobId(); - CzarIdType czId = ujData->getCzarId(); - - wpublish::QueryStatistics::Ptr queryStats = queriesAndChunks->addQueryId(qId, czId); - UserQueryInfo::Ptr userQueryInfo = queryStats->getUserQueryInfo(); - - string funcN(__func__); - funcN += " QID=" + to_string(qId) + " "; - - vector vect; - for (auto const& job : jsJobs) { - json const& jsJobDesc = job["jobdesc"]; - http::RequestBodyJSON rbJobDesc(jsJobDesc); - // See qproc::TaskMsgFactory::makeMsgJson for message construction. 
- auto const jdCzarId = rbJobDesc.required("czarId"); - auto const jdQueryId = rbJobDesc.required("queryId"); - if (jdQueryId != qId) { - throw TaskException(ERR_LOC, string("ujId=") + to_string(ujId) + " qId=" + to_string(qId) + - " QueryId mismatch Job qId=" + to_string(jdQueryId)); - } - auto const jdJobId = rbJobDesc.required("jobId"); - auto const jdAttemptCount = rbJobDesc.required("attemptCount"); - auto const jdQuerySpecDb = rbJobDesc.required("querySpecDb"); - auto const jdScanPriority = rbJobDesc.required("scanPriority"); - auto const jdScanInteractive = rbJobDesc.required("scanInteractive"); - auto const jdMaxTableSizeMb = rbJobDesc.required("maxTableSize"); - auto const jdChunkId = rbJobDesc.required("chunkId"); - LOGS(_log, LOG_LVL_TRACE, - funcN << " jd cid=" << jdCzarId << " jdQId=" << jdQueryId << " jdJobId=" << jdJobId - << " jdAtt=" << jdAttemptCount << " jdQDb=" << jdQuerySpecDb - << " jdScanPri=" << jdScanPriority << " interactive=" << jdScanInteractive - << " maxTblSz=" << jdMaxTableSizeMb << " chunkId=" << jdChunkId); - - auto const jdQueryFragments = rbJobDesc.required("queryFragments"); - int fragmentNumber = 0; - for (auto const& frag : jdQueryFragments) { - vector fragSubQueries; - vector fragSubchunkIds; - vector fragSubTables; - LOGS(_log, LOG_LVL_DEBUG, funcN << " frag=" << frag); - http::RequestBodyJSON rbFrag(frag); - auto const& jsQueries = rbFrag.required("queries"); - // TODO:UJ move to uberjob???, these should be the same for all jobs - for (auto const& subQ : jsQueries) { - http::RequestBodyJSON rbSubQ(subQ); - auto const subQuery = rbSubQ.required("subQuery"); - LOGS(_log, LOG_LVL_DEBUG, funcN << " subQuery=" << subQuery); - fragSubQueries.push_back(subQuery); - } - auto const& resultTable = rbFrag.required("resultTable"); - auto const& jsSubIds = rbFrag.required("subchunkIds"); - for (auto const& scId : jsSubIds) { - fragSubchunkIds.push_back(scId); - } - auto const& jsSubTables = rbFrag.required("subchunkTables"); - - for (auto const& scDbTable : jsSubTables) { // TODO:UJ are these the same for all jobs? - http::RequestBodyJSON rbScDbTable(scDbTable); - string scDb = rbScDbTable.required("scDb"); - string scTable = rbScDbTable.required("scTable"); - TaskDbTbl scDbTbl(scDb, scTable); - fragSubTables.push_back(scDbTbl); - } - - for (string const& fragSubQ : fragSubQueries) { - size_t templateId = userQueryInfo->addTemplate(fragSubQ); - if (fragSubchunkIds.empty()) { - bool const noSubchunks = false; - int const subchunkId = -1; - auto task = Task::Ptr(new Task( - ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, templateId, - noSubchunks, subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, - fragSubTables, fragSubchunkIds, sendChannel, queryStats, resultsHttpPort)); - - vect.push_back(task); - } else { - for (auto subchunkId : fragSubchunkIds) { - bool const hasSubchunks = true; - auto task = Task::Ptr(new Task(ujData, jdJobId, jdAttemptCount, jdChunkId, - fragmentNumber, templateId, hasSubchunks, subchunkId, - jdQuerySpecDb, scanInfo, scanInteractive, - maxTableSizeMb, fragSubTables, fragSubchunkIds, - sendChannel, queryStats, resultsHttpPort)); - vect.push_back(task); - } - } - } - ++fragmentNumber; - } - } - - for (auto taskPtr : vect) { - // newQueryRunner sets the `_taskQueryRunner` pointer in `task`. 
- taskPtr->setTaskQueryRunner(wdb::QueryRunner::newQueryRunner(taskPtr, chunkResourceMgr, mySqlConfig, - sqlConnMgr, queriesAndChunks)); - } - return vect; -} -*/ +util::HistogramRolling histoBuildTasks("&&&uj histoBuildTasks", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); +util::HistogramRolling histoTaskCount("&&&uj histoTasksCount", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); std::vector Task::createTasksFromUberJobMsg( std::shared_ptr const& ujMsg, std::shared_ptr const& ujData, @@ -314,6 +211,7 @@ std::vector Task::createTasksFromUberJobMsg( UberJobId ujId = ujData->getUberJobId(); CzarIdType czId = ujData->getCzarId(); + auto startBuildTasks = CLOCK::now(); vector vect; // List of created tasks to be returned. wpublish::QueryStatistics::Ptr queryStats = queriesAndChunks->addQueryId(qId, czId); @@ -403,6 +301,14 @@ std::vector Task::createTasksFromUberJobMsg( taskPtr->setTaskQueryRunner(wdb::QueryRunner::newQueryRunner(taskPtr, chunkResourceMgr, mySqlConfig, sqlConnMgr, queriesAndChunks)); } + + auto endBuildTasks = CLOCK::now(); //&&& + std::chrono::duration secsBuildTasks = endBuildTasks - startBuildTasks; // &&& + histoBuildTasks.addEntry(endBuildTasks, secsBuildTasks.count()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoBuildTasks.getString("")); + histoTaskCount.addEntry(endBuildTasks, vect.size()); //&&& + LOGS(_log, LOG_LVL_INFO, "&&&uj histo " << histoTaskCount.getString("")); + return vect; } diff --git a/src/wbase/Task.h b/src/wbase/Task.h index 0711cfe9f..9f9d30b88 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -149,7 +149,7 @@ class Task : public util::CommandForThreadPool { bool operator()(Ptr const& x, Ptr const& y); }; - std::string cName(const char* func) const { return std::string("Task::") + func; } + std::string cName(const char* func) const { return std::string("Task::") + func + " " + _idStr; } // TODO:UJ too many parameters. // - fragmentNumber seems pointless @@ -168,18 +168,6 @@ class Task : public util::CommandForThreadPool { Task(const Task&) = delete; virtual ~Task(); -/* &&& - /// Read json to generate a vector of one or more task for a chunk. - static std::vector createTasksForChunk( /// &&& delete - std::shared_ptr const& ujData, nlohmann::json const& jsJobs, - std::shared_ptr const& sendChannel, - protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, int maxTableSizeMb, - std::shared_ptr const& chunkResourceMgr, - mysql::MySqlConfig const& mySqlConfig, std::shared_ptr const& sqlConnMgr, - std::shared_ptr const& queriesAndChunks, - uint16_t resultsHttpPort = 8080); -*/ - /// &&& static std::vector createTasksFromUberJobMsg( std::shared_ptr const& uberJobMsg, @@ -195,12 +183,7 @@ class Task : public util::CommandForThreadPool { std::shared_ptr const& ujData, nlohmann::json const& jsJobs, std::shared_ptr const& sendChannel, protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, int maxTableSizeMb, - std::shared_ptr const& chunkResourceMgr - //&&&mysql::MySqlConfig const& mySqlConfig, std::shared_ptr const& - // sqlConnMgr, - //&&&std::shared_ptr const& queriesAndChunks, - //&&&uint16_t resultsHttpPort = 8080); - ); + std::shared_ptr const& chunkResourceMgr); std::shared_ptr getSendChannel() const { return _sendChannel; } void resetSendChannel() { _sendChannel.reset(); } ///< reset the shared pointer for FileChannelShared @@ -408,6 +391,9 @@ class Task : public util::CommandForThreadPool { /// When > 0, indicates maximum number of rows needed for a result. 
int const _rowLimit; + std::shared_ptr _ujData; + std::string const _idStr; + bool _unitTest = false; ///< }; diff --git a/src/wbase/UberJobData.cc b/src/wbase/UberJobData.cc index a70793f2a..b782e645a 100644 --- a/src/wbase/UberJobData.cc +++ b/src/wbase/UberJobData.cc @@ -68,7 +68,7 @@ UberJobData::UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta _workerId(workerId), _authKey(authKey), _foreman(foreman), - _idStr(string("QID=") + to_string(_queryId) + ":ujId=" + to_string(_uberJobId)) {} + _idStr(string("QID=") + to_string(_queryId) + "_ujId=" + to_string(_uberJobId)) {} void UberJobData::setFileChannelShared(std::shared_ptr const& fileChannelShared) { if (_fileChannelShared != nullptr && _fileChannelShared != fileChannelShared) { @@ -79,7 +79,8 @@ void UberJobData::setFileChannelShared(std::shared_ptr const& void UberJobData::responseFileReady(string const& httpFileUrl, uint64_t rowCount, uint64_t fileSize, uint64_t headerCount) { - LOGS(_log, LOG_LVL_TRACE, + //&&&LOGS(_log, LOG_LVL_TRACE, + LOGS(_log, LOG_LVL_INFO, cName(__func__) << " httpFileUrl=" << httpFileUrl << " rows=" << rowCount << " fSize=" << fileSize << " headerCount=" << headerCount); @@ -152,6 +153,7 @@ bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptr const& headers_, std::string const& url_, std::string const& requestContext_, std::string const& requestStr_) { + LOGS(_log, LOG_LVL_INFO, cName(__func__)); // &&& util::QdispPool::Ptr wPool; if (_foreman != nullptr) { wPool = _foreman->getWPool(); @@ -183,11 +185,12 @@ void UberJobData::cancelAllTasks() { string UJTransmitCmd::cName(const char* funcN) const { stringstream os; - os << "UJTransmitCmd::" << funcN << " czId=" << _czarId << " qId=" << _queryId << " ujId=" << _uberJobId; + os << "UJTransmitCmd::" << funcN << " czId=" << _czarId << " QID=" << _queryId << "_ujId=" << _uberJobId; return os.str(); } void UJTransmitCmd::action(util::CmdData* data) { + LOGS(_log, LOG_LVL_INFO, cName(__func__)); //&&& // Make certain _selfPtr is reset before leaving this function. // If a retry is needed, duplicate() is called. class ResetSelf { @@ -218,6 +221,7 @@ void UJTransmitCmd::action(util::CmdData* data) { } catch (exception const& ex) { LOGS(_log, LOG_LVL_WARN, cName(__func__) + " " + _requestContext + " failed, ex: " + ex.what()); } + LOGS(_log, LOG_LVL_INFO, cName(__func__) << " &&& transmit finished"); if (!transmitSuccess) { auto sPtr = _selfPtr; @@ -256,8 +260,8 @@ void UJTransmitCmd::action(util::CmdData* data) { } void UJTransmitCmd::kill() { - string const funcN("UJTransmitCmd::kill"); - LOGS(_log, LOG_LVL_WARN, funcN); + //&&&string const funcN("UJTransmitCmd::kill"); + LOGS(_log, LOG_LVL_WARN, cName(__func__)); auto sPtr = _selfPtr; _selfPtr.reset(); if (sPtr == nullptr) { @@ -266,6 +270,7 @@ void UJTransmitCmd::kill() { } UJTransmitCmd::Ptr UJTransmitCmd::duplicate() { + LOGS(_log, LOG_LVL_INFO, cName(__func__)); //&&& auto ujD = _ujData.lock(); if (ujD == nullptr) { return nullptr; diff --git a/src/wbase/UberJobData.h b/src/wbase/UberJobData.h index d4765fbbe..2634e0325 100644 --- a/src/wbase/UberJobData.h +++ b/src/wbase/UberJobData.h @@ -96,7 +96,7 @@ class UberJobData : public std::enable_shared_from_this { /// Let the Czar know there's been a problem. 
bool responseError(util::MultiError& multiErr, std::shared_ptr const& task, bool cancelled); - std::string getIdStr() const { return _idStr; } + std::string const& getIdStr() const { return _idStr; } std::string cName(std::string const& funcName) { return "UberJobData::" + funcName + " " + getIdStr(); } bool getCancelled() const { return _cancelled; } diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc index 8fdd5194c..5774de042 100644 --- a/src/wdb/QueryRunner.cc +++ b/src/wdb/QueryRunner.cc @@ -136,8 +136,9 @@ util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40}); bool QueryRunner::runQuery() { util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId())); QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId()); - LOGS(_log, LOG_LVL_TRACE, - __func__ << " tid=" << _task->getIdStr() << " scsId=" << _task->getSendChannel()->getScsId()); + LOGS(_log, LOG_LVL_WARN, + "QueryRunner " << _task->cName(__func__) //&&& TRACE + << " scsId=" << _task->getSendChannel()->getScsId()); // Start tracking the task. auto now = chrono::system_clock::now(); @@ -261,7 +262,9 @@ bool QueryRunner::_dispatchChannel() { util::Timer primeT; primeT.start(); _task->queryExecutionStarted(); + LOGS(_log, LOG_LVL_WARN, "QueryRunner " << _task->cName(__func__) << " sql start"); //&&& TRACE MYSQL_RES* res = _primeResult(query); // This runs the SQL query, throws SqlErrorObj on failure. + LOGS(_log, LOG_LVL_WARN, "QueryRunner " << _task->cName(__func__) << " sql end"); //&&& TRACE primeT.stop(); needToFreeRes = true; if (taskSched != nullptr) { diff --git a/src/wdb/QueryRunner.h b/src/wdb/QueryRunner.h index a881075f0..639a8f569 100644 --- a/src/wdb/QueryRunner.h +++ b/src/wdb/QueryRunner.h @@ -55,7 +55,8 @@ class QueriesAndChunks; namespace lsst::qserv::wdb { -/// On the worker, run a query related to a Task, writing the results to a table or supplied SendChannel. +/// On the worker, run a query related to a Task, hold the resources needed to run the query, +/// and write the results to the supplied SendChannel. 
/// class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_from_this { public: diff --git a/src/wdb/testQueryRunner.cc b/src/wdb/testQueryRunner.cc index c67acf74a..de9aebaab 100644 --- a/src/wdb/testQueryRunner.cc +++ b/src/wdb/testQueryRunner.cc @@ -167,9 +167,8 @@ BOOST_AUTO_TEST_CASE(Simple) { auto scanInfo = lsst::qserv::protojson::ScanInfo::create(); scanInfo->scanRating = mInfo.scanRating; scanInfo->infoTables.emplace_back(mInfo.db, mInfo.table, mInfo.lockInMemory, mInfo.scanRating); - vector taskVect = - Task::createTasksForUnitTest(ujData, *msgJson, sChannel, scanInfo, mInfo.scanInteractive, - mInfo.maxTableSize, crm); + vector taskVect = Task::createTasksForUnitTest(ujData, *msgJson, sChannel, scanInfo, + mInfo.scanInteractive, mInfo.maxTableSize, crm); Task::Ptr task = taskVect[0]; QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries)); @@ -193,9 +192,8 @@ BOOST_AUTO_TEST_CASE(Output) { auto scanInfo = lsst::qserv::protojson::ScanInfo::create(); scanInfo->scanRating = mInfo.scanRating; scanInfo->infoTables.emplace_back(mInfo.db, mInfo.table, mInfo.lockInMemory, mInfo.scanRating); - vector taskVect = - Task::createTasksForUnitTest(ujData, *msgJson, sc, scanInfo, mInfo.scanInteractive, - mInfo.maxTableSize, crm); + vector taskVect = Task::createTasksForUnitTest(ujData, *msgJson, sc, scanInfo, + mInfo.scanInteractive, mInfo.maxTableSize, crm); Task::Ptr task = taskVect[0]; QueryRunner::Ptr a(QueryRunner::newQueryRunner(task, crm, newMySqlConfig(), sqlConnMgr, queries)); diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc index 8a4aa910b..a672b740a 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.cc +++ b/src/xrdsvc/HttpWorkerCzarModule.cc @@ -112,6 +112,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { try { auto const& jsReq = body().objJson; auto uberJobMsg = protojson::UberJobMsg::createFromJson(jsReq); + LOGS(_log, LOG_LVL_WARN, uberJobMsg->getIdStr() << " &&& parsed msg"); UberJobId ujId = uberJobMsg->getUberJobId(); auto ujCzInfo = uberJobMsg->getCzarContactInfo(); @@ -123,6 +124,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { // Get or create QueryStatistics and UserQueryInfo instances. auto queryStats = foreman()->getQueriesAndChunks()->addQueryId(ujQueryId, ujCzInfo->czId); auto userQueryInfo = queryStats->getUserQueryInfo(); + LOGS(_log, LOG_LVL_WARN, uberJobMsg->getIdStr() << " &&& added to stats"); if (userQueryInfo->getCancelledByCzar()) { throw wbase::TaskException( @@ -136,6 +138,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { auto ujData = wbase::UberJobData::create(ujId, ujCzInfo->czName, ujCzInfo->czId, ujCzInfo->czHostName, ujCzInfo->czPort, ujQueryId, ujRowLimit, targetWorkerId, foreman(), authKey()); + LOGS(_log, LOG_LVL_WARN, uberJobMsg->getIdStr() << " &&& ujData created"); // Find the entry for this queryId, create a new one if needed. userQueryInfo->addUberJob(ujData); @@ -150,8 +153,7 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { channelShared->setTaskCount(ujTasks.size()); ujData->addTasks(ujTasks); - // At this point, it looks like the message was sent successfully, update - // czar touched time. + // At this point, it looks like the message was sent successfully. wcontrol::WCzarInfoMap::Ptr wCzarMap = foreman()->getWCzarInfoMap(); wcontrol::WCzarInfo::Ptr wCzarInfo = wCzarMap->getWCzarInfo(czarId); wCzarInfo->czarMsgReceived(CLOCK::now());
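Many of the `&&&` instrumentation additions in this diff share one pattern: record a timestamp when a work item is queued (startQImp in UberJob::importResultFile, startBuildTasks in Task::createTasksFromUberJobMsg), measure the elapsed time when the pooled lambda finally runs, and feed the duration to a rolling histogram. A self-contained sketch of that pattern, with a plain function queue and a recordLatency() stub standing in for util::QdispPool and util::HistogramRolling::addEntry():

#include <chrono>
#include <functional>
#include <iostream>
#include <queue>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for util::HistogramRolling::addEntry().
void recordLatency(double secs) { std::cout << "queued for " << secs << " s\n"; }

int main() {
    std::queue<std::function<void()>> pool;  // stand-in for the qdisp pool

    auto start = Clock::now();  // captured at enqueue time, like startQImp
    pool.push([start]() {
        std::chrono::duration<double> waited = Clock::now() - start;
        recordLatency(waited.count());  // like histoQueImp.addEntry(end, secs)
        // ... collect and merge the result file here ...
    });

    // In the real code a pool thread runs this later; here we run it inline.
    pool.front()();
    pool.pop();
}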