Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream aggregation: sync options dropInputLabels, `ignoreFirstInter… #1054

Merged
merged 2 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions api/operator/v1beta1/vmagent_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ type VMAgentSpec struct {
// InlineRelabelConfig - defines GlobalRelabelConfig for vmagent, can be defined directly at CRD.
// +optional
InlineRelabelConfig []RelabelConfig `json:"inlineRelabelConfig,omitempty"`
// StreamAggrConfig defines global stream aggregation configuration for VMAgent
// +optional
StreamAggrConfig *StreamAggrConfig `json:"streamAggrConfig,omitempty"`
// SelectAllByDefault changes default behavior for empty CRD selectors, such ServiceScrapeSelector.
// with selectAllByDefault: true and empty serviceScrapeSelector and ServiceScrapeNamespaceSelector
// Operator selects all exist serviceScrapes
Expand Down Expand Up @@ -539,11 +542,6 @@ func (rw *VMAgentRemoteWriteSpec) AsConfigMapKey(idx int, suffix string) string
return fmt.Sprintf("RWS_%d-CM-%s", idx, strings.ToUpper(suffix))
}

// HasStreamAggr returns true if stream aggregation is enabled for this remoteWrite
func (rw *VMAgentRemoteWriteSpec) HasStreamAggr() bool {
return rw.StreamAggrConfig != nil && len(rw.StreamAggrConfig.Rules) > 0
}

// VMAgentStatus defines the observed state of VMAgent
// +k8s:openapi-gen=true
type VMAgentStatus struct {
Expand Down Expand Up @@ -805,13 +803,17 @@ func (cr *VMAgent) HasAnyRelabellingConfigs() bool {
return false
}

// HasAnyStreamAggrConfigs checks if agent has any streaming aggregation config defined
func (cr *VMAgent) HasAnyStreamAggrConfigs() bool {
// HasAnyStreamAggrRule checks if vmagent has any defined aggregation rules
func (cr *VMAgent) HasAnyStreamAggrRule() bool {
if cr.Spec.StreamAggrConfig.HasAnyRule() {
return true
}
for _, rw := range cr.Spec.RemoteWrite {
if rw.HasStreamAggr() {
if rw.StreamAggrConfig.HasAnyRule() {
return true
}
}

return false
}

Expand Down
21 changes: 21 additions & 0 deletions api/operator/v1beta1/vmextra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,11 @@ type ConfigMapKeyReference struct {
// +k8s:openapi-gen=true
type StreamAggrConfig struct {
// Stream aggregation rules
// +optional
Rules []StreamAggrRule `json:"rules"`
// ConfigMap with stream aggregation rules
// +optional
RuleConfigMap *v1.ConfigMapKeySelector `json:"configmap,omitempty"`
// Allows writing both raw and aggregate data
// +optional
KeepInput bool `json:"keepInput,omitempty"`
Expand All @@ -484,6 +488,13 @@ type StreamAggrConfig struct {
// Allows setting different de-duplication intervals per each configured remote storage
// +optional
DedupInterval string `json:"dedupInterval,omitempty"`
// labels to drop from samples for aggregator before stream de-duplication and aggregation
// +optional
DropInputLabels []string `json:"dropInputLabels,omitempty"`
IgnoreFirstIntervals int `json:"ignoreFirstIntervals,omitempty"`
// IgnoreOldSamples instructs to ignore samples with old timestamps outside the current aggregation interval.
// +optional
IgnoreOldSamples bool `json:"ignoreOldSamples,omitempty"`
}

// StreamAggrRule defines the rule in stream aggregation config
Expand Down Expand Up @@ -570,6 +581,8 @@ type StreamAggrRule struct {
// +optional
Without []string `json:"without,omitempty" yaml:"without,omitempty"`

IgnoreFirstIntervals *int `json:"ignore_first_intervals,omitempty" yaml:"ignore_first_intervals,omitempty"`

// DropInputLabels is an optional list with labels, which must be dropped before further processing of input samples.
//
// Labels are dropped before de-duplication and aggregation.
Expand All @@ -587,6 +600,14 @@ type StreamAggrRule struct {
OutputRelabelConfigs []RelabelConfig `json:"output_relabel_configs,omitempty" yaml:"output_relabel_configs,omitempty"`
}

// HasAnyRule returns true if there is at least one aggregation rule
func (config *StreamAggrConfig) HasAnyRule() bool {
if config != nil && (len(config.Rules) > 0 || config.RuleConfigMap != nil) {
return true
}
return false
}

// KeyValue defines a (key, value) tuple.
// +kubebuilder:object:generate=false
// +k8s:openapi-gen=false
Expand Down
6 changes: 3 additions & 3 deletions api/operator/v1beta1/vmsingle_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ type VMSingleSpec struct {
Paused bool `json:"paused,omitempty"`
}

// HasStreamAggrConfig checks if streamAggrConfig present
func (cr *VMSingle) HasStreamAggrConfig() bool {
return cr.Spec.StreamAggrConfig != nil && len(cr.Spec.StreamAggrConfig.Rules) > 0
// HasAnyStreamAggrRule checks if vmsingle has any defined aggregation rules
func (cr *VMSingle) HasAnyStreamAggrRule() bool {
return cr.Spec.StreamAggrConfig.HasAnyRule()
}

// UnmarshalJSON implements json.Unmarshaler interface
Expand Down
20 changes: 20 additions & 0 deletions api/operator/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading