Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for pod level suspend #60

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 11 additions & 91 deletions controllers/namespace_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@ package controllers

import (
"context"
"fmt"

"github.com/doodlescheduling/k8s-pause/api/v1beta1"
"github.com/doodlescheduling/k8s-pause/pkg/common"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
ctrl "sigs.k8s.io/controller-runtime"

"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -39,11 +38,6 @@ import (
//+kubebuilder:rbac:groups=core,resources=namespaces/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=core,resources=namespaces/finalizers,verbs=update

const (
previousSchedulerName = "k8s-pause/previousScheduler"
ignoreAnnotation = "k8s-pause/ignore"
)

// NamespaceReconciler reconciles a Namespace object
type NamespaceReconciler struct {
Client client.WithWatch
Expand Down Expand Up @@ -84,14 +78,14 @@ func (r *NamespaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

var suspend bool
if suspended, ok := ns.Annotations[suspendedAnnotation]; ok {
if suspended, ok := ns.Annotations[SuspendedAnnotation]; ok {
if suspended == "true" {
suspend = true
}
}

var profile *v1beta1.ResumeProfile
if p, ok := ns.Annotations[profileAnnotation]; ok {
if p, ok := ns.Annotations[ProfileAnnotation]; ok {
profile = &v1beta1.ResumeProfile{}
err := r.Client.Get(ctx, client.ObjectKey{
Name: p,
Expand Down Expand Up @@ -146,7 +140,7 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
}

for _, pod := range list.Items {
if ignore, ok := pod.Annotations[ignoreAnnotation]; ok && ignore == "true" {
if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
continue
}

Expand All @@ -156,7 +150,7 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
}
}

if pod.Status.Phase == phaseSuspended && pod.Spec.SchedulerName == schedulerName {
if pod.Status.Phase == phaseSuspended && pod.Spec.SchedulerName == SchedulerName {
if len(pod.ObjectMeta.OwnerReferences) > 0 {
err := r.Client.Delete(ctx, &pod)
if err != nil {
Expand All @@ -174,14 +168,14 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
// Reset status, not needed as its ignored but nice
clone.Status = corev1.PodStatus{}

if scheduler, ok := clone.Annotations[previousSchedulerName]; ok {
if scheduler, ok := clone.Annotations[common.PreviousSchedulerName]; ok {
clone.Spec.SchedulerName = scheduler
delete(clone.Annotations, previousSchedulerName)
delete(clone.Annotations, common.PreviousSchedulerName)
} else {
clone.Spec.SchedulerName = ""
}

err := r.recreatePod(ctx, pod, clone)
err := common.RecreatePod(ctx, r.Client, pod, clone)
if err != nil {
logger.Error(err, "recrete unowned pod failed", "pod", pod.Name)
}
Expand All @@ -192,50 +186,18 @@ func (r *NamespaceReconciler) resume(ctx context.Context, ns corev1.Namespace, p
return ctrl.Result{}, nil
}

func (r *NamespaceReconciler) recreatePod(ctx context.Context, pod corev1.Pod, clone *corev1.Pod) error {
list := corev1.PodList{}
watcher, err := r.Client.Watch(ctx, &list)
if err != nil {
return fmt.Errorf("failed to start watch stream for pod %s: %w", pod.Name, err)
}

ch := watcher.ResultChan()

err = r.Client.Delete(ctx, &pod)
if err != nil {
return fmt.Errorf("failed to delete pod %s: %w", pod.Name, err)
}

// Wait for delete event before we can attempt create the clone
for event := range ch {
if event.Type == watch.Deleted {
if val, ok := event.Object.(*corev1.Pod); ok && val.Name == pod.Name {
err = r.Client.Create(ctx, clone)
if err != nil {
return fmt.Errorf("failed to recreate pod %s: %w", pod.Name, err)
}

watcher.Stop()
break
}
}
}

return nil
}

func (r *NamespaceReconciler) suspend(ctx context.Context, ns corev1.Namespace, logger logr.Logger) (ctrl.Result, error) {
var list corev1.PodList
if err := r.Client.List(ctx, &list, client.InNamespace(ns.Name)); err != nil {
return ctrl.Result{}, err
}

for _, pod := range list.Items {
if ignore, ok := pod.Annotations[ignoreAnnotation]; ok && ignore == "true" {
if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
continue
}

if err := r.suspendPod(ctx, pod, logger); err != nil {
if err := common.SuspendPod(ctx, r.Client, pod, logger); err != nil {
logger.Error(err, "failed to suspend pod", "pod", pod.Name)
continue
}
Expand All @@ -255,53 +217,11 @@ func (r *NamespaceReconciler) suspendNotInProfile(ctx context.Context, ns corev1
continue
}

if err := r.suspendPod(ctx, pod, logger); err != nil {
if err := common.SuspendPod(ctx, r.Client, pod, logger); err != nil {
logger.Error(err, "failed to suspend pod", "pod", pod.Name)
continue
}
}

return ctrl.Result{}, nil
}

func (r *NamespaceReconciler) suspendPod(ctx context.Context, pod corev1.Pod, logger logr.Logger) error {
if pod.Spec.SchedulerName == schedulerName {
return nil
}

// We assume the pod is managed by another controller if there is an existing owner ref
if len(pod.ObjectMeta.OwnerReferences) > 0 {
err := r.Client.Delete(ctx, &pod)
if err != nil {
return err
}

// If there is no owner lets clone the pod and swap the scheduler
} else {
clone := pod.DeepCopy()
// We won't be able to create the object with the same resource version
clone.ObjectMeta.ResourceVersion = ""

// Remove assigned node to avoid scheduling
clone.Spec.NodeName = ""

// Reset status, not needed as its ignored but nice
clone.Status = corev1.PodStatus{}

// Assign our own scheduler to avoid the default scheduler interfer with the workload
clone.Spec.SchedulerName = schedulerName

if clone.Annotations == nil {
clone.Annotations = make(map[string]string)
}

clone.Annotations[previousSchedulerName] = pod.Spec.SchedulerName

err := r.recreatePod(ctx, pod, clone)
if err != nil {
return fmt.Errorf("recrete unowned pod `%s` failed: %w", pod.Name, err)
}
}

return nil
}
13 changes: 4 additions & 9 deletions controllers/pod_admission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"

"github.com/doodlescheduling/k8s-pause/api/v1beta1"
"github.com/doodlescheduling/k8s-pause/pkg/common"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -15,12 +16,6 @@ import (

// +kubebuilder:webhook:path=/mutate-v1-pod,mutating=true,failurePolicy=fail,groups="",resources=pods,verbs=create;update,versions=v1,name=pause.infra.doodle.com,admissionReviewVersions=v1,sideEffects=None

const (
profileAnnotation = "k8s-pause/profile"
suspendedAnnotation = "k8s-pause/suspend"
schedulerName = "k8s-pause"
)

// podAnnotator annotates Pods
type Scheduler struct {
Client client.Client
Expand All @@ -46,13 +41,13 @@ func (a *Scheduler) Handle(ctx context.Context, req admission.Request) admission
}

var suspend bool
if suspended, ok := ns.Annotations[suspendedAnnotation]; ok {
if suspended, ok := ns.Annotations[common.SuspendedAnnotation]; ok {
if suspended == "true" {
suspend = true
}
}

if p, ok := ns.Annotations[profileAnnotation]; ok {
if p, ok := ns.Annotations[common.ProfileAnnotation]; ok {
var profile v1beta1.ResumeProfile
err := a.Client.Get(ctx, client.ObjectKey{
Name: p,
Expand All @@ -76,7 +71,7 @@ func (a *Scheduler) Handle(ctx context.Context, req admission.Request) admission
}
}

pod.Spec.SchedulerName = schedulerName
pod.Spec.SchedulerName = common.SchedulerName

marshaledPod, err := json.Marshal(pod)
if err != nil {
Expand Down
64 changes: 62 additions & 2 deletions controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"

"github.com/doodlescheduling/k8s-pause/pkg/common"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -39,7 +40,7 @@ const (

// PodReconciler reconciles a Pod object
type PodReconciler struct {
client.Client
Client client.WithWatch
Log logr.Logger
Scheme *runtime.Scheme
}
Expand All @@ -59,6 +60,8 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager, opts PodReconcilerOpt
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/[email protected]/pkg/reconcile
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.Log.WithValues("Namespace", req.Namespace, "Name", req.NamespacedName)

// Fetch the pod
pod := corev1.Pod{}

Expand All @@ -74,7 +77,64 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return reconcile.Result{}, err
}

if pod.Spec.SchedulerName == schedulerName {
var suspend bool
if suspended, ok := pod.Annotations[SuspendedAnnotation]; ok {
if suspended == "true" {
suspend = true
}
}

if suspend {
logger.Info("make sure pod is suspended")
if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
return ctrl.Result{}, nil
} else {
if err := common.SuspendPod(ctx, r.Client, pod, logger); err != nil {
logger.Error(err, "failed to suspend pod", "pod", pod.Name)
}
}

} else {
logger.Info("make sure pod is resumed")
if ignore, ok := pod.Annotations[common.IgnoreAnnotation]; ok && ignore == "true" {
return ctrl.Result{}, nil
} else {
if pod.Status.Phase == phaseSuspended && pod.Spec.SchedulerName == SchedulerName {
if len(pod.ObjectMeta.OwnerReferences) > 0 {
err := r.Client.Delete(ctx, &pod)
if err != nil {
logger.Error(err, "failed to delete pod while resuming", "pod", pod.Name)
}
} else {
clone := pod.DeepCopy()

// We won't be able to create the object with the same resource version
clone.ObjectMeta.ResourceVersion = ""

// Remove assigned node to avoid scheduling
clone.Spec.NodeName = ""

// Reset status, not needed as its ignored but nice
clone.Status = corev1.PodStatus{}

if scheduler, ok := clone.Annotations[common.PreviousSchedulerName]; ok {
clone.Spec.SchedulerName = scheduler
delete(clone.Annotations, common.PreviousSchedulerName)
} else {
clone.Spec.SchedulerName = ""
}

err := common.RecreatePod(ctx, r.Client, pod, clone)
if err != nil {
logger.Error(err, "recrete unowned pod failed", "pod", pod.Name)
}
}
}
}

}

if pod.Spec.SchedulerName == SchedulerName {
pod.Status.Phase = phaseSuspended

// Update status after reconciliation.
Expand Down
12 changes: 11 additions & 1 deletion controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"os"
"path/filepath"
"testing"

Expand Down Expand Up @@ -73,8 +74,17 @@ var _ = BeforeSuite(func() {
//+kubebuilder:scaffold:scheme
// Pod setup
fmt.Printf("setup..................................")
cl, err := client.NewWithWatch(k8sManager.GetConfig(), client.Options{
Scheme: k8sManager.GetScheme(),
Mapper: k8sManager.GetRESTMapper(),
})
if err != nil {
logf.Log.Error(err, "failed to setup client")
os.Exit(1)
}

err = (&PodReconciler{
Client: k8sManager.GetClient(),
Client: cl,
Log: ctrl.Log.WithName("controllers").WithName("Pod"),
Scheme: k8sManager.GetScheme(),
}).SetupWithManager(k8sManager, PodReconcilerOptions{MaxConcurrentReconciles: 10})
Expand Down
20 changes: 9 additions & 11 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,26 +115,24 @@ func main() {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}

if err = (&controllers.PodReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Pod"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr, controllers.PodReconcilerOptions{MaxConcurrentReconciles: viper.GetInt("concurrent")}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
}

client, err := client.NewWithWatch(mgr.GetConfig(), client.Options{
Scheme: mgr.GetScheme(),
Mapper: mgr.GetRESTMapper(),
})

if err != nil {
setupLog.Error(err, "failed to setup client")
os.Exit(1)
}

if err = (&controllers.PodReconciler{
Client: client,
Log: ctrl.Log.WithName("controllers").WithName("Pod"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr, controllers.PodReconcilerOptions{MaxConcurrentReconciles: viper.GetInt("concurrent")}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
}

if err = (&controllers.NamespaceReconciler{
Client: client,
Log: ctrl.Log.WithName("controllers").WithName("Namespace"),
Expand Down
Loading