From d939e0bb8c8fe83fda33d6aadcefcf0fc7fbe1d9 Mon Sep 17 00:00:00 2001 From: ChrsMark Date: Fri, 2 Aug 2024 17:05:50 +0300 Subject: [PATCH] [receiver/receiver_creator] Add support for metrics' hints Signed-off-by: ChrsMark --- .chloggen/hints.yaml | 27 ++++ receiver/receivercreator/README.md | 140 +++++++++++++++++ receiver/receivercreator/config.go | 13 ++ receiver/receivercreator/hints.go | 154 +++++++++++++++++++ receiver/receivercreator/hints_test.go | 143 ++++++++++++++++++ receiver/receivercreator/observerhandler.go | 159 +++++++++++--------- 6 files changed, 565 insertions(+), 71 deletions(-) create mode 100644 .chloggen/hints.yaml create mode 100644 receiver/receivercreator/hints.go create mode 100644 receiver/receivercreator/hints_test.go diff --git a/.chloggen/hints.yaml b/.chloggen/hints.yaml new file mode 100644 index 000000000000..6fb21310cfa2 --- /dev/null +++ b/.chloggen/hints.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receivercreator + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for generating metrics receivers based on provided annotations' hints + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34427] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/receivercreator/README.md b/receiver/receivercreator/README.md index d0af39c09a12..a419d19ed935 100644 --- a/receiver/receivercreator/README.md +++ b/receiver/receivercreator/README.md @@ -437,3 +437,143 @@ service: The full list of settings exposed for this receiver are documented [here](./config.go) with detailed sample configurations [here](./testdata/config.yaml). + + +## Generate receiver configurations from provided Hints + +Currently this feature is only supported for K8s environments and the `k8sobserver`. + +The feature for K8s is enabled with the following setting: + +```yaml +receiver_creator/metrics: + watch_observers: [ k8s_observer ] + hints: + k8s: + metrics: + enabled: true +``` + +Users can use the following annotations to automatically enable receivers to start collecting metrics from the target Pods/containers. + +### Supported metrics annotations +1. `io.opentelemetry.collector.receiver-creator.metrics/receiver` (example: `nginx`) +2. `io.opentelemetry.collector.receiver-creator.metrics/endpoint` (example: ```"http://`endpoint`/nginx_status"```, if not provided it defaults to `endpoint` which is of form `pod_ip:container_port`.) +3. `io.opentelemetry.collector.receiver-creator.metrics/collection_interval` (example: `20s`) +4. `io.opentelemetry.collector.receiver-creator.metrics/timeout` (example: `1m`) +5. `io.opentelemetry.collector.receiver-creator.metrics/username` (example: `admin`) +6. `io.opentelemetry.collector.receiver-creator.metrics/password` (example: `passpass`) + + +### Support multiple target containers + +Users can target the annotation to a specific container by suffixing it with the name of the port that container exposes, +for example ```io.opentelemetry.collector.receiver-creator.metrics/endpoint.webserver: "http://`endpoint`/nginx_status"``` +where `webserver` is the name of the port the target container exposes. + +If a Pod is annotated with both container level hints and pod level hints the container level hints have priority and +the Pod level hints are used as a fallback (see detailed example bellow). + +The current implementation relies on the implementation of `k8sobserver` extension and specifically +the [pod_endpoint](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.111.0/extension/observer/k8sobserver/pod_endpoint.go). +The hints are evaluated per container by extracting the annotations from each `Port` endpoint that is emitted. + + +### Examples + +#### Metrics example + +Collector's configuration: +```yaml +receivers: + receiver_creator/metrics: + watch_observers: [ k8s_observer ] + hints: + k8s: + metrics: + enabled: true + receivers: + +service: + extensions: [ k8s_observer] + pipelines: + metrics: + receivers: [ receiver_creator/metrics ] + processors: [] + exporters: [ debug ] +``` + +Target Pod annotated with hints: + +```yaml +apiVersion: v1 +kind: ConfigMap +metadata: + name: nginx-conf +data: + nginx.conf: | + user nginx; + worker_processes 1; + error_log /dev/stderr warn; + pid /var/run/nginx.pid; + events { + worker_connections 1024; + } + http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + access_log /dev/stdout main; + server { + listen 80; + server_name localhost; + + location /nginx_status { + stub_status on; + } + } + include /etc/nginx/conf.d/*; + } +--- +apiVersion: v1 +kind: Pod +metadata: + name: redis + annotations: + io.opentelemetry.collector.receiver-creator.metrics/receiver: redis + io.opentelemetry.collector.receiver-creator.metrics/collection_interval: '20s' + io.opentelemetry.collector.receiver-creator.metrics/receiver.webserver: nginx + io.opentelemetry.collector.receiver-creator.metrics/endpoint.webserver: "http://`endpoint`/nginx_status" + labels: + k8s-app: redis + app: redis +spec: + volumes: + - name: nginx-conf + configMap: + name: nginx-conf + items: + - key: nginx.conf + path: nginx.conf + containers: + - name: webserver + image: nginx:latest + ports: + - containerPort: 80 + name: webserver + volumeMounts: + - mountPath: /etc/nginx/nginx.conf + readOnly: true + subPath: nginx.conf + name: nginx-conf + - image: redis + imagePullPolicy: IfNotPresent + name: redis + ports: + - name: redis + containerPort: 6379 + protocol: TCP +``` \ No newline at end of file diff --git a/receiver/receivercreator/config.go b/receiver/receivercreator/config.go index bb5ebfaa4f6f..8b79b5d0cd70 100644 --- a/receiver/receivercreator/config.go +++ b/receiver/receivercreator/config.go @@ -78,6 +78,19 @@ type Config struct { // ResourceAttributes is a map of default resource attributes to add to each resource // object received by this receiver from dynamically created receivers. ResourceAttributes resourceAttributes `mapstructure:"resource_attributes"` + Hints HintsConfig `mapstructure:"hints"` +} + +type HintsConfig struct { + K8s K8sHintsConfig `mapstructure:"k8s"` +} + +type K8sHintsConfig struct { + Metrics MetricsHints `mapstructure:"metrics"` +} + +type MetricsHints struct { + Enabled bool `mapstructure:"enabled"` } func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { diff --git a/receiver/receivercreator/hints.go b/receiver/receivercreator/hints.go new file mode 100644 index 000000000000..e1fbebb6e1f6 --- /dev/null +++ b/receiver/receivercreator/hints.go @@ -0,0 +1,154 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator" + +import ( + "fmt" + + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +const ( + hintsMetricsReceiver = "io.opentelemetry.collector.receiver-creator.metrics/receiver" + hintsMetricsEndpoint = "io.opentelemetry.collector.receiver-creator.metrics/endpoint" + hintsMetricsCollectionInterval = "io.opentelemetry.collector.receiver-creator.metrics/collection_interval" + hintsMetricsTimeout = "io.opentelemetry.collector.receiver-creator.metrics/timeout" + hintsMetricsUsername = "io.opentelemetry.collector.receiver-creator.metrics/username" + hintsMetricsPassword = "io.opentelemetry.collector.receiver-creator.metrics/password" +) + +// HintsTemplatesBuilder creates configuration templates from provided hints. +type HintsTemplatesBuilder interface { + createReceiverTemplatesFromHints() ([]receiverTemplate, error) +} + +// K8sHintsBuilder creates configurations from hints provided as Pod's annotations. +type K8sHintsBuilder struct { + logger *zap.Logger +} + +func (builder *K8sHintsBuilder) createReceiverTemplatesFromHints(env observer.EndpointEnv) ([]receiverTemplate, error) { + var endpointType string + var podUID string + var port uint16 + var annotations map[string]string + var receiverTemplates []receiverTemplate + + builder.logger.Debug("handling hints for added endpoint", zap.Any("env", env)) + + if pod, ok := env["pod"]; !ok { + return receiverTemplates, nil + } else { + endpointPod, ok := pod.(observer.EndpointEnv) + if !ok { + return receiverTemplates, fmt.Errorf("could not extract endpoint's pod: %v", zap.Any("endpointPod", pod)) + } + ann := endpointPod["annotations"] + if ann != nil { + annotations, ok = ann.(map[string]string) + if !ok { + return receiverTemplates, fmt.Errorf("could not extract annotations: %v", zap.Any("annotations", ann)) + } + } + podUID = endpointPod["uid"].(string) + } + + if valType, ok := env["type"]; !ok { + return receiverTemplates, fmt.Errorf("could not get endpoint type: %v", zap.Any("env", env)) + } else { + endpointType, ok = valType.(string) + if !ok { + return receiverTemplates, fmt.Errorf("could not extract endpointType: %v", zap.Any("endpointType", valType)) + } + } + + if len(annotations) > 0 { + if endpointType == string(observer.PortType) { + // Only handle Endpoints of type port for metrics + portName := env["name"].(string) + metricsReceiverEnabled := getHintAnnotation(annotations, hintsMetricsReceiver, portName) + if metricsReceiverEnabled != "" { + subreceiverKey := metricsReceiverEnabled + if subreceiverKey == "" { + return receiverTemplates, nil + } + builder.logger.Debug("handling added hinted receiver", zap.Any("subreceiverKey", subreceiverKey)) + + userConfMap := createMetricsConfig(annotations, env, portName) + + if p, ok := env["port"]; ok { + port = p.(uint16) + if port == 0 { + return receiverTemplates, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + } else { + return receiverTemplates, fmt.Errorf("could not extract port: %v", zap.Any("env", env)) + } + subreceiver, err := newReceiverTemplate(fmt.Sprintf("%v/%v_%v", subreceiverKey, podUID, port), userConfMap) + if err != nil { + builder.logger.Error("error adding subreceiver", zap.Any("err", err)) + return receiverTemplates, err + } + + subreceiver.Rule = fmt.Sprintf("type == \"port\" && port ==%v", port) // + subreceiver.rule, err = newRule(subreceiver.Rule) + if err != nil { + builder.logger.Error("error adding subreceiver rule", zap.Any("err", err)) + return receiverTemplates, err + } + builder.logger.Debug("adding hinted receiver", zap.Any("subreceiver", subreceiver)) + receiverTemplates = append(receiverTemplates, subreceiver) + } + } + } + return receiverTemplates, nil +} + +func createMetricsConfig(annotations map[string]string, env observer.EndpointEnv, portName string) userConfigMap { + confMap := map[string]any{} + + defaultEndpoint := env["endpoint"] + // get endpoint directly from the Port endpoint + if defaultEndpoint != "" { + confMap["endpoint"] = defaultEndpoint + } + + subreceiverEndpoint := getHintAnnotation(annotations, hintsMetricsEndpoint, portName) + if subreceiverEndpoint != "" { + confMap["endpoint"] = subreceiverEndpoint + } + subreceiverColInterval := getHintAnnotation(annotations, hintsMetricsCollectionInterval, portName) + if subreceiverColInterval != "" { + confMap["collection_interval"] = subreceiverColInterval + } + subreceiverTimeout := getHintAnnotation(annotations, hintsMetricsTimeout, portName) + if subreceiverTimeout != "" { + confMap["timeout"] = subreceiverTimeout + } + subreceiverUsername := getHintAnnotation(annotations, hintsMetricsUsername, portName) + if subreceiverUsername != "" { + confMap["username"] = subreceiverUsername + } + subreceiverPassword := getHintAnnotation(annotations, hintsMetricsPassword, portName) + if subreceiverPassword != "" { + confMap["password"] = subreceiverPassword + } + return confMap +} + +func getHintAnnotation(annotations map[string]string, hintKey string, portName string) string { + containerLevelHint := annotations[fmt.Sprintf("%s.%s", hintKey, portName)] + if containerLevelHint != "" { + return containerLevelHint + } + + // if there is no container level hint defined try to scope the hint more on container level by suffixing with . + podLevelHint := annotations[hintKey] + if podLevelHint != "" { + return podLevelHint + } + return "" +} diff --git a/receiver/receivercreator/hints_test.go b/receiver/receivercreator/hints_test.go new file mode 100644 index 000000000000..17983f6b5585 --- /dev/null +++ b/receiver/receivercreator/hints_test.go @@ -0,0 +1,143 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receivercreator + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer" +) + +func TestK8sHintsBuilderMetrics(t *testing.T) { + logger := zaptest.NewLogger(t, zaptest.Level(zap.InfoLevel)) + logger.Level() + + tests := map[string]struct { + inputEndpoint observer.Endpoint + expectedReceiver receiverTemplate + wantError bool + }{ + `metrics_pod_level_hints_only`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/username": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + Rule: "type == \"port\" && port ==6379", + receiverConfig: receiverConfig{ + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_container_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver.redis": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval.redis": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout.redis": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/username.redis": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password.redis": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + Rule: "type == \"port\" && port ==6379", + receiverConfig: receiverConfig{ + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "30s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_mix_level_hints`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4:6379", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver.redis": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout.redis": "130s", + "io.opentelemetry.collector.receiver-creator.metrics/username.redis": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password.redis": "changeme", + }}, + Port: 6379}, + }, + expectedReceiver: receiverTemplate{ + Rule: "type == \"port\" && port ==6379", + receiverConfig: receiverConfig{ + config: userConfigMap{"collection_interval": "20s", "endpoint": "1.2.3.4:6379", "password": "changeme", "timeout": "130s", "username": "username"}, + }, + }, + wantError: false, + }, `metrics_no_port_error`: { + inputEndpoint: observer.Endpoint{ + ID: "namespace/pod-2-UID/redis(6379)", + Target: "1.2.3.4", + Details: &observer.Port{ + Name: "redis", Pod: observer.Pod{ + Name: "pod-2", + Namespace: "default", + UID: "pod-2-UID", + Labels: map[string]string{"env": "prod"}, + Annotations: map[string]string{ + "io.opentelemetry.collector.receiver-creator.metrics/receiver.redis": "redis", + "io.opentelemetry.collector.receiver-creator.metrics/collection_interval": "20s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout": "30s", + "io.opentelemetry.collector.receiver-creator.metrics/timeout.redis": "130s", + "io.opentelemetry.collector.receiver-creator.metrics/username.redis": "username", + "io.opentelemetry.collector.receiver-creator.metrics/password.redis": "changeme", + }}}, + }, + expectedReceiver: receiverTemplate{}, + wantError: true, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + k8sHintsBuilder := K8sHintsBuilder{logger} + env, err := test.inputEndpoint.Env() + require.NoError(t, err) + subreceiverTemplates, err := k8sHintsBuilder.createReceiverTemplatesFromHints(env) + if !test.wantError { + require.NoError(t, err) + require.Equal(t, subreceiverTemplates[0].receiverConfig.config, test.expectedReceiver.receiverConfig.config) + require.Equal(t, subreceiverTemplates[0].Rule, test.expectedReceiver.Rule) + } else { + require.Error(t, err) + } + }) + } +} diff --git a/receiver/receivercreator/observerhandler.go b/receiver/receivercreator/observerhandler.go index de1d9689953b..aecddbe664e1 100644 --- a/receiver/receivercreator/observerhandler.go +++ b/receiver/receivercreator/observerhandler.go @@ -83,85 +83,31 @@ func (obs *observerHandler) OnAdd(added []observer.Endpoint) { continue } - obs.params.TelemetrySettings.Logger.Debug("handling added endpoint", zap.Any("env", env)) - - for _, template := range obs.config.receiverTemplates { - if matches, e := template.rule.eval(env); e != nil { - obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(e)) - continue - } else if !matches { - continue - } - - obs.params.TelemetrySettings.Logger.Info("starting receiver", - zap.String("name", template.id.String()), - zap.String("endpoint", e.Target), - zap.String("endpoint_id", string(e.ID))) - - resolvedConfig, err := expandConfig(template.config, env) + if obs.config.Hints.K8s.Metrics.Enabled { + k8sHintsBuilder := K8sHintsBuilder{obs.params.TelemetrySettings.Logger} + subreceiverTemplates, err := k8sHintsBuilder.createReceiverTemplatesFromHints(env) if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) - continue - } - obs.params.TelemetrySettings.Logger.Debug("resolved config", zap.String("receiver", template.id.String()), zap.Any("config", resolvedConfig)) - - discoveredCfg := userConfigMap{} - // If user didn't set endpoint set to default value as well as - // flag indicating we've done this for later validation. - if _, ok := resolvedConfig[endpointConfigKey]; !ok { - discoveredCfg[endpointConfigKey] = e.Target - discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + obs.params.TelemetrySettings.Logger.Error("could not extract configurations from K8s hints' annotations", zap.Any("err", err)) + break } - - // Though not necessary with contrib provided observers, nothing is stopping custom - // ones from using expr in their Target values. - discoveredConfig, err := expandConfig(discoveredCfg, env) - if err != nil { - obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) - continue - } - - resAttrs := map[string]string{} - for k, v := range template.ResourceAttributes { - strVal, ok := v.(string) - if !ok { - obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) - continue + if len(subreceiverTemplates) > 0 { + // loop over the receiverTemplates. Some Pods might have both logs+metrics templates + for _, subreceiver := range subreceiverTemplates { + obs.params.TelemetrySettings.Logger.Debug("adding K8s hinted receiver", zap.Any("subreceiver", subreceiver)) + obs.startReceiver(subreceiver, env, e) } - resAttrs[k] = strVal - } - - // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources - // as telemetry is emitted. - var consumer *enhancingConsumer - if consumer, err = newEnhancingConsumer( - obs.config.ResourceAttributes, - resAttrs, - env, - e, - obs.nextLogsConsumer, - obs.nextMetricsConsumer, - obs.nextTracesConsumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) continue } + } - var receiver component.Component - if receiver, err = obs.runner.start( - receiverConfig{ - id: template.id, - config: resolvedConfig, - endpointID: e.ID, - }, - discoveredConfig, - consumer, - ); err != nil { - obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + for _, template := range obs.config.receiverTemplates { + if matches, err := template.rule.eval(env); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed matching rule", zap.String("rule", template.Rule), zap.Error(err)) + continue + } else if !matches { continue } - - obs.receiversByEndpointID.Put(e.ID, receiver) + obs.startReceiver(template, env, e) } } } @@ -200,3 +146,74 @@ func (obs *observerHandler) OnChange(changed []observer.Endpoint) { obs.OnRemove(changed) obs.OnAdd(changed) } + +func (obs *observerHandler) startReceiver(template receiverTemplate, env observer.EndpointEnv, e observer.Endpoint) { + obs.params.TelemetrySettings.Logger.Info("starting receiver", + zap.String("name", template.id.String()), + zap.String("endpoint", e.Target), + zap.String("endpoint_id", string(e.ID)), + zap.Any("config", template.config)) + + resolvedConfig, err := expandConfig(template.config, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve template config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + discoveredCfg := userConfigMap{} + // If user didn't set endpoint set to default value as well as + // flag indicating we've done this for later validation. + if _, ok := resolvedConfig[endpointConfigKey]; !ok { + discoveredCfg[endpointConfigKey] = e.Target + discoveredCfg[tmpSetEndpointConfigKey] = struct{}{} + } + + // Though not necessary with contrib provided observers, nothing is stopping custom + // ones from using expr in their Target values. + discoveredConfig, err := expandConfig(discoveredCfg, env) + if err != nil { + obs.params.TelemetrySettings.Logger.Error("unable to resolve discovered config", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + resAttrs := map[string]string{} + for k, v := range template.ResourceAttributes { + strVal, ok := v.(string) + if !ok { + obs.params.TelemetrySettings.Logger.Info(fmt.Sprintf("ignoring unsupported `resource_attributes` %q value %v", k, v)) + continue + } + resAttrs[k] = strVal + } + + // Adds default and/or configured resource attributes (e.g. k8s.pod.uid) to resources + // as telemetry is emitted. + var consumer *enhancingConsumer + if consumer, err = newEnhancingConsumer( + obs.config.ResourceAttributes, + resAttrs, + env, + e, + obs.nextLogsConsumer, + obs.nextMetricsConsumer, + obs.nextTracesConsumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed creating resource enhancer", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + + var receiver component.Component + if receiver, err = obs.runner.start( + receiverConfig{ + id: template.id, + config: resolvedConfig, + endpointID: e.ID, + }, + discoveredConfig, + consumer, + ); err != nil { + obs.params.TelemetrySettings.Logger.Error("failed to start receiver", zap.String("receiver", template.id.String()), zap.Error(err)) + return + } + obs.receiversByEndpointID.Put(e.ID, receiver) +}