Skip to content

Commit

Permalink
Add Prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
DrJosh9000 committed Nov 27, 2024
1 parent a54f600 commit 0e7b9d5
Show file tree
Hide file tree
Showing 17 changed files with 643 additions and 54 deletions.
4 changes: 4 additions & 0 deletions charts/agent-stack-k8s/templates/deployment.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ spec:
subPath: config.yaml
resources:
{{- toYaml .Values.resources | nindent 10 }}
{{ with index .Values.config "prometheus-port" -}}
ports:
- containerPort: {{.}}
{{ end -}}
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
Expand Down
5 changes: 5 additions & 0 deletions cmd/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func AddConfigFlags(cmd *cobra.Command) {
"",
"Bind address to expose the pprof profiler (e.g. localhost:6060)",
)
cmd.Flags().Uint16(
"prometheus-port",
0,
"Bind port to expose Prometheus /metrics; 0 disables it",
)
cmd.Flags().String("graphql-endpoint", "", "Buildkite GraphQL endpoint URL")

cmd.Flags().Duration(
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 // indirect
github.com/aws/smithy-go v1.22.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bitfield/gotestdox v0.2.2 // indirect
github.com/buildkite/go-pipeline v0.13.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
Expand All @@ -73,6 +74,7 @@ require (
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.7 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lestrrat-go/blackmagic v1.0.2 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
Expand All @@ -84,6 +86,9 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/outcaste-io/ristretto v0.2.3 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand Down Expand Up @@ -161,6 +166,7 @@ require (
github.com/philhofer/fwd v1.1.3-0.20240612014219-fbbf4953d986 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.5
github.com/puzpuzpuz/xsync/v2 v2.5.1 // indirect
github.com/qri-io/jsonpointer v0.1.1 // indirect
github.com/qri-io/jsonschema v0.2.1 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGI
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bitfield/gotestdox v0.2.2 h1:x6RcPAbBbErKLnapz1QeAlf3ospg8efBsedU93CDsnE=
github.com/bitfield/gotestdox v0.2.2/go.mod h1:D+gwtS0urjBrzguAkTM2wodsTQYFHdpx8eqRJ3N+9pY=
Expand Down Expand Up @@ -290,6 +292,8 @@ github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6 h1:IsMZxCuZqKu
github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeWNIJaW+O5xpRQbPp0Ybqu1vJd/pm7s2F473HRrkw=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand Down Expand Up @@ -361,7 +365,15 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/puzpuzpuz/xsync/v2 v2.5.1 h1:mVGYAvzDSu52+zaGyNjC+24Xw2bQi3kTr4QJ6N9pIIU=
github.com/puzpuzpuz/xsync/v2 v2.5.1/go.mod h1:gD2H2krq/w52MfPLE+Uy64TzJDVY7lP2znR9qmR35kU=
github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJiBA=
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
Namespace string `json:"namespace" validate:"required"`
Org string `json:"org" validate:"required"`
Tags stringSlice `json:"tags" validate:"min=1"`
PrometheusPort uint16 `json:"prometheus-port" validate:"omitempty"`
ProfilerAddress string `json:"profiler-address" validate:"omitempty,hostname_port"`
GraphQLEndpoint string `json:"graphql-endpoint" validate:"omitempty"`
// Agent endpoint is set in agent-config.
Expand Down Expand Up @@ -89,6 +90,7 @@ func (c Config) MarshalLogObject(enc zapcore.ObjectEncoder) error {
return err
}
enc.AddString("profiler-address", c.ProfilerAddress)
enc.AddUint16("prometheus-port", c.PrometheusPort)
enc.AddString("cluster-uuid", c.ClusterUUID)
enc.AddBool("prohibit-kubernetes-plugin", c.ProhibitKubernetesPlugin)
if err := enc.AddArray("additional-redacted-vars", c.AdditionalRedactedVars); err != nil {
Expand Down
32 changes: 29 additions & 3 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
_ "net/http/pprof"
"strconv"
"time"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/monitor"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/scheduler"

"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -30,13 +32,37 @@ func Run(
k8sClient kubernetes.Interface,
cfg *config.Config,
) {
httpMuxes := make(map[string]*http.ServeMux)

if cfg.ProfilerAddress != "" {
logger.Info("profiler listening for requests")
// Specifically set the mux to DefaultServeMux, because the pprof
// handlers are registered there.
httpMuxes[cfg.ProfilerAddress] = http.DefaultServeMux
}
if cfg.PrometheusPort > 0 {
logger.Info("prometheus metrics handler listening for requests")
addr := ":" + strconv.Itoa(int(cfg.PrometheusPort))
// If PrometheusAddress == ProfilerAddress, the mux will already be set
// to DefaultServeMux.
// If PrometheusAddress != ProfilerAddress, we don't want to
// expose pprof handlers on this mux, so make a new mux.
mux := httpMuxes[addr]
if mux == nil {
mux = http.NewServeMux()
}
mux.Handle("GET /metrics", promhttp.Handler())
httpMuxes[addr] = mux
}

for addr, mux := range httpMuxes {
go func() {
srv := http.Server{Addr: cfg.ProfilerAddress, ReadHeaderTimeout: 2 * time.Second}
if err := srv.ListenAndServe(); err != nil {
logger.Error("problem running profiler server", zap.Error(err))
svr := &http.Server{
Addr: addr,
ReadHeaderTimeout: 2 * time.Second,
Handler: mux,
}
logger.Error("http server exited", zap.Error(svr.ListenAndServe()))
}()
}

Expand Down
29 changes: 28 additions & 1 deletion internal/controller/deduper/deduper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func New(logger *zap.Logger, handler model.JobHandler) *Deduper {
logger: logger,
inFlight: make(map[uuid.UUID]bool),
}
// Provide the callback for numInFlightGauge.
jobsRunningGaugeFunc = func() int {
d.inFlightMu.Lock()
defer d.inFlightMu.Unlock()
return len(d.inFlight)
}
return d
}

Expand Down Expand Up @@ -67,22 +73,33 @@ func (d *Deduper) Handle(ctx context.Context, job model.Job) error {
return err
}
if numInFlight, ok := d.casa(uuid, true); !ok {
jobsAlreadyRunningCounter.WithLabelValues("handle").Inc()
d.logger.Debug("job is already in-flight",
zap.String("uuid", job.Uuid),
zap.Int("num-in-flight", numInFlight),
)
return model.ErrDuplicateJob
}
jobsMarkedRunningCounter.WithLabelValues("Handle").Inc()

// Not a duplicate: pass to the next handler, which could be either the
// limiter or the scheudler.
d.logger.Debug("passing job to next handler",
zap.Stringer("handler", reflect.TypeOf(d.handler)),
zap.String("uuid", job.Uuid),
)
jobHandlerCallsCounter.Inc()

if err := d.handler.Handle(ctx, job); err != nil {
jobHandlerErrorCounter.Inc()

// Couldn't schedule the job. Oh well. Record as not-in-flight.
numInFlight, _ := d.casa(uuid, false)
numInFlight, ok := d.casa(uuid, false)
if ok {
jobsUnmarkedRunningCounter.WithLabelValues("Handle").Inc()
} else {
jobsAlreadyNotRunningCounter.WithLabelValues("Handle").Inc()
}

d.logger.Debug("next handler failed",
zap.String("uuid", job.Uuid),
Expand All @@ -96,6 +113,8 @@ func (d *Deduper) Handle(ctx context.Context, job model.Job) error {

// OnAdd is called by k8s to inform us a resource is added.
func (d *Deduper) OnAdd(obj any, inInitialList bool) {
onAddEventCounter.Inc()

job, _ := obj.(*batchv1.Job)
if job == nil {
return
Expand All @@ -115,6 +134,8 @@ func (d *Deduper) OnAdd(obj any, inInitialList bool) {

// OnUpdate is called by k8s to inform us a resource is updated.
func (d *Deduper) OnUpdate(prev, curr any) {
onUpdateEventCounter.Inc()

prevState, _ := prev.(*batchv1.Job)
currState, _ := curr.(*batchv1.Job)
if prevState == nil || currState == nil {
Expand All @@ -137,6 +158,8 @@ func (d *Deduper) OnUpdate(prev, curr any) {

// OnDelete is called by k8s to inform us a resource is deleted.
func (d *Deduper) OnDelete(prev any) {
onDeleteEventCounter.Inc()

prevState, _ := prev.(*batchv1.Job)
if prevState == nil {
return
Expand All @@ -159,6 +182,7 @@ func (d *Deduper) markRunning(id uuid.UUID, source string) {
// Change state from not in-flight to in-flight.
numInFlight, ok := d.casa(id, true)
if !ok {
jobsAlreadyRunningCounter.WithLabelValues(source).Inc()
d.logger.Debug("job was already in inFlight!",
zap.String("uuid", id.String()),
zap.String("source", source),
Expand All @@ -167,6 +191,7 @@ func (d *Deduper) markRunning(id uuid.UUID, source string) {
return
}

jobsMarkedRunningCounter.WithLabelValues(source).Inc()
d.logger.Debug("added previous job to inFlight",
zap.String("uuid", id.String()),
zap.String("source", source),
Expand All @@ -184,6 +209,7 @@ func (d *Deduper) unmarkRunning(id uuid.UUID, source string) {
zap.String("source", source),
zap.Int("num-in-flight", numInFlight),
)
jobsAlreadyNotRunningCounter.WithLabelValues(source).Inc()
return
}

Expand All @@ -192,6 +218,7 @@ func (d *Deduper) unmarkRunning(id uuid.UUID, source string) {
zap.String("source", source),
zap.Int("num-in-flight", numInFlight),
)
jobsUnmarkedRunningCounter.WithLabelValues(source).Inc()
}

// casa is an atomic compare-and-swap-like primitive.
Expand Down
80 changes: 80 additions & 0 deletions internal/controller/deduper/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package deduper

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
promNamespace = "buildkite"
promSubsystem = "deduper"
)

// Overridden by New to return len(inFlight).
var jobsRunningGaugeFunc = func() int { return 0 }

var (
_ = promauto.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "jobs_running",
Help: "Current number of running jobs according to deduper",
}, func() float64 { return float64(jobsRunningGaugeFunc()) })

jobHandlerCallsCounter = promauto.NewCounter(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "job_handler_calls_total",
Help: "Count of jobs that were passed to the next handler in the chain",
})
jobHandlerErrorCounter = promauto.NewCounter(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "job_handler_errors_total",
Help: "Count of jobs that weren't scheduled because the next handler in the chain returned an error",
})

onAddEventCounter = promauto.NewCounter(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "onadd_events_total",
Help: "Count of OnAdd informer events",
})
onUpdateEventCounter = promauto.NewCounter(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "onupdate_event_total",
Help: "Count of OnUpdate informer events",
})
onDeleteEventCounter = promauto.NewCounter(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "ondelete_events_total",
Help: "Count of OnDelete informer events",
})

jobsMarkedRunningCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "jobs_marked_running_total",
Help: "Count of times a job was added to inFlight",
}, []string{"source"})
jobsUnmarkedRunningCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "jobs_unmarked_running_total",
Help: "Count of times a job was removed from inFlight",
}, []string{"source"})
jobsAlreadyRunningCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "jobs_already_running_total",
Help: "Count of times a job was already present in inFlight",
}, []string{"source"})
jobsAlreadyNotRunningCounter = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: promNamespace,
Subsystem: promSubsystem,
Name: "jobs_already_not_running_total",
Help: "Count of times a job was already missing from inFlight",
}, []string{"source"})
)
Loading

0 comments on commit 0e7b9d5

Please sign in to comment.