Skip to content

Commit

Permalink
fix: resolve enabling NodeMonitoring breaks target status
Browse files Browse the repository at this point in the history
After NodeMonitoring was added, when it is configured, target status stops working completely. Long story short, the problem is early exit on an unknown scrape pool format.

This change includes some refactoring originally part of the ProbeMonitoring PR (#766), and also adds the possibility of supporting NodeMonitoring target status in the future, should we choose to add it, by at the very least detecting and validating the scrape pool format.

Changes include:

- Ensures we still patch everything on error (with new unit test case).
- Show all errors, not just the last one we found.
- Ensures we parse NodeMonitoring scrape pool formats (with new unit test case).
- Validate expected errors in the test cases.

Thanks @bwplotka for finding this issue!
  • Loading branch information
TheSpiritXIII committed Jan 30, 2024
1 parent e547f5b commit a2df2c4
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 79 deletions.
130 changes: 116 additions & 14 deletions pkg/operator/endpoint_status_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package operator

import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
Expand All @@ -32,7 +32,7 @@ const (

func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]monitoringv1.ScrapeEndpointStatus, error) {
endpointBuilder := &scrapeEndpointBuilder{
mapByJobByEndpoint: make(map[string]map[string]*scrapeEndpointStatusBuilder),
mapByKeyByEndpoint: make(map[string]map[string]*scrapeEndpointStatusBuilder),
total: 0,
failed: 0,
time: metav1.Now(),
Expand All @@ -48,7 +48,7 @@ func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]
}

type scrapeEndpointBuilder struct {
mapByJobByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
mapByKeyByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
total uint32
failed uint32
time metav1.Time
Expand All @@ -68,24 +68,126 @@ func (b *scrapeEndpointBuilder) add(target *prometheusv1.TargetsResult) error {
return nil
}

// setNamespacedObjectByScrapeJobKey fills in the namespace and name of a
// namespaced resource from a scrape job key split as ["kind", "namespace",
// "name"]. full is the unsplit key, used only for error reporting.
func setNamespacedObjectByScrapeJobKey(o monitoringv1.PodMonitoringCRD, split []string, full string) (monitoringv1.PodMonitoringCRD, error) {
	if len(split) == 3 {
		o.SetNamespace(split[1])
		o.SetName(split[2])
		return o, nil
	}
	return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], full)
}

// setClusterScopedObjectByScrapeJobKey fills in the name of a cluster-scoped
// resource from a scrape job key split as ["kind", "name"]. full is the
// unsplit key, used only for error reporting.
func setClusterScopedObjectByScrapeJobKey(o monitoringv1.PodMonitoringCRD, split []string, full string) (monitoringv1.PodMonitoringCRD, error) {
	if len(split) == 2 {
		o.SetName(split[1])
		return o, nil
	}
	return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], full)
}

// getObjectByScrapeJobKey converts the key to a CRD. See monitoringv1.PodMonitoringCRD.GetKey().
//
// A nil CRD with a nil error means the key is valid but has no status to
// patch (hard-coded kubelet scraping, or NodeMonitoring which currently has
// no target status support).
func getObjectByScrapeJobKey(key string) (monitoringv1.PodMonitoringCRD, error) {
	split := strings.Split(key, "/")
	// Generally:
	// - "kind" for scrape pools without a respective CRD.
	// - "kind/name" for cluster-scoped resources.
	// - "kind/namespace/name" for namespaced resources.
	switch split[0] {
	case "kubelet":
		if len(split) != 1 {
			return nil, fmt.Errorf("invalid kubelet scrape key format %q", key)
		}
		return nil, nil
	case "PodMonitoring":
		return setNamespacedObjectByScrapeJobKey(&monitoringv1.PodMonitoring{}, split, key)
	case "ClusterPodMonitoring":
		return setClusterScopedObjectByScrapeJobKey(&monitoringv1.ClusterPodMonitoring{}, split, key)
	case "NodeMonitoring":
		// Validate the key format directly instead of allocating a throwaway
		// ClusterPodMonitoring (the wrong kind) just to run the helper's
		// length check and discard the result. The error message produced
		// here is identical to the one the helper would have returned.
		if len(split) != 2 {
			return nil, fmt.Errorf("invalid NodeMonitoring scrape key format %q", key)
		}
		// NodeMonitoring has no target status support yet: nothing to patch.
		return nil, nil
	default:
		return nil, fmt.Errorf("unknown scrape kind %q", split[0])
	}
}

// scrapePool is the parsed Prometheus scrape pool, which is assigned to the job name from our
// configurations. For example, for PodMonitoring this is `PodMonitoring/namespace/name/port`. The
// key is what identifies the resource (`PodMonitoring/namespace/name`) and the group indicates a
// small subset of that resource (`port`).
type scrapePool struct {
	// key uniquely identifies the monitored resource, e.g. `PodMonitoring/namespace/name`.
	key string
	// group identifies a subset of the resource's targets, e.g. the endpoint port.
	group string
}

// getNamespacedScrapePool splits a scrape pool of the form
// "kind/namespace/name/group" into its resource key ("kind/namespace/name")
// and group ("group"). split is the pool pre-split on "/"; the caller has
// already validated it has exactly 4 elements.
func getNamespacedScrapePool(full string, split []string) scrapePool {
	// Length of "kind/namespace/name", i.e. the index of the "/" that
	// separates the resource key from the group.
	index := len(split[0]) + 1 + len(split[1]) + 1 + len(split[2])
	return scrapePool{
		key: full[:index],
		// Skip the "/" separator so the group has no leading slash,
		// consistent with the kubelet case in parseScrapePool.
		group: full[index+1:],
	}
}

// getClusterScopedScrapePool splits a scrape pool of the form
// "kind/name/group" into its resource key ("kind/name") and group ("group").
// split is the pool pre-split on "/"; the caller has already validated it has
// exactly 3 elements.
func getClusterScopedScrapePool(full string, split []string) scrapePool {
	// Length of "kind/name" (the original comment said "kind/namespace",
	// but cluster-scoped resources have no namespace), i.e. the index of
	// the "/" that separates the resource key from the group.
	index := len(split[0]) + 1 + len(split[1])
	return scrapePool{
		key: full[:index],
		// Skip the "/" separator so the group has no leading slash,
		// consistent with the kubelet case in parseScrapePool.
		group: full[index+1:],
	}
}

// parseScrapePool parses a Prometheus scrape pool name into its resource key
// and group. See the scrapePool type for the format description.
func parseScrapePool(pool string) (scrapePool, error) {
	parts := strings.Split(pool, "/")
	kind := parts[0]
	switch kind {
	case "kubelet":
		// Hard-coded kubelet scraping: "kubelet/<group>".
		if len(parts) == 2 {
			return scrapePool{key: parts[0], group: parts[1]}, nil
		}
		return scrapePool{}, fmt.Errorf("invalid kubelet scrape pool format %q", pool)
	case "PodMonitoring":
		// Namespaced: "PodMonitoring/<namespace>/<name>/<group>".
		if len(parts) == 4 {
			return getNamespacedScrapePool(pool, parts), nil
		}
		return scrapePool{}, fmt.Errorf("invalid PodMonitoring scrape pool format %q", pool)
	case "ClusterPodMonitoring":
		// Cluster-scoped: "ClusterPodMonitoring/<name>/<group>".
		if len(parts) == 3 {
			return getClusterScopedScrapePool(pool, parts), nil
		}
		return scrapePool{}, fmt.Errorf("invalid ClusterPodMonitoring scrape pool format %q", pool)
	case "NodeMonitoring":
		// Cluster-scoped: "NodeMonitoring/<name>/<group>".
		if len(parts) == 3 {
			return getClusterScopedScrapePool(pool, parts), nil
		}
		return scrapePool{}, fmt.Errorf("invalid NodeMonitoring scrape pool format %q", pool)
	default:
		return scrapePool{}, fmt.Errorf("unknown scrape kind %q", kind)
	}
}

func (b *scrapeEndpointBuilder) addActiveTarget(activeTarget prometheusv1.ActiveTarget, time metav1.Time) error {
portIndex := strings.LastIndex(activeTarget.ScrapePool, "/")
if portIndex == -1 {
return errors.New("Malformed scrape pool: " + activeTarget.ScrapePool)
scrapePool, err := parseScrapePool(activeTarget.ScrapePool)
if err != nil {
return err
}
job := activeTarget.ScrapePool[:portIndex]
endpoint := activeTarget.ScrapePool[portIndex+1:]
mapByEndpoint, ok := b.mapByJobByEndpoint[job]
mapByEndpoint, ok := b.mapByKeyByEndpoint[scrapePool.key]
if !ok {
tmp := make(map[string]*scrapeEndpointStatusBuilder)
mapByEndpoint = tmp
b.mapByJobByEndpoint[job] = mapByEndpoint
b.mapByKeyByEndpoint[scrapePool.key] = mapByEndpoint
}

statusBuilder, exists := mapByEndpoint[endpoint]
statusBuilder, exists := mapByEndpoint[scrapePool.group]
if !exists {
statusBuilder = newScrapeEndpointStatusBuilder(&activeTarget, time)
mapByEndpoint[endpoint] = statusBuilder
mapByEndpoint[scrapePool.group] = statusBuilder
}
statusBuilder.addSampleTarget(&activeTarget)
return nil
Expand All @@ -96,7 +198,7 @@ func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpoint
collectorsFraction := strconv.FormatFloat(fraction, 'f', -1, 64)
resultMap := make(map[string][]monitoringv1.ScrapeEndpointStatus)

for job, endpointMap := range b.mapByJobByEndpoint {
for key, endpointMap := range b.mapByKeyByEndpoint {
endpointStatuses := make([]monitoringv1.ScrapeEndpointStatus, 0)
for _, statusBuilder := range endpointMap {
endpointStatus := statusBuilder.build()
Expand All @@ -110,7 +212,7 @@ func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpoint
rhsName := endpointStatuses[j].Name
return lhsName < rhsName
})
resultMap[job] = endpointStatuses
resultMap[key] = endpointStatuses
}
return resultMap
}
Expand Down
69 changes: 10 additions & 59 deletions pkg/operator/target_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -289,54 +288,6 @@ func fetchTargets(ctx context.Context, logger logr.Logger, opts Options, httpCli
return results, nil
}

func buildPodMonitoringFromJob(job []string) (*monitoringv1.PodMonitoring, error) {
if len(job) != 3 {
return nil, errors.New("invalid job type")
}
kind := job[0]
if kind != "PodMonitoring" {
return nil, errors.New("invalid object kind")
}
pm := &monitoringv1.PodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Name: job[2],
Namespace: job[1],
},
Spec: monitoringv1.PodMonitoringSpec{},
Status: monitoringv1.PodMonitoringStatus{},
}
return pm, nil
}

func buildClusterPodMonitoringFromJob(job []string) (*monitoringv1.ClusterPodMonitoring, error) {
if len(job) != 2 {
return nil, errors.New("invalid job type")
}
kind := job[0]
if kind != "ClusterPodMonitoring" {
return nil, errors.New("invalid object kind")
}
pm := &monitoringv1.ClusterPodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Name: job[1],
},
Spec: monitoringv1.ClusterPodMonitoringSpec{},
Status: monitoringv1.PodMonitoringStatus{},
}
return pm, nil
}

func buildPodMonitoring(job string) (monitoringv1.PodMonitoringCRD, error) {
split := strings.Split(job, "/")
if pm, err := buildPodMonitoringFromJob(split); err == nil {
return pm, nil
}
if pm, err := buildClusterPodMonitoringFromJob(split); err == nil {
return pm, nil
}
return nil, fmt.Errorf("unable to parse job: %s", job)
}

func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, object client.Object, status *monitoringv1.PodMonitoringStatus) error {
patchStatus := map[string]interface{}{
"endpointStatuses": status.EndpointStatuses,
Expand All @@ -362,29 +313,29 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
return err
}

var patchErr error
var errs []error
for job, endpointStatuses := range endpointMap {
// Kubelet scraping is configured through hard-coding and not through
// a PodMonitoring. As there's no status to update, we skip.
if strings.HasPrefix(job, "kubelet") {
pm, err := getObjectByScrapeJobKey(job)
if err != nil {
errs = append(errs, fmt.Errorf("building target: %s: %w", job, err))
continue
}
pm, err := buildPodMonitoring(job)
if err != nil {
return fmt.Errorf("building podmonitoring: %s: %w", job, err)
if pm == nil {
// Skip hard-coded jobs which we do not patch.
continue
}
pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses

if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
// Save and log any error encountered while patching the status.
// We don't want to prematurely return if the error was transient
// as we should continue patching all statuses before exiting.
patchErr = err
logger.Error(err, "patching podmonitoring status", "job", job)
errs = append(errs, err)
logger.Error(err, "patching status", "job", job, "gvk", pm.GetObjectKind().GroupVersionKind())
}
}

return patchErr
return errors.Join(errs...)
}

func getPrometheusPods(ctx context.Context, kubeClient client.Client, opts Options, selector labels.Selector) ([]*corev1.Pod, error) {
Expand Down
Loading

0 comments on commit a2df2c4

Please sign in to comment.