Skip to content

Commit

Permalink
Introduce ProvisioningRequest processors
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Dec 19, 2023
1 parent 8f75e9c commit b821e53
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 118 deletions.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ type AutoscalingOptions struct {
DynamicNodeDeleteDelayAfterTaintEnabled bool
// BypassedSchedulers are used to specify which schedulers to bypass their processing
BypassedSchedulers map[string]bool
// ProvisioningRequestEnabled tells if CA process ProvisioningRequest.
ProvisioningRequestEnabled bool
}

// KubeClientOptions specify options for kube client
Expand Down
30 changes: 3 additions & 27 deletions cluster-autoscaler/core/podlistprocessor/pod_list_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@ limitations under the License.
package podlistprocessor

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
)

type defaultPodListProcessor struct {
processors []pods.PodListProcessor
}

// NewDefaultPodListProcessor returns a default implementation of the pod list
// processor, which wraps and sequentially runs other sub-processors.
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *defaultPodListProcessor {
return &defaultPodListProcessor{
processors: []pods.PodListProcessor{
func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateChecker) *pods.ListedPodListProcessor {
return &pods.ListedPodListProcessor{
Processors: []pods.PodListProcessor{
NewClearTPURequestsPodListProcessor(),
NewFilterOutExpendablePodListProcessor(),
NewCurrentlyDrainedNodesPodListProcessor(),
Expand All @@ -40,21 +34,3 @@ func NewDefaultPodListProcessor(predicateChecker predicatechecker.PredicateCheck
},
}
}

// Process runs sub-processors sequentially
func (p *defaultPodListProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
var err error
for _, processor := range p.processors {
unschedulablePods, err = processor.Process(ctx, unschedulablePods)
if err != nil {
return nil, err
}
}
return unschedulablePods, nil
}

func (p *defaultPodListProcessor) CleanUp() {
for _, processor := range p.processors {
processor.CleanUp()
}
}
23 changes: 22 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
Expand Down Expand Up @@ -241,6 +244,7 @@ var (
forceDaemonSets = flag.Bool("force-ds", false, "Blocks scale-up of node groups too small for all suitable Daemon Sets pods.")
dynamicNodeDeleteDelayAfterTaintEnabled = flag.Bool("dynamic-node-delete-delay-after-taint-enabled", false, "Enables dynamic adjustment of NodeDeleteDelayAfterTaint based of the latency between CA and api-server")
bypassedSchedulers = pflag.StringSlice("bypassed-scheduler-names", []string{}, fmt.Sprintf("Names of schedulers to bypass. If set to non-empty value, CA will not wait for pods to reach a certain age before triggering a scale-up."))
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -390,6 +394,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
},
DynamicNodeDeleteDelayAfterTaintEnabled: *dynamicNodeDeleteDelayAfterTaintEnabled,
BypassedSchedulers: scheduler_util.GetBypassedSchedulersMap(*bypassedSchedulers),
ProvisioningRequestEnabled: *provisioningRequestsEnabled,
}
}

Expand Down Expand Up @@ -443,7 +448,14 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter

opts.Processors = ca_processors.DefaultProcessors(autoscalingOptions)
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
opts.Processors.PodListProcessor = podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker)
if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor, err = initProvisioningRequestProcessors(autoscalingOptions.KubeClientOpts, podListProcessor)
if err != nil {
klog.Errorf("Failed to create Provisioning Request processor, error: %v", err)
}
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
if autoscalingOptions.ParallelDrain {
sdCandidatesSorting := previouscandidates.NewPreviousCandidates()
Expand Down Expand Up @@ -731,3 +743,12 @@ func parseSingleGpuLimit(limits string) (config.GpuLimits, error) {
}
return parsedGpuLimits, nil
}

func initProvisioningRequestProcessors(kubeConfig config.KubeClientOptions, processors *pods.ListedPodListProcessor) (*pods.ListedPodListProcessor, error) {
config := kube_util.GetKubeConfig(kubeConfig)
prClient, err := provreqclient.NewProvisioningRequestClient(config)
if err != nil {
return nil, fmt.Errorf("failed to create Provisioning Request client, err: %v", err)
}
return provreq.AddProvisioningRequestProcessors(processors, provreq.NewDefautlEventManager(prClient)), nil
}
24 changes: 24 additions & 0 deletions cluster-autoscaler/processors/pods/pod_list_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,27 @@ func (p *NoOpPodListProcessor) Process(
// CleanUp cleans up the processor's internal structures.
func (p *NoOpPodListProcessor) CleanUp() {
}

// ListedPodListProcessor is a list of PodListProcessors
type ListedPodListProcessor struct {
Processors []PodListProcessor
}

// Process runs sub-processors sequentially
func (p *ListedPodListProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
var err error
for _, processor := range p.Processors {
unschedulablePods, err = processor.Process(ctx, unschedulablePods)
if err != nil {
return nil, err
}
}
return unschedulablePods, nil
}

// CleanUp cleans up the processor's internal structures.
func (p *ListedPodListProcessor) CleanUp() {
for _, processor := range p.Processors {
processor.CleanUp()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package provreq

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/utils/klogx"
"k8s.io/klog/v2"
)

const provisioningRequestPodAnnotationKey = "cluster-autoscaler.kubernetes.io/consume-provisioning-request"

// EventManager is an interface for handling events for provisioning request.
type EventManager interface {
LogIgnoredInScaleUpEvent(context *context.AutoscalingContext, now time.Time, pod *apiv1.Pod, prName string)
Reset()
}

type defaultEventManager struct {
prClient provreqclient.ProvisioningRequestClient
}

// NewDefautlEventManager return basic event manager.
func NewDefautlEventManager(pr provreqclient.ProvisioningRequestClient) *defaultEventManager {
return &defaultEventManager{}
}

// LogIgnoredInScaleUpEvent adds event about ignored scale up for unscheduled pod, that consumes Provisioning Request.
func (e *defaultEventManager) LogIgnoredInScaleUpEvent(context *context.AutoscalingContext, now time.Time, pod *apiv1.Pod, prName string) {
pr, err := e.prClient.ProvisioningRequest(pod.Namespace, prName)
if err != nil {
if !errors.IsNotFound(err) {
klog.Warningf("While fetching Provisioning Request %s/%s got unrecognized error: %v", pod.Namespace, prName, err)
return
}
}
message := fmt.Sprintf("Unschedulable pod ignored in scale-up loop, because it's consuming ProvisioningRequest %s/%s", pr.Namespace(), pr.Name())
context.Recorder.Event(pod, apiv1.EventTypeWarning, "", message)
}

// Reset resets event manager internal structure.
func (e *defaultEventManager) Reset() {}

// ProvisioningRequestPodsFilter filter out pods that consumes Provisioning Request
type ProvisioningRequestPodsFilter struct {
e EventManager
}

// Process filters out all pods that are consuming a Provisioning Request from unschedulable pods list.
func (p *ProvisioningRequestPodsFilter) Process(
context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
now := time.Now()
p.e.Reset()
loggingQuota := klogx.PodsLoggingQuota()
result := make([]*apiv1.Pod, 0, len(unschedulablePods))
for _, pod := range unschedulablePods {
prName, found := provisioningRequestName(pod)
if !found {
result = append(result, pod)
continue
}
klogx.V(1).UpTo(loggingQuota).Infof("Ignoring unschedulable pod %s/%s as it consumes ProvisioningRequest: %s/%s", pod.Namespace, pod.Name, pod.Namespace, prName)
p.e.LogIgnoredInScaleUpEvent(context, now, pod, prName)
}
klogx.V(1).Over(loggingQuota).Infof("There are also %v other pods which were ignored", -loggingQuota.Left())
return result, nil
}

// CleanUp cleans up the processor's internal structures.
func (p *ProvisioningRequestPodsFilter) CleanUp() {}

// ProvisioningRequestPodsInjector is a processor that inject in-memory pods from ProvisioningRequest.
type ProvisioningRequestPodsInjector struct{}

// Process inject in-memory pods that consumes a Provisioning Request to unschedulable pods list.
// TODO(yaroslava): implement
func (p *ProvisioningRequestPodsInjector) Process(
context *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
return unschedulablePods, nil
}

// CleanUp cleans up the processor's internal structures.
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// AddProvisioningRequestProcessors appends Provisioning Request processor to the list of pods list processors
func AddProvisioningRequestProcessors(p *pods.ListedPodListProcessor, e EventManager) *pods.ListedPodListProcessor {
p.Processors = append(p.Processors, &ProvisioningRequestPodsFilter{e}, &ProvisioningRequestPodsInjector{})
return p
}

func provisioningRequestName(pod *v1.Pod) (string, bool) {
if pod == nil || pod.Annotations == nil {
return "", false
}
provReqName, found := pod.Annotations[provisioningRequestPodAnnotationKey]
return provReqName, found
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1client
package provreqclient

import (
"fmt"
Expand All @@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/client/clientset/versioned"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/client/informers/externalversions"
listers "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/client/listers/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/listers/core/v1"
Expand All @@ -40,15 +41,22 @@ const (
provisioningRequestClientCallTimeout = 4 * time.Second
)

// ProvisioningRequestClient represents client for v1beta1 ProvReq CRD.
type ProvisioningRequestClient struct {
// ProvisioningRequestClient represents the service that is able to list,
// access and delete different Provisioning Requests.
type ProvisioningRequestClient interface {
ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
}

// ProvisioningRequestClientV1beta1 represents client for v1beta1 ProvReq CRD.
type ProvisioningRequestClientV1beta1 struct {
client versioned.Interface
provReqLister listers.ProvisioningRequestLister
podTemplLister v1.PodTemplateLister
}

// NewProvisioningRequestClient configures and returns a provisioningRequestClient.
func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) {
func NewProvisioningRequestClient(kubeConfig *rest.Config) (ProvisioningRequestClient, error) {
prClient, err := newPRClient(kubeConfig)
if err != nil {
return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err)
Expand All @@ -69,29 +77,45 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequest
return nil, err
}

return &ProvisioningRequestClient{
return &ProvisioningRequestClientV1beta1{
client: prClient,
provReqLister: provReqLister,
podTemplLister: podTemplLister,
}, nil
}

// ProvisioningRequest gets a specific ProvisioningRequest CR.
func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*v1beta1.ProvisioningRequest, error) {
return c.provReqLister.ProvisioningRequests(namespace).Get(name)
func (c *ProvisioningRequestClientV1beta1) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name)
if err == nil {
podTemplates, errPodTemplates := c.FetchPodTemplates(v1Beta1PR)
if errPodTemplates != nil {
return nil, fmt.Errorf("while fetching pod templates for Get Provisioning Request %s/%s got error: %v", namespace, name, errPodTemplates)
}
return provreqwrapper.NewV1Beta1ProvisioningRequest(v1Beta1PR, podTemplates), nil
}
return nil, err
}

// ProvisioningRequests gets all ProvisioningRequest CRs.
func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*v1beta1.ProvisioningRequest, error) {
provisioningRequests, err := c.provReqLister.List(labels.Everything())
func (c *ProvisioningRequestClientV1beta1) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
v1Beta1PRs, err := c.provReqLister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("error fetching provisioningRequests: %w", err)
}
return provisioningRequests, nil
prs := make([]*provreqwrapper.ProvisioningRequest, 0, len(v1Beta1PRs))
for _, v1Beta1PR := range v1Beta1PRs {
podTemplates, errPodTemplates := c.FetchPodTemplates(v1Beta1PR)
if errPodTemplates != nil {
return nil, fmt.Errorf("while fetching pod templates for List Provisioning Request %s/%s got error: %v", v1Beta1PR.Namespace, v1Beta1PR.Name, errPodTemplates)
}
prs = append(prs, provreqwrapper.NewV1Beta1ProvisioningRequest(v1Beta1PR, podTemplates))
}
return prs, nil
}

// FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request.
func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
func (c *ProvisioningRequestClientV1beta1) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets))
for _, podSpec := range pr.Spec.PodSets {
podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1client
package provreqclient

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package v1beta1client
package provreqclient

import (
"context"
Expand All @@ -35,7 +35,7 @@ import (
)

// NewFakeProvisioningRequestClient mock ProvisioningRequestClient for tests.
func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) (*ProvisioningRequestClient, *FakeProvisioningRequestForceClient) {
func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) (*ProvisioningRequestClientV1beta1, *FakeProvisioningRequestForceClient) {
t.Helper()
provReqClient := fake.NewSimpleClientset()
podTemplClient := fake_kubernetes.NewSimpleClientset()
Expand All @@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...
if err != nil {
t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err)
}
return &ProvisioningRequestClient{
return &ProvisioningRequestClientV1beta1{
client: provReqClient,
provReqLister: provReqLister,
podTemplLister: podTemplLister,
Expand Down
Loading

0 comments on commit b821e53

Please sign in to comment.