Skip to content

Commit

Permalink
improve the stability of autoscaling when HPA is enabled (#450)
Browse files Browse the repository at this point in the history
* Add spec.minReplicas and change spec.replicas to an optional field

Signed-off-by: laminar <[email protected]>

* update crd templates in charts

Signed-off-by: laminar <[email protected]>

* fix

Signed-off-by: laminar <[email protected]>

* adjust the `checkIf<Resource>GenerationsIsIncreased` function

Signed-off-by: laminar <[email protected]>

* update the generation check mechanism

Signed-off-by: laminar <[email protected]>

* Improve the stability of FunctionMesh reconciliation

Signed-off-by: laminar <[email protected]>

Signed-off-by: laminar <[email protected]>
  • Loading branch information
tpiperatgod authored Sep 7, 2022
1 parent 52b260e commit 1a56afb
Show file tree
Hide file tree
Showing 33 changed files with 719 additions and 424 deletions.
13 changes: 7 additions & 6 deletions api/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ type FunctionSpec struct {
Tenant string `json:"tenant,omitempty"`
Namespace string `json:"namespace,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas"`

Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
// For complex HPA strategies, please refer to Pod.HPAutoscaler.
Expand Down Expand Up @@ -94,9 +94,10 @@ type FunctionSpec struct {
type FunctionStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
18 changes: 14 additions & 4 deletions api/v1alpha1/function_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,19 @@ var _ webhook.Defaulter = &Function{}
func (r *Function) Default() {
functionlog.Info("default", "name", r.Name)

if r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) {
if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = *r.Spec.MinReplicas
} else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil {
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = *r.Spec.Replicas
} else {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = 1
}
}

if r.Spec.AutoAck == nil {
Expand Down Expand Up @@ -179,7 +189,7 @@ func (r *Function) ValidateCreate() error {
allErrs = append(allErrs, fieldErrs...)
}

fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas)
fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas)
if len(fieldErrs) > 0 {
allErrs = append(allErrs, fieldErrs...)
}
Expand Down
1 change: 1 addition & 0 deletions api/v1alpha1/functionmesh_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type FunctionMeshStatus struct {
SourceConditions map[string]ResourceCondition `json:"sourceConditions,omitempty"`
SinkConditions map[string]ResourceCondition `json:"sinkConditions,omitempty"`
FunctionConditions map[string]ResourceCondition `json:"functionConditions,omitempty"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
13 changes: 7 additions & 6 deletions api/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type SinkSpec struct {
Tenant string `json:"tenant,omitempty"`
Namespace string `json:"namespace,omitempty"`
SinkType string `json:"sinkType,omitempty"` // refer to `--sink-type` as builtin connector
// +kubebuilder:validation:Required
// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas"`

Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
// For complex HPA strategies, please refer to Pod.HPAutoscaler.
Expand Down Expand Up @@ -85,9 +85,10 @@ type SinkSpec struct {
type SinkStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
18 changes: 14 additions & 4 deletions api/v1alpha1/sink_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,19 @@ var _ webhook.Defaulter = &Sink{}
func (r *Sink) Default() {
sinklog.Info("default", "name", r.Name)

if r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) {
if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = *r.Spec.MinReplicas
} else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil {
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = *r.Spec.Replicas
} else {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = 1
}
}

if r.Spec.AutoAck == nil {
Expand Down Expand Up @@ -129,7 +139,7 @@ func (r *Sink) ValidateCreate() error {
allErrs = append(allErrs, fieldErrs...)
}

fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas)
fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas)
if len(fieldErrs) > 0 {
allErrs = append(allErrs, fieldErrs...)
}
Expand Down
14 changes: 8 additions & 6 deletions api/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ type SourceSpec struct {
Namespace string `json:"namespace,omitempty"`
ClusterName string `json:"clusterName,omitempty"`
SourceType string `json:"sourceType,omitempty"` // refer to `--source-type` as builtin connector
// +kubebuilder:validation:Required
Replicas *int32 `json:"replicas"`

// +kubebuilder:validation:Minimum=1
Replicas *int32 `json:"replicas,omitempty"`
// +kubebuilder:validation:Minimum=1
MinReplicas *int32 `json:"minReplicas,omitempty"`
// MaxReplicas indicates the maximum number of replicas and enables the HorizontalPodAutoscaler
// If provided, a default HPA with CPU at average of 80% will be used.
// For complex HPA strategies, please refer to Pod.HPAutoscaler.
Expand Down Expand Up @@ -74,9 +75,10 @@ type SourceSpec struct {
type SourceStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
Conditions map[Component]ResourceCondition `json:"conditions"`
Replicas int32 `json:"replicas"`
Selector string `json:"selector"`
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +kubebuilder:object:root=true
Expand Down
18 changes: 14 additions & 4 deletions api/v1alpha1/source_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,19 @@ var _ webhook.Defaulter = &Source{}
func (r *Source) Default() {
sourcelog.Info("default", "name", r.Name)

if r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
if !(r.Spec.Replicas != nil && r.Spec.MinReplicas != nil) {
if r.Spec.MinReplicas != nil && r.Spec.Replicas == nil {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = *r.Spec.MinReplicas
} else if r.Spec.MinReplicas == nil && r.Spec.Replicas != nil {
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = *r.Spec.Replicas
} else {
r.Spec.Replicas = new(int32)
*r.Spec.Replicas = 1
r.Spec.MinReplicas = new(int32)
*r.Spec.MinReplicas = 1
}
}

if r.Spec.ProcessingGuarantee == "" {
Expand Down Expand Up @@ -139,7 +149,7 @@ func (r *Source) ValidateCreate() error {
allErrs = append(allErrs, fieldErrs...)
}

fieldErrs = validateReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MaxReplicas)
fieldErrs = validateReplicasAndMinReplicasAndMaxReplicas(r.Spec.Replicas, r.Spec.MinReplicas, r.Spec.MaxReplicas)
if len(fieldErrs) > 0 {
allErrs = append(allErrs, fieldErrs...)
}
Expand Down
27 changes: 22 additions & 5 deletions api/v1alpha1/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ func validateGolangRuntime(golang *GoRuntime) []*field.Error {
return allErrs
}

func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error {
func validateReplicasAndMinReplicasAndMaxReplicas(replicas, minReplicas, maxReplicas *int32) []*field.Error {
var allErrs field.ErrorList
// TODO: allow 0 replicas, currently hpa's min value has to be 1
if replicas == nil {
e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil")
allErrs = append(allErrs, e)
}
//if replicas == nil {
// e := field.Invalid(field.NewPath("spec").Child("replicas"), nil, "replicas cannot be nil")
// allErrs = append(allErrs, e)
//}

if replicas != nil && *replicas <= 0 {
e := field.Invalid(field.NewPath("spec").Child("replicas"), *replicas, "replicas cannot be zero or negative")
Expand All @@ -109,6 +109,23 @@ func validateReplicasAndMaxReplicas(replicas, maxReplicas *int32) []*field.Error
"maxReplicas must be greater than or equal to replicas")
allErrs = append(allErrs, e)
}

if minReplicas != nil && *minReplicas <= 0 {
e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *replicas, "minReplicas cannot be zero or negative")
allErrs = append(allErrs, e)
}

if minReplicas != nil && replicas != nil && *minReplicas > *replicas {
e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *replicas,
"minReplicas must be less than or equal to replicas")
allErrs = append(allErrs, e)
}

if minReplicas != nil && maxReplicas != nil && *minReplicas > *maxReplicas {
e := field.Invalid(field.NewPath("spec").Child("minReplicas"), *maxReplicas,
"minReplicas must be less than or equal to maxReplicas")
allErrs = append(allErrs, e)
}
return allErrs
}

Expand Down
15 changes: 15 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -2981,6 +2985,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -5491,8 +5499,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
type: array
sources:
Expand Down Expand Up @@ -5581,6 +5587,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -8076,6 +8086,7 @@ spec:
type: object
replicas:
format: int32
minimum: 1
type: integer
resources:
properties:
Expand Down Expand Up @@ -8134,8 +8145,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
type: array
type: object
Expand All @@ -8152,6 +8161,9 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
sinkConditions:
additionalProperties:
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -2847,6 +2851,9 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
replicas:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ spec:
maxReplicas:
format: int32
type: integer
minReplicas:
format: int32
minimum: 1
type: integer
name:
type: string
namespace:
Expand Down Expand Up @@ -2720,8 +2724,6 @@ spec:
- name
type: object
type: array
required:
- replicas
type: object
status:
properties:
Expand All @@ -2736,6 +2738,9 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
replicas:
format: int32
type: integer
Expand Down
Loading

0 comments on commit 1a56afb

Please sign in to comment.