Skip to content

Commit

Permalink
Rebased on main and added file removal flag.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Apr 5, 2024
1 parent 3a77cbb commit 16323fb
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
32 changes: 15 additions & 17 deletions src/wbase/FileChannelShared.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ FileChannelShared::~FileChannelShared() {
// dead it means there was a problem processing a query or sending back a response
// to Czar. In either case, the file would be useless and it has to be deleted
// in order to avoid leaving unclaimed result files within the results folder.
if (isDead()) {
if (_issueRequiresFileRemoval || isDead()) {
_removeFile(lock_guard<mutex>(_tMtx));
}
if (_sendChannel != nullptr) {
Expand Down Expand Up @@ -329,6 +329,7 @@ bool FileChannelShared::buildAndTransmitError(util::MultiError& multiErr, shared

bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Task> const& task,
util::MultiError& multiErr, atomic<bool>& cancelled) {
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult start");
// Operation stats. Note that "buffer fill time" includes the amount
// of time needed to write the result set to disk.
util::Timer transmitT;
Expand All @@ -348,22 +349,16 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta

// &&& Arena may not really be needed.
std::unique_ptr<google::protobuf::Arena> protobufArena = make_unique<google::protobuf::Arena>();
proto::ResponseData* responseData = 0;
proto::ResponseData* responseData = nullptr;

while (hasMoreRows && !cancelled) {
// This lock is to protect the stream from having other Tasks mess with it
// while data is loading.
lock_guard<mutex> const tMtxLockA(_tMtx);

util::Timer bufferFillT;
bufferFillT.start();

// Transfer as many rows as allowed by the limitations of
// Google Protobuf into the output file.
int bytes = 0;
int rows = 0;
//&&& hasMoreRows = _writeToFile(tMtxLockA, task, mResult, bytes, rows, multiErr);

hasMoreRows = _writeToFile(responseData, protobufArena, task, mResult, bytes, rows, multiErr);
bytesTransmitted += bytes;
rowsTransmitted += rows;
Expand Down Expand Up @@ -394,15 +389,16 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
// the current request (note that certain classes of requests may require
// more than one task for processing).
if (!hasMoreRows && transmitTaskLast()) {
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e");
lock_guard<mutex> const tMtxLock(_tMtx);
LOGS(_log, LOG_LVL_WARN, "&&& FileChannelShared::buildAndTransmitResult e1");

// Make sure the file is synced to disk before notifying Czar.
_file.flush();
_file.close();

// Only the last ("summary") message, w/o any rows, is sent to the Czar to notify
// it about the completion of the request.
//&&&if (!_sendResponse(tMtxLockA, task, cancelled, multiErr)) {
if (!_sendResponse(tMtxLock, protobufArena, task, cancelled, multiErr)) {
LOGS(_log, LOG_LVL_ERROR, "Could not transmit the request completion message to Czar.");
erred = true;
Expand All @@ -428,9 +424,14 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptr<Ta
// successfully processing the query and writing all results into the file.
// The file is not going to be used by Czar in either of these scenarios.
if (cancelled || erred || isDead()) {
/* &&&
//&&& it may be better to set a flag and call _removeFile in the destructor.
lock_guard<mutex> const tMtxLockA(_tMtx);
_removeFile(tMtxLockA);
*/
// Set a flag to delete the file in the destructor. That should prevent any
// possible race conditions with other threads expecting the file to exist.
_issueRequiresFileRemoval = true;
}
return erred;
}
Expand All @@ -445,19 +446,13 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
shared_ptr<Task> const& task, MYSQL_RES* mResult, int& bytes, int& rows,
util::MultiError& multiErr) {
// Transfer rows from a result set into the response data object.
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile start");
if (nullptr == responseData) {
responseData = google::protobuf::Arena::CreateMessage<proto::ResponseData>(protobufArena.get());
} else {
responseData->clear_row();
}
size_t tSize = 0;
/* &&&
LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " start");
bool const hasMoreRows = _fillRows(tMtxLock, mResult, rows, tSize);
LOGS(_log, LOG_LVL_TRACE, __func__ << " _fillRows " << task->getIdStr() << " end");
_responseData->set_rowcount(rows);
_responseData->set_transmitsize(tSize);
*/
bool const hasMoreRows = _fillRows(responseData, mResult, rows, tSize);
responseData->set_rowcount(rows);
responseData->set_transmitsize(tSize);
Expand All @@ -468,8 +463,10 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
responseData->SerializeToString(&msg);
bytes = msg.size();

//&&&LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start");
LOGS(_log, LOG_LVL_TRACE, __func__ << " file write " << task->getIdStr() << " start");
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d");
lock_guard<mutex> const tMtxLock(_tMtx);
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile d1");
// Create the file if not open.
if (!_file.is_open()) {
_fileName = task->resultFilePath();
Expand All @@ -491,6 +488,7 @@ bool FileChannelShared::_writeToFile(proto::ResponseData* responseData,
throw runtime_error("FileChannelShared::" + string(__func__) + " failed to write " +
to_string(msg.size()) + " bytes into the file '" + _fileName + "'.");
}
LOGS(_log, LOG_LVL_WARN, "&&& _writeToFile end");
return hasMoreRows;
}

Expand Down
4 changes: 4 additions & 0 deletions src/wbase/FileChannelShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ class FileChannelShared {

uint32_t _rowcount = 0; ///< The total number of rows in all result sets of a query.
uint64_t _transmitsize = 0; ///< The total amount of data (bytes) in all result sets of a query.

/// This should be set to true if there were any issues that invalidate the file, such as errors
/// or cancellation.
std::atomic<bool> _issueRequiresFileRemoval{false};
};

} // namespace lsst::qserv::wbase
Expand Down
6 changes: 6 additions & 0 deletions src/wdb/QueryRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ size_t QueryRunner::_getDesiredLimit() {
util::TimerHistogram memWaitHisto("memWait Hist", {1, 5, 10, 20, 40});

bool QueryRunner::runQuery() {
LOGS(_log, LOG_LVL_WARN, "&&& runQuery start");
util::InstanceCount ic(to_string(_task->getQueryId()) + "_rq_LDB"); // LockupDB
util::HoldTrack::Mark runQueryMarkA(ERR_LOC, "runQuery " + to_string(_task->getQueryId()));
QSERV_LOGCONTEXT_QUERY_JOB(_task->getQueryId(), _task->getJobId());
Expand Down Expand Up @@ -254,6 +255,7 @@ class ChunkResourceRequest {
};

bool QueryRunner::_dispatchChannel() {
LOGS(_log, LOG_LVL_WARN, "&&& dispatch start");
bool erred = false;
bool needToFreeRes = false; // set to true once there are results to be freed.
// Collect the result in _transmitData. When a reasonable amount of data has been collected,
Expand Down Expand Up @@ -298,7 +300,9 @@ bool QueryRunner::_dispatchChannel() {
if (sendChan == nullptr) {
throw util::Bug(ERR_LOC, "QueryRunner::_dispatchChannel() sendChan==null");
}
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a");
erred = sendChan->buildAndTransmitResult(res, _task, _multiError, _cancelled);
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmitResult a1");
}
}
} catch (sql::SqlErrorObject const& e) {
Expand All @@ -321,10 +325,12 @@ bool QueryRunner::_dispatchChannel() {
erred = true;
// Send results. This needs to happen after the error check.
// If any errors were found, send an error back.
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b");
if (!_task->getSendChannel()->buildAndTransmitError(_multiError, _task, _cancelled)) {
LOGS(_log, LOG_LVL_WARN,
" Could not report error to czar as sendChannel not accepting msgs." << _task->getIdStr());
}
LOGS(_log, LOG_LVL_WARN, "&&& dispatch calling buildAndTransmiError b1");
}
return !erred;
}
Expand Down

0 comments on commit 16323fb

Please sign in to comment.