Skip to content

Commit

Permalink
WIP: ScaleUp for book-capacity ProvisioningRequestClass
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Jan 12, 2024
1 parent 1c4e6c9 commit 36cf899
Show file tree
Hide file tree
Showing 7 changed files with 660 additions and 3 deletions.
114 changes: 114 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/provreq/orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provreq

import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/bookcapacity"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/rest"

ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// provisioningRequestClient abstracts read access to ProvisioningRequest
// objects so the orchestrator can be backed by a real client or a test fake.
type provisioningRequestClient interface {
// ProvisioningRequests lists all ProvisioningRequests known to the client.
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
// ProvisioningRequest fetches a single ProvisioningRequest by namespace and name.
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// provReqOrchestrator is a ScaleUp orchestrator for pods that belong to
// ProvisioningRequests. It reserves capacity by scheduling pods into a
// forked cluster snapshot instead of provisioning new nodes.
type provReqOrchestrator struct {
// context is the autoscaling context; set in Initialize.
context *context.AutoscalingContext
// client reads ProvisioningRequest objects.
client provisioningRequestClient
// injector simulates scheduling of pods into the cluster snapshot; set in Initialize.
injector *scheduling.HintingSimulator
}

// New builds a ProvisioningRequest ScaleUp orchestrator backed by a
// ProvisioningRequest client created from the given kubeconfig. The returned
// orchestrator must still be Initialize-d before use.
func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
	prClient, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
	if err != nil {
		return nil, err
	}
	orchestrator := &provReqOrchestrator{client: prClient}
	return orchestrator, nil
}

// Initialize populates the orchestrator with the autoscaling context and a
// scheduling simulator built from the context's predicate checker. The
// processors, cluster state registry and taint config parameters are accepted
// to satisfy the scaleup.Orchestrator interface but are currently unused.
func (o *provReqOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
o.context = autoscalingContext
o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker)
}

// ScaleUp checks, on a forked cluster snapshot, whether the unschedulable
// pods of a book-capacity ProvisioningRequest fit into the existing cluster.
// It first re-reserves capacity for all ProvisioningRequests that already
// hold the BookCapacity condition, then tries to place the given pods and
// records the outcome as a BookCapacity or Pending condition on their
// ProvisioningRequest. No nodes are provisioned by this method.
//
// The nodes, daemonSets and nodeInfos parameters are accepted to satisfy the
// scaleup.Orchestrator interface but are currently unused.
func (o *provReqOrchestrator) ScaleUp(
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
	provReqs, err := o.client.ProvisioningRequests()
	if err != nil {
		// Without the full list we cannot re-reserve booked capacity, so any
		// placement verdict below would be unreliable. Fail the iteration.
		return nil, errors.ToAutoscalerError(errors.ApiCallError, err)
	}
	podsToCreate := []*apiv1.Pod{}
	for _, provReq := range provReqs {
		if !bookcapacity.BookCapacity(provReq) {
			continue
		}
		pods, podsErr := provreq_pods.PodsForProvisioningRequest(provReq)
		if podsErr != nil {
			// Skip a malformed ProvisioningRequest instead of aborting; its
			// capacity simply is not re-reserved in this iteration.
			continue
		}
		podsToCreate = append(podsToCreate, pods...)
	}
	o.context.ClusterSnapshot.Fork()
	defer o.context.ClusterSnapshot.Revert()
	// Scheduling the pods reserves capacity for ProvisioningRequests that
	// already carry the BookCapacity condition.
	if _, _, schedErr := o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); schedErr != nil {
		return nil, errors.ToAutoscalerError(errors.InternalError, schedErr)
	}

	unschedulablePods = bookcapacity.FilterBookCapacityClass(unschedulablePods)
	// Guard the index accesses below: the filtered list may be empty and a
	// pod may lack the owner reference pointing at its ProvisioningRequest.
	if len(unschedulablePods) == 0 {
		return nil, nil
	}
	if len(unschedulablePods[0].OwnerReferences) == 0 {
		return nil, errors.NewAutoscalerError(errors.InternalError, "pod %s/%s has no owner reference to a ProvisioningRequest", unschedulablePods[0].Namespace, unschedulablePods[0].Name)
	}
	provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
	if err != nil {
		return nil, errors.ToAutoscalerError(errors.ApiCallError, err)
	}
	if _, _, schedErr := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true); schedErr == nil {
		bookcapacity.SetCondition(provReq, bookcapacity.BookCapacityCondition, "Capacity is found", "")
		//TODO return status?
		return nil, nil
	}
	//TODO check if error is persistent

	bookcapacity.SetCondition(provReq, bookcapacity.PendingCondition, "Capacity is not found", "")
	//TODO return status, error?
	return nil, nil
}

// ScaleUpToNodeGroupMinSize is a no-op for the ProvisioningRequest
// orchestrator: scaling node groups up to their minimum size is the
// responsibility of the regular ScaleUp orchestrator.
func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
return nil, nil
}
110 changes: 110 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/wrapper_orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

import (
appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator/provreq"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/client-go/rest"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
// consumeProvReq is the pod annotation that marks pods as consumers of
// capacity provisioned for a ProvisioningRequest; such pods are routed to
// the ProvisioningRequest orchestrator instead of the regular one.
consumeProvReq = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"
)

// WrapperOrchestrator dispatches ScaleUp calls between the regular scale-up
// orchestrator and the ProvisioningRequest orchestrator, alternating between
// the two pod groups when both have unschedulable pods.
type WrapperOrchestrator struct {
// scaleUpRegularPods tracks whose turn it is when both regular and
// ProvisioningRequest pods are unschedulable: true means regular pods next.
scaleUpRegularPods bool
scaleUpOrchestrator scaleup.Orchestrator
provReqOrchestrator scaleup.Orchestrator
}

// NewWrapperOrchestrator returns an orchestrator that routes scale-up between
// the regular orchestrator and the ProvisioningRequest orchestrator. If the
// ProvisioningRequest client cannot be constructed, it falls back to the
// regular orchestrator alone.
func NewWrapperOrchestrator(kubeConfig *rest.Config) scaleup.Orchestrator {
	provReqOrchestrator, err := provreq.New(kubeConfig)
	// BUG FIX: the error check was inverted — on error the code returned a
	// wrapper around the invalid provReq orchestrator, and on success it
	// discarded the provReq orchestrator entirely.
	if err != nil {
		// TODO: log or surface the error; without the ProvisioningRequest
		// client only regular pods can trigger a scale-up.
		return New()
	}
	return &WrapperOrchestrator{
		scaleUpOrchestrator: New(),
		provReqOrchestrator: provReqOrchestrator,
	}
}

// Initialize initializes the orchestrator object with required fields.
// Both wrapped orchestrators receive the same context, processors, cluster
// state registry and taint configuration.
func (o *WrapperOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
taintConfig taints.TaintConfig,
) {
o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
o.provReqOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, taintConfig)
}

// ScaleUp run scaleUp function for regular pods of pods from ProvisioningRequest.
// ScaleUp runs a scale-up either for regular pods or for pods belonging to a
// ProvisioningRequest. When both groups have unschedulable pods, successive
// calls alternate between them so neither group starves the other.
func (o *WrapperOrchestrator) ScaleUp(
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	provReqPods, regularPods := sortOut(unschedulablePods)
	switch {
	case len(provReqPods) == 0:
		return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
	case len(regularPods) == 0:
		return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
	}
	// Both groups are non-empty: flip the toggle and serve the group whose
	// turn it currently is.
	regularTurn := o.scaleUpRegularPods
	o.scaleUpRegularPods = !regularTurn
	if regularTurn {
		return o.scaleUpOrchestrator.ScaleUp(regularPods, nodes, daemonSets, nodeInfos)
	}
	return o.provReqOrchestrator.ScaleUp(provReqPods, nodes, daemonSets, nodeInfos)
}

// sortOut partitions unschedulablePods into pods that consume capacity from a
// ProvisioningRequest (carrying the consumeProvReq annotation) and all
// remaining, regular pods.
func sortOut(unschedulablePods []*apiv1.Pod) (provReqPods, regularPods []*apiv1.Pod) {
	for _, pod := range unschedulablePods {
		_, consumesProvReq := pod.Annotations[consumeProvReq]
		if consumesProvReq {
			provReqPods = append(provReqPods, pod)
			continue
		}
		regularPods = append(regularPods, pod)
	}
	return provReqPods, regularPods
}

// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
// size is the TargetSize queried directly from cloud providers. Returns
// appropriate status or error if an unexpected error occurred.
// The call is delegated to the regular scale-up orchestrator only; the
// ProvisioningRequest orchestrator does not participate.
func (o *WrapperOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
return o.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
}
8 changes: 8 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
Expand Down Expand Up @@ -465,6 +466,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)

scaleUpOrchestrator := orchestrator.New()
if *provisioningRequestsEnabled {
kubeClient := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
scaleUpOrchestrator = orchestrator.NewWrapperOrchestrator(kubeClient)
}

opts := core.AutoscalerOptions{
AutoscalingOptions: autoscalingOptions,
ClusterSnapshot: clustersnapshot.NewDeltaClusterSnapshot(),
Expand All @@ -474,6 +481,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
PredicateChecker: predicateChecker,
DeleteOptions: deleteOptions,
DrainabilityRules: drainabilityRules,
ScaleUpOrchestrator: scaleUpOrchestrator,
}

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
Expand Down
72 changes: 72 additions & 0 deletions cluster-autoscaler/provisioningrequest/bookcapacity/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package bookcapacity

import (
"time"

apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
)

// ProvisioningRequestCondition is the type of a condition the cluster
// autoscaler sets on a ProvisioningRequest to record booking state.
type ProvisioningRequestCondition string

const (
// BookCapacityCondition marks a request whose capacity is currently reserved.
BookCapacityCondition = ProvisioningRequestCondition("BookCapacity")
// ExpiredCondition marks a request whose capacity reservation has expired.
ExpiredCondition = ProvisioningRequestCondition("Expired")
// PendingCondition marks a request for which capacity was not (yet) found.
PendingCondition = ProvisioningRequestCondition("Pending") // or maybe have (BookCapacity; False) condition instead of (Pending; True)?
// RejectedCondition marks a rejected request.
RejectedCondition = ProvisioningRequestCondition("Rejected") // seems useless

// BookCapacityClass is the ProvisioningClassName handled by this package.
BookCapacityClass = "book-capacity.kubernetes.io"
// DefaultReservationTime is how long booked capacity is held by default.
DefaultReservationTime = 10 * time.Minute
)

// BookCapacity reports whether capacity should be reserved for the given
// ProvisioningRequest: it must belong to the book-capacity provisioning class
// and its most recent condition must be (BookCapacity; True).
func BookCapacity(pr *provreqwrapper.ProvisioningRequest) bool {
	if pr.V1Beta1().Spec.ProvisioningClassName != BookCapacityClass {
		return false
	}
	conditions := pr.Conditions()
	// BUG FIX: the original only checked for a nil slice; indexing
	// conditions[len(conditions)-1] panicked on an empty, non-nil slice.
	if len(conditions) == 0 {
		return false
	}
	last := conditions[len(conditions)-1]
	return last.Type == string(BookCapacityCondition) && last.Status == v1.ConditionTrue
}

// TODO: should be done by wrapper orchestrator? Injector will inject pods from one provReq,
// so probably this is not needed. But some check should be done, in case of future code change.
// FilterBookCapacityClass filters the list of pods to return pods that belong to
// one ProvisioningRequest of the book-capacity ProvisioningRequestClass.
// NOTE(review): currently a stub that returns its input unchanged.
func FilterBookCapacityClass(unschedulablePods []*apiv1.Pod) []*apiv1.Pod {
return unschedulablePods
}

// SetCondition appends a condition of the given type, with status True and
// the supplied reason and message, to the ProvisioningRequest's conditions.
func SetCondition(pr *provreqwrapper.ProvisioningRequest, conditionType ProvisioningRequestCondition, reason, message string) {
	newCondition := v1.Condition{
		Type:   string(conditionType),
		Status: v1.ConditionTrue,
		// ObservedGeneration: ?,
		LastTransitionTime: v1.Now(),
		Reason:             reason,
		Message:            message,
	}
	pr.SetConditions(append(pr.Conditions(), newCondition))
}
Loading

0 comments on commit 36cf899

Please sign in to comment.