Skip to content

Commit

Permalink
feat: add hierarchical queues for capacity plugin
Browse files Browse the repository at this point in the history
Signed-off-by: Rui-Gan <[email protected]>
  • Loading branch information
Rui-Gan committed Oct 23, 2024
1 parent 1ab6088 commit 11188ef
Show file tree
Hide file tree
Showing 29 changed files with 1,953 additions and 201 deletions.
12 changes: 12 additions & 0 deletions cmd/webhook-manager/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

"volcano.sh/apis/pkg/apis/helpers"
"volcano.sh/apis/pkg/apis/scheduling/scheme"
informers "volcano.sh/apis/pkg/client/informers/externalversions"
"volcano.sh/volcano/cmd/webhook-manager/app/options"
"volcano.sh/volcano/pkg/kube"
commonutil "volcano.sh/volcano/pkg/util"
Expand Down Expand Up @@ -64,6 +65,9 @@ func Run(config *options.Config) error {

vClient := getVolcanoClient(restConfig)
kubeClient := getKubeClient(restConfig)
factory := informers.NewSharedInformerFactory(vClient, 0)
queueInformer := factory.Scheduling().V1beta1().Queues()
queueLister := queueInformer.Lister()

broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
Expand All @@ -72,6 +76,7 @@ func Run(config *options.Config) error {
if service.Config != nil {
service.Config.VolcanoClient = vClient
service.Config.KubeClient = kubeClient
service.Config.QueueLister = queueLister
service.Config.SchedulerNames = config.SchedulerNames
service.Config.Recorder = recorder
service.Config.ConfigData = admissionConf
Expand All @@ -95,6 +100,13 @@ func Run(config *options.Config) error {
stopChannel := make(chan os.Signal, 1)
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)

factory.Start(webhookServeError)
for informerType, ok := range factory.WaitForCacheSync(webhookServeError) {
if !ok {
return fmt.Errorf("failed to sync cache: %v", informerType)
}
}

server := &http.Server{
Addr: config.ListenAddress + ":" + strconv.Itoa(config.Port),
TLSConfig: configTLS(config, restConfig),
Expand Down
2 changes: 1 addition & 1 deletion installer/helm/chart/volcano/templates/admission.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ rules:
verbs: ["create", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues"]
verbs: ["get", "list"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["services"]
verbs: ["get"]
Expand Down
2 changes: 1 addition & 1 deletion installer/helm/chart/volcano/templates/controllers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ rules:
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["podgroups", "queues", "queues/status"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
- apiGroups: ["flow.volcano.sh"]
resources: ["jobflows", "jobtemplates"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
Expand Down
2 changes: 1 addition & 1 deletion installer/helm/chart/volcano/templates/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues"]
verbs: ["get", "list", "watch", "create", "delete"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues/status"]
verbs: ["update"]
Expand Down
6 changes: 3 additions & 3 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ rules:
verbs: ["create", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues"]
verbs: ["get", "list"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["services"]
verbs: ["get"]
Expand Down Expand Up @@ -4319,7 +4319,7 @@ rules:
verbs: ["get", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["podgroups", "queues", "queues/status"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
- apiGroups: ["flow.volcano.sh"]
resources: ["jobflows", "jobtemplates"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
Expand Down Expand Up @@ -4497,7 +4497,7 @@ rules:
verbs: ["get", "list", "watch"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues"]
verbs: ["get", "list", "watch", "create", "delete"]
verbs: ["get", "list", "watch", "create", "delete", "update"]
- apiGroups: ["scheduling.incubator.k8s.io", "scheduling.volcano.sh"]
resources: ["queues/status"]
verbs: ["update"]
Expand Down
237 changes: 212 additions & 25 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,42 @@ package queue

import (
"context"
"encoding/json"
"fmt"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"volcano.sh/apis/pkg/apis/bus/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/controllers/queue/state"
)

const (
ClosedByParentAnnotationKey = "volcano.sh/closed-by-parent"
ClosedByParentAnnotationTrueValue = "true"
ClosedByParentAnnotationFalseValue = "false"
)

func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to sync queue %s.", queue.Name)
defer klog.V(4).Infof("End sync queue %s.", queue.Name)

// add parent queue if parent not specified
queue, err := c.updateQueueParent(queue)
if err != nil {
return err
}

podGroups := c.getPodGroups(queue.Name)
queueStatus := schedulingv1beta1.QueueStatus{}

Expand Down Expand Up @@ -82,24 +100,72 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
queueStatus.Allocated = v1.ResourceList{}
}

newQueue := queue.DeepCopy()
// ignore update when status does not change
if equality.Semantic.DeepEqual(queueStatus, queue.Status) {
if !equality.Semantic.DeepEqual(queueStatus, queue.Status) {
newQueue.Status = queueStatus
var err error
newQueue, err = c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err)
return err
}
}

if newQueue.Name == "root" {
return nil
}

newQueue := queue.DeepCopy()
newQueue.Status = queueStatus
if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err)
parentQueue, err := c.queueLister.Get(newQueue.Spec.Parent)
if err != nil {
klog.Errorf("Failed to get parent queue of Queue %s: %v.", newQueue.Name, err)
return err
}

switch parentQueue.Status.State {
case schedulingv1beta1.QueueStateClosed, schedulingv1beta1.QueueStateClosing:
if newQueue.Status.State != schedulingv1beta1.QueueStateClosed && newQueue.Status.State != schedulingv1beta1.QueueStateClosing {
_, err = c.updateQueueAnnotation(newQueue, ClosedByParentAnnotationKey, ClosedByParentAnnotationTrueValue)
if err != nil {
klog.Errorf("Failed to patch annotation of Queue %s: %v.", newQueue.Name, err)
return err
}

req := &apis.Request{
QueueName: newQueue.Name,
Action: busv1alpha1.CloseQueueAction,
}

c.enqueue(req)
klog.V(3).Infof("Closing queue %s because its parent queue %s is closing or closed.", newQueue.Name, parentQueue.Name)
}
case schedulingv1beta1.QueueStateOpen:
if newQueue.Status.State == schedulingv1beta1.QueueStateClosed || newQueue.Status.State == schedulingv1beta1.QueueStateClosing {
if newQueue.Annotations[ClosedByParentAnnotationKey] == ClosedByParentAnnotationTrueValue {
req := &apis.Request{
QueueName: newQueue.Name,
Action: busv1alpha1.OpenQueueAction,
}

c.enqueue(req)
klog.V(3).Infof("Opening queue %s because its parent queue %s is opened.", newQueue.Name, parentQueue.Name)
}
}
}

return nil
}

func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to open queue %s.", queue.Name)

if queue.Status.State != schedulingv1beta1.QueueStateOpen {
continued, err := c.openHierarchicalQueue(queue)
if !continued {
return err
}
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateOpen

Expand All @@ -111,37 +177,43 @@ func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateF
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed")
} else {
return nil
}

q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})
if err != nil {
return err
}
q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})
if err != nil {
return err
}

newQueue = q.DeepCopy()
if updateStateFn != nil {
updateStateFn(&newQueue.Status, nil)
} else {
return fmt.Errorf("internal error, update state function should be provided")
}
newQueue = q.DeepCopy()
if updateStateFn != nil {
updateStateFn(&newQueue.Status, nil)
} else {
return fmt.Errorf("internal error, update state function should be provided")
}

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
}
}
}

return nil
_, err := c.updateQueueAnnotation(queue, ClosedByParentAnnotationKey, ClosedByParentAnnotationFalseValue)
return err
}

func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateStateFn state.UpdateQueueStatusFn) error {
klog.V(4).Infof("Begin to close queue %s.", queue.Name)

if queue.Status.State != schedulingv1beta1.QueueStateClosed && queue.Status.State != schedulingv1beta1.QueueStateClosing {
continued, err := c.closeHierarchicalQueue(queue)
if !continued {
return err
}
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateClosed

Expand Down Expand Up @@ -181,3 +253,118 @@ func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateState

return nil
}

func (c *queuecontroller) openHierarchicalQueue(queue *schedulingv1beta1.Queue) (bool, error) {
if queue.Spec.Parent != "" && queue.Spec.Parent != "root" {
parentQueue, err := c.queueLister.Get(queue.Spec.Parent)
if err != nil {
return false, fmt.Errorf("Failed to get parent queue %s of queue %s: %v", queue.Spec.Parent, queue.Name, err)
}
if parentQueue.Status.State == schedulingv1beta1.QueueStateClosing || parentQueue.Status.State == schedulingv1beta1.QueueStateClosed {
klog.Errorf("Failed to open queue %s because its parent queue %s is closing or closed. Open the parent queue first.", queue.Name, queue.Spec.Parent)
return false, nil
}
}

queueList, err := c.queueLister.List(labels.Everything())
if err != nil {
return false, err
}

for _, childQueue := range queueList {
if childQueue.Spec.Parent == queue.Name && len(childQueue.Annotations) > 0 && childQueue.Annotations[ClosedByParentAnnotationKey] == ClosedByParentAnnotationTrueValue {
req := &apis.Request{
QueueName: childQueue.Name,
Action: busv1alpha1.OpenQueueAction,
}

c.enqueue(req)
klog.Infof("Opening queue %s because its parent queue %s is opened", childQueue.Name, queue.Name)
}
}
return true, nil
}

func (c *queuecontroller) closeHierarchicalQueue(queue *schedulingv1beta1.Queue) (bool, error) {
if queue.Name == "root" {
klog.Errorf("Root queue cannot be closed")
return false, nil
}

queueList, err := c.queueLister.List(labels.Everything())
if err != nil {
return false, err
}

for _, childQueue := range queueList {
if childQueue.Spec.Parent != queue.Name {
continue
}
if childQueue.Status.State != schedulingv1beta1.QueueStateClosed && childQueue.Status.State != schedulingv1beta1.QueueStateClosing {
_, err = c.updateQueueAnnotation(childQueue, ClosedByParentAnnotationKey, ClosedByParentAnnotationTrueValue)
if err != nil {
return false, fmt.Errorf("Failed to update annotations of queue %s: %v", childQueue.Name, err)
}
req := &apis.Request{
QueueName: childQueue.Name,
Action: busv1alpha1.CloseQueueAction,
}

c.enqueue(req)
klog.V(3).Infof("Closing child queue %s because its parent queue %s is closing or closed.", childQueue.Name, queue.Name)
}
}

return true, nil
}

func (c *queuecontroller) updateQueueParent(queue *schedulingv1beta1.Queue) (*schedulingv1beta1.Queue, error) {
if queue.Name == "root" || len(queue.Spec.Parent) > 0 {
return queue, nil
}

var patch = []patchOperation{
{
Op: "add",
Path: "/spec/parent",
Value: "root",
},
}

patchBytes, err := json.Marshal(patch)
if err != nil {
return nil, err
}

return c.vcClient.SchedulingV1beta1().Queues().Patch(context.TODO(), queue.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{})
}

func (c *queuecontroller) updateQueueAnnotation(queue *schedulingv1beta1.Queue, key string, value string) (*schedulingv1beta1.Queue, error) {
if len(queue.Annotations) > 0 && queue.Annotations[key] == value {
return queue, nil
}

var patch []patchOperation
if len(queue.Annotations) == 0 {
patch = append(patch, patchOperation{
Op: "replace",
Path: "/metadata/annotations",
Value: map[string]string{
key: value,
},
})
} else {
patch = append(patch, patchOperation{
Op: "replace",
Path: fmt.Sprintf("/metadata/annotations/%s", strings.ReplaceAll(key, "/", "~1")),
Value: value,
})
}

patchBytes, err := json.Marshal(patch)
if err != nil {
return nil, err
}

return c.vcClient.SchedulingV1beta1().Queues().Patch(context.TODO(), queue.Name, types.JSONPatchType, patchBytes, metav1.PatchOptions{})
}
Loading

0 comments on commit 11188ef

Please sign in to comment.