Skip to content

Commit

Permalink
MB-49382 (7.1.0 2012) Indexer use cgroups CPU and mem info when enabled
Browse files Browse the repository at this point in the history
1. cpu_throttle.go should use cgroup CPU usage stats when cgroups are
   supported. This is so the usage is correctly calculated against the
   number of CPUs allocated to the cgroup rather than the whole node.
   If not supported fall back to the standard algorithm of CPU ticks in
   different categories that is collected for the whole node.

2. Indexer sets its number of CPUs to min(node, cgroup, GSI) available CPUs,
   where cgroup is sigar num_cpu_prc when cgroups are supported and GSI
   is config value indexer.settings.max_cpu_percent (exposed on UI as
   Advanced Index Settings / Indexer Threads).

3. Log message to console if, in #2, GSI > min(node, cgroup) and use the
   latter instead.

4. planner/proxy.go RetrievePlanFromCluster was calling
   getIndexSettings() to set CpuQuota based on runtime.NumCPU() and
   indexer.settings.max_cpu_percent, but the code in that method was
   incorrect. Replaced with a call to runtime.GOMAXPROCS(0) which will
   now get the logical min(node, cgroup, GSI) per #2 above. (Note that
   CpuQuota is not used in Planner anyway -- its enforcement is
   commented out in planner/planner.go Validate().) This change
   eliminates REST calls to all remote indexer nodes and a use of
   ClusterInfoCache.

5. Replace direct lookup of indexer.settings.memory_quota with new
   Config.GetIndexerMemoryQuota() method that takes cgroup setting into
   account (sigar memory_max) if cgroups are supported.

Change-Id: I3c906070d78ba51888a6c07c7dd05959fba9b27b
  • Loading branch information
cherkauer-couchbase committed Jan 12, 2022
1 parent 4b83dc9 commit 8351d36
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 108 deletions.
84 changes: 83 additions & 1 deletion secondary/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2384,6 +2384,14 @@ var SystemConfig = Config{
false, // mutable
false, // case-insensitive
},
"indexer.cgroup.memory_quota": ConfigValue{
uint64(0),
"Linux cgroup override of indexer.settings.memory_quota;" +
" 0 if cgroups are not supported",
uint64(0),
false, // mutable
false, // case-insensitive
},
"indexer.settings.max_cpu_percent": ConfigValue{
0,
"Maximum percent of CPU that indexer can use. " +
Expand All @@ -2393,6 +2401,14 @@ var SystemConfig = Config{
false, // mutable
false, // case-insensitive
},
"indexer.cgroup.max_cpu_percent": ConfigValue{
0,
"Linux cgroup override of indexer.settings.max_cpu_percent;" +
" 0 if cgroups are not supported",
0,
false, // mutable
false, // case-insensitive
},
"indexer.settings.log_level": ConfigValue{
"info", // keep in sync with index_settings_manager.erl
"Indexer logging level",
Expand Down Expand Up @@ -3383,6 +3399,72 @@ func (config Config) Json() []byte {
return bytes
}

// getIndexerConfig looks up an Indexer config value of any type. The config map may hold its keys
// either with or without the leading "indexer." prefix, so the caller always passes the key with
// the prefix stripped: the stripped form is tried first and the prefixed form is the fallback.
// If neither form is present, the map's zero ConfigValue is returned.
func (config Config) getIndexerConfig(strippedKey string) ConfigValue {
	if found, ok := config[strippedKey]; ok {
		return found
	}
	return config["indexer."+strippedKey]
}

// getIndexerConfigInt returns an Indexer config value as an int. The key is given without the
// "indexer." prefix; the lookup works whether or not the map's keys carry that prefix.
func (config Config) getIndexerConfigInt(strippedKey string) int {
	cv := config.getIndexerConfig(strippedKey)
	return cv.Int()
}

// getIndexerConfigString returns an Indexer config value as a string. The key is given without
// the "indexer." prefix; the lookup works whether or not the map's keys carry that prefix.
func (config Config) getIndexerConfigString(strippedKey string) string {
	cv := config.getIndexerConfig(strippedKey)
	return cv.String()
}

// getIndexerConfigUint64 returns an Indexer config value as a uint64. The key is given without
// the "indexer." prefix; the lookup works whether or not the map's keys carry that prefix.
func (config Config) getIndexerConfigUint64(strippedKey string) uint64 {
	cv := config.getIndexerConfig(strippedKey)
	return cv.Uint64()
}

// GetIndexerMemoryQuota returns the Indexer's effective memory quota in bytes: the smaller of
// indexer.settings.memory_quota and indexer.cgroup.memory_quota. The cgroup value (from sigar
// memory_max) only participates when it is non-zero, i.e. when cgroups are supported.
func (config Config) GetIndexerMemoryQuota() uint64 {
	quota := config.getIndexerConfigUint64("settings.memory_quota")
	cgroupQuota := config.getIndexerConfigUint64("cgroup.memory_quota")
	if cgroupQuota != 0 && quota > cgroupQuota {
		return cgroupQuota // cgroup limit is set and tighter than the GSI setting
	}
	return quota
}

// GetIndexerNumCpuPrc gets the Indexer's percentage of CPU to use (e.g. 400 means 4 cores). It is
// the logical minimum min(node, cgroup, GSI) * 100 available CPUs, where:
//   node  : # CPUs available on the node (runtime.NumCPU)
//   cgroup: # CPUs the Indexer cgroup is allocated (if cgroups are supported, else 0);
//           indexer.cgroup.max_cpu_percent set from sigar num_cpu_prc
//   GSI   : indexer.settings.max_cpu_percent "Indexer Threads" UI config (if specified, else 0)
//
// If the GSI setting exceeds min(node, cgroup), it is ignored in favor of the latter and a message
// saying so is written to the admin console and to the log.
func (config Config) GetIndexerNumCpuPrc() int {
	const _GetIndexerNumCpuPrc = "Config::GetIndexerNumCpuPrc:"

	numCpuPrc := runtime.NumCPU() * 100 // node-level CPUs as a percent
	cgroupCpuPrc := config.getIndexerConfigInt("cgroup.max_cpu_percent")
	if cgroupCpuPrc > 0 && cgroupCpuPrc < numCpuPrc { // sigar gave a value and it is lower
		numCpuPrc = cgroupCpuPrc
	}

	gsiCpuPrc := config.getIndexerConfigInt("settings.max_cpu_percent") // GSI UI override
	switch {
	case gsiCpuPrc > numCpuPrc:
		consoleMsg := fmt.Sprintf("Indexer Threads setting %v exceeds CPU cores"+
			" available %v. Using %v.", gsiCpuPrc/100, numCpuPrc/100, numCpuPrc/100)

		// consoleMsg is already formatted, so pass it as an argument rather than as the printf
		// format string. The console write is best effort: a failure is logged, not returned.
		clusterAddr := config.getIndexerConfigString("clusterAddr")
		if err := Console(clusterAddr, "%v", consoleMsg); err != nil {
			logging.Warnf("%v Console returned error: %v", _GetIndexerNumCpuPrc, err)
		}
		logging.Warnf("%v %v", _GetIndexerNumCpuPrc, consoleMsg)
	case gsiCpuPrc > 0: // GSI override is set; known at this point that it is not higher
		numCpuPrc = gsiCpuPrc
	}
	return numCpuPrc
}

// Int assumes config value is an integer and returns the same.
func (cv ConfigValue) Int() int {
if val, ok := cv.Value.(int); ok {
Expand Down Expand Up @@ -3430,4 +3512,4 @@ func (cv ConfigValue) Strings() []string {
// Bool returns the config value as a bool. The value is assumed to hold a bool: the type
// assertion is unchecked, so any other dynamic type causes a panic.
func (cv ConfigValue) Bool() bool {
	flag := cv.Value.(bool)
	return flag
}
}
3 changes: 2 additions & 1 deletion secondary/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,8 @@ func ComputeAvg(lastAvg, lastValue, currValue int64) int64 {
return (diff + lastAvg) / 2
}

// Write to the admin console
// Console writes a message to the admin console (Logs page of UI). It does not pop up a UI toaster.
// Console messages will also be written to ns_server's info.log and debug.log but not indexer.log.
func Console(clusterAddr string, format string, v ...interface{}) error {
msg := fmt.Sprintf(format, v...)
values := url.Values{"message": {msg}, "logLevel": {"info"}, "component": {"indexing"}}
Expand Down
100 changes: 85 additions & 15 deletions secondary/indexer/cpu_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
package indexer

import (
"os"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/couchbase/indexing/secondary/common"
"github.com/couchbase/indexing/secondary/logging"
"github.com/couchbase/indexing/secondary/system"
)
Expand All @@ -35,9 +37,19 @@ type CpuThrottle struct {
throttleDelayMs unsafe.Pointer // *int64 ms to delay an action when CPU throttling is enabled
throttleStateMutex sync.Mutex // sync throttling enabled/disabled state changes, incl stopCh

// Circular buffer of past CPU stats and pointer to next one to (re)use
cpuStats [NUM_CPU_STATS]*system.SigarCpuT
cpuStatsIdx int // next index into cpuStats
// getCurrentCpuUsageStd's circular buffer of past CPU stats and index of next one to (re)use
cpuStatsStd [NUM_CPU_STATS]*system.SigarCpuT // CPU ticks in categories since sigar start
cpuStatsStdIdx int // next index into cpuStatsStd

// getCurrentCpuUsageCgroup's circular buffer of past CPU stats and index of next one to (re)use
cpuStatsCgroup [NUM_CPU_STATS]*cgroupCpuStats // CPU usage stats when cgroup is supported
cpuStatsCgroupIdx int // next index into cpuStatsCgroup
}

// cgroupCpuStats holds one sample of CPU usage stats in the case where cgroups are supported.
// Samples are created by getCurrentCpuUsageCgroup and kept in CpuThrottle's cpuStatsCgroup
// circular buffer, so that the newest sample can be diffed against the oldest one.
type cgroupCpuStats struct {
usageUsec uint64 // microsecs of CPU usage over used cores since sigar start (unsigned: diff with care)
timeUnixMicro int64 // approximate Unix epoch microsecond timestamp of usageUsec retrieval
}

// NewCpuThrottle is the constructor for the CpuThrottle class. It returns an instance with
Expand Down Expand Up @@ -158,7 +170,8 @@ func (this *CpuThrottle) runThrottling(stopCh chan struct{}) {
logging.Infof("%v Starting. cpuTarget: %v, throttleDelayMs: %v", method,
this.getCpuTarget(), this.getThrottleDelayMs())

// Get a handle to sigar wrappers for CPU stats
// Start a sigar process for CPU stats
os.Unsetenv("COUCHBASE_CPU_COUNT") // so sigar will return cgroups num_cpu_prc in cgroups case
systemStats, err := system.NewSystemStats()
if err != nil {
logging.Infof("%v Failed to start: NewSystemStats returned error: %v", method, err)
Expand All @@ -173,9 +186,10 @@ func (this *CpuThrottle) runThrottling(stopCh chan struct{}) {
for {
select {
case <-stopCh:
// Clear out cpuStats so a later restart does not use old garbage
for idx := range this.cpuStats {
this.cpuStats[idx] = nil
// Clear out cpuStatsStd and cpuStatsCgroup so a later restart does not use old garbage
for idx := 0; idx < NUM_CPU_STATS; idx++ {
this.cpuStatsStd[idx] = nil
this.cpuStatsCgroup[idx] = nil
}
logging.Infof("%v Shutting down.", method)
return
Expand Down Expand Up @@ -233,21 +247,34 @@ func (this *CpuThrottle) adjustThrottleDelay(systemStats *system.SystemStats) {
}

// getCurrentCpuUsage gets the latest CPU usage stats from sigar, diffs them with the oldest stats,
// and returns the result as a value in range [0.0, 1.0] regardless of number of cores. If cgroups
// (Linux control groups) are supported, the stats will be from the newer cgroups implementation in
// sigar (getCurrentCpuUsageCgroup), else they will be from the older tick-based implementation
// (getCurrentCpuUsageStd; "Std" for "standard").
func (this *CpuThrottle) getCurrentCpuUsage(systemStats *system.SystemStats) float64 {
	// The cgroup variant reports whether cgroups are supported; its usage value is only
	// meaningful when they are, so fall back to the standard variant otherwise.
	if cgroupsSupported, cpuUse := this.getCurrentCpuUsageCgroup(systemStats); cgroupsSupported {
		return cpuUse
	}
	return this.getCurrentCpuUsageStd(systemStats)
}

// getCurrentCpuUsageStd gets the latest CPU usage stats from sigar when cgroups are not supported,
// diffs them with the oldest stats, and returns the result as a value in range [0.0, 1.0]
// regardless of number of cores. The fields counted as CPU "in use" are Sys + User + Nice + Irq +
// SoftIrq. (This is different from the sigar_cpu_perc_calculate function's perc.combined
// calculation, whose semantics are unclear.)
func (this *CpuThrottle) getCurrentCpuUsageStd(systemStats *system.SystemStats) float64 {
const _getCurrentCpuUsageStd string = "CpuThrottle::getCurrentCpuUsageStd:"

// Get new stats and update the circular stats buffer
cpuStatsNew, err := systemStats.SigarCpuGet()
if err != nil {
logging.Infof("%v SigarCpuGet returned error: %v", method, err)
logging.Infof("%v SigarCpuGet returned error: %v", _getCurrentCpuUsageStd, err)
return 0.0
}
cpuStatsOld := this.cpuStats[this.cpuStatsIdx] // oldest stats in the circular buffer
this.cpuStats[this.cpuStatsIdx] = cpuStatsNew
this.cpuStatsIdx = (this.cpuStatsIdx + 1) % NUM_CPU_STATS
cpuStatsOld := this.cpuStatsStd[this.cpuStatsStdIdx] // oldest stats in circular buffer
this.cpuStatsStd[this.cpuStatsStdIdx] = cpuStatsNew
this.cpuStatsStdIdx = (this.cpuStatsStdIdx + 1) % NUM_CPU_STATS

if cpuStatsOld == nil { // have not wrapped around yet
return 0.0
Expand All @@ -269,3 +296,46 @@ func (this *CpuThrottle) getCurrentCpuUsage(systemStats *system.SystemStats) flo
}
return float64(cpuUseNew-cpuUseOld) / float64(deltaTime)
}

// getCurrentCpuUsageCgroup gets the latest CPU usage stats from sigar when cgroups are supported,
// diffs them with the oldest stats, and returns the result as a value in range [0.0, 1.0]
// regardless of number of cores. The bool return value is true if cgroups are supported, false
// otherwise. If false, the float64 result is garbage and the caller should switch to
// getCurrentCpuUsageStd instead.
//
// When cgroups are supported but no usable measurement can be made (circular buffer not yet
// full, bad NumCpuPrc, no elapsed time, clock moved backward, or the usage counter moved
// backward), it returns (true, 0.0).
func (this *CpuThrottle) getCurrentCpuUsageCgroup(systemStats *system.SystemStats) (bool, float64) {
	cgroupInfo := systemStats.GetControlGroupInfo()
	if cgroupInfo.Supported != common.SIGAR_CGROUP_SUPPORTED {
		return false, 0.0
	}
	timeNewUsec := (time.Now().UnixNano() + 500) / 1000 // rounded to nearest microsecond

	// Store the new sample in the circular buffer slot holding the oldest sample
	cpuStatsNew := &cgroupCpuStats{
		usageUsec:     cgroupInfo.UsageUsec,
		timeUnixMicro: timeNewUsec,
	}
	cpuStatsOld := this.cpuStatsCgroup[this.cpuStatsCgroupIdx] // oldest stats in circular buffer
	this.cpuStatsCgroup[this.cpuStatsCgroupIdx] = cpuStatsNew
	this.cpuStatsCgroupIdx = (this.cpuStatsCgroupIdx + 1) % NUM_CPU_STATS

	if cpuStatsOld == nil || // have not wrapped around yet
		cgroupInfo.NumCpuPrc <= 0 { // bad CPU percentage; should not occur
		return true, 0.0
	}

	// usageUsec is unsigned, so a counter that moved backward would make the subtraction below
	// wrap around to a huge value and be reported as 100% CPU. Treat it as "no measurement".
	if cpuStatsNew.usageUsec < cpuStatsOld.usageUsec {
		return true, 0.0
	}

	// Calculate current CPU usage: CPU microseconds consumed divided by the CPU microseconds
	// that were available over the elapsed wall time across the allocated cores
	elapsedUsec := cpuStatsNew.timeUnixMicro - cpuStatsOld.timeUnixMicro
	maxPotentialUsage := float64(elapsedUsec) * (float64(cgroupInfo.NumCpuPrc) / 100.0)
	if maxPotentialUsage <= 0.0 { // could be 0.0 if no time elapsed or < 0.0 if clock changed
		return true, 0.0
	}

	actualUsage := float64(cpuStatsNew.usageUsec - cpuStatsOld.usageUsec) // >= 0 per guard above
	cpuUseNew := actualUsage / maxPotentialUsage
	if cpuUseNew > 1.0 { // can occur due to time rounding/slop and if cgroupInfo.NumCpuPrc changes
		cpuUseNew = 1.0
	}
	return true, cpuUseNew
}
3 changes: 1 addition & 2 deletions secondary/indexer/forestdb_slice_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ func NewForestDBSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
config := forestdb.DefaultConfig()
config.SetDurabilityOpt(forestdb.DRB_ASYNC)

memQuota := sysconf["settings.memory_quota"].Uint64()
logging.Debugf("NewForestDBSlice(): buffer cache size %d", memQuota)
memQuota := sysconf.GetIndexerMemoryQuota()
config.SetBufferCacheSize(memQuota)
logging.Debugf("NewForestDBSlice(): buffer cache size %d", memQuota)

Expand Down
Loading

0 comments on commit 8351d36

Please sign in to comment.