Skip to content

Commit

Permalink
Add ProvisioningRequest injector (kubernetes#6529)
Browse files Browse the repository at this point in the history
* Add ProvisioningRequests injector

* Add test case for Accepted conditions and add supported provreq classes list

* Use Passive clock
  • Loading branch information
yaroslava-serdiuk authored Feb 28, 2024
1 parent 5232e53 commit dffff4f
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 47 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module k8s.io/autoscaler/cluster-autoscaler

go 1.21
go 1.21.3

require (
cloud.google.com/go/compute/metadata v0.2.3
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
return nil, err
}
podListProcessor.AddProcessor(injector)
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
101 changes: 101 additions & 0 deletions cluster-autoscaler/processors/provreq/provisioning_request_injector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"time"

apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

const (
defaultRetryTime = 10 * time.Minute
)

// SupportedProvisioningClasses is a list of supported ProvisioningClasses in ClusterAutoscaler.
var SupportedProvisioningClasses = []string{v1beta1.ProvisioningClassCheckCapacity}

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
client provisioningRequestClient
clock clock.PassiveClock
}

// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list.
func (p *ProvisioningRequestPodsInjector) Process(
_ *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return nil, err
}
for _, pr := range provReqs {
conditions := pr.Conditions()
if apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Provisioned) {
continue
}

provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned)

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime
inject := true
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
inject = true
} else {
inject = false
}
}
if inject {
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name())
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.clock.Now()))
continue
}
unschedulablePods := append(unschedulablePods, provreqpods...)
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now()))
return unschedulablePods, nil
}
}
return unschedulablePods, nil
}

// CleanUp cleans up the processor's internal structures.
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"context"
"testing"
"time"

v1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
clock "k8s.io/utils/clock/testing"
)

func TestProvisioningRequestPodsInjector(t *testing.T) {
now := time.Now()
minAgo := now.Add(-1 * time.Minute)
hourAgo := now.Add(-1 * time.Hour)

accepted := metav1.Condition{
Type: v1beta1.Accepted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(minAgo),
}
failed := metav1.Condition{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(hourAgo),
}
provisioned := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(hourAgo),
}
notProvisioned := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(hourAgo),
}
unknownProvisioned := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionUnknown,
LastTransitionTime: metav1.NewTime(hourAgo),
}
notProvisionedRecently := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(minAgo),
}

podsA := 10
newProvReqA := testProvisioningRequestWithCondition("new", podsA)
newAcceptedProvReqA := testProvisioningRequestWithCondition("new-accepted", podsA, accepted)

podsB := 20
notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, notProvisioned, accepted)
provisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-and-accepted", podsB, provisioned, accepted)
failedProvReq := testProvisioningRequestWithCondition("failed", podsA, failed)
notProvisionedRecentlyProvReqB := testProvisioningRequestWithCondition("provisioned-false-recently-B", podsB, notProvisionedRecently)
unknownProvisionedProvReqB := testProvisioningRequestWithCondition("provisioned-unknown-B", podsB, unknownProvisioned)

testCases := []struct {
name string
provReqs []*provreqwrapper.ProvisioningRequest
wantUnscheduledPodCount int
wantUpdatedConditionName string
}{
{
name: "New ProvisioningRequest, pods are injected and Accepted condition is added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqA.Name(),
},
{
name: "New ProvisioningRequest, pods are injected and Accepted condition is updated",
provReqs: []*provreqwrapper.ProvisioningRequest{newAcceptedProvReqA, provisionedAcceptedProvReqB},
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newAcceptedProvReqA.Name(),
},
{
name: "Provisioned=False, pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq},
wantUnscheduledPodCount: podsB,
wantUpdatedConditionName: notProvisionedAcceptedProvReqB.Name(),
},
{
name: "Provisioned=True, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
},
{
name: "Provisioned=Unknown, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{unknownProvisionedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
},
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
getUnscheduledPods, err := injector.Process(nil, []*v1.Pod{})
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
}
if len(getUnscheduledPods) != tc.wantUnscheduledPodCount {
t.Errorf("%s failed: injector.Process return %d unscheduled pods, want %d", tc.name, len(getUnscheduledPods), tc.wantUnscheduledPodCount)
}
if tc.wantUpdatedConditionName == "" {
continue
}
pr, _ := client.ProvisioningRequest("ns", tc.wantUpdatedConditionName)
accepted := apimeta.FindStatusCondition(pr.Conditions(), v1beta1.Accepted)
if accepted == nil || accepted.LastTransitionTime != metav1.NewTime(now) {
t.Errorf("%s: injector.Process hasn't update accepted condition for ProvisioningRequest %s", tc.name, tc.wantUpdatedConditionName)
}
}

}

func testProvisioningRequestWithCondition(name string, podCount int, conditions ...metav1.Condition) *provreqwrapper.ProvisioningRequest {
pr := provreqwrapper.BuildTestProvisioningRequest("ns", name, "10", "100", "", int32(podCount), false, time.Now(), "ProvisioningClass")
pr.V1Beta1().Status.Conditions = conditions
return pr
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,14 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// Accepted indicates that the ProvisioningRequest was accepted by ClusterAutoscaler,
// so ClusterAutoscaler will attempt to provision the nodes for it.
Accepted string = "Accepted"
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
// CapacityRevoked indicates that requested resources are not longer valid.
CapacityRevoked string = "CapacityRevoked"
// Provisioned indicates that all of the requested resources were created
// and are available in the cluster. CA will set this condition when the
// VM creation finishes successfully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
Expand Down Expand Up @@ -121,13 +122,13 @@ func (o *provReqOrchestrator) bookCapacity() error {
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if shouldCapacityBeBooked(provReq) {
if conditions.ShouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
continue
}
podsToCreate = append(podsToCreate, pods...)
Expand All @@ -151,10 +152,10 @@ func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, err
}
st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
if len(st) < len(unschedulablePods) || err != nil {
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
return false, err
}
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
return true, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

Expand Down Expand Up @@ -75,14 +76,14 @@ func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.Provisioning
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
updated++
}
for _, provReq := range failedProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now()))
updated++
}
}
Expand Down
Loading

0 comments on commit dffff4f

Please sign in to comment.