diff --git a/api/operator/v1beta1/vmagent_types.go b/api/operator/v1beta1/vmagent_types.go index cb8b20c7..e07bdb7b 100644 --- a/api/operator/v1beta1/vmagent_types.go +++ b/api/operator/v1beta1/vmagent_types.go @@ -207,6 +207,9 @@ type VMAgentSpec struct { // InlineRelabelConfig - defines GlobalRelabelConfig for vmagent, can be defined directly at CRD. // +optional InlineRelabelConfig []RelabelConfig `json:"inlineRelabelConfig,omitempty"` + // StreamAggrConfig defines global stream aggregation configuration for VMAgent + // +optional + StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"` // SelectAllByDefault changes default behavior for empty CRD selectors, such ServiceScrapeSelector. // with selectAllByDefault: true and empty serviceScrapeSelector and ServiceScrapeNamespaceSelector // Operator selects all exist serviceScrapes @@ -539,11 +542,6 @@ func (rw *VMAgentRemoteWriteSpec) AsConfigMapKey(idx int, suffix string) string return fmt.Sprintf("RWS_%d-CM-%s", idx, strings.ToUpper(suffix)) } -// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite -func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool { - return rw.StreamAggrConfig != nil && len(rw.StreamAggrConfig.Rules) > 0 -} - // VMAgentStatus defines the observed state of VMAgent // +k8s:openapi-gen=true type VMAgentStatus struct { @@ -805,13 +803,17 @@ func (cr *VMAgent) HasAnyRelabellingConfigs() bool { return false } -// HasAnyStreamAggrConfigs checks if agent has any streaming aggregation config defined -func (cr *VMAgent) HasAnyStreamAggrConfigs() bool { +// HasAnyStreamAggrRule checks if vmagent has any defined aggregation rules +func (cr *VMAgent) HasAnyStreamAggrRule() bool { + if cr.Spec.StreamAggrConfig.HasAnyRule() { + return true + } for _, rw := range cr.Spec.RemoteWrite { - if rw.HasStreamAggr() { + if rw.StreamAggrConfig.HasAnyRule() { return true } } + return false } diff --git a/api/operator/v1beta1/vmextra_types.go 
b/api/operator/v1beta1/vmextra_types.go index 793a5fa0..87878c3c 100644 --- a/api/operator/v1beta1/vmextra_types.go +++ b/api/operator/v1beta1/vmextra_types.go @@ -474,7 +474,11 @@ type ConfigMapKeyReference struct { // +k8s:openapi-gen=true type StreamAggrConfig struct { // Stream aggregation rules + // +optional Rules []StreamAggrRule `json:"rules"` + // ConfigMap with stream aggregation rules + // +optional + RuleConfigMap *v1.ConfigMapKeySelector `json:"configmap,omitempty"` // Allows writing both raw and aggregate data // +optional KeepInput bool `json:"keepInput,omitempty"` @@ -484,6 +488,13 @@ type StreamAggrConfig struct { // Allows setting different de-duplication intervals per each configured remote storage // +optional DedupInterval string `json:"dedupInterval,omitempty"` + // labels to drop from samples for aggregator before stream de-duplication and aggregation + // +optional + DropInputLabels []string `json:"dropInputLabels,omitempty"` + IgnoreFirstIntervals int `json:"ignoreFirstIntervals,omitempty"` + // IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. + // +optional + IgnoreOldSamples bool `json:"ignoreOldSamples,omitempty"` } // StreamAggrRule defines the rule in stream aggregation config @@ -570,6 +581,8 @@ type StreamAggrRule struct { // +optional Without []string `json:"without,omitempty" yaml:"without,omitempty"` + IgnoreFirstIntervals *int `json:"ignore_first_intervals,omitempty" yaml:"ignore_first_intervals,omitempty"` + // DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples. // // Labels are dropped before de-duplication and aggregation. 
@@ -587,6 +600,14 @@ type StreamAggrRule struct { OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"` } +// HasAnyRule returns true if there is at least one aggregation rule +func (config *StreamAggrConfig) HasAnyRule() bool { + if config != nil && (len(config.Rules) > 0 || config.RuleConfigMap != nil) { + return true + } + return false +} + // KeyValue defines a (key, value) tuple. // +kubebuilder:object:generate=false // +k8s:openapi-gen=false diff --git a/api/operator/v1beta1/vmsingle_types.go b/api/operator/v1beta1/vmsingle_types.go index b370ea4f..a1071cff 100644 --- a/api/operator/v1beta1/vmsingle_types.go +++ b/api/operator/v1beta1/vmsingle_types.go @@ -212,9 +212,9 @@ type VMSingleSpec struct { Paused bool `json:"paused,omitempty"` } -// HasStreamAggrConfig checks if streamAggrConfig present -func (cr *VMSingle) HasStreamAggrConfig() bool { - return cr.Spec.StreamAggrConfig != nil && len(cr.Spec.StreamAggrConfig.Rules) > 0 +// HasAnyStreamAggrRule checks if vmsingle has any defined aggregation rules +func (cr *VMSingle) HasAnyStreamAggrRule() bool { + return cr.Spec.StreamAggrConfig.HasAnyRule() } // UnmarshalJSON implements json.Unmarshaler interface diff --git a/api/operator/v1beta1/zz_generated.deepcopy.go b/api/operator/v1beta1/zz_generated.deepcopy.go index b6a62d84..01004e88 100644 --- a/api/operator/v1beta1/zz_generated.deepcopy.go +++ b/api/operator/v1beta1/zz_generated.deepcopy.go @@ -2441,6 +2441,16 @@ func (in *StreamAggrConfig) DeepCopyInto(out *StreamAggrConfig) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.RuleConfigMap != nil { + in, out := &in.RuleConfigMap, &out.RuleConfigMap + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } + if in.DropInputLabels != nil { + in, out := &in.DropInputLabels, &out.DropInputLabels + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new 
StreamAggrConfig. @@ -2491,6 +2501,11 @@ func (in *StreamAggrRule) DeepCopyInto(out *StreamAggrRule) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.IgnoreFirstIntervals != nil { + in, out := &in.IgnoreFirstIntervals, &out.IgnoreFirstIntervals + *out = new(int) + **out = **in + } if in.DropInputLabels != nil { in, out := &in.DropInputLabels, &out.DropInputLabels *out = new([]string) @@ -3363,6 +3378,11 @@ func (in *VMAgentSpec) DeepCopyInto(out *VMAgentSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.StreamAggrConfig != nil { + in, out := &in.StreamAggrConfig, &out.StreamAggrConfig + *out = new(StreamAggrConfig) + (*in).DeepCopyInto(*out) + } if in.ServiceScrapeSelector != nil { in, out := &in.ServiceScrapeSelector, &out.ServiceScrapeSelector *out = new(metav1.LabelSelector) diff --git a/config/crd/overlay/crd.yaml b/config/crd/overlay/crd.yaml index 8b2b3e87..01728d17 100644 --- a/config/crd/overlay/crd.yaml +++ b/config/crd/overlay/crd.yaml @@ -2178,6 +2178,31 @@ spec: description: StreamAggrConfig defines stream aggregation configuration for VMAgent for -remoteWrite.url properties: + configmap: + description: ConfigMap with stream aggregation rules + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. 
+ type: string + optional: + description: Specify whether the ConfigMap or its key + must be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic dedupInterval: description: Allows setting different de-duplication intervals per each configured remote storage @@ -2186,6 +2211,18 @@ spec: description: Allow drop all the input samples after the aggregation type: boolean + dropInputLabels: + description: labels to drop from samples for aggregator + before stream de-duplication and aggregation + items: + type: string + type: array + ignoreFirstIntervals: + type: integer + ignoreOldSamples: + description: IgnoreOldSamples instructs to ignore samples + with old timestamps outside the current aggregation interval. + type: boolean keepInput: description: Allows writing both raw and aggregate data type: boolean @@ -2228,6 +2265,8 @@ spec: It is not recommended changing this setting, unless unfinished aggregations states are preferred to missing data points. type: boolean + ignore_first_intervals: + type: integer ignore_old_samples: description: IgnoreOldSamples instructs to ignore samples with old timestamps outside the current @@ -2462,8 +2501,6 @@ spec: - outputs type: object type: array - required: - - rules type: object tlsConfig: description: TLSConfig describes tls configuration for remote @@ -3939,6 +3976,330 @@ spec: type: object type: object x-kubernetes-map-type: atomic + streamAggrConfig: + description: StreamAggrConfig defines global stream aggregation configuration + for VMAgent + properties: + configmap: + description: ConfigMap with stream aggregation rules + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? 
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. + type: string + optional: + description: Specify whether the ConfigMap or its key must + be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + dedupInterval: + description: Allows setting different de-duplication intervals + per each configured remote storage + type: string + dropInput: + description: Allow drop all the input samples after the aggregation + type: boolean + dropInputLabels: + description: labels to drop from samples for aggregator before + stream de-duplication and aggregation + items: + type: string + type: array + ignoreFirstIntervals: + type: integer + ignoreOldSamples: + description: IgnoreOldSamples instructs to ignore samples with + old timestamps outside the current aggregation interval. + type: boolean + keepInput: + description: Allows writing both raw and aggregate data + type: boolean + rules: + description: Stream aggregation rules + items: + description: StreamAggrRule defines the rule in stream aggregation + config + properties: + by: + description: |- + By is an optional list of labels for grouping input series. + + + See also Without. + + + If neither By nor Without are set, then the Outputs are calculated + individually per each input time series. + items: + type: string + type: array + dedup_interval: + description: DedupInterval is an optional interval for deduplication. + type: string + drop_input_labels: + description: |- + DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples. + + + Labels are dropped before de-duplication and aggregation. 
+ items: + type: string + type: array + flush_on_shutdown: + description: |- + FlushOnShutdown defines whether to flush the aggregation state on process termination + or config reload. Is `false` by default. + It is not recommended changing this setting, unless unfinished aggregations states + are preferred to missing data points. + type: boolean + ignore_first_intervals: + type: integer + ignore_old_samples: + description: IgnoreOldSamples instructs to ignore samples + with old timestamps outside the current aggregation interval. + type: boolean + input_relabel_configs: + description: |- + InputRelabelConfigs is an optional relabeling rules, which are applied on the input + before aggregation. + items: + description: |- + RelabelConfig allows dynamic rewriting of the label set + More info: https://docs.victoriametrics.com/#relabeling + properties: + action: + description: Action to perform based on regex matching. + Default is 'replace' + type: string + if: + description: 'If represents metricsQL match expression + (or list of expressions): ''{__name__=~"foo_.*"}''' + x-kubernetes-preserve-unknown-fields: true + labels: + additionalProperties: + type: string + description: 'Labels is used together with Match for + `action: graphite`' + type: object + match: + description: 'Match is used together with Labels for + `action: graphite`' + type: string + modulus: + description: Modulus to take of the hash of the source + label values. + format: int64 + type: integer + regex: + description: |- + Regular expression against which the extracted value is matched. Default is '(.*)' + victoriaMetrics supports multiline regex joined with | + https://docs.victoriametrics.com/vmagent/#relabeling-enhancements + x-kubernetes-preserve-unknown-fields: true + replacement: + description: |- + Replacement value against which a regex replace is performed if the + regular expression matches. Regex capture groups are available. 
Default is '$1' + type: string + separator: + description: Separator placed between concatenated + source label values. default is ';'. + type: string + source_labels: + description: |- + UnderScoreSourceLabels - additional form of source labels source_labels + for compatibility with original relabel config. + if set both sourceLabels and source_labels, sourceLabels has priority. + for details https://github.com/VictoriaMetrics/operator/issues/131 + items: + type: string + type: array + sourceLabels: + description: |- + The source labels select values from existing labels. Their content is concatenated + using the configured separator and matched against the configured regular expression + for the replace, keep, and drop actions. + items: + type: string + type: array + target_label: + description: |- + UnderScoreTargetLabel - additional form of target label - target_label + for compatibility with original relabel config. + if set both targetLabel and target_label, targetLabel has priority. + for details https://github.com/VictoriaMetrics/operator/issues/131 + type: string + targetLabel: + description: |- + Label to which the resulting value is written in a replace action. + It is mandatory for replace actions. Regex capture groups are available. + type: string + type: object + type: array + interval: + description: Interval is the interval between aggregations. + type: string + keep_metric_names: + description: KeepMetricNames instructs to leave metric names + as is for the output time series without adding any suffix. + type: boolean + match: + description: |- + Match is a label selector (or list of label selectors) for filtering time series for the given selector. + + + If the match isn't set, then all the input time series are processed. + x-kubernetes-preserve-unknown-fields: true + no_align_flush_to_interval: + description: |- + NoAlignFlushToInterval disables aligning of flushes to multiples of Interval. + By default flushes are aligned to Interval. 
+ type: boolean + output_relabel_configs: + description: |- + OutputRelabelConfigs is an optional relabeling rules, which are applied + on the aggregated output before being sent to remote storage. + items: + description: |- + RelabelConfig allows dynamic rewriting of the label set + More info: https://docs.victoriametrics.com/#relabeling + properties: + action: + description: Action to perform based on regex matching. + Default is 'replace' + type: string + if: + description: 'If represents metricsQL match expression + (or list of expressions): ''{__name__=~"foo_.*"}''' + x-kubernetes-preserve-unknown-fields: true + labels: + additionalProperties: + type: string + description: 'Labels is used together with Match for + `action: graphite`' + type: object + match: + description: 'Match is used together with Labels for + `action: graphite`' + type: string + modulus: + description: Modulus to take of the hash of the source + label values. + format: int64 + type: integer + regex: + description: |- + Regular expression against which the extracted value is matched. Default is '(.*)' + victoriaMetrics supports multiline regex joined with | + https://docs.victoriametrics.com/vmagent/#relabeling-enhancements + x-kubernetes-preserve-unknown-fields: true + replacement: + description: |- + Replacement value against which a regex replace is performed if the + regular expression matches. Regex capture groups are available. Default is '$1' + type: string + separator: + description: Separator placed between concatenated + source label values. default is ';'. + type: string + source_labels: + description: |- + UnderScoreSourceLabels - additional form of source labels source_labels + for compatibility with original relabel config. + if set both sourceLabels and source_labels, sourceLabels has priority. 
+ for details https://github.com/VictoriaMetrics/operator/issues/131 + items: + type: string + type: array + sourceLabels: + description: |- + The source labels select values from existing labels. Their content is concatenated + using the configured separator and matched against the configured regular expression + for the replace, keep, and drop actions. + items: + type: string + type: array + target_label: + description: |- + UnderScoreTargetLabel - additional form of target label - target_label + for compatibility with original relabel config. + if set both targetLabel and target_label, targetLabel has priority. + for details https://github.com/VictoriaMetrics/operator/issues/131 + type: string + targetLabel: + description: |- + Label to which the resulting value is written in a replace action. + It is mandatory for replace actions. Regex capture groups are available. + type: string + type: object + type: array + outputs: + description: |- + Outputs is a list of output aggregate functions to produce. + + + The following names are allowed: + + + - total - aggregates input counters + - increase - counts the increase over input counters + - count_series - counts the input series + - count_samples - counts the input samples + - sum_samples - sums the input samples + - last - the last biggest sample value + - min - the minimum sample value + - max - the maximum sample value + - avg - the average value across all the samples + - stddev - standard deviation across all the samples + - stdvar - standard variance across all the samples + - histogram_bucket - creates VictoriaMetrics histogram for input samples + - quantiles(phi1, ..., phiN) - quantiles' estimation for phi in the range [0..1] + + + The output time series will have the following names: + + + input_name:aggr__ + items: + type: string + type: array + staleness_interval: + description: |- + Staleness interval is interval after which the series state will be reset if no samples have been sent during it. 
+ The parameter is only relevant for outputs: total, total_prometheus, increase, increase_prometheus and histogram_bucket. + type: string + without: + description: |- + Without is an optional list of labels, which must be excluded when grouping input series. + + + See also By. + + + If neither By nor Without are set, then the Outputs are calculated + individually per each input time series. + items: + type: string + type: array + required: + - interval + - outputs + type: object + type: array + type: object terminationGracePeriodSeconds: description: TerminationGracePeriodSeconds period for container graceful termination @@ -26955,6 +27316,31 @@ spec: description: StreamAggrConfig defines stream aggregation configuration for VMSingle properties: + configmap: + description: ConfigMap with stream aggregation rules + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. + TODO: Add other useful fields. apiVersion, kind, uid? + More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + TODO: Drop `kubebuilder:default` when controller-gen doesn't need it https://github.com/kubernetes-sigs/kubebuilder/issues/3896. 
+ type: string + optional: + description: Specify whether the ConfigMap or its key must + be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic dedupInterval: description: Allows setting different de-duplication intervals per each configured remote storage @@ -26962,6 +27348,18 @@ spec: dropInput: description: Allow drop all the input samples after the aggregation type: boolean + dropInputLabels: + description: labels to drop from samples for aggregator before + stream de-duplication and aggregation + items: + type: string + type: array + ignoreFirstIntervals: + type: integer + ignoreOldSamples: + description: IgnoreOldSamples instructs to ignore samples with + old timestamps outside the current aggregation interval. + type: boolean keepInput: description: Allows writing both raw and aggregate data type: boolean @@ -27003,6 +27401,8 @@ spec: It is not recommended changing this setting, unless unfinished aggregations states are preferred to missing data points. type: boolean + ignore_first_intervals: + type: integer ignore_old_samples: description: IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. @@ -27235,8 +27635,6 @@ spec: - outputs type: object type: array - required: - - rules type: object terminationGracePeriodSeconds: description: TerminationGracePeriodSeconds period for container graceful diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 7aaaa405..b77f8c1f 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -24,6 +24,7 @@ aliases: - [vmalertmanagerconfig](./api.md#vmalertmanagerconfig): Improves config validation. Now it properly tracks required fields and provides better feedback for misconfiguration. Adds new `status` fields - `status` and `lastSyncError`. Related [issue](https://github.com/VictoriaMetrics/operator/issues/825). 
- [vmalertmanager](./api.md#vmalertmanager): adds `webConfig` that simplifies tls configuration for alertmanager and allows to properly build probes and access urls for alertmanager. See this [issue](https://github.com/VictoriaMetrics/operator/issues/994) for details. - [vmalertmanager](./api.md#vmalertmanager): adds `gossipConfig` to setup client and server TLS configuration for alertmanager. +- [vmagent/vmsingle](./api.md#vmagent): syncs stream aggregation options `dropInputLabels`, `ignoreFirstIntervals`, `ignoreOldSamples` with [upstream](https://docs.victoriametrics.com/stream-aggregation/), adds global `streamAggrConfig` for vmagent and supports using a ConfigMap as the source of aggregation rules. ## [v0.46.4](https://github.com/VictoriaMetrics/operator/releases/tag/v0.46.4) - 9 Jul 2024 diff --git a/docs/api.md b/docs/api.md index 15b7b4b0..73a1bc9f 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1814,14 +1814,19 @@ StreamAggrConfig defines the stream aggregation config _Appears in:_ - [VMAgentRemoteWriteSpec](#vmagentremotewritespec) +- [VMAgentSpec](#vmagentspec) - [VMSingleSpec](#vmsinglespec) | Field | Description | Scheme | Required | | --- | --- | --- | --- | +| `configmap` | ConfigMap with stream aggregation rules | _[ConfigMapKeySelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#configmapkeyselector-v1-core)_ | false | | `dedupInterval` | Allows setting different de-duplication intervals per each configured remote storage | _string_ | false | | `dropInput` | Allow drop all the input samples after the aggregation | _boolean_ | false | +| `dropInputLabels` | DropInputLabels is a list of labels to drop from input samples before stream de-duplication and aggregation | _string array_ | false | +| `ignoreFirstIntervals` | IgnoreFirstIntervals is the number of aggregation intervals to skip after the start. | _integer_ | false | +| `ignoreOldSamples` | IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. 
| _boolean_ | false | | `keepInput` | Allows writing both raw and aggregate data | _boolean_ | false | -| `rules` | Stream aggregation rules | _[StreamAggrRule](#streamaggrrule) array_ | true | +| `rules` | Stream aggregation rules | _[StreamAggrRule](#streamaggrrule) array_ | false | #### StreamAggrRule @@ -1841,6 +1846,7 @@ _Appears in:_ | `dedup_interval` | DedupInterval is an optional interval for deduplication. | _string_ | false | | `drop_input_labels` | DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples.

Labels are dropped before de-duplication and aggregation. | _string_ | false | | `flush_on_shutdown` | FlushOnShutdown defines whether to flush the aggregation state on process termination
or config reload. Is `false` by default.
It is not recommended changing this setting, unless unfinished aggregations states
are preferred to missing data points. | _boolean_ | false | +| `ignore_first_intervals` | IgnoreFirstIntervals is an optional number of aggregation intervals to skip after the start. | _integer_ | false | | `ignore_old_samples` | IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval. | _boolean_ | false | | `input_relabel_configs` | InputRelabelConfigs is an optional relabeling rules, which are applied on the input
before aggregation. | _[RelabelConfig](#relabelconfig) array_ | false | | `interval` | Interval is the interval between aggregations. | _string_ | true | @@ -2394,6 +2400,7 @@ _Appears in:_ | `staticScrapeNamespaceSelector` | StaticScrapeNamespaceSelector defines Namespaces to be selected for VMStaticScrape discovery.
Works in combination with NamespaceSelector.
NamespaceSelector nil - only objects at VMAgent namespace.
Selector nil - only objects at NamespaceSelector namespaces.
If both nil - behaviour controlled by selectAllByDefault | _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#labelselector-v1-meta)_ | false | | `staticScrapeRelabelTemplate` | StaticScrapeRelabelTemplate defines relabel config, that will be added to each VMStaticScrape.
it's useful for adding specific labels to all targets | _[RelabelConfig](#relabelconfig) array_ | false | | `staticScrapeSelector` | StaticScrapeSelector defines PodScrapes to be selected for target discovery.
Works in combination with NamespaceSelector.
If both nil - match everything.
NamespaceSelector nil - only objects at VMAgent namespace.
Selector nil - only objects at NamespaceSelector namespaces. | _[LabelSelector](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#labelselector-v1-meta)_ | false | +| `streamAggrConfig` | StreamAggrConfig defines global stream aggregation configuration for VMAgent | _[StreamAggrConfig](#streamaggrconfig)_ | false | | `terminationGracePeriodSeconds` | TerminationGracePeriodSeconds period for container graceful termination | _[int64](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#int64-v1-core)_ | false | | `tolerations` | Tolerations If specified, the pod's tolerations. | _[Toleration](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#toleration-v1-core) array_ | false | | `topologySpreadConstraints` | TopologySpreadConstraints embedded kubernetes pod configuration option,
controls how pods are spread across your cluster among failure-domains
such as regions, zones, nodes, and other user-defined topology domains
https://kubernetes.io/docs/concepts/workloads/pods/pod-topology-spread-constraints/ | _[TopologySpreadConstraint](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.30/#topologyspreadconstraint-v1-core) array_ | false | diff --git a/docs/vars.md b/docs/vars.md index 6f1b8009..4e4f6042 100644 --- a/docs/vars.md +++ b/docs/vars.md @@ -10,7 +10,7 @@ aliases: - /operator/vars/index.html --- - updated at Mon Jul 22 07:22:30 UTC 2024 + updated at Thu Aug 8 04:43:43 UTC 2024 | variable name | variable default value | variable required | variable description | @@ -20,7 +20,7 @@ aliases: | VM_CUSTOMCONFIGRELOADERIMAGE | victoriametrics/operator:config-reloader-v0.43.0 | false | - | | VM_PSPAUTOCREATEENABLED | false | false | - | | VM_VMALERTDEFAULT_IMAGE | victoriametrics/vmalert | false | - | -| VM_VMALERTDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMALERTDEFAULT_VERSION | v1.102.0 | false | - | | VM_VMALERTDEFAULT_PORT | 8080 | false | - | | VM_VMALERTDEFAULT_USEDEFAULTRESOURCES | true | false | - | | VM_VMALERTDEFAULT_RESOURCE_LIMIT_MEM | 500Mi | false | - | @@ -31,7 +31,7 @@ aliases: | VM_VMALERTDEFAULT_CONFIGRELOADERMEMORY | 25Mi | false | - | | VM_VMALERTDEFAULT_CONFIGRELOADIMAGE | jimmidyson/configmap-reload:v0.3.0 | false | - | | VM_VMAGENTDEFAULT_IMAGE | victoriametrics/vmagent | false | - | -| VM_VMAGENTDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMAGENTDEFAULT_VERSION | v1.102.0 | false | - | | VM_VMAGENTDEFAULT_CONFIGRELOADIMAGE | quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0 | false | - | | VM_VMAGENTDEFAULT_PORT | 8429 | false | - | | VM_VMAGENTDEFAULT_USEDEFAULTRESOURCES | true | false | - | @@ -42,7 +42,7 @@ aliases: | VM_VMAGENTDEFAULT_CONFIGRELOADERCPU | 100m | false | - | | VM_VMAGENTDEFAULT_CONFIGRELOADERMEMORY | 25Mi | false | - | | VM_VMSINGLEDEFAULT_IMAGE | victoriametrics/victoria-metrics | false | - | -| VM_VMSINGLEDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMSINGLEDEFAULT_VERSION | v1.102.0 | false | - | | 
VM_VMSINGLEDEFAULT_PORT | 8429 | false | - | | VM_VMSINGLEDEFAULT_USEDEFAULTRESOURCES | true | false | - | | VM_VMSINGLEDEFAULT_RESOURCE_LIMIT_MEM | 1500Mi | false | - | @@ -53,14 +53,14 @@ aliases: | VM_VMSINGLEDEFAULT_CONFIGRELOADERMEMORY | 25Mi | false | - | | VM_VMCLUSTERDEFAULT_USEDEFAULTRESOURCES | true | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_IMAGE | victoriametrics/vmselect | false | - | -| VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_VERSION | v1.101.0-cluster | false | - | +| VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_VERSION | v1.102.0-cluster | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_PORT | 8481 | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_LIMIT_MEM | 1000Mi | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_LIMIT_CPU | 500m | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_REQUEST_MEM | 500Mi | false | - | | VM_VMCLUSTERDEFAULT_VMSELECTDEFAULT_RESOURCE_REQUEST_CPU | 100m | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_IMAGE | victoriametrics/vmstorage | false | - | -| VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VERSION | v1.101.0-cluster | false | - | +| VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VERSION | v1.102.0-cluster | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VMINSERTPORT | 8400 | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_VMSELECTPORT | 8401 | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_PORT | 8482 | false | - | @@ -69,7 +69,7 @@ aliases: | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_RESOURCE_REQUEST_MEM | 500Mi | false | - | | VM_VMCLUSTERDEFAULT_VMSTORAGEDEFAULT_RESOURCE_REQUEST_CPU | 250m | false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_IMAGE | victoriametrics/vminsert | false | - | -| VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_VERSION | v1.101.0-cluster | false | - | +| VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_VERSION | v1.102.0-cluster | false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_PORT | 8480 | false | - | | VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_RESOURCE_LIMIT_MEM | 500Mi | false | - | | 
VM_VMCLUSTERDEFAULT_VMINSERTDEFAULT_RESOURCE_LIMIT_CPU | 500m | false | - | @@ -88,7 +88,7 @@ aliases: | VM_VMALERTMANAGER_RESOURCE_REQUEST_CPU | 30m | false | - | | VM_DISABLESELFSERVICESCRAPECREATION | false | false | - | | VM_VMBACKUP_IMAGE | victoriametrics/vmbackupmanager | false | - | -| VM_VMBACKUP_VERSION | v1.101.0-enterprise | false | - | +| VM_VMBACKUP_VERSION | v1.102.0-enterprise | false | - | | VM_VMBACKUP_PORT | 8300 | false | - | | VM_VMBACKUP_USEDEFAULTRESOURCES | true | false | - | | VM_VMBACKUP_RESOURCE_LIMIT_MEM | 500Mi | false | - | @@ -97,7 +97,7 @@ aliases: | VM_VMBACKUP_RESOURCE_REQUEST_CPU | 150m | false | - | | VM_VMBACKUP_LOGLEVEL | INFO | false | - | | VM_VMAUTHDEFAULT_IMAGE | victoriametrics/vmauth | false | - | -| VM_VMAUTHDEFAULT_VERSION | v1.101.0 | false | - | +| VM_VMAUTHDEFAULT_VERSION | v1.102.0 | false | - | | VM_VMAUTHDEFAULT_CONFIGRELOADIMAGE | quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0 | false | - | | VM_VMAUTHDEFAULT_PORT | 8427 | false | - | | VM_VMAUTHDEFAULT_USEDEFAULTRESOURCES | true | false | - | @@ -129,4 +129,4 @@ aliases: | VM_PODWAITREADYINITDELAY | 10s | false | - | | VM_FORCERESYNCINTERVAL | 60s | false | configures force resync interval for VMAgent, VMAlert, VMAlertmanager and VMAuth. | | VM_ENABLESTRICTSECURITY | false | false | EnableStrictSecurity will add default `securityContext` to pods and containers created by operatorDefault PodSecurityContext include:1. RunAsNonRoot: true2. RunAsUser/RunAsGroup/FSGroup: 65534'65534' refers to 'nobody' in all the used default images like alpine, busybox.If you're using customize image, please make sure '65534' is a valid uid in there or specify SecurityContext.3. FSGroupChangePolicy: &onRootMismatchIf KubeVersion>=1.20, use `FSGroupChangePolicy="onRootMismatch"` to skip the recursive permission changewhen the root of the volume already has the correct permissions4. 
SeccompProfile:type: RuntimeDefaultUse `RuntimeDefault` seccomp profile by default, which is defined by the container runtime,instead of using the Unconfined (seccomp disabled) mode.Default container SecurityContext include:1. AllowPrivilegeEscalation: false2. ReadOnlyRootFilesystem: true3. Capabilities:drop:- allturn off `EnableStrictSecurity` by default, see https://github.com/VictoriaMetrics/operator/issues/749 for details | -[envconfig-sum]: 678e5fdca7d401d391647c5d98827bf9 +[envconfig-sum]: 7eaa83b5c8a827dc4943a2f284423b4c \ No newline at end of file diff --git a/internal/config/config.go b/internal/config/config.go index c61a3e06..e0eb9b3a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -55,7 +55,7 @@ type BaseOperatorConf struct { PSPAutoCreateEnabled bool `default:"false"` VMAlertDefault struct { Image string `default:"victoriametrics/vmalert"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` Port string `default:"8080"` UseDefaultResources bool `default:"true"` Resource struct { @@ -74,7 +74,7 @@ type BaseOperatorConf struct { } VMAgentDefault struct { Image string `default:"victoriametrics/vmagent"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` ConfigReloadImage string `default:"quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0"` Port string `default:"8429"` UseDefaultResources bool `default:"true"` @@ -94,7 +94,7 @@ type BaseOperatorConf struct { VMSingleDefault struct { Image string `default:"victoriametrics/victoria-metrics"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` Port string `default:"8429"` UseDefaultResources bool `default:"true"` Resource struct { @@ -115,7 +115,7 @@ type BaseOperatorConf struct { UseDefaultResources bool `default:"true"` VMSelectDefault struct { Image string `default:"victoriametrics/vmselect"` - Version string `default:"v1.101.0-cluster"` + Version string `default:"v1.102.0-cluster"` Port string 
`default:"8481"` Resource struct { Limit struct { @@ -130,7 +130,7 @@ type BaseOperatorConf struct { } VMStorageDefault struct { Image string `default:"victoriametrics/vmstorage"` - Version string `default:"v1.101.0-cluster"` + Version string `default:"v1.102.0-cluster"` VMInsertPort string `default:"8400"` VMSelectPort string `default:"8401"` Port string `default:"8482"` @@ -147,7 +147,7 @@ type BaseOperatorConf struct { } VMInsertDefault struct { Image string `default:"victoriametrics/vminsert"` - Version string `default:"v1.101.0-cluster"` + Version string `default:"v1.102.0-cluster"` Port string `default:"8480"` Resource struct { Limit struct { @@ -185,7 +185,7 @@ type BaseOperatorConf struct { DisableSelfServiceScrapeCreation bool `default:"false"` VMBackup struct { Image string `default:"victoriametrics/vmbackupmanager"` - Version string `default:"v1.101.0-enterprise"` + Version string `default:"v1.102.0-enterprise"` Port string `default:"8300"` UseDefaultResources bool `default:"true"` Resource struct { @@ -203,7 +203,7 @@ type BaseOperatorConf struct { } VMAuthDefault struct { Image string `default:"victoriametrics/vmauth"` - Version string `default:"v1.101.0"` + Version string `default:"v1.102.0"` ConfigReloadImage string `default:"quay.io/prometheus-operator/prometheus-config-reloader:v0.68.0"` Port string `default:"8427"` UseDefaultResources bool `default:"true"` diff --git a/internal/controller/operator/factory/k8stools/client_utils.go b/internal/controller/operator/factory/k8stools/client_utils.go index 429890d0..cdcedc39 100644 --- a/internal/controller/operator/factory/k8stools/client_utils.go +++ b/internal/controller/operator/factory/k8stools/client_utils.go @@ -255,3 +255,11 @@ func (ow *ObjectWatcherForNamespaces) Stop() { } ow.wg.Wait() } + +// FetchConfigMapContentByKey returns configmap content by key +func FetchConfigMapContentByKey(ctx context.Context, rclient client.Client, cm *corev1.ConfigMap, key string) (string, error) { + if err := 
rclient.Get(ctx, types.NamespacedName{Namespace: cm.Namespace, Name: cm.Name}, cm); err != nil { + return "", err + } + return cm.Data[key], nil +} diff --git a/internal/controller/operator/factory/vmagent/vmagent.go b/internal/controller/operator/factory/vmagent/vmagent.go index 10d35e33..4431c3c6 100644 --- a/internal/controller/operator/factory/vmagent/vmagent.go +++ b/internal/controller/operator/factory/vmagent/vmagent.go @@ -38,10 +38,12 @@ const ( vmAgentPersistentQueueMountName = "persistent-queue-data" globalRelabelingName = "global_relabeling.yaml" urlRelabelingName = "url_relabeling-%d.yaml" - shardNumPlaceholder = "%SHARD_NUM%" - tlsAssetsDir = "/etc/vmagent-tls/certs" - vmagentGzippedFilename = "vmagent.yaml.gz" - configEnvsubstFilename = "vmagent.env.yaml" + globalAggregationConfigName = "global_aggregation.yaml" + + shardNumPlaceholder = "%SHARD_NUM%" + tlsAssetsDir = "/etc/vmagent-tls/certs" + vmagentGzippedFilename = "vmagent.yaml.gz" + configEnvsubstFilename = "vmagent.env.yaml" ) // To save compatibility in the single-shard version still need to fill in %SHARD_NUM% placeholder @@ -426,7 +428,7 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac MountPath: vmAgentConfDir, }) } - if cr.HasAnyStreamAggrConfigs() { + if cr.HasAnyStreamAggrRule() { volumes = append(volumes, corev1.Volume{ Name: "stream-aggr-conf", VolumeSource: corev1.VolumeSource{ @@ -510,6 +512,27 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac args = append(args, "-remoteWrite.relabelConfig="+path.Join(vmv1beta1.RelabelingConfigDir, globalRelabelingName)) } + if cr.Spec.StreamAggrConfig != nil { + if cr.Spec.StreamAggrConfig.HasAnyRule() { + args = append(args, "-streamAggr.config="+path.Join(vmv1beta1.StreamAggrConfigDir, globalAggregationConfigName)) + } + if cr.Spec.StreamAggrConfig.KeepInput { + args = append(args, "-streamAggr.keepInput=true") + } + if cr.Spec.StreamAggrConfig.DropInput { + args = append(args, 
"-streamAggr.dropInput=true") + } + if cr.Spec.StreamAggrConfig.DedupInterval != "" { + args = append(args, fmt.Sprintf("-streamAggr.dedupInterval=%s", cr.Spec.StreamAggrConfig.DedupInterval)) + } + if len(cr.Spec.StreamAggrConfig.DropInputLabels) > 0 { + args = append(args, fmt.Sprintf("-streamAggr.dropInputLabels=%s", strings.Join(cr.Spec.StreamAggrConfig.DropInputLabels, ","))) + } + if cr.Spec.StreamAggrConfig.IgnoreOldSamples { + args = append(args, "-streamAggr.ignoreOldSamples=true") + } + } + args = build.AppendArgsForInsertPorts(args, cr.Spec.InsertPorts) specRes := build.Resources(cr.Spec.Resources, config.Resource(c.VMAgentDefault.Resource), c.VMAgentDefault.UseDefaultResources) @@ -533,7 +556,7 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac var operatorContainers []corev1.Container var ic []corev1.Container // conditional add config reloader container - if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.HasAnyStreamAggrConfigs() { + if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.HasAnyStreamAggrRule() { configReloader := buildConfigReloaderContainer(cr, c) operatorContainers = append(operatorContainers, configReloader) if !cr.Spec.IngestOnlyMode { @@ -654,7 +677,7 @@ func buildVMAgentRelabelingsAssets(ctx context.Context, cr *vmv1beta1.VMAgent, r } if cr.Spec.RelabelConfig != nil { // need to fetch content from - data, err := fetchConfigMapContentByKey(ctx, rclient, + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cr.Spec.RelabelConfig.Name, Namespace: cr.Namespace}}, cr.Spec.RelabelConfig.Key) if err != nil { @@ -678,7 +701,7 @@ func buildVMAgentRelabelingsAssets(ctx context.Context, cr *vmv1beta1.VMAgent, r } } if rw.UrlRelabelConfig != nil { - data, err := fetchConfigMapContentByKey(ctx, rclient, + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, &corev1.ConfigMap{ObjectMeta: 
metav1.ObjectMeta{Name: rw.UrlRelabelConfig.Name, Namespace: cr.Namespace}}, rw.UrlRelabelConfig.Key) if err != nil { @@ -713,7 +736,7 @@ func createOrUpdateRelabelConfigsAssets(ctx context.Context, cr *vmv1beta1.VMAge } // buildVMAgentStreamAggrConfig combines all possible stream aggregation configs and adding it to the configmap. -func buildVMAgentStreamAggrConfig(cr *vmv1beta1.VMAgent) (*corev1.ConfigMap, error) { +func buildVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) (*corev1.ConfigMap, error) { cfgCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: cr.Namespace, @@ -724,17 +747,56 @@ func buildVMAgentStreamAggrConfig(cr *vmv1beta1.VMAgent) (*corev1.ConfigMap, err }, Data: make(map[string]string), } - for i, rw := range cr.Spec.RemoteWrite { - if !rw.HasStreamAggr() { - continue + // global section + if cr.Spec.StreamAggrConfig != nil { + if len(cr.Spec.StreamAggrConfig.Rules) > 0 { + data, err := yaml.Marshal(cr.Spec.StreamAggrConfig.Rules) + if err != nil { + return nil, fmt.Errorf("cannot serialize relabelConfig as yaml: %w", err) + } + if len(data) > 0 { + cfgCM.Data[globalAggregationConfigName] = string(data) + } } - data, err := yaml.Marshal(rw.StreamAggrConfig.Rules) - if err != nil { - return nil, fmt.Errorf("cannot serialize StreamAggrConfig rules as yaml for remoteWrite with url %s: %w", rw.URL, err) + if cr.Spec.StreamAggrConfig.RuleConfigMap != nil { + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cr.Spec.StreamAggrConfig.RuleConfigMap.Name, Namespace: cr.Namespace}}, + cr.Spec.StreamAggrConfig.RuleConfigMap.Key) + if err != nil { + return nil, fmt.Errorf("cannot fetch configmap: %s, err: %w", cr.Spec.StreamAggrConfig.RuleConfigMap.Name, err) + } + if len(data) > 0 { + cfgCM.Data[globalAggregationConfigName] += data + } } - if len(data) > 0 { - cfgCM.Data[rw.AsConfigMapKey(i, "stream-aggr-conf")] = string(data) + } + 
+ for i := range cr.Spec.RemoteWrite { + rw := cr.Spec.RemoteWrite[i] + if rw.StreamAggrConfig != nil { + if len(rw.StreamAggrConfig.Rules) > 0 { + data, err := yaml.Marshal(rw.StreamAggrConfig.Rules) + if err != nil { + return nil, fmt.Errorf("cannot serialize relabelConfig as yaml: %w", err) + } + if len(data) > 0 { + cfgCM.Data[rw.AsConfigMapKey(i, "stream-aggr-conf")] = string(data) + } + } + if rw.StreamAggrConfig.RuleConfigMap != nil { + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: rw.StreamAggrConfig.RuleConfigMap.Name, Namespace: cr.Namespace}}, + rw.StreamAggrConfig.RuleConfigMap.Key) + if err != nil { + return nil, fmt.Errorf("cannot fetch configmap: %s, err: %w", rw.StreamAggrConfig.RuleConfigMap.Name, err) + } + if len(data) > 0 { + cfgCM.Data[rw.AsConfigMapKey(i, "stream-aggr-conf")] += data + } + } + } + } return cfgCM, nil } @@ -742,10 +804,10 @@ func buildVMAgentStreamAggrConfig(cr *vmv1beta1.VMAgent) (*corev1.ConfigMap, err // CreateOrUpdateVMAgentStreamAggrConfig builds stream aggregation configs for vmagent at separate configmap, serialized as yaml func CreateOrUpdateVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) error { // fast path - if !cr.HasAnyStreamAggrConfigs() { + if !cr.HasAnyStreamAggrRule() { return nil } - streamAggrCM, err := buildVMAgentStreamAggrConfig(cr) + streamAggrCM, err := buildVMAgentStreamAggrConfig(ctx, cr, rclient) if err != nil { return err } @@ -760,13 +822,6 @@ func CreateOrUpdateVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VM return rclient.Update(ctx, streamAggrCM) } -func fetchConfigMapContentByKey(ctx context.Context, rclient client.Client, cm *corev1.ConfigMap, key string) (string, error) { - if err := rclient.Get(ctx, types.NamespacedName{Namespace: cm.Namespace, Name: cm.Name}, cm); err != nil { - return "", err - } - return cm.Data[key], nil -} - func createOrUpdateTLSAssets(ctx 
context.Context, cr *vmv1beta1.VMAgent, rclient client.Client, assets map[string]string) error { tlsAssetsSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ @@ -1129,6 +1184,9 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st streamAggrKeepInput := remoteFlag{flagSetting: "-remoteWrite.streamAggr.keepInput="} streamAggrDropInput := remoteFlag{flagSetting: "-remoteWrite.streamAggr.dropInput="} streamAggrDedupInterval := remoteFlag{flagSetting: "-remoteWrite.streamAggr.dedupInterval="} + streamAggrDropInputLabels := remoteFlag{flagSetting: "-remoteWrite.streamAggr.dropInputLabels="} + streamAggrIgnoreFirstIntervals := remoteFlag{flagSetting: "-remoteWrite.streamAggr.ignoreFirstIntervals="} + streamAggrIgnoreOldSamples := remoteFlag{flagSetting: "-remoteWrite.streamAggr.ignoreOldSamples="} pathPrefix := path.Join(tlsAssetsDir, cr.Namespace) @@ -1269,10 +1327,13 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st oauth2Scopes.flagSetting += fmt.Sprintf("%s,", oascopes) var dedupIntVal, streamConfVal string - var keepInputVal, dropInputVal bool - if rws.HasStreamAggr() { - streamAggrConfig.isNotNull = true - streamConfVal = path.Join(vmv1beta1.StreamAggrConfigDir, rws.AsConfigMapKey(i, "stream-aggr-conf")) + var keepInputVal, dropInputVal, ignoreOldSamples bool + var ignoreFirstIntervalsVal int + if rws.StreamAggrConfig != nil { + if rws.StreamAggrConfig.HasAnyRule() { + streamAggrConfig.isNotNull = true + streamConfVal = path.Join(vmv1beta1.StreamAggrConfigDir, rws.AsConfigMapKey(i, "stream-aggr-conf")) + } dedupIntVal = rws.StreamAggrConfig.DedupInterval if dedupIntVal != "" { @@ -1287,17 +1348,31 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st if dropInputVal { streamAggrDropInput.isNotNull = true } + if len(rws.StreamAggrConfig.DropInputLabels) > 0 { + streamAggrDropInputLabels.isNotNull = true + streamAggrDropInputLabels.flagSetting += fmt.Sprintf("%s,", 
strings.Join(rws.StreamAggrConfig.DropInputLabels, ",")) + } + ignoreFirstIntervalsVal = rws.StreamAggrConfig.IgnoreFirstIntervals + if ignoreFirstIntervalsVal > 0 { + streamAggrIgnoreFirstIntervals.isNotNull = true + } + ignoreOldSamples = rws.StreamAggrConfig.IgnoreOldSamples + if ignoreOldSamples { + streamAggrIgnoreOldSamples.isNotNull = true + } } streamAggrConfig.flagSetting += fmt.Sprintf("%s,", streamConfVal) streamAggrKeepInput.flagSetting += fmt.Sprintf("%v,", keepInputVal) streamAggrDropInput.flagSetting += fmt.Sprintf("%v,", dropInputVal) streamAggrDedupInterval.flagSetting += fmt.Sprintf("%s,", dedupIntVal) + streamAggrIgnoreFirstIntervals.flagSetting += fmt.Sprintf("%d,", ignoreFirstIntervalsVal) + streamAggrIgnoreOldSamples.flagSetting += fmt.Sprintf("%v,", ignoreOldSamples) } remoteArgs = append(remoteArgs, url, authUser, bearerTokenFile, urlRelabelConfig, tlsInsecure, sendTimeout) remoteArgs = append(remoteArgs, tlsServerName, tlsKeys, tlsCerts, tlsCAs) remoteArgs = append(remoteArgs, oauth2ClientID, oauth2ClientSecretFile, oauth2Scopes, oauth2TokenURL) remoteArgs = append(remoteArgs, headers, authPasswordFile) - remoteArgs = append(remoteArgs, streamAggrConfig, streamAggrKeepInput, streamAggrDedupInterval, streamAggrDropInput) + remoteArgs = append(remoteArgs, streamAggrConfig, streamAggrKeepInput, streamAggrDedupInterval, streamAggrDropInput, streamAggrDropInputLabels, streamAggrIgnoreFirstIntervals, streamAggrIgnoreOldSamples) for _, remoteArgType := range remoteArgs { if remoteArgType.isNotNull { @@ -1332,7 +1407,7 @@ func buildConfigReloaderContainer(cr *vmv1beta1.VMAgent, c *config.BaseOperatorC MountPath: vmv1beta1.RelabelingConfigDir, }) } - if cr.HasAnyStreamAggrConfigs() { + if cr.HasAnyStreamAggrRule() { configReloadVolumeMounts = append(configReloadVolumeMounts, corev1.VolumeMount{ Name: "stream-aggr-conf", @@ -1401,7 +1476,7 @@ func buildConfigReloaderArgs(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf) args = append(args, 
fmt.Sprintf("--config-file=%s", path.Join(vmAgentConfDir, vmagentGzippedFilename))) } } - if cr.HasAnyStreamAggrConfigs() { + if cr.HasAnyStreamAggrRule() { args = append(args, fmt.Sprintf("--%s=%s", dirsArg, vmv1beta1.StreamAggrConfigDir)) } if cr.HasAnyRelabellingConfigs() { diff --git a/internal/controller/operator/factory/vmagent/vmagent_test.go b/internal/controller/operator/factory/vmagent/vmagent_test.go index 9ac84270..18d0f57f 100644 --- a/internal/controller/operator/factory/vmagent/vmagent_test.go +++ b/internal/controller/operator/factory/vmagent/vmagent_test.go @@ -1362,11 +1362,20 @@ func TestCreateOrUpdateStreamAggrConfig(t *testing.T) { predefinedObjects: []runtime.Object{}, }, { - name: "simple stream aggr config", + name: "simple global and remoteWrite stream aggr config", args: args{ ctx: context.TODO(), cr: &vmv1beta1.VMAgent{ Spec: vmv1beta1.VMAgentSpec{ + StreamAggrConfig: &vmv1beta1.StreamAggrConfig{ + Rules: []vmv1beta1.StreamAggrRule{{ + Match: []string{`test`}, + Interval: "30s", + Outputs: []string{"total"}, + By: []string{"job", "instance"}, + Without: []string{"pod"}, + }}, + }, RemoteWrite: []vmv1beta1.VMAgentRemoteWriteSpec{ { URL: "localhost:8429", @@ -1391,11 +1400,26 @@ func TestCreateOrUpdateStreamAggrConfig(t *testing.T) { }, }, validate: func(cm *corev1.ConfigMap) error { - data, ok := cm.Data["RWS_0-CM-STREAM-AGGR-CONF"] + globalData, ok := cm.Data["global_aggregation.yaml"] + if !ok { + return fmt.Errorf("key: %s, not exists at map: %v", "global_aggregation.yaml", cm.BinaryData) + } + wantGlobal := `- match: test + interval: 30s + outputs: + - total + by: + - job + - instance + without: + - pod +` + assert.Equal(t, wantGlobal, globalData) + remoteData, ok := cm.Data["RWS_0-CM-STREAM-AGGR-CONF"] if !ok { return fmt.Errorf("key: %s, not exists at map: %v", "RWS_0-CM-STREAM-AGGR-CONFl", cm.BinaryData) } - wantGlobal := `- match: + wantRemote := `- match: - '{__name__="count1"}' - '{__name__="count2"}' interval: 1m @@ -1411,7 
+1435,7 @@ func TestCreateOrUpdateStreamAggrConfig(t *testing.T) { output_relabel_configs: - regex: (.+):.+ ` - assert.Equal(t, wantGlobal, data) + assert.Equal(t, wantRemote, remoteData) return nil }, predefinedObjects: []runtime.Object{}, diff --git a/internal/controller/operator/factory/vmsingle/vmsingle.go b/internal/controller/operator/factory/vmsingle/vmsingle.go index 18cfeb8b..3b49d6f6 100644 --- a/internal/controller/operator/factory/vmsingle/vmsingle.go +++ b/internal/controller/operator/factory/vmsingle/vmsingle.go @@ -5,6 +5,7 @@ import ( "fmt" "path" "sort" + "strings" vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" "github.com/VictoriaMetrics/operator/internal/config" @@ -261,7 +262,7 @@ func makeSpecForVMSingle(ctx context.Context, cr *vmv1beta1.VMSingle, c *config. }) } - if cr.HasStreamAggrConfig() { + if cr.HasAnyStreamAggrRule() { volumes = append(volumes, corev1.Volume{ Name: k8stools.SanitizeVolumeName("stream-aggr-conf"), VolumeSource: corev1.VolumeSource{ @@ -282,10 +283,25 @@ func makeSpecForVMSingle(ctx context.Context, cr *vmv1beta1.VMSingle, c *config. 
if cr.Spec.StreamAggrConfig.KeepInput { args = append(args, "--streamAggr.keepInput=true") } - if cr.Spec.StreamAggrConfig.DedupInterval != "" { - args = append(args, fmt.Sprintf("--streamAggr.dedupInterval=%s", cr.Spec.StreamAggrConfig.DedupInterval)) + if cr.Spec.StreamAggrConfig.DropInput { + args = append(args, "--streamAggr.dropInput=true") + } + if len(cr.Spec.StreamAggrConfig.DropInputLabels) > 0 { + args = append(args, fmt.Sprintf("--streamAggr.dropInputLabels=%s", strings.Join(cr.Spec.StreamAggrConfig.DropInputLabels, ","))) + } + if cr.Spec.StreamAggrConfig.IgnoreFirstIntervals > 0 { + args = append(args, fmt.Sprintf("--streamAggr.ignoreFirstIntervals=%d", cr.Spec.StreamAggrConfig.IgnoreFirstIntervals)) + } + if cr.Spec.StreamAggrConfig.IgnoreOldSamples { + args = append(args, "--streamAggr.ignoreOldSamples=true") } } + + // deduplication can work without stream aggregation rules + if cr.Spec.StreamAggrConfig != nil && cr.Spec.StreamAggrConfig.DedupInterval != "" { + args = append(args, fmt.Sprintf("--streamAggr.dedupInterval=%s", cr.Spec.StreamAggrConfig.DedupInterval)) + } + volumes, vmMounts = cr.Spec.License.MaybeAddToVolumes(volumes, vmMounts, vmv1beta1.SecretsDir) args = cr.Spec.License.MaybeAddToArgs(args, vmv1beta1.SecretsDir) @@ -414,7 +430,7 @@ func CreateOrUpdateVMSingleService(ctx context.Context, cr *vmv1beta1.VMSingle, } // buildVMSingleStreamAggrConfig build configmap with stream aggregation config for vmsingle. 
-func buildVMSingleStreamAggrConfig(cr *vmv1beta1.VMSingle) (*corev1.ConfigMap, error) { +func buildVMSingleStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMSingle, rclient client.Client) (*corev1.ConfigMap, error) { cfgCM := &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: cr.Namespace, @@ -425,23 +441,35 @@ func buildVMSingleStreamAggrConfig(cr *vmv1beta1.VMSingle) (*corev1.ConfigMap, e }, Data: make(map[string]string), } - data, err := yaml.Marshal(cr.Spec.StreamAggrConfig.Rules) - if err != nil { - return nil, fmt.Errorf("cannot serialize StreamAggrConfig rules as yaml: %w", err) + if len(cr.Spec.StreamAggrConfig.Rules) > 0 { + data, err := yaml.Marshal(cr.Spec.StreamAggrConfig.Rules) + if err != nil { + return nil, fmt.Errorf("cannot serialize relabelConfig as yaml: %w", err) + } + if len(data) > 0 { + cfgCM.Data["config.yaml"] = string(data) + } } - if len(data) > 0 { - cfgCM.Data["config.yaml"] = string(data) + if cr.Spec.StreamAggrConfig.RuleConfigMap != nil { + data, err := k8stools.FetchConfigMapContentByKey(ctx, rclient, + &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: cr.Spec.StreamAggrConfig.RuleConfigMap.Name, Namespace: cr.Namespace}}, + cr.Spec.StreamAggrConfig.RuleConfigMap.Key) + if err != nil { + return nil, fmt.Errorf("cannot fetch configmap: %s, err: %w", cr.Spec.StreamAggrConfig.RuleConfigMap.Name, err) + } + if len(data) > 0 { + cfgCM.Data["config.yaml"] += data + } } - return cfgCM, nil } // CreateOrUpdateVMSingleStreamAggrConfig builds stream aggregation configs for vmsingle at separate configmap, serialized as yaml func CreateOrUpdateVMSingleStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMSingle, rclient client.Client) error { - if !cr.HasStreamAggrConfig() { + if !cr.HasAnyStreamAggrRule() { return nil } - streamAggrCM, err := buildVMSingleStreamAggrConfig(cr) + streamAggrCM, err := buildVMSingleStreamAggrConfig(ctx, cr, rclient) if err != nil { return err }