diff --git a/core/modules/loader/Central.h b/core/modules/loader/Central.h index ce610c286b..519346953d 100644 --- a/core/modules/loader/Central.h +++ b/core/modules/loader/Central.h @@ -69,8 +69,6 @@ class Central { virtual ~Central(); - void run(); - std::string getMasterHostName() const { return _masterAddr.ip; } int getMasterPort() const { return _masterAddr.port; } NetworkAddress getMasterAddr() const { return _masterAddr; } @@ -80,7 +78,8 @@ class Central { int getErrCount() const { return _server->getErrCount(); } /// Send the contents of 'sendBuf' to 'host:port'. This waits for the message to be - /// sent before returning. Throws boost::system::system_error on failure. + /// sent before returning. + /// @throw boost::system::system_error on failure. void sendBufferTo(std::string const& host, int port, BufferUdp& sendBuf) { _server->sendBufferTo(host, port, sendBuf); } @@ -103,6 +102,13 @@ class Central { return doList->addItem(item); } + /// Run the server. + void runServer() { + for (; _runningIOThreads < _iOThreads; ++_runningIOThreads) { + run(); + } + } + /// Provides a method for identifying different Central classes and /// CentralWorkers in the log file. virtual std::string getOurLogId() const { return "Central baseclass"; } @@ -110,12 +116,16 @@ class Central { protected: Central(boost::asio::io_service& ioService_, std::string const& masterHostName, int masterPort, - int threadPoolSize, int loopSleepTime) + int threadPoolSize, int loopSleepTime, + int iOThreads) : ioService(ioService_), _masterAddr(masterHostName, masterPort), - _threadPoolSize(threadPoolSize), _loopSleepTime(loopSleepTime) { + _threadPoolSize(threadPoolSize), _loopSleepTime(loopSleepTime), + _iOThreads(iOThreads) { _initialize(); } + void run(); ///< Run a single asio thread. + boost::asio::io_service& ioService; DoList::Ptr doList; ///< List of items to be checked at regular intervals. @@ -143,6 +153,9 @@ class Central { std::vector _ioServiceThreads; ///< List of asio io threads created by this std::thread _checkDoListThread; ///< Thread for running doList checks on DoListItems. + + int _iOThreads{5}; ///< Number of asio IO threads to run, set by config file. + int _runningIOThreads{0}; ///< Number of asio IO threads started. }; }}} // namespace lsst::qserv::loader diff --git a/core/modules/loader/CentralClient.cc b/core/modules/loader/CentralClient.cc index 730b334874..e668cf458e 100644 --- a/core/modules/loader/CentralClient.cc +++ b/core/modules/loader/CentralClient.cc @@ -50,13 +50,13 @@ namespace loader { CentralClient::CentralClient(boost::asio::io_service& ioService_, std::string const& hostName, ClientConfig const& cfg) - : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime()), + : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), _hostName(hostName), _udpPort(cfg.getClientPortUdp()), _defWorkerHost(cfg.getDefWorkerHost()), _defWorkerPortUdp(cfg.getDefWorkerPortUdp()), _doListMaxLookups(cfg.getMaxLookups()), _doListMaxInserts(cfg.getMaxInserts()), - _maxRequestSleepTime(cfg.getMaxRequestSleepTime()){ + _maxRequestSleepTime(cfg.getMaxRequestSleepTime()) { } @@ -65,26 +65,26 @@ void CentralClient::start() { } -void CentralClient::handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { - LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInfo"); +void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { + LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup"); - StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data)); + auto const sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data)); if (sData == nullptr) { - LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to parse list"); + LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list"); return; } auto protoData = sData->protoParse(); if (protoData == nullptr) { - LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to parse list"); + LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list"); return; } // TODO put in separate thread - _handleKeyInfo(inMsg, protoData); + _handleKeyLookup(inMsg, protoData); } -void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr& protoBuf) { +void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr& protoBuf) { std::unique_ptr protoData(std::move(protoBuf)); CompositeKey key(protoData->keyint(), protoData->keystr()); @@ -92,18 +92,18 @@ void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr lck(_waitingKeyInfoMtx); - auto iter = _waitingKeyInfoMap.find(key); - if (iter == _waitingKeyInfoMap.end()) { - LOGS(_log, LOG_LVL_WARN, "handleKeyInfoComplete could not find key=" << key); + std::lock_guard lck(_waitingKeyLookupMtx); + auto iter = _waitingKeyLookupMap.find(key); + if (iter == _waitingKeyLookupMap.end()) { + LOGS(_log, LOG_LVL_WARN, "_handleKeyLookup could not find key=" << key); return; } - keyInfoOneShot = iter->second; - _waitingKeyInfoMap.erase(iter); + keyLookupOneShot = iter->second; + _waitingKeyLookupMap.erase(iter); } - keyInfoOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success()); + keyLookupOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success()); LOGS(_log, LOG_LVL_INFO, "Successfully found key=" << key << " " << chunkInfo); } @@ -139,7 +139,6 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique size_t mapSize; { std::lock_guard lck(_waitingKeyInsertMtx); - mapSize = _waitingKeyInsertMap.size(); auto iter = _waitingKeyInsertMap.find(key); if (iter == _waitingKeyInsertMap.end()) { LOGS(_log, LOG_LVL_WARN, "handleKeyInsertComplete could not find key=" << key); @@ -147,6 +146,7 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique } keyInsertOneShot = iter->second; _waitingKeyInsertMap.erase(iter); + mapSize = _waitingKeyInsertMap.size(); } keyInsertOneShot->keyInsertComplete(); LOGS(_log, LOG_LVL_INFO, "Successfully inserted key=" << key << " " << chunkInfo << @@ -178,6 +178,7 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk, LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key << "size=" << sz << " loopCount=" << loopCount); } + // Let the CPU do something else while waiting for some requests to finish. usleep(_maxRequestSleepTime); ++loopCount; lck.lock(); @@ -230,16 +231,14 @@ void CentralClient::_keyInsertReq(CompositeKey const& key, int chunk, int subchu strElem.appendToData(msgData); try { sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInsertReq boost system_error=" << e.what() << " key=" << key << " chunk=" << chunk << " sub=" << subchunk); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } -KeyInfoData::Ptr CentralClient::keyInfoReq(CompositeKey const& key) { +KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) { // Returns a pointer to a Tracker object that can be used to track job // completion and job status. keyInsertOneShot will call _keyInsertReq until // it knows the task was completed. _handleKeyInfoComplete marks @@ -247,43 +246,49 @@ KeyInfoData::Ptr CentralClient::keyInfoReq(CompositeKey const& key) { // Insert a oneShot DoListItem to keep trying to add the key until // we get word that it has been added successfully. LOGS(_log, LOG_LVL_INFO, "Trying to lookup key=" << key); - auto keyInfoOneShot = std::make_shared(this, key); + auto keyLookupOneShot = std::make_shared(this, key); { - std::unique_lock lck(_waitingKeyInfoMtx); + std::unique_lock lck(_waitingKeyLookupMtx); // Limit the number of concurrent lookups. // If the key is already in the map, there is no point in blocking. int loopCount = 0; - auto iter = _waitingKeyInfoMap.find(key); - while (_waitingKeyInfoMap.size() > _doListMaxLookups - && iter == _waitingKeyInfoMap.end()) { - size_t sz = _waitingKeyInfoMap.size(); + uint64_t sleptForMicroSec = 0; + uint64_t const tenSec = 10000000; + auto iter = _waitingKeyLookupMap.find(key); + while (_waitingKeyLookupMap.size() > _doListMaxLookups + && iter == _waitingKeyLookupMap.end()) { + size_t sz = _waitingKeyLookupMap.size(); lck.unlock(); - if (loopCount % 100 == 0) { + // Log a message about this about once every 10 seconds. + if (sleptForMicroSec > tenSec) sleptForMicroSec = 0; + if (sleptForMicroSec == 0) { LOGS(_log, LOG_LVL_INFO, "keyInfoReq waiting key=" << key << "size=" << sz << " loopCount=" << loopCount); } + // Let the CPU do something else while waiting for some requests to finish. usleep(_maxRequestSleepTime); + sleptForMicroSec += _maxRequestSleepTime; ++loopCount; lck.lock(); - iter = _waitingKeyInfoMap.find(key); + iter = _waitingKeyLookupMap.find(key); } // Use the existing lookup, if there is one. - if (iter != _waitingKeyInfoMap.end()) { + if (iter != _waitingKeyLookupMap.end()) { auto cData = iter->second->cmdData; return cData; } - _waitingKeyInfoMap[key] = keyInfoOneShot; + _waitingKeyLookupMap[key] = keyLookupOneShot; } - runAndAddDoListItem(keyInfoOneShot); - return keyInfoOneShot->cmdData; + runAndAddDoListItem(keyLookupOneShot); + return keyLookupOneShot->cmdData; } -void CentralClient::_keyInfoReq(CompositeKey const& key) { - LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyInfoReq trying key=" << key); - LoaderMsg msg(LoaderMsg::KEY_INFO_REQ, getNextMsgId(), getHostName(), getUdpPort()); +void CentralClient::_keyLookupReq(CompositeKey const& key) { + LOGS(_log, LOG_LVL_INFO, "CentralClient::_keyLookupReq trying key=" << key); + LoaderMsg msg(LoaderMsg::KEY_LOOKUP_REQ, getNextMsgId(), getHostName(), getUdpPort()); BufferUdp msgData; msg.appendToData(msgData); // create the proto buffer @@ -305,11 +310,9 @@ void CentralClient::_keyInfoReq(CompositeKey const& key) { try { sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInfoReq boost system_error=" << e.what() << " key=" << key); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought. - // So just blow up so it's unmistakable something bad happened for now. } } @@ -341,11 +344,11 @@ void CentralClient::KeyInsertReqOneShot::keyInsertComplete() { } -util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() { +util::CommandTracked::Ptr CentralClient::KeyLookupReqOneShot::createCommand() { struct KeyInfoReqCmd : public util::CommandTracked { KeyInfoReqCmd(KeyInfoData::Ptr& cd, CentralClient* cent_) : cData(cd), cent(cent_) {} void action(util::CmdData*) override { - cent->_keyInfoReq(cData->key); + cent->_keyLookupReq(cData->key); } KeyInfoData::Ptr cData; CentralClient* cent; @@ -354,7 +357,7 @@ util::CommandTracked::Ptr CentralClient::KeyInfoReqOneShot::createCommand() { } -void CentralClient::KeyInfoReqOneShot::keyInfoComplete(CompositeKey const& key, +void CentralClient::KeyLookupReqOneShot::keyInfoComplete(CompositeKey const& key, int chunk, int subchunk, bool success) { if (key == cmdData->key) { cmdData->chunk = chunk; diff --git a/core/modules/loader/CentralClient.h b/core/modules/loader/CentralClient.h index e529588dd5..7e225ce598 100644 --- a/core/modules/loader/CentralClient.h +++ b/core/modules/loader/CentralClient.h @@ -74,7 +74,6 @@ class CentralClient : public Central { void start(); - ~CentralClient() override = default; std::string const& getHostName() const { return _hostName; } @@ -86,8 +85,6 @@ class CentralClient : public Central { /// @return the default worker's UDP port int getDefWorkerPortUdp() const { return _defWorkerPortUdp; } - - /// Asynchronously request a key value insert to the workers. /// This can block if too many key insert requests are already in progress. /// @return - a KeyInfoData object for checking the job's status or @@ -101,9 +98,9 @@ class CentralClient : public Central { /// Asynchronously request a key value lookup from the workers. It returns a /// KeyInfoData object to be used to track job status and get the value of the key. /// This can block if too many key lookup requests are already in progress. - KeyInfoData::Ptr keyInfoReq(CompositeKey const& key); + KeyInfoData::Ptr keyLookupReq(CompositeKey const& key); /// Handle a workers response to the keyInfoReq call. - void handleKeyInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data); + void handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data); std::string getOurLogId() const override { return "client"; } @@ -111,8 +108,8 @@ class CentralClient : public Central { void _keyInsertReq(CompositeKey const& key, int chunk, int subchunk); ///< see keyInsertReq() void _handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique_ptr& protoBuf); - void _keyInfoReq(CompositeKey const& key); ///< see keyInfoReq() - void _handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr& protoBuf); + void _keyLookupReq(CompositeKey const& key); ///< see keyLookReq() + void _handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr& protoBuf); @@ -136,12 +133,12 @@ class CentralClient : public Central { CentralClient* central; }; - /// Create commands to find a key in the index and get its value. + /// Create commands to lookup a key in the index and get its value. /// It should keep trying this until it works and then drop it from _waitingKeyInfoMap. - struct KeyInfoReqOneShot : public DoListItem { - using Ptr = std::shared_ptr; + struct KeyLookupReqOneShot : public DoListItem { + using Ptr = std::shared_ptr; - KeyInfoReqOneShot(CentralClient* central_, CompositeKey const& key_) : + KeyLookupReqOneShot(CentralClient* central_, CompositeKey const& key_) : cmdData(std::make_shared(key_, -1, -1)), central(central_) { setOneShot(true); } util::CommandTracked::Ptr createCommand() override; @@ -164,15 +161,16 @@ class CentralClient : public Central { const int _defWorkerPortUdp; ///< Default worker UDP port - size_t _doListMaxLookups{1000}; ///< Maximum number of concurrent lookups in DoList (set by config) - size_t _doListMaxInserts{1000}; ///< Maximum number of concurrent inserts in DoList (set by config) - int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length (set by config) + size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config + size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config + /// Time to sleep between checking requests when at max length, set by config + int const _maxRequestSleepTime = 100000; - std::map _waitingKeyInsertMap; + std::map _waitingKeyInsertMap; ///< Map of current insert requests. std::mutex _waitingKeyInsertMtx; ///< protects _waitingKeyInsertMap, _doListMaxInserts - std::map _waitingKeyInfoMap; // &&& change all references of keyInfo to keyLookup &&&, including protobuf keyInfo should only apply to worker key count and worker key range. - std::mutex _waitingKeyInfoMtx; ///< protects _waitingKeyInfoMap, _doListMaxLookups + std::map _waitingKeyLookupMap; ///< Map of current look up requests. + std::mutex _waitingKeyLookupMtx; ///< protects _waitingKeyLookMap, _doListMaxLookups }; }}} // namespace lsst::qserv::loader diff --git a/core/modules/loader/CentralMaster.cc b/core/modules/loader/CentralMaster.cc index 36fad0b49b..0b2ca49d73 100644 --- a/core/modules/loader/CentralMaster.cc +++ b/core/modules/loader/CentralMaster.cc @@ -65,7 +65,7 @@ void CentralMaster::addWorker(std::string const& ip, int udpPort, int tcpPort) { } -void CentralMaster::updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, StringRange const& strRange) { +void CentralMaster::updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, KeyRange const& strRange) { if (workerId == 0) { return; } @@ -99,12 +99,10 @@ void CentralMaster::setWorkerNeighbor(MWorkerListItem::WPtr const& target, int m auto addr = targetWorker->getUdpAddress(); try { sendBufferTo(addr.ip, addr.port, msgData); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "CentralMaster::setWorkerNeighbor boost system_error=" << e.what() << " targ=" << *targetWorker << " msg=" << message << " neighborId=" << neighborId); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } @@ -206,12 +204,10 @@ void CentralMaster::reqWorkerKeysInfo(uint64_t msgId, std::string const& targetI reqMsg.appendToData(data); try { sendBufferTo(targetIp, targetPort, data); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "CentralMaster::reqWorkerKeysInfo boost system_error=" << e.what() << " msgId=" << msgId << " tIp=" << targetIp << " tPort=" << targetPort << " ourHost=" << ourHostName << " ourPort=" << ourPort); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } diff --git a/core/modules/loader/CentralMaster.h b/core/modules/loader/CentralMaster.h index d22dfb9788..f4ceb9703e 100644 --- a/core/modules/loader/CentralMaster.h +++ b/core/modules/loader/CentralMaster.h @@ -55,7 +55,7 @@ class CentralMaster : public Central { CentralMaster(boost::asio::io_service& ioService_, std::string const& masterHostName_, MasterConfig const& cfg) : Central(ioService_, masterHostName_, cfg.getMasterPort(), - cfg.getThreadPoolSize(), cfg.getLoopSleepTime()), + cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), _maxKeysPerWorker(cfg.getMaxKeysPerWorker()) {} /// Open the UDP port. This can throw boost::system::system_error. @@ -67,7 +67,7 @@ class CentralMaster : public Central { int getMaxKeysPerWorker() const { return _maxKeysPerWorker; } void addWorker(std::string const& ip, int udpPort, int tcpPort); ///< Add a new worker to the system. - void updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, StringRange const& strRange); + void updateWorkerInfo(uint32_t workerId, NeighborsInfo const& nInfo, KeyRange const& strRange); MWorkerListItem::Ptr getWorkerWithId(uint32_t id); diff --git a/core/modules/loader/CentralWorker.cc b/core/modules/loader/CentralWorker.cc index f8da2fd19f..7a5df19dc6 100644 --- a/core/modules/loader/CentralWorker.cc +++ b/core/modules/loader/CentralWorker.cc @@ -54,7 +54,7 @@ namespace loader { CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::io_context& io_context_, std::string const& hostName_, WorkerConfig const& cfg) : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), - cfg.getThreadPoolSize(), cfg.getLoopSleepTime()), + cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), _hostName(hostName_), _udpPort(cfg.getWPortUdp()), _tcpPort(cfg.getWPortTcp()), @@ -246,7 +246,7 @@ bool CentralWorker::_determineRange() { LOGS(_log, LOG_LVL_INFO, funcName << " rightNeighbor workerId=" << workerId << " keyCount=" << nInfoR.keyCount << " recentAdds=" << nInfoR.recentAdds); bool valid = protoRange.valid(); - StringRange rightRange; + KeyRange rightRange; if (valid) { CompositeKey min(protoRange.minint(), protoRange.minstr()); CompositeKey max(protoRange.maxint(), protoRange.maxstr()); @@ -258,12 +258,12 @@ bool CentralWorker::_determineRange() { // Our maximum value is up to but not including the right minimum. { std::lock_guard lckMap(_idMapMtx); - auto origMax = _strRange.getMax(); - auto origUnlim = _strRange.getUnlimited(); + auto origMax = _keyRange.getMax(); + auto origUnlim = _keyRange.getUnlimited(); // Can't be unlimited anymore as there is a right neighbor. - _strRange.setMax(min, false); - if (origUnlim != _strRange.getUnlimited() || - (!origUnlim && origMax != _strRange.getMax())) { + _keyRange.setMax(min, false); + if (origUnlim != _keyRange.getUnlimited() || + (!origUnlim && origMax != _keyRange.getMax())) { rangeChanged = true; } } @@ -297,11 +297,11 @@ bool CentralWorker::_shiftIfNeeded(std::lock_guard const& rightMtxLG } // Get local copies of range and map info. - StringRange range; + KeyRange range; size_t mapSize; { std::lock_guard lck(_idMapMtx); - range = _strRange; + range = _keyRange; mapSize = _keyValueMap.size(); } @@ -309,7 +309,7 @@ bool CentralWorker::_shiftIfNeeded(std::lock_guard const& rightMtxLG // If this worker has _thresholdAverage more keys than average or _thresholdNeighborShift more keys than the right neighbor // send enough keys to the right to balance (min 1 key, max _maxShiftKeys, never shift more than 1/3 of our keys) int rightKeyCount = 0; - StringRange rightRange; + KeyRange rightRange; _neighborRight.getKeyData(rightKeyCount, rightRange); if (range > rightRange) { LOGS(_log, LOG_LVL_ERROR, "Right neighbor range is less than ours!!!! our=" << range << " right=" << rightRange); @@ -382,7 +382,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { // TODO This is very similar to code in TcpBaseConnection::_handleShiftToRight and they should be merged. int sz = protoKeyList->keypair_size(); - std::vector keyList; + std::vector keyList; for (int j=0; j < sz; ++j) { proto::KeyInfo const& protoKI = protoKeyList->keypair(j); ChunkSubchunk chSub(protoKI.chunk(), protoKI.subchunk()); @@ -403,20 +403,15 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { // Construct a message with that many keys and send it (sending the highest keys) proto::KeyList protoKeyList; protoKeyList.set_keycount(keysToShift); - CompositeKey minKey = CompositeKey::minValue(); // smallest value of a key is sent to right neighbor - CompositeKey maxKey = CompositeKey::minValue(); + CompositeKey minKey = CompositeKey::minValue; // smallest key is sent to right neighbor { std::lock_guard lck(_idMapMtx); if (not _transferListToRight.empty()) { throw LoaderMsgErr(ERR_LOC, "_shift _transferList not empty"); } - bool firstPass = true; for (int j=0; j < keysToShift && _keyValueMap.size() > 1; ++j) { auto iter = _keyValueMap.end(); --iter; // rbegin() returns a reverse iterator which doesn't work with erase(). - if (firstPass) { - maxKey = iter->first; - } _transferListToRight.push_back(std::make_pair(iter->first, iter->second)); proto::KeyInfo* protoKI = protoKeyList.add_keypair(); minKey = iter->first; @@ -427,7 +422,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { _keyValueMap.erase(iter); } // Adjust our range; - _strRange.setMax(minKey); + _keyRange.setMax(minKey); } StringElement keyList; protoKeyList.SerializeToString(&(keyList.element)); @@ -473,8 +468,8 @@ void CentralWorker::finishShiftFromRight() { StringElement::UPtr CentralWorker::buildKeyList(int keysToShift) { std::string funcName = "CentralWorker::buildKeyList"; proto::KeyList protoKeyList; - CompositeKey minKey = CompositeKey::minValue(); // smallest key sent - CompositeKey maxKey = CompositeKey::minValue(); // largest key sent + CompositeKey minKey = CompositeKey::minValue; // smallest key sent + CompositeKey maxKey = CompositeKey::minValue; // largest key sent { LOGS(_log, LOG_LVL_INFO, funcName); std::lock_guard lck(_idMapMtx); @@ -501,10 +496,10 @@ StringElement::UPtr CentralWorker::buildKeyList(int keysToShift) { } // Adjust our range; auto iter = _keyValueMap.begin(); - auto minKey = _strRange.getMin(); - if (minKey != CompositeKey::minValue()) { + auto minKey = _keyRange.getMin(); + if (minKey != CompositeKey::minValue) { if (iter->first != minKey) { - _strRange.setMin(iter->first); + _keyRange.setMin(iter->first); _rangeChanged = true; } } @@ -562,7 +557,7 @@ void CentralWorker::_rightConnect(std::lock_guard const& rightMtxLG) } -void CentralWorker::setNeighborInfoLeft(uint32_t wId, int keyCount, StringRange const& range) { +void CentralWorker::setNeighborInfoLeft(uint32_t wId, int keyCount, KeyRange const& range) { if (wId != _neighborLeft.getId()) { LOGS(_log, LOG_LVL_ERROR, "disconnecting left since setNeighborInfoLeft wId(" << wId << ") != neighborLeft.name(" << _neighborLeft.getId() << ")"); @@ -627,8 +622,8 @@ void CentralWorker::cancelShiftsWithLeftNeighbor() { _transferListWithLeft.clear(); // Fix the bottom of the range. - if (_strRange.getMin() != CompositeKey::minValue()) { - _strRange.setMin(_keyValueMap.begin()->first); + if (_keyRange.getMin() != CompositeKey::minValue) { + _keyRange.setMin(_keyValueMap.begin()->first); } } } @@ -668,7 +663,7 @@ void CentralWorker::_workerInfoReceive(std::unique_ptr& p portUdp = protoAddr.udpport(); portTcp = protoAddr.tcpport(); } - StringRange strRange; + KeyRange strRange; if (protoList->has_range()) { proto::WorkerRange protoRange = protoList->range(); bool valid = protoRange.valid(); @@ -694,9 +689,9 @@ void CentralWorker::_workerInfoReceive(std::unique_ptr& p // take the range given as our own. if (strRange.getValid()) { std::lock_guard lckM(_idMapMtx); - if (not _strRange.getValid()) { + if (not _keyRange.getValid()) { LOGS(_log, LOG_LVL_INFO, "Setting our range " << strRange); - _strRange.setMinMax(strRange.getMin(), strRange.getMax(), strRange.getUnlimited()); + _keyRange.setMinMax(strRange.getMin(), strRange.getMax(), strRange.getUnlimited()); } } } @@ -706,7 +701,7 @@ void CentralWorker::_workerInfoReceive(std::unique_ptr& p } -StringRange CentralWorker::updateRangeWithLeftData(StringRange const& leftNeighborRange) { +KeyRange CentralWorker::updateRangeWithLeftData(KeyRange const& leftNeighborRange) { // Update our range with data from our left neighbor. Our min is their max. // If our range is invalid // our min is their max incremented (stringRange increment function) @@ -714,20 +709,20 @@ StringRange CentralWorker::updateRangeWithLeftData(StringRange const& leftNeighb // else max = increment(min) // send range to master // return our new range - StringRange newLeftNeighborRange(leftNeighborRange); + KeyRange newLeftNeighborRange(leftNeighborRange); { std::unique_lock lck(_idMapMtx); - if (not _strRange.getValid()) { + if (not _keyRange.getValid()) { // Our range has not been set, so base it on the range of the left neighbor. - auto min = StringRange::increment(leftNeighborRange.getMax()); + auto min = KeyRange::increment(leftNeighborRange.getMax()); auto max = min; - _strRange.setMinMax(min, max, leftNeighborRange.getUnlimited()); + _keyRange.setMinMax(min, max, leftNeighborRange.getUnlimited()); newLeftNeighborRange.setMax(max, false); } else { // Our range is valid already, it should be > than the left neighbor range. - if (_strRange < leftNeighborRange) { + if (_keyRange < leftNeighborRange) { LOGS(_log, LOG_LVL_ERROR, "LeftNeighborRange(" << leftNeighborRange << - ") is greater than our range(" << _strRange << ")"); + ") is greater than our range(" << _keyRange << ")"); // TODO corrective action? } // The left neighbor's max should be the minimum value in our keymap, unless the @@ -736,7 +731,7 @@ StringRange CentralWorker::updateRangeWithLeftData(StringRange const& leftNeighb // Don't do anything to left neighbor range. } else { auto min = _keyValueMap.begin()->first; - _strRange.setMin(min); + _keyRange.setMin(min); newLeftNeighborRange.setMax(min, false); } } @@ -777,10 +772,10 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr< /// see if the key should be inserted into our map std::unique_lock lck(_idMapMtx); - auto min = _strRange.getMin(); + auto min = _keyRange.getMin(); auto leftAddress = _neighborLeft.getAddressUdp(); auto rightAddress = _neighborRight.getAddressUdp(); - if (_strRange.isInRange(key)) { + if (_keyRange.isInRange(key)) { // insert into our map auto res = _keyValueMap.insert(std::make_pair(key, chunkInfo)); lck.unlock(); @@ -806,11 +801,9 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr< LOGS(_log, LOG_LVL_INFO, "sending complete " << key << " to " << nAddr << " from " << _ourId); try { sendBufferTo(nAddr.ip, nAddr.port, msgData); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_workerKeyInsertReq boost system_error=" << e.what() << " msg=" << inMsg); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } else { lck.unlock(); @@ -851,11 +844,9 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L strElem.appendToData(msgData); try { sendBufferTo(targetAddr.ip, targetAddr.port, msgData); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_forwardKeyInsertRequest boost system_error=" << e.what() << " tAddr=" << targetAddr << " inMsg=" << inMsg); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } @@ -891,14 +882,14 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr lck(_idMapMtx); - if (_strRange.isInRange(key)) { + if (_keyRange.isInRange(key)) { LOGS(_log, LOG_LVL_INFO, "CentralWorker::_workerKeyInfoReq " << _ourId << " looking for key=" << key); // check out map auto iter = _keyValueMap.find(key); lck.unlock(); // Key found or not, message will be returned. - LoaderMsg msg(LoaderMsg::KEY_INFO, inMsg.msgId->element, getHostName(), getUdpPort()); + LoaderMsg msg(LoaderMsg::KEY_LOOKUP, inMsg.msgId->element, getHostName(), getUdpPort()); BufferUdp msgData; msg.appendToData(msgData); proto::KeyInfo protoReply; @@ -925,11 +916,9 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr StringKeyPair; // &&& rename CompositeKeyPair + typedef std::pair CompKeyPair; enum SocketStatus { VOID0 = 0, @@ -91,7 +91,7 @@ class CentralWorker : public Central { /// @parameter mustSetMin should be set true if this is not the left /// most worker. It causes the minimum value to /// be set to the smallest key in _keyValueMap. - void insertKeys(std::vector const& keyList, bool mustSetMin); + void insertKeys(std::vector const& keyList, bool mustSetMin); /// @Return a list of the smallest keys from _keyValueMap. The keys are removed from /// from the map. Put keys are also put in _transferList in case the shift fails @@ -108,7 +108,7 @@ class CentralWorker : public Central { /// Update our range with data from our left neighbor. /// Our minimum key is their maximum key(exclusive). /// @returns what it thinks the range of the left neighbor should be. - StringRange updateRangeWithLeftData(StringRange const& strRange); + KeyRange updateRangeWithLeftData(KeyRange const& strRange); /// Receive our name from the master. Returns true if successful. bool workerInfoReceive(BufferUdp::Ptr const& data); @@ -137,7 +137,7 @@ class CentralWorker : public Central { std::string getOurLogId() const override; std::unique_ptr _workerKeysInfoBuilder(); // TODO make private - void setNeighborInfoLeft(uint32_t wId, int keyCount, StringRange const& range); // TODO make private + void setNeighborInfoLeft(uint32_t wId, int keyCount, KeyRange const& range); // TODO make private /// @Return a string describing the first and last 'count' keys. count=0 dumps all keys. @@ -235,7 +235,7 @@ class CentralWorker : public Central { mutable std::mutex _ourIdMtx; ///< protects _ourIdInvalid, _ourId - StringRange _strRange; ///< range for this worker TODO _range both int and string; + KeyRange _keyRange; ///< range for this worker std::atomic _rangeChanged{false}; std::map _keyValueMap; std::deque _recentAdds; ///< track how many keys added recently. @@ -260,15 +260,15 @@ class CentralWorker : public Central { /// Maximum number of keys to shift in one iteration. 10000 may be reasonable. /// An iteration would be transfer, insert, and verify range. During the - /// insert phase, the mutex is locked preventing key inserts and look ups. + /// insert phase, the mutex is locked preventing key inserts and lookups. /// Using smaller values locks the mutex for more periods of time but each /// period is shorter and lookups can occur during the gaps. /// Too big a value, and the maps will be paralyzed for a long time during inserts. /// Too small and shift operations will take significantly longer. int _maxKeysToShift; - std::vector _transferListToRight; ///< List of items being transfered to right + std::vector _transferListToRight; ///< List of items being transfered to right /// List of items being transfered to our left neighbor. (answering neighbor's FromRight request) - std::vector _transferListWithLeft; + std::vector _transferListWithLeft; /// The DoListItem that makes sure _monitor() is run. std::shared_ptr _centralWorkerDoListItem; diff --git a/core/modules/loader/CentralWorkerDoListItem.h b/core/modules/loader/CentralWorkerDoListItem.h index 649c8fbfe1..6a72ee1c6a 100644 --- a/core/modules/loader/CentralWorkerDoListItem.h +++ b/core/modules/loader/CentralWorkerDoListItem.h @@ -38,7 +38,7 @@ class CentralWorkerDoListItem : public DoListItem { public: CentralWorkerDoListItem() = delete; explicit CentralWorkerDoListItem(CentralWorker* centralWorker) : _centralWorker(centralWorker) { - setTimeOut(std::chrono::seconds(7)); + setTimeOut(std::chrono::seconds(4)); // TODO: DM-17453 set via config } util::CommandTracked::Ptr createCommand() override { diff --git a/core/modules/loader/ClientConfig.h b/core/modules/loader/ClientConfig.h index ba58622445..5b59e4f39f 100644 --- a/core/modules/loader/ClientConfig.h +++ b/core/modules/loader/ClientConfig.h @@ -53,6 +53,7 @@ class ClientConfig : public ConfigBase { int getMaxLookups() const { return _maxLookups->getInt(); } int getMaxInserts() const { return _maxInserts->getInt(); } int getMaxRequestSleepTime() const { return _maxRequestSleepTime->getInt(); } + int getIOThreads() const { return _iOThreads->getInt(); } std::ostream& dump(std::ostream &os) const override; @@ -90,8 +91,11 @@ class ClientConfig : public ConfigBase { /// When reaching maxInserts or maxLookups, sleep this long before trying to add more, /// in micro seconds. 100000micro = 0.1sec ConfigElement::Ptr _maxRequestSleepTime{ - ConfigElement::create(cfgList, header, - "maxRequestSleepTime", ConfigElement::INT, false, "100000")}; + ConfigElement::create(cfgList, header, + "maxRequestSleepTime", ConfigElement::INT, false, "100000")}; + /// Number of IO threads the server should run. + ConfigElement::Ptr _iOThreads{ + ConfigElement::create(cfgList, header, "iOThreads", ConfigElement::INT, false, "4")}; }; diff --git a/core/modules/loader/ClientServer.cc b/core/modules/loader/ClientServer.cc index 89a18ad083..43e5d67235 100644 --- a/core/modules/loader/ClientServer.cc +++ b/core/modules/loader/ClientServer.cc @@ -63,9 +63,9 @@ BufferUdp::Ptr ClientServer::parseMsg(BufferUdp::Ptr const& data, sendData.reset(); // Never send a response back for one of these, infinite loop. break; - case LoaderMsg::KEY_INFO: - LOGS(_log, LOG_LVL_INFO, "KEY_INFO"); - _centralClient->handleKeyInfo(inMsg, data); + case LoaderMsg::KEY_LOOKUP: + LOGS(_log, LOG_LVL_INFO, "KEY_LOOK"); + _centralClient->handleKeyLookup(inMsg, data); break; case LoaderMsg::KEY_INSERT_COMPLETE: @@ -75,7 +75,7 @@ BufferUdp::Ptr ClientServer::parseMsg(BufferUdp::Ptr const& data, // following not expected by client case LoaderMsg::KEY_INSERT_REQ: // This is what this client should send out - case LoaderMsg::KEY_INFO_REQ: // This is what this client should send out + case LoaderMsg::KEY_LOOKUP_REQ: // This is what this client should send out case LoaderMsg::MAST_WORKER_INFO: case LoaderMsg::MAST_WORKER_LIST: // TODO having the client know would be useful. case LoaderMsg::MAST_INFO: diff --git a/core/modules/loader/CompositeKey.cc b/core/modules/loader/CompositeKey.cc index 8e1f11f2c4..e2a3d26731 100644 --- a/core/modules/loader/CompositeKey.cc +++ b/core/modules/loader/CompositeKey.cc @@ -28,8 +28,6 @@ // System headers #include -// qserv headers - // LSST headers #include "lsst/log/Log.h" @@ -43,10 +41,14 @@ namespace qserv { namespace loader { -std::ostream& CompositeKey::dump(std::ostream& os) const { +CompositeKey const CompositeKey::minValue(0,""); + + +void CompositeKey::dump(std::ostream& os) const { os << "CKey(" << kInt << ", " << kStr << ")"; - return os; } + + std::string CompositeKey::dump() const { std::stringstream os; dump(os); @@ -62,7 +64,3 @@ std::ostream& operator<<(std::ostream& os, CompositeKey const& cKey) { }}} // namespace lsst::qserv::loader - - - - diff --git a/core/modules/loader/CompositeKey.h b/core/modules/loader/CompositeKey.h index d2d3368bc0..b2a7a60df6 100644 --- a/core/modules/loader/CompositeKey.h +++ b/core/modules/loader/CompositeKey.h @@ -25,9 +25,11 @@ #define LSST_QSERV_LOADER_COMPOSITEKEY_H // system headers +#include +#include #include +#include -// Qserv headers namespace lsst { namespace qserv { @@ -45,7 +47,7 @@ class CompositeKey { CompositeKey() : CompositeKey(0, "") {} ~CompositeKey() = default; - static uint64_t maxIntVal() { return UINT64_MAX; } + static uint64_t maxIntVal() { return std::numeric_limits::max(); } CompositeKey& operator=(CompositeKey const& other) { if (this != &other) { @@ -55,13 +57,11 @@ class CompositeKey { return *this; } - static CompositeKey minValue() { return CompositeKey(0, ""); } + /// Smallest possible value for a CompositeKey (0,"") + static CompositeKey const minValue; bool operator<(CompositeKey const& other) const { - if (kInt < other.kInt) return true; - if (kInt > other.kInt) return false; - if (kStr < other.kStr) return true; - return false; + return std::tie(kInt, kStr) < std::tie(other.kInt, other.kStr); } bool operator>(CompositeKey const& other) const { @@ -84,7 +84,7 @@ class CompositeKey { return !(*this < other); } - std::ostream& dump(std::ostream& os) const; + void dump(std::ostream& os) const; std::string dump() const ; uint64_t kInt; diff --git a/core/modules/loader/DoListItem.h b/core/modules/loader/DoListItem.h index a5203218bc..032658a7cd 100644 --- a/core/modules/loader/DoListItem.h +++ b/core/modules/loader/DoListItem.h @@ -65,7 +65,7 @@ class TimeOut { std::chrono::milliseconds getTimeOut() const { return _timeOut; } private: // How much time since lastTrigger needs to pass before triggering. - std::chrono::milliseconds _timeOut; // default, 15 minutes + std::chrono::milliseconds _timeOut; TimePoint _lastTrigger{std::chrono::seconds(0)}; }; @@ -77,7 +77,7 @@ class TimeOut { /// at a low frequency (a couple of times a second to once every /// few hours or even days). /// The DoListItems can cycle forever by just remaining on the -/// DoList where it will run their actions when the timer run out, +/// DoList where it will run their actions when the timer runs out, /// which is useful for monitoring status. /// Or they can be setup to run until they have completed once, /// a oneShot, which is useful for looking up or inserting keys. @@ -101,10 +101,10 @@ class DoListItem : public std::enable_shared_from_this { util::CommandTracked::Ptr runIfNeeded(TimeOut::TimePoint now) { std::lock_guard lock(_mtx); if (_command == nullptr) { - if (_isOneShotDone()) { return nullptr; } + if (_isOneShotDone()) return nullptr; if ((_needInfo || _timeOut.due(now)) && _timeRequest.due(now)) { _timeRequest.triggered(); - _command = createCommand(); + _command = createCommand(); return _command; } } else if (_command->isFinished()) { @@ -166,8 +166,10 @@ class DoListItem : public std::enable_shared_from_this { bool _oneShot{false}; ///< True if after the needed information is gathered, this item can be dropped. bool _needInfo{true}; ///< True if information is needed. bool _remove{false}; ///< set to true if this item should no longer be checked. - TimeOut _timeOut{std::chrono::minutes(5)}; ///< If no info is needed, check for info after this period of time. - TimeOut _timeRequest{std::chrono::seconds(5)}; ///< Rate limiter, no more than 1 message every 5 seconds + /// If no info is needed, check for info after this period of time. + TimeOut _timeOut{std::chrono::minutes(5)}; + /// Rate limiter, no more than 1 message every few seconds + TimeOut _timeRequest{std::chrono::seconds(4)}; // TODO: DM-17453 set via config util::CommandTracked::Ptr _command; std::mutex _mtx; ///< protects _timeOut, _timeRequest, _command, _oneShot, _needInfo }; diff --git a/core/modules/loader/StringRange.cc b/core/modules/loader/KeyRange.cc similarity index 82% rename from core/modules/loader/StringRange.cc rename to core/modules/loader/KeyRange.cc index d55e94b080..0554be6ddc 100644 --- a/core/modules/loader/StringRange.cc +++ b/core/modules/loader/KeyRange.cc @@ -23,9 +23,8 @@ // Class header -#include "loader/StringRange.h" +#include "KeyRange.h" -// System headers #include // qserv headers @@ -54,7 +53,7 @@ std::ostream& operator<<(std::ostream& os, NeighborsInfo const& ni) { return os; } -std::ostream& operator<<(std::ostream& os, StringRange const& strRange) { +std::ostream& operator<<(std::ostream& os, KeyRange const& strRange) { os << "valid=" << strRange._valid << " min=" << strRange._min << " max=" << strRange._maxE @@ -63,7 +62,7 @@ std::ostream& operator<<(std::ostream& os, StringRange const& strRange) { } -void StringRange::setAllInclusiveRange() { +void KeyRange::setAllInclusiveRange() { _min = CompositeKey(0,""); _maxE = CompositeKey(CompositeKey::maxIntVal(), ""); _unlimited = true; @@ -71,7 +70,7 @@ void StringRange::setAllInclusiveRange() { } -bool StringRange::setMin(CompositeKey const& val) { +bool KeyRange::setMin(CompositeKey const& val) { if (not _unlimited && val >= _maxE) { _min = decrement(_maxE); return false; @@ -81,7 +80,7 @@ bool StringRange::setMin(CompositeKey const& val) { } -bool StringRange::setMax(CompositeKey const& val, bool unlimited) { +bool KeyRange::setMax(CompositeKey const& val, bool unlimited) { _unlimited = unlimited; if (unlimited) { if (val > _maxE) { _maxE = val; } @@ -96,7 +95,7 @@ bool StringRange::setMax(CompositeKey const& val, bool unlimited) { } -bool StringRange::setMinMax(CompositeKey const& vMin, CompositeKey const& vMax, bool unlimited) { +bool KeyRange::setMinMax(CompositeKey const& vMin, CompositeKey const& vMax, bool unlimited) { _unlimited = unlimited; if (!unlimited && vMin > vMax) { return false; @@ -116,7 +115,7 @@ bool StringRange::setMinMax(CompositeKey const& vMin, CompositeKey const& vMax, -std::string StringRange::incrementString(std::string const& str, char appendChar) { +std::string KeyRange::incrementString(std::string const& str, char appendChar) { std::string output(str); if (output.empty()) { output += appendChar; @@ -133,13 +132,13 @@ std::string StringRange::incrementString(std::string const& str, char appendChar } -CompositeKey StringRange::increment(CompositeKey const& key, char appendChar) { +CompositeKey KeyRange::increment(CompositeKey const& key, char appendChar) { CompositeKey outKey(key.kInt, incrementString(key.kStr, appendChar)); return outKey; } -std::string StringRange::decrementString(std::string const& str, char minChar) { +std::string KeyRange::decrementString(std::string const& str, char minChar) { if (str.empty()) { return std::string(); } @@ -157,7 +156,7 @@ std::string StringRange::decrementString(std::string const& str, char minChar) { } -CompositeKey StringRange::decrement(CompositeKey const& key, char minChar) { +CompositeKey KeyRange::decrement(CompositeKey const& key, char minChar) { CompositeKey outK(key); if (outK.kStr.empty()) { if (outK.kInt > 0) --outK.kInt; @@ -168,7 +167,7 @@ CompositeKey StringRange::decrement(CompositeKey const& key, char minChar) { } -void StringRange::loadProtoRange(proto::WorkerRange& protoRange) { +void KeyRange::loadProtoRange(proto::WorkerRange& protoRange) { protoRange.set_valid(_valid); protoRange.set_minint(_min.kInt); protoRange.set_minstr(_min.kStr); @@ -178,7 +177,7 @@ void StringRange::loadProtoRange(proto::WorkerRange& protoRange) { } -void ProtoHelper::workerKeysInfoExtractor(BufferUdp& data, uint32_t& wId, NeighborsInfo& nInfo, StringRange& strRange) { +void ProtoHelper::workerKeysInfoExtractor(BufferUdp& data, uint32_t& wId, NeighborsInfo& nInfo, KeyRange& keyRange) { auto funcName = "CentralWorker::_workerKeysInfoExtractor"; LOGS(_log, LOG_LVL_DEBUG, funcName); auto protoItem = StringElement::protoParse(data); @@ -192,10 +191,10 @@ void ProtoHelper::workerKeysInfoExtractor(BufferUdp& data, uint32_t& wId, Neighb proto::WorkerRange protoRange = protoItem->range(); bool valid = protoRange.valid(); if (valid) { - CompositeKey min(protoRange.minint(), protoRange.minstr()); - CompositeKey max(protoRange.maxint(), protoRange.maxstr()); + CompositeKey minKey(protoRange.minint(), protoRange.minstr()); + CompositeKey maxKey(protoRange.maxint(), protoRange.maxstr()); bool unlimited = protoRange.maxunlimited(); - strRange.setMinMax(min, max, unlimited); + keyRange.setMinMax(minKey, maxKey, unlimited); } proto::Neighbor protoLeftNeigh = protoItem->left(); nInfo.neighborLeft->update(protoLeftNeigh.wid()); diff --git a/core/modules/loader/StringRange.h b/core/modules/loader/KeyRange.h similarity index 78% rename from core/modules/loader/StringRange.h rename to core/modules/loader/KeyRange.h index c621a03486..451b26b29a 100644 --- a/core/modules/loader/StringRange.h +++ b/core/modules/loader/KeyRange.h @@ -21,8 +21,8 @@ * see . * */ -#ifndef LSST_QSERV_LOADER_STRINGRANGE_H -#define LSST_QSERV_LOADER_STRINGRANGE_H +#ifndef LSST_QSERV_LOADER_KEYRANGE_H +#define LSST_QSERV_LOADER_KEYRANGE_H // system headers #include @@ -38,7 +38,7 @@ namespace lsst { namespace qserv { namespace loader { -/// Class for storing the range of a single worker. +/// Class for storing the key range of a single worker. /// This is likely to become a template class, hence lots in the header. /// It tries to keep its state consistent, _min < _max, but depends on /// other classes to eventually get the correct values for _min and _max. @@ -50,15 +50,15 @@ namespace loader { /// right neighbor (if there is one) each have at least one key. The worker /// ranges should eventually reach the master, then the other workers /// and clients. -class StringRange { +class KeyRange { public: - using Ptr = std::shared_ptr; + using Ptr = std::shared_ptr; - StringRange() = default; - StringRange(StringRange const&) = default; - StringRange& operator=(StringRange const&) = default; + KeyRange() = default; + KeyRange(KeyRange const&) = default; + KeyRange& operator=(KeyRange const&) = default; - ~StringRange() = default; + ~KeyRange() = default; void setAllInclusiveRange(); @@ -72,29 +72,29 @@ class StringRange { } /// Return true if other functionally equivalent. - bool equal(StringRange const& other) const { - if (_valid != other._valid) { return false; } - if (not _valid) { return true; } // both invalid - if (_min != other._min) { return false; } - if (_unlimited != other._unlimited) { return false; } - if (_unlimited) { return true; } // both same _min and _unlimited - if (_maxE != other._maxE) { return false; } + bool equal(KeyRange const& other) const { + if (_valid != other._valid) return false; + if (not _valid) return true; // both invalid + if (_min != other._min) return false; + if (_unlimited != other._unlimited) return false; + if (_unlimited) return true; // both same _min and _unlimited + if (_maxE != other._maxE) return false; return true; } bool isInRange(CompositeKey const& cKey) const { - if (not _valid) { return false; } - if (cKey < _min) { return false; } - if (not _unlimited && cKey >= _maxE) { return false; } + if (not _valid) return false; + if (cKey < _min) return false; + if (not _unlimited && cKey >= _maxE) return false; return true; } bool getValid() const { return _valid; } bool getUnlimited() const { return _unlimited; } - CompositeKey getMin() const { return _min; } - CompositeKey getMax() const { return _maxE; } + CompositeKey const& getMin() const { return _min; } + CompositeKey const& getMax() const { return _maxE; } - bool operator<(StringRange const& other) const { + bool operator<(KeyRange const& other) const { /// Arbitrarily, invalid are less than valid, but such comparisons should be avoided. if (_valid != other._valid) { if (not _valid) { return true; } @@ -105,7 +105,7 @@ class StringRange { return false; } - bool operator>(StringRange const& other) const { + bool operator>(KeyRange const& other) const { return other < *this; } @@ -124,12 +124,12 @@ class StringRange { /// Load 'protoRange' with information from this object. void loadProtoRange(proto::WorkerRange& protoRange); - friend std::ostream& operator<<(std::ostream&, StringRange const&); + friend std::ostream& operator<<(std::ostream&, KeyRange const&); private: bool _valid{false}; ///< true if range is valid bool _unlimited{false}; ///< true if the range includes largest possible values. - CompositeKey _min; ///< Smallest value = "" + CompositeKey _min; ///< Smallest value = (0, "") CompositeKey _maxE; ///< maximum value exclusive }; @@ -154,10 +154,10 @@ class BufferUdp; class ProtoHelper { public: - static void workerKeysInfoExtractor(BufferUdp& data, uint32_t& name, NeighborsInfo& nInfo, StringRange& strRange); + static void workerKeysInfoExtractor(BufferUdp& data, uint32_t& name, NeighborsInfo& nInfo, KeyRange& strRange); }; }}} // namespace lsst::qserv::loader -#endif // LSST_QSERV_LOADER_STRINGRANGE_H +#endif // LSST_QSERV_LOADER_KEYRANGE_H diff --git a/core/modules/loader/LoaderMsg.h b/core/modules/loader/LoaderMsg.h index 99381e53c0..1cc8ca5647 100644 --- a/core/modules/loader/LoaderMsg.h +++ b/core/modules/loader/LoaderMsg.h @@ -59,8 +59,8 @@ class LoaderMsg { WORKER_KEYS_INFO, // Information about number of key values, range, number of new keys. KEY_INSERT_REQ, // Insert a new key with info. MSG_RECEIVED + KEY_INFO KEY_INSERT_COMPLETE, // Key has been inserted and logged. - KEY_INFO_REQ, // Request info for a single key. - KEY_INFO, // Information about a specific key. (includes file id and row) + KEY_LOOKUP_REQ, // Request info for a single key. + KEY_LOOKUP, // Information about a specific key. (includes file id and row) WORKER_LEFT_NEIGHBOR, // Mast assigns a left neighbor to a worker. WORKER_RIGHT_NEIGHBOR, // Mast assigns a right neighbor to a worker. IM_YOUR_L_NEIGHBOR, // Worker message to other worker to setup being neighbors. diff --git a/core/modules/loader/MWorkerList.cc b/core/modules/loader/MWorkerList.cc index e35d716ad6..a34c6d2bb1 100644 --- a/core/modules/loader/MWorkerList.cc +++ b/core/modules/loader/MWorkerList.cc @@ -183,12 +183,10 @@ bool MWorkerList::sendListTo(uint64_t msgId, std::string const& ip, short port, } try { _central->sendBufferTo(ip, port, *_stateListData); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "MWorkerList::sendListTo boost system_error=" << e.what() << " msgId=" << msgId << " ip=" << ip << " port=" << port << " ourName=" << ourHostName << " ourPort=" << ourPort); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } diff --git a/core/modules/loader/MWorkerList.h b/core/modules/loader/MWorkerList.h index fc1f180274..b019c847a5 100644 --- a/core/modules/loader/MWorkerList.h +++ b/core/modules/loader/MWorkerList.h @@ -34,8 +34,8 @@ #include "loader/Updateable.h" #include "loader/BufferUdp.h" #include "loader/DoList.h" +#include "loader/KeyRange.h" #include "loader/NetworkAddress.h" -#include "loader/StringRange.h" #include "loader/WorkerListItemBase.h" diff --git a/core/modules/loader/MasterConfig.h b/core/modules/loader/MasterConfig.h index 58ba9d6023..b7b6481ebc 100644 --- a/core/modules/loader/MasterConfig.h +++ b/core/modules/loader/MasterConfig.h @@ -39,7 +39,7 @@ namespace loader { class MasterConfig : public ConfigBase { public: /// Constructor can throw ConfigErr - explicit MasterConfig(std::string configFileName) + explicit MasterConfig(std::string const& configFileName) : MasterConfig(util::ConfigStore(configFileName)) {} MasterConfig() = delete; @@ -50,6 +50,7 @@ class MasterConfig : public ConfigBase { int getThreadPoolSize() const { return _threadPoolSize->getInt(); } int getLoopSleepTime() const { return _loopSleepTime->getInt(); } int getMaxKeysPerWorker() const { return _maxKeysPerWorker->getInt(); } + int getIOThreads() const { return _iOThreads->getInt(); } std::ostream& dump(std::ostream &os) const override; @@ -60,7 +61,7 @@ class MasterConfig : public ConfigBase { /// UDP port for the master - usually 9875 ConfigElement::Ptr _portUdp{ConfigElement::create(cfgList, header, "portUdp", ConfigElement::INT, true)}; - /// Maximum average keys per worker before activating a new worker. 1000 + /// Maximum average keys per worker before activating a new worker. ConfigElement::Ptr _maxKeysPerWorker{ ConfigElement::create(cfgList, header, "maxKeysPerWorker", ConfigElement::INT, true)}; /// Size of the master's thread pool - 10 @@ -69,6 +70,9 @@ class MasterConfig : public ConfigBase { /// Time spent sleeping between checking elements in the DoList in microseconds. 0.1 seconds. ConfigElement::Ptr _loopSleepTime{ ConfigElement::create(cfgList, header, "loopSleepTime", ConfigElement::INT, true)}; + /// Number of IO threads the server should run. + ConfigElement::Ptr _iOThreads{ + ConfigElement::create(cfgList, header, "iOThreads", ConfigElement::INT, false, "5")}; }; }}} // namespace lsst::qserv::loader diff --git a/core/modules/loader/MasterServer.cc b/core/modules/loader/MasterServer.cc index e5663d61fd..66338a35e4 100644 --- a/core/modules/loader/MasterServer.cc +++ b/core/modules/loader/MasterServer.cc @@ -94,8 +94,8 @@ BufferUdp::Ptr MasterServer::parseMsg(BufferUdp::Ptr const& data, case LoaderMsg::MAST_WORKER_LIST: case LoaderMsg::MAST_WORKER_INFO: case LoaderMsg::KEY_INSERT_REQ: - case LoaderMsg::KEY_INFO_REQ: - case LoaderMsg::KEY_INFO: + case LoaderMsg::KEY_LOOKUP_REQ: + case LoaderMsg::KEY_LOOKUP: /// TODO add msg unexpected by master response. break; default: @@ -199,7 +199,7 @@ BufferUdp::Ptr MasterServer::workerKeysInfo(LoaderMsg const& inMsg, BufferUdp::P try { uint32_t name; NeighborsInfo nInfo; - StringRange strRange; + KeyRange strRange; ProtoHelper::workerKeysInfoExtractor(*data, name, nInfo, strRange); LOGS(_log, LOG_LVL_INFO, funcName << "name=" << name << " keyCount=" << nInfo.keyCount << " recentAdds=" << nInfo.recentAdds << " range=" << strRange); @@ -242,7 +242,6 @@ BufferUdp::Ptr MasterServer::workerInfoRequest(LoaderMsg const& inMsg, BufferUdp /// Return worker's name, netaddress, and range in MAST_WORKER_INFO msg proto::WorkerListItem protoWorker; proto::LdrNetAddress* protoAddr = protoWorker.mutable_address(); - //proto::WorkerRangeString* protoRange = protoWorker.mutable_rangestr(); &&& proto::WorkerRange* protoRange = protoWorker.mutable_range(); protoWorker.set_wid(workerItem->getId()); auto udp = workerItem->getUdpAddress(); @@ -264,13 +263,11 @@ BufferUdp::Ptr MasterServer::workerInfoRequest(LoaderMsg const& inMsg, BufferUdp // Send the response to the worker that asked for it. try { _centralMaster->sendBufferTo(requestorAddr->ip, requestorAddr->port, sendBuf); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "MasterServer::workerInfoRequest boost system_error=" << e.what() << " inMsg=" << inMsg); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } - } catch (LoaderMsgErr &msgErr) { + } catch (LoaderMsgErr const& msgErr) { LOGS(_log, LOG_LVL_ERROR, msgErr.what()); return prepareReplyMsg(senderEndpoint, inMsg, LoaderMsg::STATUS_PARSE_ERR, msgErr.what()); } diff --git a/core/modules/loader/Neighbor.h b/core/modules/loader/Neighbor.h index f831bf539c..f3cae40c22 100644 --- a/core/modules/loader/Neighbor.h +++ b/core/modules/loader/Neighbor.h @@ -95,12 +95,12 @@ class Neighbor { _keyCount = count; } - void setRange(StringRange const& range) { + void setRange(KeyRange const& range) { std::lock_guard lck(_nMtx); _strRange = range; } - void getKeyData(int& keyCount, StringRange& range) { + void getKeyData(int& keyCount, KeyRange& range) { std::lock_guard lck(_nMtx); keyCount = _keyCount; range = _strRange; @@ -117,7 +117,7 @@ class Neighbor { std::mutex _nMtx; Type _type; int _keyCount{0}; - StringRange _strRange; + KeyRange _strRange; }; }}} // namespace lsst::qserv::loader diff --git a/core/modules/loader/ServerTcpBase.cc b/core/modules/loader/ServerTcpBase.cc index 0caffe2cc1..aeddb1b36d 100644 --- a/core/modules/loader/ServerTcpBase.cc +++ b/core/modules/loader/ServerTcpBase.cc @@ -168,7 +168,7 @@ bool ServerTcpBase::testConnect() { } // socket.close(); socket should close when it falls out of scope. } - catch (std::exception& e) { + catch (std::exception const& e) { std::cerr << e.what() << std::endl; return false; } @@ -435,13 +435,13 @@ void TcpBaseConnection::_handleImYourLNeighbor1(boost::system::error_code const& LOGS(_log, LOG_LVL_INFO, funcName << " WorkerKeysInfo name=" << workerName << " keyCount=" << nInfo.keyCount << " recentAdds=" << nInfo.recentAdds); bool valid = protoRange.valid(); - StringRange leftRange; - StringRange newLeftRange; + KeyRange leftRange; + KeyRange newLeftRange; if (valid) { - CompositeKey min(protoRange.minint(), protoRange.minstr()); - CompositeKey max(protoRange.maxint(), protoRange.maxstr()); + CompositeKey minKey(protoRange.minint(), protoRange.minstr()); + CompositeKey maxKey(protoRange.maxint(), protoRange.maxstr()); bool unlimited = protoRange.maxunlimited(); - leftRange.setMinMax(min, max, unlimited); + leftRange.setMinMax(minKey, maxKey, unlimited); LOGS(_log, LOG_LVL_WARN, funcName << " leftRange=" << leftRange); newLeftRange = _serverTcpBase->getCentralWorker()->updateRangeWithLeftData(leftRange); } @@ -524,7 +524,7 @@ void TcpBaseConnection::_handleShiftToRight1(boost::system::error_code const& ec if (keyCount != sz) { LOGS(_log, LOG_LVL_WARN, funcName << " keyCount(" << keyCount << ") != sz(" << sz << ")"); } - std::vector keyList; + std::vector keyList; for (int j=0; j < sz; ++j) { proto::KeyInfo const& protoKI = protoKeyList->keypair(j); ChunkSubchunk chSub(protoKI.chunk(), protoKI.subchunk()); diff --git a/core/modules/loader/ServerUdpBase.cc b/core/modules/loader/ServerUdpBase.cc index 460aa92f60..58bd18414c 100644 --- a/core/modules/loader/ServerUdpBase.cc +++ b/core/modules/loader/ServerUdpBase.cc @@ -84,32 +84,6 @@ void ServerUdpBase::_sendResponse() { void ServerUdpBase::sendBufferTo(std::string const& hostName, int port, BufferUdp& sendBuf) { -#if 0 // TODO Delete this if send_to proves to be safe. - // The socket is not thread safe. To send on "_socket", it needs to be an async send - // and then it needs to know when the message was sent so it can return and free the buffer. - using namespace boost::asio; - - std::mutex mtx; - std::condition_variable cv; - bool done = false; - - // This function will wait until callbackFunc is called before returning, so the references will be - // valid for the life of callbackFunc. - auto callbackFunc = [&mtx, &cv, &done](const boost::system::error_code& error, std::size_t bytesTransferred) { - { - std::lock_guard lock(mtx); - done = true; - } - cv.notify_one(); - }; - - ip::udp::endpoint dest(boost::asio::ip::address::from_string(hostName), port); - _socket.async_send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest, - callbackFunc); - - std::unique_lock uLock(mtx); - cv.wait(uLock, [&done](){return done;}); -#else using namespace boost::asio; LOGS(_log, LOG_LVL_DEBUG, "ServerUdpBase::sendBufferTo hostName=" << hostName << " port=" << port); try { @@ -118,9 +92,8 @@ void ServerUdpBase::sendBufferTo(std::string const& hostName, int port, BufferUd } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::sendBufferTo boost system_error=" << e.what() << " host=" << hostName << " port=" << port << " buf=" << sendBuf); - throw e; + throw; } -#endif } @@ -153,24 +126,13 @@ void ServerUdpBase::_receivePrepare() { boost::asio::ip::udp::endpoint ServerUdpBase::resolve(std::string const& hostName, int port) { -#if 1 // &&& std::lock_guard lg(_resolveMtx); - /* &&& - using namespace boost::asio; - io_context ioContext; - ip::udp::resolver resolver(ioContext); - return *resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin(); - */ using namespace boost::asio; - // resolver returns an iterator. This uses the first item only. + // Resolver returns an iterator. This uses the first item only. + // Failure to resolve anything throws a boost::system::error. ip::udp::endpoint dest = *_resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin(); return dest; -#else - using namespace boost::asio; - ip::udp::endpoint dest(ip::address::from_string(hostName), port); - return dest; -#endif } diff --git a/core/modules/loader/ServerUdpBase.h b/core/modules/loader/ServerUdpBase.h index a7aeb3b1ed..e10e29a55d 100644 --- a/core/modules/loader/ServerUdpBase.h +++ b/core/modules/loader/ServerUdpBase.h @@ -90,9 +90,9 @@ class ServerUdpBase { /// Items for resolving UDP addresses /// There appear to be concurrency issues even with /// separate io_contexts, so re-using existing objects. - boost::asio::io_context _ioContext; - boost::asio::ip::udp::resolver _resolver{_ioContext}; - std::mutex _resolveMtx; ///< protects _ioContext, _resolver + boost::asio::io_context _ioContext; + boost::asio::ip::udp::resolver _resolver{_ioContext}; + std::mutex _resolveMtx; ///< protects _ioContext, _resolver }; diff --git a/core/modules/loader/WWorkerList.cc b/core/modules/loader/WWorkerList.cc index eac13949b1..619dd6cf65 100644 --- a/core/modules/loader/WWorkerList.cc +++ b/core/modules/loader/WWorkerList.cc @@ -82,13 +82,12 @@ util::CommandTracked::Ptr WWorkerList::createCommandWorker(CentralWorker* centra // Send the request to master. auto masterHost = _centralW->getMasterHostName(); auto masterPort = _centralW->getMasterPort(); - LOGS(_log, LOG_LVL_INFO, "&&&MastWorkerListReqCmd::action host=" << masterHost << " port=" << masterPort); + LOGS(_log, LOG_LVL_DEBUG, "MastWorkerListReqCmd::action host=" << masterHost << + " port=" << masterPort); try { _centralW->sendBufferTo(masterHost, masterPort, sendBuf); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "MastWorkerListReqCmd::action boost system_error=" << e.what()); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } /// Go through the existing list and add any that have not been add to the doList @@ -212,7 +211,7 @@ std::string WWorkerList::dump() const { // TODO believe our neighbors range over the master void WWorkerList::updateEntry(uint32_t wId, std::string const& ip, int portUdp, int portTcp, - StringRange& strRange) { + KeyRange& strRange) { std::unique_lock lk(_mapMtx); auto iter = _wIdMap.find(wId); if (iter == _wIdMap.end()) { @@ -356,11 +355,9 @@ util::CommandTracked::Ptr WWorkerListItem::createCommandWorkerInfoReq(CentralWor auto masterPort = _centralW->getMasterPort(); try { _centralW->sendBufferTo(masterHost, masterPort, sendBuf); - } catch (boost::system::system_error e) { + } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "WorkerReqCmd::action boost system_error=" << e.what() << " wId=" << _wId); - exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, - // so just blow up so it's unmistakable something bad happened for now. } } diff --git a/core/modules/loader/WWorkerList.h b/core/modules/loader/WWorkerList.h index 54d07080e5..90fe507025 100644 --- a/core/modules/loader/WWorkerList.h +++ b/core/modules/loader/WWorkerList.h @@ -127,7 +127,7 @@ class WWorkerList : public DoListItem { void updateEntry(uint32_t wId, std::string const& ipUdp, int portUdp, int portTcp, - StringRange& strRange); + KeyRange& strRange); WWorkerListItem::Ptr findWorkerForKey(CompositeKey const& key); std::string dump() const; @@ -138,7 +138,7 @@ class WWorkerList : public DoListItem { CentralWorker* _central; std::map _wIdMap; ///< worker id map std::map _ipMap; - std::map _rangeMap; + std::map _rangeMap; bool _wListChanged{false}; ///< true if the list has changed uint32_t _totalNumberOfWorkers{0}; ///< total number of workers according to the master. mutable std::mutex _mapMtx; ///< protects _wIdMap, _ipMap, _rangeMap, _wListChanged diff --git a/core/modules/loader/WorkerConfig.h b/core/modules/loader/WorkerConfig.h index 95f74d86fd..ca37aa521b 100644 --- a/core/modules/loader/WorkerConfig.h +++ b/core/modules/loader/WorkerConfig.h @@ -52,6 +52,7 @@ class WorkerConfig : public ConfigBase { double getThresholdNeighborShift() const { return _thresholdNeighborShift->getDouble(); } int getMaxKeysToShift() const { return _maxKeysToShift->getInt(); } int getLoopSleepTime() const { return _loopSleepTime->getInt(); } + int getIOThreads() const { return _iOThreads->getInt(); } std::ostream& dump(std::ostream &os) const override; @@ -89,6 +90,9 @@ class WorkerConfig : public ConfigBase { /// Time spent sleeping between checking elements in the DoList in micro seconds. 100000 ConfigElement::Ptr _loopSleepTime{ ConfigElement::create(cfgList, header, "loopSleepTime", ConfigElement::INT, false, "100000")}; + /// Number of IO threads the server should run. + ConfigElement::Ptr _iOThreads{ + ConfigElement::create(cfgList, header, "iOThreads", ConfigElement::INT, false, "5")}; }; diff --git a/core/modules/loader/WorkerListItemBase.cc b/core/modules/loader/WorkerListItemBase.cc index cb6be9839c..3c98b932dc 100644 --- a/core/modules/loader/WorkerListItemBase.cc +++ b/core/modules/loader/WorkerListItemBase.cc @@ -40,7 +40,7 @@ namespace qserv { namespace loader { -StringRange WorkerListItemBase::setRangeString(StringRange const& strRange) { +KeyRange WorkerListItemBase::setRangeString(KeyRange const& strRange) { std::lock_guard lck(_mtx); auto oldRange = _range; _range = strRange; diff --git a/core/modules/loader/WorkerListItemBase.h b/core/modules/loader/WorkerListItemBase.h index edc842bfc7..ac988c292b 100644 --- a/core/modules/loader/WorkerListItemBase.h +++ b/core/modules/loader/WorkerListItemBase.h @@ -30,10 +30,11 @@ #include #include + // Qserv headers -#include "loader/Updateable.h" +#include "loader/KeyRange.h" #include "loader/NetworkAddress.h" -#include "loader/StringRange.h" +#include "loader/Updateable.h" namespace lsst { @@ -54,10 +55,10 @@ class WorkerListItemBase : public std::enable_shared_from_this lck(_mtx); return _range; } @@ -100,7 +101,7 @@ class WorkerListItemBase : public std::enable_shared_from_thisworkerKeyInsertReq(inMsg, data); break; - case LoaderMsg::KEY_INFO_REQ: + case LoaderMsg::KEY_LOOKUP_REQ: _centralWorker->workerKeyInfoReq(inMsg, data); break; case LoaderMsg::WORKER_KEYS_INFO_REQ: @@ -87,7 +87,7 @@ BufferUdp::Ptr WorkerServer::parseMsg(BufferUdp::Ptr const& data, break; // Following not expected by worker - case LoaderMsg::KEY_INFO: + case LoaderMsg::KEY_LOOKUP: case LoaderMsg::MAST_INFO_REQ: case LoaderMsg::MAST_WORKER_LIST_REQ: case LoaderMsg::MAST_WORKER_INFO_REQ: diff --git a/core/modules/loader/appClientNum.cc b/core/modules/loader/appClientNum.cc index fec57838a3..b2f36404bc 100644 --- a/core/modules/loader/appClientNum.cc +++ b/core/modules/loader/appClientNum.cc @@ -22,19 +22,16 @@ */ -// Class header -#include "loader/CentralClient.h" -#include "loader/ClientConfig.h" - // System headers #include #include // Third-party headers - +#include "boost/lexical_cast.hpp" // qserv headers - +#include "loader/CentralClient.h" +#include "loader/ClientConfig.h" // LSST headers #include "lsst/log/Log.h" @@ -68,10 +65,21 @@ bool keyInsertListClean(KeyInfoDataList& kList, int& successCount, int& failedCo } +/// Get a repeatable value for the chunk and subchunk numbers. It's arbitrary for +/// the test as there just needs to be some check that what was written in for +/// the key is the same as what was read +int calcChunkFrom(uint64_t j) { + return j % 10000; +} +int calcSubchunkFrom(uint64_t j) { + return j % 100; +} + + KeyInfoData::Ptr clientAdd(CentralClient& central, uint64_t j) { CompositeKey cKey(j); - int chunk = j%10000; - int subchunk = j%100; + int chunk = calcChunkFrom(j); + int subchunk = calcSubchunkFrom(j); return central.keyInsertReq(cKey, chunk, subchunk); } @@ -85,14 +93,14 @@ bool keyLookupListClean(KeyInfoDataList& kList, int& successCount, int& failedCo // check the values uint64_t j = kPtr->key.kInt; // expected chunk and subchunk values. - int expChunk = j%10000; - int expSubchunk = j%100; + int expChunk = calcChunkFrom(j); + int expSubchunk = calcSubchunkFrom(j); if (kPtr->chunk == expChunk && kPtr->subchunk == expSubchunk) { ++successCount; } else { ++failedCount; - LOGS(_log, LOG_LVL_WARN, "lookup failed bad values expected c=" << expChunk << - " sc=" << expSubchunk << " found=" << *kPtr); + LOGS(_log, LOG_LVL_WARN, "lookup failed, bad values, expected c=" << expChunk << + " sc=" << expSubchunk << " found=" << *kPtr); } } else { ++failedCount; @@ -109,7 +117,7 @@ bool keyLookupListClean(KeyInfoDataList& kList, int& successCount, int& failedCo KeyInfoData::Ptr clientAddLookup(CentralClient& central, uint64_t j) { CompositeKey cKey(j); - return central.keyInfoReq(cKey); + return central.keyLookupReq(cKey); } @@ -117,25 +125,21 @@ int main(int argc, char* argv[]) { std::string cCfgFile("core/modules/loader/config/client1.cnf"); if (argc < 3) { LOGS(_log, LOG_LVL_ERROR, "usage: appClientNum "); - exit(-1); + return 1; } - uint64_t numStart = std::stoi(argv[1]); - uint64_t numEnd = std::stoi(argv[2]); + uint64_t numStart = boost::lexical_cast(argv[1]); + uint64_t numEnd = boost::lexical_cast(argv[2]); if (argc > 3) { cCfgFile = argv[3]; } LOGS(_log, LOG_LVL_INFO, "start=" << numStart << " end=" << numEnd << " cCfg=" << cCfgFile); - - std::string ourHost; - { - char hName[300]; - if (gethostname(hName, sizeof(hName)) < 0) { - LOGS(_log, LOG_LVL_ERROR, "Failed to get host name errno=" << errno); - exit(-1); - } - ourHost = hName; + if (numEnd == 0) { + LOGS(_log, LOG_LVL_ERROR, "end cannot equal 0"); + return 1; } + + std::string const ourHost = boost::asio::ip::host_name(); boost::asio::io_service ioService; ClientConfig cCfg(cCfgFile); @@ -144,35 +148,41 @@ int main(int argc, char* argv[]) { cClient.start(); } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "cWorker.start() failed e=" << e.what()); - exit(-1); + return 1; } - // Need to start several threads so messages aren't dropped while being processed. - cClient.run(); - cClient.run(); - cClient.run(); + cClient.runServer(); KeyInfoDataList kList; int successCount = 0; int failedCount = 0; + int totalKeyCount = 0; + + TimeOut::TimePoint insertBegin = TimeOut::Clock::now(); + if (numEnd >= numStart) { + totalKeyCount = (numEnd - numStart) + 1; for (uint64_t j=numStart; j<=numEnd; ++j) { kList.push_back(clientAdd(cClient, j)); // occasionally trim the list - if (j%1000 == 0) keyInsertListClean(kList, successCount, failedCount); + if (j%10000 == 0) keyInsertListClean(kList, successCount, failedCount); } } else { + totalKeyCount = (numStart - numEnd) + 1; for (uint64_t j=numStart; j>=numEnd; --j) { kList.push_back(clientAdd(cClient, j)); // occasionally trim the list - if (j%1000 == 0) keyInsertListClean(kList, successCount, failedCount); + if (j%10000 == 0) keyInsertListClean(kList, successCount, failedCount); } } int count = 0; // If all the requests are done, the list should be empty. - // it should be done well before 100 seconds (TODO: maybe 1 second per 1000 keys) - while (!keyInsertListClean(kList, successCount, failedCount) && count < 100) { + // Wait up to 1 second per 1000 keys. (System does a bit better than 1000keys per second.) + int waitForKeysCount = totalKeyCount/1000; + int maxWaitCount = 16; // minimum wait to allow for 3 or 4 retries. + if (waitForKeysCount > maxWaitCount) maxWaitCount = waitForKeysCount; + while (!keyInsertListClean(kList, successCount, failedCount) && count < waitForKeysCount) { LOGS(_log, LOG_LVL_INFO, "waiting for inserts to finish count=" << count); sleep(1); ++count; @@ -191,12 +201,13 @@ int main(int argc, char* argv[]) { if (!kList.empty() || failedCount > 0) { LOGS(_log, LOG_LVL_ERROR, "FAILED to insert all elements. success=" << successCount << " failed=" << failedCount << " size=" << kList.size()); - exit(-1); + return 1; } LOGS(_log, LOG_LVL_INFO, "inserted all elements. success=" << successCount << " failed=" << failedCount << " size=" << kList.size()); + TimeOut::TimePoint insertEnd = TimeOut::Clock::now(); // Lookup answers auto nStart = numStart; @@ -210,13 +221,13 @@ int main(int argc, char* argv[]) { for (uint64_t j=nStart; j<=nEnd; ++j) { kList.push_back(clientAddLookup(cClient, j)); // occasionally trim the list - if (j%1000 == 0) keyLookupListClean(kList, successCount, failedCount); + if (j%10000 == 0) keyLookupListClean(kList, successCount, failedCount); } count = 0; // If all the requests are done, the list should be empty. - // it should be done well before 100 seconds (TODO: maybe 1 second per 1000 keys) - while (!keyLookupListClean(kList, successCount, failedCount) && count < 100) { + // About 1 second per 1000 keys) + while (!keyLookupListClean(kList, successCount, failedCount) && count < waitForKeysCount) { LOGS(_log, LOG_LVL_INFO, "waiting for lookups to finish count=" << count); sleep(1); ++count; @@ -234,15 +245,21 @@ int main(int argc, char* argv[]) { if (!kList.empty() || failedCount > 0) { LOGS(_log, LOG_LVL_ERROR, "FAILED to lookup all elements. success=" << successCount << " failed=" << failedCount << " size=" << kList.size()); - exit(-1); + return 1; } LOGS(_log, LOG_LVL_INFO, "lookup all elements. success=" << successCount << " failed=" << failedCount << " size=" << kList.size()); + TimeOut::TimePoint lookupEnd = TimeOut::Clock::now(); - ioService.stop(); // this doesn't seem to work cleanly + LOGS(_log, LOG_LVL_INFO, "inserts seconds=" << + std::chrono::duration_cast(insertEnd - insertBegin).count()); + LOGS(_log, LOG_LVL_INFO, "lookups seconds=" << + std::chrono::duration_cast(lookupEnd - insertEnd).count()); + ioService.stop(); LOGS(_log, LOG_LVL_INFO, "client DONE"); + return 0; } diff --git a/core/modules/loader/appMaster.cc b/core/modules/loader/appMaster.cc index a26790e6bc..e72147d89e 100644 --- a/core/modules/loader/appMaster.cc +++ b/core/modules/loader/appMaster.cc @@ -22,17 +22,11 @@ */ -// Class header -#include "loader/CentralMaster.h" - // System headers #include -// Third-party headers - - // qserv headers - +#include "loader/CentralMaster.h" // LSST headers #include "lsst/log/Log.h" @@ -52,16 +46,7 @@ int main(int argc, char* argv[]) { } LOGS(_log, LOG_LVL_INFO, "masterCfg=" << mCfgFile); - std::string ourHost; - { - char hName[300]; - if (gethostname(hName, sizeof(hName)) < 0) { - LOGS(_log, LOG_LVL_ERROR, "Failed to get host name errno=" << errno); - exit(-1); - } - ourHost = hName; - } - + std::string const ourHost = boost::asio::ip::host_name(); boost::asio::io_service ioService; MasterConfig mCfg(mCfgFile); @@ -70,21 +55,15 @@ int main(int argc, char* argv[]) { cMaster.start(); } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "cMaster.start() failed e=" << e.what()); - exit(-1); + return 1; } - cMaster.setMaxKeysPerWorker(100); // delete - // Need to start several threads so messages aren't dropped while being processed. - cMaster.run(); - cMaster.run(); - cMaster.run(); - cMaster.run(); - cMaster.run(); + cMaster.runServer(); bool loop = true; while(loop) { sleep(10); } - ioService.stop(); // this doesn't seem to work cleanly + ioService.stop(); LOGS(_log, LOG_LVL_INFO, "master DONE"); } diff --git a/core/modules/loader/appTest.cc b/core/modules/loader/appTest.cc index 9e5a07879f..eaec39bdc6 100644 --- a/core/modules/loader/appTest.cc +++ b/core/modules/loader/appTest.cc @@ -151,7 +151,7 @@ int main(int argc, char* argv[]) { } } LOGS(_log, LOG_LVL_INFO, "data:" << data.dumpStr()); - } catch (LoaderMsgErr& ex) { + } catch (LoaderMsgErr const& ex) { LOGS(_log, LOG_LVL_ERROR, "Write to buffer FAILED msg=" << ex.what()); exit(-1); } @@ -179,7 +179,7 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_INFO, "matched " << ele->getStringVal()); } } - } catch (LoaderMsgErr& ex) { + } catch (LoaderMsgErr const& ex) { LOGS(_log, LOG_LVL_ERROR, "Read from buffer FAILED msg=" << ex.what()); exit(-1); } @@ -222,7 +222,7 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_INFO, "ServTcpBase e"); sleep(5); } - catch (std::exception& e) { + catch (std::exception const& e) { std::cerr << e.what() << std::endl; } } @@ -309,11 +309,7 @@ int main(int argc, char* argv[]) { } cMaster.setMaxKeysPerWorker(4); // Need to start several threads so messages aren't dropped while being processed. - cMaster.run(); - cMaster.run(); - cMaster.run(); - cMaster.run(); - cMaster.run(); + cMaster.runServer(); /// Start worker server 1 CentralWorker wCentral1(ioServiceWorker1, ioContext1, ourHost, workerCfg1); @@ -324,10 +320,7 @@ int main(int argc, char* argv[]) { exit(-1); } - wCentral1.run(); - wCentral1.run(); - wCentral1.run(); - + wCentral1.runServer(); /// Start worker server 2 CentralWorker wCentral2(ioServiceWorker2, ioContext2, ourHost, workerCfg2); @@ -337,10 +330,7 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_ERROR, "wCentral2.start() failed e=" << e.what()); exit(-1); } - wCentral2.run(); - wCentral2.run(); - wCentral2.run(); - + wCentral2.runServer(); ClientConfig clientCfg1("core/modules/loader/config/client1.cnf"); LOGS(_log, LOG_LVL_INFO, "clientCfg1=" << clientCfg1); @@ -351,7 +341,7 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_ERROR, "cCentral1A.start() failed e=" << e.what()); exit(-1); } - cCentral1A.run(); + cCentral1A.runServer(); ClientConfig clientCfg2("core/modules/loader/config/client2.cnf"); @@ -363,7 +353,7 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_ERROR, "cCentral2A.start() failed e=" << e.what()); exit(-1); } - cCentral2A.run(); + cCentral2A.runServer(); ClientConfig clientCfg3("core/modules/loader/config/client3.cnf"); LOGS(_log, LOG_LVL_INFO, "clientCfg3=" << clientCfg3); @@ -374,7 +364,7 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_ERROR, "cCentral1B.start() failed e=" << e.what()); exit(-1); } - cCentral1B.run(); + cCentral1B.runServer(); /// Unknown message kind test. Pretending to be worker1. @@ -435,7 +425,7 @@ int main(int argc, char* argv[]) { std::string reversed(bStr.rbegin(), bStr.rend()); LOGS(_log, LOG_LVL_INFO, bStr << " newKey=" << reversed << " j(" << j%10 << " ," << j << ")"); keyList.emplace_back(CompositeKey(reversed), j%10, j); - bStr = StringRange::incrementString(bStr, '0'); + bStr = KeyRange::incrementString(bStr, '0'); } } @@ -460,9 +450,9 @@ int main(int argc, char* argv[]) { // Retrieve keyA and keyB { LOGS(_log, LOG_LVL_INFO, "5TSTAGE client retrieve keyB keyA"); - auto keyBInfo = cCentral1A.keyInfoReq(keyB.key); - auto keyAInfo = cCentral1A.keyInfoReq(keyA.key); - auto keyCInfo = cCentral1A.keyInfoReq(keyC.key); + auto keyBInfo = cCentral1A.keyLookupReq(keyB.key); + auto keyAInfo = cCentral1A.keyLookupReq(keyA.key); + auto keyCInfo = cCentral1A.keyLookupReq(keyC.key); keyAInfo->waitComplete(); keyBInfo->waitComplete(); @@ -503,15 +493,15 @@ int main(int argc, char* argv[]) { LOGS(_log, LOG_LVL_INFO, "keyC inserted."); } - auto keyAInfo = cCentral1A.keyInfoReq(keyA.key); + auto keyAInfo = cCentral1A.keyLookupReq(keyA.key); LOGS(_log, LOG_LVL_INFO, "6TSTAGE waiting A"); keyAInfo->waitComplete(); - auto keyBInfo = cCentral2A.keyInfoReq(keyB.key); + auto keyBInfo = cCentral2A.keyLookupReq(keyB.key); LOGS(_log, LOG_LVL_INFO, "6TSTAGE waiting B"); keyBInfo->waitComplete(); - auto keyCInfo = cCentral2A.keyInfoReq(keyC.key); + auto keyCInfo = cCentral2A.keyLookupReq(keyC.key); LOGS(_log, LOG_LVL_INFO, "6TSTAGE waiting C"); keyCInfo->waitComplete(); diff --git a/core/modules/loader/appWorker.cc b/core/modules/loader/appWorker.cc index ac04314318..5322197279 100644 --- a/core/modules/loader/appWorker.cc +++ b/core/modules/loader/appWorker.cc @@ -21,19 +21,12 @@ * see . */ - -// Class header -#include "loader/CentralWorker.h" - // System headers #include #include -// Third-party headers - - // qserv headers - +#include "loader/CentralWorker.h" // LSST headers #include "lsst/log/Log.h" @@ -53,16 +46,7 @@ int main(int argc, char* argv[]) { } LOGS(_log, LOG_LVL_INFO, "workerCfg=" << wCfgFile); - std::string ourHost; - { - char hName[300]; - if (gethostname(hName, sizeof(hName)) < 0) { - LOGS(_log, LOG_LVL_ERROR, "Failed to get host name errno=" << errno); - exit(-1); - } - ourHost = hName; - } - + std::string const ourHost = boost::asio::ip::host_name(); boost::asio::io_service ioService; boost::asio::io_context ioContext; @@ -72,13 +56,9 @@ int main(int argc, char* argv[]) { cWorker.start(); } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "cWorker.start() failed e=" << e.what()); - exit(-1); + return 1; } - - // Need to start several threads so messages aren't dropped while being processed. - cWorker.run(); - cWorker.run(); - cWorker.run(); + cWorker.runServer(); bool loop = true; while(loop) { diff --git a/core/modules/loader/testLoader.cc b/core/modules/loader/testLoader.cc index cb84ba265e..15ec59c439 100644 --- a/core/modules/loader/testLoader.cc +++ b/core/modules/loader/testLoader.cc @@ -245,6 +245,18 @@ BOOST_AUTO_TEST_CASE(LoaderTest) { BOOST_CHECK( (a >= b)); } + { + LOGS_INFO("Comparisons integer greater than"); + CompositeKey a(1000000, "a"); + CompositeKey b(30, "b"); + BOOST_CHECK(!(a == b)); + BOOST_CHECK( (a != b)); + BOOST_CHECK(!(a < b)); + BOOST_CHECK( (a > b)); + BOOST_CHECK(!(a <= b)); + BOOST_CHECK( (a >= b)); + } + { LOGS_INFO("Comparisons string equal"); CompositeKey a(0, "string%$testA");