Skip to content

Commit

Permalink
stream aggregation: sync options dropInputLabels, `ignoreFirstInter… (
Browse files Browse the repository at this point in the history
#1054)

* stream aggregation: sync options `dropInputLabels`, `ignoreFirstIntervals`, `ignoreOldSamples` from upstream, and support using configmap as the source of aggregation rules

* rename functions
  • Loading branch information
Haleygo authored Aug 9, 2024
1 parent b00759d commit d21e533
Show file tree
Hide file tree
Showing 13 changed files with 667 additions and 83 deletions.
18 changes: 10 additions & 8 deletions api/operator/v1beta1/vmagent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ type VMAgentSpec struct {
// InlineRelabelConfig - defines GlobalRelabelConfig for vmagent, can be defined directly at CRD.
// +optional
InlineRelabelConfig []RelabelConfig `json:"inlineRelabelConfig,omitempty"`
// StreamAggrConfig defines global stream aggregation configuration for VMAgent
// +optional
StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"`
// SelectAllByDefault changes default behavior for empty CRD selectors, such ServiceScrapeSelector.
// with selectAllByDefault: true and empty serviceScrapeSelector and ServiceScrapeNamespaceSelector
// Operator selects all exist serviceScrapes
Expand Down Expand Up @@ -539,11 +542,6 @@ func (rw *VMAgentRemoteWriteSpec) AsConfigMapKey(idx int, suffix string) string
return fmt.Sprintf("RWS_%d-CM-%s", idx, strings.ToUpper(suffix))
}

// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite
func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool {
return rw.StreamAggrConfig != nil && len(rw.StreamAggrConfig.Rules) > 0
}

// VMAgentStatus defines the observed state of VMAgent
// +k8s:openapi-gen=true
type VMAgentStatus struct {
Expand Down Expand Up @@ -805,13 +803,17 @@ func (cr *VMAgent) HasAnyRelabellingConfigs() bool {
return false
}

// HasAnyStreamAggrConfigs checks if agent has any streaming aggregation config defined
func (cr *VMAgent) HasAnyStreamAggrConfigs() bool {
// HasAnyStreamAggrRule checks if vmagent has any defined aggregation rules
func (cr *VMAgent) HasAnyStreamAggrRule() bool {
if cr.Spec.StreamAggrConfig.HasAnyRule() {
return true
}
for _, rw := range cr.Spec.RemoteWrite {
if rw.HasStreamAggr() {
if rw.StreamAggrConfig.HasAnyRule() {
return true
}
}

return false
}

Expand Down
21 changes: 21 additions & 0 deletions api/operator/v1beta1/vmextra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,11 @@ type ConfigMapKeyReference struct {
// +k8s:openapi-gen=true
type StreamAggrConfig struct {
// Stream aggregation rules
// +optional
Rules []StreamAggrRule `json:"rules"`
// ConfigMap with stream aggregation rules
// +optional
RuleConfigMap *v1.ConfigMapKeySelector `json:"configmap,omitempty"`
// Allows writing both raw and aggregate data
// +optional
KeepInput bool `json:"keepInput,omitempty"`
Expand All @@ -484,6 +488,13 @@ type StreamAggrConfig struct {
// Allows setting different de-duplication intervals per each configured remote storage
// +optional
DedupInterval string `json:"dedupInterval,omitempty"`
// labels to drop from samples for aggregator before stream de-duplication and aggregation
// +optional
DropInputLabels []string `json:"dropInputLabels,omitempty"`
IgnoreFirstIntervals int `json:"ignoreFirstIntervals,omitempty"`
// IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval.
// +optional
IgnoreOldSamples bool `json:"ignoreOldSamples,omitempty"`
}

// StreamAggrRule defines the rule in stream aggregation config
Expand Down Expand Up @@ -570,6 +581,8 @@ type StreamAggrRule struct {
// +optional
Without []string `json:"without,omitempty" yaml:"without,omitempty"`

IgnoreFirstIntervals *int `json:"ignore_first_intervals,omitempty" yaml:"ignore_first_intervals,omitempty"`

// DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples.
//
// Labels are dropped before de-duplication and aggregation.
Expand All @@ -587,6 +600,14 @@ type StreamAggrRule struct {
OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"`
}

// HasAnyRule returns true if there is at least one aggregation rule
func (config *StreamAggrConfig) HasAnyRule() bool {
if config != nil && (len(config.Rules) > 0 || config.RuleConfigMap != nil) {
return true
}
return false
}

// KeyValue defines a (key, value) tuple.
// +kubebuilder:object:generate=false
// +k8s:openapi-gen=false
Expand Down
6 changes: 3 additions & 3 deletions api/operator/v1beta1/vmsingle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ type VMSingleSpec struct {
Paused bool `json:"paused,omitempty"`
}

// HasStreamAggrConfig checks if streamAggrConfig present
func (cr *VMSingle) HasStreamAggrConfig() bool {
return cr.Spec.StreamAggrConfig != nil && len(cr.Spec.StreamAggrConfig.Rules) > 0
// HasAnyStreamAggrRule checks if vmsingle has any defined aggregation rules
func (cr *VMSingle) HasAnyStreamAggrRule() bool {
return cr.Spec.StreamAggrConfig.HasAnyRule()
}

// UnmarshalJSON implements json.Unmarshaler interface
Expand Down
20 changes: 20 additions & 0 deletions api/operator/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d21e533

Please sign in to comment.