From 93cd9b228927861ea9f74f67b889b5a294136490 Mon Sep 17 00:00:00 2001 From: Haley Wang Date: Fri, 2 Aug 2024 04:18:19 +0800 Subject: [PATCH] stream aggregation: sync options `dropInputLabels`, `ignoreFirstIntervals`, `ignoreOldSamples` from upstream, and support using configmap as the source of aggregation rules --- api/operator/v1beta1/vmagent_types.go | 16 +- api/operator/v1beta1/vmextra_types.go | 25 ++ api/operator/v1beta1/vmsingle_types.go | 2 +- api/operator/v1beta1/zz_generated.deepcopy.go | 20 + config/crd/overlay/crd.yaml | 406 +++++++++++++++++- docs/CHANGELOG.md | 1 + docs/api.md | 13 +- docs/vars.md | 20 +- internal/config/config.go | 16 +- .../operator/factory/k8stools/client_utils.go | 8 + .../operator/factory/vmagent/vmagent.go | 135 ++++-- .../operator/factory/vmagent/vmagent_test.go | 32 +- .../operator/factory/vmsingle/vmsingle.go | 48 ++- 13 files changed, 665 insertions(+), 77 deletions(-) diff --git a/api/operator/v1beta1/vmagent_types.go b/api/operator/v1beta1/vmagent_types.go index cb8b20c71..88af173a4 100644 --- a/api/operator/v1beta1/vmagent_types.go +++ b/api/operator/v1beta1/vmagent_types.go @@ -207,6 +207,9 @@ type VMAgentSpec struct { // InlineRelabelConfig - defines GlobalRelabelConfig for vmagent, can be defined directly at CRD. // +optional InlineRelabelConfig []RelabelConfig `json:"inlineRelabelConfig,omitempty"` + // StreamAggrConfig defines global stream aggregation configuration for VMAgent + // +optional + StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"` // SelectAllByDefault changes default behavior for empty CRD selectors, such ServiceScrapeSelector. 
// with selectAllByDefault: true and empty serviceScrapeSelector and ServiceScrapeNamespaceSelector // Operator selects all exist serviceScrapes @@ -539,11 +542,6 @@ func (rw *VMAgentRemoteWriteSpec) AsConfigMapKey(idx int, suffix string) string return fmt.Sprintf("RWS_%d-CM-%s", idx, strings.ToUpper(suffix)) } -// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite -func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool { - return rw.StreamAggrConfig != nil && len(rw.StreamAggrConfig.Rules) > 0 -} - // VMAgentStatus defines the observed state of VMAgent // +k8s:openapi-gen=true type VMAgentStatus struct { @@ -805,13 +803,17 @@ func (cr *VMAgent) HasAnyRelabellingConfigs() bool { return false } -// HasAnyStreamAggrConfigs checks if agent has any streaming aggregation config defined +// HasAnyStreamAggrConfigs checks if vmagent has any defined aggregation rules func (cr *VMAgent) HasAnyStreamAggrConfigs() bool { + if cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { + return true + } for _, rw := range cr.Spec.RemoteWrite { - if rw.HasStreamAggr() { + if rw.StreamAggrConfig.HasStreamAggrConfig() { return true } } + return false } diff --git a/api/operator/v1beta1/vmextra_types.go b/api/operator/v1beta1/vmextra_types.go index 0582c6095..9b7104ce1 100644 --- a/api/operator/v1beta1/vmextra_types.go +++ b/api/operator/v1beta1/vmextra_types.go @@ -474,7 +474,11 @@ type ConfigMapKeyReference struct { // +k8s:openapi-gen=true type StreamAggrConfig struct { // Stream aggregation rules + // +optional Rules []StreamAggrRule `json:"rules"` + // ConfigMap with stream aggregation rules + // +optional + RuleConfigMap *v1.ConfigMapKeySelector `json:"configmap,omitempty"` // Allows writing both raw and aggregate data // +optional KeepInput bool `json:"keepInput,omitempty"` @@ -484,6 +488,13 @@ type StreamAggrConfig struct { // Allows setting different de-duplication intervals per each configured remote storage // +optional DedupInterval string 
`json:"dedupInterval,omitempty"` + // labels to drop from samples for aggregator before stream de-duplication and aggregation + // +optional + DropInputLabels []string `json:"dropInputLabels,omitempty"` + IgnoreFirstIntervals int `json:"ignoreFirstIntervals,omitempty"` + // IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. + // +optional + IgnoreOldSamples bool `json:"ignoreOldSamples,omitempty"` } // StreamAggrRule defines the rule in stream aggregation config @@ -570,6 +581,8 @@ type StreamAggrRule struct { // +optional Without []string `json:"without,omitempty" yaml:"without,omitempty"` + IgnoreFirstIntervals *int `json:"ignore_first_intervals,omitempty" yaml:"ignore_first_intervals,omitempty"` + // DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples. // // Labels are dropped before de-duplication and aggregation. @@ -587,6 +600,18 @@ type StreamAggrRule struct { OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"` } +// HasStreamAggrConfig reports whether stream aggregation is configured. +// +// It returns true when config is non-nil and either inline Rules are defined +// or RuleConfigMap references a ConfigMap holding the rules. +// The nil check makes it safe to call on a nil *StreamAggrConfig. +func (config *StreamAggrConfig) HasStreamAggrConfig() bool { + if config != nil && (len(config.Rules) > 0 || config.RuleConfigMap != nil) { + return true + } + return false +} + // KeyValue defines a (key, value) tuple. 
// +kubebuilder:object:generate=false // +k8s:openapi-gen=false diff --git a/api/operator/v1beta1/vmsingle_types.go b/api/operator/v1beta1/vmsingle_types.go index b370ea4f2..7adb56f02 100644 --- a/api/operator/v1beta1/vmsingle_types.go +++ b/api/operator/v1beta1/vmsingle_types.go @@ -214,7 +214,7 @@ type VMSingleSpec struct { // HasStreamAggrConfig checks if streamAggrConfig present func (cr *VMSingle) HasStreamAggrConfig() bool { - return cr.Spec.StreamAggrConfig != nil && len(cr.Spec.StreamAggrConfig.Rules) > 0 + return cr.Spec.StreamAggrConfig.HasStreamAggrConfig() } // UnmarshalJSON implements json.Unmarshaler interface diff --git a/api/operator/v1beta1/zz_generated.deepcopy.go b/api/operator/v1beta1/zz_generated.deepcopy.go index 9febdc73c..c09434f04 100644 --- a/api/operator/v1beta1/zz_generated.deepcopy.go +++ b/api/operator/v1beta1/zz_generated.deepcopy.go @@ -2445,6 +2445,16 @@ func (in *StreamAggrConfig) DeepCopyInto(out *StreamAggrConfig) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.RuleConfigMap != nil { + in, out := &in.RuleConfigMap, &out.RuleConfigMap + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } + if in.DropInputLabels != nil { + in, out := &in.DropInputLabels, &out.DropInputLabels + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamAggrConfig. 
@@ -2495,6 +2505,11 @@ func (in *StreamAggrRule) DeepCopyInto(out *StreamAggrRule) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.IgnoreFirstIntervals != nil { + in, out := &in.IgnoreFirstIntervals, &out.IgnoreFirstIntervals + *out = new(int) + **out = **in + } if in.DropInputLabels != nil { in, out := &in.DropInputLabels, &out.DropInputLabels *out = new([]string) @@ -3293,6 +3308,11 @@ func (in *VMAgentSpec) DeepCopyInto(out *VMAgentSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.StreamAggrConfig != nil { + in, out := &in.StreamAggrConfig, &out.StreamAggrConfig + *out = new(StreamAggrConfig) + (*in).DeepCopyInto(*out) + } if in.ServiceScrapeSelector != nil { in, out := &in.ServiceScrapeSelector, &out.ServiceScrapeSelector *out = new(metav1.LabelSelector) diff --git a/config/crd/overlay/crd.yaml b/config/crd/overlay/crd.yaml index 79214a0c9..b5f96a0b2 100644 --- a/config/crd/overlay/crd.yaml +++ b/config/crd/overlay/crd.yaml @@ -2178,6 +2178,31 @@ spec: description: StreamAggrConfig defines stream aggregation configuration for VMAgent for -remoteWrite.url properties: + configmap: + description: ConfigMap with stream aggregation rules + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. 
+ type: string + optional: + description: Specify whether the ConfigMap or its key + must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic dedupInterval: description: Allows setting different de-duplication intervals per each configured remote storage @@ -2186,6 +2211,18 @@ spec: description: Allow drop all the input samples after the aggregation type: boolean + dropInputLabels: + description: labels to drop from samples for aggregator + before stream de-duplication and aggregation + items: + type: string + type: array + ignoreFirstIntervals: + type: integer + ignoreOldSamples: + description: IgnoreOldSamples instructs to ignore samples + with old timestamps outside the current aggregation interval. + type: boolean keepInput: description: Allows writing both raw and aggregate data type: boolean @@ -2228,6 +2265,8 @@ spec: It is not recommended changing this setting, unless unfinished aggregations states are preferred to missing data points. type: boolean + ignore_first_intervals: + type: integer ignore_old_samples: description: IgnoreOldSamples instructs to ignore samples with old timestamps outside the current @@ -2462,8 +2501,6 @@ spec: - outputs type: object type: array - required: - - rules type: object tlsConfig: description: TLSConfig describes tls configuration for remote @@ -3939,6 +3976,330 @@ spec: type: object type: object x-kubernetes-map-type: atomic + streamAggrConfig: + description: StreamAggrConfig defines global stream aggregation configuration + for VMAgent + properties: + configmap: + description: ConfigMap with stream aggregation rules + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? 
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the ConfigMap or its key must + be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + dedupInterval: + description: Allows setting different de-duplication intervals + per each configured remote storage + type: string + dropInput: + description: Allow drop all the input samples after the aggregation + type: boolean + dropInputLabels: + description: labels to drop from samples for aggregator before + stream de-duplication and aggregation + items: + type: string + type: array + ignoreFirstIntervals: + type: integer + ignoreOldSamples: + description: IgnoreOldSamples instructs to ignore samples with + old timestamps outside the current aggregation interval. + type: boolean + keepInput: + description: Allows writing both raw and aggregate data + type: boolean + rules: + description: Stream aggregation rules + items: + description: StreamAggrRule defines the rule in stream aggregation + config + properties: + by: + description: |- + By is an optional list of labels for grouping input series. + + + See also Without. + + + If neither By nor Without are set, then the Outputs are calculated + individually per each input time series. + items: + type: string + type: array + dedup_interval: + description: DedupInterval is an optional interval for deduplication. + type: string + drop_input_labels: + description: |- + DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples. + + + Labels are dropped before de-duplication and aggregation. 
+ items: + type: string + type: array + flush_on_shutdown: + description: |- + FlushOnShutdown defines whether to flush the aggregation state on process termination + or config reload. Is `false` by default. + It is not recommended changing this setting, unless unfinished aggregations states + are preferred to missing data points. + type: boolean + ignore_first_intervals: + type: integer + ignore_old_samples: + description: IgnoreOldSamples instructs to ignore samples + with old timestamps outside the current aggregation interval. + type: boolean + input_relabel_configs: + description: |- + InputRelabelConfigs is an optional relabeling rules, which are applied on the input + before aggregation. + items: + description: |- + RelabelConfig allows dynamic rewriting of the label set + More info: https://docs.victoriametrics.com/#relabeling + properties: + action: + description: Action to perform based on regex matching. + Default is 'replace' + type: string + if: + description: 'If represents metricsQL match expression + (or list of expressions): ''{__name__=~"foo_.*"}''' + x-kubernetes-preserve-unknown-fields: true + labels: + additionalProperties: + type: string + description: 'Labels is used together with Match for + `action: graphite`' + type: object + match: + description: 'Match is used together with Labels for + `action: graphite`' + type: string + modulus: + description: Modulus to take of the hash of the source + label values. + format: int64 + type: integer + regex: + description: |- + Regular expression against which the extracted value is matched. Default is '(.*)' + victoriaMetrics supports multiline regex joined with | + https://docs.victoriametrics.com/vmagent/#relabeling-enhancements + x-kubernetes-preserve-unknown-fields: true + replacement: + description: |- + Replacement value against which a regex replace is performed if the + regular expression matches. Regex capture groups are available. 
Default is '$1' + type: string + separator: + description: Separator placed between concatenated + source label values. default is ';'. + type: string + source_labels: + description: |- + UnderScoreSourceLabels - additional form of source labels source_labels + for compatibility with original relabel config. + if set both sourceLabels and source_labels, sourceLabels has priority. + for details https://github.com/VictoriaMetrics/operator/issues/131 + items: + type: string + type: array + sourceLabels: + description: |- + The source labels select values from existing labels. Their content is concatenated + using the configured separator and matched against the configured regular expression + for the replace, keep, and drop actions. + items: + type: string + type: array + target_label: + description: |- + UnderScoreTargetLabel - additional form of target label - target_label + for compatibility with original relabel config. + if set both targetLabel and target_label, targetLabel has priority. + for details https://github.com/VictoriaMetrics/operator/issues/131 + type: string + targetLabel: + description: |- + Label to which the resulting value is written in a replace action. + It is mandatory for replace actions. Regex capture groups are available. + type: string + type: object + type: array + interval: + description: Interval is the interval between aggregations. + type: string + keep_metric_names: + description: KeepMetricNames instructs to leave metric names + as is for the output time series without adding any suffix. + type: boolean + match: + description: |- + Match is a label selector (or list of label selectors) for filtering time series for the given selector. + + + If the match isn't set, then all the input time series are processed. + x-kubernetes-preserve-unknown-fields: true + no_align_flush_to_interval: + description: |- + NoAlignFlushToInterval disables aligning of flushes to multiples of Interval. + By default flushes are aligned to Interval. 
+ type: boolean + output_relabel_configs: + description: |- + OutputRelabelConfigs is an optional relabeling rules, which are applied + on the aggregated output before being sent to remote storage. + items: + description: |- + RelabelConfig allows dynamic rewriting of the label set + More info: https://docs.victoriametrics.com/#relabeling + properties: + action: + description: Action to perform based on regex matching. + Default is 'replace' + type: string + if: + description: 'If represents metricsQL match expression + (or list of expressions): ''{__name__=~"foo_.*"}''' + x-kubernetes-preserve-unknown-fields: true + labels: + additionalProperties: + type: string + description: 'Labels is used together with Match for + `action: graphite`' + type: object + match: + description: 'Match is used together with Labels for + `action: graphite`' + type: string + modulus: + description: Modulus to take of the hash of the source + label values. + format: int64 + type: integer + regex: + description: |- + Regular expression against which the extracted value is matched. Default is '(.*)' + victoriaMetrics supports multiline regex joined with | + https://docs.victoriametrics.com/vmagent/#relabeling-enhancements + x-kubernetes-preserve-unknown-fields: true + replacement: + description: |- + Replacement value against which a regex replace is performed if the + regular expression matches. Regex capture groups are available. Default is '$1' + type: string + separator: + description: Separator placed between concatenated + source label values. default is ';'. + type: string + source_labels: + description: |- + UnderScoreSourceLabels - additional form of source labels source_labels + for compatibility with original relabel config. + if set both sourceLabels and source_labels, sourceLabels has priority. 
+ for details https://github.com/VictoriaMetrics/operator/issues/131 + items: + type: string + type: array + sourceLabels: + description: |- + The source labels select values from existing labels. Their content is concatenated + using the configured separator and matched against the configured regular expression + for the replace, keep, and drop actions. + items: + type: string + type: array + target_label: + description: |- + UnderScoreTargetLabel - additional form of target label - target_label + for compatibility with original relabel config. + if set both targetLabel and target_label, targetLabel has priority. + for details https://github.com/VictoriaMetrics/operator/issues/131 + type: string + targetLabel: + description: |- + Label to which the resulting value is written in a replace action. + It is mandatory for replace actions. Regex capture groups are available. + type: string + type: object + type: array + outputs: + description: |- + Outputs is a list of output aggregate functions to produce. + + + The following names are allowed: + + + - total - aggregates input counters + - increase - counts the increase over input counters + - count_series - counts the input series + - count_samples - counts the input samples + - sum_samples - sums the input samples + - last - the last biggest sample value + - min - the minimum sample value + - max - the maximum sample value + - avg - the average value across all the samples + - stddev - standard deviation across all the samples + - stdvar - standard variance across all the samples + - histogram_bucket - creates VictoriaMetrics histogram for input samples + - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1] + + + The output time series will have the following names: + + + input_name:aggr__ + items: + type: string + type: array + staleness_interval: + description: |- + Staleness interval is interval after which the series state will be reset if no samples have been sent during it. 
+ The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket. + type: string + without: + description: |- + Without is an optional list of labels, which must be excluded when grouping input series. + + + See also By. + + + If neither By nor Without are set, then the Outputs are calculated + individually per each input time series. + items: + type: string + type: array + required: + - interval + - outputs + type: object + type: array + type: object terminationGracePeriodSeconds: description: TerminationGracePeriodSeconds period for container graceful termination @@ -25813,6 +26174,31 @@ spec: description: StreamAggrConfig defines stream aggregation configuration for VMSingle properties: + configmap: + description: ConfigMap with stream aggregation rules + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. 
+ type: string + optional: + description: Specify whether the ConfigMap or its key must + be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic dedupInterval: description: Allows setting different de-duplication intervals per each configured remote storage @@ -25820,6 +26206,18 @@ spec: dropInput: description: Allow drop all the input samples after the aggregation type: boolean + dropInputLabels: + description: labels to drop from samples for aggregator before + stream de-duplication and aggregation + items: + type: string + type: array + ignoreFirstIntervals: + type: integer + ignoreOldSamples: + description: IgnoreOldSamples instructs to ignore samples with + old timestamps outside the current aggregation interval. + type: boolean keepInput: description: Allows writing both raw and aggregate data type: boolean @@ -25861,6 +26259,8 @@ spec: It is not recommended changing this setting, unless unfinished aggregations states are preferred to missing data points. type: boolean + ignore_first_intervals: + type: integer ignore_old_samples: description: IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. @@ -26093,8 +26493,6 @@ spec: - outputs type: object type: array - required: - - rules type: object terminationGracePeriodSeconds: description: TerminationGracePeriodSeconds period for container graceful diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index c9f8d7aca..a8b6d99fa 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -16,6 +16,7 @@ aliases: - [operator](./README.md): properly release `PodDisruptionBudget` object finalizer. Previously it could be kept due to typo. See this [issue](https://github.com/VictoriaMetrics/operator/issues/1036) for details. - [operator](./README.md): refactors finalizers usage. 
Simplifies finalizer manipulation with helper functions - [vmalertmanager](./api.md#vmalertmanager): adds `webConfig` that simplifies tls configuration for alertmanager and allows to properly build probes and access urls for alertmanager. See this [issue](https://github.com/VictoriaMetrics/operator/issues/994) for details. +- [vmagent/vmsingle](./api.md#vmagent): sync stream aggregation options `dropInputLabels`, `ignoreFirstIntervals`, `ignoreOldSamples` from [upstream](https://docs.victoriametrics.com/stream-aggregation/), and support using configmap as the source of aggregation rules. ## [v0.46.4](https://github.com/VictoriaMetrics/operator/releases/tag/v0.46.4) - 9 Jul 2024 diff --git a/docs/api.md b/docs/api.md index 5813278d1..c0f363d51 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1818,14 +1818,19 @@ StreamAggrConfig defines the stream aggregation config _Appears in:_ - [VMAgentRemoteWriteSpec](#vmagentremotewritespec) +- [VMAgentSpec](#vmagentspec) - [VMSingleSpec](#vmsinglespec) | Field | Description | Scheme | Required | | --- | --- | --- | --- | +| `configmap` | ConfigMap with stream aggregation rules | _[ConfigMapKeySelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#configmapkeyselector-v1-core)_ | false | | `dedupInterval` | Allows setting different de-duplication intervals per each configured remote storage | _string_ | false | | `dropInput` | Allow drop all the input samples after the aggregation | _boolean_ | false | +| `dropInputLabels` | labels to drop from samples for aggregator before stream de-duplication and aggregation | _string array_ | false | +| `ignoreFirstIntervals` | | _integer_ | true | +| `ignoreOldSamples` | IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. 
| _boolean_ | false | | `keepInput` | Allows writing both raw and aggregate data | _boolean_ | false | -| `rules` | Stream aggregation rules | _[StreamAggrRule](#streamaggrrule) array_ | true | +| `rules` | Stream aggregation rules | _[StreamAggrRule](#streamaggrrule) array_ | false | #### StreamAggrRule @@ -1845,6 +1850,7 @@ _Appears in:_ | `dedup_interval` | DedupInterval is an optional interval for deduplication. | _string_ | false | | `drop_input_labels` | DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples.

Labels are dropped before de-duplication and aggregation. | _string_ | false | | `flush_on_shutdown` | FlushOnShutdown defines whether to flush the aggregation state on process termination
or config reload. Is `false` by default.
It is not recommended changing this setting, unless unfinished aggregations states
are preferred to missing data points. | _boolean_ | false | +| `ignore_first_intervals` | | _integer_ | true | | `ignore_old_samples` | IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. | _boolean_ | false | | `input_relabel_configs` | InputRelabelConfigs is an optional relabeling rules, which are applied on the input
before aggregation. | _[RelabelConfig](#relabelconfig) array_ | false | | `interval` | Interval is the interval between aggregations. | _string_ | true | @@ -2329,6 +2335,7 @@ _Appears in:_ | `staticScrapeNamespaceSelector` | StaticScrapeNamespaceSelector defines Namespaces to be selected for VMStaticScrape discovery.
Works in combination with NamespaceSelector.
NamespaceSelector nil - only objects at VMAgent namespace.
Selector nil - only objects at NamespaceSelector namespaces.
If both nil - behaviour controlled by selectAllByDefault | _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#labelselector-v1-meta)_ | false | | `staticScrapeRelabelTemplate` | StaticScrapeRelabelTemplate defines relabel config, that will be added to each VMStaticScrape.
it's useful for adding specific labels to all targets | _[RelabelConfig](#relabelconfig) array_ | false | | `staticScrapeSelector` | StaticScrapeSelector defines PodScrapes to be selected for target discovery.
Works in combination with NamespaceSelector.
If both nil - match everything.
NamespaceSelector nil - only objects at VMAgent namespace.
Selector nil - only objects at NamespaceSelector namespaces. | _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#labelselector-v1-meta)_ | false | +| `streamAggrConfig` | StreamAggrConfig defines global stream aggregation configuration for VMAgent | _[StreamAggrConfig](#streamaggrconfig)_ | false | | `terminationGracePeriodSeconds` | TerminationGracePeriodSeconds period for container graceful termination | _[int64](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#int64-v1-core)_ | false | | `tolerations` | Tolerations If specified, the pod's tolerations. | _[Toleration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#toleration-v1-core) array_ | false | | `topologySpreadConstraints` | TopologySpreadConstraints embedded kubernetes pod configuration option,
controls how pods are spread across your cluster among failure-domains
such as regions, zones, nodes, and other user-defined topology domains
https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/ | _[TopologySpreadConstraint](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#topologyspreadconstraint-v1-core) array_ | false | @@ -3736,11 +3743,11 @@ _Appears in:_ | --- | --- | --- | --- | | `cert_file` | CertFile defines path to the pre-mounted file with certificate
mutually exclusive with CertSecretRef | _string_ | true | | `cert_secret_ref` | Cert defines reference for secret with CA content under given key
mutually exclusive with CertFile | _[SecretKeySelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#secretkeyselector-v1-core)_ | true | -| `cipher_suites` | CipherSuites defines list of supported cipher suites for TLS versions up to TLS 1.2 | _string array_ | true | +| `cipher_suites` | CipherSuites defines list of supported cipher suites for TLS versions up to TLS 1.2
https://golang.org/pkg/crypto/tls/#pkg-constants | _string array_ | true | | `client_auth_type` | ClientAuthType defines server policy for client authentication
If you want to enable client authentication (aka mTLS), you need to use RequireAndVerifyClientCert
Note, mTLS is supported only at enterprise version of VictoriaMetrics components | _string_ | true | | `client_ca_file` | ClientCAFile defines path to the pre-mounted file with CA
mutually exclusive with ClientCASecretRef | _string_ | true | | `client_ca_secret_ref` | ClientCA defines reference for secret with CA content under given key
mutually exclusive with ClientCAFile | _[SecretKeySelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#secretkeyselector-v1-core)_ | true | -| `curve_preferences` | CurvePreferences defines elliptic curves that will be used in an ECDHE handshake, in preference order. | _string array_ | true | +| `curve_preferences` | CurvePreferences defines elliptic curves that will be used in an ECDHE handshake, in preference order.
https://golang.org/pkg/crypto/tls/#CurveID | _string array_ | true | | `key_file` | KeyFile defines path to the pre-mounted file with certificate key
mutually exclusive with KeySecretRef | _string_ | true | | `key_secret_ref` | Key defines reference for secret with certificate key content under given key
mutually exclusive with KeyFile | _[SecretKeySelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#secretkeyselector-v1-core)_ | true | | `max_version` | MaxVersion maximum TLS version that is acceptable. | _string_ | true | diff --git a/docs/vars.md b/docs/vars.md index 582713ca4..a8f18786c 100644 --- a/docs/vars.md +++ b/docs/vars.md @@ -11,7 +11,7 @@ aliases: - /operator/vars/index.html --- - updated at Mon Jul 22 07:22:30 UTC 2024 + updated at Thu Aug 1 20:20:07 UTC 2024 | variable name | variable default value | variable required | variable description | @@ -21,7 +21,7 @@ aliases: | VM_CUSTOMCONFIGRELOADERIMAGE | victoriametrics/operator:config-reloader-v0.43.0 | false | - | | VM_PSPAUTOCREATEENABLED | false | false | - | | VM_VMALERTDEFAULT_IMAGE | victoriametrics/vmalert | false | - | -| VM_VMALERTDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMALERTDEFAULT_VERSION | v1.102.0 | false | - | | VM_VMALERTDEFAULT_PORT | 8080 | false | - | | VM_VMALERTDEFAULT_USEDEFAULTRESOURCES | true | false | - | | VM_VMALERTDEFAULT_RESOURCE_LIMIT_MEM | 500Mi | false | - | @@ -32,7 +32,7 @@ aliases: | VM_VMALERTDEFAULT_CONFIGRELOADERMEMORY | 25Mi | false | - | | VM_VMALERTDEFAULT_CONFIGRELOADIMAGE | jimmidyson/configmap-reload:v0.3.0 | false | - | | VM_VMAGENTDEFAULT_IMAGE | victoriametrics/vmagent | false | - | -| VM_VMAGENTDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMAGENTDEFAULT_VERSION | v1.102.0 | false | - | | VM_VMAGENTDEFAULT_CONFIGRELOADIMAGE | quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0 | false | - | | VM_VMAGENTDEFAULT_PORT | 8429 | false | - | | VM_VMAGENTDEFAULT_USEDEFAULTRESOURCES | true | false | - | @@ -43,7 +43,7 @@ aliases: | VM_VMAGENTDEFAULT_CONFIGRELOADERCPU | 100m | false | - | | VM_VMAGENTDEFAULT_CONFIGRELOADERMEMORY | 25Mi | false | - | | VM_VMSINGLEDEFAULT_IMAGE | victoriametrics/victoria-metrics | false | - | -| VM_VMSINGLEDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMSINGLEDEFAULT_VERSION | v1.102.0 
| false | - | | VM_VMSINGLEDEFAULT_PORT | 8429 | false | - | | VM_VMSINGLEDEFAULT_USEDEFAULTRESOURCES | true | false | - | | VM_VMSINGLEDEFAULT_RESOURCE_LIMIT_MEM | 1500Mi | false | - | @@ -54,14 +54,14 @@ aliases: | VM_VMSINGLEDEFAULT_CONFIGRELOADERMEMORY | 25Mi | false | - | | VM_VMCLUSTERDEFAULT_USEDEFAULTRESOURCES | true | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_IMAGE | victoriametrics/vmselect | false | - | -| VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_VERSION | v1.101.0-cluster | false | - | +| VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_VERSION | v1.102.0-cluster | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_PORT | 8481 | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_LIMIT_MEM | 1000Mi | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_LIMIT_CPU | 500m | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_REQUEST_MEM | 500Mi | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_REQUEST_CPU | 100m | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_IMAGE | victoriametrics/vmstorage | false | - | -| VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VERSION | v1.101.0-cluster | false | - | +| VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VERSION | v1.102.0-cluster | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VMINSERTPORT | 8400 | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VMSELECTPORT | 8401 | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_PORT | 8482 | false | - | @@ -70,7 +70,7 @@ aliases: | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_RESOURCE_REQUEST_MEM | 500Mi | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_RESOURCE_REQUEST_CPU | 250m | false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_IMAGE | victoriametrics/vminsert | false | - | -| VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_VERSION | v1.101.0-cluster | false | - | +| VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_VERSION | v1.102.0-cluster | false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_PORT | 8480 | false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_RESOURCE_LIMIT_MEM | 500Mi | 
false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_RESOURCE_LIMIT_CPU | 500m | false | - | @@ -89,7 +89,7 @@ aliases: | VM_VMALERTMANAGER_RESOURCE_REQUEST_CPU | 30m | false | - | | VM_DISABLESELFSERVICESCRAPECREATION | false | false | - | | VM_VMBACKUP_IMAGE | victoriametrics/vmbackupmanager | false | - | -| VM_VMBACKUP_VERSION | v1.101.0-enterprise | false | - | +| VM_VMBACKUP_VERSION | v1.102.0-enterprise | false | - | | VM_VMBACKUP_PORT | 8300 | false | - | | VM_VMBACKUP_USEDEFAULTRESOURCES | true | false | - | | VM_VMBACKUP_RESOURCE_LIMIT_MEM | 500Mi | false | - | @@ -98,7 +98,7 @@ aliases: | VM_VMBACKUP_RESOURCE_REQUEST_CPU | 150m | false | - | | VM_VMBACKUP_LOGLEVEL | INFO | false | - | | VM_VMAUTHDEFAULT_IMAGE | victoriametrics/vmauth | false | - | -| VM_VMAUTHDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMAUTHDEFAULT_VERSION | v1.102.0 | false | - | | VM_VMAUTHDEFAULT_CONFIGRELOADIMAGE | quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0 | false | - | | VM_VMAUTHDEFAULT_PORT | 8427 | false | - | | VM_VMAUTHDEFAULT_USEDEFAULTRESOURCES | true | false | - | @@ -130,4 +130,4 @@ aliases: | VM_PODWAITREADYINITDELAY | 10s | false | - | | VM_FORCERESYNCINTERVAL | 60s | false | configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth. | | VM_ENABLESTRICTSECURITY | false | false | EnableStrictSecurity will add default `securityContext` to pods and containers created by operatorDefault PodSecurityContext include:1. RunAsNonRoot: true2. RunAsUser/RunAsGroup/FSGroup: 65534'65534' refers to 'nobody' in all the used default images like alpine, busybox.If you're using customize image, please make sure '65534' is a valid uid in there or specify SecurityContext.3. FSGroupChangePolicy: &onRootMismatchIf KubeVersion>=1.20, use `FSGroupChangePolicy="onRootMismatch"` to skip the recursive permission changewhen the root of the volume already has the correct permissions4. 
SeccompProfile:type: RuntimeDefaultUse `RuntimeDefault` seccomp profile by default, which is defined by the container runtime,instead of using the Unconfined (seccomp disabled) mode.Default container SecurityContext include:1. AllowPrivilegeEscalation: false2. ReadOnlyRootFilesystem: true3. Capabilities:drop:- allturn off `EnableStrictSecurity` by default, see https://github.com/VictoriaMetrics/operator/issues/749 for details | -[envconfig-sum]: 678e5fdca7d401d391647c5d98827bf9 \ No newline at end of file +[envconfig-sum]: 63cf18daab2bfaab02f87b565d80e6a3 \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index c61a3e06d..e0eb9b3ad 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,7 +55,7 @@ type BaseOperatorConf struct { PSPAutoCreateEnabled bool `default:"false"` VMAlertDefault struct { Image string `default:"victoriametrics/vmalert"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` Port string `default:"8080"` UseDefaultResources bool `default:"true"` Resource struct { @@ -74,7 +74,7 @@ type BaseOperatorConf struct { } VMAgentDefault struct { Image string `default:"victoriametrics/vmagent"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` ConfigReloadImage string `default:"quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0"` Port string `default:"8429"` UseDefaultResources bool `default:"true"` @@ -94,7 +94,7 @@ type BaseOperatorConf struct { VMSingleDefault struct { Image string `default:"victoriametrics/victoria-metrics"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` Port string `default:"8429"` UseDefaultResources bool `default:"true"` Resource struct { @@ -115,7 +115,7 @@ type BaseOperatorConf struct { UseDefaultResources bool `default:"true"` VMSelectDefault struct { Image string `default:"victoriametrics/vmselect"` - Version string `default:"v1.101.0-cluster"` + Version string 
`default:"v1.102.0-cluster"` Port string `default:"8481"` Resource struct { Limit struct { @@ -130,7 +130,7 @@ type BaseOperatorConf struct { } VMStorageDefault struct { Image string `default:"victoriametrics/vmstorage"` - Version string `default:"v1.101.0-cluster"` + Version string `default:"v1.102.0-cluster"` VMInsertPort string `default:"8400"` VMSelectPort string `default:"8401"` Port string `default:"8482"` @@ -147,7 +147,7 @@ type BaseOperatorConf struct { } VMInsertDefault struct { Image string `default:"victoriametrics/vminsert"` - Version string `default:"v1.101.0-cluster"` + Version string `default:"v1.102.0-cluster"` Port string `default:"8480"` Resource struct { Limit struct { @@ -185,7 +185,7 @@ type BaseOperatorConf struct { DisableSelfServiceScrapeCreation bool `default:"false"` VMBackup struct { Image string `default:"victoriametrics/vmbackupmanager"` - Version string `default:"v1.101.0-enterprise"` + Version string `default:"v1.102.0-enterprise"` Port string `default:"8300"` UseDefaultResources bool `default:"true"` Resource struct { @@ -203,7 +203,7 @@ type BaseOperatorConf struct { } VMAuthDefault struct { Image string `default:"victoriametrics/vmauth"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` ConfigReloadImage string `default:"quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0"` Port string `default:"8427"` UseDefaultResources bool `default:"true"` diff --git a/internal/controller/operator/factory/k8stools/client_utils.go b/internal/controller/operator/factory/k8stools/client_utils.go index 429890d07..cdcedc39a 100644 --- a/internal/controller/operator/factory/k8stools/client_utils.go +++ b/internal/controller/operator/factory/k8stools/client_utils.go @@ -255,3 +255,11 @@ func (ow *ObjectWatcherForNamespaces) Stop() { } ow.wg.Wait() } + +// FetchConfigMapContentByKey returns configmap content by key +func FetchConfigMapContentByKey(ctx context.Context, rclient client.Client, cm *corev1.ConfigMap, 
key string) (string, error) { + if err := rclient.Get(ctx, types.NamespacedName{Namespace: cm.Namespace, Name: cm.Name}, cm); err != nil { + return "", err + } + return cm.Data[key], nil +} diff --git a/internal/controller/operator/factory/vmagent/vmagent.go b/internal/controller/operator/factory/vmagent/vmagent.go index 10d35e339..bd53d8531 100644 --- a/internal/controller/operator/factory/vmagent/vmagent.go +++ b/internal/controller/operator/factory/vmagent/vmagent.go @@ -38,10 +38,12 @@ const ( vmAgentPersistentQueueMountName = "persistent-queue-data" globalRelabelingName = "global_relabeling.yaml" urlRelabelingName = "url_relabeling-%d.yaml" - shardNumPlaceholder = "%SHARD_NUM%" - tlsAssetsDir = "/etc/vmagent-tls/certs" - vmagentGzippedFilename = "vmagent.yaml.gz" - configEnvsubstFilename = "vmagent.env.yaml" + globalAggregationConfigName = "global_aggregation.yaml" + + shardNumPlaceholder = "%SHARD_NUM%" + tlsAssetsDir = "/etc/vmagent-tls/certs" + vmagentGzippedFilename = "vmagent.yaml.gz" + configEnvsubstFilename = "vmagent.env.yaml" ) // To save compatibility in the single-shard version still need to fill in %SHARD_NUM% placeholder @@ -510,6 +512,27 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac args = append(args, "-remoteWrite.relabelConfig="+path.Join(vmv1beta1.RelabelingConfigDir, globalRelabelingName)) } + if cr.Spec.StreamAggrConfig != nil { + if cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { + args = append(args, "-streamAggr.config="+path.Join(vmv1beta1.StreamAggrConfigDir, globalAggregationConfigName)) + } + if cr.Spec.StreamAggrConfig.KeepInput { + args = append(args, "-streamAggr.keepInput=true") + } + if cr.Spec.StreamAggrConfig.DropInput { + args = append(args, "-streamAggr.dropInput=true") + } + if cr.Spec.StreamAggrConfig.DedupInterval != "" { + args = append(args, fmt.Sprintf("-streamAggr.dedupInterval=%s", cr.Spec.StreamAggrConfig.DedupInterval)) + } + if len(cr.Spec.StreamAggrConfig.DropInputLabels) > 
0 { + args = append(args, fmt.Sprintf("-streamAggr.dropInputLabels=%s", strings.Join(cr.Spec.StreamAggrConfig.DropInputLabels, ","))) + } + if cr.Spec.StreamAggrConfig.IgnoreOldSamples { + args = append(args, "-streamAggr.ignoreOldSamples=true") + } + } + args = build.AppendArgsForInsertPorts(args, cr.Spec.InsertPorts) specRes := build.Resources(cr.Spec.Resources, config.Resource(c.VMAgentDefault.Resource), c.VMAgentDefault.UseDefaultResources) @@ -533,7 +556,7 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac var operatorContainers []corev1.Container var ic []corev1.Container // conditional add config reloader container - if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.HasAnyStreamAggrConfigs() { + if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { configReloader := buildConfigReloaderContainer(cr, c) operatorContainers = append(operatorContainers, configReloader) if !cr.Spec.IngestOnlyMode { @@ -654,7 +677,7 @@ func buildVMAgentRelabelingsAssets(ctx context.Context, cr *vmv1beta1.VMAgent, r } if cr.Spec.RelabelConfig != nil { // need to fetch content from - data, err := fetchConfigMapContentByKey(ctx, rclient, + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cr.Spec.RelabelConfig.Name, Namespace: cr.Namespace}}, cr.Spec.RelabelConfig.Key) if err != nil { @@ -678,7 +701,7 @@ func buildVMAgentRelabelingsAssets(ctx context.Context, cr *vmv1beta1.VMAgent, r } } if rw.UrlRelabelConfig != nil { - data, err := fetchConfigMapContentByKey(ctx, rclient, + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: rw.UrlRelabelConfig.Name, Namespace: cr.Namespace}}, rw.UrlRelabelConfig.Key) if err != nil { @@ -713,7 +736,7 @@ func createOrUpdateRelabelConfigsAssets(ctx context.Context, cr *vmv1beta1.VMAge } // 
buildVMAgentStreamAggrConfig combines all possible stream aggregation configs and adding it to the configmap. -func buildVMAgentStreamAggrConfig(cr *vmv1beta1.VMAgent) (*corev1.ConfigMap, error) { +func buildVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) (*corev1.ConfigMap, error) { cfgCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: cr.Namespace, @@ -724,17 +747,56 @@ func buildVMAgentStreamAggrConfig(cr *vmv1beta1.VMAgent) (*corev1.ConfigMap, err }, Data: make(map[string]string), } - for i, rw := range cr.Spec.RemoteWrite { - if !rw.HasStreamAggr() { - continue + // global section + if cr.Spec.StreamAggrConfig != nil { + if len(cr.Spec.StreamAggrConfig.Rules) > 0 { + data, err := yaml.Marshal(cr.Spec.StreamAggrConfig.Rules) + if err != nil { + return nil, fmt.Errorf("cannot serialize relabelConfig as yaml: %w", err) + } + if len(data) > 0 { + cfgCM.Data[globalAggregationConfigName] = string(data) + } } - data, err := yaml.Marshal(rw.StreamAggrConfig.Rules) - if err != nil { - return nil, fmt.Errorf("cannot serialize StreamAggrConfig rules as yaml for remoteWrite with url %s: %w", rw.URL, err) + if cr.Spec.StreamAggrConfig.RuleConfigMap != nil { + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cr.Spec.StreamAggrConfig.RuleConfigMap.Name, Namespace: cr.Namespace}}, + cr.Spec.StreamAggrConfig.RuleConfigMap.Key) + if err != nil { + return nil, fmt.Errorf("cannot fetch configmap: %s, err: %w", cr.Spec.StreamAggrConfig.RuleConfigMap.Name, err) + } + if len(data) > 0 { + cfgCM.Data[globalAggregationConfigName] += data + } } - if len(data) > 0 { - cfgCM.Data[rw.AsConfigMapKey(i, "stream-aggr-conf")] = string(data) + } + + for i := range cr.Spec.RemoteWrite { + rw := cr.Spec.RemoteWrite[i] + if rw.StreamAggrConfig != nil { + if len(rw.StreamAggrConfig.Rules) > 0 { + data, err := yaml.Marshal(rw.StreamAggrConfig.Rules) + if err != nil { + 
return nil, fmt.Errorf("cannot serialize relabelConfig as yaml: %w", err) + } + if len(data) > 0 { + cfgCM.Data[rw.AsConfigMapKey(i, "stream-aggr-conf")] = string(data) + } + } + if rw.StreamAggrConfig.RuleConfigMap != nil { + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: rw.StreamAggrConfig.RuleConfigMap.Name, Namespace: cr.Namespace}}, + rw.StreamAggrConfig.RuleConfigMap.Key) + if err != nil { + return nil, fmt.Errorf("cannot fetch configmap: %s, err: %w", rw.StreamAggrConfig.RuleConfigMap.Name, err) + } + if len(data) > 0 { + cfgCM.Data[rw.AsConfigMapKey(i, "stream-aggr-conf")] += data + } + } + } + } return cfgCM, nil } @@ -745,7 +807,7 @@ func CreateOrUpdateVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VM if !cr.HasAnyStreamAggrConfigs() { return nil } - streamAggrCM, err := buildVMAgentStreamAggrConfig(cr) + streamAggrCM, err := buildVMAgentStreamAggrConfig(ctx, cr, rclient) if err != nil { return err } @@ -760,13 +822,6 @@ func CreateOrUpdateVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VM return rclient.Update(ctx, streamAggrCM) } -func fetchConfigMapContentByKey(ctx context.Context, rclient client.Client, cm *corev1.ConfigMap, key string) (string, error) { - if err := rclient.Get(ctx, types.NamespacedName{Namespace: cm.Namespace, Name: cm.Name}, cm); err != nil { - return "", err - } - return cm.Data[key], nil -} - func createOrUpdateTLSAssets(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client, assets map[string]string) error { tlsAssetsSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -1129,6 +1184,9 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st streamAggrKeepInput := remoteFlag{flagSetting: "-remoteWrite.streamAggr.keepInput="} streamAggrDropInput := remoteFlag{flagSetting: "-remoteWrite.streamAggr.dropInput="} streamAggrDedupInterval := remoteFlag{flagSetting: 
"-remoteWrite.streamAggr.dedupInterval="} + streamAggrDropInputLabels := remoteFlag{flagSetting: "-remoteWrite.streamAggr.dropInputLabels="} + streamAggrIgnoreFirstIntervals := remoteFlag{flagSetting: "-remoteWrite.streamAggr.ignoreFirstIntervals="} + streamAggrIgnoreOldSamples := remoteFlag{flagSetting: "-remoteWrite.streamAggr.ignoreOldSamples="} pathPrefix := path.Join(tlsAssetsDir, cr.Namespace) @@ -1269,10 +1327,13 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st oauth2Scopes.flagSetting += fmt.Sprintf("%s,", oascopes) var dedupIntVal, streamConfVal string - var keepInputVal, dropInputVal bool - if rws.HasStreamAggr() { - streamAggrConfig.isNotNull = true - streamConfVal = path.Join(vmv1beta1.StreamAggrConfigDir, rws.AsConfigMapKey(i, "stream-aggr-conf")) + var keepInputVal, dropInputVal, ignoreOldSamples bool + var ignoreFirstIntervalsVal int + if rws.StreamAggrConfig != nil { + if rws.StreamAggrConfig.HasStreamAggrConfig() { + streamAggrConfig.isNotNull = true + streamConfVal = path.Join(vmv1beta1.StreamAggrConfigDir, rws.AsConfigMapKey(i, "stream-aggr-conf")) + } dedupIntVal = rws.StreamAggrConfig.DedupInterval if dedupIntVal != "" { @@ -1287,17 +1348,31 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st if dropInputVal { streamAggrDropInput.isNotNull = true } + if len(rws.StreamAggrConfig.DropInputLabels) > 0 { + streamAggrDropInputLabels.isNotNull = true + streamAggrDropInputLabels.flagSetting += fmt.Sprintf("%s,", strings.Join(rws.StreamAggrConfig.DropInputLabels, ",")) + } + ignoreFirstIntervalsVal = rws.StreamAggrConfig.IgnoreFirstIntervals + if ignoreFirstIntervalsVal > 0 { + streamAggrIgnoreFirstIntervals.isNotNull = true + } + ignoreOldSamples = rws.StreamAggrConfig.IgnoreOldSamples + if ignoreOldSamples { + streamAggrIgnoreOldSamples.isNotNull = true + } } streamAggrConfig.flagSetting += fmt.Sprintf("%s,", streamConfVal) streamAggrKeepInput.flagSetting += fmt.Sprintf("%v,", 
keepInputVal) streamAggrDropInput.flagSetting += fmt.Sprintf("%v,", dropInputVal) streamAggrDedupInterval.flagSetting += fmt.Sprintf("%s,", dedupIntVal) + streamAggrIgnoreFirstIntervals.flagSetting += fmt.Sprintf("%d,", ignoreFirstIntervalsVal) + streamAggrIgnoreOldSamples.flagSetting += fmt.Sprintf("%v,", ignoreOldSamples) } remoteArgs = append(remoteArgs, url, authUser, bearerTokenFile, urlRelabelConfig, tlsInsecure, sendTimeout) remoteArgs = append(remoteArgs, tlsServerName, tlsKeys, tlsCerts, tlsCAs) remoteArgs = append(remoteArgs, oauth2ClientID, oauth2ClientSecretFile, oauth2Scopes, oauth2TokenURL) remoteArgs = append(remoteArgs, headers, authPasswordFile) - remoteArgs = append(remoteArgs, streamAggrConfig, streamAggrKeepInput, streamAggrDedupInterval, streamAggrDropInput) + remoteArgs = append(remoteArgs, streamAggrConfig, streamAggrKeepInput, streamAggrDedupInterval, streamAggrDropInput, streamAggrDropInputLabels, streamAggrIgnoreFirstIntervals, streamAggrIgnoreOldSamples) for _, remoteArgType := range remoteArgs { if remoteArgType.isNotNull { @@ -1332,7 +1407,7 @@ func buildConfigReloaderContainer(cr *vmv1beta1.VMAgent, c *config.BaseOperatorC MountPath: vmv1beta1.RelabelingConfigDir, }) } - if cr.HasAnyStreamAggrConfigs() { + if cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { configReloadVolumeMounts = append(configReloadVolumeMounts, corev1.VolumeMount{ Name: "stream-aggr-conf", diff --git a/internal/controller/operator/factory/vmagent/vmagent_test.go b/internal/controller/operator/factory/vmagent/vmagent_test.go index 9ac84270f..18d0f57f3 100644 --- a/internal/controller/operator/factory/vmagent/vmagent_test.go +++ b/internal/controller/operator/factory/vmagent/vmagent_test.go @@ -1362,11 +1362,20 @@ func TestCreateOrUpdateStreamAggrConfig(t *testing.T) { predefinedObjects: []runtime.Object{}, }, { - name: "simple stream aggr config", + name: "simple global and remoteWrite stream aggr config", args: args{ ctx: context.TODO(), cr: &vmv1beta1.VMAgent{ 
Spec: vmv1beta1.VMAgentSpec{ + StreamAggrConfig: &vmv1beta1.StreamAggrConfig{ + Rules: []vmv1beta1.StreamAggrRule{{ + Match: []string{`test`}, + Interval: "30s", + Outputs: []string{"total"}, + By: []string{"job", "instance"}, + Without: []string{"pod"}, + }}, + }, RemoteWrite: []vmv1beta1.VMAgentRemoteWriteSpec{ { URL: "localhost:8429", @@ -1391,11 +1400,26 @@ func TestCreateOrUpdateStreamAggrConfig(t *testing.T) { }, }, validate: func(cm *corev1.ConfigMap) error { - data, ok := cm.Data["RWS_0-CM-STREAM-AGGR-CONF"] + globalData, ok := cm.Data["global_aggregation.yaml"] + if !ok { + return fmt.Errorf("key: %s, not exists at map: %v", "global_aggregation.yaml", cm.BinaryData) + } + wantGlobal := `- match: test + interval: 30s + outputs: + - total + by: + - job + - instance + without: + - pod +` + assert.Equal(t, wantGlobal, globalData) + remoteData, ok := cm.Data["RWS_0-CM-STREAM-AGGR-CONF"] if !ok { return fmt.Errorf("key: %s, not exists at map: %v", "RWS_0-CM-STREAM-AGGR-CONFl", cm.BinaryData) } - wantGlobal := `- match: + wantRemote := `- match: - '{__name__="count1"}' - '{__name__="count2"}' interval: 1m @@ -1411,7 +1435,7 @@ func TestCreateOrUpdateStreamAggrConfig(t *testing.T) { output_relabel_configs: - regex: (.+):.+ ` - assert.Equal(t, wantGlobal, data) + assert.Equal(t, wantRemote, remoteData) return nil }, predefinedObjects: []runtime.Object{}, diff --git a/internal/controller/operator/factory/vmsingle/vmsingle.go b/internal/controller/operator/factory/vmsingle/vmsingle.go index 18cfeb8bd..037362bb1 100644 --- a/internal/controller/operator/factory/vmsingle/vmsingle.go +++ b/internal/controller/operator/factory/vmsingle/vmsingle.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "sort" + "strings" vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" "github.com/VictoriaMetrics/operator/internal/config" @@ -282,10 +283,25 @@ func makeSpecForVMSingle(ctx context.Context, cr *vmv1beta1.VMSingle, c *config. 
if cr.Spec.StreamAggrConfig.KeepInput { args = append(args, "--streamAggr.keepInput=true") } - if cr.Spec.StreamAggrConfig.DedupInterval != "" { - args = append(args, fmt.Sprintf("--streamAggr.dedupInterval=%s", cr.Spec.StreamAggrConfig.DedupInterval)) + if cr.Spec.StreamAggrConfig.DropInput { + args = append(args, "--streamAggr.dropInput=true") + } + if len(cr.Spec.StreamAggrConfig.DropInputLabels) > 0 { + args = append(args, fmt.Sprintf("--streamAggr.dropInputLabels=%s", strings.Join(cr.Spec.StreamAggrConfig.DropInputLabels, ","))) + } + if cr.Spec.StreamAggrConfig.IgnoreFirstIntervals > 0 { + args = append(args, fmt.Sprintf("--streamAggr.ignoreFirstIntervals=%d", cr.Spec.StreamAggrConfig.IgnoreFirstIntervals)) + } + if cr.Spec.StreamAggrConfig.IgnoreOldSamples { + args = append(args, "--streamAggr.ignoreOldSamples=true") } } + + // deduplication can work without stream aggregation rules + if cr.Spec.StreamAggrConfig != nil && cr.Spec.StreamAggrConfig.DedupInterval != "" { + args = append(args, fmt.Sprintf("--streamAggr.dedupInterval=%s", cr.Spec.StreamAggrConfig.DedupInterval)) + } + volumes, vmMounts = cr.Spec.License.MaybeAddToVolumes(volumes, vmMounts, vmv1beta1.SecretsDir) args = cr.Spec.License.MaybeAddToArgs(args, vmv1beta1.SecretsDir) @@ -414,7 +430,7 @@ func CreateOrUpdateVMSingleService(ctx context.Context, cr *vmv1beta1.VMSingle, } // buildVMSingleStreamAggrConfig build configmap with stream aggregation config for vmsingle. 
-func buildVMSingleStreamAggrConfig(cr *vmv1beta1.VMSingle) (*corev1.ConfigMap, error) { +func buildVMSingleStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMSingle, rclient client.Client) (*corev1.ConfigMap, error) { cfgCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: cr.Namespace, @@ -425,14 +441,26 @@ func buildVMSingleStreamAggrConfig(cr *vmv1beta1.VMSingle) (*corev1.ConfigMap, e }, Data: make(map[string]string), } - data, err := yaml.Marshal(cr.Spec.StreamAggrConfig.Rules) - if err != nil { - return nil, fmt.Errorf("cannot serialize StreamAggrConfig rules as yaml: %w", err) + if len(cr.Spec.StreamAggrConfig.Rules) > 0 { + data, err := yaml.Marshal(cr.Spec.StreamAggrConfig.Rules) + if err != nil { + return nil, fmt.Errorf("cannot serialize relabelConfig as yaml: %w", err) + } + if len(data) > 0 { + cfgCM.Data["config.yaml"] = string(data) + } } - if len(data) > 0 { - cfgCM.Data["config.yaml"] = string(data) + if cr.Spec.StreamAggrConfig.RuleConfigMap != nil { + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cr.Spec.StreamAggrConfig.RuleConfigMap.Name, Namespace: cr.Namespace}}, + cr.Spec.StreamAggrConfig.RuleConfigMap.Key) + if err != nil { + return nil, fmt.Errorf("cannot fetch configmap: %s, err: %w", cr.Spec.StreamAggrConfig.RuleConfigMap.Name, err) + } + if len(data) > 0 { + cfgCM.Data["config.yaml"] += data + } } - return cfgCM, nil } @@ -441,7 +469,7 @@ func CreateOrUpdateVMSingleStreamAggrConfig(ctx context.Context, cr *vmv1beta1.V if !cr.HasStreamAggrConfig() { return nil } - streamAggrCM, err := buildVMSingleStreamAggrConfig(cr) + streamAggrCM, err := buildVMSingleStreamAggrConfig(ctx, cr, rclient) if err != nil { return err }