Changed Czar to catch 5GB limit.
jgates108 committed Jan 7, 2025
1 parent 816da25 commit d4bf9e9
Showing 28 changed files with 251 additions and 251 deletions.
7 changes: 4 additions & 3 deletions src/cconfig/CzarConfig.h
@@ -118,7 +118,7 @@ class CzarConfig {
*/
std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); }

/* Get the maximum number of threads for xrootd to use.
/* Get the maximum number of threads for xrootd to use. // TODO:UJ delete
*
* @return the maximum number of threads for xrootd to use.
*/
@@ -371,6 +371,7 @@ class CzarConfig {
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

// TODO:UJ delete xrootd specific entries.
CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
@@ -416,8 +417,8 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10);
CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr
util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60);
CVTIntPtr _monitorSleepTimeMilliSec =
util::ConfigValTInt::create(_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);
CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create(
_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);

// UberJobs
CVTIntPtr _uberJobMaxChunks =
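As context for the hunks above: every CzarConfig entry registers itself in _configValMap with a section, a key, a required flag, and a default value, and is read back through getVal() (as getXrootdFrontendUrl() does). A minimal self-contained sketch of that registration pattern, using a simplified ConfigValInt as a stand-in for util::ConfigValTInt — illustrative only, not qserv code:

    // Self-contained analogue of the ConfigVal pattern above (not qserv code):
    // each value registers its section/name/default in a shared map when it is
    // created, so the whole configuration can be validated or dumped later.
    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    class ConfigValInt {
    public:
        using Ptr = std::shared_ptr<ConfigValInt>;
        using Map = std::map<std::string, Ptr>;  // keyed by "section.name"

        static Ptr create(Map& registry, std::string const& section, std::string const& name,
                          bool required, int defaultVal) {
            Ptr p(new ConfigValInt(required, defaultVal));
            registry[section + "." + name] = p;
            return p;
        }

        int getVal() const { return _val; }
        bool isRequired() const { return _required; }

    private:
        ConfigValInt(bool required, int val) : _required(required), _val(val) {}
        bool const _required;
        int _val;
    };

    int main() {
        ConfigValInt::Map configValMap;
        bool const notReq = false;
        auto monitorSleepTimeMilliSec = ConfigValInt::create(
                configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);
        std::cout << monitorSleepTimeMilliSec->getVal() << "\n";  // prints 15000
    }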
22 changes: 14 additions & 8 deletions src/ccontrol/MergingHandler.cc
@@ -123,6 +123,10 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
int headerCount = 0;
uint64_t totalBytesRead = 0;
try {
auto exec = uberJob->getExecutive();
if (exec == nullptr || exec->getCancelled()) {
throw runtime_error(context + " query was cancelled");
}
string const noClientData;
vector<string> const noClientHeaders;
http::ClientConfig clientConfig;
@@ -139,10 +143,12 @@
bool last = false;
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
LOGS(_log, LOG_LVL_INFO,
context << " next=" << (uint64_t)next << " end=" << (uint64_t)end); // &&& DEBUG
while ((next < end) && !last) {
LOGS(_log, LOG_LVL_WARN,
context << "TODO:UJ next=" << (uint64_t)next << " end=" << (uint64_t)end
<< " last=" << last);
if (exec->getCancelled()) {
throw runtime_error(context + " query was cancelled");
}
if (msgSizeBytes == 0) {
// Continue or finish reading the frame header.
size_t const bytes2read =
@@ -210,15 +216,15 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
msgSizeBytes = 0;
} else {
LOGS(_log, LOG_LVL_WARN,
context << " headerCount=" << headerCount
<< " incomplete read diff=" << (msgSizeBytes - msgBufNext));
context << " headerCount=" << headerCount << " incomplete read diff="
<< (msgSizeBytes - msgBufNext)); // &&& DEBUG
}
}
}
});
LOGS(_log, LOG_LVL_DEBUG,
LOGS(_log, LOG_LVL_WARN,
context << " headerCount=" << headerCount << " msgSizeBytes=" << msgSizeBytes
<< " totalBytesRead=" << totalBytesRead);
<< " totalBytesRead=" << totalBytesRead); // &&&
if (msgSizeBufNext != 0) {
throw runtime_error("short read of the message header at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
@@ -366,7 +372,7 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
}

if (success) {
_infileMerger->mergeCompleteFor(uberJob->getJobId());
_infileMerger->mergeCompleteFor(uberJob->getUjId());
}
return {success, shouldCancel};
}
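The behavioral change to readHttpFileAndMergeHttp() above is the pair of cancellation checks: one before the transfer starts and one per loop iteration, each abandoning the transfer by throwing. A self-contained sketch of that shape, with illustrative callback types standing in for qserv's http::Client machinery:

    #include <cstddef>
    #include <functional>
    #include <stdexcept>
    #include <string>

    // Sketch: abort a long streaming read as soon as the query is cancelled,
    // instead of finishing the transfer and discarding the data.
    void readAndMerge(std::string const& context, std::function<bool()> const& cancelled,
                      std::function<bool(char const*, std::size_t)> const& readChunk) {
        if (cancelled()) {
            throw std::runtime_error(context + " query was cancelled");
        }
        char buf[4096];
        bool last = false;
        while (!last) {
            // Re-check once per chunk so a cancelled query stops promptly.
            if (cancelled()) {
                throw std::runtime_error(context + " query was cancelled");
            }
            last = readChunk(buf, sizeof(buf));  // returns true on the final chunk
        }
    }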
1 change: 1 addition & 0 deletions src/ccontrol/MergingHandler.h
@@ -98,6 +98,7 @@ class MergingHandler : public qdisp::ResponseHandler {
/// Prepare for first call to flush().
void _initState();

// &&& delete
bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
std::shared_ptr<qdisp::JobQuery> const& jobQuery);

79 changes: 49 additions & 30 deletions src/ccontrol/UserQuerySelect.cc
@@ -156,16 +156,16 @@ std::string UserQuerySelect::getError() const {

/// Attempt to kill in progress.
void UserQuerySelect::kill() {
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect kill");
LOGS(_log, LOG_LVL_INFO, "UserQuerySelect KILL");
std::lock_guard<std::mutex> lock(_killMutex);
if (!_killed) {
_killed = true;
int64_t collectedRows = _executive->getTotalResultRows();
auto exec = _executive;
int64_t collectedRows = (exec) ? exec->getTotalResultRows() : -1;
size_t collectedBytes = _infileMerger->getTotalResultSize();
try {
// make a copy of executive pointer to keep it alive and avoid race
// with pointer being reset in discard() method
std::shared_ptr<qdisp::Executive> exec = _executive;
if (exec != nullptr) {
exec->squash();
}
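The hunk above replaces direct uses of _executive with a local copy exec; per the pre-existing comment, the copy keeps the Executive alive and avoids a race with the pointer being reset in discard(). A minimal sketch of that pattern with simplified stand-in types (not qserv's actual classes):

    #include <memory>

    // Sketch of the "copy the shared_ptr once, use the copy" pattern applied
    // throughout this commit. Types and member names are illustrative.
    struct Executive {
        long getTotalResultRows() const { return 0; }
        void squash() {}
    };

    struct UserQuerySketch {
        std::shared_ptr<Executive> _executive;  // may be reset by discard()

        void kill() {
            auto exec = _executive;  // copy once; all later access goes through exec
            long collectedRows = (exec) ? exec->getTotalResultRows() : -1;
            if (exec != nullptr) {
                exec->squash();  // safe: exec keeps the object alive here
            }
            (void)collectedRows;
        }
    };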
@@ -233,6 +233,11 @@ std::string UserQuerySelect::getResultQuery() const {

/// Begin running on all chunks added so far.
void UserQuerySelect::submit() {
auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::submit() executive is null at start");
return;
}
_qSession->finalize();

// Using the QuerySession, generate query specs (text, db, chunkId) and then
@@ -259,13 +264,13 @@ void UserQuerySelect::submit() {
LOGS(_log, LOG_LVL_WARN, "Failed queryStatsTmpRegister " << e.what());
}

_executive->setScanInteractive(_qSession->getScanInteractive());
_executive->setScanInfo(_qSession->getScanInfo());
exec->setScanInteractive(_qSession->getScanInteractive());
exec->setScanInfo(_qSession->getScanInfo());

string dbName("");
bool dbNameSet = false;

for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !_executive->getCancelled();
for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !exec->getCancelled();
++i) {
auto& chunkSpec = *i;

@@ -297,9 +302,9 @@ void UserQuerySelect::submit() {
ResourceUnit ru;
ru.setAsDbChunk(cs->db, cs->chunkId);
qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create(
_qMetaCzarId, _executive->getId(), sequence, ru,
_qMetaCzarId, exec->getId(), sequence, ru,
std::make_shared<MergingHandler>(_infileMerger, chunkResultName), cs, chunkResultName);
auto job = _executive->add(jobDesc);
auto job = exec->add(jobDesc);
++sequence;
}

Expand All @@ -309,12 +314,12 @@ void UserQuerySelect::submit() {

/// At this point the executive has a map of all jobs with the chunkIds as the key.
// This is needed to prevent Czar::_monitor from starting things before they are ready.
_executive->setReadyToExecute();
exec->setReadyToExecute();
buildAndSendUberJobs();

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
// TODO:UJ Waiting for all jobs to start may not be needed anymore?
_executive->waitForAllJobsToStart();
exec->waitForAllJobsToStart();

// we only care about per-chunk info for ASYNC queries
if (_async) {
@@ -331,18 +336,23 @@ void UserQuerySelect::buildAndSendUberJobs() {
LOGS(_log, LOG_LVL_DEBUG, funcN << " start " << _uberJobMaxChunks);

// Ensure `_monitor()` doesn't do anything until everything is ready.
if (!_executive->isReadyToExecute()) {
auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, funcN << " called with null exec " << getQueryIdString());
return;
}
if (!exec->isReadyToExecute()) {
LOGS(_log, LOG_LVL_INFO, funcN << " executive isn't ready to generate UberJobs.");
return;
}

// Only one thread should be generating UberJobs for this user query at any given time.
lock_guard fcLock(_buildUberJobMtx);
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << _executive->getTotalJobs());
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << exec->getTotalJobs());

vector<qdisp::UberJob::Ptr> uberJobs;

qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = _executive->unassignedChunksInQuery();
qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = exec->unassignedChunksInQuery();
if (unassignedChunksInQuery.empty()) {
LOGS(_log, LOG_LVL_DEBUG, funcN << " no unassigned Jobs");
return;
@@ -397,19 +407,16 @@ void UserQuerySelect::buildAndSendUberJobs() {
// Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum,
// and should minimize the time for the first UberJob on the worker to complete.
for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) {

bool const increaseAttemptCount = true;
jqPtr->getDescription()->incrAttemptCount(_executive, increaseAttemptCount);
jqPtr->getDescription()->incrAttemptCount(exec, increaseAttemptCount);

// If too many workers are down, there will be a chunk that cannot be found.
// Just continuing should leave jobs `unassigned` with their attempt count
// increased. Either the chunk will be found and jobs assigned, or the jobs'
// attempt count will reach max and the query will be cancelled.
auto lambdaMissingChunk = [&](string const& msg) {
missingChunks.push_back(chunkId);
//&&&bool const increaseAttemptCount = true;
//&&&jqPtr->getDescription()->incrAttemptCountScrubResultsJson(_executive, increaseAttemptCount);
LOGS(_log, LOG_LVL_ERROR, msg);
LOGS(_log, LOG_LVL_WARN, msg);
};

auto iter = chunkMapPtr->find(chunkId);
@@ -463,8 +470,8 @@ void UserQuerySelect::buildAndSendUberJobs() {
auto ujId = _uberJobIdSeq++; // keep ujId consistent
string uberResultName = _ttn->make(ujId);
auto respHandler = make_shared<ccontrol::MergingHandler>(_infileMerger, uberResultName);
auto uJob = qdisp::UberJob::create(_executive, respHandler, _executive->getId(), ujId,
_qMetaCzarId, targetWorker);
auto uJob = qdisp::UberJob::create(exec, respHandler, exec->getId(), ujId, _qMetaCzarId,
targetWorker);
uJob->setWorkerContactInfo(wInfUJ->wInf);
wInfUJ->uberJobPtr = uJob;
};
@@ -473,7 +480,7 @@

if (wInfUJ->uberJobPtr->getJobCount() >= _uberJobMaxChunks) {
// Queue the UberJob to be sent to a worker
_executive->addAndQueueUberJob(wInfUJ->uberJobPtr);
exec->addAndQueueUberJob(wInfUJ->uberJobPtr);

// Clear the pointer so a new UberJob is created later if needed.
wInfUJ->uberJobPtr = nullptr;
@@ -498,18 +505,23 @@
if (winfUjPtr != nullptr) {
auto& ujPtr = winfUjPtr->uberJobPtr;
if (ujPtr != nullptr) {
_executive->addAndQueueUberJob(ujPtr);
exec->addAndQueueUberJob(ujPtr);
}
}
}

LOGS(_log, LOG_LVL_DEBUG, funcN << " " << _executive->dumpUberJobCounts());
LOGS(_log, LOG_LVL_DEBUG, funcN << " " << exec->dumpUberJobCounts());
}

/// Block until a submit()'ed query completes.
/// @return the QueryState indicating success or failure
QueryState UserQuerySelect::join() {
bool successful = _executive->join(); // Wait for all data
auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::join() called with null exec " << getQueryIdString());
return ERROR;
}
bool successful = exec->join(); // Wait for all data
// Since all data are in, run final SQL commands like GROUP BY.
size_t collectedBytes = 0;
int64_t finalRows = 0;
@@ -520,7 +532,7 @@ QueryState UserQuerySelect::join() {
_messageStore->addMessage(-1, "MERGE", 1105, "Failure while merging result",
MessageSeverity::MSG_ERROR);
}
_executive->updateProxyMessages();
exec->updateProxyMessages();

try {
_discardMerger();
@@ -533,7 +545,7 @@ QueryState UserQuerySelect::join() {
// Update the permanent message table.
_qMetaUpdateMessages();

int64_t collectedRows = _executive->getTotalResultRows();
int64_t collectedRows = exec->getTotalResultRows();
// finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the
// same.
if (finalRows < 0) finalRows = collectedRows;
@@ -555,7 +567,7 @@

// Notify workers on the query completion/cancellation to ensure
// resources are properly cleaned over there as well.
czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(_executive->getId());
czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(exec->getId());
return state;
}

@@ -577,8 +589,14 @@ void UserQuerySelect::discard() {
}
}

auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::discard called with null exec " << getQueryIdString());
return;
}

// Make sure resources are released.
if (_executive && _executive->getNumInflight() > 0) {
if (exec->getNumInflight() > 0) {
throw UserQueryError(getQueryIdString() + " Executive unfinished, cannot discard");
}

@@ -777,8 +795,9 @@ void UserQuerySelect::qMetaRegister(std::string const& resultLocation, std::stri
throw UserQueryError(getQueryIdString() + _errorExtra);
}

if (_executive != nullptr) {
_executive->setQueryId(_qMetaQueryId);
auto exec = _executive;
if (exec != nullptr) {
exec->setQueryId(_qMetaQueryId);
} else {
LOGS(_log, LOG_LVL_WARN, "No Executive, assuming invalid query");
}
24 changes: 12 additions & 12 deletions src/czar/Czar.cc
@@ -159,7 +159,7 @@ Czar::Czar(string const& configFilePath, string const& czarName)
_idCounter(),
_uqFactory(),
_clientToQuery(),
_monitorSleepTime (_czarConfig->getMonitorSleepTimeMilliSec()),
_monitorSleepTime(_czarConfig->getMonitorSleepTimeMilliSec()),
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)) {
// set id counter to milliseconds since the epoch, mod 1 year.
struct timeval tv;
@@ -402,45 +402,45 @@ void Czar::killQuery(string const& query, string const& clientId) {
int threadId;
QueryId queryId;
if (ccontrol::UserQueryType::isKill(query, threadId)) {
LOGS(_log, LOG_LVL_DEBUG, "thread ID: " << threadId);
LOGS(_log, LOG_LVL_INFO, "KILL thread ID: " << threadId);
lock_guard<mutex> lock(_mutex);

// find it in the client map based on client/thread id
ClientThreadId ctId(clientId, threadId);
auto iter = _clientToQuery.find(ctId);
if (iter == _clientToQuery.end()) {
LOGS(_log, LOG_LVL_INFO, "Cannot find client thread id: " << threadId);
throw std::runtime_error("Unknown thread ID: " + query);
LOGS(_log, LOG_LVL_INFO, "KILL Cannot find client thread id: " << threadId);
throw std::runtime_error("KILL Unknown thread ID: " + query);
}
uq = iter->second.lock();
} else if (ccontrol::UserQueryType::isCancel(query, queryId)) {
LOGS(_log, LOG_LVL_DEBUG, "query ID: " << queryId);
LOGS(_log, LOG_LVL_INFO, "KILL query ID: " << queryId);
lock_guard<mutex> lock(_mutex);

// find it in the id map based on query id
auto iter = _idToQuery.find(queryId);
if (iter == _idToQuery.end()) {
LOGS(_log, LOG_LVL_INFO, "Cannot find query id: " << queryId);
throw std::runtime_error("Unknown or finished query ID: " + query);
LOGS(_log, LOG_LVL_INFO, "KILL Cannot find query id: " << queryId);
throw std::runtime_error("KILL unknown or finished query ID: " + query);
}
uq = iter->second.lock();
} else {
throw std::runtime_error("Failed to parse query: " + query);
throw std::runtime_error("KILL failed to parse query: " + query);
}

// assume this cannot fail or throw
if (uq) {
LOGS(_log, LOG_LVL_DEBUG, "Killing query: " << uq->getQueryId());
LOGS(_log, LOG_LVL_INFO, "KILLing query: " << uq->getQueryId());
// query killing can potentially take a very long time and we do not want to block
// proxy from serving other requests so run it in a detached thread
thread killThread([uq]() {
uq->kill();
LOGS(_log, LOG_LVL_DEBUG, "Finished killing query: " << uq->getQueryId());
LOGS(_log, LOG_LVL_INFO, "Finished KILLing query: " << uq->getQueryId());
});
killThread.detach();
} else {
LOGS(_log, LOG_LVL_DEBUG, "Query has expired/finished: " << query);
throw std::runtime_error("Query has already finished: " + query);
LOGS(_log, LOG_LVL_INFO, "KILL query has expired/finished: " << query);
throw std::runtime_error("KILL query has already finished: " + query);
}
}
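The kill path above runs uq->kill() on a detached thread so the proxy thread returns immediately. A minimal sketch of that pattern, with an illustrative UserQuery stand-in; the by-value capture of the shared_ptr is what keeps the query object alive for the duration of the kill:

    #include <chrono>
    #include <iostream>
    #include <memory>
    #include <thread>

    // Sketch of the detached-kill pattern in Czar::killQuery(). Names are
    // illustrative, not qserv's.
    struct UserQuery {
        void kill() { std::this_thread::sleep_for(std::chrono::milliseconds(50)); }
    };

    int main() {
        auto uq = std::make_shared<UserQuery>();
        std::thread killThread([uq]() {  // by-value capture keeps uq alive
            uq->kill();
            std::cout << "Finished killing query\n";
        });
        killThread.detach();  // do not block the caller on a potentially slow kill
        std::this_thread::sleep_for(std::chrono::milliseconds(100));  // let it finish
    }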

2 changes: 1 addition & 1 deletion src/czar/CzarChunkMap.cc
@@ -333,7 +333,7 @@ bool CzarFamilyMap::_read() {
LOGS(_log, LOG_LVL_TRACE, "CzarFamilyMap::_read() start");
// If replacing the map, this may take a bit of time, but it's probably
// better to wait for new maps if something changed.
std::lock_guard gLock(_familyMapMtx); // &&& check waiting is really needed
std::lock_guard gLock(_familyMapMtx); // &&& check waiting is really needed
qmeta::QMetaChunkMap qChunkMap = _qmeta->getChunkMap(_lastUpdateTime);
if (_lastUpdateTime == qChunkMap.updateTime) {
LOGS(_log, LOG_LVL_DEBUG,
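The hunk above is only a whitespace fix, but the surrounding pattern is worth a sketch: _read() holds _familyMapMtx, fetches the chunk map relative to _lastUpdateTime, and skips the rebuild when the update time is unchanged. A small stand-alone version, with illustrative stand-ins for the QMeta types:

    #include <mutex>

    // Sketch of the guarded-refresh pattern in CzarFamilyMap::_read():
    // rebuild only when the source map actually changed.
    struct ChunkMap { long updateTime = 0; };

    ChunkMap fetchChunkMap(long /*since*/) { return ChunkMap{42}; }  // QMeta stand-in

    struct FamilyMapSketch {
        std::mutex _familyMapMtx;
        long _lastUpdateTime = 0;

        bool read() {
            std::lock_guard<std::mutex> gLock(_familyMapMtx);
            ChunkMap qChunkMap = fetchChunkMap(_lastUpdateTime);
            if (_lastUpdateTime == qChunkMap.updateTime) {
                return true;  // nothing changed; keep the existing map
            }
            // ... rebuild the family map from qChunkMap here ...
            _lastUpdateTime = qChunkMap.updateTime;
            return true;
        }
    };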