Skip to content

Commit

Permalink
Only watch metadata for ReplicaSets in metricbeat k8s module (#41289) (
Browse files Browse the repository at this point in the history
…#41323)

(cherry picked from commit 9e6a942)

Co-authored-by: Mikołaj Świątek <[email protected]>
  • Loading branch information
mergify[bot] and swiatekm authored Oct 21, 2024
1 parent 5c85f91 commit b3abb30
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 71 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added Cisco Meraki module {pull}40836[40836]
- Added Palo Alto Networks module {pull}40686[40686]
- Restore docker.network.in.* and docker.network.out.* fields in docker module {pull}40968[40968]
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289]

*Metricbeat*

Expand Down
187 changes: 126 additions & 61 deletions metricbeat/module/kubernetes/util/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import (
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/runtime/schema"

k8sclient "k8s.io/client-go/kubernetes"
k8sclientmeta "k8s.io/client-go/metadata"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -304,6 +309,7 @@ func createWatcher(
resource kubernetes.Resource,
options kubernetes.WatchOptions,
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
resourceWatchers *Watchers,
namespace string,
extraWatcher bool) (bool, error) {
Expand Down Expand Up @@ -355,9 +361,27 @@ func createWatcher(
if isNamespaced(resourceName) {
options.Namespace = namespace
}
watcher, err := kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
var (
watcher kubernetes.Watcher
err error
)
switch resource.(type) {
// use a metadata informer for ReplicaSets, as we only need their metadata
case *kubernetes.ReplicaSet:
watcher, err = kubernetes.NewNamedMetadataWatcher(
"resource_metadata_enricher_rs",
client,
metadataClient,
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"},
options,
nil,
transformReplicaSetMetadata,
)
default:
watcher, err = kubernetes.NewNamedWatcher(resourceName, client, resource, options, nil)
}
if err != nil {
return false, err
return false, fmt.Errorf("error creating watcher for %T: %w", resource, err)
}

resourceMetaWatcher = &metaWatcher{
Expand Down Expand Up @@ -450,6 +474,7 @@ func removeFromMetricsetsUsing(resourceName string, notUsingName string, resourc
// createAllWatchers creates all the watchers required by a metricset
func createAllWatchers(
client k8sclient.Interface,
metadataClient k8sclientmeta.Interface,
metricsetName string,
resourceName string,
nodeScope bool,
Expand All @@ -469,7 +494,7 @@ func createAllWatchers(
// Create the main watcher for the given resource.
// For example pod metricset's main watcher will be pod watcher.
// If it fails, we return an error, so we can stop the extra watchers from creating.
created, err := createWatcher(resourceName, res, *options, client, resourceWatchers, config.Namespace, false)
created, err := createWatcher(resourceName, res, *options, client, metadataClient, resourceWatchers, config.Namespace, false)
if err != nil {
return fmt.Errorf("error initializing Kubernetes watcher %s, required by %s: %w", resourceName, metricsetName, err)
} else if created {
Expand All @@ -484,7 +509,7 @@ func createAllWatchers(
for _, extra := range extraWatchers {
extraRes := getResource(extra)
if extraRes != nil {
created, err = createWatcher(extra, extraRes, *options, client, resourceWatchers, config.Namespace, true)
created, err = createWatcher(extra, extraRes, *options, client, metadataClient, resourceWatchers, config.Namespace, true)
if err != nil {
log.Errorf("Error initializing Kubernetes watcher %s, required by %s: %s", extra, metricsetName, err)
} else {
Expand Down Expand Up @@ -620,11 +645,16 @@ func NewResourceMetadataEnricher(
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}

metricsetName := base.Name()
resourceName := getResourceName(metricsetName)
// Create all watchers needed for this metricset
err = createAllWatchers(client, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, resourceName, nodeScope, config, log, resourceWatchers)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
Expand All @@ -650,61 +680,7 @@ func NewResourceMetadataEnricher(
// It is responsible for generating the metadata for a detected resource by executing the metadata generators Generate method.
// It is a common handler for all resource watchers. The kind of resource(e.g. pod or deployment) is checked inside the function.
// It returns a map of a resource identifier(i.e. namespace-resource_name) as key and the metadata as value.
updateFunc := func(r kubernetes.Resource) map[string]mapstr.M {
accessor, _ := meta.Accessor(r)
id := accessor.GetName()
namespace := accessor.GetNamespace()
if namespace != "" {
id = join(namespace, id)
}

switch r := r.(type) {
case *kubernetes.Pod:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

case *kubernetes.Node:
nodeName := r.GetObjectMeta().GetName()
metrics := NewNodeMetrics()
if cpu, ok := r.Status.Capacity["cpu"]; ok {
if q, err := resource.ParseQuantity(cpu.String()); err == nil {
metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
}
}
if memory, ok := r.Status.Capacity["memory"]; ok {
if q, err := resource.ParseQuantity(memory.String()); err == nil {
metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
}
}
nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
nodeStore.SetNodeMetrics(metrics)

return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
case *kubernetes.Deployment:
return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
case *kubernetes.Job:
return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)}
case *kubernetes.CronJob:
return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)}
case *kubernetes.Service:
return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
case *kubernetes.StatefulSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)}
case *kubernetes.Namespace:
return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)}
case *kubernetes.ReplicaSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)}
case *kubernetes.DaemonSet:
return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)}
case *kubernetes.PersistentVolume:
return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)}
case *kubernetes.PersistentVolumeClaim:
return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)}
case *kubernetes.StorageClass:
return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)}
default:
return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)}
}
}
updateFunc := getEventMetadataFunc(log, generalMetaGen, specificMetaGen, metricsRepo)

// deleteFunc to be used as the resource watcher's delete handler.
// The deleteFunc is executed when a watcher is triggered for a resource deletion(e.g. pod deleted).
Expand Down Expand Up @@ -788,10 +764,15 @@ func NewContainerMetadataEnricher(
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}
metadataClient, err := kubernetes.GetKubernetesMetadataClient(config.KubeConfig, config.KubeClientOptions)
if err != nil {
log.Errorf("Error creating Kubernetes client: %s", err)
return &nilEnricher{}
}

metricsetName := base.Name()

err = createAllWatchers(client, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
err = createAllWatchers(client, metadataClient, metricsetName, PodResource, nodeScope, config, log, resourceWatchers)
if err != nil {
log.Errorf("Error starting the watchers: %s", err)
return &nilEnricher{}
Expand Down Expand Up @@ -1213,3 +1194,87 @@ func AddClusterECSMeta(base mb.BaseMetricSet) mapstr.M {
}
return ecsClusterMeta
}

// transformReplicaSetMetadata ensures that the PartialObjectMetadata resources we get from a metadata watcher
// can be correctly interpreted by the update function returned by getEventMetadataFunc.
// Metadata-only watch events arrive without TypeMeta populated, so this really just
// involves adding the missing type information.
func transformReplicaSetMetadata(obj interface{}) (interface{}, error) {
	old, ok := obj.(*metav1.PartialObjectMetadata)
	if !ok {
		// The metadata informer should only ever hand us PartialObjectMetadata
		// objects; anything else indicates a wiring error upstream.
		return nil, fmt.Errorf("obj of type %T is not a PartialObjectMetadata", obj)
	}
	// Stamp the concrete type so downstream consumers (e.g. the generic branch of
	// getEventMetadataFunc, which reads GroupVersionKind) can identify the resource.
	old.TypeMeta = metav1.TypeMeta{
		APIVersion: "apps/v1",
		Kind:       "ReplicaSet",
	}
	return old, nil
}

// getEventMetadataFunc returns a function that takes a kubernetes Resource as an argument and returns metadata
// that can directly be used for event enrichment.
// This function is intended to be used as the resource watchers add and update handler.
//
// The returned function maps a resource identifier (namespace/name, or just name for
// cluster-scoped resources) to the generated metadata. Pods and Services use the
// metricset-specific generator; all other kinds use the general resource generator.
func getEventMetadataFunc(
	logger *logp.Logger,
	generalMetaGen *metadata.Resource,
	specificMetaGen metadata.MetaGen,
	metricsRepo *MetricsRepo,
) func(r kubernetes.Resource) map[string]mapstr.M {
	return func(r kubernetes.Resource) map[string]mapstr.M {
		accessor, accErr := meta.Accessor(r)
		if accErr != nil {
			// Without an accessor we cannot build the resource id, and accessor is
			// nil on error — bail out instead of dereferencing it below.
			logger.Errorf("Error creating accessor: %s", accErr)
			return nil
		}
		id := accessor.GetName()
		namespace := accessor.GetNamespace()
		if namespace != "" {
			id = join(namespace, id)
		}

		switch r := r.(type) {
		case *kubernetes.Pod:
			return map[string]mapstr.M{id: specificMetaGen.Generate(r)}

		case *kubernetes.Node:
			nodeName := r.GetObjectMeta().GetName()
			// Record the node's allocatable CPU/memory capacity in the metrics
			// repository as a side effect, so other metricsets can compute
			// utilization percentages.
			metrics := NewNodeMetrics()
			if cpu, ok := r.Status.Capacity["cpu"]; ok {
				if q, err := resource.ParseQuantity(cpu.String()); err == nil {
					metrics.CoresAllocatable = NewFloat64Metric(float64(q.MilliValue()) / 1000)
				}
			}
			if memory, ok := r.Status.Capacity["memory"]; ok {
				if q, err := resource.ParseQuantity(memory.String()); err == nil {
					metrics.MemoryAllocatable = NewFloat64Metric(float64(q.Value()))
				}
			}
			nodeStore, _ := metricsRepo.AddNodeStore(nodeName)
			nodeStore.SetNodeMetrics(metrics)

			return map[string]mapstr.M{id: generalMetaGen.Generate(NodeResource, r)}
		case *kubernetes.Deployment:
			return map[string]mapstr.M{id: generalMetaGen.Generate(DeploymentResource, r)}
		case *kubernetes.Job:
			return map[string]mapstr.M{id: generalMetaGen.Generate(JobResource, r)}
		case *kubernetes.CronJob:
			return map[string]mapstr.M{id: generalMetaGen.Generate(CronJobResource, r)}
		case *kubernetes.Service:
			return map[string]mapstr.M{id: specificMetaGen.Generate(r)}
		case *kubernetes.StatefulSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(StatefulSetResource, r)}
		case *kubernetes.Namespace:
			return map[string]mapstr.M{id: generalMetaGen.Generate(NamespaceResource, r)}
		case *kubernetes.ReplicaSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(ReplicaSetResource, r)}
		case *kubernetes.DaemonSet:
			return map[string]mapstr.M{id: generalMetaGen.Generate(DaemonSetResource, r)}
		case *kubernetes.PersistentVolume:
			return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeResource, r)}
		case *kubernetes.PersistentVolumeClaim:
			return map[string]mapstr.M{id: generalMetaGen.Generate(PersistentVolumeClaimResource, r)}
		case *kubernetes.StorageClass:
			return map[string]mapstr.M{id: generalMetaGen.Generate(StorageClassResource, r)}
		default:
			// Covers PartialObjectMetadata objects from metadata watchers (whose
			// TypeMeta is stamped by transform functions) and any future kinds.
			return map[string]mapstr.M{id: generalMetaGen.Generate(r.GetObjectKind().GroupVersionKind().Kind, r)}
		}
	}
}
Loading

0 comments on commit b3abb30

Please sign in to comment.