Skip to content

Commit

Permalink
Created separate programs for master, client, and server.
Browse files Browse the repository at this point in the history
Code cleaned up, system tested with 100k inserts and lookups.
  • Loading branch information
jgates108 committed Dec 13, 2019
1 parent e31b764 commit 1e58e8e
Show file tree
Hide file tree
Showing 14 changed files with 448 additions and 50 deletions.
53 changes: 32 additions & 21 deletions core/modules/loader/BufferUdp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,30 +160,41 @@ bool BufferUdp::retrieveString(std::string& out, size_t len) {


/// Return the description produced by dump() as a string.
/// @param hexDump  forwarded to dump(); when true the buffer bytes are included in hexadecimal.
/// @param charDump forwarded to dump(); when true the buffer bytes are included as characters.
/// @return everything dump() wrote, as a std::string.
std::string BufferUdp::dumpStr(bool hexDump, bool charDump) const {
    // All formatting lives in dump(), so the string form and the stream form
    // cannot diverge and `os` is declared exactly once.
    std::stringstream os;
    dump(os, hexDump, charDump);
    return os.str();
}


/// Write a description of this buffer to `os`.
/// Always writes the length, the buffer/cursor/end addresses and the remaining
/// write and read lengths. Optionally appends the bytes from _buffer up to
/// (not including) _wCursor.
/// @param os       stream to write to.
/// @param hexDump  if true, append "(...)" holding each byte in hexadecimal followed by a space.
/// @param charDump if true, append "(...)" holding the same bytes as raw characters.
/// @return `os`, so the call can be chained.
std::ostream& BufferUdp::dump(std::ostream &os, bool hexDump, bool charDump) const {
    os << "maxLength=" << _length;
    os << " buffer=" << static_cast<void const*>(_buffer);
    os << " wCurLen=" << getAvailableWriteLength();
    os << " wCursor=" << static_cast<void const*>(_wCursor);
    os << " rCurLen=" << getBytesLeftToRead();
    os << " rCursor=" << static_cast<void const*>(_rCursor);
    os << " end=" << static_cast<void const*>(_end);

    // hex dump
    if (hexDump) {
        // std::hex is sticky and `os` belongs to the caller, so remember the
        // caller's formatting flags and restore them after the bytes are written.
        std::ios_base::fmtflags const callerFlags = os.flags();
        os << "(" << std::hex;
        for (const char* j = _buffer; j < _wCursor; ++j) {
            // Convert through unsigned char so bytes >= 0x80 print as "80".."ff"
            // rather than being sign-extended where plain char is signed.
            os << static_cast<int>(static_cast<unsigned char>(*j)) << " ";
        }
        os.flags(callerFlags);
        os << ")";
    }

    // character dump
    if (charDump) {
        os << "(" << std::string(_buffer, _wCursor) << ")";
    }

    return os;
}


/// Stream the basic description of `buf` (no buffer contents) to `os`.
std::ostream& operator<<(std::ostream& os, BufferUdp const& buf) {
    // Both content dumps are switched off; only the summary fields are written.
    bool const hexDump = false;
    bool const charDump = false;
    return buf.dump(os, hexDump, charDump);
}

}}} // namespace lsst:qserv:loader
5 changes: 5 additions & 0 deletions core/modules/loader/BufferUdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ class BufferUdp {
/// in ascii.
std::string dumpStr(bool hexDump, bool charDump) const;

std::ostream& dump(std::ostream &os, bool hexDump, bool charDump) const;

private:
void _setupBuffer() {
_end = _buffer + _length;
Expand All @@ -161,6 +163,9 @@ class BufferUdp {
bool _ourBuffer{false}; ///< true if this class object is responsible for deleting the buffer.
};

/// Print basic buffer information. Use BufferUdp::dump() directly if the buffer contents are needed.
std::ostream& operator<<(std::ostream& os, BufferUdp const& buf);

}}} // namespace lsst:qserv:loader

#endif // LSST_QSERV_LOADER_BUFFERUDP_H
2 changes: 1 addition & 1 deletion core/modules/loader/Central.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class Central {
int getErrCount() const { return _server->getErrCount(); }

/// Send the contents of 'sendBuf' to 'host:port'. This waits for the message to be
/// sent before returning. Throws boost::system::system_error on failure.
void sendBufferTo(std::string const& host, int port, BufferUdp& sendBuf) {
// Pure forwarder to the server object. Nothing is caught here, so any exception
// raised by the server's sendBufferTo propagates to the caller.
_server->sendBufferTo(host, port, sendBuf);
}
Expand Down
24 changes: 19 additions & 5 deletions core/modules/loader/CentralClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ CentralClient::CentralClient(boost::asio::io_service& ioService_,
_defWorkerHost(cfg.getDefWorkerHost()),
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
_doListMaxLookups(cfg.getMaxLookups()),
_doListMaxInserts(cfg.getMaxInserts()) {
_doListMaxInserts(cfg.getMaxInserts()),
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()){
}


Expand Down Expand Up @@ -89,7 +90,7 @@ void CentralClient::_handleKeyInfo(LoaderMsg const& inMsg, std::unique_ptr<proto
CompositeKey key(protoData->keyint(), protoData->keystr());
ChunkSubchunk chunkInfo(protoData->chunk(), protoData->subchunk());

LOGS(_log, LOG_LVL_INFO, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo);
LOGS(_log, LOG_LVL_DEBUG, "trying to remove oneShot for lookup key=" << key << " " << chunkInfo);
/// Locate the original one shot and mark it as done.
CentralClient::KeyInfoReqOneShot::Ptr keyInfoOneShot;
{
Expand Down Expand Up @@ -227,8 +228,14 @@ void CentralClient::_keyInsertReq(CompositeKey const& key, int chunk, int subchu
StringElement strElem;
protoKeyInsert.SerializeToString(&(strElem.element));
strElem.appendToData(msgData);

// Send the insert request to the default worker exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInsertReq boost system_error=" << e.what() <<
         " key=" << key << " chunk=" << chunk << " sub=" << subchunk);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand Down Expand Up @@ -296,7 +303,14 @@ void CentralClient::_keyInfoReq(CompositeKey const& key) {
protoKeyInsert.SerializeToString(&(strElem.element));
strElem.appendToData(msgData);

// Send the lookup request to the default worker exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(getDefWorkerHost(), getDefWorkerPortUdp(), msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralClient::_keyInfoReq boost system_error=" << e.what() <<
         " key=" << key);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand Down
6 changes: 3 additions & 3 deletions core/modules/loader/CentralClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ class CentralClient : public Central {
const int _defWorkerPortUdp; ///< Default worker UDP port


size_t _doListMaxLookups; ///< Maximum number of concurrent lookups in DoList DM-16555 &&&
size_t _doListMaxInserts; ///< Maximum number of concurrent inserts in DoList DM-16555 &&&
int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length &&& add config file entry
size_t _doListMaxLookups{1000}; ///< Maximum number of concurrent lookups in DoList (set by config)
size_t _doListMaxInserts{1000}; ///< Maximum number of concurrent inserts in DoList (set by config)
int _maxRequestSleepTime{100000}; ///< Time to sleep between checking requests when at max length (set by config)

std::map<CompositeKey, KeyInsertReqOneShot::Ptr> _waitingKeyInsertMap;
std::mutex _waitingKeyInsertMtx; ///< protects _waitingKeyInsertMap, _doListMaxInserts
Expand Down
20 changes: 18 additions & 2 deletions core/modules/loader/CentralMaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,15 @@ void CentralMaster::setWorkerNeighbor(MWorkerListItem::WPtr const& target, int m
UInt32Element neighborIdElem(neighborId);
neighborIdElem.appendToData(msgData);
auto addr = targetWorker->getUdpAddress();
// Send the neighbor assignment to the target worker exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(addr.ip, addr.port, msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralMaster::setWorkerNeighbor boost system_error=" << e.what() <<
         " targ=" << *targetWorker << " msg=" << message <<
         " neighborId=" << neighborId);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand Down Expand Up @@ -196,7 +204,15 @@ void CentralMaster::reqWorkerKeysInfo(uint64_t msgId, std::string const& targetI
LoaderMsg reqMsg(LoaderMsg::WORKER_KEYS_INFO_REQ, msgId, ourHostName, ourPort);
BufferUdp data;
reqMsg.appendToData(data);
// Send the WORKER_KEYS_INFO_REQ message exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(targetIp, targetPort, data);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralMaster::reqWorkerKeysInfo boost system_error=" << e.what() <<
         " msgId=" << msgId << " tIp=" << targetIp << " tPort=" << targetPort <<
         " ourHost=" << ourHostName << " ourPort=" << ourPort);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}

}}} // namespace lsst::qserv::loader
60 changes: 53 additions & 7 deletions core/modules/loader/CentralWorker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,14 @@ void CentralWorker::_workerKeyInsertReq(LoaderMsg const& inMsg, std::unique_ptr<
protoReply.SerializeToString(&(strElem.element));
strElem.appendToData(msgData);
LOGS(_log, LOG_LVL_INFO, "sending complete " << key << " to " << nAddr << " from " << _ourId);
// Send the insert-complete reply exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(nAddr.ip, nAddr.port, msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_workerKeyInsertReq boost system_error=" << e.what() <<
         " msg=" << inMsg);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
} else {
lck.unlock();
// Find the target range in the list and send the request there
Expand Down Expand Up @@ -842,7 +849,14 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L
StringElement strElem;
protoData->SerializeToString(&(strElem.element));
strElem.appendToData(msgData);
// Forward the insert request to the target address exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(targetAddr.ip, targetAddr.port, msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_forwardKeyInsertRequest boost system_error=" << e.what() <<
         " tAddr=" << targetAddr << " inMsg=" << inMsg);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand Down Expand Up @@ -909,7 +923,14 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr<pr
protoReply.SerializeToString(&(strElem.element));
strElem.appendToData(msgData);
LOGS(_log, LOG_LVL_INFO, "sending key lookup " << key << " to " << nAddr << " from " << _ourId);
// Send the key lookup reply exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(nAddr.ip, nAddr.port, msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_workerKeyInfoReq boost system_error=" << e.what() <<
         " inMsg=" << inMsg);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
} else {
// Find the target range in the list and send the request there
auto targetWorker = _wWorkerList->findWorkerForKey(key);
Expand Down Expand Up @@ -986,7 +1007,14 @@ void CentralWorker::_sendWorkerKeysInfo(NetworkAddress const& nAddr, uint64_t ms
LOGS(_log, LOG_LVL_INFO, "sending WorkerKeysInfo name=" << _ourId <<
" mapsize=" << protoWKI->mapsize() << " recentAdds=" << protoWKI->recentadds() <<
" to " << nAddr);
// Send the WorkerKeysInfo message exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(nAddr.ip, nAddr.port, msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    // The " msgId=" label carries a leading space so it does not run into the address.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_sendWorkerKeysInfo boost system_error=" << e.what() <<
         " nAddr=" << nAddr << " msgId=" << msgId);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand Down Expand Up @@ -1035,7 +1063,14 @@ void CentralWorker::_forwardKeyInfoRequest(WWorkerListItem::Ptr const& target, L
strElem.appendToData(msgData);

auto nAddr = target->getUdpAddress();
// Forward the key info request to the target's UDP address exactly once, inside
// the guard, so a send failure always reaches the handler below.
try {
    sendBufferTo(nAddr.ip, nAddr.port, msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    // NOTE(review): `target` is a smart pointer, so this presumably logs the pointer
    // value rather than the worker item -- confirm whether `*target` was intended.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_forwardKeyInfoRequest boost system_error=" << e.what() <<
         " target=" << target << " inMsg=" << inMsg);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand All @@ -1053,7 +1088,13 @@ void CentralWorker::_registerWithMaster() {
protoBuf.SerializeToString(&(strElem.element));
strElem.appendToData(msgData);

// Send the registration message to the master exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    sendBufferTo(getMasterHostName(), getMasterPort(), msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::_registerWithMaster boost system_error=" << e.what());
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}


Expand All @@ -1063,7 +1104,12 @@ void CentralWorker::testSendBadMessage() {
LOGS(_log, LOG_LVL_INFO, "testSendBadMessage msg=" << msg);
BufferUdp msgData(128);
msg.appendToData(msgData);
// Send the deliberately bad message to the master exactly once, inside the guard.
try {
    sendBufferTo(getMasterHostName(), getMasterPort(), msgData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "CentralWorker::testSendBadMessage boost system_error=" << e.what());
    // This would not be the expected error, so re-throw it to make sure it is noticed.
    // A bare `throw;` propagates the original exception object; `throw e;` would
    // throw a copy constructed from the static type.
    throw;
}
}


Expand Down
7 changes: 6 additions & 1 deletion core/modules/loader/ClientConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ClientConfig : public ConfigBase {
// Each accessor below returns the integer value held by the matching ConfigElement.
// Loop sleep time; the unit is not shown here.
int getLoopSleepTime() const { return _loopSleepTime->getInt(); } // TODO: Maybe chrono types for times
// Maximum number of lookup requests (presumably the DoList limit, mirroring maxInserts -- confirm).
int getMaxLookups() const { return _maxLookups->getInt(); }
// Maximum number of insert requests allowed in the DoList.
int getMaxInserts() const { return _maxInserts->getInt(); }
// Sleep time, in microseconds, used once maxInserts or maxLookups has been reached.
int getMaxRequestSleepTime() const { return _maxRequestSleepTime->getInt(); }

std::ostream& dump(std::ostream &os) const override;

Expand Down Expand Up @@ -86,7 +87,11 @@ class ClientConfig : public ConfigBase {
/// Maximum number of insert requests allowed in the DoList.
ConfigElement::Ptr _maxInserts{
ConfigElement::create(cfgList, header, "maxInserts", ConfigElement::INT, false, "90000")};

/// When maxInserts or maxLookups is reached, sleep this long (in microseconds)
/// before trying to add more. 100000 microseconds = 0.1 seconds.
ConfigElement::Ptr _maxRequestSleepTime{
ConfigElement::create(cfgList, header,
"maxRequestSleepTime", ConfigElement::INT, false, "100000")};
};


Expand Down
10 changes: 9 additions & 1 deletion core/modules/loader/MWorkerList.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,15 @@ bool MWorkerList::sendListTo(uint64_t msgId, std::string const& ip, short port,
workerList.appendToData(*_stateListData);
}
}
// Send the state list exactly once, inside the guard,
// so a send failure always reaches the handler below.
try {
    _central->sendBufferTo(ip, port, *_stateListData);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "MWorkerList::sendListTo boost system_error=" << e.what() <<
         " msgId=" << msgId << " ip=" << ip << " port=" << port <<
         " ourName=" << ourHostName << " ourPort=" << ourPort);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
}

// See if this worker is known.
Expand Down
10 changes: 8 additions & 2 deletions core/modules/loader/MasterServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,14 @@ BufferUdp::Ptr MasterServer::workerInfoRequest(LoaderMsg const& inMsg, BufferUdp
seItem.appendToData(sendBuf);

// Send the response to the worker that asked for it.
// The response is sent exactly once, inside the guard, so a send failure is
// handled here rather than escaping to the enclosing handlers.
try {
    _centralMaster->sendBufferTo(requestorAddr->ip, requestorAddr->port, sendBuf);
} catch (boost::system::system_error const& e) {
    // Caught by const reference: no copy is made and derived exceptions are not sliced.
    LOGS(_log, LOG_LVL_ERROR, "MasterServer::workerInfoRequest boost system_error=" << e.what() <<
         " inMsg=" << inMsg);
    // TODO:&&& The correct course of action is unclear and requires thought,
    //          so terminate to make it unmistakable that something bad happened.
    exit(-1);
}
} catch (LoaderMsgErr &msgErr) {
LOGS(_log, LOG_LVL_ERROR, msgErr.what());
return prepareReplyMsg(senderEndpoint, inMsg, LoaderMsg::STATUS_PARSE_ERR, msgErr.what());
Expand Down
25 changes: 20 additions & 5 deletions core/modules/loader/ServerUdpBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,15 @@ void ServerUdpBase::sendBufferTo(std::string const& hostName, int port, BufferUd
cv.wait(uLock, [&done](){return done;});
#else
using namespace boost::asio;
LOGS(_log, LOG_LVL_DEBUG, "ServerUdpBase::sendBufferTo hostName=" << hostName << " port=" << port);
// Resolve the destination and send the unread part of the buffer exactly once.
// Both steps sit inside the guard so a failure in either is logged with the
// destination and buffer state before it propagates.
try {
    ip::udp::endpoint dest = resolve(hostName, port);
    _socket.send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest);
} catch (boost::system::system_error const& e) {
    LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::sendBufferTo boost system_error=" << e.what() <<
         " host=" << hostName << " port=" << port << " buf=" << sendBuf);
    // Re-throw the exception that was caught. `throw e;` would throw a copy
    // constructed from the static type, slicing anything derived from system_error.
    throw;
}
#endif
}

Expand Down Expand Up @@ -147,15 +153,24 @@ void ServerUdpBase::_receivePrepare() {


/// Turn `hostName` and `port` into a UDP endpoint.
/// In the active branch the lookup goes through the member resolver, restricted
/// to IPv4, while holding _resolveMtx; only the first entry returned is used.
/// NOTE(review): the resolver overload used here presumably throws
/// boost::system::system_error when resolution fails -- confirm against Boost.Asio.
/// @param hostName host to resolve.
/// @param port     port number; converted to a string for the resolver.
/// @return the first endpoint produced by the resolver.
boost::asio::ip::udp::endpoint ServerUdpBase::resolve(std::string const& hostName, int port) {
#if 1 // &&&
    std::lock_guard<std::mutex> lg(_resolveMtx);
    // Alternative kept for reference (&&&): resolve with a resolver local to the call.
    //     using namespace boost::asio;
    //     io_context ioContext;
    //     ip::udp::resolver resolver(ioContext);
    //     return *resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin();
    using namespace boost::asio;
    // resolver returns an iterator. This uses the first item only.
    ip::udp::endpoint dest =
            *_resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin();
    return dest;
#else
    // Disabled branch: builds the endpoint directly from an address string, with no lookup.
    using namespace boost::asio;
    ip::udp::endpoint dest(ip::address::from_string(hostName), port);
    return dest;
#endif
}


Expand Down
Loading

0 comments on commit 1e58e8e

Please sign in to comment.