From 247cc87c08462ceb2ee463f9403117bb0ea34838 Mon Sep 17 00:00:00 2001
From: vadasambar
Date: Fri, 31 Jan 2025 20:00:35 +0530
Subject: [PATCH 1/4] feat: add support for pod suspend

---
 controllers/namespace_controller.go     | 102 +++---------------------
 controllers/pod_admission_controller.go |  12 +--
 controllers/pod_controller.go           |  64 ++++++++++++++-
 controllers/suite_test.go               |   6 +-
 main.go                                 |  11 ++-
 pkg/common/common.go                    |  93 +++++++++++++++++++++
 6 files changed, 182 insertions(+), 106 deletions(-)
 create mode 100644 pkg/common/common.go

diff --git a/controllers/namespace_controller.go b/controllers/namespace_controller.go
index ca9299c..476e67d 100644
--- a/controllers/namespace_controller.go
+++ b/controllers/namespace_controller.go
@@ -18,16 +18,15 @@ package controllers

 import (
 	"context"
-	"fmt"

 	"github.com/doodlescheduling/k8s-pause/api/v1beta1"
+	"github.com/doodlescheduling/k8s-pause/pkg/common"
 	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/watch"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -39,11 +38,6 @@
 //+kubebuilder:rbac:groups=core,resources=namespaces/status,verbs=get;update;patch
 //+kubebuilder:rbac:groups=core,resources=namespaces/finalizers,verbs=update

-const (
-	previousSchedulerName = "k8s-pause/previousScheduler"
-	ignoreAnnotation      = "k8s-pause/ignore"
-)
-
 // NamespaceReconciler reconciles a Namespace object
 type NamespaceReconciler struct {
 	Client client.WithWatch
@@ -84,14 +78,14 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
 	}

 	var suspend bool
-	if suspended, ok := ns.Annotations[suspendedAnnotation]; ok {
+	if suspended, ok := ns.Annotations[SuspendedAnnotation]; ok {
 		if suspended == "true" {
 			suspend = true
 		}
 	}

 	var profile *v1beta1.ResumeProfile
-	if p, ok := ns.Annotations[profileAnnotation]; ok {
+	if p, ok := ns.Annotations[ProfileAnnotation]; ok {
 		profile = &v1beta1.ResumeProfile{}
 		err := r.Client.Get(ctx, client.ObjectKey{
 			Name: p,
@@ -146,7 +140,7 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
 	}

 	for _, pod := range list.Items {
-		if ignore, ok := pod.Annotations[ignoreAnnotation]; ok && ignore == "true" {
+		if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
 			continue
 		}
@@ -156,7 +150,7 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
 			}
 		}

-		if pod.Status.Phase == phaseSuspended && pod.Spec.SchedulerName == schedulerName {
+		if pod.Status.Phase == phaseSuspended && pod.Spec.SchedulerName == SchedulerName {
 			if len(pod.ObjectMeta.OwnerReferences) > 0 {
 				err := r.Client.Delete(ctx, &pod)
 				if err != nil {
@@ -174,14 +168,14 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
 			// Reset status, not needed as its ignored but nice
 			clone.Status = corev1.PodStatus{}

-			if scheduler, ok := clone.Annotations[previousSchedulerName]; ok {
+			if scheduler, ok := clone.Annotations[common.PreviousSchedulerName]; ok {
 				clone.Spec.SchedulerName = scheduler
-				delete(clone.Annotations, previousSchedulerName)
+				delete(clone.Annotations, common.PreviousSchedulerName)
 			} else {
 				clone.Spec.SchedulerName = ""
 			}

-			err := r.recreatePod(ctx, pod, clone)
+			err := common.RecreatePod(ctx, r.Client, pod, clone)
 			if err != nil {
 				logger.Error(err, "recrete unowned pod failed", "pod", pod.Name)
 			}
@@ -192,38 +186,6 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
 	return ctrl.Result{}, nil
 }

-func (r *NamespaceReconciler) recreatePod(ctx context.Context, pod corev1.Pod, clone *corev1.Pod) error {
-	list := corev1.PodList{}
-	watcher, err := r.Client.Watch(ctx, &list)
-	if err != nil {
-		return fmt.Errorf("failed to start watch stream for pod %s: %w", pod.Name, err)
-	}
-
-	ch := watcher.ResultChan()
-
-	err = r.Client.Delete(ctx, &pod)
-	if err != nil {
-		return fmt.Errorf("failed to delete pod %s: %w", pod.Name, err)
-	}
-
-	// Wait for delete event before we can attempt create the clone
-	for event := range ch {
-		if event.Type == watch.Deleted {
-			if val, ok := event.Object.(*corev1.Pod); ok && val.Name == pod.Name {
-				err = r.Client.Create(ctx, clone)
-				if err != nil {
-					return fmt.Errorf("failed to recreate pod %s: %w", pod.Name, err)
-				}
-
-				watcher.Stop()
-				break
-			}
-		}
-	}
-
-	return nil
-}
-
 func (r *NamespaceReconciler) suspend(ctx context.Context, ns corev1.Namespace, logger logr.Logger) (ctrl.Result, error) {
 	var list corev1.PodList
 	if err := r.Client.List(ctx, &list, client.InNamespace(ns.Name)); err != nil {
@@ -231,11 +193,11 @@ func (r *NamespaceReconciler) suspend(ctx context.Context, ns corev1.Namespace,
 	}

 	for _, pod := range list.Items {
-		if ignore, ok := pod.Annotations[ignoreAnnotation]; ok && ignore == "true" {
+		if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
 			continue
 		}

-		if err := r.suspendPod(ctx, pod, logger); err != nil {
+		if err := common.SuspendPod(ctx, r.Client, pod, logger); err != nil {
 			logger.Error(err, "failed to suspend pod", "pod", pod.Name)
 			continue
 		}
@@ -255,7 +217,7 @@ func (r *NamespaceReconciler) suspendNotInProfile(ctx context.Context, ns corev1
 			continue
 		}

-		if err := r.suspendPod(ctx, pod, logger); err != nil {
+		if err := common.SuspendPod(ctx, r.Client, pod, logger); err != nil {
 			logger.Error(err, "failed to suspend pod", "pod", pod.Name)
 			continue
 		}
@@ -263,45 +225,3 @@ func (r *NamespaceReconciler) suspendNotInProfile(ctx context.Context, ns corev1

 	return ctrl.Result{}, nil
 }
-
-func (r *NamespaceReconciler) suspendPod(ctx context.Context, pod corev1.Pod, logger logr.Logger) error {
-	if pod.Spec.SchedulerName == schedulerName {
-		return nil
-	}
-
-	// We assume the pod is managed by another controller if there is an existing owner ref
-	if len(pod.ObjectMeta.OwnerReferences) > 0 {
-		err := r.Client.Delete(ctx, &pod)
-		if err != nil {
-			return err
-		}
-
-		// If there is no owner lets clone the pod and swap the scheduler
-	} else {
-		clone := pod.DeepCopy()
-		// We won't be able to create the object with the same resource version
-		clone.ObjectMeta.ResourceVersion = ""
-
-		// Remove assigned node to avoid scheduling
-		clone.Spec.NodeName = ""
-
-		// Reset status, not needed as its ignored but nice
-		clone.Status = corev1.PodStatus{}
-
-		// Assign our own scheduler to avoid the default scheduler interfer with the workload
-		clone.Spec.SchedulerName = schedulerName
-
-		if clone.Annotations == nil {
-			clone.Annotations = make(map[string]string)
-		}
-
-		clone.Annotations[previousSchedulerName] = pod.Spec.SchedulerName
-
-		err := r.recreatePod(ctx, pod, clone)
-		if err != nil {
-			return fmt.Errorf("recrete unowned pod `%s` failed: %w", pod.Name, err)
-		}
-	}
-
-	return nil
-}
diff --git a/controllers/pod_admission_controller.go b/controllers/pod_admission_controller.go
index 3edd69c..0f8ec5a 100644
--- a/controllers/pod_admission_controller.go
+++ b/controllers/pod_admission_controller.go
@@ -16,9 +16,9 @@ import (
 // +kubebuilder:webhook:path=/mutate-v1-pod,mutating=true,failurePolicy=fail,groups="",resources=pods,verbs=create;update,versions=v1,name=pause.infra.doodle.com,admissionReviewVersions=v1,sideEffects=None

 const (
-	profileAnnotation   = "k8s-pause/profile"
-	suspendedAnnotation = "k8s-pause/suspend"
-	schedulerName       = "k8s-pause"
+	ProfileAnnotation   = "k8s-pause/profile"
+	SuspendedAnnotation = "k8s-pause/suspend"
+	SchedulerName       = "k8s-pause"
 )

 // podAnnotator annotates Pods
@@ -46,13 +46,13 @@ func (a *Scheduler) Handle(ctx context.Context, req admission.Request) admission
 	}

 	var suspend bool
-	if suspended, ok := ns.Annotations[suspendedAnnotation]; ok {
+	if suspended, ok := ns.Annotations[SuspendedAnnotation]; ok {
 		if suspended == "true" {
 			suspend = true
 		}
 	}

-	if p, ok := ns.Annotations[profileAnnotation]; ok {
+	if p, ok := ns.Annotations[ProfileAnnotation]; ok {
 		var profile v1beta1.ResumeProfile
 		err := a.Client.Get(ctx, client.ObjectKey{
 			Name: p,
@@ -76,7 +76,7 @@ func (a *Scheduler) Handle(ctx context.Context, req admission.Request) admission
 		}
 	}

-	pod.Spec.SchedulerName = schedulerName
+	pod.Spec.SchedulerName = SchedulerName

 	marshaledPod, err := json.Marshal(pod)
 	if err != nil {
diff --git a/controllers/pod_controller.go b/controllers/pod_controller.go
index 4a63c2d..0a25662 100644
--- a/controllers/pod_controller.go
+++ b/controllers/pod_controller.go
@@ -19,6 +19,7 @@ package controllers
 import (
 	"context"

+	"github.com/doodlescheduling/k8s-pause/pkg/common"
 	"github.com/go-logr/logr"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -39,7 +40,7 @@ const (

 // PodReconciler reconciles a Pod object
 type PodReconciler struct {
-	client.Client
+	Client client.WithWatch
 	Log    logr.Logger
 	Scheme *runtime.Scheme
 }
@@ -59,6 +60,8 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager, opts PodReconcilerOpt
 // For more details, check Reconcile and its Result here:
 // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.11.0/pkg/reconcile
 func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	logger := r.Log.WithValues("Namespace", req.Namespace, "Name", req.NamespacedName)
+
 	// Fetch the pod
 	pod := corev1.Pod{}

@@ -74,7 +77,64 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
 		return reconcile.Result{}, err
 	}

-	if pod.Spec.SchedulerName == schedulerName {
+	var suspend bool
+	if suspended, ok := pod.Annotations[SuspendedAnnotation]; ok {
+		if suspended == "true" {
+			suspend = true
+		}
+	}
+
+	if suspend {
+		logger.Info("make sure pod is suspended")
+		if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
+			return ctrl.Result{}, nil
+		} else {
+			if err := common.SuspendPod(ctx, r.Client, pod, logger); err != nil {
+				logger.Error(err, "failed to suspend pod", "pod", pod.Name)
+			}
+		}
+
+	} else {
+		logger.Info("make sure pod is resumed")
+		if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
+			return ctrl.Result{}, nil
+		} else {
+			if pod.Status.Phase == phaseSuspended && pod.Spec.SchedulerName == SchedulerName {
+				if len(pod.ObjectMeta.OwnerReferences) > 0 {
+					err := r.Client.Delete(ctx, &pod)
+					if err != nil {
+						logger.Error(err, "failed to delete pod while resuming", "pod", pod.Name)
+					}
+				} else {
+					clone := pod.DeepCopy()
+
+					// We won't be able to create the object with the same resource version
+					clone.ObjectMeta.ResourceVersion = ""
+
+					// Remove assigned node to avoid scheduling
+					clone.Spec.NodeName = ""
+
+					// Reset status, not needed as it's ignored but nice
+					clone.Status = corev1.PodStatus{}
+
+					if scheduler, ok := clone.Annotations[common.PreviousSchedulerName]; ok {
+						clone.Spec.SchedulerName = scheduler
+						delete(clone.Annotations, common.PreviousSchedulerName)
+					} else {
+						clone.Spec.SchedulerName = ""
+					}
+
+					err := common.RecreatePod(ctx, r.Client, pod, clone)
+					if err != nil {
+						logger.Error(err, "recreate unowned pod failed", "pod", pod.Name)
+					}
+				}
+			}
+		}
+
+	}
+
+	if pod.Spec.SchedulerName == SchedulerName {
 		pod.Status.Phase = phaseSuspended

 		// Update status after reconciliation.
diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index 676d3f3..0731da0 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -73,8 +73,12 @@ var _ = BeforeSuite(func() {
 	//+kubebuilder:scaffold:scheme

 	// Pod setup
 	fmt.Printf("setup..................................")
+	cl, err := client.NewWithWatch(k8sManager.GetConfig(), client.Options{
+		Scheme: k8sManager.GetScheme(),
+		Mapper: k8sManager.GetRESTMapper(),
+	})
 	err = (&PodReconciler{
-		Client: k8sManager.GetClient(),
+		Client: cl,
 		Log:    ctrl.Log.WithName("controllers").WithName("Pod"),
 		Scheme: k8sManager.GetScheme(),
 	}).SetupWithManager(k8sManager, PodReconcilerOptions{MaxConcurrentReconciles: 10})
diff --git a/main.go b/main.go
index 50092f3..a82f3ec 100644
--- a/main.go
+++ b/main.go
@@ -115,9 +115,13 @@ func main() {
 		setupLog.Error(err, "unable to start manager")
 		os.Exit(1)
 	}
+	client, err := client.NewWithWatch(mgr.GetConfig(), client.Options{
+		Scheme: mgr.GetScheme(),
+		Mapper: mgr.GetRESTMapper(),
+	})

 	if err = (&controllers.PodReconciler{
-		Client: mgr.GetClient(),
+		Client: client,
 		Log:    ctrl.Log.WithName("controllers").WithName("Pod"),
 		Scheme: mgr.GetScheme(),
 	}).SetupWithManager(mgr, controllers.PodReconcilerOptions{MaxConcurrentReconciles: viper.GetInt("concurrent")}); err != nil {
@@ -125,11 +129,6 @@ func main() {
 		os.Exit(1)
 	}

-	client, err := client.NewWithWatch(mgr.GetConfig(), client.Options{
-		Scheme: mgr.GetScheme(),
-		Mapper: mgr.GetRESTMapper(),
-	})
-
 	if err != nil {
 		setupLog.Error(err, "failed to setup client")
 		os.Exit(1)
diff --git a/pkg/common/common.go b/pkg/common/common.go
new file mode 100644
index 0000000..39ae3c1
--- /dev/null
+++ b/pkg/common/common.go
@@ -0,0 +1,93 @@
+package common
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/go-logr/logr"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/watch"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const (
+	ProfileAnnotation     = "k8s-pause/profile"
+	SuspendedAnnotation   = "k8s-pause/suspend"
+	SchedulerName         = "k8s-pause"
+	PreviousSchedulerName = "k8s-pause/previousScheduler"
+	IgnoreAnnotation      = "k8s-pause/ignore"
+)
+
+func SuspendPod(ctx context.Context, client client.WithWatch, pod corev1.Pod, logger logr.Logger) error {
+	if pod.Spec.SchedulerName == SchedulerName {
+		return nil
+	}
+
+	// We assume the pod is managed by another controller if there is an existing owner ref
+	if len(pod.ObjectMeta.OwnerReferences) > 0 {
+		err := client.Delete(ctx, &pod)
+		if err != nil {
+			return err
+		}
+
+		// If there is no owner let's clone the pod and swap the scheduler
+	} else {
+		clone := pod.DeepCopy()
+		// We won't be able to create the object with the same resource version
+		clone.ObjectMeta.ResourceVersion = ""
+
+		// Remove assigned node to avoid scheduling
+		clone.Spec.NodeName = ""
+
+		// Reset status, not needed as it's ignored but nice
+		clone.Status = corev1.PodStatus{}
+
+		// Assign our own scheduler to avoid the default scheduler interfering with the workload
+		clone.Spec.SchedulerName = SchedulerName
+
+		if clone.Annotations == nil {
+			clone.Annotations = make(map[string]string)
+		}
+
+		clone.Annotations[PreviousSchedulerName] = pod.Spec.SchedulerName
+
+		err := RecreatePod(ctx, client, pod, clone)
+		if err != nil {
+			return fmt.Errorf("recreate unowned pod `%s` failed: %w", pod.Name, err)
+		}
+	}
+
+	return nil
+}
+
+func RecreatePod(ctx context.Context, client client.WithWatch, pod corev1.Pod, clone *corev1.Pod) error {
+	list := corev1.PodList{}
+	watcher, err := client.Watch(ctx, &list)
+	if err != nil {
+		return fmt.Errorf("failed to start watch stream for pod %s: %w", pod.Name, err)
+	}
+
+	ch := watcher.ResultChan()
+
+	err = client.Delete(ctx, &pod)
+	if err != nil {
+		return fmt.Errorf("failed to delete pod %s: %w", pod.Name, err)
+	}
+
+	// Wait for the delete event before we attempt to create the clone
+	for event := range ch {
+		if event.Type == watch.Deleted {
+			if val, ok := event.Object.(*corev1.Pod); ok && val.Name == pod.Name {
+				err = client.Create(ctx, clone)
+				if err != nil {
+					return fmt.Errorf("failed to recreate pod %s: %w", pod.Name, err)
+				}
+
+				watcher.Stop()
+				break
+			}
+		}
+	}
+
+	return nil
+}

From 75420b661b984c32ed4b4a6d13d7180cab5c028c Mon Sep 17 00:00:00 2001
From: vadasambar
Date: Fri, 31 Jan 2025 20:11:41 +0530
Subject: [PATCH 2/4] test: add e2e test for pod suspend

---
 scripts/e2e-test.sh | 41 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/scripts/e2e-test.sh b/scripts/e2e-test.sh
index 45538f5..ba85fe9 100644
--- a/scripts/e2e-test.sh
+++ b/scripts/e2e-test.sh
@@ -40,3 +40,44 @@ while true; do
     used=$((used + 2))
     sleep 2
 done
+
+pod="$(kubectl -n podinfo get po --no-headers | head -n1 | awk '{print $1}')"
+kubectl -n podinfo annotate po $pod k8s-pause/suspend=true --overwrite
+used=0
+
+while true; do
+    countTotal=$(kubectl -n podinfo get pods | grep $pod | wc -l)
+    countSuspended=$(kubectl -n podinfo get pods | grep $pod | grep Suspended | wc -l)
+
+    if [[ "$countTotal" == "$countSuspended" ]]; then
+        break;
+    fi
+
+    if [[ $used -gt $timeout ]]; then
+        echo "Unable to suspend pods"
+        exit 1
+    fi
+
+    used=$((used + 2))
+    sleep 2
+done
+
+kubectl -n podinfo annotate po $pod k8s-pause/suspend=false --overwrite
+used=0
+
+while true; do
+    countTotal=$(kubectl -n podinfo get pods | grep $pod | wc -l)
+    countRunning=$(kubectl -n podinfo get pods | grep $pod | grep Running | wc -l)
+
+    if [[ "$countTotal" == "$countRunning" ]]; then
+        break;
+    fi
+
+    if [[ $used -gt $timeout ]]; then
+        echo "Unable to resume pods"
+        exit 1
+    fi
+
+    used=$((used + 2))
+    sleep 2
+done

From 15e14029f87f92b3f06387a9a30cbefd012d095a Mon Sep 17 00:00:00 2001
From: vadasambar
Date: Fri, 31 Jan 2025 20:17:37 +0530
Subject: [PATCH 3/4] chore: fix lint issues

---
 controllers/suite_test.go | 6 ++++++
 main.go                   | 9 ++++-----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/controllers/suite_test.go b/controllers/suite_test.go
index 0731da0..d7dcf6c 100644
--- a/controllers/suite_test.go
+++ b/controllers/suite_test.go
@@ -19,6 +19,7 @@ package controllers
 import (
 	"context"
 	"fmt"
+	"os"
 	"path/filepath"
 	"testing"

@@ -77,6 +78,11 @@ var _ = BeforeSuite(func() {
 		Scheme: k8sManager.GetScheme(),
 		Mapper: k8sManager.GetRESTMapper(),
 	})
+	if err != nil {
+		logf.Log.Error(err, "failed to setup client")
+		os.Exit(1)
+	}
+
 	err = (&PodReconciler{
 		Client: cl,
 		Log:    ctrl.Log.WithName("controllers").WithName("Pod"),
diff --git a/main.go b/main.go
index a82f3ec..27c7244 100644
--- a/main.go
+++ b/main.go
@@ -119,6 +119,10 @@ func main() {
 		Scheme: mgr.GetScheme(),
 		Mapper: mgr.GetRESTMapper(),
 	})
+	if err != nil {
+		setupLog.Error(err, "failed to setup client")
+		os.Exit(1)
+	}

 	if err = (&controllers.PodReconciler{
 		Client: client,
@@ -129,11 +133,6 @@ func main() {
 		os.Exit(1)
 	}

-	if err != nil {
-		setupLog.Error(err, "failed to setup client")
-		os.Exit(1)
-	}
-
 	if err = (&controllers.NamespaceReconciler{
 		Client: client,
 		Log:    ctrl.Log.WithName("controllers").WithName("Namespace"),

From 690c4e2adda2d17e7022eeadf6d60bb156469cb5 Mon Sep 17 00:00:00 2001
From: vadasambar
Date: Mon, 3 Feb 2025 21:25:30 +0530
Subject: [PATCH 4/4] refactor: source annotations from `common` pkg

---
 controllers/pod_admission_controller.go | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/controllers/pod_admission_controller.go b/controllers/pod_admission_controller.go
index 0f8ec5a..124bed3 100644
--- a/controllers/pod_admission_controller.go
+++ b/controllers/pod_admission_controller.go
@@ -6,6 +6,7 @@ import (
 	"net/http"

 	"github.com/doodlescheduling/k8s-pause/api/v1beta1"
+	"github.com/doodlescheduling/k8s-pause/pkg/common"
 	admissionv1 "k8s.io/api/admission/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
@@ -15,12 +16,6 @@ import (

 // +kubebuilder:webhook:path=/mutate-v1-pod,mutating=true,failurePolicy=fail,groups="",resources=pods,verbs=create;update,versions=v1,name=pause.infra.doodle.com,admissionReviewVersions=v1,sideEffects=None

-const (
-	ProfileAnnotation   = "k8s-pause/profile"
-	SuspendedAnnotation = "k8s-pause/suspend"
-	SchedulerName       = "k8s-pause"
-)
-
 // podAnnotator annotates Pods
 type Scheduler struct {
 	Client client.Client
@@ -46,13 +41,13 @@ func (a *Scheduler) Handle(ctx context.Context, req admission.Request) admission
 	}

 	var suspend bool
-	if suspended, ok := ns.Annotations[SuspendedAnnotation]; ok {
+	if suspended, ok := ns.Annotations[common.SuspendedAnnotation]; ok {
 		if suspended == "true" {
 			suspend = true
 		}
 	}

-	if p, ok := ns.Annotations[ProfileAnnotation]; ok {
+	if p, ok := ns.Annotations[common.ProfileAnnotation]; ok {
 		var profile v1beta1.ResumeProfile
 		err := a.Client.Get(ctx, client.ObjectKey{
 			Name: p,
@@ -76,7 +71,7 @@ func (a *Scheduler) Handle(ctx context.Context, req admission.Request) admission
 		}
 	}

-	pod.Spec.SchedulerName = SchedulerName
+	pod.Spec.SchedulerName = common.SchedulerName

 	marshaledPod, err := json.Marshal(pod)
 	if err != nil {
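
With the series applied, suspend can be toggled per pod instead of per namespace: annotating a single pod hands it to the k8s-pause scheduler and its phase is reported as Suspended, while clearing the annotation recreates it with its previous scheduler. A minimal manual check, mirroring scripts/e2e-test.sh above (this assumes a cluster with k8s-pause deployed and workloads in a podinfo namespace, as the e2e test does; the pod name is illustrative):

    # Suspend one pod; the pod controller swaps in the k8s-pause scheduler
    # and the pod eventually reports the Suspended phase.
    kubectl -n podinfo annotate po podinfo-98c9fb746-x2kqv k8s-pause/suspend=true --overwrite

    # Resume it; owned pods are deleted and recreated by their controller,
    # unowned pods are recreated with the previous scheduler restored.
    kubectl -n podinfo annotate po podinfo-98c9fb746-x2kqv k8s-pause/suspend=false --overwrite

Pods annotated with k8s-pause/ignore=true are skipped by both the suspend and the resume path.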