Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Match concurrency to available CPU bandwidth #2300

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions src/build.cc
Original file line number Diff line number Diff line change
Expand Up @@ -475,10 +475,44 @@ void RealCommandRunner::Abort() {

bool RealCommandRunner::CanRunMore() const {
size_t subproc_number =
subprocs_.running_.size() + subprocs_.finished_.size();
return (int)subproc_number < config_.parallelism
&& ((subprocs_.running_.empty() || config_.max_load_average <= 0.0f)
|| GetLoadAverage() < config_.max_load_average);
subprocs_.running_.size() + subprocs_.finished_.size();

if ((int)subproc_number >= config_.parallelism)
return false;

if (subprocs_.running_.empty())
return true;

if (config_.max_load_average > 0.0f) {
double loadavg = GetLoadAverage();

if (loadavg < config_.max_load_average)
return true;

if (g_syslimits)
fprintf (stderr, "\nninja syslimits: loadavg %.0f >= %.0f\n",
loadavg, config_.max_load_average);

return false;
} else if (config_.max_load_average < -0.1f) {
double wait_ratio = GetCPUWaitRatio(subproc_number, config_.parallelism);

if (wait_ratio < -0.1f) {
fprintf (stderr, "\nninja syslimits: system does not support PSI\n");
return false;
}

if (wait_ratio < -config_.max_load_average)
return true;

if (g_syslimits)
fprintf (stderr,
"\nninja syslimits: wait_ratio %.0f >= %.0f; subprocs: %zu\n",
wait_ratio, -config_.max_load_average, subproc_number);

return false;
} else
return true;
}

bool RealCommandRunner::StartCommand(Edge* edge) {
Expand Down
2 changes: 2 additions & 0 deletions src/debug_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

bool g_explaining = false;

bool g_syslimits = false;

bool g_keep_depfile = false;

bool g_keep_rsp = false;
Expand Down
2 changes: 2 additions & 0 deletions src/debug_flags.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,6 @@ extern bool g_keep_rsp;

extern bool g_experimental_statcache;

extern bool g_syslimits;

#endif // NINJA_EXPLAIN_H_
14 changes: 12 additions & 2 deletions src/ninja.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,13 @@ void Usage(const BuildConfig& config) {
"\n"
" -j N run N jobs in parallel (0 means infinity) [default=%d on this system]\n"
" -k N keep going until N jobs fail (0 means infinity) [default=1]\n"
" -l N do not start new jobs if the load average is greater than N\n"
" -l N do not start new jobs if system load is greater than N;\n"
" if N is positive,\n"
" then compare against system load average (absolute value);\n"
" if N is negative,\n"
" then compare against process stalled time (percentage);\n"
" e.g., -l-10 will not start new jobs if existing processes\n"
" spend, on average, 10%% of their time waiting for CPU slice;\n"
" -n dry run (don't run commands but act like they succeeded)\n"
"\n"
" -d MODE enable debugging (use '-d list' to list modes)\n"
Expand Down Expand Up @@ -1161,6 +1167,7 @@ bool DebugEnable(const string& name) {
#ifdef _WIN32
" nostatcache don't batch stat() calls per directory and cache them\n"
#endif
" syslimits print notes when parallelism is limited by system pressure\n"
"multiple modes can be enabled via -d FOO -d BAR\n");
return false;
} else if (name == "stats") {
Expand All @@ -1178,11 +1185,14 @@ bool DebugEnable(const string& name) {
} else if (name == "nostatcache") {
g_experimental_statcache = false;
return true;
} else if (name == "syslimits") {
g_syslimits = true;
return true;
} else {
const char* suggestion =
SpellcheckString(name.c_str(),
"stats", "explain", "keepdepfile", "keeprsp",
"nostatcache", NULL);
"nostatcache", "syslimits", NULL);
if (suggestion) {
Error("unknown debug setting '%s', did you mean '%s'?",
name.c_str(), suggestion);
Expand Down
90 changes: 90 additions & 0 deletions src/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <libperfstat.h>
#elif defined(linux) || defined(__GLIBC__)
#include <sys/sysinfo.h>
#include <cstdlib>
#include <fstream>
#include <map>
#include "string_piece_util.h"
Expand All @@ -59,6 +60,7 @@
#endif

#include "edit_distance.h"
#include "metrics.h"

using namespace std;

Expand Down Expand Up @@ -835,6 +837,94 @@ double GetLoadAverage() {
}
#endif // _WIN32

double GetCPUWaitRatio(size_t subproc_number, int parallelism) {
#if defined(linux) || defined(__GLIBC__)
static double oncpu_ratio = 100.0f;
static uint64_t prev_stalled(0);
static int64_t prev_timestamp(0);

// We use kernel's PSI infrastructure to calculate amount of time
// we are waiting for CPU. It would be great to just use 10-second
// average (avg10 below), but, unfortunately, that's too "slow"
// an average to provide satisfactory results. Using avg10 we will
// oscillate too far into overloading and underloading the system.
// Instead, we use raw total stalled count and divide it by time
// elapsed since previous measurement.
//
// The "total" units are microseconds, but documentation does not say
// whether it's cumulative across all CPUs or not. Apparently, it's
// not cumulative. IIUC, on an 8-core system if we have 6 processes
// running at 100% and another 2 stalled at 100% -- then every second
// the "total" stalled count will be increased by 1000000 [microseconds].
// The count will be increased by the same 1000000 [microseconds] if all
// 8 processes are 100% stalled.

ifstream cpupressure("/sys/fs/cgroup/cpu.pressure", ifstream::in);
string token;
maxim-kuvyrkov marked this conversation as resolved.
Show resolved Hide resolved
uint64_t stalled(0);
bool psi_ok(false);
while (cpupressure >> token) {
// Extract "total" from
// some avg10=0.01 avg60=4.76 avg300=6.17 total=11527181835
if (token == "some") {
cpupressure >> token; // avg10=
cpupressure >> token; // avg60=
cpupressure >> token; // avg300=
cpupressure >> token; // total=

// Parse total=NUM
token = token.substr(token.find("=") + 1);
stalled = (uint64_t) strtoull(token.c_str(), NULL, 10);
psi_ok = true;
break;
}
}

if (! psi_ok)
// Unsupported.
return -1.0f;

// We could use micro-second HighResTimer(), if we wanted to,
// but milliseconds provide good-enough granularity.
int64_t timestamp = GetTimeMillis();

if (prev_timestamp == 0) {
prev_timestamp = timestamp;
prev_stalled = stalled;
return 0.0f;
}

uint64_t stalled_ticks = stalled - prev_stalled;
uint64_t clock_ticks = 1000 * (timestamp - prev_timestamp);

if (stalled_ticks < clock_ticks) {
// Clock advanced, so update oncpu_ratio with latest measurements.
// Pass new measurements through a simple noise filter.
oncpu_ratio *= ((double) subproc_number
/ (subproc_number + 1));
oncpu_ratio += ((100.0f * (clock_ticks - stalled_ticks) / clock_ticks)
/ (subproc_number + 1));

if (0 < stalled_ticks) {
// Again, to reduce noise in oncpu_ratio we update prev_* values only
// we get a new "stalled" reading.
prev_timestamp = timestamp;
prev_stalled = stalled;
}
} else {
// Clock didn't advance, this usually happens during initial
// startup, when we start config_.parallelism tasks in rapid
// succession. Slightly reduce oncpu_ratio to throttle startup
// of new processes until we get an updated measurement.
oncpu_ratio *= (double) parallelism / (parallelism + 1);
}

return 100.0f - oncpu_ratio;
#else
return -1.0f;
#endif
}

string ElideMiddle(const string& str, size_t width) {
switch (width) {
case 0: return "";
Expand Down
4 changes: 4 additions & 0 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ int GetProcessorCount();
/// on error.
double GetLoadAverage();

/// @return percentage of time tasks are waiting for CPU.
/// A negative value is returned for unsupported platforms.
double GetCPUWaitRatio(size_t subproc_number, int parallelism);

/// Elide the given string @a str with '...' in the middle if the length
/// exceeds @a width.
std::string ElideMiddle(const std::string& str, size_t width);
Expand Down