diff --git a/api/operator/v1beta1/vmagent_types.go b/api/operator/v1beta1/vmagent_types.go index 88af173a4..e07bdb7b8 100644 --- a/api/operator/v1beta1/vmagent_types.go +++ b/api/operator/v1beta1/vmagent_types.go @@ -803,13 +803,13 @@ func (cr *VMAgent) HasAnyRelabellingConfigs() bool { return false } -// HasAnyStreamAggrConfigs checks if vmagent has any defined aggregation rules -func (cr *VMAgent) HasAnyStreamAggrConfigs() bool { - if cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { +// HasAnyStreamAggrRule checks if vmagent has any defined aggregation rules +func (cr *VMAgent) HasAnyStreamAggrRule() bool { + if cr.Spec.StreamAggrConfig.HasAnyRule() { return true } for _, rw := range cr.Spec.RemoteWrite { - if rw.StreamAggrConfig.HasStreamAggrConfig() { + if rw.StreamAggrConfig.HasAnyRule() { return true } } diff --git a/api/operator/v1beta1/vmextra_types.go b/api/operator/v1beta1/vmextra_types.go index 6d631fcf6..25e493e01 100644 --- a/api/operator/v1beta1/vmextra_types.go +++ b/api/operator/v1beta1/vmextra_types.go @@ -600,12 +600,8 @@ type StreamAggrRule struct { OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"` } -// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite -// func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool { -// return rw.StreamAggrConfig != nil -// } - -func (config *StreamAggrConfig) HasStreamAggrConfig() bool { +// HasAnyRule returns true if there is at least one aggregation rule +func (config *StreamAggrConfig) HasAnyRule() bool { if config != nil && (len(config.Rules) > 0 || config.RuleConfigMap != nil) { return true } diff --git a/api/operator/v1beta1/vmsingle_types.go b/api/operator/v1beta1/vmsingle_types.go index 7adb56f02..a1071cffe 100644 --- a/api/operator/v1beta1/vmsingle_types.go +++ b/api/operator/v1beta1/vmsingle_types.go @@ -212,9 +212,9 @@ type VMSingleSpec struct { Paused bool `json:"paused,omitempty"` } -// HasStreamAggrConfig checks if streamAggrConfig present -func (cr *VMSingle) HasStreamAggrConfig() bool { - return cr.Spec.StreamAggrConfig.HasStreamAggrConfig() +// HasAnyStreamAggrRule checks if vmsingle has any defined aggregation rules +func (cr *VMSingle) HasAnyStreamAggrRule() bool { + return cr.Spec.StreamAggrConfig.HasAnyRule() } // UnmarshalJSON implements json.Unmarshaler interface diff --git a/internal/controller/operator/factory/vmagent/vmagent.go b/internal/controller/operator/factory/vmagent/vmagent.go index bd53d8531..4431c3c63 100644 --- a/internal/controller/operator/factory/vmagent/vmagent.go +++ b/internal/controller/operator/factory/vmagent/vmagent.go @@ -428,7 +428,7 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac MountPath: vmAgentConfDir, }) } - if cr.HasAnyStreamAggrConfigs() { + if cr.HasAnyStreamAggrRule() { volumes = append(volumes, corev1.Volume{ Name: "stream-aggr-conf", VolumeSource: corev1.VolumeSource{ @@ -513,7 +513,7 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac } if cr.Spec.StreamAggrConfig != nil { - if cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { + if cr.Spec.StreamAggrConfig.HasAnyRule() { args = append(args, "-streamAggr.config="+path.Join(vmv1beta1.StreamAggrConfigDir, globalAggregationConfigName)) } if cr.Spec.StreamAggrConfig.KeepInput { @@ -556,7 +556,7 @@ func makeSpecForVMAgent(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf, ssCac var operatorContainers []corev1.Container var ic []corev1.Container // conditional add config reloader container - if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { + if !cr.Spec.IngestOnlyMode || cr.HasAnyRelabellingConfigs() || cr.HasAnyStreamAggrRule() { configReloader := buildConfigReloaderContainer(cr, c) operatorContainers = append(operatorContainers, configReloader) if !cr.Spec.IngestOnlyMode { @@ -804,7 +804,7 @@ func buildVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMAgent, rc // CreateOrUpdateVMAgentStreamAggrConfig builds stream aggregation configs for vmagent at separate configmap, serialized as yaml func CreateOrUpdateVMAgentStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMAgent, rclient client.Client) error { // fast path - if !cr.HasAnyStreamAggrConfigs() { + if !cr.HasAnyStreamAggrRule() { return nil } streamAggrCM, err := buildVMAgentStreamAggrConfig(ctx, cr, rclient) @@ -1330,7 +1330,7 @@ func buildRemoteWrites(cr *vmv1beta1.VMAgent, ssCache *scrapesSecretsCache) []st var keepInputVal, dropInputVal, ignoreOldSamples bool var ignoreFirstIntervalsVal int if rws.StreamAggrConfig != nil { - if rws.StreamAggrConfig.HasStreamAggrConfig() { + if rws.StreamAggrConfig.HasAnyRule() { streamAggrConfig.isNotNull = true streamConfVal = path.Join(vmv1beta1.StreamAggrConfigDir, rws.AsConfigMapKey(i, "stream-aggr-conf")) } @@ -1407,7 +1407,7 @@ func buildConfigReloaderContainer(cr *vmv1beta1.VMAgent, c *config.BaseOperatorC MountPath: vmv1beta1.RelabelingConfigDir, }) } - if cr.Spec.StreamAggrConfig.HasStreamAggrConfig() { + if cr.HasAnyStreamAggrRule() { configReloadVolumeMounts = append(configReloadVolumeMounts, corev1.VolumeMount{ Name: "stream-aggr-conf", @@ -1476,7 +1476,7 @@ func buildConfigReloaderArgs(cr *vmv1beta1.VMAgent, c *config.BaseOperatorConf) args = append(args, fmt.Sprintf("--config-file=%s", path.Join(vmAgentConfDir, vmagentGzippedFilename))) } } - if cr.HasAnyStreamAggrConfigs() { + if cr.HasAnyStreamAggrRule() { args = append(args, fmt.Sprintf("--%s=%s", dirsArg, vmv1beta1.StreamAggrConfigDir)) } if cr.HasAnyRelabellingConfigs() { diff --git a/internal/controller/operator/factory/vmsingle/vmsingle.go b/internal/controller/operator/factory/vmsingle/vmsingle.go index 037362bb1..3b49d6f64 100644 --- a/internal/controller/operator/factory/vmsingle/vmsingle.go +++ b/internal/controller/operator/factory/vmsingle/vmsingle.go @@ -262,7 +262,7 @@ func makeSpecForVMSingle(ctx context.Context, cr *vmv1beta1.VMSingle, c *config. }) } - if cr.HasStreamAggrConfig() { + if cr.HasAnyStreamAggrRule() { volumes = append(volumes, corev1.Volume{ Name: k8stools.SanitizeVolumeName("stream-aggr-conf"), VolumeSource: corev1.VolumeSource{ @@ -466,7 +466,7 @@ func buildVMSingleStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMSingle, // CreateOrUpdateVMSingleStreamAggrConfig builds stream aggregation configs for vmsingle at separate configmap, serialized as yaml func CreateOrUpdateVMSingleStreamAggrConfig(ctx context.Context, cr *vmv1beta1.VMSingle, rclient client.Client) error { - if !cr.HasStreamAggrConfig() { + if !cr.HasAnyStreamAggrRule() { return nil } streamAggrCM, err := buildVMSingleStreamAggrConfig(ctx, cr, rclient)