Skip to content

Commit

Permalink
Improved comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Apr 5, 2024
1 parent 16323fb commit a45a91a
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 24 deletions.
14 changes: 0 additions & 14 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ string FileChannelShared::makeIdStr(int qId, int jId) {

bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared_ptr<Task> const& task,
bool cancelled) {
// &&& Arena may not really be needed.
std::unique_ptr<google::protobuf::Arena> protobufArena = make_unique<google::protobuf::Arena>();
lock_guard<mutex> const tMtxLock(_tMtx);
if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) {
Expand All @@ -329,7 +328,6 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared

bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Task> const& task,
util::MultiError& multiErr, atomic<bool>& cancelled) {
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult start");
// Operation stats. Note that "buffer fill time" included the amount
// of time needed to write the result set to disk.
util::Timer transmitT;
Expand All @@ -347,7 +345,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
bool erred = false;
bool hasMoreRows = true;

// &&& Arena may not really be needed.
std::unique_ptr<google::protobuf::Arena> protobufArena = make_unique<google::protobuf::Arena>();
proto::ResponseData* responseData = nullptr;

Expand Down Expand Up @@ -389,9 +386,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
// the current request (note that certain classes of requests may require
// more than one task for processing).
if (!hasMoreRows && transmitTaskLast()) {
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e");
lock_guard<mutex> const tMtxLock(_tMtx);
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e1");

// Make sure the file is sync to disk before notifying Czar.
_file.flush();
Expand Down Expand Up @@ -424,11 +419,6 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
// successfully processing the query and writing all results into the file.
// The file is not going to be used by Czar in either of these scenarios.
if (cancelled || erred || isDead()) {
/* &&&
//&&& it may be better to set a flag and call _removeFile in the destructor.
lock_guard<mutex> const tMtxLockA(_tMtx);
_removeFile(tMtxLockA);
*/
// Set a flag to delete the file in the destructor. That should prevent any
// possible race conditions with other threads expecting the file to exist.
_issueRequiresFileRemoval = true;
Expand All @@ -446,7 +436,6 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
shared_ptr<Task> const& task, MYSQL_RES* mResult, int& bytes, int& rows,
util::MultiError& multiErr) {
// Transfer rows from a result set into the response data object.
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile start");
if (nullptr == responseData) {
responseData = google::protobuf::Arena::CreateMessage<proto::ResponseData>(protobufArena.get());
} else {
Expand All @@ -464,9 +453,7 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
bytes = msg.size();

LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start");
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d");
lock_guard<mutex> const tMtxLock(_tMtx);
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d1");
// Create the file if not open.
if (!_file.is_open()) {
_fileName = task->resultFilePath();
Expand All @@ -488,7 +475,6 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
throw runtime_error("FileChannelShared::" + string(__func__) + " failed to write " +
to_string(msg.size()) + " bytes into the file '" + _fileName + "'.");
}
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile end");
return hasMoreRows;
}

Expand Down
8 changes: 4 additions & 4 deletions src/wbase/FileChannelShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,8 @@ class FileChannelShared {
* implementation. Also, the iterative approach to the data extraction allows
* the driving code to be interrupted should the correponding query be cancelled
* during the lengthy data processing phase.
* @param tMtxLock - a lock on the mutex tMtx
* @param responseData - proto buffer to hold the response being constructed.
* @param protobufArena - proto buffer memory management control.
* @param task - a task that produced the result set
* @param mResult - MySQL result to be used as a source
* @param bytes - the number of bytes in the result message recorded into the file
Expand All @@ -192,7 +193,6 @@ class FileChannelShared {
* @throws std::runtime_error for problems encountered when attemting to create the file
* or write into the file.
*/
// &&& fix doc tMtxLock responseData protobufArena
bool _writeToFile(proto::ResponseData* responseData,
std::unique_ptr<google::protobuf::Arena> const& protobufArena,
std::shared_ptr<Task> const& task, MYSQL_RES* mResult, int& bytes, int& rows,
Expand All @@ -201,13 +201,13 @@ class FileChannelShared {
/**
* Extract as many rows as allowed by the Google Protobuf implementation from
* from the input result set into the output result object.
* @param tMtxLock - a lock on the mutex tMtx
* @param responseData - proto buffer to hold the response being constructed.
* @param protobufArena - proto buffer memory management control.
* @param mResult - MySQL result to be used as a source
* @param rows - the number of rows extracted from the result set
* @param tSize - the approximate amount of data extracted from the result set
* @return 'true' if there are more rows left in the result set.
*/
//&&& fix doc
static bool _fillRows(proto::ResponseData* responseData, MYSQL_RES* mResult, int& rows, size_t& tSize);

/**
Expand Down
6 changes: 0 additions & 6 deletions src/wdb/QueryRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ size_t QueryRunner::_getDesiredLimit() {
util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40});

bool QueryRunner::runQuery() {
LOGS(_log, LOG_LVL_WARN, "&&& runQuery start");
util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB
util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId()));
QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId());
Expand Down Expand Up @@ -255,7 +254,6 @@ class ChunkResourceRequest {
};

bool QueryRunner::_dispatchChannel() {
LOGS(_log, LOG_LVL_WARN, "&&& dispatch start");
bool erred = false;
bool needToFreeRes = false; // set to true once there are results to be freed.
// Collect the result in _transmitData. When a reasonable amount of data has been collected,
Expand Down Expand Up @@ -300,9 +298,7 @@ bool QueryRunner::_dispatchChannel() {
if (sendChan == nullptr) {
throw util::Bug(ERR_LOC, "QueryRunner::_dispatchChannel() sendChan==null");
}
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a");
erred = sendChan->buildAndTransmitResult(res, _task, _multiError, _cancelled);
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a1");
}
}
} catch (sql::SqlErrorObject const& e) {
Expand All @@ -325,12 +321,10 @@ bool QueryRunner::_dispatchChannel() {
erred = true;
// Send results. This needs to happen after the error check.
// If any errors were found, send an error back.
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b");
if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) {
LOGS(_log, LOG_LVL_WARN,
" Could not report error to czar as sendChannel not accepting msgs." << _task->getIdStr());
}
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b1");
}
return !erred;
}
Expand Down

0 comments on commit a45a91a

Please sign in to comment.