Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process_collector: Add Platform-Specific Describe for processCollector #1625

Merged
merged 5 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions prometheus/process_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

type processCollector struct {
collectFn func(chan<- Metric)
describeFn func(chan<- *Desc)
pidFn func() (int, error)
reportErrors bool
cpuTotal *Desc
Expand Down Expand Up @@ -122,33 +123,35 @@ func NewProcessCollector(opts ProcessCollectorOpts) Collector {
// Set up process metric collection if supported by the runtime.
if canCollectProcess() {
c.collectFn = c.processCollect
c.describeFn = c.describe
} else {
c.collectFn = func(ch chan<- Metric) {
c.reportError(ch, nil, errors.New("process metrics not supported on this platform"))
}
c.collectFn = c.errorCollectFn
c.describeFn = c.errorDescribeFn
}

return c
}

// Describe returns all descriptions of the collector.
func (c *processCollector) Describe(ch chan<- *Desc) {
ch <- c.cpuTotal
ch <- c.openFDs
ch <- c.maxFDs
ch <- c.vsize
ch <- c.maxVsize
ch <- c.rss
ch <- c.startTime
ch <- c.inBytes
ch <- c.outBytes
func (c *processCollector) errorCollectFn(ch chan<- Metric) {
c.reportError(ch, nil, errors.New("process metrics not supported on this platform"))
}

// errorDescribeFn is the describe function used when process metrics are not
// supported. When error reporting is enabled it sends a single invalid
// descriptor carrying the "not supported" error; otherwise it sends nothing.
func (c *processCollector) errorDescribeFn(ch chan<- *Desc) {
	if !c.reportErrors {
		return
	}
	ch <- NewInvalidDesc(errors.New("process metrics not supported on this platform"))
}

// Collect sends the current state of all metrics of the collector on ch.
// The work is delegated to collectFn, which NewProcessCollector sets to either
// the platform-specific processCollect or the error-reporting fallback.
func (c *processCollector) Collect(ch chan<- Metric) {
c.collectFn(ch)
}

// Describe sends all descriptors of the collector on ch.
// The work is delegated to describeFn, which NewProcessCollector sets to either
// the platform-specific describe or the error-reporting fallback, so that the
// described set can differ per platform.
func (c *processCollector) Describe(ch chan<- *Desc) {
c.describeFn(ch)
}

func (c *processCollector) reportError(ch chan<- Metric, desc *Desc, err error) {
if !c.reportErrors {
return
Expand Down
22 changes: 21 additions & 1 deletion prometheus/process_collector_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package prometheus
import (
"errors"
"fmt"
"golang.org/x/sys/unix"
"os"
"syscall"
"time"

"golang.org/x/sys/unix"
)

// notImplementedErr is returned by stub functions that replace cgo functions, when cgo
Expand Down Expand Up @@ -68,6 +69,25 @@ func getOpenFileCount() (float64, error) {
}
}

// describe sends the descriptors of the process metrics exposed on Darwin.
// The list must mirror what processCollect emits on this platform: whenever a
// metric is added to or removed from processCollect, update this list as well.
func (c *processCollector) describe(ch chan<- *Desc) {
	supported := []*Desc{
		c.cpuTotal,
		c.openFDs,
		c.maxFDs,
		c.maxVsize,
		c.startTime,
	}
	for _, d := range supported {
		ch <- d
	}

	// Not described yet: these metrics could be collected on Darwin, but
	// their collection is not implemented.
	//   c.rss
	//   c.vsize
	//   c.inBytes
	//   c.outBytes
}

func (c *processCollector) processCollect(ch chan<- Metric) {
if procs, err := unix.SysctlKinfoProcSlice("kern.proc.pid", os.Getpid()); err == nil {
if len(procs) == 1 {
Expand Down
11 changes: 9 additions & 2 deletions prometheus/process_collector_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ func canCollectProcess() bool {
return false
}

// describe returns all descriptions of the collector for js.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I'm not sure why we have separate files for js and wasip1 that explicitly return "false" from the can-collect function, while other environments that are potentially also unsupported go through the _other file. Maybe we can check the file history/blame to see why they were added instead of going through the _other file flow 🤔

We can do that in another PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be because the function below is not available on js and wasip1, but I think you are still right: we can at least merge those two files, since they are exactly the same today. I have left them as they are for now; let me know what you think, and I can open a follow-up PR.

func canCollectProcess() bool {
	_, err := procfs.NewDefaultFS()
	return err == nil
}

// Ensure that this list of descriptors is kept in sync with the metrics collected
// in the processCollect method. Any changes to the metrics in processCollect
// (such as adding or removing metrics) should be reflected in this list of descriptors.
func (c *processCollector) processCollect(ch chan<- Metric) {
	// Process metrics cannot be collected on this platform. Instead of
	// silently returning (which left the call below unreachable), forward to
	// the shared fallback so the "not supported" error is reported when error
	// reporting is enabled.
	c.errorCollectFn(ch)
}

// describe is the js implementation of the platform-specific describe hook.
// It has no descriptor list of its own and simply delegates to
// errorDescribeFn.
func (c *processCollector) describe(ch chan<- *Desc) {
c.errorDescribeFn(ch)
}
16 changes: 16 additions & 0 deletions prometheus/process_collector_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,19 @@ func (c *processCollector) processCollect(ch chan<- Metric) {
c.reportError(ch, nil, err)
}
}

// describe sends the descriptors of every process metric for platforms other
// than windows, js, wasip1 and darwin.
// The list must mirror what processCollect emits: whenever a metric is added
// to or removed from processCollect, update this list as well.
func (c *processCollector) describe(ch chan<- *Desc) {
	all := []*Desc{
		c.cpuTotal,
		c.openFDs,
		c.maxFDs,
		c.vsize,
		c.maxVsize,
		c.rss,
		c.startTime,
		c.inBytes,
		c.outBytes,
	}
	for _, d := range all {
		ch <- d
	}
}
49 changes: 49 additions & 0 deletions prometheus/process_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,52 @@ func TestNewPidFileFn(t *testing.T) {
}
}
}

func TestDescribeAndCollectAlignment(t *testing.T) {
collector := &processCollector{
pidFn: getPIDFn(),
cpuTotal: NewDesc("cpu_total", "Total CPU usage", nil, nil),
openFDs: NewDesc("open_fds", "Number of open file descriptors", nil, nil),
maxFDs: NewDesc("max_fds", "Maximum file descriptors", nil, nil),
vsize: NewDesc("vsize", "Virtual memory size", nil, nil),
maxVsize: NewDesc("max_vsize", "Maximum virtual memory size", nil, nil),
rss: NewDesc("rss", "Resident Set Size", nil, nil),
startTime: NewDesc("start_time", "Process start time", nil, nil),
inBytes: NewDesc("in_bytes", "Input bytes", nil, nil),
outBytes: NewDesc("out_bytes", "Output bytes", nil, nil),
}

// Collect and get descriptors
descCh := make(chan *Desc, 15)
collector.describe(descCh)
close(descCh)

definedDescs := make(map[string]bool)
for desc := range descCh {
definedDescs[desc.String()] = true
}

// Collect and get metrics
metricsCh := make(chan Metric, 15)
collector.processCollect(metricsCh)
close(metricsCh)

collectedMetrics := make(map[string]bool)
for metric := range metricsCh {
collectedMetrics[metric.Desc().String()] = true
}

// Verify that all described metrics are collected
for desc := range definedDescs {
if !collectedMetrics[desc] {
t.Errorf("Metric %s described but not collected", desc)
}
}

// Verify that no extra metrics are collected
for desc := range collectedMetrics {
if !definedDescs[desc] {
t.Errorf("Metric %s collected but not described", desc)
}
}
}
13 changes: 10 additions & 3 deletions prometheus/process_collector_wasip1.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,14 @@ func canCollectProcess() bool {
return false
}

func (*processCollector) processCollect(chan<- Metric) {
// noop on this platform
return
func (c *processCollector) processCollect(ch chan<- Metric) {
c.errorCollectFn(ch)
}

// describe is the wasip1 implementation of the platform-specific describe
// hook. No process metrics are collected on this platform, so there is no
// descriptor list to keep in sync with processCollect; the call is forwarded
// to errorDescribeFn.
func (c *processCollector) describe(ch chan<- *Desc) {
c.errorDescribeFn(ch)
}
21 changes: 15 additions & 6 deletions prometheus/process_collector_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,10 @@ func getProcessHandleCount(handle windows.Handle) (uint32, error) {
}

func (c *processCollector) processCollect(ch chan<- Metric) {
h, err := windows.GetCurrentProcess()
if err != nil {
c.reportError(ch, nil, err)
return
}
h := windows.CurrentProcess()

var startTime, exitTime, kernelTime, userTime windows.Filetime
err = windows.GetProcessTimes(h, &startTime, &exitTime, &kernelTime, &userTime)
err := windows.GetProcessTimes(h, &startTime, &exitTime, &kernelTime, &userTime)
if err != nil {
c.reportError(ch, nil, err)
return
Expand All @@ -111,6 +107,19 @@ func (c *processCollector) processCollect(ch chan<- Metric) {
ch <- MustNewConstMetric(c.maxFDs, GaugeValue, float64(16*1024*1024)) // Windows has a hard-coded max limit, not per-process.
}

// describe sends the descriptors of the process metrics exposed on Windows.
// The list must mirror what processCollect emits on this platform: whenever a
// metric is added to or removed from processCollect, update this list as well.
func (c *processCollector) describe(ch chan<- *Desc) {
	supported := []*Desc{
		c.cpuTotal,
		c.openFDs,
		c.maxFDs,
		c.vsize,
		c.rss,
		c.startTime,
	}
	for _, d := range supported {
		ch <- d
	}
}

// fileTimeToSeconds converts a windows.Filetime to seconds as a float64.
// The two 32-bit halves are combined into one 64-bit tick count, which is then
// divided by 1e7 (presumably because Filetime counts 100-nanosecond
// intervals — confirm against the Windows FILETIME documentation).
func fileTimeToSeconds(ft windows.Filetime) float64 {
	ticks := uint64(ft.HighDateTime)<<32 + uint64(ft.LowDateTime)
	return float64(ticks) / 1e7
}
49 changes: 49 additions & 0 deletions prometheus/process_collector_windows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,52 @@ func TestWindowsProcessCollector(t *testing.T) {
}
}
}

// TestWindowsDescribeAndCollectAlignment checks that, on Windows, the
// descriptors sent by describe and the metrics sent by processCollect cover
// exactly the same set.
func TestWindowsDescribeAndCollectAlignment(t *testing.T) {
	pc := &processCollector{
		pidFn:     getPIDFn(),
		cpuTotal:  NewDesc("cpu_total", "Total CPU usage", nil, nil),
		openFDs:   NewDesc("open_fds", "Number of open file descriptors", nil, nil),
		maxFDs:    NewDesc("max_fds", "Maximum file descriptors", nil, nil),
		vsize:     NewDesc("vsize", "Virtual memory size", nil, nil),
		maxVsize:  NewDesc("max_vsize", "Maximum virtual memory size", nil, nil),
		rss:       NewDesc("rss", "Resident Set Size", nil, nil),
		startTime: NewDesc("start_time", "Process start time", nil, nil),
		inBytes:   NewDesc("in_bytes", "Input bytes", nil, nil),
		outBytes:  NewDesc("out_bytes", "Output bytes", nil, nil),
	}

	// Drain describe into a set keyed by the descriptor's string form. The
	// channel is buffered so describe can run without a consumer goroutine.
	descs := make(chan *Desc, 15)
	pc.describe(descs)
	close(descs)

	described := map[string]struct{}{}
	for d := range descs {
		described[d.String()] = struct{}{}
	}

	// Drain processCollect the same way, keyed by each metric's descriptor.
	metrics := make(chan Metric, 15)
	pc.processCollect(metrics)
	close(metrics)

	collected := map[string]struct{}{}
	for m := range metrics {
		collected[m.Desc().String()] = struct{}{}
	}

	// Every described metric must have been collected.
	for name := range described {
		if _, ok := collected[name]; !ok {
			t.Errorf("Metric %s described but not collected", name)
		}
	}

	// Every collected metric must have been described.
	for name := range collected {
		if _, ok := described[name]; !ok {
			t.Errorf("Metric %s collected but not described", name)
		}
	}
}