Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tickets/dm 45548 #884

Draft
wants to merge 25 commits into
base: tickets/DM-43715
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0d07782
Removed QueryRequest and XrdSsiMocks.
jgates108 Aug 1, 2024
85461dc
Removed unnecessary code.
jgates108 Aug 2, 2024
5bb2b08
Added ActiveWorker.
jgates108 Aug 6, 2024
74c3a57
Added unit test for query status message.
jgates108 Aug 30, 2024
ac9afad
Added cancellation code and for queries, uberjobs, and czar restart.
jgates108 Sep 3, 2024
68d591a
More cancellation code added.
jgates108 Sep 10, 2024
43a894e
Added query retries.
jgates108 Sep 19, 2024
61f1a9b
Added worker believed czar was dead handling.
jgates108 Oct 1, 2024
0cff54a
Added dead message handling.
jgates108 Oct 4, 2024
5e3642a
Fixed problems with rowlimit and WorkerCzarComIssue.
jgates108 Oct 15, 2024
662e5aa
Rebase.
jgates108 Oct 18, 2024
82811de
Added comments and removed dead code.
jgates108 Oct 21, 2024
769affb
Fixed dead worker check.
jgates108 Oct 23, 2024
fa16b4f
Created protojson namespace.
jgates108 Nov 18, 2024
831b5ec
clang-format
fritzm Nov 26, 2024
9c4c602
Added unit test.
jgates108 Nov 22, 2024
d51fa7d
Reworked the UberJob json message.
jgates108 Dec 5, 2024
e1fea4b
Enabled chunk Id replacement, and added connection pools.
jgates108 Dec 6, 2024
8ad73c1
Rearranged UberJob building and removed chunkResultName.
jgates108 Dec 13, 2024
816da25
Removed TaskMsgFactory.
jgates108 Dec 16, 2024
d4bf9e9
Changed Czar to catch 5GB limit.
jgates108 Dec 18, 2024
055702f
Improved Job creation performance.
jgates108 Jan 9, 2025
fd9df34
Contention testing.
jgates108 Jan 22, 2025
2277f63
The blocking version of the FQDN retrieval function
jgates108 Feb 3, 2025
ccab87c
Some cleanup.
jgates108 Feb 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class CzarConfig {
*/
std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); }

/* Get the maximum number of threads for xrootd to use.
/* Get the maximum number of threads for xrootd to use. // TODO:UJ delete
*
* @return the maximum number of threads for xrootd to use.
*/
Expand Down Expand Up @@ -371,6 +371,7 @@ class CzarConfig {
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

// TODO:UJ delete xrootd specific entries.
CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
Expand Down Expand Up @@ -416,8 +417,8 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10);
CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr
util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60);
CVTIntPtr _monitorSleepTimeMilliSec =
util::ConfigValTInt::create(_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);
CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create(
_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);

// UberJobs
CVTIntPtr _uberJobMaxChunks =
Expand Down
22 changes: 14 additions & 8 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
int headerCount = 0;
uint64_t totalBytesRead = 0;
try {
auto exec = uberJob->getExecutive();
if (exec == nullptr || exec->getCancelled()) {
throw runtime_error(context + " query was cancelled");
}
string const noClientData;
vector<string> const noClientHeaders;
http::ClientConfig clientConfig;
Expand All @@ -139,10 +143,12 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
bool last = false;
char const* next = inBuf;
char const* const end = inBuf + inBufSize;
LOGS(_log, LOG_LVL_INFO,
context << " next=" << (uint64_t)next << " end=" << (uint64_t)end); // &&& DEBUG
while ((next < end) && !last) {
LOGS(_log, LOG_LVL_WARN,
context << "TODO:UJ next=" << (uint64_t)next << " end=" << (uint64_t)end
<< " last=" << last);
if (exec->getCancelled()) {
throw runtime_error(context + " query was cancelled");
}
if (msgSizeBytes == 0) {
// Continue or finish reading the frame header.
size_t const bytes2read =
Expand Down Expand Up @@ -210,15 +216,15 @@ std::tuple<bool, bool> readHttpFileAndMergeHttp(
msgSizeBytes = 0;
} else {
LOGS(_log, LOG_LVL_WARN,
context << " headerCount=" << headerCount
<< " incomplete read diff=" << (msgSizeBytes - msgBufNext));
context << " headerCount=" << headerCount << " incomplete read diff="
<< (msgSizeBytes - msgBufNext)); // &&& DEBUG
}
}
}
});
LOGS(_log, LOG_LVL_DEBUG,
LOGS(_log, LOG_LVL_WARN,
context << " headerCount=" << headerCount << " msgSizeBytes=" << msgSizeBytes
<< " totalBytesRead=" << totalBytesRead);
<< " totalBytesRead=" << totalBytesRead); // &&&
if (msgSizeBufNext != 0) {
throw runtime_error("short read of the message header at offset " +
to_string(offset - msgSizeBytes) + ", file: " + httpUrl);
Expand Down Expand Up @@ -366,7 +372,7 @@ tuple<bool, bool> MergingHandler::flushHttp(string const& fileUrl, uint64_t expe
}

if (success) {
_infileMerger->mergeCompleteFor(uberJob->getJobId());
_infileMerger->mergeCompleteFor(uberJob->getUjId());
}
return {success, shouldCancel};
}
Expand Down
1 change: 1 addition & 0 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class MergingHandler : public qdisp::ResponseHandler {
/// Prepare for first call to flush().
void _initState();

// &&& delete
bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
std::shared_ptr<qdisp::JobQuery> const& jobQuery);

Expand Down
79 changes: 49 additions & 30 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,16 @@ std::string UserQuerySelect::getError() const {

/// Attempt to kill in progress.
void UserQuerySelect::kill() {
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect kill");
LOGS(_log, LOG_LVL_INFO, "UserQuerySelect KILL");
std::lock_guard<std::mutex> lock(_killMutex);
if (!_killed) {
_killed = true;
int64_t collectedRows = _executive->getTotalResultRows();
auto exec = _executive;
int64_t collectedRows = (exec) ? exec->getTotalResultRows() : -1;
size_t collectedBytes = _infileMerger->getTotalResultSize();
try {
// make a copy of executive pointer to keep it alive and avoid race
// with pointer being reset in discard() method
std::shared_ptr<qdisp::Executive> exec = _executive;
if (exec != nullptr) {
exec->squash();
}
Expand Down Expand Up @@ -233,6 +233,11 @@ std::string UserQuerySelect::getResultQuery() const {

/// Begin running on all chunks added so far.
void UserQuerySelect::submit() {
auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::submit() executive is null at start");
Copy link
Contributor

@iagaponenko iagaponenko Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit confusing. Shouldn't the user query object be constructed with a valid pointer to the "executive"? And if so, then why is this test needed? What makes the pointer suddenly disappear?
I'm also seeing the very same pattern of checking for the validity of a pointer all around the code of the PR. Perhaps there is a safer way of constructing objects to avoid polluting the code with these tests?
My other worry is that if there is such an uncertainty (with the elusive pointer) then such code must be really hard to debug.

Copy link
Contributor Author

@jgates108 jgates108 Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_executive being null at this point should really never happen, so the check can be removed, but the check is harmless and consistent with other UserQuerySelect methods. UserQuerySelect::discard() sets _executive to null (this has been around for a very long time) so the _executive pointer isn't safe to use directly and must be copied. Changing discard() to not reset _executive had side effects, so I'd rather not mess with it at this time.

return;
}
_qSession->finalize();

// Using the QuerySession, generate query specs (text, db, chunkId) and then
Expand All @@ -259,13 +264,13 @@ void UserQuerySelect::submit() {
LOGS(_log, LOG_LVL_WARN, "Failed queryStatsTmpRegister " << e.what());
}

_executive->setScanInteractive(_qSession->getScanInteractive());
_executive->setScanInfo(_qSession->getScanInfo());
exec->setScanInteractive(_qSession->getScanInteractive());
exec->setScanInfo(_qSession->getScanInfo());

string dbName("");
bool dbNameSet = false;

for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !_executive->getCancelled();
for (auto i = _qSession->cQueryBegin(), e = _qSession->cQueryEnd(); i != e && !exec->getCancelled();
++i) {
auto& chunkSpec = *i;

Expand Down Expand Up @@ -297,9 +302,9 @@ void UserQuerySelect::submit() {
ResourceUnit ru;
ru.setAsDbChunk(cs->db, cs->chunkId);
qdisp::JobDescription::Ptr jobDesc = qdisp::JobDescription::create(
_qMetaCzarId, _executive->getId(), sequence, ru,
_qMetaCzarId, exec->getId(), sequence, ru,
std::make_shared<MergingHandler>(_infileMerger, chunkResultName), cs, chunkResultName);
auto job = _executive->add(jobDesc);
auto job = exec->add(jobDesc);
++sequence;
}

Expand All @@ -309,12 +314,12 @@ void UserQuerySelect::submit() {

/// At this point the executive has a map of all jobs with the chunkIds as the key.
// This is needed to prevent Czar::_monitor from starting things before they are ready.
_executive->setReadyToExecute();
exec->setReadyToExecute();
buildAndSendUberJobs();

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
// TODO:UJ Waiting for all jobs to start may not be needed anymore?
_executive->waitForAllJobsToStart();
exec->waitForAllJobsToStart();

// we only care about per-chunk info for ASYNC queries
if (_async) {
Expand All @@ -331,18 +336,23 @@ void UserQuerySelect::buildAndSendUberJobs() {
LOGS(_log, LOG_LVL_DEBUG, funcN << " start " << _uberJobMaxChunks);

// Ensure `_monitor()` doesn't do anything until everything is ready.
if (!_executive->isReadyToExecute()) {
auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, funcN << " called with null exec " << getQueryIdString());
return;
}
if (!exec->isReadyToExecute()) {
LOGS(_log, LOG_LVL_INFO, funcN << " executive isn't ready to generate UberJobs.");
return;
}

// Only one thread should be generating UberJobs for this user query at any given time.
lock_guard fcLock(_buildUberJobMtx);
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << _executive->getTotalJobs());
LOGS(_log, LOG_LVL_DEBUG, "UserQuerySelect::" << __func__ << " totalJobs=" << exec->getTotalJobs());

vector<qdisp::UberJob::Ptr> uberJobs;

qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = _executive->unassignedChunksInQuery();
qdisp::Executive::ChunkIdJobMapType unassignedChunksInQuery = exec->unassignedChunksInQuery();
if (unassignedChunksInQuery.empty()) {
LOGS(_log, LOG_LVL_DEBUG, funcN << " no unassigned Jobs");
return;
Expand Down Expand Up @@ -397,19 +407,16 @@ void UserQuerySelect::buildAndSendUberJobs() {
// Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum,
// and should minimize the time for the first UberJob on the worker to complete.
for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) {

bool const increaseAttemptCount = true;
jqPtr->getDescription()->incrAttemptCount(_executive, increaseAttemptCount);
jqPtr->getDescription()->incrAttemptCount(exec, increaseAttemptCount);

// If too many workers are down, there will be a chunk that cannot be found.
// Just continuing should leave jobs `unassigned` with their attempt count
// increased. Either the chunk will be found and jobs assigned, or the jobs'
// attempt count will reach max and the query will be cancelled
auto lambdaMissingChunk = [&](string const& msg) {
missingChunks.push_back(chunkId);
//&&&bool const increaseAttemptCount = true;
//&&&jqPtr->getDescription()->incrAttemptCountScrubResultsJson(_executive, increaseAttemptCount);
LOGS(_log, LOG_LVL_ERROR, msg);
LOGS(_log, LOG_LVL_WARN, msg);
};

auto iter = chunkMapPtr->find(chunkId);
Expand Down Expand Up @@ -463,8 +470,8 @@ void UserQuerySelect::buildAndSendUberJobs() {
auto ujId = _uberJobIdSeq++; // keep ujId consistent
string uberResultName = _ttn->make(ujId);
auto respHandler = make_shared<ccontrol::MergingHandler>(_infileMerger, uberResultName);
auto uJob = qdisp::UberJob::create(_executive, respHandler, _executive->getId(), ujId,
_qMetaCzarId, targetWorker);
auto uJob = qdisp::UberJob::create(exec, respHandler, exec->getId(), ujId, _qMetaCzarId,
targetWorker);
uJob->setWorkerContactInfo(wInfUJ->wInf);
wInfUJ->uberJobPtr = uJob;
};
Expand All @@ -473,7 +480,7 @@ void UserQuerySelect::buildAndSendUberJobs() {

if (wInfUJ->uberJobPtr->getJobCount() >= _uberJobMaxChunks) {
// Queue the UberJob to be sent to a worker
_executive->addAndQueueUberJob(wInfUJ->uberJobPtr);
exec->addAndQueueUberJob(wInfUJ->uberJobPtr);

// Clear the pointer so a new UberJob is created later if needed.
wInfUJ->uberJobPtr = nullptr;
Expand All @@ -498,18 +505,23 @@ void UserQuerySelect::buildAndSendUberJobs() {
if (winfUjPtr != nullptr) {
auto& ujPtr = winfUjPtr->uberJobPtr;
if (ujPtr != nullptr) {
_executive->addAndQueueUberJob(ujPtr);
exec->addAndQueueUberJob(ujPtr);
}
}
}

LOGS(_log, LOG_LVL_DEBUG, funcN << " " << _executive->dumpUberJobCounts());
LOGS(_log, LOG_LVL_DEBUG, funcN << " " << exec->dumpUberJobCounts());
}

/// Block until a submit()'ed query completes.
/// @return the QueryState indicating success or failure
QueryState UserQuerySelect::join() {
bool successful = _executive->join(); // Wait for all data
auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::join() called with null exec " << getQueryIdString());
return ERROR;
}
bool successful = exec->join(); // Wait for all data
// Since all data are in, run final SQL commands like GROUP BY.
size_t collectedBytes = 0;
int64_t finalRows = 0;
Expand All @@ -520,7 +532,7 @@ QueryState UserQuerySelect::join() {
_messageStore->addMessage(-1, "MERGE", 1105, "Failure while merging result",
MessageSeverity::MSG_ERROR);
}
_executive->updateProxyMessages();
exec->updateProxyMessages();

try {
_discardMerger();
Expand All @@ -533,7 +545,7 @@ QueryState UserQuerySelect::join() {
// Update the permanent message table.
_qMetaUpdateMessages();

int64_t collectedRows = _executive->getTotalResultRows();
int64_t collectedRows = exec->getTotalResultRows();
// finalRows < 0 indicates there was no postprocessing, so collected rows and final rows should be the
// same.
if (finalRows < 0) finalRows = collectedRows;
Expand All @@ -555,7 +567,7 @@ QueryState UserQuerySelect::join() {

// Notify workers on the query completion/cancellation to ensure
// resources are properly cleaned over there as well.
czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(_executive->getId());
czar::Czar::getCzar()->getActiveWorkerMap()->addToDoneDeleteFiles(exec->getId());
return state;
}

Expand All @@ -577,8 +589,14 @@ void UserQuerySelect::discard() {
}
}

auto exec = _executive;
if (exec == nullptr) {
LOGS(_log, LOG_LVL_ERROR, "UserQuerySelect::discard called with null exec " << getQueryIdString());
return;
}

// Make sure resources are released.
if (_executive && _executive->getNumInflight() > 0) {
if (exec->getNumInflight() > 0) {
throw UserQueryError(getQueryIdString() + " Executive unfinished, cannot discard");
}

Expand Down Expand Up @@ -777,8 +795,9 @@ void UserQuerySelect::qMetaRegister(std::string const& resultLocation, std::stri
throw UserQueryError(getQueryIdString() + _errorExtra);
}

if (_executive != nullptr) {
_executive->setQueryId(_qMetaQueryId);
auto exec = _executive;
if (exec != nullptr) {
exec->setQueryId(_qMetaQueryId);
} else {
LOGS(_log, LOG_LVL_WARN, "No Executive, assuming invalid query");
}
Expand Down
24 changes: 12 additions & 12 deletions src/czar/Czar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Czar::Czar(string const& configFilePath, string const& czarName)
_idCounter(),
_uqFactory(),
_clientToQuery(),
_monitorSleepTime (_czarConfig->getMonitorSleepTimeMilliSec()),
_monitorSleepTime(_czarConfig->getMonitorSleepTimeMilliSec()),
_activeWorkerMap(new ActiveWorkerMap(_czarConfig)) {
// set id counter to milliseconds since the epoch, mod 1 year.
struct timeval tv;
Expand Down Expand Up @@ -402,45 +402,45 @@ void Czar::killQuery(string const& query, string const& clientId) {
int threadId;
QueryId queryId;
if (ccontrol::UserQueryType::isKill(query, threadId)) {
LOGS(_log, LOG_LVL_DEBUG, "thread ID: " << threadId);
LOGS(_log, LOG_LVL_INFO, "KILL thread ID: " << threadId);
lock_guard<mutex> lock(_mutex);

// find it in the client map based on client/thread id
ClientThreadId ctId(clientId, threadId);
auto iter = _clientToQuery.find(ctId);
if (iter == _clientToQuery.end()) {
LOGS(_log, LOG_LVL_INFO, "Cannot find client thread id: " << threadId);
throw std::runtime_error("Unknown thread ID: " + query);
LOGS(_log, LOG_LVL_INFO, "KILL Cannot find client thread id: " << threadId);
throw std::runtime_error("KILL Unknown thread ID: " + query);
}
uq = iter->second.lock();
} else if (ccontrol::UserQueryType::isCancel(query, queryId)) {
LOGS(_log, LOG_LVL_DEBUG, "query ID: " << queryId);
LOGS(_log, LOG_LVL_INFO, "KILL query ID: " << queryId);
lock_guard<mutex> lock(_mutex);

// find it in the query map based on query id
auto iter = _idToQuery.find(queryId);
if (iter == _idToQuery.end()) {
LOGS(_log, LOG_LVL_INFO, "Cannot find query id: " << queryId);
throw std::runtime_error("Unknown or finished query ID: " + query);
LOGS(_log, LOG_LVL_INFO, "KILL Cannot find query id: " << queryId);
throw std::runtime_error("KILL unknown or finished query ID: " + query);
}
uq = iter->second.lock();
} else {
throw std::runtime_error("Failed to parse query: " + query);
throw std::runtime_error("KILL failed to parse query: " + query);
}

// assume this cannot fail or throw
if (uq) {
LOGS(_log, LOG_LVL_DEBUG, "Killing query: " << uq->getQueryId());
LOGS(_log, LOG_LVL_INFO, "KILLing query: " << uq->getQueryId());
// query killing can potentially take very long and we do not want to block
// proxy from serving other requests so run it in a detached thread
thread killThread([uq]() {
uq->kill();
LOGS(_log, LOG_LVL_DEBUG, "Finished killing query: " << uq->getQueryId());
LOGS(_log, LOG_LVL_INFO, "Finished KILLing query: " << uq->getQueryId());
});
killThread.detach();
} else {
LOGS(_log, LOG_LVL_DEBUG, "Query has expired/finished: " << query);
throw std::runtime_error("Query has already finished: " + query);
LOGS(_log, LOG_LVL_INFO, "KILL query has expired/finished: " << query);
throw std::runtime_error("KILL query has already finished: " + query);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ bool CzarFamilyMap::_read() {
LOGS(_log, LOG_LVL_TRACE, "CzarFamilyMap::_read() start");
// If replacing the map, this may take a bit of time, but it's probably
// better to wait for new maps if something changed.
std::lock_guard gLock(_familyMapMtx); // &&& check waiting is really needed
std::lock_guard gLock(_familyMapMtx); // &&& check waiting is really needed
qmeta::QMetaChunkMap qChunkMap = _qmeta->getChunkMap(_lastUpdateTime);
if (_lastUpdateTime == qChunkMap.updateTime) {
LOGS(_log, LOG_LVL_DEBUG,
Expand Down
Loading
Loading