diff --git a/secondary/common/config.go b/secondary/common/config.go
index 5bff6bb1d..cdc386174 100644
--- a/secondary/common/config.go
+++ b/secondary/common/config.go
@@ -2384,6 +2384,14 @@ var SystemConfig = Config{
 		false, // mutable
 		false, // case-insensitive
 	},
+	"indexer.cgroup.memory_quota": ConfigValue{
+		uint64(0),
+		"Linux cgroup override of indexer.settings.memory_quota;" +
+			" 0 if cgroups are not supported",
+		uint64(0),
+		false, // mutable
+		false, // case-insensitive
+	},
 	"indexer.settings.max_cpu_percent": ConfigValue{
 		0,
 		"Maximum percent of CPU that indexer can use. " +
@@ -2393,6 +2401,14 @@ var SystemConfig = Config{
 		false, // mutable
 		false, // case-insensitive
 	},
+	"indexer.cgroup.max_cpu_percent": ConfigValue{
+		0,
+		"Linux cgroup override of indexer.settings.max_cpu_percent;" +
+			" 0 if cgroups are not supported",
+		0,
+		false, // mutable
+		false, // case-insensitive
+	},
 	"indexer.settings.log_level": ConfigValue{
 		"info", // keep in sync with index_settings_manager.erl
 		"Indexer logging level",
@@ -3383,6 +3399,72 @@ func (config Config) Json() []byte {
 	return bytes
 }
 
+// getIndexerConfig gets an Indexer config value of any type from a config map that may or may not
+// have the "indexer." prefix stripped from its keys. Caller provides the key with prefix stripped.
+func (config Config) getIndexerConfig(strippedKey string) ConfigValue {
+	value, exists := config[strippedKey]
+	if !exists {
+		value = config["indexer."+strippedKey]
+	}
+	return value
+}
+
+// getIndexerConfigInt gets an Indexer int config value from a config map that may or may not have
+// the "indexer." prefix stripped from its keys. Caller provides the key with prefix stripped.
+func (config Config) getIndexerConfigInt(strippedKey string) int {
+	return config.getIndexerConfig(strippedKey).Int()
+}
+
+// getIndexerConfigString gets an Indexer string config value from a config map that may or may not
+// have the "indexer." prefix stripped from its keys. Caller provides the key with prefix stripped.
+func (config Config) getIndexerConfigString(strippedKey string) string {
+	return config.getIndexerConfig(strippedKey).String()
+}
+
+// getIndexerConfigUint64 gets an Indexer uint64 config value from a config map that may or may not
+// have the "indexer." prefix stripped from its keys. Caller provides the key with prefix stripped.
+func (config Config) getIndexerConfigUint64(strippedKey string) uint64 {
+	return config.getIndexerConfig(strippedKey).Uint64()
+}
+
+// GetIndexerMemoryQuota gets the Indexer's memory quota in bytes as the logical
+// min(indexer.settings.memory_quota, indexer.cgroup.memory_quota).
+// The latter comes from sigar memory_max and is only included if cgroups are supported.
+func (config Config) GetIndexerMemoryQuota() uint64 {
+	gsiMemQuota := config.getIndexerConfigUint64("settings.memory_quota")
+	cgroupMemQuota := config.getIndexerConfigUint64("cgroup.memory_quota")
+	if cgroupMemQuota > 0 && cgroupMemQuota < gsiMemQuota {
+		gsiMemQuota = cgroupMemQuota
+	}
+	return gsiMemQuota
+}
+
+// GetIndexerNumCpuPrc gets the Indexer's percentage of CPU to use (e.g. 400 means 4 cores). It is
+// the logical minimum min(node, cgroup, GSI) * 100, where each term is a number of CPUs:
+//   node  : # CPUs available on the node
+//   cgroup: # CPUs the Indexer cgroup is allocated (if cgroups are supported, else 0);
+//           indexer.cgroup.max_cpu_percent is set from sigar num_cpu_prc
+//   GSI   : indexer.settings.max_cpu_percent "Indexer Threads" UI config (if specified, else 0)
+func (config Config) GetIndexerNumCpuPrc() int {
+	const _GetIndexerNumCpuPrc = "Config::GetIndexerNumCpuPrc:"
+
+	numCpuPrc := runtime.NumCPU() * 100 // node-level CPUs as a percent
+	cgroupCpuPrc := config.getIndexerConfigInt("cgroup.max_cpu_percent")
+	if cgroupCpuPrc > 0 && cgroupCpuPrc < numCpuPrc { // sigar gave a value and it is lower
+		numCpuPrc = cgroupCpuPrc
+	}
+	gsiCpuPrc := config.getIndexerConfigInt("settings.max_cpu_percent") // GSI UI override
+	if gsiCpuPrc > numCpuPrc {
+		consoleMsg := fmt.Sprintf("Indexer Threads setting %v exceeds CPU cores"+
+			" available %v. Using %v.", gsiCpuPrc/100, numCpuPrc/100, numCpuPrc/100)
+		Console(config.getIndexerConfigString("clusterAddr"), consoleMsg)
+		logging.Warnf("%v %v", _GetIndexerNumCpuPrc, consoleMsg)
+	} else if gsiCpuPrc > 0 { // GSI override is set; known at this point that it is not higher
+		numCpuPrc = gsiCpuPrc
+	}
+	return numCpuPrc
+}
+
 // Int assumes config value is an integer and returns the same.
 func (cv ConfigValue) Int() int {
 	if val, ok := cv.Value.(int); ok {
@@ -3430,4 +3512,4 @@ func (cv ConfigValue) Strings() []string {
 // Bool assumes config value is a Bool and returns the same.
 func (cv ConfigValue) Bool() bool {
 	return cv.Value.(bool)
-}
\ No newline at end of file
+}
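A quick standalone sketch of the min() semantics of the two new getters above (the helper names and values below are invented for illustration; this is not part of the change itself):

	package main

	import (
		"fmt"
		"runtime"
	)

	// minQuota mirrors GetIndexerMemoryQuota: a cgroup quota of 0 means cgroups are
	// not supported, so it is ignored rather than treated as a lower bound.
	func minQuota(gsiQuota, cgroupQuota uint64) uint64 {
		if cgroupQuota > 0 && cgroupQuota < gsiQuota {
			return cgroupQuota
		}
		return gsiQuota
	}

	// numCpuPrc mirrors GetIndexerNumCpuPrc minus the console/logging side effects.
	func numCpuPrc(cgroupPrc, gsiPrc int) int {
		prc := runtime.NumCPU() * 100 // node-level CPUs as a percent
		if cgroupPrc > 0 && cgroupPrc < prc {
			prc = cgroupPrc
		}
		if gsiPrc > 0 && gsiPrc < prc {
			prc = gsiPrc
		}
		return prc
	}

	func main() {
		fmt.Println(minQuota(2<<30, 1<<30)) // 1073741824: the cgroup limit wins
		fmt.Println(numCpuPrc(400, 800))    // 400 on a node with >= 4 cores: GSI setting too high, ignored
	}

In both getters the cgroup value only ever lowers the effective quota; it never raises it above the user-visible setting.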
values := url.Values{"message": {msg}, "logLevel": {"info"}, "component": {"indexing"}} diff --git a/secondary/indexer/cpu_throttle.go b/secondary/indexer/cpu_throttle.go index 0a586f9dd..69e61fc68 100644 --- a/secondary/indexer/cpu_throttle.go +++ b/secondary/indexer/cpu_throttle.go @@ -8,11 +8,13 @@ package indexer import ( + "os" "sync" "sync/atomic" "time" "unsafe" + "github.com/couchbase/indexing/secondary/common" "github.com/couchbase/indexing/secondary/logging" "github.com/couchbase/indexing/secondary/system" ) @@ -35,9 +37,19 @@ type CpuThrottle struct { throttleDelayMs unsafe.Pointer // *int64 ms to delay an action when CPU throttling is enabled throttleStateMutex sync.Mutex // sync throttling enabled/disabled state changes, incl stopCh - // Circular buffer of past CPU stats and pointer to next one to (re)use - cpuStats [NUM_CPU_STATS]*system.SigarCpuT - cpuStatsIdx int // next index into cpuStats + // getCurrentCpuUsageStd's circular buffer of past CPU stats and index of next one to (re)use + cpuStatsStd [NUM_CPU_STATS]*system.SigarCpuT // CPU ticks in categories since sigar start + cpuStatsStdIdx int // next index into cpuStatsStd + + // getCurrentCpuUsageCgroup's circular buffer of past CPU stats and index of next one to (re)use + cpuStatsCgroup [NUM_CPU_STATS]*cgroupCpuStats // CPU usage stats when cgroup is supported + cpuStatsCgroupIdx int // next index into cpuStatsCgroup +} + +// cgroupCpuStats holds CPU usage stats in the case where cgroups are supported. +type cgroupCpuStats struct { + usageUsec uint64 // microsecs of CPU usage over used cores since sigar start + timeUnixMicro int64 // approximate Unix epoch microsecond timestamp of usageUsec retrieval } // NewCpuThrottle is the constructor for the CpuThrottle class. It returns an instance with @@ -158,7 +170,8 @@ func (this *CpuThrottle) runThrottling(stopCh chan struct{}) { logging.Infof("%v Starting. cpuTarget: %v, throttleDelayMs: %v", method, this.getCpuTarget(), this.getThrottleDelayMs()) - // Get a handle to sigar wrappers for CPU stats + // Start a sigar process for CPU stats + os.Unsetenv("COUCHBASE_CPU_COUNT") // so sigar will return cgroups num_cpu_prc in cgroups case systemStats, err := system.NewSystemStats() if err != nil { logging.Infof("%v Failed to start: NewSystemStats returned error: %v", method, err) @@ -173,9 +186,10 @@ func (this *CpuThrottle) runThrottling(stopCh chan struct{}) { for { select { case <-stopCh: - // Clear out cpuStats so a later restart does not use old garbage - for idx := range this.cpuStats { - this.cpuStats[idx] = nil + // Clear out cpuStatsStd and cpuStatsCgroup so a later restart does not use old garbage + for idx := 0; idx < NUM_CPU_STATS; idx++ { + this.cpuStatsStd[idx] = nil + this.cpuStatsCgroup[idx] = nil } logging.Infof("%v Shutting down.", method) return @@ -233,21 +247,34 @@ func (this *CpuThrottle) adjustThrottleDelay(systemStats *system.SystemStats) { } // getCurrentCpuUsage gets the latest CPU usage stats from sigar, diffs them with the oldest stats, -// and returns the result as a value in range [0.0, 1.0] (regardless of number of cores). The fields -// counted as CPU "in use" are Sys + User + Nice + Irq + SoftIrq. (This is different from the -// sigar_cpu_perc_calculate function's perc.combined calculation, whose semantics are unclear.) +// and returns the result as a value in range [0.0, 1.0] regardless of number of cores. 
If cgroups +// (Linux control groups) are supported, the stats will be from the newer cgroups implementation in +// sigar, else they will be from the older tick-based implementation ("Std" for "standard"). func (this *CpuThrottle) getCurrentCpuUsage(systemStats *system.SystemStats) float64 { - const method string = "CpuThrottle::getCurrentCpuUsage:" // for logging + cgroupsSupported, cpuUse := this.getCurrentCpuUsageCgroup(systemStats) + if !cgroupsSupported { + cpuUse = this.getCurrentCpuUsageStd(systemStats) + } + return cpuUse +} + +// getCurrentCpuUsageStd gets the latest CPU usage stats from sigar when cgroups are not supported, +// diffs them with the oldest stats, and returns the result as a value in range [0.0, 1.0] +// regardless of number of cores. The fields counted as CPU "in use" are Sys + User + Nice + Irq + +// SoftIrq. (This is different from the sigar_cpu_perc_calculate function's perc.combined +// calculation, whose semantics are unclear.) +func (this *CpuThrottle) getCurrentCpuUsageStd(systemStats *system.SystemStats) float64 { + const _getCurrentCpuUsageStd string = "CpuThrottle::getCurrentCpuUsageStd:" // Get new stats and update the circular stats buffer cpuStatsNew, err := systemStats.SigarCpuGet() if err != nil { - logging.Infof("%v SigarCpuGet returned error: %v", method, err) + logging.Infof("%v SigarCpuGet returned error: %v", _getCurrentCpuUsageStd, err) return 0.0 } - cpuStatsOld := this.cpuStats[this.cpuStatsIdx] // oldest stats in the circular buffer - this.cpuStats[this.cpuStatsIdx] = cpuStatsNew - this.cpuStatsIdx = (this.cpuStatsIdx + 1) % NUM_CPU_STATS + cpuStatsOld := this.cpuStatsStd[this.cpuStatsStdIdx] // oldest stats in circular buffer + this.cpuStatsStd[this.cpuStatsStdIdx] = cpuStatsNew + this.cpuStatsStdIdx = (this.cpuStatsStdIdx + 1) % NUM_CPU_STATS if cpuStatsOld == nil { // have not wrapped around yet return 0.0 @@ -269,3 +296,46 @@ func (this *CpuThrottle) getCurrentCpuUsage(systemStats *system.SystemStats) flo } return float64(cpuUseNew-cpuUseOld) / float64(deltaTime) } + +// getCurrentCpuUsageCgroup gets the latest CPU usage stats from sigar when cgroups are supported, +// diffs them with the oldest stats, and returns the result as a value in range [0.0, 1.0] +// regardless of number of cores. The bool return value is true if cgroups are supported, false +// otherwise. If false, the float64 result is garbage and the caller should switch to +// getCurrentCpuUsageStd instead. 
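The cgroup branch's usage math can be sanity-checked with a standalone sketch (function name and values invented for illustration). Because UsageUsec accumulates across all cores, the most CPU a cgroup can burn in an interval is elapsedUsec * NumCpuPrc / 100; the clamps mirror the rounding/slop cases called out in the comments above:

	package main

	import "fmt"

	// cgroupCpuUse mirrors the getCurrentCpuUsageCgroup calculation.
	func cgroupCpuUse(oldUsageUsec, newUsageUsec uint64, oldTimeUsec, newTimeUsec int64,
		numCpuPrc uint16) float64 {
		maxPotential := float64(newTimeUsec-oldTimeUsec) * (float64(numCpuPrc) / 100.0)
		if maxPotential <= 0.0 { // no time elapsed, or the clock changed
			return 0.0
		}
		use := float64(newUsageUsec-oldUsageUsec) / maxPotential
		if use > 1.0 { // timing slop, or NumCpuPrc changed mid-interval
			use = 1.0
		}
		if use < 0.0 { // for safety; should not occur
			use = 0.0
		}
		return use
	}

	func main() {
		// A 2-core cgroup (numCpuPrc = 200) that burned 500,000 usec of CPU over a
		// 1,000,000 usec wall-clock interval has a ceiling of 2,000,000 usec, so
		// usage is 0.25.
		fmt.Println(cgroupCpuUse(0, 500000, 0, 1000000, 200)) // 0.25
	}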
diff --git a/secondary/indexer/forestdb_slice_writer.go b/secondary/indexer/forestdb_slice_writer.go
index 4af348f8f..bf19eee71 100644
--- a/secondary/indexer/forestdb_slice_writer.go
+++ b/secondary/indexer/forestdb_slice_writer.go
@@ -62,8 +62,7 @@ func NewForestDBSlice(path string, sliceId SliceId, idxDefn common.IndexDefn,
 	config := forestdb.DefaultConfig()
 	config.SetDurabilityOpt(forestdb.DRB_ASYNC)
 
-	memQuota := sysconf["settings.memory_quota"].Uint64()
-	logging.Debugf("NewForestDBSlice(): buffer cache size %d", memQuota)
+	memQuota := sysconf.GetIndexerMemoryQuota()
 	config.SetBufferCacheSize(memQuota)
 	logging.Debugf("NewForestDBSlice(): buffer cache size %d", memQuota)
 
diff --git a/secondary/indexer/indexer.go b/secondary/indexer/indexer.go
index c75546190..eef39d134 100644
--- a/secondary/indexer/indexer.go
+++ b/secondary/indexer/indexer.go
@@ -158,7 +158,7 @@ type indexer struct {
 	schedIdxCreator *schedIndexCreator //handle to scheduled index creator
 	clustMgrAgent   ClustMgrAgent      //handle to ClustMgrAgent
 	kvSender        KVSender           //handle to KVSender
-	settingsMgr     settingsManager    //handle to settings manager
+	settingsMgr     *settingsManager   //handle to settings manager
 	statsMgr        *statsManager      //handle to statistics manager
 	scanCoord       ScanCoordinator    //handle to ScanCoordinator
 	cpuThrottle     *CpuThrottle       //handle to CPU throttler (for Autofailover)
@@ -167,7 +167,7 @@ type indexer struct {
 	// ns_server only supports registering a single object for RPC calls.
 	masterMgr *MasterServiceManager
 
-	config common.Config
+	config common.Config // map of current indexer config settings with "indexer." prefix stripped
 
 	kvlock       sync.Mutex //fine-grain lock for KVSender
 	clustMgrLock sync.Mutex // lock to protect concurrent reads and writes from clustMgrAgentCmdCh
@@ -662,7 +662,7 @@ func (idx *indexer) handleSecurityChange(msg Message) {
 func (idx *indexer) initFromConfig() {
 
 	// Read memquota setting
-	memQuota := int64(idx.config["settings.memory_quota"].Uint64())
+	memQuota := int64(idx.config.GetIndexerMemoryQuota())
 	idx.stats.memoryQuota.Set(memQuota)
 	plasma.SetMemoryQuota(int64(float64(memQuota) * PLASMA_MEMQUOTA_FRAC))
 	memdb.Debug(idx.config["settings.moi.debug"].Bool())
@@ -1533,17 +1533,20 @@ func (idx *indexer) updateStorageMode(newConfig common.Config) {
 	}
 }
 
+// handleConfigUpdate updates Indexer config settings and propagates them to children / workers.
 func (idx *indexer) handleConfigUpdate(msg Message) {
 
 	cfgUpdate := msg.(*MsgConfigUpdate)
+	oldConfig := idx.config
 	newConfig := cfgUpdate.GetConfig()
+	idx.config = newConfig
 
 	idx.updateStorageMode(newConfig)
 
 	if newConfig["settings.memory_quota"].Uint64() !=
-		idx.config["settings.memory_quota"].Uint64() {
+		oldConfig["settings.memory_quota"].Uint64() {
 
-		memQuota := int64(newConfig["settings.memory_quota"].Uint64())
+		memQuota := int64(idx.config.GetIndexerMemoryQuota())
 		idx.stats.memoryQuota.Set(memQuota)
 		plasma.SetMemoryQuota(int64(float64(memQuota) * PLASMA_MEMQUOTA_FRAC))
 
@@ -1555,7 +1558,7 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
 		}
 	}
 	if common.GetStorageMode() == common.MOI {
-		if moiPersisters := newConfig["settings.moi.persistence_threads"].Int(); moiPersisters != idx.config["settings.moi.persistence_threads"].Int() {
+		if moiPersisters := newConfig["settings.moi.persistence_threads"].Int(); moiPersisters != oldConfig["settings.moi.persistence_threads"].Int() {
 			if moiPersisters <= cap(moiWriterSemaphoreCh) {
 				logging.Infof("Indexer: Setting MOI persisters to %v", moiPersisters)
@@ -1569,7 +1572,7 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
 	}
 
 	if newConfig["settings.compaction.plasma.manual"].Bool() !=
-		idx.config["settings.compaction.plasma.manual"].Bool() {
+		oldConfig["settings.compaction.plasma.manual"].Bool() {
 		logging.Infof("Indexer::handleConfigUpdate restart indexer due to compaction.plasma.manual")
 		idx.stats.needsRestart.Set(true)
 		os.Exit(0)
@@ -1583,21 +1586,21 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
 	if newConfig["api.enableTestServer"].Bool() && !idx.testServRunning {
 		// Start indexer endpoints for CRUD operations.
 		// Initialize the QE REST server on config change.
-		certFile := idx.config["certFile"].String()
-		keyFile := idx.config["keyFile"].String()
-		NewTestServer(idx.config["clusterAddr"].String(), certFile, keyFile)
+		certFile := newConfig["certFile"].String()
+		keyFile := newConfig["keyFile"].String()
+		NewTestServer(newConfig["clusterAddr"].String(), certFile, keyFile)
 		idx.testServRunning = true
 	}
 
 	if mcdTimeout, ok := newConfig["memcachedTimeout"]; ok {
-		if mcdTimeout.Int() != idx.config["memcachedTimeout"].Int() {
+		if mcdTimeout.Int() != oldConfig["memcachedTimeout"].Int() {
 			common.SetDcpMemcachedTimeout(uint32(mcdTimeout.Int()))
 			logging.Infof("memcachedTimeout set to %v\n", uint32(mcdTimeout.Int()))
 		}
 	}
 
 	if newConfig["settings.max_cpu_percent"].Int() !=
-		idx.config["settings.max_cpu_percent"].Int() {
+		oldConfig["settings.max_cpu_percent"].Int() {
 		value := common.ConfigValue{
 			Value: uint64(math.Max(2.0, float64(runtime.GOMAXPROCS(0))*0.25)),
 			Help:  "Minimum number of shard",
@@ -1610,17 +1613,16 @@ func (idx *indexer) handleConfigUpdate(msg Message) {
 
 	if workersPerReader, ok := newConfig["vbseqnos.workers_per_reader"]; ok {
 		if newConfig["vbseqnos.workers_per_reader"].Int() !=
-			idx.config["vbseqnos.workers_per_reader"].Int() {
+			oldConfig["vbseqnos.workers_per_reader"].Int() {
 			common.UpdateVbSeqnosWorkersPerReader(int32(workersPerReader.Int()))
 			common.ResetBucketSeqnos()
 		}
 	}
 
-	memdb.Debug(idx.config["settings.moi.debug"].Bool())
+	memdb.Debug(oldConfig["settings.moi.debug"].Bool())
 	idx.setProfilerOptions(newConfig)
 	idx.cpuThrottle.SetCpuTarget(newConfig["cpu.throttle.target"].Float64())
-	idx.config = newConfig
 	idx.compactMgrCmdCh <- msg
 	<-idx.compactMgrCmdCh
 	idx.tkCmdCh <- msg
@@ -7069,7 +7071,7 @@ func (idx *indexer) bootstrap2() error {
 	}
 
 	//check if Paused state is required
-	memory_quota := idx.config["settings.memory_quota"].Uint64()
+	memory_quota := idx.config.GetIndexerMemoryQuota()
 	high_mem_mark := idx.config["high_mem_mark"].Float64()
 
 	//free memory after bootstrap before deciding to pause
@@ -9142,7 +9144,7 @@ func (idx *indexer) monitorMemUsage() {
 
 		if common.GetStorageMode() == common.MOI && pause_if_oom {
 
-			memory_quota := idx.config["settings.memory_quota"].Uint64()
+			memory_quota := idx.config.GetIndexerMemoryQuota()
 			high_mem_mark := idx.config["high_mem_mark"].Float64()
 			low_mem_mark := idx.config["low_mem_mark"].Float64()
 			min_oom_mem := idx.config["min_oom_memory"].Uint64()
@@ -9196,7 +9198,6 @@ func (idx *indexer) monitorMemUsage() {
 
 		time.Sleep(time.Second * time.Duration(monitorInterval))
 	}
-
 }
 
 func (idx *indexer) handleIndexerPause(msg Message) {
@@ -9413,7 +9414,7 @@ func (idx *indexer) updateMemstats() {
 func (idx *indexer) needsGCMoi() bool {
 
 	var memUsed uint64
-	memQuota := idx.config["settings.memory_quota"].Uint64()
+	memQuota := idx.config.GetIndexerMemoryQuota()
 
 	if idx.getIndexerState() == common.INDEXER_PAUSED {
 		memUsed, _, _ = idx.memoryUsed(true)
@@ -9439,7 +9440,7 @@ func (idx *indexer) needsGCFdb() bool {
 
 	var memUsed uint64
-	memQuota := idx.config["settings.memory_quota"].Uint64()
+	memQuota := idx.config.GetIndexerMemoryQuota()
 
 	//ignore till 1GB
 	ignoreThreshold := idx.config["min_oom_memory"].Uint64() * 4
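handleConfigUpdate above now captures the old config, installs the new one up front, and then diffs old against new to decide what to re-propagate. Because common.Config is a map type, saving the old reference and rebinding the field is sufficient. A minimal sketch of the pattern (key names and types invented for illustration):

	package main

	import "fmt"

	type config map[string]int

	// applyUpdate captures the old map, installs the new one first so any helpers
	// called afterward see the new settings, then diffs old vs new.
	func applyUpdate(current *config, incoming config) {
		old := *current
		*current = incoming
		if incoming["memory_quota"] != old["memory_quota"] {
			fmt.Println("memory quota changed:", old["memory_quota"], "->", incoming["memory_quota"])
		}
	}

	func main() {
		cfg := config{"memory_quota": 1}
		applyUpdate(&cfg, config{"memory_quota": 2}) // memory quota changed: 1 -> 2
	}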
diff --git a/secondary/indexer/mutation_manager.go b/secondary/indexer/mutation_manager.go
index 883582a1a..d9d2bb5a3 100644
--- a/secondary/indexer/mutation_manager.go
+++ b/secondary/indexer/mutation_manager.go
@@ -1373,7 +1373,7 @@ func (m *mutationMgr) handleConfigUpdate(cmd Message) {
 
 //Calculate mutation queue length from memory quota
 func (m *mutationMgr) setMaxMemoryFromQuota() {
-	memQuota := m.config["settings.memory_quota"].Uint64()
+	memQuota := m.config.GetIndexerMemoryQuota()
 	fracQueueMem := getMutationQueueMemFrac(m.config)
 
 	maxMem := int64(fracQueueMem * float64(memQuota))
@@ -1384,7 +1384,6 @@ func (m *mutationMgr) setMaxMemoryFromQuota() {
 
 	atomic.StoreInt64(&m.maxMemory, maxMem)
 	logging.Infof("MutationMgr::MaxQueueMemoryQuota %v", maxMem)
-
 }
 
 func (m *mutationMgr) handleIndexerPause(cmd Message) {
diff --git a/secondary/indexer/settings.go b/secondary/indexer/settings.go
index 5c311684c..7b620df7b 100644
--- a/secondary/indexer/settings.go
+++ b/secondary/indexer/settings.go
@@ -22,6 +22,7 @@ import (
 	"github.com/couchbase/indexing/secondary/pipeline"
 	"github.com/couchbase/indexing/secondary/stubs/nitro/mm"
 	"github.com/couchbase/indexing/secondary/stubs/nitro/plasma"
+	"github.com/couchbase/indexing/secondary/system"
 
 	"io/ioutil"
 	"net/http"
@@ -37,7 +38,7 @@ const (
 	compactionDaysSetting = "indexer.settings.compaction.days_of_week"
 )
 
-// Implements dynamic settings management for indexer
+// settingsManager implements dynamic settings management for indexer.
 type settingsManager struct {
 	supvCmdch MsgChannel
 	supvMsgch MsgChannel
@@ -48,8 +49,9 @@ type settingsManager struct {
 	notifyPending bool
 }
 
+// NewSettingsManager is the settingsManager constructor. Indexer creates a child singleton of this.
 func NewSettingsManager(supvCmdch MsgChannel,
-	supvMsgch MsgChannel, config common.Config) (settingsManager, common.Config, Message) {
+	supvMsgch MsgChannel, config common.Config) (*settingsManager, common.Config, Message) {
 	s := settingsManager{
 		supvCmdch: supvCmdch,
 		supvMsgch: supvMsgch,
@@ -60,7 +62,7 @@ func NewSettingsManager(supvCmdch MsgChannel,
 	// This method will merge metakv indexer settings onto default settings.
 	config, err := common.GetSettingsConfig(config)
 	if err != nil {
-		return s, nil, &MsgError{
+		return &s, nil, &MsgError{
 			err: Error{
 				category: INDEXER,
 				cause:    err,
@@ -68,7 +70,19 @@ func NewSettingsManager(supvCmdch MsgChannel,
 			}}
 	}
 
-	initGlobalSettings(nil, config)
+	// Set cgroup overrides; these will be 0 if cgroups are not supported
+	sigarMemoryMax, sigarNumCpuPrc := sigarGetMemoryMaxAndNumCpuPrc()
+	const memKey = "indexer.cgroup.memory_quota"
+	const cpuKey = "indexer.cgroup.max_cpu_percent"
+	value := config[memKey]
+	value.Value = sigarMemoryMax
+	config[memKey] = value
+	value = config[cpuKey]
+	value.Value = sigarNumCpuPrc
+	config[cpuKey] = value
+
+	// Initialize the global config settings
+	s.setGlobalSettings(nil, config)
 
 	go func() {
 		fn := func(r int, err error) error {
@@ -89,7 +103,38 @@ func NewSettingsManager(supvCmdch MsgChannel,
 	go s.run()
 
 	indexerConfig := config.SectionConfig("indexer.", true)
-	return s, indexerConfig, &MsgSuccess{}
+	return &s, indexerConfig, &MsgSuccess{}
+}
+
+// sigarGetMemoryMaxAndNumCpuPrc returns memory_max and num_cpu_prc from sigar when cgroups are
+// supported.
+// memory_max is the memory quota in bytes, not accounting for GSI's exposed user override
+// indexer.settings.memory_quota.
+// num_cpu_prc is the number of CPUs to use * 100, taken either from the cgroup or from ns_server's
+// COUCHBASE_CPU_COUNT environment variable, which overrides it. This value does not account for
+// GSI's exposed user override indexer.settings.max_cpu_percent.
+// Returns 0, 0 if cgroups are not supported or sigar fails.
+func sigarGetMemoryMaxAndNumCpuPrc() (uint64, int) {
+	const _sigarGetMemoryMaxAndNumCpuPrc = "settings::sigarGetMemoryMaxAndNumCpuPrc:"
+
+	// Start a sigar process
+	systemStats, err := system.NewSystemStats()
+	if err != nil {
+		logging.Infof("%v sigar failed to start: NewSystemStats returned error: %v",
+			_sigarGetMemoryMaxAndNumCpuPrc, err)
+		return 0, 0
+	}
+	defer systemStats.Close()
+
+	// Get cgroup info and check if cgroups are supported
+	cgroupInfo := systemStats.GetControlGroupInfo()
+	if cgroupInfo.Supported != common.SIGAR_CGROUP_SUPPORTED {
+		return 0, 0
+	}
+
+	// Cgroups are supported, so return its mem and CPU values. (CPU value will pick up any user
+	// override from ns_server's COUCHBASE_CPU_COUNT environment variable.)
+	return cgroupInfo.MemoryMax, int(cgroupInfo.NumCpuPrc)
 }
 
 func (s *settingsManager) RegisterRestEndpoints() {
@@ -349,7 +394,7 @@ func (s *settingsManager) applySettings(path string, value []byte, rev interface
 	newConfig := s.config.Clone()
 	newConfig.Update(value)
 
-	initGlobalSettings(s.config, newConfig)
+	s.setGlobalSettings(s.config, newConfig)
 
 	s.config = newConfig
 
@@ -450,11 +495,15 @@ func setBlockPoolSize(o, n common.Config) {
 	}
 }
 
-func initGlobalSettings(oldCfg, newCfg common.Config) {
+// setGlobalSettings is used to both initialize and update global config settings.
+func (s *settingsManager) setGlobalSettings(oldCfg, newCfg common.Config) {
+	const _setGlobalSettings = "settingsManager::setGlobalSettings:"
+
 	setBlockPoolSize(oldCfg, newCfg)
 
-	ncpu := common.SetNumCPUs(newCfg["indexer.settings.max_cpu_percent"].Int())
-	logging.Infof("Setting maxcpus = %d", ncpu)
+	// Set number of CPU cores to use to min(node, cgroup, GSI)
+	ncpu := common.SetNumCPUs(newCfg.GetIndexerNumCpuPrc())
+	logging.Infof("%v Setting maxcpus = %d", _setGlobalSettings, ncpu)
 
 	setLogger(newCfg)
 	useMutationSyncPool = newCfg["indexer.useMutationSyncPool"].Bool()
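NewSettingsManager stamps the sigar values into the config via a copy-modify-store sequence because Go map entries are not addressable. A minimal standalone sketch (ConfigValue is reduced to a single field here for illustration):

	package main

	import "fmt"

	type ConfigValue struct {
		Value interface{}
	}

	type Config map[string]ConfigValue

	func main() {
		config := Config{"indexer.cgroup.memory_quota": {Value: uint64(0)}}
		// config["indexer.cgroup.memory_quota"].Value = ... does not compile, so
		// copy the entry, modify it, and store it back.
		value := config["indexer.cgroup.memory_quota"]
		value.Value = uint64(1 << 30) // pretend sigar memory_max returned 1 GiB
		config["indexer.cgroup.memory_quota"] = value
		fmt.Println(config["indexer.cgroup.memory_quota"].Value) // 1073741824
	}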
diff --git a/secondary/manager/manager.go b/secondary/manager/manager.go
index 5c771fc96..5e0856a4d 100644
--- a/secondary/manager/manager.go
+++ b/secondary/manager/manager.go
@@ -826,7 +826,7 @@ func (m *IndexManager) stopMasterService() {
 
 //Calculate forestdb buffer cache from memory quota
 func (m *IndexManager) calcBufCacheFromMemQuota(config common.Config) uint64 {
-	totalQuota := config["settings.memory_quota"].Uint64()
+	totalQuota := config.GetIndexerMemoryQuota()
 
 	//calculate queue memory
 	fracQueueMem := config["mutation_manager.fdb.fracMutationQueueMem"].Float64()
diff --git a/secondary/planner/proxy.go b/secondary/planner/proxy.go
index 9d469fa5d..bfab868c6 100644
--- a/secondary/planner/proxy.go
+++ b/secondary/planner/proxy.go
@@ -141,10 +141,26 @@ func RetrievePlanFromCluster(clusterUrl string, hosts []string, isRebalance bool
 		return nil, err
 	}
 
-	err = getIndexSettings(plan)
-	if err != nil {
-		return nil, err
-	}
+	// GOMAXPROCS is now set to the logical minimum of
+	// 1. runtime.NumCPU
+	// 2. SigarControlGroupInfo.NumCpuPrc / 100 (only included if cgroups are supported)
+	// 3. indexer.settings.max_cpu_percent / 100 (current Indexer Threads UI setting)
+	// We assume all Index nodes are configured the same w.r.t. the above. It is impossible to
+	// configure #3 differently across nodes, but it is possible, though not advised, to configure
+	// #1 and #2 differently. The resulting CpuQuota is correct if Planner is running directly on
+	// an Index node and they are all configured the same, but it will not pick up #3 if it is
+	// running on a non-Index node (e.g. Query not colocated with Index), and the result in this
+	// case may also be based on a different value for #2 than Index nodes use.
+	//
+	// Currently plan.CpuQuota is not actually used other than to be logged in some messages, so we
+	// have not spent time making this more sophisticated. Note also there is only one plan.CpuQuota
+	// value, so if nodes are configured differently it is not clear which one's value to use.
+	//
+	// If CpuQuota constraints are ever re-enabled, we will need to revisit the above questions. We
+	// could implement a means of retrieving GOMAXPROCS from all Index nodes, but we'd also need to
+	// address the issue of them possibly being different, e.g. by enhancing Planner to save
+	// CpuQuota on a per-node basis and related enhancements to how it uses these values.
+	plan.CpuQuota = uint64(runtime.GOMAXPROCS(0))
 
 	err = getIndexNumReplica(plan, isRebalance)
 	if err != nil {
@@ -845,45 +861,6 @@ func getIndexStats(plan *Plan, config common.Config) error {
 	return nil
 }
 
-//
-// This function retrieves the index settings.
-//
-func getIndexSettings(plan *Plan) error {
-
-	cinfo := cinfoClient.GetClusterInfoCache()
-	cinfo.RLock()
-	defer cinfo.RUnlock()
-
-	// find all nodes that has a index http service
-	nids := cinfo.GetNodesByServiceType(common.INDEX_HTTP_SERVICE)
-
-	if len(nids) == 0 {
-		return errors.New("No indexing service available.")
-	}
-
-	resp, err := restHelperNoLock(getLocalSettingsResp, nil, plan.Placement, cinfo, "LocalSettingsResp")
-	if err != nil {
-		return err
-	}
-
-	for _, res := range resp {
-
-		settings := res.settings
-
-		// Find the cpu quota from setting. If it is set to 0, then find out avail core on the node.
-		quota, ok := settings["indexer.settings.max_cpu_percent"]
-		if !ok || uint64(quota.(float64)) == 0 {
-			plan.CpuQuota = uint64(runtime.NumCPU())
-		} else {
-			plan.CpuQuota = uint64(quota.(float64) / 100)
-		}
-
-		return nil
-	}
-
-	return nil
-}
-
 //
 // This function extract the topology metadata for a bucket, scope and collection.
 //
@@ -1035,20 +1012,6 @@ func getLocalDropInstanceTokensResp(addr string) (*http.Response, error) {
 	return resp, nil
 }
 
-//
-// This function gets the marshalled indexer settings for a specific indexer host.
-//
-func getLocalSettingsResp(addr string) (*http.Response, error) {
-
-	resp, err := getWithCbauth(addr + "/settings")
-	if err != nil {
-		logging.Errorf("Planner::getLocalSettingsResp: Failed to get settings from node: %v, err: %v", addr, err)
-		return nil, err
-	}
-
-	return resp, nil
-}
-
 //
 // This function gets the marshalled num replica for a specific indexer host.
 //
diff --git a/secondary/system/systemStats.go b/secondary/system/systemStats.go
index 5fa570a17..b5f8e2806 100644
--- a/secondary/system/systemStats.go
+++ b/secondary/system/systemStats.go
@@ -158,6 +158,10 @@ type SigarControlGroupInfo struct {
 
 	// Current memory usage by this cgroup. Derived from memory.usage_in_bytes
 	MemoryCurrent uint64
+
+	// UsageUsec gives the total microseconds of CPU used from sigar start across all available
+	// cores, so this can increase at a rate of N times real time if there are N cores in use
+	UsageUsec uint64
 }
 
 func (h *SystemStats) GetControlGroupInfo() *SigarControlGroupInfo {
@@ -170,5 +174,6 @@ func (h *SystemStats) GetControlGroupInfo() *SigarControlGroupInfo {
 		NumCpuPrc:     uint16(info.num_cpu_prc),
 		MemoryMax:     uint64(info.memory_max),
 		MemoryCurrent: uint64(info.memory_current),
+		UsageUsec:     uint64(info.usage_usec),
 	}
 }
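For reference, the CpuQuota computation that replaces getIndexSettings reduces to reading the process's own GOMAXPROCS, which common.SetNumCPUs has already clamped to min(node, cgroup, GSI) cores (standalone sketch; assumes Planner is running on an Index node, per the caveats in the comment above):

	package main

	import (
		"fmt"
		"runtime"
	)

	func main() {
		// runtime.GOMAXPROCS(0) reads the current setting without changing it.
		fmt.Println("plan.CpuQuota =", uint64(runtime.GOMAXPROCS(0)))
	}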