From b80021c7619b1065444c01b07d05b568287a3311 Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 4 Oct 2024 13:49:31 -0700 Subject: [PATCH] Added dead message handling. --- src/cconfig/CzarConfig.cc | 7 +- src/cconfig/CzarConfig.h | 5 +- src/ccontrol/UserQuerySelect.cc | 67 ++++++++++--------- src/czar/ActiveWorker.cc | 28 ++++---- src/czar/ActiveWorker.h | 15 ++--- src/czar/Czar.cc | 28 ++++---- src/czar/CzarChunkMap.cc | 102 +++++++++++++++++++++-------- src/czar/CzarChunkMap.h | 22 ++++++- src/czar/CzarRegistry.cc | 27 ++++---- src/czar/CzarRegistry.h | 19 +++--- src/czar/HttpCzarWorkerModule.cc | 6 ++ src/http/WorkerQueryStatusData.cc | 32 +-------- src/http/WorkerQueryStatusData.h | 19 ++++-- src/http/testStatusData.cc | 40 ++--------- src/qdisp/Executive.cc | 12 +--- src/qdisp/Executive.h | 28 +------- src/qdisp/JobBase.h | 3 - src/qdisp/JobDescription.cc | 7 ++ src/qdisp/JobQuery.cc | 11 ++-- src/qdisp/JobQuery.h | 14 +--- src/wbase/Task.cc | 37 +---------- src/wbase/Task.h | 1 - src/wbase/UberJobData.cc | 13 ---- src/wbase/UberJobData.h | 2 +- src/wconfig/WorkerConfig.h | 9 --- src/wcontrol/Foreman.cc | 8 --- src/wdb/ChunkResource.cc | 1 - src/wdb/QueryRunner.cc | 13 ---- src/wdb/QueryRunner.h | 4 -- src/wpublish/QueryStatistics.cc | 7 -- src/wpublish/QueryStatistics.h | 2 - src/xrdsvc/HttpReplicaMgtModule.cc | 10 --- src/xrdsvc/HttpWorkerCzarModule.cc | 10 --- src/xrdsvc/SsiService.cc | 10 --- 34 files changed, 240 insertions(+), 379 deletions(-) diff --git a/src/cconfig/CzarConfig.cc b/src/cconfig/CzarConfig.cc index 68f24f092..5962af9e5 100644 --- a/src/cconfig/CzarConfig.cc +++ b/src/cconfig/CzarConfig.cc @@ -62,10 +62,9 @@ namespace lsst::qserv::cconfig { std::mutex CzarConfig::_mtxOnInstance; -std::shared_ptr CzarConfig::_instance; +CzarConfig::Ptr CzarConfig::_instance; -std::shared_ptr CzarConfig::create(std::string const& configFileName, - std::string const& czarName) { +CzarConfig::Ptr CzarConfig::create(std::string const& configFileName, std::string const& czarName) { std::lock_guard const lock(_mtxOnInstance); if (_instance == nullptr) { _instance = std::shared_ptr(new CzarConfig(util::ConfigStore(configFileName), czarName)); @@ -73,7 +72,7 @@ std::shared_ptr CzarConfig::create(std::string const& configFileName return _instance; } -std::shared_ptr CzarConfig::instance() { +CzarConfig::Ptr CzarConfig::instance() { std::lock_guard const lock(_mtxOnInstance); if (_instance == nullptr) { throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created."); diff --git a/src/cconfig/CzarConfig.h b/src/cconfig/CzarConfig.h index d55183177..6fd1ed0da 100644 --- a/src/cconfig/CzarConfig.h +++ b/src/cconfig/CzarConfig.h @@ -53,6 +53,7 @@ namespace lsst::qserv::cconfig { */ class CzarConfig { public: + using Ptr = std::shared_ptr; /** * Create an instance of CzarConfig and load parameters from the specifid file. * @note One has to call this method at least once before trying to obtain @@ -63,7 +64,7 @@ class CzarConfig { * @param czarName - the unique name of Czar. 
* @return the shared pointer to the configuration object
 */
-    static std::shared_ptr<CzarConfig> create(std::string const& configFileName, std::string const& czarName);
+    static Ptr create(std::string const& configFileName, std::string const& czarName);

    /**
     * Get a pointer to an instance that was created by the last call to 'create'.
     * @return the shared pointer to the configuration object
     * @throws std::logic_error when attempting to call the method before creating an instance.
     */
-    static std::shared_ptr<CzarConfig> instance();
+    static Ptr instance();

    CzarConfig() = delete;
    CzarConfig(CzarConfig const&) = delete;

diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc
index 6ed20d896..beef84f21 100644
--- a/src/ccontrol/UserQuerySelect.cc
+++ b/src/ccontrol/UserQuerySelect.cc
@@ -303,16 +303,15 @@ void UserQuerySelect::submit() {
    }

    /// At this point the executive has a map of all jobs with the chunkIds as the key.
-    if (uberJobsEnabled) {
-        // TODO:UJ _maxCHunksPerUberJob maybe put in config??? or set on command line??
-        // Different queries may benefit from different values
-        // Such as LIMIT=1 may work best with this at 1, where
-        // 100 would be better for others.
-        _maxChunksPerUberJob = 2;
-        // This is needed to prevent Czar::_monitor from starting things before they are ready.
-        _executive->setReadyToExecute();
-        buildAndSendUberJobs();
-    }
+    // TODO:UJ _maxChunksPerUberJob could be put in the config or set on the command line.
+    // Different queries may benefit from different values; e.g., a LIMIT=1 query may
+    // work best with this at 1, while 100 may be better for others.
+    _maxChunksPerUberJob = 2;
+    // This is needed to prevent Czar::_monitor from starting things before they are ready.
+    _executive->setReadyToExecute();
+    buildAndSendUberJobs();

    LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
    // TODO:UJ Waiting for all jobs to start may not be needed anymore?
@@ -326,7 +325,6 @@
 }

 void UserQuerySelect::buildAndSendUberJobs() {
-    // &&& NEED CODE - this function should check if the worker is DEAD. TODO:UJ
    string const funcN("UserQuerySelect::" + string(__func__) + " QID=" + to_string(_qMetaQueryId));
    LOGS(_log, LOG_LVL_DEBUG, funcN << " start");
@@ -376,52 +374,59 @@ void UserQuerySelect::buildAndSendUberJobs() {
    // - For failures - If a worker cannot be contacted, that's an uberjob failure.
    //   - uberjob failures (due to communications problems) will result in the uberjob
    //     being broken up into multiple UberJobs going to different workers.
-    //     - The best way to do this is probably to just kill the UberJob and mark all
-    //       Jobs that were in that UberJob as needing re-assignment, and re-running
-    //       the code here. The trick is going to be figuring out which workers are alive.
-    //       Maybe force a fresh lookup from the replicator Registry when an UberJob fails.
+    //     - If an UberJob fails, the UberJob is killed and all the Jobs it contained
+    //       are flagged as needing re-assignment, and this function will be called
+    //       again to put those Jobs in new UberJobs. Correctly re-assigning the
+    //       Jobs requires accurate information from the registry about which workers
+    //       are alive or dead.
    map> workerJobMap;
    vector missingChunks;

    // unassignedChunksInQuery needs to be in numerical order so that UberJobs contain chunk numbers in
    // numerical order. The workers run shared scans in numerical order of chunk id numbers.
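The grouping that buildAndSendUberJobs() performs is easiest to see in isolation: walk the unassigned chunks in ascending chunk-id order and cut a new UberJob every _maxChunksPerUberJob chunks. A reduced sketch with illustrative types (the real code also groups by target worker):

#include <map>
#include <vector>

// Cut the unassigned chunks into groups of at most maxChunksPerUberJob.
// std::map iteration gives the required ascending chunk-id order.
std::vector<std::vector<int>> buildUberJobGroups(std::map<int, int> const& unassignedByChunk,
                                                 int maxChunksPerUberJob) {
    std::vector<std::vector<int>> uberJobs;
    std::vector<int> cur;
    for (auto const& pair : unassignedByChunk) {
        cur.push_back(pair.first);
        if (static_cast<int>(cur.size()) >= maxChunksPerUberJob) {
            uberJobs.push_back(cur);
            cur.clear();
        }
    }
    if (!cur.empty()) uberJobs.push_back(cur);  // final partial group
    return uberJobs;
}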
- // This keeps the number of partially complete UberJobs running on a worker to a minimum, + // Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum, // and should minimize the time for the first UberJob on the worker to complete. for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) { - auto iter = chunkMapPtr->find(chunkId); - if (iter == chunkMapPtr->end()) { + // If too many workers are down, there will be a chunk that cannot be found. + // Just continuing should leave jobs `unassigned` with their attempt count + // increased. Either the chunk will be found and jobs assigned, or the jobs' + // attempt count will reach max and the query will be cancelled + auto lambdaMissingChunk = [&](string const& msg) { missingChunks.push_back(chunkId); bool const increaseAttemptCount = true; jqPtr->getDescription()->incrAttemptCountScrubResultsJson(_executive, increaseAttemptCount); - // Assign as many jobs as possible. Any chunks not found will be attempted later. + LOGS(_log, LOG_LVL_ERROR, msg); + }; + + auto iter = chunkMapPtr->find(chunkId); + if (iter == chunkMapPtr->end()) { + lambdaMissingChunk(funcN + " No chunkData for=" + to_string(chunkId)); continue; } czar::CzarChunkMap::ChunkData::Ptr chunkData = iter->second; auto targetWorker = chunkData->getPrimaryScanWorker().lock(); - // TODO:UJ maybe if (targetWorker == nullptr || this worker already tried for this chunk) { - if (targetWorker == nullptr) { - LOGS(_log, LOG_LVL_ERROR, funcN << " No primary scan worker for chunk=" << chunkData->dump()); + // TODO:UJ maybe if (targetWorker == nullptr || ... || this worker already tried for this chunk) { + if (targetWorker == nullptr || targetWorker->isDead()) { + LOGS(_log, LOG_LVL_WARN, + funcN << " No primary scan worker for chunk=" + chunkData->dump() + << ((targetWorker == nullptr) ? " targ was null" : " targ was dead")); // Try to assign a different worker to this job auto workerHasThisChunkMap = chunkData->getWorkerHasThisMapCopy(); bool found = false; for (auto wIter = workerHasThisChunkMap.begin(); wIter != workerHasThisChunkMap.end() && !found; ++wIter) { auto maybeTarg = wIter->second.lock(); - if (maybeTarg != nullptr) { + if (maybeTarg != nullptr && !maybeTarg->isDead()) { targetWorker = maybeTarg; found = true; LOGS(_log, LOG_LVL_WARN, - funcN << " Alternate worker found for chunk=" << chunkData->dump()); + funcN << " Alternate worker=" << targetWorker->getWorkerId() + << " found for chunk=" << chunkData->dump()); } } if (!found) { - // If too many workers are down, there will be a chunk that cannot be found. - // Just continuing should leave jobs `unassigned` with their attempt count - // increased. Either the chunk will be found and jobs assigned, or the jobs' - // attempt count will reach max and the query will be cancelled - // TODO:UJ Needs testing/verification - LOGS(_log, LOG_LVL_ERROR, - funcN << " No primary or alternate worker found for chunk=" << chunkData->dump()); + lambdaMissingChunk(funcN + + " No primary or alternate worker found for chunk=" + chunkData->dump()); continue; } } diff --git a/src/czar/ActiveWorker.cc b/src/czar/ActiveWorker.cc index ed0e445c5..921d678a7 100644 --- a/src/czar/ActiveWorker.cc +++ b/src/czar/ActiveWorker.cc @@ -115,7 +115,6 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti _changeStateTo(ALIVE, secsSinceUpdate, cName(__func__)); } else { // Don't waste time on this worker until the registry has heard from it. 
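The reassignment path above is the heart of the dead-worker handling: a chunk's primary scan worker may be gone, so the czar walks the other replica holders. A minimal standalone sketch of that selection, with Wkr as an illustrative stand-in for CzarChunkMap::WorkerChunksData:

#include <map>
#include <memory>
#include <string>

// Illustrative stand-in for CzarChunkMap::WorkerChunksData.
struct Wkr {
    std::string id;
    bool dead = false;
    bool isDead() const { return dead; }
};

// Walk the replica holders for a chunk and return the first worker that both
// still exists (the weak_ptr locks) and is not dead. nullptr means the chunk
// joins missingChunks and its jobs stay unassigned until a later pass.
std::shared_ptr<Wkr> pickAlternateWorker(
        std::map<std::string, std::weak_ptr<Wkr>> const& workerHasThisChunk) {
    for (auto const& [wId, wWeak] : workerHasThisChunk) {
        auto w = wWeak.lock();
        if (w != nullptr && !w->isDead()) return w;
    }
    return nullptr;
}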
-        // &&& If it's been a really really long time, maybe delete this entry ???
        return;
    }
    break;
@@ -132,20 +131,6 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti
    shared_ptr jsWorkerReqPtr;
    {
-        lock_guard lg(_aMtx);  //&&& needed ???
-        lock_guard mapLg(_wqsData->mapMtx);
-        // Check how many messages are currently being sent to the worker, if at the limit, return
-        if (_wqsData->qIdDoneKeepFiles.empty() && _wqsData->qIdDoneDeleteFiles.empty() &&
-            _wqsData->qIdDeadUberJobs.empty()) {
-            return;
-        }
-        int tCount = _conThreadCount;
-        if (tCount > _maxConThreadCount) {
-            LOGS(_log, LOG_LVL_DEBUG,
-                 cName(__func__) << " not sending message since at max threads " << tCount);
-            return;
-        }
-
        // Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a
        // message to send to the worker.
        jsWorkerReqPtr = _wqsData->serializeJson(maxLifetime);
@@ -217,6 +202,17 @@ void ActiveWorker::addDeadUberJob(QueryId qId, UberJobId ujId) {
    _wqsData->addDeadUberJob(qId, ujId, now);
}

+http::WorkerContactInfo::Ptr ActiveWorker::getWInfo() const {
+    std::lock_guard lg(_aMtx);
+    if (_wqsData == nullptr) return nullptr;
+    return _wqsData->getWInfo();
+}
+
+ActiveWorker::State ActiveWorker::getState() const {
+    std::lock_guard lg(_aMtx);
+    return _state;
+}
+
string ActiveWorker::dump() const {
    lock_guard lg(_aMtx);
    return _dump();
}
@@ -238,6 +234,7 @@ void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap,
        auto iter = _awMap.find(wcKey);
        if (iter == _awMap.end()) {
            auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey);
+            LOGS(_log, LOG_LVL_INFO, cName(__func__) << " ActiveWorker created for " << wcKey);
            _awMap[wcKey] = newAW;
            if (_czarCancelAfterRestart) {
                newAW->setCzarCancelAfterRestart(_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId);
            }
@@ -252,6 +249,7 @@ void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap,
                // If there is existing information, only host and port values will change.
                aWorker->setWorkerContactInfo(wcVal);
            }
+            // getWInfo() may return nullptr, so check before updating the timestamp.
+            auto aWInfo = aWorker->getWInfo();
+            if (aWInfo != nullptr) aWInfo->setRegUpdateTime(wcVal->getRegUpdateTime());
        }
    }
}

diff --git a/src/czar/ActiveWorker.h b/src/czar/ActiveWorker.h
index 0ddc1f9d5..630a10eae 100644
--- a/src/czar/ActiveWorker.h
+++ b/src/czar/ActiveWorker.h
@@ -79,7 +79,8 @@ class ActiveWorker : public std::enable_shared_from_this {
    ActiveWorker& operator=(ActiveWorker const&) = delete;

    std::string cName(const char* fName) {
-        return std::string("ActiveWorker::") + fName + " " + ((_wqsData == nullptr) ? "?" : _wqsData->dump());
+        auto wqsd = _wqsData;
+        return std::string("ActiveWorker::") + fName + " " + ((wqsd == nullptr) ? "?" : wqsd->dump());
    }

    static std::string getStateStr(State st);
@@ -97,10 +98,7 @@
        _wqsData->setCzarCancelAfterRestart(czId, lastQId);
    }

-    http::WorkerContactInfo::Ptr getWInfo() const {
-        if (_wqsData == nullptr) return nullptr;
-        return _wqsData->getWInfo();
-    }
+    http::WorkerContactInfo::Ptr getWInfo() const;

    ~ActiveWorker() = default;
@@ -138,6 +136,8 @@
    /// individual UberJobs anymore, so this function will get rid of them.
    void removeDeadUberJobsFor(QueryId qId);

+    State getState() const;
+
    std::string dump() const;

private:
@@ -169,10 +169,6 @@
    State _state{QUESTIONABLE};  ///< current state of this worker.
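getState() reports one of the three worker states that updateStateAndSendMessages() maintains from registry-update timestamps. A compressed sketch of that classification, with the two timeouts as inputs (the real transitions also log and trigger cleanup messages):

// Illustrative reduction of updateStateAndSendMessages(): a worker is ALIVE
// while the registry has heard from it within timeoutAliveSecs, QUESTIONABLE
// until timeoutDeadSecs, and DEAD after that (dead workers get their
// UberJobs reassigned and are dropped from new chunk maps).
enum class WState { ALIVE, QUESTIONABLE, DEAD };

WState classify(double secsSinceRegUpdate, double timeoutAliveSecs, double timeoutDeadSecs) {
    if (secsSinceRegUpdate < timeoutAliveSecs) return WState::ALIVE;
    if (secsSinceRegUpdate < timeoutDeadSecs) return WState::QUESTIONABLE;
    return WState::DEAD;
}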
mutable std::mutex _aMtx; ///< protects _wInfo, _state, _qIdDoneKeepFiles, _qIdDoneDeleteFiles - - /// The number of communication threads currently in use by this class instance. - std::atomic _conThreadCount{0}; - int _maxConThreadCount{2}; }; /// &&& doc @@ -182,6 +178,7 @@ class ActiveWorker : public std::enable_shared_from_this { /// come back from the dead. class ActiveWorkerMap { public: + using Ptr = std::shared_ptr; ActiveWorkerMap() = default; ActiveWorkerMap(ActiveWorkerMap const&) = delete; ActiveWorkerMap operator=(ActiveWorkerMap const&) = delete; diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index c2df9e545..c05400ab7 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -95,10 +95,17 @@ void Czar::_monitor() { LOGS(_log, LOG_LVL_DEBUG, funcN << " start0"); /// Check database for changes in worker chunk assignments and aliveness - _czarFamilyMap->read(); + try { + _czarFamilyMap->read(); + } catch (ChunkMapException const& cmex) { + // There are probably chunks that don't exist on any alive worker, + // continue on in hopes that workers will show up with the missing chunks + // later. + LOGS(_log, LOG_LVL_ERROR, funcN << " family map read problems " << cmex.what()); + } // Send appropriate messages to all ActiveWorkers. This will - // check if workers have died by timeout. The reponse + // check if workers have died by timeout. The response // from the worker include _czarRegistry->sendActiveWorkersMessages(); @@ -126,14 +133,13 @@ void Czar::_monitor() { execVal->assignJobsToUberJobs(); } - // TODO:UJ DM-45470 Maybe get missing results from workers. - // To prevent anything from slipping through the cracks: - // Workers will keep trying to transmit results until they think the czar is dead. - // If a worker thinks the czar died, it will cancel all related jobs that it has, - // and if the czar sends a status message to that worker, that worker will send back - // a separate message saying it killed everything that this czar gave it. Upon - // getting this message from a worker, this czar will reassign everything it had - // sent to that worker. + // To prevent anything from slipping through the cracks: + // Workers will keep trying to transmit results until they think the czar is dead. + // If a worker thinks the czar died, it will cancel all related jobs that it has, + // and if the czar sends a status message to that worker, that worker will send back + // a separate message (see WorkerCzarComIssue) saying it killed everything that this + // czar gave it. Upon getting this message from a worker, this czar will reassign + // everything it had sent to that worker. // TODO:UJ How long should queryId's remain on this list? 
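Taken together, the _monitor() changes make one pass of the loop look roughly like the sketch below; familyMapRead(), sendWorkerMessages(), and reassignOrphanedJobs() are hypothetical stand-ins for _czarFamilyMap->read(), _czarRegistry->sendActiveWorkersMessages(), and the per-executive assignJobsToUberJobs() calls:

#include <iostream>
#include <stdexcept>

// Hypothetical stand-ins for the members used by Czar::_monitor().
struct ChunkMapException : std::runtime_error {
    using std::runtime_error::runtime_error;
};
void familyMapRead() { /* may throw ChunkMapException when chunks are missing */ }
void sendWorkerMessages() { /* also detects worker death by timeout */ }
void reassignOrphanedJobs() { /* per-executive assignJobsToUberJobs() */ }

void monitorOnce() {
    try {
        familyMapRead();
    } catch (ChunkMapException const& ex) {
        // Keep the previous map and carry on; the missing chunks may show up
        // when workers come back.
        std::cerr << ex.what() << '\n';
    }
    sendWorkerMessages();
    reassignOrphanedJobs();
}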
} @@ -229,7 +235,7 @@ Czar::Czar(string const& configFilePath, string const& czarName) auto const port = _controlHttpSvc->start(); _czarConfig->setReplicationHttpPort(port); - _czarRegistry = CzarRegistry::create(_czarConfig); + _czarRegistry = CzarRegistry::create(_czarConfig, _activeWorkerMap); // Start the monitor thread thread monitorThrd(&Czar::_monitor, this); diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index 7dd1e407a..11f7865d1 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -84,20 +84,22 @@ void CzarChunkMap::verify() { for (auto const& [chunkId, chunkDataPtr] : chunkMap) { if (chunkDataPtr == nullptr) { - LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " had nullptr"); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " chunkId=" << chunkId << " had nullptr"); ++errorCount; continue; } auto primeScanWkr = chunkDataPtr->_primaryScanWorker.lock(); if (primeScanWkr == nullptr) { - LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " missing primaryScanWorker"); + LOGS(_log, LOG_LVL_ERROR, + cName(__func__) << " chunkId=" << chunkId << " missing primaryScanWorker"); ++errorCount; continue; } if (primeScanWkr->_sharedScanChunkMap.find(chunkId) == primeScanWkr->_sharedScanChunkMap.end()) { LOGS(_log, LOG_LVL_ERROR, - " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for " - << primeScanWkr->_workerId); + cName(__func__) << " chunkId=" << chunkId + << " should have been (and was not) in the sharedScanChunkMap for " + << primeScanWkr->_workerId); ++errorCount; continue; } @@ -105,7 +107,8 @@ void CzarChunkMap::verify() { if (iter != allChunkIds.end()) { allChunkIds.erase(iter); } else { - LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " chunkId was not in allChunks list"); + LOGS(_log, LOG_LVL_ERROR, + cName(__func__) << " chunkId=" << chunkId << " chunkId was not in allChunks list"); ++errorCount; continue; } @@ -118,14 +121,14 @@ void CzarChunkMap::verify() { allMissingIds += to_string(cId) + ","; } LOGS(_log, LOG_LVL_ERROR, - " There were " << missing << " missing chunks from the scan list " << allMissingIds); + cName(__func__) << " There were " << missing << " missing chunks from the scan list " + << allMissingIds); ++errorCount; } if (errorCount > 0) { - // TODO:UJ There may be an argument to keep the new maps even if there are problems - // with them. For current testing, it's probably best to leave it how it is so that - // it's easier to isolate problems. + // Original creation of the family map will keep re-reading until there are no problems. + // _monitor will log this and keep using the old maps. 
        throw ChunkMapException(ERR_LOC, "verification failed with " + to_string(errorCount) + " errors");
    }
}
@@ -161,20 +164,21 @@ void CzarChunkMap::ChunkData::_calcTotalBytes()

void CzarChunkMap::ChunkData::addToWorkerHasThis(std::shared_ptr const& worker) {
    if (worker == nullptr) {
-        throw ChunkMapException(ERR_LOC, string(__func__) + " worker was null");
+        throw ChunkMapException(ERR_LOC, cName(__func__) + " worker was null");
    }
    _workerHasThisMap[worker->_workerId] = worker;
}

-std::map>
-CzarChunkMap::ChunkData::getWorkerHasThisMapCopy() const {
-    std::map> newMap = _workerHasThisMap;
+map> CzarChunkMap::ChunkData::getWorkerHasThisMapCopy()
+        const {
+    map> newMap = _workerHasThisMap;
    return newMap;
}

-void CzarChunkMap::organize() {
+shared_ptr CzarChunkMap::organize() {
    auto chunksSortedBySize = make_shared();
+    auto missingChunks = make_shared();

    calcChunkMap(*_chunkMap, *chunksSortedBySize);
@@ -191,27 +195,31 @@
        for (auto&& [wkrId, wkrDataWeak] : chunkData->_workerHasThisMap) {
            auto wkrData = wkrDataWeak.lock();
            if (wkrData == nullptr) {
-                LOGS(_log, LOG_LVL_ERROR, __func__ << " unexpected null weak ptr for " << wkrId);
+                LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " unexpected null weak ptr for " << wkrId);
                continue;  // maybe the next one will be okay.
            }
+
            LOGS(_log, LOG_LVL_DEBUG,
-                 __func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize
-                          << " smallest=" << smallest);
+                 cName(__func__) << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize
+                                 << " smallest=" << smallest);
            if (wkrData->_sharedScanTotalSize < smallest) {
                smallestWkr = wkrData;
                smallest = smallestWkr->_sharedScanTotalSize;
            }
        }
        if (smallestWkr == nullptr) {
-            throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" +
-                                                     to_string(chunkData->_chunkId));
+            LOGS(_log, LOG_LVL_ERROR,
+                 cName(__func__) + " no smallestWkr found for chunk=" + to_string(chunkData->_chunkId));
+            missingChunks->push_back(chunkData);
+        } else {
+            smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData;
+            smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes;
+            chunkData->_primaryScanWorker = smallestWkr;
+            LOGS(_log, LOG_LVL_DEBUG,
+                 " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
        }
-        smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData;
-        smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes;
-        chunkData->_primaryScanWorker = smallestWkr;
-        LOGS(_log, LOG_LVL_DEBUG,
-             " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
    }
+    return missingChunks;
}

string CzarChunkMap::ChunkData::dump() const {
@@ -231,6 +239,34 @@ string CzarChunkMap::ChunkData::dump() const {
    return os.str();
}

+bool CzarChunkMap::WorkerChunksData::isDead() {
+    if (_activeWorker == nullptr) {
+        // At startup, these may not be available.
+        auto czarPtr = Czar::getCzar();
+        if (czarPtr == nullptr) {
+            LOGS(_log, LOG_LVL_ERROR,
+                 cName(__func__) << " czarPtr is null, this should only happen in unit tests.");
+            return false;
+        }
+        auto awMap = Czar::getCzar()->getActiveWorkerMap();
+        if (awMap == nullptr) {
+            LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " awMap is null.");
+            return true;
+        }
+        _activeWorker = awMap->getActiveWorker(_workerId);
+        if (_activeWorker == nullptr) {
+            LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " activeWorker not found.");
+            return true;
+        }
+    }
+    auto wState = _activeWorker->getState();
+    bool res = wState == ActiveWorker::DEAD;
+    if (res) {
+        LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " is dead");
+    }
+    return res;
+}
+
string CzarChunkMap::WorkerChunksData::dump() const {
    stringstream os;
    os << "{WorkerChunksData id=" << _workerId << " scanTotalSize=" << _sharedScanTotalSize;
@@ -300,9 +336,6 @@ bool CzarFamilyMap::_read() {
        return false;
    }

-    // &&& TODO:UJ Before makeNewMaps(), get a list of workers considered to be alive by
-    // czar::_activeWorkerMap
-
    // Make the new maps.
    shared_ptr familyMapPtr = makeNewMaps(qChunkMap);
@@ -356,7 +389,20 @@ std::shared_ptr CzarFamilyMap::makeNewMaps(
    // this needs to be done for each CzarChunkMap in the family map.
    for (auto&& [familyName, chunkMapPtr] : *newFamilyMap) {
        LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " working on " << familyName);
-        chunkMapPtr->organize();
+        auto missing = chunkMapPtr->organize();
+        if (missing != nullptr && !missing->empty()) {
+            // TODO:UJ Some element of the dashboard should be made aware of this. Also,
+            // TODO:UJ maybe this should check all families before throwing.
+            // TODO:UJ There are implications that maybe the replicator should not
+            // TODO:UJ tell the czar about families/databases that do not have
+            // TODO:UJ at least one copy of each chunk with data loaded on a worker.
+            string chunkIdStr;
+            for (auto const& chunkData : *missing) {
+                chunkIdStr += to_string(chunkData->getChunkId()) + " ";
+            }
+            throw ChunkMapException(
+                    ERR_LOC, cName(__func__) + " family=" + familyName + " is missing chunks " + chunkIdStr);
+        }
    }

    return newFamilyMap;

diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h
index f0b85a1d3..97e864855 100644
--- a/src/czar/CzarChunkMap.h
+++ b/src/czar/CzarChunkMap.h
@@ -43,6 +43,7 @@ struct QMetaChunkMap;

namespace lsst::qserv::czar {

+class ActiveWorker;
class CzarFamilyMap;

class ChunkMapException : public util::Issue {
@@ -71,10 +72,11 @@ class CzarChunkMap {
    using Ptr = std::shared_ptr;
    using SizeT = uint64_t;

+    std::string cName(const char* func) { return std::string("CzarChunkMap::") + func; }
+
    CzarChunkMap(CzarChunkMap const&) = delete;
    CzarChunkMap& operator=(CzarChunkMap const&) = delete;

-    // static Ptr create(std::shared_ptr const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); }
    static Ptr create() { return Ptr(new CzarChunkMap()); }

    ~CzarChunkMap();
@@ -88,8 +90,10 @@
        using Ptr = std::shared_ptr;

        ChunkData(int chunkId_) : _chunkId(chunkId_) {}
+        std::string cName(const char* func) {
+            return std::string("ChunkData::") + func + " " + std::to_string(_chunkId);
+        }
        int64_t getChunkId() const { return _chunkId; }
-
        SizeT getTotalBytes() const { return _totalBytes; }

        std::weak_ptr getPrimaryScanWorker() const { return _primaryScanWorker; }
@@ -127,6 +131,10 @@
        using Ptr = std::shared_ptr;

        WorkerChunksData(std::string const& workerId) : _workerId(workerId) {}

+        std::string cName(const char* func) {
+            return std::string("WorkerChunksData::") + func + " " + _workerId;
+        }
+
        /// Return the worker's id string.
        std::string const& getWorkerId() const { return _workerId; }
@@ -134,6 +142,9 @@
        /// accessed in a full table scan on this worker.
        SizeT getSharedScanTotalSize() const { return _sharedScanTotalSize; }

+        /// @return true if the worker holding this chunk data is considered dead.
+        bool isDead();
+
        /// Return a reference to `_sharedScanChunkMap`. A copy of the pointer
        /// to this class (or the containing map) should be held to ensure the reference.
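organize() now implements a greedy rule: each chunk goes to the replica-holding worker whose shared-scan byte total is currently smallest, and chunks with no usable worker are returned instead of aborting the whole map build. A self-contained sketch of that rule (Wkr is illustrative, not the real WorkerChunksData):

#include <cstdint>
#include <map>
#include <vector>

struct Wkr {
    std::uint64_t scanTotalBytes = 0;  // running shared-scan load
};

// For every chunk, pick the least-loaded worker that holds a replica, add the
// chunk's bytes to that worker, and collect chunks nobody can serve.
std::vector<int> assignChunks(std::map<int, std::uint64_t> const& chunkBytes,
                              std::map<int, std::vector<Wkr*>> const& replicas) {
    std::vector<int> missingChunks;
    for (auto const& [chunkId, bytes] : chunkBytes) {
        Wkr* smallest = nullptr;
        auto iter = replicas.find(chunkId);
        if (iter != replicas.end()) {
            for (Wkr* w : iter->second) {
                if (w != nullptr && (smallest == nullptr ||
                                     w->scanTotalBytes < smallest->scanTotalBytes)) {
                    smallest = w;
                }
            }
        }
        if (smallest == nullptr) {
            missingChunks.push_back(chunkId);  // no live replica holder
            continue;
        }
        smallest->scanTotalBytes += bytes;  // becomes the primary scan worker
    }
    return missingChunks;
}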
std::map const& getSharedScanChunkMap() const { return _sharedScanChunkMap; } @@ -159,6 +170,10 @@ class CzarChunkMap { /// The total size (in bytes) of all chunks on this worker that /// are to be used in shared scans. SizeT _sharedScanTotalSize = 0; + + /// Used to determine if this worker is alive and set + /// when the test is made. + std::shared_ptr _activeWorker; }; using WorkerChunkMap = std::map; @@ -192,7 +207,8 @@ class CzarChunkMap { /// Use the information from the registry to `organize` `_chunkMap` and `_workerChunkMap` /// into their expected formats. - void organize(); + /// @return a vector of ChunkData::Ptr of chunks where no worker was found. + std::shared_ptr organize(); private: CzarChunkMap(); diff --git a/src/czar/CzarRegistry.cc b/src/czar/CzarRegistry.cc index 6f9275b71..72b845001 100644 --- a/src/czar/CzarRegistry.cc +++ b/src/czar/CzarRegistry.cc @@ -48,7 +48,9 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarRegistry"); namespace lsst::qserv::czar { -CzarRegistry::CzarRegistry(std::shared_ptr const& czarConfig) : _czarConfig(czarConfig) { +CzarRegistry::CzarRegistry(cconfig::CzarConfig::Ptr const& czarConfig, + ActiveWorkerMap::Ptr const& activeWorkerMap) + : _czarConfig(czarConfig), _activeWorkerMap(activeWorkerMap) { // Begin periodically updating worker's status in the Replication System's registry. // This will continue until the application gets terminated. thread registryUpdateThread(&CzarRegistry::_registryUpdateLoop, this); @@ -68,6 +70,11 @@ CzarRegistry::~CzarRegistry() { } } +http::WorkerContactInfo::WCMapPtr CzarRegistry::getWorkerContactMap() const { + std::lock_guard lockG(_cmapMtx); + return _contactMap; +} + void CzarRegistry::_registryUpdateLoop() { auto const method = http::Method::POST; string const url = "http://" + _czarConfig->replicationRegistryHost() + ":" + @@ -129,12 +136,12 @@ void CzarRegistry::_registryWorkerInfoLoop() { auto czInfo = http::CzarContactInfo::create(_czarConfig->name(), _czarConfig->id(), _czarConfig->replicationHttpPort(), util::get_current_host_fqdn(), czarStartTime); - lock_guard lck(_mapMtx); + lock_guard lck(_cmapMtx); if (wMap != nullptr && !_compareMapContactInfo(*wMap)) { _contactMap = wMap; _latestMapUpdate = CLOCK::now(); - _activeWorkerMap.updateMap(*_contactMap, czInfo, replicationInstanceId, - replicationAuthKey); + _activeWorkerMap->updateMap(*_contactMap, czInfo, replicationInstanceId, + replicationAuthKey); } } } @@ -198,7 +205,7 @@ http::WorkerContactInfo::WCMapPtr CzarRegistry::waitForWorkerContactMap() const http::WorkerContactInfo::WCMapPtr contMap = nullptr; while (contMap == nullptr) { { - std::lock_guard lockG(_mapMtx); + std::lock_guard lockG(_cmapMtx); contMap = _contactMap; } if (contMap == nullptr) { @@ -212,21 +219,19 @@ http::WorkerContactInfo::WCMapPtr CzarRegistry::waitForWorkerContactMap() const void CzarRegistry::sendActiveWorkersMessages() { // Send messages to each active worker as needed - lock_guard lck(_mapMtx); - _activeWorkerMap.sendActiveWorkersMessages(); + _activeWorkerMap->sendActiveWorkersMessages(); } void CzarRegistry::endUserQueryOnWorkers(QueryId qId, bool deleteWorkerResults) { - lock_guard lck(_mapMtx); // Add query id to the appropriate list. if (deleteWorkerResults) { - _activeWorkerMap.addToDoneDeleteFiles(qId); + _activeWorkerMap->addToDoneDeleteFiles(qId); } else { - _activeWorkerMap.addToDoneKeepFiles(qId); + _activeWorkerMap->addToDoneKeepFiles(qId); } // With lists updated, send out messages. 
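getWorkerContactMap() above copies the shared_ptr while holding _cmapMtx, so callers get an immutable snapshot and the registry thread can swap in a whole new map without blocking readers for long. The general shape of that pattern, sketched:

#include <memory>
#include <mutex>

// Readers copy the pointer under the mutex and then work lock-free on an
// immutable snapshot; the updater swaps in a whole new object.
template <typename T>
class Snapshot {
public:
    std::shared_ptr<T const> get() const {
        std::lock_guard<std::mutex> lg(_mtx);
        return _cur;
    }
    void set(std::shared_ptr<T const> next) {
        std::lock_guard<std::mutex> lg(_mtx);
        _cur = std::move(next);
    }

private:
    mutable std::mutex _mtx;
    std::shared_ptr<T const> _cur;
};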
- _activeWorkerMap.sendActiveWorkersMessages(); + _activeWorkerMap->sendActiveWorkersMessages(); } } // namespace lsst::qserv::czar diff --git a/src/czar/CzarRegistry.h b/src/czar/CzarRegistry.h index 076f7fd40..bc8b6dc6d 100644 --- a/src/czar/CzarRegistry.h +++ b/src/czar/CzarRegistry.h @@ -61,18 +61,16 @@ class CzarRegistry { using Ptr = std::shared_ptr; /// Return a pointer to a new CzarRegistry object. - static Ptr create(std::shared_ptr const& czarConfig) { - return Ptr(new CzarRegistry(czarConfig)); + static Ptr create(std::shared_ptr const& czarConfig, + std::shared_ptr const& activeWorkerMap) { + return Ptr(new CzarRegistry(czarConfig, activeWorkerMap)); } ~CzarRegistry(); /// Return _contactMap, the object that the returned pointer points to is /// constant and no attempts should be made to change it. - http::WorkerContactInfo::WCMapPtr getWorkerContactMap() const { - std::lock_guard lockG(_mapMtx); - return _contactMap; - } + http::WorkerContactInfo::WCMapPtr getWorkerContactMap() const; /// Return _contactMap, the object that the returned pointer points to is /// constant and no attempts should be made to change it. This @@ -89,7 +87,8 @@ class CzarRegistry { private: CzarRegistry() = delete; - CzarRegistry(std::shared_ptr const& czarConfig); + CzarRegistry(std::shared_ptr const& czarConfig, + std::shared_ptr const& activeWorkerMap); /// This function will keep periodically updating Czar's info in the Replication System's Registry /// until _loop is set to false. @@ -105,6 +104,7 @@ class CzarRegistry { http::WorkerContactInfo::WCMapPtr _buildMapFromJson(nlohmann::json const& response); /// Return true if maps are the same size and all of the elements have the same contact info. + /// NOTE: _cmapMtx must be held when calling. bool _compareMapContactInfo(http::WorkerContactInfo::WCMap const& other) const; std::shared_ptr const _czarConfig; ///< Pointer to the CzarConfig. @@ -118,9 +118,10 @@ class CzarRegistry { TIMEPOINT _latestMapUpdate; ///< The last time the _contactMap was updated, unrelated to ///< WorkerContactInfo update. // &&& review how this _mapMtx is used, probably locks for too long a period. - mutable std::mutex _mapMtx; /// Protects _contactMap, _latestUpdate, _activeWorkerMap + mutable std::mutex _cmapMtx; /// Protects _contactMap, _latestUpdate - ActiveWorkerMap _activeWorkerMap; ///< Map of workers czar considers active. + /// Map for tracking worker aliveness, it has its own internal mutex. + std::shared_ptr const _activeWorkerMap; }; } // namespace lsst::qserv::czar diff --git a/src/czar/HttpCzarWorkerModule.cc b/src/czar/HttpCzarWorkerModule.cc index 5f82eb2be..75ccbdd6d 100644 --- a/src/czar/HttpCzarWorkerModule.cc +++ b/src/czar/HttpCzarWorkerModule.cc @@ -98,6 +98,7 @@ json HttpCzarWorkerModule::_workerCzarComIssue() { } json HttpCzarWorkerModule::_handleJobError(string const& func) { + LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobError start"); // Metadata-only responses for the file-based protocol should not have any data // Parse and verify the json message and then kill the UberJob. 
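Both czar-side handlers below reply with the same three-field JSON envelope on success and on parse failure. A small sketch of building and reading it, assuming nlohmann::json (field names are taken from the code; makeReply is a hypothetical helper):

#include <iostream>
#include <string>
#include <utility>
#include "nlohmann/json.hpp"

using json = nlohmann::json;

// Hypothetical helper producing the {success, errortype, note} envelope the
// handlers above build inline.
json makeReply(bool ok, std::string errorType, std::string note) {
    return json{{"success", ok ? 1 : 0},
                {"errortype", std::move(errorType)},
                {"note", std::move(note)}};
}

int main() {
    json good = makeReply(true, "unknown", "initialized");
    json bad = makeReply(false, "parse", "invalid message");
    std::cout << good["success"] << " " << bad["errortype"] << "\n";  // 1 "parse"
}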
@@ -133,10 +134,12 @@ json HttpCzarWorkerModule::_handleJobError(string const& func) { "HttpCzarWorkerModule::_handleJobError received " << iaEx.what() << " js=" << body().objJson); jsRet = {{"success", 0}, {"errortype", "parse"}, {"note", iaEx.what()}}; } + LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobError end"); return jsRet; } json HttpCzarWorkerModule::_handleJobReady(string const& func) { + LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobReady start"); // Metadata-only responses for the file-based protocol should not have any data // Parse and verify the json message and then have the uberjob import the file. @@ -173,10 +176,12 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) { "HttpCzarWorkerModule::_handleJobReady received " << iaEx.what() << " js=" << body().objJson); jsRet = {{"success", 0}, {"errortype", "parse"}, {"note", iaEx.what()}}; } + LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobReady end"); return jsRet; } json HttpCzarWorkerModule::_handleWorkerCzarComIssue(string const& func) { + LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleWorkerCzarComIssue start"); // Parse and verify the json message and then deal with the problems. json jsRet = {{"success", 1}, {"errortype", "unknown"}, {"note", "initialized"}}; try { @@ -208,6 +213,7 @@ json HttpCzarWorkerModule::_handleWorkerCzarComIssue(string const& func) { << " js=" << body().objJson); jsRet = {{"success", 0}, {"errortype", "parse"}, {"note", iaEx.what()}}; } + LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleWorkerCzarComIssue end"); return jsRet; } diff --git a/src/http/WorkerQueryStatusData.cc b/src/http/WorkerQueryStatusData.cc index 247efa04b..58a12773d 100644 --- a/src/http/WorkerQueryStatusData.cc +++ b/src/http/WorkerQueryStatusData.cc @@ -61,7 +61,6 @@ CzarContactInfo::Ptr CzarContactInfo::createFromJson(nlohmann::json const& czJso auto czHostName_ = RequestBodyJSON::required(czJson, "management-host-name"); auto czStartupTime_ = RequestBodyJSON::required(czJson, "czar-startup-time"); return create(czName_, czId_, czPort_, czHostName_, czStartupTime_); - //&&& return create(czName_, czId_, czPort_, czHostName_); } catch (invalid_argument const& exc) { LOGS(_log, LOG_LVL_ERROR, string("CzarContactInfo::createJson invalid ") << exc.what()); } @@ -70,8 +69,6 @@ CzarContactInfo::Ptr CzarContactInfo::createFromJson(nlohmann::json const& czJso std::string CzarContactInfo::dump() const { stringstream os; - //&&& os << "czName=" << czName << " czId=" << czId << " czPort=" << czPort << " czHostName=" << - // czHostName; os << "czName=" << czName << " czId=" << czId << " czPort=" << czPort << " czHostName=" << czHostName << " czStartupTime=" << czStartupTime; return os.str(); @@ -253,18 +250,14 @@ WorkerQueryStatusData::Ptr WorkerQueryStatusData::createFromJson(nlohmann::json std::string const& replicationInstanceId_, std::string const& replicationAuthKey_, TIMEPOINT updateTm) { - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& a"); try { if (jsWorkerReq["version"] != http::MetaModule::version) { LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson bad version"); return nullptr; } - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& b"); auto czInfo_ = CzarContactInfo::createFromJson(jsWorkerReq["czar"]); - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& c"); auto wInfo_ = WorkerContactInfo::createFromJsonWorker(jsWorkerReq["worker"], updateTm); - LOGS(_log, LOG_LVL_ERROR, 
"WorkerQueryStatusData::createJson &&& d"); if (czInfo_ == nullptr || wInfo_ == nullptr) { LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson czar or worker info could not be parsed in " @@ -272,9 +265,8 @@ WorkerQueryStatusData::Ptr WorkerQueryStatusData::createFromJson(nlohmann::json } auto wqsData = WorkerQueryStatusData::create(wInfo_, czInfo_, replicationInstanceId_, replicationAuthKey_); - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& e"); wqsData->parseLists(jsWorkerReq, updateTm); - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::createJson &&& end"); + bool czarRestart = RequestBodyJSON::required(jsWorkerReq, "czarrestart"); if (czarRestart) { auto restartCzarId = RequestBodyJSON::required(jsWorkerReq, "czarrestartcancelczid"); @@ -297,39 +289,26 @@ void WorkerQueryStatusData::parseListsInto(nlohmann::json const& jsWR, TIMEPOINT std::map& doneKeepF, std::map& doneDeleteF, std::map>& deadUberJobs) { - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& a"); auto& jsQIdDoneKeepFiles = jsWR["qiddonekeepfiles"]; - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& b"); for (auto const& qidKeep : jsQIdDoneKeepFiles) { - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& b1"); doneKeepF[qidKeep] = updateTm; } - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& c"); auto& jsQIdDoneDeleteFiles = jsWR["qiddonedeletefiles"]; - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& d"); for (auto const& qidDelete : jsQIdDoneDeleteFiles) { - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& d1"); doneDeleteF[qidDelete] = updateTm; } - LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& e"); auto& jsQIdDeadUberJobs = jsWR["qiddeaduberjobs"]; - LOGS(_log, LOG_LVL_ERROR, - "WorkerQueryStatusData::parseListsInto &&& f jsQIdDeadUberJobs=" << jsQIdDeadUberJobs); // Interestingly, !jsQIdDeadUberJobs.empty() doesn't work, but .size() > 0 does. // Not having the size() check causes issues with the for loop trying to read the // first element of an empty list, which goes badly. 
    if (jsQIdDeadUberJobs.size() > 0) {
-        LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& f1");
        for (auto const& qDeadUjs : jsQIdDeadUberJobs) {
-            LOGS(_log, LOG_LVL_ERROR, "WorkerQueryStatusData::parseListsInto &&& f1a qDeadUjs=" << qDeadUjs);
            QueryId qId = qDeadUjs["qid"];
            auto const& ujIds = qDeadUjs["ujids"];
            auto& mapOfUj = deadUberJobs[qId];
            for (auto const& ujId : ujIds) {
-                LOGS(_log, LOG_LVL_ERROR,
-                     "WorkerQueryStatusData::parseListsInto &&& f1d1 qId=" << qId << " ujId=" << ujId);
                mapOfUj[ujId] = updateTm;
            }
        }
    }
@@ -442,7 +421,7 @@ shared_ptr WorkerCzarComIssue::serializeJson() {
        LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " _wInfo or _czInfo was null");
        return jsCzarReqPtr;
    }
-    //&&&auto now = CLOCK::now();
+
    jsCzarR["version"] = http::MetaModule::version;
    jsCzarR["instance_id"] = _replicationInstanceId;
    jsCzarR["auth_key"] = _replicationAuthKey;
@@ -460,28 +439,21 @@ WorkerCzarComIssue::Ptr WorkerCzarComIssue::createFromJson(nlohmann::json const&
                                                           std::string const& replicationInstanceId_,
                                                           std::string const& replicationAuthKey_) {
    string const fName("WorkerCzarComIssue::createFromJson");
-    LOGS(_log, LOG_LVL_WARN, fName << " &&& a");
    try {
        if (jsCzarReq["version"] != http::MetaModule::version) {
            LOGS(_log, LOG_LVL_ERROR, fName << " bad version");
            return nullptr;
        }
-        LOGS(_log, LOG_LVL_ERROR, fName << " &&& b");
        auto czInfo_ = CzarContactInfo::createFromJson(jsCzarReq["czar"]);
-        LOGS(_log, LOG_LVL_ERROR, fName << " &&& c");
        auto now = CLOCK::now();
        auto wInfo_ = WorkerContactInfo::createFromJsonWorker(jsCzarReq["worker"], now);
-        LOGS(_log, LOG_LVL_ERROR, fName << " && d");
        if (czInfo_ == nullptr || wInfo_ == nullptr) {
            LOGS(_log, LOG_LVL_ERROR, fName << " czar or worker info could not be parsed in " << jsCzarReq);
        }
-        //&&&auto wccIssue = create(wInfo_, czInfo_, replicationInstanceId_, replicationAuthKey_);
        auto wccIssue = create(replicationInstanceId_, replicationAuthKey_);
        wccIssue->setContactInfo(wInfo_, czInfo_);
-        LOGS(_log, LOG_LVL_ERROR, fName << " &&& e");
        wccIssue->_thoughtCzarWasDead = RequestBodyJSON::required(jsCzarReq, "thoughtczarwasdead");
-        LOGS(_log, LOG_LVL_ERROR, fName << " &&& end");
        return wccIssue;
    } catch (invalid_argument const& exc) {
        LOGS(_log, LOG_LVL_ERROR, string("WorkerQueryStatusData::createJson invalid ") << exc.what());

diff --git a/src/http/WorkerQueryStatusData.h b/src/http/WorkerQueryStatusData.h
index 63066fc21..7b1ad0a56 100644
--- a/src/http/WorkerQueryStatusData.h
+++ b/src/http/WorkerQueryStatusData.h
@@ -148,20 +148,25 @@ class WorkerContactInfo {
        return (wId == oWId && _wHost == oWHost && _wManagementHost == oWManagementHost && _wPort == oWPort);
    }

-    void regUpdateTime(TIMEPOINT updateTime) {
+    void setRegUpdateTime(TIMEPOINT updateTime) {
        std::lock_guard lg(_rMtx);
-        _regUpdate = updateTime;
+        _regUpdateTime = updateTime;
    }

    double timeSinceRegUpdateSeconds() const {
        std::lock_guard lg(_rMtx);
-        double secs = std::chrono::duration(CLOCK::now() - _regUpdate).count();
+        double secs = std::chrono::duration(CLOCK::now() - _regUpdateTime).count();
        return secs;
    }

-    TIMEPOINT getRegUpdate() const {
+    TIMEPOINT getRegUpdateTime() const {
        std::lock_guard lg(_rMtx);
-        return _regUpdate;
+        return _regUpdateTime;
    }

    /// @return true if startupTime equals _wStartupTime or _wStartupTime was never set,
@@ -192,7 +197,7 @@
    WorkerContactInfo(std::string const& wId_, std::string const& wHost_, std::string
const& wManagementHost_, int wPort_, TIMEPOINT updateTime_) : wId(wId_), _wHost(wHost_), _wManagementHost(wManagementHost_), _wPort(wPort_) { - regUpdateTime(updateTime_); + setRegUpdateTime(updateTime_); } // _rMtx must be locked before calling @@ -208,7 +213,7 @@ class WorkerContactInfo { /// Last time the registry heard from this worker. The ActiveWorker class /// will use this to determine the worker's state. /// &&& Store in seconds since epoch to make atomic? - TIMEPOINT _regUpdate; + TIMEPOINT _regUpdateTime; /// "w-startup-time", it's value is set to zero until the real value is /// received from the worker. Once it is non-zero, any change indicates diff --git a/src/http/testStatusData.cc b/src/http/testStatusData.cc index 2256de93a..54e0d49d4 100644 --- a/src/http/testStatusData.cc +++ b/src/http/testStatusData.cc @@ -55,18 +55,13 @@ BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { int czrPort = 2022; string const czrHost("cz_host"); - //&&&auto czarA = lsst::qserv::http::CzarContactInfo::create(czrName, czrId, czrPort, czrHost); auto czarA = lsst::qserv::http::CzarContactInfo::create(czrName, czrId, czrPort, czrHost, cxrStartTime); - LOGS_ERROR("&&& a czarA=" << czarA->dump()); auto czarAJs = czarA->serializeJson(); - LOGS_ERROR("&&& b czarAJs=" << czarAJs); auto czarB = lsst::qserv::http::CzarContactInfo::createFromJson(czarAJs); - LOGS_ERROR("&&& c czarB=" << czarB); BOOST_REQUIRE(czarA->compare(*czarB)); - //&&&auto czarC = lsst::qserv::http::CzarContactInfo::create("different", czrId, czrPort, czrHost); auto czarC = lsst::qserv::http::CzarContactInfo::create("different", czrId, czrPort, czrHost, cxrStartTime); BOOST_REQUIRE(!czarA->compare(*czarC)); @@ -77,34 +72,22 @@ BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { auto workerB = WorkerContactInfo::create("sd_workerB", "host_w2", "mgmhost_a", 3421, start); auto workerC = WorkerContactInfo::create("sd_workerC", "host_w3", "mgmhost_b", 3422, start); - LOGS_ERROR("&&& d workerA=" << workerA->dump()); - auto jsWorkerA = workerA->serializeJson(); - LOGS_ERROR("&&& e jsWorkerA=" << jsWorkerA); auto start1Sec = start + 1s; auto workerA1 = WorkerContactInfo::createFromJsonWorker(jsWorkerA, start1Sec); - LOGS_ERROR("&&& f workerA1=" << workerA1->dump()); BOOST_REQUIRE(workerA->isSameContactInfo(*workerA1)); // WorkerQueryStatusData auto wqsdA = lsst::qserv::http::WorkerQueryStatusData::create(workerA, czarA, replicationInstanceId, replicationAuthKey); - LOGS_ERROR("&&& g wqsdA=" << wqsdA->dump()); - //&&&double timeoutAliveSecs = 100.0; - //&&&double timeoutDeadSecs = 2*timeoutAliveSecs; double maxLifetime = 300.0; auto jsDataA = wqsdA->serializeJson(maxLifetime); - LOGS_ERROR("&&& h jsDataA=" << *jsDataA); // Check that empty lists work. 
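Empty-list handling is worth pinning down here because parseListsInto() iterates these fields only behind a size() > 0 guard, as noted above. A small self-contained demonstration of the guarded parse with nlohmann::json (the field names match the message format; the payload is made up):

#include <cstdint>
#include <iostream>
#include <map>
#include "nlohmann/json.hpp"

using json = nlohmann::json;

int main() {
    // Made-up payload in the same shape as the "qiddeaduberjobs" list.
    json jsWR = json::parse(R"({"qiddeaduberjobs":[{"qid":7,"ujids":[1,3]}]})");
    std::map<std::uint64_t, std::map<int, bool>> deadUberJobs;

    auto const& jsDead = jsWR["qiddeaduberjobs"];
    if (jsDead.size() > 0) {  // the guard parseListsInto() relies on
        for (auto const& entry : jsDead) {
            std::uint64_t qId = entry["qid"];
            for (auto const& ujId : entry["ujids"]) {
                deadUberJobs[qId][ujId.get<int>()] = true;
            }
        }
    }
    std::cout << deadUberJobs[7].size() << "\n";  // 2
}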
auto wqsdA1 = lsst::qserv::http::WorkerQueryStatusData::createFromJson(*jsDataA, replicationInstanceId, replicationAuthKey, start1Sec); - LOGS_ERROR("&&& i wqsdA1=" << wqsdA1->dump()); - LOGS_ERROR("&&& i wqsdA=" << wqsdA->dump()); auto jsDataA1 = wqsdA1->serializeJson(maxLifetime); - LOGS_ERROR("&&& i jsDataA1=" << *jsDataA1); - LOGS_ERROR("&&& i jsDataA=" << *jsDataA); BOOST_REQUIRE(*jsDataA == *jsDataA1); vector qIdsDelFiles = {7, 8, 9, 15, 25, 26, 27, 30}; @@ -114,7 +97,6 @@ BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { } jsDataA = wqsdA->serializeJson(maxLifetime); - LOGS_ERROR("&&& j jsDataA=" << jsDataA); BOOST_REQUIRE(*jsDataA != *jsDataA1); for (auto const qIdKF : qIdsKeepFiles) { @@ -123,10 +105,7 @@ BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { wqsdA->addDeadUberJobs(12, {1, 3}, start); - LOGS_ERROR("&&& i wqsdA=" << wqsdA->dump()); - jsDataA = wqsdA->serializeJson(maxLifetime); - LOGS_ERROR("&&& j jsDataA=" << *jsDataA); auto start5Sec = start + 5s; auto workerAFromJson = lsst::qserv::http::WorkerQueryStatusData::createFromJson( @@ -139,13 +118,11 @@ BOOST_AUTO_TEST_CASE(WorkerQueryStatusData) { wqsdA->addDeadUberJobs(1059, {1, 4, 6, 7, 8, 10, 3, 22, 93}, start5Sec); jsDataA = wqsdA->serializeJson(maxLifetime); - LOGS_ERROR("&&& k jsDataA=" << *jsDataA); BOOST_REQUIRE(*jsDataA != *jsWorkerAFromJson); workerAFromJson = lsst::qserv::http::WorkerQueryStatusData::createFromJson( *jsDataA, replicationInstanceId, replicationAuthKey, start5Sec); jsWorkerAFromJson = workerAFromJson->serializeJson(maxLifetime); - LOGS_ERROR("&&& l jsWorkerAFromJson=" << *jsWorkerAFromJson); BOOST_REQUIRE(*jsDataA == *jsWorkerAFromJson); // Make the response, which contains lists of the items handled by the workers. @@ -178,37 +155,28 @@ BOOST_AUTO_TEST_CASE(WorkerCzarComIssue) { string const czrHost("cz_host"); auto czarA = lsst::qserv::http::CzarContactInfo::create(czrName, czrId, czrPort, czrHost, cxrStartTime); - LOGS_ERROR("&&&i a czarA=" << czarA->dump()); auto czarAJs = czarA->serializeJson(); - LOGS_ERROR("&&&i b czarAJs=" << czarAJs); auto start = lsst::qserv::CLOCK::now(); auto workerA = WorkerContactInfo::create("sd_workerA", "host_w1", "mgmhost_a", 3421, start); - LOGS_ERROR("&&&i d workerA=" << workerA->dump()); auto jsWorkerA = workerA->serializeJson(); - LOGS_ERROR("&&&i e jsWorkerA=" << jsWorkerA); // WorkerCzarComIssue - //&&&auto wccIssueA = lsst::qserv::http::WorkerCzarComIssue::create(workerA, czarA, replicationInstanceId, - //replicationAuthKey); auto wccIssueA = lsst::qserv::http::WorkerCzarComIssue::create(replicationInstanceId, replicationAuthKey); wccIssueA->setContactInfo(workerA, czarA); BOOST_REQUIRE(wccIssueA->needToSend() == false); wccIssueA->setThoughtCzarWasDead(true); BOOST_REQUIRE(wccIssueA->needToSend() == true); - LOGS_ERROR("&&&i f wccIssue=" << wccIssueA->dump()); - auto jsIssueA = wccIssueA->serializeJson(); - LOGS_ERROR("&&&i g jsIssue=" << *jsIssueA); auto wccIssueA1 = lsst::qserv::http::WorkerCzarComIssue::createFromJson(*jsIssueA, replicationInstanceId, replicationAuthKey); - LOGS_ERROR("&&&i i wccIssueA1=" << wccIssueA1->dump()); - LOGS_ERROR("&&&i i wccIssueA=" << wccIssueA->dump()); + LOGS_ERROR("&&& wccIssueA1=" << wccIssueA1->dump()); + LOGS_ERROR("&&& wccIssueA=" << wccIssueA->dump()); auto jsIssueA1 = wccIssueA1->serializeJson(); - LOGS_ERROR("&&&i i jsIssueA1=" << *jsIssueA1); - LOGS_ERROR("&&&i i jsIssueA=" << *jsIssueA); + LOGS_ERROR("&&& jsIssueA1=" << *jsIssueA1); + LOGS_ERROR("&&& jsIssueA=" << *jsIssueA); BOOST_REQUIRE(*jsIssueA == *jsIssueA1); // 
&&& Test with items in lists. diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index e7c95b221..a5aa23c5d 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -190,8 +190,7 @@ JobQuery::Ptr Executive::add(JobDescription::Ptr const& jobDesc) { // Create the JobQuery and put it in the map. auto jobStatus = make_shared(); Ptr thisPtr = shared_from_this(); - MarkCompleteFunc::Ptr mcf = make_shared(thisPtr, jobDesc->id()); - jobQuery = JobQuery::create(thisPtr, jobDesc, jobStatus, mcf, _id); + jobQuery = JobQuery::create(thisPtr, jobDesc, jobStatus, _id); QSERV_LOGCONTEXT_QUERY_JOB(jobQuery->getQueryId(), jobQuery->getJobId()); @@ -227,15 +226,6 @@ JobQuery::Ptr Executive::add(JobDescription::Ptr const& jobDesc) { return jobQuery; } -void Executive::queueJobStart(util::PriorityCommand::Ptr const& cmd) { - _jobStartCmdList.push_back(cmd); - if (_scanInteractive) { - _qdispPool->queCmd(cmd, 0); - } else { - _qdispPool->queCmd(cmd, 1); - } -} - void Executive::queueFileCollect(util::PriorityCommand::Ptr const& cmd) { if (_scanInteractive) { _qdispPool->queCmd(cmd, 3); diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index db02a9c43..8d603fdf5 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -50,9 +50,6 @@ #include "util/threadSafe.h" #include "util/ThreadPool.h" -// TODO:UJ replace with better enable/disable feature, or just use only UberJobs -#define uberJobsEnabled 1 // &&& delete - namespace lsst::qserv { namespace ccontrol { @@ -133,11 +130,8 @@ class Executive : public std::enable_shared_from_this { // Queue `uberJob` to be run using the QDispPool. void runUberJob(std::shared_ptr const& uberJob); - /// Queue a job to be sent to a worker so it can be started. - void queueJobStart(std::shared_ptr const& cmd); // &&& delete ??? - /// Queue `cmd`, using the QDispPool, so it can be used to collect the result file. - void queueFileCollect(std::shared_ptr const& cmd); // &&& delete ??? + void queueFileCollect(std::shared_ptr const& cmd); /// Waits for all jobs on _jobStartCmdList to start. This should not be called /// before ALL jobs have been added to the pool. @@ -340,26 +334,6 @@ class Executive : public std::enable_shared_from_this { std::atomic _readyToExecute{false}; }; -/// TODO:UJ delete - MarkCompleteFunc is not needed with uberjobs. 
//&&&QM
-class MarkCompleteFunc {
-public:
-    typedef std::shared_ptr Ptr;
-
-    MarkCompleteFunc(Executive::Ptr const& e, JobId jobId) : _executive(e), _jobId(jobId) {}
-    virtual ~MarkCompleteFunc() {}
-
-    virtual void operator()(bool success) {
-        auto exec = _executive.lock();
-        if (exec != nullptr) {
-            exec->markCompleted(_jobId, success);
-        }
-    }
-
-private:
-    std::weak_ptr _executive;
-    JobId _jobId;
-};
-
 }  // namespace qdisp
 }  // namespace lsst::qserv

diff --git a/src/qdisp/JobBase.h b/src/qdisp/JobBase.h
index b6b18d325..a030d1612 100644
--- a/src/qdisp/JobBase.h
+++ b/src/qdisp/JobBase.h
@@ -58,9 +58,6 @@ class JobBase : public std::enable_shared_from_this {
    virtual QueryId getQueryId() const = 0;
    virtual UberJobId getJobId() const = 0;
    virtual std::string const& getIdStr() const = 0;
-    //&&&virtual std::shared_ptr getQdispPool() = 0;
-    //&&& virtual std::string const& getPayload() const = 0;  ///< const& in return type is essential for
-    // xrootd
    virtual std::shared_ptr getRespHandler() = 0;
    virtual std::shared_ptr getStatus() = 0;
    virtual bool getScanInteractive() const = 0;

diff --git a/src/qdisp/JobDescription.cc b/src/qdisp/JobDescription.cc
index ab4234545..353ce1c18 100644
--- a/src/qdisp/JobDescription.cc
+++ b/src/qdisp/JobDescription.cc
@@ -65,9 +65,16 @@ JobDescription::JobDescription(qmeta::CzarId czarId, QueryId qId, JobId jobId, R
      _mock(mock) {}

bool JobDescription::incrAttemptCountScrubResultsJson(std::shared_ptr const& exec, bool increase) {
+    LOGS(_log, LOG_LVL_TRACE,
+         "JobDescription::incrAttemptCountScrubResultsJson before qId=" << _queryId << " jId=" << _jobId
+                                                                        << " attempt=" << _attemptCount);
    if (increase) {
        ++_attemptCount;
    }
+    LOGS(_log, LOG_LVL_TRACE,
+         "JobDescription::incrAttemptCountScrubResultsJson after qId=" << _queryId << " jId=" << _jobId
+                                                                       << " attempt=" << _attemptCount);
    if (_attemptCount >= MAX_JOB_ATTEMPTS) {
        LOGS(_log, LOG_LVL_ERROR, "attemptCount greater than maximum number of retries " << _attemptCount);
        return false;
    }

diff --git a/src/qdisp/JobQuery.cc b/src/qdisp/JobQuery.cc
index 85c2b4efc..114d3efef 100644
--- a/src/qdisp/JobQuery.cc
+++ b/src/qdisp/JobQuery.cc
@@ -44,16 +44,13 @@
using namespace std;

namespace lsst::qserv::qdisp {

JobQuery::JobQuery(Executive::Ptr const& executive, JobDescription::Ptr const& jobDescription,
-                   qmeta::JobStatus::Ptr const& jobStatus,
-                   shared_ptr const& markCompleteFunc, QueryId qid)
+                   qmeta::JobStatus::Ptr const& jobStatus, QueryId qid)
        : JobBase(),
          _executive(executive),
          _jobDescription(jobDescription),
-          _markCompleteFunc(markCompleteFunc),
          _jobStatus(jobStatus),
          _qid(qid),
          _idStr(QueryIdHelper::makeIdStr(qid, getJobId())) {
-    //&&&_qdispPool = executive->getQdispPool();
    LOGS(_log, LOG_LVL_TRACE, "JobQuery desc=" << _jobDescription);
}
@@ -145,9 +142,9 @@ int JobQuery::getAttemptCount() const {
    return _jobDescription->getAttemptCount();
}

-//&&&string const& JobQuery::getPayload() const { return _jobDescription->payload(); }
-
-void JobQuery::callMarkCompleteFunc(bool success) { _markCompleteFunc->operator()(success); }
+void JobQuery::callMarkCompleteFunc(bool success) {
+    throw util::Bug(ERR_LOC, "JobQuery::callMarkCompleteFunc should never be called with UberJobs");
+}

ostream& JobQuery::dumpOS(ostream& os) const {
    return os << "{" << getIdStr() << _jobDescription << " " << _jobStatus << "}";

diff --git a/src/qdisp/JobQuery.h b/src/qdisp/JobQuery.h
index c6fcc0829..9a8e13962 100644
--- a/src/qdisp/JobQuery.h
+++ b/src/qdisp/JobQuery.h
@@ -40,7 +40,6 @@

namespace lsst::qserv::qdisp {

-//&&&class QdispPool;
class QueryRequest; /// This class is used to describe, monitor, and control a single query to a worker. @@ -53,9 +52,8 @@ class JobQuery : public JobBase { /// Factory function to make certain a shared_ptr is used and _setup is called. static JobQuery::Ptr create(Executive::Ptr const& executive, JobDescription::Ptr const& jobDescription, - qmeta::JobStatus::Ptr const& jobStatus, - std::shared_ptr const& markCompleteFunc, QueryId qid) { - Ptr jq = Ptr(new JobQuery(executive, jobDescription, jobStatus, markCompleteFunc, qid)); + qmeta::JobStatus::Ptr const& jobStatus, QueryId qid) { + Ptr jq = Ptr(new JobQuery(executive, jobDescription, jobStatus, qid)); jq->_setup(); return jq; } @@ -78,15 +76,12 @@ class JobQuery : public JobBase { std::shared_ptr getExecutive() override { return _executive.lock(); } - //&&&std::shared_ptr getQdispPool() override { return _qdispPool; } - std::ostream& dumpOS(std::ostream& os) const override; /// Make a copy of the job description. JobQuery::_setup() must be called after creation. /// Do not call this directly, use create. JobQuery(Executive::Ptr const& executive, JobDescription::Ptr const& jobDescription, - qmeta::JobStatus::Ptr const& jobStatus, - std::shared_ptr const& markCompleteFunc, QueryId qid); + qmeta::JobStatus::Ptr const& jobStatus, QueryId qid); /// If the UberJob is unassigned, change the _uberJobId to ujId. bool setUberJobId(UberJobId ujId) { @@ -131,7 +126,6 @@ class JobQuery : public JobBase { std::weak_ptr _executive; /// The job description needs to survive until the task is complete. JobDescription::Ptr _jobDescription; - std::shared_ptr _markCompleteFunc; // JobStatus has its own mutex. qmeta::JobStatus::Ptr _jobStatus; ///< Points at status in Executive::_statusMap @@ -148,8 +142,6 @@ class JobQuery : public JobBase { // Cancellation std::atomic _cancelled{false}; ///< Lock to make sure cancel() is only called once. - //&&& std::shared_ptr _qdispPool; - /// The UberJobId that this job is assigned to. Values less than zero /// indicate this job is unassigned. To prevent race conditions, /// an UberJob may only unassign a job if it has the same ID as diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index 0389632cc..c581229d0 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -120,14 +120,6 @@ atomic taskSequence{0}; ///< Unique identifier source for Task. /// available to define the action to take when this task is run, so /// Command::setFunc() is used set the action later. This is why /// the util::CommandThreadPool is not called here. 
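For reference, the retry cap that incrAttemptCountScrubResultsJson() (changed above in JobDescription.cc) enforces reduces to the sketch below; the cap value is illustrative, not the real MAX_JOB_ATTEMPTS:

// Reduced shape of the attempt-count guard; 5 is illustrative, not the value
// of the real MAX_JOB_ATTEMPTS constant.
constexpr int kMaxJobAttempts = 5;

// Returns false once the job has been tried too often, at which point the
// caller cancels the query instead of rescheduling the job.
bool incrAttempt(int& attemptCount, bool increase) {
    if (increase) ++attemptCount;
    return attemptCount < kMaxJobAttempts;
}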
-/* &&& -Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, - shared_ptr const& userQueryInfo, size_t templateId, bool hasSubchunks, - int subchunkId, string const& db, proto::ScanInfo const& scanInfo, bool scanInteractive, - int maxTableSize, vector const& fragSubTables, vector const& fragSubchunkIds, - shared_ptr const& sc, std::shared_ptr const& -queryStats_, uint16_t resultsHttpPort) : _userQueryInfo(userQueryInfo), -*/ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, size_t templateId, bool hasSubchunks, int subchunkId, string const& db, proto::ScanInfo const& scanInfo, bool scanInteractive, int maxTableSize, @@ -199,15 +191,7 @@ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chun _dbTblsAndSubchunks = make_unique(dbTbls_, subchunksVect_); } -Task::~Task() { - /* &&& - _userQueryInfo.reset(); - UserQueryInfo::uqMapErase(_qId); - if (UserQueryInfo::uqMapGet(_qId) == nullptr) { - LOGS(_log, LOG_LVL_TRACE, "~Task Cleared uqMap entry for _qId=" << _qId); - } - */ -} +Task::~Task() {} std::vector Task::createTasksForChunk( std::shared_ptr const& ujData, nlohmann::json const& jsJobs, @@ -220,7 +204,6 @@ std::vector Task::createTasksForChunk( UberJobId ujId = ujData->getUberJobId(); CzarIdType czId = ujData->getCzarId(); - //&&&UserQueryInfo::Ptr userQueryInfo = UserQueryInfo::uqMapInsert(qId); wpublish::QueryStatistics::Ptr queryStats = queriesAndChunks->addQueryId(qId, czId); UserQueryInfo::Ptr userQueryInfo = queryStats->getUserQueryInfo(); @@ -287,13 +270,6 @@ std::vector Task::createTasksForChunk( if (fragSubchunkIds.empty()) { bool const noSubchunks = false; int const subchunkId = -1; - /* &&& - auto task = Task::Ptr(new Task( - ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, userQueryInfo, - templateId, noSubchunks, subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, - maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, queryStats, - resultsHttpPort)); - */ auto task = Task::Ptr(new Task( ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, templateId, noSubchunks, subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, @@ -308,14 +284,6 @@ std::vector Task::createTasksForChunk( jdQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, queryStats, resultsHttpPort)); - /* &&& - auto task = Task::Ptr(new Task(ujData, jdJobId, jdAttemptCount, jdChunkId, - fragmentNumber, userQueryInfo, templateId, - hasSubchunks, subchunkId, jdQuerySpecDb, scanInfo, - scanInteractive, maxTableSizeMb, fragSubTables, - fragSubchunkIds, sendChannel, queryStats, - resultsHttpPort)); - */ vect.push_back(task); } } @@ -377,10 +345,9 @@ string Task::getQueryString() const { auto uQInfo = qStats->getUserQueryInfo(); string qs = uQInfo->getTemplate(_templateId); - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& a qs=" << qs); boost::algorithm::replace_all(qs, CHUNK_TAG, to_string(_chunkId)); boost::algorithm::replace_all(qs, SUBCHUNK_TAG, to_string(_subchunkId)); - LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& b qs=" << qs); + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " qs=" << qs); return qs; } diff --git a/src/wbase/Task.h b/src/wbase/Task.h index b74049a98..4a1d06671 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -309,7 +309,6 @@ class Task : public util::CommandForThreadPool { } private: - //&&&std::weak_ptr _userQueryInfo; ///< Details common to Tasks in this 
diff --git a/src/wbase/Task.h b/src/wbase/Task.h
index b74049a98..4a1d06671 100644
--- a/src/wbase/Task.h
+++ b/src/wbase/Task.h
@@ -309,7 +309,6 @@ class Task : public util::CommandForThreadPool {
     }

 private:
-    //&&&std::weak_ptr _userQueryInfo;  ///< Details common to Tasks in this UserQuery.
     std::shared_ptr _sendChannel;  ///< Send channel.

     uint64_t const _tSeq = 0;  ///< identifier for the specific task
diff --git a/src/wbase/UberJobData.cc b/src/wbase/UberJobData.cc
index 3098908eb..d4cb0c734 100644
--- a/src/wbase/UberJobData.cc
+++ b/src/wbase/UberJobData.cc
@@ -108,7 +108,6 @@ void UberJobData::responseFileReady(string const& httpFileUrl, uint64_t rowCount
     string const requestContext = "Worker: '" + http::method2string(method) + "' request to '" + url + "'";
     string const requestStr = request.dump();
     _queueUJResponse(method, headers, url, requestContext, requestStr);
-    LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& end");
 }

 bool UberJobData::responseError(util::MultiError& multiErr, std::shared_ptr const& task,
@@ -156,21 +155,15 @@ void UberJobData::_queueUJResponse(http::Method method_, std::vector
         wPool = _foreman->getWPool();
     }

-    LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& creating UJTransmitCmd wPool=" << wPool);
     auto cmdTransmit = UJTransmitCmd::create(_foreman, shared_from_this(), method_, headers_, url_,
                                              requestContext_, requestStr_);
-    LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& created UJTransmitCmd wPool=" << wPool);
     if (wPool == nullptr) {
         // No thread pool. Run the command now. This should only happen in unit tests.
-        LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& creating UJTransmitCmd direct run action");
         cmdTransmit->action(nullptr);
     } else {
-        LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& creating UJTransmitCmd queue transmit");
         if (_scanInteractive) {
-            LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& creating UJTransmitCmd queue transmit_0");
             wPool->queCmd(cmdTransmit, 0);
         } else {
-            LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& creating UJTransmitCmd queue transmit_1");
             wPool->queCmd(cmdTransmit, 1);
         }
     }
@@ -204,7 +197,6 @@ void UJTransmitCmd::action(util::CmdData* data) {
     ResetSelf resetSelf(this);

     _attemptCount++;
-    LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& start attempt=" << _attemptCount);
     auto ujPtr = _ujData.lock();
     if (ujPtr == nullptr || ujPtr->getCancelled()) {
         LOGS(_log, LOG_LVL_WARN, cName(__func__) << " UberJob was cancelled " << _attemptCount);
@@ -220,11 +212,8 @@ void UJTransmitCmd::action(util::CmdData* data) {
             LOGS(_log, LOG_LVL_WARN, cName(__func__) << " Transmit success == 0");
             // There's no point in re-sending as the czar got the message and didn't like
             // it.
-            // &&& maybe add this czId+ujId to a list of failed uberjobs that can be put
-            // &&& status return??? Probably overkill.
         }
     } catch (exception const& ex) {
-        LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& start d except");
         LOGS(_log, LOG_LVL_WARN, cName(__func__) + " " + _requestContext + " failed, ex: " + ex.what());
     }

@@ -262,7 +251,6 @@ void UJTransmitCmd::action(util::CmdData* data) {
             LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " _selfPtr was null, assuming job killed.");
         }
     }
-    LOGS(_log, LOG_LVL_WARN, cName(__func__) << " &&& start end");
 }

 void UJTransmitCmd::kill() {
@@ -273,7 +261,6 @@ void UJTransmitCmd::kill() {
     if (sPtr == nullptr) {
         return;
     }
-    // &&& TODO:UJ Is there anything that should be done here???
 }

 UJTransmitCmd::Ptr UJTransmitCmd::duplicate() {
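_queueUJResponse() above routes each transmit command into the worker's pool at priority 0 for interactive queries and priority 1 for shared scans, so interactive results are never stuck behind bulk transmits. A rough sketch of that two-level dispatch, assuming only that lower numbers are serviced first; the real util::QdispPool/queCmd machinery is more involved than this.

    // Sketch of priority dispatch: smaller priority number runs first.
    #include <functional>
    #include <iostream>
    #include <queue>
    #include <vector>

    struct Cmd {
        int priority;                  // 0 = interactive, 1 = shared scan
        std::function<void()> action;  // work to run
    };

    struct CmdCompare {
        bool operator()(Cmd const& a, Cmd const& b) const {
            return a.priority > b.priority;  // min-heap on priority
        }
    };

    int main() {
        std::priority_queue<Cmd, std::vector<Cmd>, CmdCompare> pool;
        pool.push({1, [] { std::cout << "scan transmit\n"; }});
        pool.push({0, [] { std::cout << "interactive transmit\n"; }});
        while (!pool.empty()) {  // prints the interactive transmit first
            pool.top().action();
            pool.pop();
        }
    }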
diff --git a/src/wbase/UberJobData.h b/src/wbase/UberJobData.h
index 0ccdf7c7e..76d335411 100644
--- a/src/wbase/UberJobData.h
+++ b/src/wbase/UberJobData.h
@@ -196,7 +196,7 @@ class UJTransmitCmd : public util::PriorityCommand {
     std::string const _requestContext;
     std::string const _requestStr;
     int _attemptCount = 0;  ///< How many attempts have been made to transmit this.
-    util::InstanceCount _ic{cName("&&&")};
+    util::InstanceCount _ic{cName("UJTransmitCmd&&&")};
 };

 }  // namespace lsst::qserv::wbase
diff --git a/src/wconfig/WorkerConfig.h b/src/wconfig/WorkerConfig.h
index 9b6d682b5..584aa3209 100644
--- a/src/wconfig/WorkerConfig.h
+++ b/src/wconfig/WorkerConfig.h
@@ -210,11 +210,6 @@ class WorkerConfig {
         return _ReservedInteractiveSqlConnections->getVal();
     }

-    /* &&&
-    /// @return the maximum number of gigabytes that can be used by StreamBuffers
-    unsigned int getBufferMaxTotalGB() const { return _bufferMaxTotalGB->getVal(); } //&&& delete
-    */
-
     /// @return the maximum number of concurrent transmits to a czar
     unsigned int getMaxTransmits() const { return _maxTransmits->getVal(); }

@@ -364,10 +359,6 @@ class WorkerConfig {
             util::ConfigValTUInt::create(_configValMap, "sqlconnections", "maxsqlconn", notReq, 800);
     CVTUIntPtr _ReservedInteractiveSqlConnections = util::ConfigValTUInt::create(
             _configValMap, "sqlconnections", "reservedinteractivesqlconn", notReq, 50);
-    /* &&&
-    CVTUIntPtr _bufferMaxTotalGB =
-            util::ConfigValTUInt::create(_configValMap, "transmit", "buffermaxtotalgb", notReq, 41);
-    */
     CVTUIntPtr _maxTransmits =
             util::ConfigValTUInt::create(_configValMap, "transmit", "maxtransmits", notReq, 40);
     CVTIntPtr _maxPerQid = util::ConfigValTInt::create(_configValMap, "transmit", "maxperqid", notReq, 3);
diff --git a/src/wcontrol/Foreman.cc b/src/wcontrol/Foreman.cc
index 179022167..db4d7626f 100644
--- a/src/wcontrol/Foreman.cc
+++ b/src/wcontrol/Foreman.cc
@@ -132,14 +132,6 @@ Foreman::Foreman(Scheduler::Ptr const& scheduler, unsigned int poolSize, unsigne

     _mark = make_shared(ERR_LOC, "Forman Test Msg");

-    /* &&&
-    int qPoolSize = _czarConfig->getQdispPoolSize();
-    int maxPriority = std::max(0, _czarConfig->getQdispMaxPriority());
-    string vectRunSizesStr = _czarConfig->getQdispVectRunSizes();
-    vector vectRunSizes = util::String::parseToVectInt(vectRunSizesStr, ":", 1);
-    string vectMinRunningSizesStr = _czarConfig->getQdispVectMinRunningSizes();
-    vector vectMinRunningSizes = util::String::parseToVectInt(vectMinRunningSizesStr, ":", 0);
-    */
     int qPoolSize = 50;                      // &&& TODO:UJ put in config
     int maxPriority = 2;                     // &&& TODO:UJ put in config
     string vectRunSizesStr = "10:10:10:10";  // &&& TODO:UJ put in config
diff --git a/src/wdb/ChunkResource.cc b/src/wdb/ChunkResource.cc
index 54e498fe7..3521311b1 100644
--- a/src/wdb/ChunkResource.cc
+++ b/src/wdb/ChunkResource.cc
@@ -48,7 +48,6 @@
 #include "util/Bug.h"
 #include "util/IterableFormatter.h"
 #include "wbase/Base.h"
-//&&&#include "wdb/QuerySql.h"

 namespace {
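The Foreman.cc hunk above hard-codes qPoolSize, maxPriority, and the run-size vector that previously came from CzarConfig, with TODO:UJ notes to move them into the worker config. For reference, turning a colon-separated run-size string such as "10:10:10:10" into integers is straightforward; the standalone sketch below only illustrates the idea and is not the real util::String::parseToVectInt implementation.

    // Sketch of a colon-separated run-size parser with a per-token default.
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    std::vector<int> parseToVectInt(std::string const& str, char delim, int defaultVal) {
        std::vector<int> result;
        std::istringstream in(str);
        std::string token;
        while (std::getline(in, token, delim)) {
            try {
                result.push_back(std::stoi(token));
            } catch (std::exception const&) {
                result.push_back(defaultVal);  // fall back on unparsable tokens
            }
        }
        return result;
    }

    int main() {
        // "10:10:10:10" -> one run size per priority level, as in Foreman.cc.
        for (int v : parseToVectInt("10:10:10:10", ':', 1)) std::cout << v << ' ';
        std::cout << '\n';
    }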
diff --git a/src/wdb/QueryRunner.cc b/src/wdb/QueryRunner.cc
index 35501c76f..7e3ab7b76 100644
--- a/src/wdb/QueryRunner.cc
+++ b/src/wdb/QueryRunner.cc
@@ -268,11 +268,8 @@ bool QueryRunner::_dispatchChannel() {
             if (taskSched != nullptr) {
                 taskSched->histTimeOfRunningTasks->addEntry(primeT.getElapsed());
                 LOGS(_log, LOG_LVL_DEBUG, "QR " << taskSched->histTimeOfRunningTasks->getString("run"));
-                LOGS(_log, LOG_LVL_WARN,
-                     "&&&DASH QR " << taskSched->histTimeOfRunningTasks->getString("run"));
             } else {
                 LOGS(_log, LOG_LVL_ERROR, "QR runtaskSched == nullptr");
-                LOGS(_log, LOG_LVL_ERROR, "&&&DASH QR runtaskSched == nullptr");
             }
             double runTimeSeconds = primeT.getElapsed();
             double subchunkRunTimeSeconds = subChunkT.getElapsed();
@@ -348,16 +345,6 @@ void QueryRunner::cancel() {
             break;
         }
     }
-
-    /* &&&
-    auto streamB = _streamBuf.lock();
-    if (streamB != nullptr) {
-        streamB->cancel();
-    }
-
-    // The send channel will die naturally on its own when xrootd stops talking to it
-    // or other tasks call _transmitCancelledError().
-    */
 }

 QueryRunner::~QueryRunner() {}
diff --git a/src/wdb/QueryRunner.h b/src/wdb/QueryRunner.h
index 785496772..a881075f0 100644
--- a/src/wdb/QueryRunner.h
+++ b/src/wdb/QueryRunner.h
@@ -75,8 +75,6 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro
     /// by Task::cancel(), so if this needs to be cancelled elsewhere,
     /// call Task::cancel().
     /// This should kill an in-progress SQL command.
-    //&&&/// It also tries to unblock `_streamBuf` to keep the thread
-    //&&&/// from being blocked forever.
     void cancel() override;

 protected:
@@ -93,8 +91,6 @@ class QueryRunner : public wbase::TaskQueryRunner, public std::enable_shared_fro
     bool _dispatchChannel();
     MYSQL_RES* _primeResult(std::string const& query);  ///< Obtain a result handle for a query.

-    //&&&static size_t _getDesiredLimit();
-
     wbase::Task::Ptr const _task;  ///< Actual task

     qmeta::CzarId _czarId = 0;  ///< To be replaced with the czarId of the requesting czar.
diff --git a/src/wpublish/QueryStatistics.cc b/src/wpublish/QueryStatistics.cc
index bc06eea58..2ca96d7f3 100644
--- a/src/wpublish/QueryStatistics.cc
+++ b/src/wpublish/QueryStatistics.cc
@@ -189,13 +189,6 @@ QueryStatistics::SchedTasksInfoMap QueryStatistics::getSchedulerTasksInfoMap() {
     return _taskSchedInfoMap;
 }

-/* &&&
-void QueryStatistics::touch(TIMEPOINT const now) {
-    lock_guard lock(_qStatsMtx);
-    _touched = now;
-}
-*/
-
 void QueryStatistics::addTask(TIMEPOINT const now) {
     lock_guard lock(_qStatsMtx);
     _touched = now;
diff --git a/src/wpublish/QueryStatistics.h b/src/wpublish/QueryStatistics.h
index c15e8e9f6..9d208e037 100644
--- a/src/wpublish/QueryStatistics.h
+++ b/src/wpublish/QueryStatistics.h
@@ -44,7 +44,6 @@
 #include "util/InstanceCount.h"  //&&&

 namespace lsst::qserv::wbase {
-//&&&class Histogram;
 class UserQueryInfo;
 }  // namespace lsst::qserv::wbase

@@ -97,7 +96,6 @@ class QueryStatistics {
     void addTaskTransmit(double timeSeconds, int64_t bytesTransmitted, int64_t rowsTransmitted,
                          double bufferFillSecs);

-    //&&&void touch(TIMEPOINT const now);
     void addTask(TIMEPOINT const now);
     void addTaskRunning(TIMEPOINT const now);
     bool addTaskCompleted(TIMEPOINT const now, double const taskDuration);
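QueryStatistics keeps its counters consistent by taking _qStatsMtx in every mutator and refreshing the touched timestamp as a side effect, which is why the standalone touch() above could be deleted once addTask() subsumed it. A compact sketch of that pattern with invented member names:

    // Sketch of the mutex-guarded stats update in QueryStatistics::addTask().
    #include <chrono>
    #include <mutex>

    using TIMEPOINT = std::chrono::system_clock::time_point;

    class QueryStats {
    public:
        void addTask(TIMEPOINT now) {
            std::lock_guard<std::mutex> lock(_mtx);  // guard all shared state
            _touched = now;                          // "touch" happens as a side effect
            ++_taskCount;
        }

    private:
        std::mutex _mtx;
        TIMEPOINT _touched;
        int _taskCount = 0;
    };

    int main() {
        QueryStats stats;
        stats.addTask(std::chrono::system_clock::now());
    }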
diff --git a/src/xrdsvc/HttpReplicaMgtModule.cc b/src/xrdsvc/HttpReplicaMgtModule.cc
index e7d61d95b..91692aa92 100644
--- a/src/xrdsvc/HttpReplicaMgtModule.cc
+++ b/src/xrdsvc/HttpReplicaMgtModule.cc
@@ -79,12 +79,6 @@ HttpReplicaMgtModule::HttpReplicaMgtModule(string const& context, shared_ptr
                                            shared_ptr const& req, shared_ptr const& resp)
         : HttpModule(context, foreman, req, resp) {}

-/* &&&
-        : HttpModule(context, foreman, req, resp),
-          _providerServer(dynamic_cast(XrdSsiProviderLookup)),
-          _clusterManager(_providerServer->GetClusterManager()),
-          _dataContext(_clusterManager->DataContext()) {}
- */
-
 json HttpReplicaMgtModule::executeImpl(string const& subModuleName) {
     string const func = string(__func__) + "[sub-module='" + subModuleName + "']";

@@ -337,12 +331,8 @@ void HttpReplicaMgtModule::_modifyChunk(string const& func, int chunk, string co
         // copy of the inventory. After that modify both (persistent and
         // transient) inventories.
         if (Direction::ADD == direction) {
-            //&&&_clusterManager->Added(resource.data());
-            //&&&if (_dataContext) _providerServer->GetChunkInventory().add(database, chunk);
             foreman()->chunkInventory()->add(database, chunk, foreman()->mySqlConfig());
         } else {
-            //&&&_clusterManager->Removed(resource.data());
-            //&&&if (_dataContext) _providerServer->GetChunkInventory().remove(database, chunk);
             foreman()->chunkInventory()->remove(database, chunk, foreman()->mySqlConfig());
         }
     } catch (wpublish::InvalidParamError const& ex) {
diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc
index b4fb81da5..224862ab3 100644
--- a/src/xrdsvc/HttpWorkerCzarModule.cc
+++ b/src/xrdsvc/HttpWorkerCzarModule.cc
@@ -131,7 +131,6 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) {
                  __func__ << " uj qid=" << ujQueryId << " ujid=" << ujId << " czid=" << ujCzarId);

             // Get or create QueryStatistics and UserQueryInfo instances.
-            //&&&auto queryStats = foreman()->addQueryId(ujQueryId);
             auto queryStats = foreman()->getQueriesAndChunks()->addQueryId(ujQueryId, ujCzarId);
             auto userQueryInfo = queryStats->getUserQueryInfo();

@@ -344,15 +343,6 @@ json HttpWorkerCzarModule::_handleQueryStatus(std::string const& func) {
     // Return a message containing lists of the queries that were cancelled.
     jsRet = wqsData->serializeResponseJson(foreman()->getWorkerStartupTime());

-    // &&& queue sending WorkerCzarComIssue if needed.
-    /* &&&
-    auto const wczComIssue = wCzarInfo->getWorkerCzarComIssue();
-    if (wczComIssue != nullptr && wczComIssue->needToSend()) {
-        LOGS(_log, LOG_LVL_ERROR, "&&& NEED CODE to queue wczComIssue message, do not queue more than one at a
-        time.");
-        // Limit the sending to happening after czar sends status
-    }
-    */
     wCzarInfo->sendWorkerCzarComIssueIfNeeded(wqsData->getWInfo(), wqsData->getCzInfo());

     return jsRet;
diff --git a/src/xrdsvc/SsiService.cc b/src/xrdsvc/SsiService.cc
index 7aa2ef85b..4ae7d6e76 100644
--- a/src/xrdsvc/SsiService.cc
+++ b/src/xrdsvc/SsiService.cc
@@ -278,17 +278,7 @@ SsiService::~SsiService() {
 }

 void SsiService::ProcessRequest(XrdSsiRequest& reqRef, XrdSsiResource& resRef) {
-#if 0   //&&&
-    LOGS(_log, LOG_LVL_DEBUG, "Got request call where rName is: " << resRef.rName);
-    auto request = SsiRequest::newSsiRequest(resRef.rName, _foreman);
-
-    // Continue execution in the session object as SSI gave us a new thread.
-    // Object deletes itself when finished is called.
-    //
-    request->execute(reqRef);
-#else   //&&&
     LOGS(_log, LOG_LVL_ERROR, "SsiService::ProcessRequest got called");
-#endif  //&&&
 }

 }  // namespace lsst::qserv::xrdsvc
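For context on the status exchange this patch adds dead-message handling to: _handleQueryStatus() serializes a reply listing the queries the worker cancelled plus the worker's startup time (so the czar can detect a restarted worker), then queues a WorkerCzarComIssue message if one is pending. A rough sketch of what such a reply could look like with nlohmann::json; the field names below are invented for illustration, and WorkerQueryStatusData defines the real format.

    // Illustrative sketch of a worker status reply (not the real schema).
    #include <cstdint>
    #include <iostream>
    #include <vector>
    #include "nlohmann/json.hpp"

    nlohmann::json serializeResponse(std::vector<int> const& cancelledQueryIds,
                                     uint64_t workerStartupTime) {
        nlohmann::json js;
        js["w-startup-time"] = workerStartupTime;  // czar compares this to spot restarts
        js["cancelled-qids"] = cancelledQueryIds;  // queries the worker dropped
        return js;
    }

    int main() {
        std::cout << serializeResponse({1001, 1002}, 1728000000).dump(2) << std::endl;
    }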