Fix NodeMonitoring breaks target status
TheSpiritXIII committed Jan 29, 2024
1 parent 99644a1 commit d3f905e
Showing 3 changed files with 244 additions and 79 deletions.
120 changes: 106 additions & 14 deletions pkg/operator/endpoint_status_builder.go
@@ -15,7 +15,7 @@
package operator

import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
@@ -32,7 +32,7 @@ const (

func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]monitoringv1.ScrapeEndpointStatus, error) {
endpointBuilder := &scrapeEndpointBuilder{
mapByJobByEndpoint: make(map[string]map[string]*scrapeEndpointStatusBuilder),
mapByKeyByEndpoint: make(map[string]map[string]*scrapeEndpointStatusBuilder),
total: 0,
failed: 0,
time: metav1.Now(),
@@ -48,7 +48,7 @@ func buildEndpointStatuses(targets []*prometheusv1.TargetsResult) (map[string][]
}

type scrapeEndpointBuilder struct {
mapByJobByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
mapByKeyByEndpoint map[string]map[string]*scrapeEndpointStatusBuilder
total uint32
failed uint32
time metav1.Time
@@ -68,24 +68,116 @@ func (b *scrapeEndpointBuilder) add(target *prometheusv1.TargetsResult) error {
return nil
}

// convertScrapeKey converts the key to a CRD. See monitoringv1.PodMonitoringCRD.GetKey().
func convertScrapeKey(key string) (monitoringv1.PodMonitoringCRD, error) {
split := strings.Split(key, "/")
// Generally:
// - 1 for scrape pools without a respective CRD.
// - 2 for cluster-scoped resources.
// - 3 for namespaced resources.
switch split[0] {
case "kubelet":
if len(split) != 1 {
return nil, fmt.Errorf("invalid kubelet scrape key format %q", key)
}
return nil, nil
case "PodMonitoring":
if len(split) != 3 {
return nil, fmt.Errorf("invalid PodMonitoring scrape key format %q", key)
}
return &monitoringv1.PodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Namespace: split[1],
Name: split[2],
},
}, nil
case "ClusterPodMonitoring":
if len(split) != 2 {
return nil, fmt.Errorf("invalid ClusterPodMonitoring scrape key format %q", key)
}
return &monitoringv1.ClusterPodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Name: split[1],
},
}, nil
case "NodeMonitoring":
if len(split) != 2 {
return nil, fmt.Errorf("invalid NodeMonitoring scrape key format %q", key)
}
return nil, nil
default:
return nil, fmt.Errorf("unknown scrape kind %q", split[0])
}
}
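
To illustrate the new mapping, here is a sketch with hypothetical resource names (not part of this commit):

	crd, err := convertScrapeKey("PodMonitoring/monitoring/my-app")
	// => &monitoringv1.PodMonitoring{ObjectMeta: metav1.ObjectMeta{Namespace: "monitoring", Name: "my-app"}}, nil

	crd, err = convertScrapeKey("NodeMonitoring/my-node")
	// => nil, nil: there is no PodMonitoringCRD status to patch for this key,
	// so the caller skips it instead of failing (the bug this commit fixes).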

// scrapePool is the parsed Prometheus scrape pool, which is assigned as the job name in our
// configuration. For example, for PodMonitoring this is `PodMonitoring/namespace/name/port`. The
// key is what identifies the resource (`PodMonitoring/namespace/name`) and the group indicates a
// small subset of that resource (`port`).
type scrapePool struct {
key string
group string
}

func parseScrapePool(pool string) (scrapePool, error) {
split := strings.Split(pool, "/")
switch split[0] {
case "kubelet":
if len(split) != 2 {
return scrapePool{}, fmt.Errorf("invalid kubelet scrape pool format %q", pool)
}
return scrapePool{
key: split[0],
group: split[1],
}, nil
case "PodMonitoring":
if len(split) != 4 {
return scrapePool{}, fmt.Errorf("invalid PodMonitoring scrape pool format %q", pool)
}
index := len(split[0]) + len(split[1]) + len(split[2]) + 2 // 2 delimiters.
return scrapePool{
key: pool[:index],
group: pool[index+1:],
}, nil
case "ClusterPodMonitoring":
if len(split) != 3 {
return scrapePool{}, fmt.Errorf("invalid ClusterPodMonitoring scrape pool format %q", pool)
}
index := len(split[0]) + len(split[1]) + 1 // 1 delimiter.
return scrapePool{
key: pool[:index],
group: pool[index+1:],
}, nil
case "NodeMonitoring":
if len(split) != 3 {
return scrapePool{}, fmt.Errorf("invalid NodeMonitoring scrape pool format %q", pool)
}
index := len(split[0]) + len(split[1]) + 1 // 1 delimiter.
return scrapePool{
key: pool[:index],
group: pool[index+1:],
}, nil
default:
return scrapePool{}, fmt.Errorf("unknown scrape kind %q", split[0])
}
}
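
A worked example of the index arithmetic above, with hypothetical names:

	// pool  = "PodMonitoring/monitoring/my-app/metrics"
	// split = ["PodMonitoring", "monitoring", "my-app", "metrics"]
	// index = 13 + 10 + 6 + 2 = 31, the length of "PodMonitoring/monitoring/my-app"
	// key   = pool[:31] = "PodMonitoring/monitoring/my-app"
	// group = pool[32:] = "metrics"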

func (b *scrapeEndpointBuilder) addActiveTarget(activeTarget prometheusv1.ActiveTarget, time metav1.Time) error {
portIndex := strings.LastIndex(activeTarget.ScrapePool, "/")
if portIndex == -1 {
return errors.New("Malformed scrape pool: " + activeTarget.ScrapePool)
scrapePool, err := parseScrapePool(activeTarget.ScrapePool)
if err != nil {
return err
}
job := activeTarget.ScrapePool[:portIndex]
endpoint := activeTarget.ScrapePool[portIndex+1:]
mapByEndpoint, ok := b.mapByJobByEndpoint[job]
mapByEndpoint, ok := b.mapByKeyByEndpoint[scrapePool.key]
if !ok {
tmp := make(map[string]*scrapeEndpointStatusBuilder)
mapByEndpoint = tmp
b.mapByJobByEndpoint[job] = mapByEndpoint
b.mapByKeyByEndpoint[scrapePool.key] = mapByEndpoint
}

statusBuilder, exists := mapByEndpoint[endpoint]
statusBuilder, exists := mapByEndpoint[scrapePool.group]
if !exists {
statusBuilder = newScrapeEndpointStatusBuilder(&activeTarget, time)
mapByEndpoint[endpoint] = statusBuilder
mapByEndpoint[scrapePool.group] = statusBuilder
}
statusBuilder.addSampleTarget(&activeTarget)
return nil
@@ -96,7 +188,7 @@ func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpoint
collectorsFraction := strconv.FormatFloat(fraction, 'f', -1, 64)
resultMap := make(map[string][]monitoringv1.ScrapeEndpointStatus)

for job, endpointMap := range b.mapByJobByEndpoint {
for key, endpointMap := range b.mapByKeyByEndpoint {
endpointStatuses := make([]monitoringv1.ScrapeEndpointStatus, 0)
for _, statusBuilder := range endpointMap {
endpointStatus := statusBuilder.build()
Expand All @@ -110,7 +202,7 @@ func (b *scrapeEndpointBuilder) build() map[string][]monitoringv1.ScrapeEndpoint
rhsName := endpointStatuses[j].Name
return lhsName < rhsName
})
resultMap[job] = endpointStatuses
resultMap[key] = endpointStatuses
}
return resultMap
}
69 changes: 10 additions & 59 deletions pkg/operator/target_status.go
@@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"

@@ -289,54 +288,6 @@ func fetchTargets(ctx context.Context, logger logr.Logger, opts Options, httpCli
return results, nil
}

func buildPodMonitoringFromJob(job []string) (*monitoringv1.PodMonitoring, error) {
if len(job) != 3 {
return nil, errors.New("invalid job type")
}
kind := job[0]
if kind != "PodMonitoring" {
return nil, errors.New("invalid object kind")
}
pm := &monitoringv1.PodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Name: job[2],
Namespace: job[1],
},
Spec: monitoringv1.PodMonitoringSpec{},
Status: monitoringv1.PodMonitoringStatus{},
}
return pm, nil
}

func buildClusterPodMonitoringFromJob(job []string) (*monitoringv1.ClusterPodMonitoring, error) {
if len(job) != 2 {
return nil, errors.New("invalid job type")
}
kind := job[0]
if kind != "ClusterPodMonitoring" {
return nil, errors.New("invalid object kind")
}
pm := &monitoringv1.ClusterPodMonitoring{
ObjectMeta: metav1.ObjectMeta{
Name: job[1],
},
Spec: monitoringv1.ClusterPodMonitoringSpec{},
Status: monitoringv1.PodMonitoringStatus{},
}
return pm, nil
}

func buildPodMonitoring(job string) (monitoringv1.PodMonitoringCRD, error) {
split := strings.Split(job, "/")
if pm, err := buildPodMonitoringFromJob(split); err == nil {
return pm, nil
}
if pm, err := buildClusterPodMonitoringFromJob(split); err == nil {
return pm, nil
}
return nil, fmt.Errorf("unable to parse job: %s", job)
}

func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, object client.Object, status *monitoringv1.PodMonitoringStatus) error {
patchStatus := map[string]interface{}{
"endpointStatuses": status.EndpointStatuses,
@@ -362,29 +313,29 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
return err
}

var patchErr error
var errs []error
for job, endpointStatuses := range endpointMap {
// Kubelet scraping is configured through hard-coding and not through
// a PodMonitoring. As there's no status to update, we skip.
if strings.HasPrefix(job, "kubelet") {
pm, err := convertScrapeKey(job)
if err != nil {
errs = append(errs, fmt.Errorf("building target: %s: %w", job, err))
continue
}
pm, err := buildPodMonitoring(job)
if err != nil {
return fmt.Errorf("building podmonitoring: %s: %w", job, err)
if pm == nil {
// Skip hard-coded jobs which we do not patch.
continue
}
pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses

if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
// Save and log any error encountered while patching the status.
// We don't want to prematurely return if the error was transient
// as we should continue patching all statuses before exiting.
patchErr = err
logger.Error(err, "patching podmonitoring status", "job", job)
errs = append(errs, err)
logger.Error(err, "patching status", "job", job, "gvk", pm.GetObjectKind().GroupVersionKind())
}
}

return patchErr
return errors.Join(errs...)
}
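
Because errs accumulates every patch failure, errors.Join (available since Go 1.20) surfaces all of them at once and returns nil when the slice is empty. A minimal sketch with hypothetical errors:

	errA := fmt.Errorf("patching status: %w", context.DeadlineExceeded)
	errB := fmt.Errorf("patching status: %w", context.Canceled)
	err := errors.Join(errA, errB) // non-nil; err.Error() lists both messages, one per line
	err = errors.Join()            // nil, so a clean run returns nil from updateTargetStatus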

func getPrometheusPods(ctx context.Context, kubeClient client.Client, opts Options, selector labels.Selector) ([]*corev1.Pod, error) {
134 changes: 128 additions & 6 deletions in the third changed file (diff not loaded in this view)
