Skip to content

Commit

Permalink
Add ProvReq injector
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Feb 15, 2024
1 parent ea387cd commit 38a9056
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 47 deletions.
7 changes: 6 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter

if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
if err != nil {
Expand All @@ -501,6 +501,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
if err != nil {
return nil, err
}
podListProcessor.AddProcessor(injector)
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"time"

apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

const (
defaultRetryTime = 10 * time.Minute
)

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
client provisioningRequestClient
now func() time.Time
}

// Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list.
func (p *ProvisioningRequestPodsInjector) Process(
_ *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return nil, err
}
for _, pr := range provReqs {
conditions := pr.Conditions()
if apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Provisioned) {
continue
}

provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned)
accepted := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned)

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq is new or it has Provisioned == False condition more than defaultRetryTime
if accepted == nil || provisioned != nil &&
provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.now()) {
provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name())
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.now()))
continue
}
unschedulablePods := append(unschedulablePods, provreqpods...)
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.now()))
return unschedulablePods, nil
}
}
return unschedulablePods, nil
}

// CleanUp cleans up the processor's internal structures.
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListProcessor, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &ProvisioningRequestPodsInjector{client: client, now: time.Now}, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
"context"
"testing"
"time"

v1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

func TestProvisioningRequestPodsInjector(t *testing.T) {
now := time.Now()
minAgo := now.Add(-1 * time.Minute)
hourAgo := now.Add(-1 * time.Hour)

accepted := metav1.Condition{
Type: v1beta1.Accepted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(minAgo),
}
failed := metav1.Condition{
Type: v1beta1.Failed,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(hourAgo),
}
provisioned := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(hourAgo),
}
notProvisioned := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(hourAgo),
}
notProvisionedRecently := metav1.Condition{
Type: v1beta1.Provisioned,
Status: metav1.ConditionFalse,
LastTransitionTime: metav1.NewTime(minAgo),
}

podsA := 10
newProvReqA := testProvisioningRequestWithCondition("new", podsA)
newAcceptedProvReqA := testProvisioningRequestWithCondition("new", podsA, accepted)

podsB := 20
notProvisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-false-B", podsB, notProvisioned, accepted)
provisionedAcceptedProvReqB := testProvisioningRequestWithCondition("provisioned-and-accepted", podsB, provisioned, accepted)
failedProvReq := testProvisioningRequestWithCondition("failed", podsA, failed)
notProvisionedRecentlyProvReqB := testProvisioningRequestWithCondition("provisioned-false-recently-B", podsB, notProvisionedRecently)

testCases := []struct {
name string
provReqs []*provreqwrapper.ProvisioningRequest
wantUnscheduledPodCount int
wantUpdatedConditionName string
}{
{
name: "New ProvisioningRequest, pods are injected and Accepted condition is added",
provReqs: []*provreqwrapper.ProvisioningRequest{newProvReqA, provisionedAcceptedProvReqB},
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newProvReqA.Name(),
},
{
name: "New ProvisioningRequest, pods are injected and Accepted condition is updated",
provReqs: []*provreqwrapper.ProvisioningRequest{newAcceptedProvReqA, provisionedAcceptedProvReqB},
wantUnscheduledPodCount: podsA,
wantUpdatedConditionName: newAcceptedProvReqA.Name(),
},
{
name: "Provisioned=False, pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{notProvisionedAcceptedProvReqB, failedProvReq},
wantUnscheduledPodCount: podsB,
wantUpdatedConditionName: notProvisionedAcceptedProvReqB.Name(),
},
{
name: "Provisioned=True, no pods are injected",
provReqs: []*provreqwrapper.ProvisioningRequest{provisionedAcceptedProvReqB, failedProvReq, notProvisionedRecentlyProvReqB},
},
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, func() time.Time { return now }}
getUnscheduledPods, err := injector.Process(nil, []*v1.Pod{})
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
}
if len(getUnscheduledPods) != tc.wantUnscheduledPodCount {
t.Errorf("%s failed: injector.Process return %d unscheduled pods, want %d", tc.name, len(getUnscheduledPods), tc.wantUnscheduledPodCount)
}
if tc.wantUpdatedConditionName == "" {
continue
}
pr, _ := client.ProvisioningRequest("ns", tc.wantUpdatedConditionName)
accepted := apimeta.FindStatusCondition(pr.Conditions(), v1beta1.Accepted)
if accepted == nil || accepted.LastTransitionTime != metav1.NewTime(now) {
t.Errorf("%s: injector.Process hasn't update accepted condition for ProvisioningRequest %s", tc.name, tc.wantUpdatedConditionName)
}
}

}

func testProvisioningRequestWithCondition(name string, podCount int, conditions ...metav1.Condition) *provreqwrapper.ProvisioningRequest {
pr := provreqwrapper.BuildTestProvisioningRequest("ns", name, "10", "100", "", int32(podCount), false, time.Now(), "ProvisioningClass")
pr.V1Beta1().Status.Conditions = conditions
return pr
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type Detail string
// The following constants list all currently available Conditions Type values.
// See: https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Condition
const (
// Accepted indicates that the ProvisioningRequest was accepted by ClusterAutoscaler,
// so ClusterAutoscaler will attempt to provision the nodes for it.
Accepted string = "Accepted"
// BookingExpired indicates that the ProvisioningRequest had Provisioned condition before
// and capacity reservation time is expired.
BookingExpired string = "BookingExpired"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
Expand Down Expand Up @@ -121,13 +122,13 @@ func (o *provReqOrchestrator) bookCapacity() error {
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if shouldCapacityBeBooked(provReq) {
if conditions.ShouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
continue
}
podsToCreate = append(podsToCreate, pods...)
Expand All @@ -151,10 +152,10 @@ func (o *provReqOrchestrator) scaleUp(unschedulablePods []*apiv1.Pod) (bool, err
}
st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
if len(st) < len(unschedulablePods) || err != nil {
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
return false, err
}
setCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now())
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, "Capacity is found in the cluster.", metav1.Now())
return true, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/apis/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

Expand Down Expand Up @@ -75,14 +76,14 @@ func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.Provisioning
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, CapacityReservationTimeExpiredReason, CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
updated++
}
for _, provReq := range failedProvReq {
if updated >= p.maxUpdated {
break
}
setCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, ExpiredReason, ExpiredMsg, metav1.NewTime(p.now()))
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now()))
updated++
}
}
Expand Down
Loading

0 comments on commit 38a9056

Please sign in to comment.