From a688dced9b140276e41a44de1513b2aea271a266 Mon Sep 17 00:00:00 2001
From: Daniel Hrabovcak
Date: Fri, 26 Jan 2024 14:56:31 -0500
Subject: [PATCH] fix: enabling NodeMonitoring breaks target status

---
 pkg/operator/endpoint_status_builder.go | 136 +++++++++++++++++++++---
 pkg/operator/target_status.go           |  69 ++----------
 pkg/operator/target_status_test.go      | 134 +++++++++++++++++++++--
 3 files changed, 260 insertions(+), 79 deletions(-)

diff --git a/pkg/operator/endpoint_status_builder.go b/pkg/operator/endpoint_status_builder.go
index 428d40a1a6..42696062a7 100644
--- a/pkg/operator/endpoint_status_builder.go
+++ b/pkg/operator/endpoint_status_builder.go
@@ -15,7 +15,7 @@
 package operator
 
 import (
-	"errors"
+	"fmt"
 	"sort"
 	"strconv"
 	"strings"
@@ -32,7 +32,7 @@ const (
 
 func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]monitoringv1.ScrapeEndpointStatus, error) {
 	endpointBuilder := &scrapeEndpointBuilder{
-		mapByJobByEndpoint: make(map[string]map[string]*scrapeEndpointStatusBuilder),
+		mapByKeyByEndpoint: make(map[string]map[string]*scrapeEndpointStatusBuilder),
 		total:              0,
 		failed:             0,
 		time:               metav1.Now(),
@@ -48,7 +48,7 @@ func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]
 }
 
 type scrapeEndpointBuilder struct {
-	mapByJobByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
+	mapByKeyByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
 	total              uint32
 	failed             uint32
 	time               metav1.Time
@@ -68,24 +68,132 @@ func (b *scrapeEndpointBuilder) add(target *prometheusv1.TargetsResult) error {
 	return nil
 }
 
+func setNamespacedObjectByScrapeJobKey(o monitoringv1.PodMonitoringCRD, split []string, full string) (monitoringv1.PodMonitoringCRD, error) {
+	if len(split) != 3 {
+		return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], full)
+	}
+
+	o.SetNamespace(split[1])
+	o.SetName(split[2])
+	return o, nil
+}
+
+func setClusterScopedObjectByScrapeJobKey(o monitoringv1.PodMonitoringCRD, split []string, full string) (monitoringv1.PodMonitoringCRD, error) {
+	if len(split) != 2 {
+		return nil, fmt.Errorf("invalid %s scrape key format %q", split[0], full)
+	}
+
+	o.SetName(split[1])
+	return o, nil
+}
+
+// getObjectByScrapeJobKey converts the key to a CRD. See monitoringv1.PodMonitoringCRD.GetKey().
+func getObjectByScrapeJobKey(key string) (monitoringv1.PodMonitoringCRD, error) {
+	split := strings.Split(key, "/")
+	// Generally:
+	// - "kind" for scrape pools without a respective CRD.
+	// - "kind/name" for cluster-scoped resources.
+	// - "kind/namespace/name" for namespaced resources.
+	switch split[0] {
+	case "kubelet":
+		if len(split) != 1 {
+			return nil, fmt.Errorf("invalid kubelet scrape key format %q", key)
+		}
+		return nil, nil
+	case "PodMonitoring":
+		return setNamespacedObjectByScrapeJobKey(&monitoringv1.PodMonitoring{}, split, key)
+	case "ClusterPodMonitoring":
+		return setClusterScopedObjectByScrapeJobKey(&monitoringv1.ClusterPodMonitoring{}, split, key)
+	case "NodeMonitoring":
+		if _, err := setClusterScopedObjectByScrapeJobKey(&monitoringv1.ClusterPodMonitoring{}, split, key); err != nil {
+			return nil, err
+		}
+		return nil, nil
+	default:
+		return nil, fmt.Errorf("unknown scrape kind %q", split[0])
+	}
+}
+
+// scrapePool is the parsed Prometheus scrape pool, which corresponds to the job name generated
+// from our configuration. For example, for PodMonitoring this is `PodMonitoring/namespace/name/port`.
+// The key identifies the resource (`PodMonitoring/namespace/name`) and the group identifies a
+// subdivision of that resource (`port`).
+type scrapePool struct {
+	key   string
+	group string
+}
+
+// stringsJoinLength is strings.Join but only returns the length of the resulting string.
+func stringsJoinLength(elems []string, sep string) int {
+	if len(elems) == 0 {
+		return 0
+	}
+	j := len(sep) * (len(elems) - 1)
+	for _, elem := range elems {
+		j += len(elem)
+	}
+	return j
+}
+
+func parseScrapePool(pool string) (scrapePool, error) {
+	split := strings.Split(pool, "/")
+	switch split[0] {
+	case "kubelet":
+		if len(split) != 2 {
+			return scrapePool{}, fmt.Errorf("invalid kubelet scrape pool format %q", pool)
+		}
+		return scrapePool{
+			key:   split[0],
+			group: split[1],
+		}, nil
+	case "PodMonitoring":
+		if len(split) != 4 {
+			return scrapePool{}, fmt.Errorf("invalid PodMonitoring scrape pool format %q", pool)
+		}
+		index := stringsJoinLength(split[:3], "/")
+		return scrapePool{
+			key:   pool[:index],
+			group: pool[index+1:],
+		}, nil
+	case "ClusterPodMonitoring":
+		if len(split) != 3 {
+			return scrapePool{}, fmt.Errorf("invalid ClusterPodMonitoring scrape pool format %q", pool)
+		}
+		index := stringsJoinLength(split[:2], "/")
+		return scrapePool{
+			key:   pool[:index],
+			group: pool[index+1:],
+		}, nil
+	case "NodeMonitoring":
+		if len(split) != 3 {
+			return scrapePool{}, fmt.Errorf("invalid NodeMonitoring scrape pool format %q", pool)
+		}
+		index := stringsJoinLength(split[:2], "/")
+		return scrapePool{
+			key:   pool[:index],
+			group: pool[index+1:],
+		}, nil
+	default:
+		return scrapePool{}, fmt.Errorf("unknown scrape kind %q", split[0])
+	}
+}
+
 func (b *scrapeEndpointBuilder) addActiveTarget(activeTarget prometheusv1.ActiveTarget, time metav1.Time) error {
-	portIndex := strings.LastIndex(activeTarget.ScrapePool, "/")
-	if portIndex == -1 {
-		return errors.New("Malformed scrape pool: " + activeTarget.ScrapePool)
+	scrapePool, err := parseScrapePool(activeTarget.ScrapePool)
+	if err != nil {
+		return err
 	}
 
-	job := activeTarget.ScrapePool[:portIndex]
-	endpoint := activeTarget.ScrapePool[portIndex+1:]
-	mapByEndpoint, ok := b.mapByJobByEndpoint[job]
+	mapByEndpoint, ok := b.mapByKeyByEndpoint[scrapePool.key]
 	if !ok {
 		tmp := make(map[string]*scrapeEndpointStatusBuilder)
 		mapByEndpoint = tmp
-		b.mapByJobByEndpoint[job] = mapByEndpoint
+		b.mapByKeyByEndpoint[scrapePool.key] = mapByEndpoint
 	}
-	statusBuilder, exists := mapByEndpoint[endpoint]
+	statusBuilder, exists := mapByEndpoint[scrapePool.group]
 	if !exists {
 		statusBuilder = newScrapeEndpointStatusBuilder(&activeTarget, time)
-		mapByEndpoint[endpoint] = statusBuilder
+		mapByEndpoint[scrapePool.group] = statusBuilder
 	}
 	statusBuilder.addSampleTarget(&activeTarget)
 	return nil
@@ -96,7 +204,7 @@ func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpoint
 	collectorsFraction := strconv.FormatFloat(fraction, 'f', -1, 64)
 	resultMap := make(map[string][]monitoringv1.ScrapeEndpointStatus)
 
-	for job, endpointMap := range b.mapByJobByEndpoint {
+	for key, endpointMap := range b.mapByKeyByEndpoint {
 		endpointStatuses := make([]monitoringv1.ScrapeEndpointStatus, 0)
 		for _, statusBuilder := range endpointMap {
 			endpointStatus := statusBuilder.build()
@@ -110,7 +218,7 @@ func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpoint
 			rhsName := endpointStatuses[j].Name
 			return lhsName < rhsName
 		})
-		resultMap[job] = endpointStatuses
+		resultMap[key] = endpointStatuses
 	}
 	return resultMap
 }
diff --git a/pkg/operator/target_status.go b/pkg/operator/target_status.go
index 6445f5a645..b75121da67 100644
--- a/pkg/operator/target_status.go
+++ b/pkg/operator/target_status.go
@@ -20,7 +20,6 @@ import (
 	"errors"
 	"fmt"
 	"net/http"
-	"strings"
 	"sync"
 	"time"
 
@@ -289,54 +288,6 @@ func fetchTargets(ctx context.Context, logger logr.Logger, opts Options, httpCli
 	return results, nil
 }
 
-func buildPodMonitoringFromJob(job []string) (*monitoringv1.PodMonitoring, error) {
-	if len(job) != 3 {
-		return nil, errors.New("invalid job type")
-	}
-	kind := job[0]
-	if kind != "PodMonitoring" {
-		return nil, errors.New("invalid object kind")
-	}
-	pm := &monitoringv1.PodMonitoring{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      job[2],
-			Namespace: job[1],
-		},
-		Spec:   monitoringv1.PodMonitoringSpec{},
-		Status: monitoringv1.PodMonitoringStatus{},
-	}
-	return pm, nil
-}
-
-func buildClusterPodMonitoringFromJob(job []string) (*monitoringv1.ClusterPodMonitoring, error) {
-	if len(job) != 2 {
-		return nil, errors.New("invalid job type")
-	}
-	kind := job[0]
-	if kind != "ClusterPodMonitoring" {
-		return nil, errors.New("invalid object kind")
-	}
-	pm := &monitoringv1.ClusterPodMonitoring{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: job[1],
-		},
-		Spec:   monitoringv1.ClusterPodMonitoringSpec{},
-		Status: monitoringv1.PodMonitoringStatus{},
-	}
-	return pm, nil
-}
-
-func buildPodMonitoring(job string) (monitoringv1.PodMonitoringCRD, error) {
-	split := strings.Split(job, "/")
-	if pm, err := buildPodMonitoringFromJob(split); err == nil {
-		return pm, nil
-	}
-	if pm, err := buildClusterPodMonitoringFromJob(split); err == nil {
-		return pm, nil
-	}
-	return nil, fmt.Errorf("unable to parse job: %s", job)
-}
-
 func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, object client.Object, status *monitoringv1.PodMonitoringStatus) error {
 	patchStatus := map[string]interface{}{
 		"endpointStatuses": status.EndpointStatuses,
@@ -362,16 +313,16 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
 		return err
 	}
 
-	var patchErr error
+	var errs []error
 	for job, endpointStatuses := range endpointMap {
-		// Kubelet scraping is configured through hard-coding and not through
-		// a PodMonitoring. As there's no status to update, we skip.
-		if strings.HasPrefix(job, "kubelet") {
+		pm, err := getObjectByScrapeJobKey(job)
+		if err != nil {
+			errs = append(errs, fmt.Errorf("building target: %s: %w", job, err))
 			continue
 		}
-		pm, err := buildPodMonitoring(job)
-		if err != nil {
-			return fmt.Errorf("building podmonitoring: %s: %w", job, err)
+		if pm == nil {
+			// Skip hard-coded jobs which we do not patch.
+			continue
 		}
 
 		pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses
@@ -379,12 +330,12 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
 			// Save and log any error encountered while patching the status.
 			// We don't want to prematurely return if the error was transient
 			// as we should continue patching all statuses before exiting.
-			patchErr = err
-			logger.Error(err, "patching podmonitoring status", "job", job)
+			errs = append(errs, err)
+			logger.Error(err, "patching status", "job", job, "gvk", pm.GetObjectKind().GroupVersionKind())
 		}
 	}
 
-	return patchErr
+	return errors.Join(errs...)
} func getPrometheusPods(ctx context.Context, kubeClient client.Client, opts Options, selector labels.Selector) ([]*corev1.Pod, error) { diff --git a/pkg/operator/target_status_test.go b/pkg/operator/target_status_test.go index b9bab83902..1903b1bfe9 100644 --- a/pkg/operator/target_status_test.go +++ b/pkg/operator/target_status_test.go @@ -49,7 +49,7 @@ type updateTargetStatusTestCase struct { targets []*prometheusv1.TargetsResult podMonitorings []monitoringv1.PodMonitoring clusterPodMonitorings []monitoringv1.ClusterPodMonitoring - expErr bool + expErr func(err error) bool } // Given a list of test cases on PodMonitoring, creates a new list containing @@ -171,7 +171,10 @@ func TestUpdateTargetStatus(t *testing.T) { }}, }, }, - expErr: true, + expErr: func(err error) bool { + msg := err.Error() + return strings.HasPrefix(msg, "unable to patch status:") && strings.HasSuffix(msg, "\"prom-example-1\" not found") + }, }, // Single healthy target with no error, with matching PodMonitoring. { @@ -233,7 +236,8 @@ func TestUpdateTargetStatus(t *testing.T) { }, }, }, - }}, + }, + }, }, // Collectors target fetch failure. { @@ -357,8 +361,12 @@ func TestUpdateTargetStatus(t *testing.T) { Port: intstr.FromString("metrics"), }}, }, - }}, - expErr: true, + }, + }, + expErr: func(err error) bool { + msg := err.Error() + return strings.HasPrefix(msg, "unable to patch status:") && strings.HasSuffix(msg, "\"prom-example-2\" not found") + }, }, // Single healthy target with no error, with single matching PodMonitoring. { @@ -1055,6 +1063,84 @@ func TestUpdateTargetStatus(t *testing.T) { }, }}, }, + // Multiple healthy targets with one non-matching PodMonitoring. + { + desc: "multiple-healthy-target-one-non-match", + targets: []*prometheusv1.TargetsResult{ + { + Active: []prometheusv1.ActiveTarget{{ + Health: "up", + LastError: "err x", + ScrapePool: "PodMonitoring/gmp-test/prom-example-1/metrics", + Labels: model.LabelSet(map[model.LabelName]model.LabelValue{ + "instance": "a", + }), + LastScrapeDuration: 1.2, + }}, + }, + { + Active: []prometheusv1.ActiveTarget{{ + Health: "up", + ScrapePool: "PodMonitoring/gmp-test/prom-example-2/metrics", + Labels: model.LabelSet(map[model.LabelName]model.LabelValue{ + "instance": "b", + }), + LastScrapeDuration: 4.3, + }}, + }, + }, + podMonitorings: []monitoringv1.PodMonitoring{ + { + ObjectMeta: metav1.ObjectMeta{Name: "prom-example-1", Namespace: "gmp-test"}, + Spec: monitoringv1.PodMonitoringSpec{ + Endpoints: []monitoringv1.ScrapeEndpoint{{ + Port: intstr.FromString("metrics"), + }}, + }, + Status: monitoringv1.PodMonitoringStatus{ + MonitoringStatus: monitoringv1.MonitoringStatus{ + ObservedGeneration: 2, + Conditions: []monitoringv1.MonitoringCondition{{ + Type: monitoringv1.ConfigurationCreateSuccess, + Status: corev1.ConditionTrue, + LastUpdateTime: metav1.Time{}, + LastTransitionTime: metav1.Time{}, + Reason: "", + Message: "", + }}, + }, + EndpointStatuses: []monitoringv1.ScrapeEndpointStatus{ + { + Name: "PodMonitoring/gmp-test/prom-example-1/metrics", + ActiveTargets: 1, + UnhealthyTargets: 0, + LastUpdateTime: date, + SampleGroups: []monitoringv1.SampleGroup{ + { + SampleTargets: []monitoringv1.SampleTarget{ + { + Health: "up", + LastError: ptr.To("err x"), + Labels: map[model.LabelName]model.LabelValue{ + "instance": "a", + }, + LastScrapeDurationSeconds: "1.2", + }, + }, + Count: ptr.To[int32](1), + }, + }, + CollectorsFraction: "1", + }, + }, + }, + }, + }, + expErr: func(err error) bool { + msg := err.Error() + return strings.HasPrefix(msg, "unable 
to patch status:") && strings.HasSuffix(msg, "\"prom-example-2\" not found") + }, + }, { desc: "kubelet hardcoded scrape configs", targets: []*prometheusv1.TargetsResult{ @@ -1086,6 +1172,42 @@ func TestUpdateTargetStatus(t *testing.T) { }, }, }, + { + desc: "NodeMonitoring hardcoded scrape configs", + targets: []*prometheusv1.TargetsResult{ + { + Active: []prometheusv1.ActiveTarget{{ + Health: "up", + LastError: "", + ScrapePool: "NodeMonitoring/node-example-1/metrics", + Labels: model.LabelSet(map[model.LabelName]model.LabelValue{ + "instance": "a", + "node": "node-1-default-pool-abcd1234", + }), + LastScrapeDuration: 1.2, + }}, + }, + }, + }, + { + desc: "Unknown hardcoded scrape configs", + targets: []*prometheusv1.TargetsResult{ + { + Active: []prometheusv1.ActiveTarget{{ + Health: "up", + LastError: "", + ScrapePool: "unknown/example-1/metrics", + Labels: model.LabelSet(map[model.LabelName]model.LabelValue{ + "instance": "a", + }), + LastScrapeDuration: 1.2, + }}, + }, + }, + expErr: func(err error) bool { + return err.Error() == "unknown scrape kind \"unknown\"" + }, + }, }) for _, testCase := range testCases { @@ -1105,7 +1227,7 @@ func TestUpdateTargetStatus(t *testing.T) { kubeClient := clientBuilder.Build() err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets) - if err != nil && !testCase.expErr { + if err != nil && (testCase.expErr == nil || !testCase.expErr(err)) { t.Fatalf("unexpected error updating target status: %s", err) }