diff --git a/core/modules/loader/BufferUdp.cc b/core/modules/loader/BufferUdp.cc index c7f0368cfc..a842c36a29 100644 --- a/core/modules/loader/BufferUdp.cc +++ b/core/modules/loader/BufferUdp.cc @@ -160,30 +160,41 @@ bool BufferUdp::retrieveString(std::string& out, size_t len) { std::string BufferUdp::dumpStr(bool hexDump, bool charDump) const { - std::stringstream os; - os << "maxLength=" << _length; - os << " buffer=" << (void*)_buffer; - os << " wCurLen=" << getAvailableWriteLength(); - os << " wCursor=" << (void*)_wCursor; - os << " rCurLen=" << getBytesLeftToRead(); - os << " rCursor=" << (void*)_rCursor; - os << " end=" << (void*)_end; - - // hex dump - if (hexDump) { - os << "("; - for (const char* j=_buffer; j < _wCursor; ++j) { - os << std::hex << (int)*j << " "; - } - os << ")"; - } + std::stringstream os; + dump(os, hexDump, charDump); + return os.str(); +} + - // character dump - if (charDump) { - os << "(" << std::string(_buffer, _wCursor) << ")"; +std::ostream& BufferUdp::dump(std::ostream &os, bool hexDump, bool charDump) const { + os << "maxLength=" << _length; + os << " buffer=" << (void*)_buffer; + os << " wCurLen=" << getAvailableWriteLength(); + os << " wCursor=" << (void*)_wCursor; + os << " rCurLen=" << getBytesLeftToRead(); + os << " rCursor=" << (void*)_rCursor; + os << " end=" << (void*)_end; + + // hex dump + if (hexDump) { + os << "("; + for (const char* j=_buffer; j < _wCursor; ++j) { + os << std::hex << (int)*j << " "; } + os << ")"; + } - return os.str(); + // character dump + if (charDump) { + os << "(" << std::string(_buffer, _wCursor) << ")"; } + return os; +} + + +std::ostream& operator<<(std::ostream& os, BufferUdp const& buf) { + return buf.dump(os, false, false); +} + }}} // namespace lsst:qserv:loader diff --git a/core/modules/loader/BufferUdp.h b/core/modules/loader/BufferUdp.h index 570388971c..bec71e42d3 100644 --- a/core/modules/loader/BufferUdp.h +++ b/core/modules/loader/BufferUdp.h @@ -138,6 +138,8 @@ class BufferUdp { /// in ascii. std::string dumpStr(bool hexDump, bool charDump) const; + std::ostream& dump(std::ostream &os, bool hexDump, bool charDump) const; + private: void _setupBuffer() { _end = _buffer + _length; @@ -161,6 +163,9 @@ class BufferUdp { bool _ourBuffer{false}; ///< true if this class object is responsible for deleting the buffer. }; +/// Print basic buffer information. Use BufferUdp::dump() directly if the buffer contents are needed. +std::ostream& operator<<(std::ostream& os, BufferUdp const& buf); + }}} // namespace lsst:qserv:loader #endif // LSST_QSERV_LOADER_BUFFERUDP_H diff --git a/core/modules/loader/Central.h b/core/modules/loader/Central.h index c32bd94861..ce610c286b 100644 --- a/core/modules/loader/Central.h +++ b/core/modules/loader/Central.h @@ -80,7 +80,7 @@ class Central { int getErrCount() const { return _server->getErrCount(); } /// Send the contents of 'sendBuf' to 'host:port'. This waits for the message to be - /// sent before returning. + /// sent before returning. Throws boost::system::system_error on failure. void sendBufferTo(std::string const& host, int port, BufferUdp& sendBuf) { _server->sendBufferTo(host, port, sendBuf); } diff --git a/core/modules/loader/CentralClient.cc b/core/modules/loader/CentralClient.cc index f222e5a5ad..730b334874 100644 --- a/core/modules/loader/CentralClient.cc +++ b/core/modules/loader/CentralClient.cc @@ -55,7 +55,8 @@ CentralClient::CentralClient(boost::asio::io_service& ioService_, _defWorkerHost(cfg.getDefWorkerHost()), _defWorkerPortUdp(cfg.getDefWorkerPortUdp()), _doListMaxLookups(cfg.getMaxLookups()), - _doListMaxInserts(cfg.getMaxInserts()) { + _doListMaxInserts(cfg.getMaxInserts()), + _maxRequestSleepTime(cfg.getMaxRequestSleepTime()){ } @@ -89,7 +90,7 @@ void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptrkeyint(), protoData->keystr()); ChunkSubchunk chunkInfo(protoData->chunk(), protoData->subchunk()); - LOGS(_log, LOG_LVL_INFO, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo); + LOGS(_log, LOG_LVL_DEBUG, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo); /// Locate the original one shot and mark it as done. CentralClient::KeyInfoReqOneShot::Ptr keyInfoOneShot; { @@ -227,8 +228,14 @@ void CentralClient::_keyInsertReq(CompositeKey const& key, int chunk, int subchu StringElement strElem; protoKeyInsert.SerializeToString(&(strElem.element)); strElem.appendToData(msgData); - - sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData); + try { + sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInsertReq boost system_error=" << e.what() << + " key=" << key << " chunk=" << chunk << " sub=" << subchunk); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } @@ -296,7 +303,14 @@ void CentralClient::_keyInfoReq(CompositeKey const& key) { protoKeyInsert.SerializeToString(&(strElem.element)); strElem.appendToData(msgData); - sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData); + try { + sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInfoReq boost system_error=" << e.what() << + " key=" << key); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought. + // So just blow up so it's unmistakable something bad happened for now. + } } diff --git a/core/modules/loader/CentralClient.h b/core/modules/loader/CentralClient.h index 4d2a452211..e529588dd5 100644 --- a/core/modules/loader/CentralClient.h +++ b/core/modules/loader/CentralClient.h @@ -164,9 +164,9 @@ class CentralClient : public Central { const int _defWorkerPortUdp; ///< Default worker UDP port - size_t _doListMaxLookups; ///< Maximum number of concurrent lookups in DoList DM-16555 &&& - size_t _doListMaxInserts; ///< Maximum number of concurrent inserts in DoList DM-16555 &&& - int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length &&& add config file entry + size_t _doListMaxLookups{1000}; ///< Maximum number of concurrent lookups in DoList (set by config) + size_t _doListMaxInserts{1000}; ///< Maximum number of concurrent inserts in DoList (set by config) + int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length (set by config) std::map _waitingKeyInsertMap; std::mutex _waitingKeyInsertMtx; ///< protects _waitingKeyInsertMap, _doListMaxInserts diff --git a/core/modules/loader/CentralMaster.cc b/core/modules/loader/CentralMaster.cc index fee1c14146..36fad0b49b 100644 --- a/core/modules/loader/CentralMaster.cc +++ b/core/modules/loader/CentralMaster.cc @@ -97,7 +97,15 @@ void CentralMaster::setWorkerNeighbor(MWorkerListItem::WPtr const& target, int m UInt32Element neighborIdElem(neighborId); neighborIdElem.appendToData(msgData); auto addr = targetWorker->getUdpAddress(); - sendBufferTo(addr.ip, addr.port, msgData); + try { + sendBufferTo(addr.ip, addr.port, msgData); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "CentralMaster::setWorkerNeighbor boost system_error=" << e.what() << + " targ=" << *targetWorker << " msg=" << message << + " neighborId=" << neighborId); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } @@ -196,7 +204,15 @@ void CentralMaster::reqWorkerKeysInfo(uint64_t msgId, std::string const& targetI LoaderMsg reqMsg(LoaderMsg::WORKER_KEYS_INFO_REQ, msgId, ourHostName, ourPort); BufferUdp data; reqMsg.appendToData(data); - sendBufferTo(targetIp, targetPort, data); + try { + sendBufferTo(targetIp, targetPort, data); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "CentralMaster::reqWorkerKeysInfo boost system_error=" << e.what() << + " msgId=" << msgId << " tIp=" << targetIp << " tPort=" << targetPort << + " ourHost=" << ourHostName << " ourPort=" << ourPort); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } }}} // namespace lsst::qserv::loader diff --git a/core/modules/loader/CentralWorker.cc b/core/modules/loader/CentralWorker.cc index 378e858358..f8da2fd19f 100644 --- a/core/modules/loader/CentralWorker.cc +++ b/core/modules/loader/CentralWorker.cc @@ -804,7 +804,14 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr< protoReply.SerializeToString(&(strElem.element)); strElem.appendToData(msgData); LOGS(_log, LOG_LVL_INFO, "sending complete " << key << " to " << nAddr << " from " << _ourId); - sendBufferTo(nAddr.ip, nAddr.port, msgData); + try { + sendBufferTo(nAddr.ip, nAddr.port, msgData); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_workerKeyInsertReq boost system_error=" << e.what() << + " msg=" << inMsg); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } else { lck.unlock(); // Find the target range in the list and send the request there @@ -842,7 +849,14 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L StringElement strElem; protoData->SerializeToString(&(strElem.element)); strElem.appendToData(msgData); - sendBufferTo(targetAddr.ip, targetAddr.port, msgData); + try { + sendBufferTo(targetAddr.ip, targetAddr.port, msgData); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_forwardKeyInsertRequest boost system_error=" << e.what() << + " tAddr=" << targetAddr << " inMsg=" << inMsg); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } @@ -909,7 +923,14 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptrsendBufferTo(ip, port, *_stateListData); + try { + _central->sendBufferTo(ip, port, *_stateListData); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "MWorkerList::sendListTo boost system_error=" << e.what() << + " msgId=" << msgId << " ip=" << ip << " port=" << port << + " ourName=" << ourHostName << " ourPort=" << ourPort); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } // See if this worker is know. diff --git a/core/modules/loader/MasterServer.cc b/core/modules/loader/MasterServer.cc index c4aef132be..e5663d61fd 100644 --- a/core/modules/loader/MasterServer.cc +++ b/core/modules/loader/MasterServer.cc @@ -262,8 +262,14 @@ BufferUdp::Ptr MasterServer::workerInfoRequest(LoaderMsg const& inMsg, BufferUdp seItem.appendToData(sendBuf); // Send the response to the worker that asked for it. - _centralMaster->sendBufferTo(requestorAddr->ip, requestorAddr->port, sendBuf); - + try { + _centralMaster->sendBufferTo(requestorAddr->ip, requestorAddr->port, sendBuf); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "MasterServer::workerInfoRequest boost system_error=" << e.what() << + " inMsg=" << inMsg); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } catch (LoaderMsgErr &msgErr) { LOGS(_log, LOG_LVL_ERROR, msgErr.what()); return prepareReplyMsg(senderEndpoint, inMsg, LoaderMsg::STATUS_PARSE_ERR, msgErr.what()); diff --git a/core/modules/loader/ServerUdpBase.cc b/core/modules/loader/ServerUdpBase.cc index ac252f3796..460aa92f60 100644 --- a/core/modules/loader/ServerUdpBase.cc +++ b/core/modules/loader/ServerUdpBase.cc @@ -111,9 +111,15 @@ void ServerUdpBase::sendBufferTo(std::string const& hostName, int port, BufferUd cv.wait(uLock, [&done](){return done;}); #else using namespace boost::asio; - LOGS(_log, LOG_LVL_INFO, "ServerUdpBase::sendBufferTo hostName=" << hostName << " port=" << port); // &&& - ip::udp::endpoint dest(boost::asio::ip::address::from_string(hostName), port); - _socket.send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest); + LOGS(_log, LOG_LVL_DEBUG, "ServerUdpBase::sendBufferTo hostName=" << hostName << " port=" << port); + try { + ip::udp::endpoint dest = resolve(hostName, port); + _socket.send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest); + } catch (boost::system::system_error const& e) { + LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::sendBufferTo boost system_error=" << e.what() << + " host=" << hostName << " port=" << port << " buf=" << sendBuf); + throw e; + } #endif } @@ -147,15 +153,24 @@ void ServerUdpBase::_receivePrepare() { boost::asio::ip::udp::endpoint ServerUdpBase::resolve(std::string const& hostName, int port) { - /* More flexible version +#if 1 // &&& + std::lock_guard lg(_resolveMtx); + /* &&& using namespace boost::asio; io_context ioContext; ip::udp::resolver resolver(ioContext); - return *resolver.resolve(udp::v4(), hostName, std::to_string(port)).begin(); + return *resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin(); */ + using namespace boost::asio; + // resolver returns an iterator. This uses the first item only. + ip::udp::endpoint dest = + *_resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin(); + return dest; +#else using namespace boost::asio; ip::udp::endpoint dest(ip::address::from_string(hostName), port); return dest; +#endif } diff --git a/core/modules/loader/ServerUdpBase.h b/core/modules/loader/ServerUdpBase.h index 17e4a88d39..a7aeb3b1ed 100644 --- a/core/modules/loader/ServerUdpBase.h +++ b/core/modules/loader/ServerUdpBase.h @@ -62,7 +62,10 @@ class ServerUdpBase { uint32_t getErrCount() const { return _errCount; } /// This waits for the message to be sent before returning. + /// throws boost::system::system_error on failure. void sendBufferTo(std::string const& host, int port, BufferUdp& sendBuf); + + /// This throws boost::system::system_error on failure. boost::asio::ip::udp::endpoint resolve(std::string const& hostName, int port); protected: @@ -83,6 +86,13 @@ class ServerUdpBase { BufferUdp::Ptr _sendData; ///< data buffer for sending. std::string _hostName; int _port; + + /// Items for resolving UDP addresses + /// There appear to be concurrency issues even with + /// separate io_contexts, so re-using existing objects. + boost::asio::io_context _ioContext; + boost::asio::ip::udp::resolver _resolver{_ioContext}; + std::mutex _resolveMtx; ///< protects _ioContext, _resolver }; diff --git a/core/modules/loader/WWorkerList.cc b/core/modules/loader/WWorkerList.cc index a237ee4bb3..eac13949b1 100644 --- a/core/modules/loader/WWorkerList.cc +++ b/core/modules/loader/WWorkerList.cc @@ -82,7 +82,14 @@ util::CommandTracked::Ptr WWorkerList::createCommandWorker(CentralWorker* centra // Send the request to master. auto masterHost = _centralW->getMasterHostName(); auto masterPort = _centralW->getMasterPort(); - _centralW->sendBufferTo(masterHost, masterPort, sendBuf); + LOGS(_log, LOG_LVL_INFO, "&&&MastWorkerListReqCmd::action host=" << masterHost << " port=" << masterPort); + try { + _centralW->sendBufferTo(masterHost, masterPort, sendBuf); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "MastWorkerListReqCmd::action boost system_error=" << e.what()); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } /// Go through the existing list and add any that have not been add to the doList for (auto const& item : _wIdMap) { @@ -347,7 +354,14 @@ util::CommandTracked::Ptr WWorkerListItem::createCommandWorkerInfoReq(CentralWor // Send the request to master. auto masterHost = _centralW->getMasterHostName(); auto masterPort = _centralW->getMasterPort(); - _centralW->sendBufferTo(masterHost, masterPort, sendBuf); + try { + _centralW->sendBufferTo(masterHost, masterPort, sendBuf); + } catch (boost::system::system_error e) { + LOGS(_log, LOG_LVL_ERROR, "WorkerReqCmd::action boost system_error=" << e.what() << + " wId=" << _wId); + exit(-1); // TODO:&&& The correct course of action is unclear and requires thought, + // so just blow up so it's unmistakable something bad happened for now. + } } private: diff --git a/core/modules/loader/appClientNum.cc b/core/modules/loader/appClientNum.cc new file mode 100644 index 0000000000..fec57838a3 --- /dev/null +++ b/core/modules/loader/appClientNum.cc @@ -0,0 +1,248 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * Copyright 2019 AURA/LSST. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + + +// Class header +#include "loader/CentralClient.h" +#include "loader/ClientConfig.h" + +// System headers +#include +#include + +// Third-party headers + + +// qserv headers + + +// LSST headers +#include "lsst/log/Log.h" + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.appClient"); +} + +using namespace lsst::qserv::loader; +using boost::asio::ip::udp; + +typedef std::list KeyInfoDataList; + +/// @return true if the list is empty. +bool keyInsertListClean(KeyInfoDataList& kList, int& successCount, int& failedCount) { + for(auto iter=kList.begin(); iter != kList.end();) { + if (*iter == nullptr || (*iter)->isFinished()) { + KeyInfoData::Ptr const& kPtr = *iter; + if (kPtr->success) { + ++successCount; + } else { + ++failedCount; + LOGS(_log, LOG_LVL_WARN, "insert failed " << *kPtr); + } + iter = kList.erase(iter); + } else { + ++iter; + } + } + return kList.empty(); +} + + +KeyInfoData::Ptr clientAdd(CentralClient& central, uint64_t j) { + CompositeKey cKey(j); + int chunk = j%10000; + int subchunk = j%100; + return central.keyInsertReq(cKey, chunk, subchunk); +} + + +/// @return true if the list is empty. +bool keyLookupListClean(KeyInfoDataList& kList, int& successCount, int& failedCount) { + for(auto iter=kList.begin(); iter != kList.end();) { + if (*iter == nullptr || (*iter)->isFinished()) { + KeyInfoData::Ptr const& kPtr = *iter; + if (kPtr->success) { + // check the values + uint64_t j = kPtr->key.kInt; + // expected chunk and subchunk values. + int expChunk = j%10000; + int expSubchunk = j%100; + if (kPtr->chunk == expChunk && kPtr->subchunk == expSubchunk) { + ++successCount; + } else { + ++failedCount; + LOGS(_log, LOG_LVL_WARN, "lookup failed bad values expected c=" << expChunk << + " sc=" << expSubchunk << " found=" << *kPtr); + } + } else { + ++failedCount; + LOGS(_log, LOG_LVL_WARN, "lookup failed " << *kPtr); + } + iter = kList.erase(iter); + } else { + ++iter; + } + } + return kList.empty(); +} + + +KeyInfoData::Ptr clientAddLookup(CentralClient& central, uint64_t j) { + CompositeKey cKey(j); + return central.keyInfoReq(cKey); +} + + +int main(int argc, char* argv[]) { + std::string cCfgFile("core/modules/loader/config/client1.cnf"); + if (argc < 3) { + LOGS(_log, LOG_LVL_ERROR, "usage: appClientNum "); + exit(-1); + } + uint64_t numStart = std::stoi(argv[1]); + uint64_t numEnd = std::stoi(argv[2]); + if (argc > 3) { + cCfgFile = argv[3]; + } + LOGS(_log, LOG_LVL_INFO, "start=" << numStart << " end=" << numEnd << " cCfg=" << cCfgFile); + + std::string ourHost; + { + char hName[300]; + if (gethostname(hName, sizeof(hName)) < 0) { + LOGS(_log, LOG_LVL_ERROR, "Failed to get host name errno=" << errno); + exit(-1); + } + ourHost = hName; + } + + boost::asio::io_service ioService; + + ClientConfig cCfg(cCfgFile); + CentralClient cClient(ioService, ourHost, cCfg); + try { + cClient.start(); + } catch (boost::system::system_error const& e) { + LOGS(_log, LOG_LVL_ERROR, "cWorker.start() failed e=" << e.what()); + exit(-1); + } + + // Need to start several threads so messages aren't dropped while being processed. + cClient.run(); + cClient.run(); + cClient.run(); + + KeyInfoDataList kList; + int successCount = 0; + int failedCount = 0; + if (numEnd >= numStart) { + for (uint64_t j=numStart; j<=numEnd; ++j) { + kList.push_back(clientAdd(cClient, j)); + // occasionally trim the list + if (j%1000 == 0) keyInsertListClean(kList, successCount, failedCount); + } + } else { + for (uint64_t j=numStart; j>=numEnd; --j) { + kList.push_back(clientAdd(cClient, j)); + // occasionally trim the list + if (j%1000 == 0) keyInsertListClean(kList, successCount, failedCount); + } + } + + int count = 0; + // If all the requests are done, the list should be empty. + // it should be done well before 100 seconds (TODO: maybe 1 second per 1000 keys) + while (!keyInsertListClean(kList, successCount, failedCount) && count < 100) { + LOGS(_log, LOG_LVL_INFO, "waiting for inserts to finish count=" << count); + sleep(1); + ++count; + } + + + if (!kList.empty()) { + LOGS(_log, LOG_LVL_WARN, "kList not empty, size=" << kList.size()); + std::stringstream ss; + for (auto kPtr:kList) { + ss << "elem=" << *kPtr << "\n"; + } + LOGS(_log, LOG_LVL_WARN, ss.str()); + } + + if (!kList.empty() || failedCount > 0) { + LOGS(_log, LOG_LVL_ERROR, "FAILED to insert all elements. success=" << successCount << + " failed=" << failedCount << " size=" << kList.size()); + exit(-1); + } + + LOGS(_log, LOG_LVL_INFO, "inserted all elements. success=" << successCount << + " failed=" << failedCount << " size=" << kList.size()); + + + // Lookup answers + auto nStart = numStart; + auto nEnd = numEnd; + if (nEnd < nStart) { + nStart = numEnd; + nEnd = numStart; + } + successCount = 0; + failedCount = 0; + for (uint64_t j=nStart; j<=nEnd; ++j) { + kList.push_back(clientAddLookup(cClient, j)); + // occasionally trim the list + if (j%1000 == 0) keyLookupListClean(kList, successCount, failedCount); + } + + count = 0; + // If all the requests are done, the list should be empty. + // it should be done well before 100 seconds (TODO: maybe 1 second per 1000 keys) + while (!keyLookupListClean(kList, successCount, failedCount) && count < 100) { + LOGS(_log, LOG_LVL_INFO, "waiting for lookups to finish count=" << count); + sleep(1); + ++count; + } + + if (!kList.empty()) { + LOGS(_log, LOG_LVL_WARN, "kList not empty, size=" << kList.size()); + std::stringstream ss; + for (auto kPtr:kList) { + ss << "elem=" << *kPtr << "\n"; + } + LOGS(_log, LOG_LVL_WARN, ss.str()); + } + + if (!kList.empty() || failedCount > 0) { + LOGS(_log, LOG_LVL_ERROR, "FAILED to lookup all elements. success=" << successCount << + " failed=" << failedCount << " size=" << kList.size()); + exit(-1); + } + + LOGS(_log, LOG_LVL_INFO, "lookup all elements. success=" << successCount << + " failed=" << failedCount << " size=" << kList.size()); + + + ioService.stop(); // this doesn't seem to work cleanly + LOGS(_log, LOG_LVL_INFO, "client DONE"); +} + +