Skip to content

Commit

Permalink
simplified ThreadExecutor class by moving some code out of it / fix…
Browse files Browse the repository at this point in the history
…ed some thread safety issues (#4849)
  • Loading branch information
firewave authored Mar 4, 2023
1 parent 9291421 commit a00b6e1
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 122 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ cli/cppcheckexecutorseh.o: cli/cppcheckexecutorseh.cpp cli/cppcheckexecutor.h cl
cli/cppcheckexecutorsig.o: cli/cppcheckexecutorsig.cpp cli/cppcheckexecutor.h cli/cppcheckexecutorsig.h cli/stacktrace.h lib/color.h lib/config.h lib/errorlogger.h lib/errortypes.h lib/suppressions.h
$(CXX) ${INCLUDE_FOR_CLI} $(CPPFLAGS) $(CXXFLAGS) -c -o $@ cli/cppcheckexecutorsig.cpp

cli/executor.o: cli/executor.cpp cli/executor.h
cli/executor.o: cli/executor.cpp cli/executor.h lib/color.h lib/config.h lib/errorlogger.h lib/errortypes.h lib/importproject.h lib/library.h lib/mathlib.h lib/platform.h lib/settings.h lib/standards.h lib/suppressions.h lib/timer.h lib/utils.h
$(CXX) ${INCLUDE_FOR_CLI} $(CPPFLAGS) $(CXXFLAGS) -c -o $@ cli/executor.cpp

cli/filelister.o: cli/filelister.cpp cli/filelister.h lib/config.h lib/path.h lib/pathmatch.h lib/utils.h
Expand Down
2 changes: 2 additions & 0 deletions cli/cppcheckexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ int CppCheckExecutor::check_internal(CppCheck& cppcheck)
}
}

// TODO: not performed when multiple jobs are being used
// second loop to parse all markup files which may not work until all
// c/cpp files have been parsed and checked
for (std::map<std::string, std::size_t>::const_iterator i = mFiles.cbegin(); i != mFiles.cend(); ++i) {
Expand Down Expand Up @@ -462,6 +463,7 @@ void CppCheckExecutor::reportStatus(std::size_t fileindex, std::size_t filecount
oss << fileindex << '/' << filecount
<< " files checked " << percentDone
<< "% done";
// TODO: do not unconditionally print in color
std::cout << Color::FgBlue << oss.str() << Color::Reset << std::endl;
}
}
Expand Down
20 changes: 20 additions & 0 deletions cli/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,30 @@

#include "executor.h"

#include "errorlogger.h"
#include "settings.h"

#include <algorithm>

Executor::Executor(const std::map<std::string, std::size_t> &files, Settings &settings, ErrorLogger &errorLogger)
: mFiles(files), mSettings(settings), mErrorLogger(errorLogger)
{}

Executor::~Executor()
{}

bool Executor::hasToLog(const ErrorMessage &msg)
{
if (!mSettings.nomsg.isSuppressed(msg))
{
std::string errmsg = msg.toString(mSettings.verbose);

std::lock_guard<std::mutex> lg(mErrorListSync);
if (std::find(mErrorList.cbegin(), mErrorList.cend(), errmsg) == mErrorList.cend()) {
mErrorList.emplace_back(std::move(errmsg));
return true;
}
}
return false;
}

12 changes: 12 additions & 0 deletions cli/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
#include <cstddef>
#include <list>
#include <map>
#include <mutex>
#include <string>

class Settings;
class ErrorLogger;
class ErrorMessage;

/// @addtogroup CLI
/// @{
Expand All @@ -45,9 +47,19 @@ class Executor {
virtual unsigned int check() = 0;

protected:
/**
* @brief Check if message is being suppressed and unique.
* @param msg the message to check
* @return true if message is not suppressed and unique
*/
bool hasToLog(const ErrorMessage &msg);

const std::map<std::string, std::size_t> &mFiles;
Settings &mSettings;
ErrorLogger &mErrorLogger;

private:
std::mutex mErrorListSync;
std::list<std::string> mErrorList;
};

Expand Down
15 changes: 5 additions & 10 deletions cli/processexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,11 @@ int ProcessExecutor::handleRead(int rpipe, unsigned int &result)
std::exit(EXIT_FAILURE);
}

if (!mSettings.nomsg.isSuppressed(msg)) {
// Alert only about unique errors
std::string errmsg = msg.toString(mSettings.verbose);
if (std::find(mErrorList.cbegin(), mErrorList.cend(), errmsg) == mErrorList.cend()) {
mErrorList.emplace_back(std::move(errmsg));
if (type == PipeWriter::REPORT_ERROR)
mErrorLogger.reportErr(msg);
else
mErrorLogger.reportInfo(msg);
}
if (hasToLog(msg)) {
if (type == PipeWriter::REPORT_ERROR)
mErrorLogger.reportErr(msg);
else
mErrorLogger.reportInfo(msg);
}
} else if (type == PipeWriter::CHILD_END) {
std::istringstream iss(buf);
Expand Down
200 changes: 106 additions & 94 deletions cli/threadexecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "threadexecutor.h"

#include "color.h"
#include "config.h"
#include "cppcheck.h"
#include "cppcheckexecutor.h"
#include "errorlogger.h"
Expand All @@ -45,97 +46,158 @@ ThreadExecutor::ThreadExecutor(const std::map<std::string, std::size_t> &files,
ThreadExecutor::~ThreadExecutor()
{}

class ThreadExecutor::SyncLogForwarder : public ErrorLogger
class Data
{
public:
explicit SyncLogForwarder(ThreadExecutor &threadExecutor)
: mThreadExecutor(threadExecutor), mProcessedFiles(0), mTotalFiles(0), mProcessedSize(0) {

const std::map<std::string, std::size_t>& files = mThreadExecutor.mFiles;
mItNextFile = files.begin();
mItNextFileSettings = mThreadExecutor.mSettings.project.fileSettings.begin();
Data(const std::map<std::string, std::size_t> &files, const std::list<ImportProject::FileSettings> &fileSettings)
: mFiles(files), mFileSettings(fileSettings), mProcessedFiles(0), mProcessedSize(0)
{
mItNextFile = mFiles.begin();
mItNextFileSettings = mFileSettings.begin();

mTotalFiles = files.size() + mThreadExecutor.mSettings.project.fileSettings.size();
mTotalFileSize = std::accumulate(files.cbegin(), files.cend(), std::size_t(0), [](std::size_t v, const std::pair<std::string, std::size_t>& p) {
mTotalFiles = mFiles.size() + mFileSettings.size();
mTotalFileSize = std::accumulate(mFiles.cbegin(), mFiles.cend(), std::size_t(0), [](std::size_t v, const std::pair<std::string, std::size_t>& p) {
return v + p.second;
});
}

void reportOut(const std::string &outmsg, Color c) override
{
std::lock_guard<std::mutex> lg(mReportSync);

mThreadExecutor.mErrorLogger.reportOut(outmsg, c);
bool finished() {
std::lock_guard<std::mutex> l(mFileSync);
return mItNextFile == mFiles.cend() && mItNextFileSettings == mFileSettings.cend();
}

void reportErr(const ErrorMessage &msg) override {
report(msg, MessageType::REPORT_ERROR);
}
bool next(const std::string *&file, const ImportProject::FileSettings *&fs, std::size_t &fileSize) {
std::lock_guard<std::mutex> l(mFileSync);
if (mItNextFile != mFiles.end()) {
file = &mItNextFile->first;
fileSize = mItNextFile->second;
++mItNextFile;
return true;
}
if (mItNextFileSettings != mFileSettings.end()) {
fs = &(*mItNextFileSettings);
fileSize = 0;
++mItNextFileSettings;
return true;
}

void reportInfo(const ErrorMessage &msg) override {
report(msg, MessageType::REPORT_INFO);
return false;
}

ThreadExecutor &mThreadExecutor;

private:
const std::map<std::string, std::size_t> &mFiles;
std::map<std::string, std::size_t>::const_iterator mItNextFile;
const std::list<ImportProject::FileSettings> &mFileSettings;
std::list<ImportProject::FileSettings>::const_iterator mItNextFileSettings;

public:
std::size_t mProcessedFiles;
std::size_t mTotalFiles;
std::size_t mProcessedSize;
std::size_t mTotalFileSize;

std::mutex mFileSync;
std::mutex mErrorSync;
};

class SyncLogForwarder : public ErrorLogger
{
public:
explicit SyncLogForwarder(ThreadExecutor &threadExecutor, ErrorLogger &errorLogger)
: mThreadExecutor(threadExecutor), mErrorLogger(errorLogger) {}

void reportOut(const std::string &outmsg, Color c) override
{
std::lock_guard<std::mutex> lg(mReportSync);

mErrorLogger.reportOut(outmsg, c);
}

void reportErr(const ErrorMessage &msg) override {
report(msg, MessageType::REPORT_ERROR);
}

void reportInfo(const ErrorMessage &msg) override {
report(msg, MessageType::REPORT_INFO);
}

std::mutex mReportSync;

private:
enum class MessageType {REPORT_ERROR, REPORT_INFO};

void report(const ErrorMessage &msg, MessageType msgType)
{
if (mThreadExecutor.mSettings.nomsg.isSuppressed(msg))
if (!mThreadExecutor.hasToLog(msg))
return;

// Alert only about unique errors
bool reportError = false;
std::lock_guard<std::mutex> lg(mReportSync);

{
std::string errmsg = msg.toString(mThreadExecutor.mSettings.verbose);
switch (msgType) {
case MessageType::REPORT_ERROR:
mErrorLogger.reportErr(msg);
break;
case MessageType::REPORT_INFO:
mErrorLogger.reportInfo(msg);
break;
}
}

std::lock_guard<std::mutex> lg(mErrorSync);
if (std::find(mThreadExecutor.mErrorList.cbegin(), mThreadExecutor.mErrorList.cend(), errmsg) == mThreadExecutor.mErrorList.cend()) {
mThreadExecutor.mErrorList.emplace_back(std::move(errmsg));
reportError = true;
}
ThreadExecutor &mThreadExecutor;
ErrorLogger &mErrorLogger;
};

static unsigned int STDCALL threadProc(Data *data, SyncLogForwarder* logForwarder, const Settings &settings)
{
unsigned int result = 0;

for (;;) {
if (data->finished()) {
break;
}

if (reportError) {
std::lock_guard<std::mutex> lg(mReportSync);
const std::string *file = nullptr;
const ImportProject::FileSettings *fs = nullptr;
std::size_t fileSize;
if (!data->next(file, fs, fileSize))
break;

switch (msgType) {
case MessageType::REPORT_ERROR:
mThreadExecutor.mErrorLogger.reportErr(msg);
break;
case MessageType::REPORT_INFO:
mThreadExecutor.mErrorLogger.reportInfo(msg);
break;
CppCheck fileChecker(*logForwarder, false, CppCheckExecutor::executeCommand);
fileChecker.settings() = settings;

if (fs) {
// file settings..
result += fileChecker.check(*fs);
if (settings.clangTidy)
fileChecker.analyseClangTidy(*fs);
} else {
// Read file from a file
result += fileChecker.check(*file);
}

{
std::lock_guard<std::mutex> l(data->mFileSync);
data->mProcessedSize += fileSize;
data->mProcessedFiles++;
if (!settings.quiet) {
std::lock_guard<std::mutex> lg(logForwarder->mReportSync);
CppCheckExecutor::reportStatus(data->mProcessedFiles, data->mTotalFiles, data->mProcessedSize, data->mTotalFileSize);
}
}
}
};
return result;
}

unsigned int ThreadExecutor::check()
{
std::vector<std::future<unsigned int>> threadFutures;
threadFutures.reserve(mSettings.jobs);

SyncLogForwarder logforwarder(*this);
Data data(mFiles, mSettings.project.fileSettings);
SyncLogForwarder logforwarder(*this, mErrorLogger);

for (unsigned int i = 0; i < mSettings.jobs; ++i) {
try {
threadFutures.emplace_back(std::async(std::launch::async, threadProc, &logforwarder));
threadFutures.emplace_back(std::async(std::launch::async, &threadProc, &data, &logforwarder, mSettings));
}
catch (const std::system_error &e) {
std::cerr << "#### ThreadExecutor::check exception :" << e.what() << std::endl;
Expand All @@ -147,53 +209,3 @@ unsigned int ThreadExecutor::check()
return v + f.get();
});
}

unsigned int STDCALL ThreadExecutor::threadProc(SyncLogForwarder* logForwarder)
{
unsigned int result = 0;

std::map<std::string, std::size_t>::const_iterator &itFile = logForwarder->mItNextFile;
std::list<ImportProject::FileSettings>::const_iterator &itFileSettings = logForwarder->mItNextFileSettings;

// guard static members of CppCheck against concurrent access
logForwarder->mFileSync.lock();

for (;;) {
if (itFile == logForwarder->mThreadExecutor.mFiles.cend() && itFileSettings == logForwarder->mThreadExecutor.mSettings.project.fileSettings.cend()) {
logForwarder->mFileSync.unlock();
break;
}

CppCheck fileChecker(*logForwarder, false, CppCheckExecutor::executeCommand);
fileChecker.settings() = logForwarder->mThreadExecutor.mSettings;

std::size_t fileSize = 0;
if (itFile != logForwarder->mThreadExecutor.mFiles.end()) {
const std::string &file = itFile->first;
fileSize = itFile->second;
++itFile;

logForwarder->mFileSync.unlock();

// Read file from a file
result += fileChecker.check(file);
} else { // file settings..
const ImportProject::FileSettings &fs = *itFileSettings;
++itFileSettings;
logForwarder->mFileSync.unlock();
result += fileChecker.check(fs);
if (logForwarder->mThreadExecutor.mSettings.clangTidy)
fileChecker.analyseClangTidy(fs);
}

logForwarder->mFileSync.lock();

logForwarder->mProcessedSize += fileSize;
logForwarder->mProcessedFiles++;
if (!logForwarder->mThreadExecutor.mSettings.quiet) {
std::lock_guard<std::mutex> lg(logForwarder->mReportSync);
CppCheckExecutor::reportStatus(logForwarder->mProcessedFiles, logForwarder->mTotalFiles, logForwarder->mProcessedSize, logForwarder->mTotalFileSize);
}
}
return result;
}
6 changes: 1 addition & 5 deletions cli/threadexecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
#ifndef THREADEXECUTOR_H
#define THREADEXECUTOR_H

#include "config.h"

#include "executor.h"

#include <cstddef>
Expand All @@ -46,9 +44,7 @@ class ThreadExecutor : public Executor {

unsigned int check() override;

private:
class SyncLogForwarder;
static unsigned int STDCALL threadProc(SyncLogForwarder *logForwarder);
friend class SyncLogForwarder;
};

/// @}
Expand Down
Loading

0 comments on commit a00b6e1

Please sign in to comment.