Skip to content

Commit

Permalink
Improved Job creation performance.
Browse files — browse the repository at this point in the history
  • Loading branch information
jgates108 committed Jan 9, 2025
1 parent d4bf9e9 commit 055702f
Show file tree
Hide file tree
Showing 24 changed files with 122 additions and 238 deletions.
23 changes: 2 additions & 21 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,9 +269,7 @@ shared_ptr<http::ClientConnPool> const& MergingHandler::_getHttpConnPool() {
}

MergingHandler::MergingHandler(std::shared_ptr<rproc::InfileMerger> merger, std::string const& tableName)
: _infileMerger{merger}, _tableName{tableName} {
_initState();
}
: _infileMerger{merger}, _tableName{tableName} {}

MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__ << " " << _tableName); }

Expand All @@ -293,23 +291,6 @@ std::ostream& MergingHandler::print(std::ostream& os) const {
return os << "MergingRequester(" << _tableName << ", flushed=" << (_flushed ? "true)" : "false)");
}

void MergingHandler::_initState() { _setError(0, ""); }

/// Merge one worker response into the query's result via the InfileMerger.
/// @param responseSummary  summary metadata for the worker response
/// @param responseData     the result rows to merge
/// @param jobQuery         the job this response belongs to
/// @return true on success; on failure the merger's error is recorded in this
///         handler's error state via _setError().
/// @throws util::Bug if results were already flushed — merging after a flush
///         indicates a logic error upstream.
/// NOTE(review): this function is shown here as a deletion hunk — this commit
/// removes it (its header declaration is also deleted in MergingHandler.h).
bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary,
                            proto::ResponseData const& responseData,
                            shared_ptr<qdisp::JobQuery> const& jobQuery) {
    if (_flushed) {
        throw util::Bug(ERR_LOC, "already flushed");
    }
    // Delegate the actual row merging to the InfileMerger.
    bool success = _infileMerger->merge(responseSummary, responseData, jobQuery);
    if (!success) {
        LOGS(_log, LOG_LVL_WARN, __func__ << " failed");
        // Propagate the merger's error into this handler's own error state.
        util::Error const& err = _infileMerger->getError();
        _setError(ccontrol::MSG_RESULT_ERROR, err.getMsg());
    }
    return success;
}

bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
proto::ResponseData const& responseData) {
if (_flushed) {
Expand All @@ -325,7 +306,7 @@ bool MergingHandler::_mergeHttp(shared_ptr<qdisp::UberJob> const& uberJob,
}

/// Record an error code and message for this handler.
/// NOTE(review): this span is a unified-diff rendering — the two LOGS lines
/// below are the removed (old, "_setErr") and added (new, "_setError")
/// versions of the same statement; only one exists in the actual source.
void MergingHandler::_setError(int code, std::string const& msg) {
LOGS(_log, LOG_LVL_DEBUG, "_setErr: code: " << code << ", message: " << msg);
LOGS(_log, LOG_LVL_DEBUG, "_setError: code: " << code << ", message: " << msg);
// Guard _error against concurrent access from other threads.
std::lock_guard<std::mutex> lock(_errorMutex);
_error = Error(code, msg);
}
Expand Down
7 changes: 0 additions & 7 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,6 @@ class MergingHandler : public qdisp::ResponseHandler {
void prepScrubResults(int jobId, int attempt) override;

private:
/// Prepare for first call to flush().
void _initState();

// &&& delete
bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData,
std::shared_ptr<qdisp::JobQuery> const& jobQuery);

/// Call InfileMerger to do the work of merging this data to the result.
bool _mergeHttp(std::shared_ptr<qdisp::UberJob> const& uberJob, proto::ResponseData const& responseData);

Expand Down
13 changes: 6 additions & 7 deletions src/ccontrol/UserQuerySelect.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void UserQuerySelect::kill() {
// make a copy of executive pointer to keep it alive and avoid race
// with pointer being reset in discard() method
if (exec != nullptr) {
exec->squash();
exec->squash("UserQuerySelect::kill");
}
} catch (UserQueryError const& e) {
// Silence merger discarding errors, because this object is being
Expand Down Expand Up @@ -296,6 +296,7 @@ void UserQuerySelect::submit() {
return;
}
dbName = cs->db;
_queryDbName = dbName;
dbNameSet = true;
}

Expand All @@ -308,13 +309,9 @@ void UserQuerySelect::submit() {
++sequence;
}

if (dbNameSet) {
_queryDbName = dbName;
}

/// At this point the executive has a map of all jobs with the chunkIds as the key.
// This is needed to prevent Czar::_monitor from starting things before they are ready.
exec->setReadyToExecute();
exec->setAllJobsCreated();
buildAndSendUberJobs();

LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence);
Expand All @@ -341,7 +338,8 @@ void UserQuerySelect::buildAndSendUberJobs() {
LOGS(_log, LOG_LVL_ERROR, funcN << " called with null exec " << getQueryIdString());
return;
}
if (!exec->isReadyToExecute()) {

if (!exec->isAllJobsCreated()) {
LOGS(_log, LOG_LVL_INFO, funcN << " executive isn't ready to generate UberJobs.");
return;
}
Expand Down Expand Up @@ -406,6 +404,7 @@ void UserQuerySelect::buildAndSendUberJobs() {
// numerical order. The workers run shared scans in numerical order of chunkId numbers.
// Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum,
// and should minimize the time for the first UberJob on the worker to complete.
LOGS(_log, LOG_LVL_WARN, " &&&d " << funcN << " start assigning");
for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) {
bool const increaseAttemptCount = true;
jqPtr->getDescription()->incrAttemptCount(exec, increaseAttemptCount);
Expand Down
2 changes: 1 addition & 1 deletion src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ void CzarFamilyMap::insertIntoMaps(std::shared_ptr<FamilyMapType> const& newFami
CzarChunkMap::SizeT sz) {
// Get the CzarChunkMap for this family
auto familyName = getFamilyNameFromDbName(dbName);
LOGS(_log, LOG_LVL_INFO,
LOGS(_log, LOG_LVL_TRACE,
cName(__func__) << " familyInsrt{w=" << workerId << " fN=" << familyName << " dbN=" << dbName
<< " tblN=" << tableName << " chunk=" << chunkIdNum << " sz=" << sz << "}");
auto& nfMap = *newFamilyMap;
Expand Down
5 changes: 3 additions & 2 deletions src/czar/HttpCzarWorkerModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,13 @@ json HttpCzarWorkerModule::_workerCzarComIssue() {

json HttpCzarWorkerModule::_handleJobError(string const& func) {
LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobError start");
LOGS(_log, LOG_LVL_WARN, "&&& HttpCzarWorkerModule::_handleJobError start " << body().objJson);
// Metadata-only responses for the file-based protocol should not have any data

// Parse and verify the json message and then kill the UberJob.
json jsRet = {{"success", 0}, {"errortype", "unknown"}, {"note", "initialized"}};
try {
// See qdisp::UberJob::runUberJob() for json message construction. &&&
// TODO:UJ see wbase::UberJobData::responseError for message construction
string const targetWorkerId = body().required<string>("workerid");
string const czarName = body().required<string>("czar");
qmeta::CzarId const czarId = body().required<qmeta::CzarId>("czarid");
Expand Down Expand Up @@ -147,7 +148,7 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) {
try {
// &&& TODO:UJ file response - move construction and parsing
// &&& TODO:UJ to a class so it can be added to WorkerCzarComIssue
// See qdisp::UberJob::runUberJob() for json message construction. &&&
// See wbase::UberJobData::responseFileReady
string const targetWorkerId = body().required<string>("workerid");
string const czarName = body().required<string>("czar");
qmeta::CzarId const czarId = body().required<qmeta::CzarId>("czarid");
Expand Down
35 changes: 20 additions & 15 deletions src/qdisp/Executive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -196,20 +196,22 @@ JobQuery::Ptr Executive::add(JobDescription::Ptr const& jobDesc) {
QSERV_LOGCONTEXT_QUERY_JOB(jobQuery->getQueryId(), jobQuery->getJobId());

{
lock_guard lock(_cancelled.getMutex());
if (_cancelled) {
LOGS(_log, LOG_LVL_DEBUG,
"Executive already cancelled, ignoring add(" << jobDesc->id() << ")");
return nullptr;
{
lock_guard lock(_cancelled.getMutex());
if (_cancelled) {
LOGS(_log, LOG_LVL_DEBUG,
"Executive already cancelled, ignoring add(" << jobDesc->id() << ")");
return nullptr;
}
}

if (!_addJobToMap(jobQuery)) {
LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate job add");
if (!_track(jobQuery->getJobId(), jobQuery)) {
LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate track add");
return jobQuery;
}

if (!_track(jobQuery->getJobId(), jobQuery)) {
LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate track add");
if (!_addJobToMap(jobQuery)) {
LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate job add");
return jobQuery;
}

Expand Down Expand Up @@ -240,7 +242,7 @@ void Executive::addAndQueueUberJob(shared_ptr<UberJob> const& uj) {
lock_guard<mutex> lck(_uberJobsMapMtx);
UberJobId ujId = uj->getUjId();
_uberJobsMap[ujId] = uj;
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " ujId=" << ujId << " uj.sz=" << uj->getJobCount());
LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " ujId=" << ujId << " uj.sz=" << uj->getJobCount());
}

auto runUberJobFunc = [uj](util::CmdData*) { uj->runUberJob(); };
Expand Down Expand Up @@ -415,18 +417,19 @@ void Executive::markCompleted(JobId jobId, bool success) {
LOGS(_log, logLvl,
"Executive: requesting squash, cause: " << " failed (code=" << err.getCode() << " "
<< err.getMsg() << ")");
squash(); // ask to squash
squash(string("markComplete error ") + err.getMsg()); // ask to squash
}
}

void Executive::squash() {
void Executive::squash(string const& note) {
bool alreadyCancelled = _cancelled.exchange(true);
if (alreadyCancelled) {
LOGS(_log, LOG_LVL_DEBUG, "Executive::squash() already cancelled! refusing. qid=" << getId());
return;
}

LOGS(_log, LOG_LVL_INFO, "Executive::squash Trying to cancel all queries... qid=" << getId());
LOGS(_log, LOG_LVL_WARN,
"Executive::squash Trying to cancel all queries... qid=" << getId() << " " << note);
deque<JobQuery::Ptr> jobsToCancel;
{
lock_guard<recursive_mutex> lockJobMap(_jobMapMtx);
Expand Down Expand Up @@ -670,6 +673,7 @@ void Executive::_waitAllUntilEmpty() {
int moreDetailThreshold = 10;
int complainCount = 0;
const chrono::seconds statePrintDelay(5);
// Loop until all jobs have completed and all jobs have been created.
while (!_incompleteJobs.empty()) {
count = _incompleteJobs.size();
if (count != lastCount) {
Expand Down Expand Up @@ -769,9 +773,10 @@ void Executive::checkResultFileSize(uint64_t fileSize) {
cName(__func__) << "recheck total=" << total << " max=" << maxResultTableSizeBytes);
if (total > maxResultTableSizeBytes) {
LOGS(_log, LOG_LVL_ERROR, "Executive: requesting squash, result file size too large " << total);
ResponseHandler::Error err(0, string("Incomplete result already too large ") + to_string(total));
ResponseHandler::Error err(util::ErrorCode::CZAR_RESULT_TOO_LARGE,
string("Incomplete result already too large ") + to_string(total));
_multiError.push_back(err);
squash();
squash("czar, file too large");
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/qdisp/Executive.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class Executive : public std::enable_shared_from_this<Executive> {
void markCompleted(JobId refNum, bool success);

/// Squash all the jobs.
void squash();
void squash(std::string const& note);

bool getEmpty() { return _empty; }

Expand Down Expand Up @@ -210,13 +210,13 @@ class Executive : public std::enable_shared_from_this<Executive> {

// The below value should probably be based on the user query, with longer sleeps for slower queries.
int getAttemptSleepSeconds() const { return 15; } // As above or until added to config file.
int getMaxAttempts() const { return 5; } // Should be set by config
int getMaxAttempts() const { return 50; } // TODO:UJ Should be set by config

/// Calling this indicates the executive is ready to create and execute UberJobs.
void setReadyToExecute() { _readyToExecute = true; }
/// Calling this indicates all Jobs for this user query have been created.
void setAllJobsCreated() { _allJobsCreated = true; }

/// Returns true if the executive is ready to create and execute UberJobs.
bool isReadyToExecute() { return _readyToExecute; }
/// Returns true if all jobs have been created.
bool isAllJobsCreated() { return _allJobsCreated; }

/// Send a message to all workers to cancel this query.
/// @param deleteResults - If true, delete all result files for this query on the workers.
Expand Down Expand Up @@ -346,8 +346,8 @@ class Executive : public std::enable_shared_from_this<Executive> {
/// Weak pointer to the UserQuerySelect object for this query.
std::weak_ptr<ccontrol::UserQuerySelect> _userQuerySelect;

/// Flag that is set to true when ready to create and run UberJobs.
std::atomic<bool> _readyToExecute{false};
/// Flag that is set to true when all jobs have been created.
std::atomic<bool> _allJobsCreated{false};

protojson::ScanInfo::Ptr _scanInfo; ///< Scan rating and tables.

Expand Down
16 changes: 10 additions & 6 deletions src/qdisp/JobDescription.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,25 +65,29 @@ bool JobDescription::incrAttemptCount(std::shared_ptr<Executive> const& exec, bo
if (increase) {
++_attemptCount;
}
if (_attemptCount >= MAX_JOB_ATTEMPTS) {
LOGS(_log, LOG_LVL_ERROR, "attemptCount greater than maximum number of retries " << _attemptCount);
return false;
}

if (exec != nullptr) {
int maxAttempts = exec->getMaxAttempts();
LOGS(_log, LOG_LVL_INFO, "JoQDescription::" << __func__ << " attempts=" << _attemptCount);
if (_attemptCount > 0) {
LOGS(_log, LOG_LVL_INFO, "JoBDescription::" << __func__ << " attempts=" << _attemptCount);
}
if (_attemptCount > maxAttempts) {
LOGS(_log, LOG_LVL_ERROR,
"JoQDescription::" << __func__ << " attempts(" << _attemptCount << ") > maxAttempts("
<< maxAttempts << ") cancelling");
exec->addMultiError(qmeta::JobStatus::RETRY_ERROR,
"max attempts reached " + to_string(_attemptCount) + " " + _qIdStr,
util::ErrorCode::INTERNAL);
exec->squash();
exec->squash(string("incrAttemptCount ") + to_string(_attemptCount));
return false;
}
}

if (_attemptCount >= MAX_JOB_ATTEMPTS) {
LOGS(_log, LOG_LVL_ERROR, "attemptCount greater than maximum number of retries " << _attemptCount);
return false;
}

return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/qdisp/JobQuery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ JobQuery::~JobQuery() {
/// Cancel response handling. Return true if this is the first time cancel has been called.
bool JobQuery::cancel(bool superfluous) {
QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getJobId());
LOGS(_log, LOG_LVL_DEBUG, "JobQuery::cancel()");
LOGS(_log, LOG_LVL_DEBUG, "JobQuery::cancel() " << superfluous);
LOGS(_log, LOG_LVL_WARN, "&&&JobQuery::cancel() " << superfluous);
if (_cancelled.exchange(true) == false) {
VMUTEX_NOT_HELD(_jqMtx);
lock_guard lock(_jqMtx);
Expand Down
19 changes: 7 additions & 12 deletions src/qdisp/UberJob.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ util::HistogramRolling histoUJSerialize("&&&uj histoUJSerialize", {0.1, 1.0, 10.

void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelled
LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " start");
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj start");
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj runuj start");
// Build the uberjob payload for each job.
nlohmann::json uj;
unique_lock<mutex> jobsLock(_jobsMtx);
Expand All @@ -119,7 +119,7 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle
vector<string> const headers = {"Content-Type: application/json"};
auto const& czarConfig = cconfig::CzarConfig::instance();

int maxTableSizeMB = czarConfig->getMaxTableSizeMB();
uint64_t maxTableSizeMB = czarConfig->getMaxTableSizeMB();
auto czInfo = protojson::CzarContactInfo::create(
czarConfig->name(), czarConfig->id(), czarConfig->replicationHttpPort(),
util::get_current_host_fqdn(), czar::Czar::czarStartupTime);
Expand Down Expand Up @@ -220,7 +220,7 @@ void UberJob::_unassignJobs() {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " could not unassign job=" << jid << " cancelling");
exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "unable to re-assign " + jid,
util::ErrorCode::INTERNAL);
exec->squash();
exec->squash("_unassignJobs failure");
return;
}
LOGS(_log, LOG_LVL_DEBUG,
Expand Down Expand Up @@ -292,14 +292,9 @@ void UberJob::callMarkCompleteFunc(bool success) {

util::HistogramRolling histoQueImp("&&&uj histoQueImp", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000);

/// Retrieve and process a result file using the file-based protocol
/// Uses a copy of JobQuery::Ptr instead of _jobQuery as a call to cancel() would reset _jobQuery.
json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_t fileSize) {
LOGS(_log, LOG_LVL_DEBUG,
cName(__func__) << " fileUrl=" << fileUrl << " rowCount=" << rowCount << " fileSize=" << fileSize);
LOGS(_log, LOG_LVL_WARN,
cName(__func__) << "&&& fileUrl=" << fileUrl << " rowCount=" << rowCount
<< " fileSize=" << fileSize);

if (isQueryCancelled()) {
LOGS(_log, LOG_LVL_WARN, cName(__func__) << " import job was cancelled.");
Expand Down Expand Up @@ -398,7 +393,7 @@ json UberJob::workerError(int errorCode, string const& errorMsg) {
// TODO:UJ see if recoverable errors can be detected on the workers, or
// maybe allow a single retry before sending the error back to the user?
bool recoverableError = false;
recoverableError = true; // TODO:UJ delete after testing

if (recoverableError) {
// The czar should have new maps before the the new UberJob(s) for
// these Jobs are created. (see Czar::_monitor)
Expand All @@ -408,7 +403,7 @@ json UberJob::workerError(int errorCode, string const& errorMsg) {
int errState = util::ErrorCode::MYSQLEXEC;
getRespHandler()->flushHttpError(errorCode, errorMsg, errState);
exec->addMultiError(errorCode, errorMsg, errState);
exec->squash();
exec->squash(string("UberJob::workerError ") + errorMsg);
}

string errType = to_string(errorCode) + ":" + errorMsg;
Expand All @@ -427,7 +422,7 @@ json UberJob::_importResultError(bool shouldCancel, string const& errorType, str
if (shouldCancel) {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " failing jobs");
callMarkCompleteFunc(false); // all jobs failed, no retry
exec->squash();
exec->squash(string("_importResultError shouldCancel"));
} else {
/// - each JobQuery in _jobs needs to be flagged as needing to be
/// put in an UberJob and it's attempt count increased and checked
Expand Down Expand Up @@ -465,7 +460,7 @@ void UberJob::_importResultFinish(uint64_t resultRows) {
if (!statusSet) {
LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " failed to set status, squashing " << getIdStr());
// Something has gone very wrong
exec->squash();
exec->squash("UberJob::_importResultFinish couldn't set status");
return;
}

Expand Down
2 changes: 2 additions & 0 deletions src/qdisp/UberJob.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class UberJob : public std::enable_shared_from_this<UberJob> {
czar::CzarChunkMap::WorkerChunksData::Ptr getWorkerData() { return _workerData; }

/// Queue the lambda function to collect and merge the results from the worker.
/// @return a json message indicating success unless the query has been
/// cancelled, limit row complete, or similar.
nlohmann::json importResultFile(std::string const& fileUrl, uint64_t rowCount, uint64_t fileSize);

/// Handle an error from the worker.
Expand Down
Loading

0 comments on commit 055702f

Please sign in to comment.