Added dead message handling.
jgates108 authored and fritzm committed Oct 17, 2024
1 parent c2db90e commit b80021c
Showing 34 changed files with 240 additions and 379 deletions.
7 changes: 3 additions & 4 deletions src/cconfig/CzarConfig.cc
@@ -62,18 +62,17 @@ namespace lsst::qserv::cconfig {

std::mutex CzarConfig::_mtxOnInstance;

std::shared_ptr<CzarConfig> CzarConfig::_instance;
CzarConfig::Ptr CzarConfig::_instance;

std::shared_ptr<CzarConfig> CzarConfig::create(std::string const& configFileName,
std::string const& czarName) {
CzarConfig::Ptr CzarConfig::create(std::string const& configFileName, std::string const& czarName) {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
_instance = std::shared_ptr<CzarConfig>(new CzarConfig(util::ConfigStore(configFileName), czarName));
}
return _instance;
}

std::shared_ptr<CzarConfig> CzarConfig::instance() {
CzarConfig::Ptr CzarConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created.");
5 changes: 3 additions & 2 deletions src/cconfig/CzarConfig.h
Expand Up @@ -53,6 +53,7 @@ namespace lsst::qserv::cconfig {
*/
class CzarConfig {
public:
using Ptr = std::shared_ptr<CzarConfig>;
/**
* Create an instance of CzarConfig and load parameters from the specified file.
* @note One has to call this method at least once before trying to obtain
@@ -63,15 +64,15 @@ class CzarConfig {
* @param czarName - the unique name of Czar.
* @return the shared pointer to the configuration object
*/
static std::shared_ptr<CzarConfig> create(std::string const& configFileName, std::string const& czarName);
static Ptr create(std::string const& configFileName, std::string const& czarName);

/**
* Get a pointer to an instance that was created by the last call to
* the method 'create'.
* @return the shared pointer to the configuration object
* @throws std::logic_error when attempting to call the method before creating an instance.
*/
static std::shared_ptr<CzarConfig> instance();
static Ptr instance();

CzarConfig() = delete;
CzarConfig(CzarConfig const&) = delete;
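For orientation, the new `Ptr` alias only shortens the signatures; the create-then-instance contract described in the comments above is unchanged. A minimal usage sketch, with a placeholder config path and czar name that are not from this commit:

#include "cconfig/CzarConfig.h"

void initCzar() {
    using lsst::qserv::cconfig::CzarConfig;

    // create() must run once before any instance() call; it loads parameters
    // from the named file and caches the shared instance.
    CzarConfig::Ptr cfg = CzarConfig::create("/etc/qserv/czar.cnf", "czar-1");

    // Later callers get the same object; calling instance() before create()
    // throws std::logic_error.
    CzarConfig::Ptr same = CzarConfig::instance();
}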
67 changes: 36 additions & 31 deletions src/ccontrol/UserQuerySelect.cc
@@ -303,16 +303,15 @@ void UserQuerySelect::submit() {
}

/// At this point the executive has a map of all jobs with the chunkIds as the key.
if (uberJobsEnabled) {
// TODO:UJ _maxCHunksPerUberJob maybe put in config??? or set on command line??
// Different queries may benefit from different values
// Such as LIMIT=1 may work best with this at 1, where
// 100 would be better for others.
_maxChunksPerUberJob = 2;
// This is needed to prevent Czar::_monitor from starting things before they are ready.
_executive->setReadyToExecute();
buildAndSendUberJobs();
}
// TODO:UJ _maxCHunksPerUberJob maybe put in config??? or set on command line??
// Different queries may benefit from different values
// Such as LIMIT=1 may work best with this at 1, where
// 100 would be better for others.
// &&&
_maxChunksPerUberJob = 2;
// This is needed to prevent Czar::_monitor from starting things before they are ready.
_executive->setReadyToExecute();
buildAndSendUberJobs();

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
// TODO:UJ Waiting for all jobs to start may not be needed anymore?
@@ -326,7 +325,6 @@ void UserQuerySelect::submit() {
}

void UserQuerySelect::buildAndSendUberJobs() {
// &&& NEED CODE - this function should check if the worker is DEAD. TODO:UJ
string const funcN("UserQuerySelect::" + string(__func__) + " QID=" + to_string(_qMetaQueryId));
LOGS(_log, LOG_LVL_DEBUG, funcN << " start");

@@ -376,52 +374,59 @@ void UserQuerySelect::buildAndSendUberJobs() {
// - For failures - If a worker cannot be contacted, that's an uberjob failure.
// - uberjob failures (due to communications problems) will result in the uberjob
// being broken up into multiple UberJobs going to different workers.
// - The best way to do this is probably to just kill the UberJob and mark all
// Jobs that were in that UberJob as needing re-assignment, and re-running
// the code here. The trick is going to be figuring out which workers are alive.
// Maybe force a fresh lookup from the replicator Registry when an UberJob fails.
// - If an UberJob fails, the UberJob is killed and all the Jobs it contained
// are flagged as needing re-assignment and this function will be called
// again to put those Jobs in new UberJobs. Correctly re-assigning the
// Jobs requires accurate information from the registry about which workers
// are alive or dead.
map<string, vector<qdisp::UberJob::Ptr>> workerJobMap;
vector<qdisp::Executive::ChunkIdType> missingChunks;

// unassignedChunksInQuery needs to be in numerical order so that UberJobs contain chunk numbers in
// numerical order. The workers run shared scans in numerical order of chunk id numbers.
// This keeps the number of partially complete UberJobs running on a worker to a minimum,
// Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum,
// and should minimize the time for the first UberJob on the worker to complete.
for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) {
auto iter = chunkMapPtr->find(chunkId);
if (iter == chunkMapPtr->end()) {
// If too many workers are down, there will be a chunk that cannot be found.
// Just continuing should leave jobs `unassigned` with their attempt count
// increased. Either the chunk will be found and jobs assigned, or the jobs'
// attempt count will reach max and the query will be cancelled
auto lambdaMissingChunk = [&](string const& msg) {
missingChunks.push_back(chunkId);
bool const increaseAttemptCount = true;
jqPtr->getDescription()->incrAttemptCountScrubResultsJson(_executive, increaseAttemptCount);
// Assign as many jobs as possible. Any chunks not found will be attempted later.
LOGS(_log, LOG_LVL_ERROR, msg);
};

auto iter = chunkMapPtr->find(chunkId);
if (iter == chunkMapPtr->end()) {
lambdaMissingChunk(funcN + " No chunkData for=" + to_string(chunkId));
continue;
}
czar::CzarChunkMap::ChunkData::Ptr chunkData = iter->second;
auto targetWorker = chunkData->getPrimaryScanWorker().lock();
// TODO:UJ maybe if (targetWorker == nullptr || this worker already tried for this chunk) {
if (targetWorker == nullptr) {
LOGS(_log, LOG_LVL_ERROR, funcN << " No primary scan worker for chunk=" << chunkData->dump());
// TODO:UJ maybe if (targetWorker == nullptr || ... || this worker already tried for this chunk) {
if (targetWorker == nullptr || targetWorker->isDead()) {
LOGS(_log, LOG_LVL_WARN,
funcN << " No primary scan worker for chunk=" + chunkData->dump()
<< ((targetWorker == nullptr) ? " targ was null" : " targ was dead"));
// Try to assign a different worker to this job
auto workerHasThisChunkMap = chunkData->getWorkerHasThisMapCopy();
bool found = false;
for (auto wIter = workerHasThisChunkMap.begin(); wIter != workerHasThisChunkMap.end() && !found;
++wIter) {
auto maybeTarg = wIter->second.lock();
if (maybeTarg != nullptr) {
if (maybeTarg != nullptr && !maybeTarg->isDead()) {
targetWorker = maybeTarg;
found = true;
LOGS(_log, LOG_LVL_WARN,
funcN << " Alternate worker found for chunk=" << chunkData->dump());
funcN << " Alternate worker=" << targetWorker->getWorkerId()
<< " found for chunk=" << chunkData->dump());
}
}
if (!found) {
// If too many workers are down, there will be a chunk that cannot be found.
// Just continuing should leave jobs `unassigned` with their attempt count
// increased. Either the chunk will be found and jobs assigned, or the jobs'
// attempt count will reach max and the query will be cancelled
// TODO:UJ Needs testing/verification
LOGS(_log, LOG_LVL_ERROR,
funcN << " No primary or alternate worker found for chunk=" << chunkData->dump());
lambdaMissingChunk(funcN +
" No primary or alternate worker found for chunk=" + chunkData->dump());
continue;
}
}
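A self-contained sketch of the fallback policy the hunk above adds: prefer the primary scan worker, skip any worker whose isDead() is true, fall back to another live worker that holds the chunk, and otherwise leave the job unassigned for a later attempt. Worker and ChunkData below are simplified stand-ins, not the real qserv classes.

#include <map>
#include <memory>
#include <string>

// Simplified stand-ins; the real code uses czar::CzarChunkMap types.
struct Worker {
    std::string id;
    bool dead = false;
    bool isDead() const { return dead; }
};

struct ChunkData {
    std::weak_ptr<Worker> primaryScanWorker;
    std::map<std::string, std::weak_ptr<Worker>> workerHasThisChunk;
};

// Mirrors the selection order in buildAndSendUberJobs(): primary first,
// then the first live alternate, else nullptr so the caller can leave the
// job unassigned and retry on a later pass.
std::shared_ptr<Worker> pickWorker(ChunkData const& chunk) {
    auto target = chunk.primaryScanWorker.lock();
    if (target != nullptr && !target->isDead()) return target;
    for (auto const& [id, weak] : chunk.workerHasThisChunk) {
        auto alt = weak.lock();
        if (alt != nullptr && !alt->isDead()) return alt;
    }
    return nullptr;
}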
28 changes: 13 additions & 15 deletions src/czar/ActiveWorker.cc
@@ -115,7 +115,6 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti
_changeStateTo(ALIVE, secsSinceUpdate, cName(__func__));
} else {
// Don't waste time on this worker until the registry has heard from it.
// &&& If it's been a really really long time, maybe delete this entry ???
return;
}
break;
@@ -132,20 +131,6 @@ void ActiveWorker::updateStateAndSendMessages(double timeoutAliveSecs, double ti

shared_ptr<json> jsWorkerReqPtr;
{
lock_guard<mutex> lg(_aMtx); //&&& needed ???
lock_guard<mutex> mapLg(_wqsData->mapMtx);
// Check how many messages are currently being sent to the worker, if at the limit, return
if (_wqsData->qIdDoneKeepFiles.empty() && _wqsData->qIdDoneDeleteFiles.empty() &&
_wqsData->qIdDeadUberJobs.empty()) {
return;
}
int tCount = _conThreadCount;
if (tCount > _maxConThreadCount) {
LOGS(_log, LOG_LVL_DEBUG,
cName(__func__) << " not sending message since at max threads " << tCount);
return;
}

// Go through the _qIdDoneKeepFiles, _qIdDoneDeleteFiles, and _qIdDeadUberJobs lists to build a
// message to send to the worker.
jsWorkerReqPtr = _wqsData->serializeJson(maxLifetime);
@@ -217,6 +202,17 @@ void ActiveWorker::addDeadUberJob(QueryId qId, UberJobId ujId) {
_wqsData->addDeadUberJob(qId, ujId, now);
}

http::WorkerContactInfo::Ptr ActiveWorker::getWInfo() const {
std::lock_guard lg(_aMtx);
if (_wqsData == nullptr) return nullptr;
return _wqsData->getWInfo();
}

ActiveWorker::State ActiveWorker::getState() const {
std::lock_guard lg(_aMtx);
return _state;
}

string ActiveWorker::dump() const {
lock_guard<mutex> lg(_aMtx);
return _dump();
Expand All @@ -238,6 +234,7 @@ void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap,
auto iter = _awMap.find(wcKey);
if (iter == _awMap.end()) {
auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey);
LOGS(_log, LOG_LVL_INFO, cName(__func__) << " ActiveWorker created for " << wcKey);
_awMap[wcKey] = newAW;
if (_czarCancelAfterRestart) {
newAW->setCzarCancelAfterRestart(_czarCancelAfterRestartCzId, _czarCancelAfterRestartQId);
@@ -252,6 +249,7 @@ void ActiveWorkerMap::updateMap(http::WorkerContactInfo::WCMap const& wcMap,
// If there is existing information, only host and port values will change.
aWorker->setWorkerContactInfo(wcVal);
}
aWorker->getWInfo()->setRegUpdateTime(wcVal->getRegUpdateTime());
}
}
}
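The _qIdDeadUberJobs bookkeeping that feeds serializeJson() above is, in effect, a per-query record of dead UberJob ids plus a timestamp for aging. The sketch below is an assumed shape for illustration only, not the actual WorkerQueryStatusData layout.

#include <chrono>
#include <cstdint>
#include <map>
#include <set>

// Assumed shape only: per query, the UberJob ids the czar has declared dead,
// plus the time of the most recent addition so stale entries can be aged out
// before the next status message is serialized for the worker.
using QueryId = std::uint64_t;
using UberJobId = int;
using TimePoint = std::chrono::system_clock::time_point;

class DeadUberJobs {
public:
    void add(QueryId qId, UberJobId ujId, TimePoint now) {
        auto& entry = _byQuery[qId];
        entry.ids.insert(ujId);
        entry.lastAdded = now;
    }
    bool empty() const { return _byQuery.empty(); }

private:
    struct Entry {
        std::set<UberJobId> ids;
        TimePoint lastAdded;
    };
    std::map<QueryId, Entry> _byQuery;
};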
15 changes: 6 additions & 9 deletions src/czar/ActiveWorker.h
@@ -79,7 +79,8 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
ActiveWorker& operator=(ActiveWorker const&) = delete;

std::string cName(const char* fName) {
return std::string("ActiveWorker::") + fName + " " + ((_wqsData == nullptr) ? "?" : _wqsData->dump());
auto wqsd = _wqsData;
return std::string("ActiveWorker::") + fName + " " + ((wqsd == nullptr) ? "?" : wqsd->dump());
}

static std::string getStateStr(State st);
@@ -97,10 +98,7 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
_wqsData->setCzarCancelAfterRestart(czId, lastQId);
}

http::WorkerContactInfo::Ptr getWInfo() const {
if (_wqsData == nullptr) return nullptr;
return _wqsData->getWInfo();
}
http::WorkerContactInfo::Ptr getWInfo() const;

~ActiveWorker() = default;

@@ -138,6 +136,8 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
/// individual UberJobs anymore, so this function will get rid of them.
void removeDeadUberJobsFor(QueryId qId);

State getState() const;

std::string dump() const;

private:
@@ -169,10 +169,6 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
State _state{QUESTIONABLE}; ///< current state of this worker.

mutable std::mutex _aMtx; ///< protects _wInfo, _state, _qIdDoneKeepFiles, _qIdDoneDeleteFiles

/// The number of communication threads currently in use by this class instance.
std::atomic<int> _conThreadCount{0};
int _maxConThreadCount{2};
};

/// &&& doc
@@ -182,6 +178,7 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
/// come back from the dead.
class ActiveWorkerMap {
public:
using Ptr = std::shared_ptr<ActiveWorkerMap>;
ActiveWorkerMap() = default;
ActiveWorkerMap(ActiveWorkerMap const&) = delete;
ActiveWorkerMap operator=(ActiveWorkerMap const&) = delete;
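The header changes above pair with the new out-of-line getWInfo() and getState() definitions in ActiveWorker.cc: shared state is either copied to a local shared_ptr before use (as in cName()) or read under _aMtx. A tiny stand-in class showing both patterns, not ActiveWorker itself:

#include <memory>
#include <mutex>
#include <string>

// Stand-in; only the two access patterns are the point here.
class StatusHolder {
public:
    // Pattern 1 (as in cName): copy the member shared_ptr to a local so the
    // null check and the dereference see the same pointer value.
    std::string describe() const {
        auto data = _data;
        return data ? *data : std::string("?");
    }

    // Pattern 2 (as in getState): lock the mutable mutex in a const getter.
    int state() const {
        std::lock_guard<std::mutex> lg(_mtx);
        return _state;
    }

private:
    std::shared_ptr<std::string> _data;
    mutable std::mutex _mtx;
    int _state = 0;
};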
28 changes: 17 additions & 11 deletions src/czar/Czar.cc
@@ -95,10 +95,17 @@ void Czar::_monitor() {
LOGS(_log, LOG_LVL_DEBUG, funcN << " start0");

/// Check database for changes in worker chunk assignments and aliveness
_czarFamilyMap->read();
try {
_czarFamilyMap->read();
} catch (ChunkMapException const& cmex) {
// There are probably chunks that don't exist on any alive worker,
// continue on in hopes that workers will show up with the missing chunks
// later.
LOGS(_log, LOG_LVL_ERROR, funcN << " family map read problems " << cmex.what());
}

// Send appropriate messages to all ActiveWorkers. This will
// check if workers have died by timeout. The reponse
// check if workers have died by timeout. The response
// from the worker include
_czarRegistry->sendActiveWorkersMessages();

@@ -126,14 +133,13 @@ void Czar::_monitor() {
execVal->assignJobsToUberJobs();
}

// TODO:UJ DM-45470 Maybe get missing results from workers.
// To prevent anything from slipping through the cracks:
// Workers will keep trying to transmit results until they think the czar is dead.
// If a worker thinks the czar died, it will cancel all related jobs that it has,
// and if the czar sends a status message to that worker, that worker will send back
// a separate message saying it killed everything that this czar gave it. Upon
// getting this message from a worker, this czar will reassign everything it had
// sent to that worker.
// To prevent anything from slipping through the cracks:
// Workers will keep trying to transmit results until they think the czar is dead.
// If a worker thinks the czar died, it will cancel all related jobs that it has,
// and if the czar sends a status message to that worker, that worker will send back
// a separate message (see WorkerCzarComIssue) saying it killed everything that this
// czar gave it. Upon getting this message from a worker, this czar will reassign
// everything it had sent to that worker.

// TODO:UJ How long should queryId's remain on this list?
}
@@ -229,7 +235,7 @@ Czar::Czar(string const& configFilePath, string const& czarName)
auto const port = _controlHttpSvc->start();
_czarConfig->setReplicationHttpPort(port);

_czarRegistry = CzarRegistry::create(_czarConfig);
_czarRegistry = CzarRegistry::create(_czarConfig, _activeWorkerMap);

// Start the monitor thread
thread monitorThrd(&Czar::_monitor, this);
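The monitor pass modified above now tolerates ChunkMapException from the family-map read and then sends the worker status messages that double as the liveness probe. A self-contained sketch of that control flow; FamilyMap, Registry, and ChunkMapError below are minimal stand-ins for CzarFamilyMap, CzarRegistry, and ChunkMapException, and the bodies are not the real implementation.

#include <iostream>
#include <stdexcept>

struct ChunkMapError : std::runtime_error {
    using std::runtime_error::runtime_error;
};

struct FamilyMap {
    // May throw when some chunk currently exists on no live worker.
    void read() {}
};

struct Registry {
    // Status messages to all active workers; non-responders age toward dead.
    void sendActiveWorkersMessages() {}
};

void monitorPass(FamilyMap& familyMap, Registry& registry) {
    try {
        familyMap.read();
    } catch (ChunkMapError const& ex) {
        // Tolerated: keep going and hope a worker shows up with the missing
        // chunks on a later pass.
        std::cerr << "family map read problems: " << ex.what() << '\n';
    }
    registry.sendActiveWorkersMessages();
}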