diff --git a/Dockerfile b/Dockerfile index aca26f9..fa0ed5d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN go mod download # Copy the go source COPY cmd/main.go cmd/main.go COPY api/ api/ -COPY internal/controller/ internal/controller/ +COPY internal/ internal/ # Build # the GOARCH has not a default value to allow the binary be built according to the host where the command diff --git a/api/v1alpha1/broom_types.go b/api/v1alpha1/broom_types.go index becf16d..76cfb1e 100644 --- a/api/v1alpha1/broom_types.go +++ b/api/v1alpha1/broom_types.go @@ -17,19 +17,79 @@ limitations under the License. package v1alpha1 import ( + "fmt" + "strconv" + + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +type BroomTarget struct { + Name string `json:"name,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Namespace string `json:"namespace,omitempty"` +} + +type BroomAdjustmentType string + +const ( + AddAdjustment BroomAdjustmentType = "Add" + MulAdjustment BroomAdjustmentType = "Mul" +) + +type BroomAdjustment struct { + Type BroomAdjustmentType `json:"type"` + Value string `json:"value"` +} + +func (adj BroomAdjustment) IncreaseMemory(m *resource.Quantity) error { + switch adj.Type { + case AddAdjustment: + y, err := resource.ParseQuantity(adj.Value) + if err != nil { + return fmt.Errorf("unable to parse value to resource.Quantity: %w", err) + } + m.Add(y) + case MulAdjustment: + y, err := strconv.Atoi(adj.Value) + if err != nil { + return fmt.Errorf("unable to parse value to int: %w", err) + } + m.Mul(int64(y)) + } + return nil +} + +type BroomRestartPolicy string + +const ( + RestartOnOOMPolicy BroomRestartPolicy = "OnOOM" + RestartNeverPolicy BroomRestartPolicy = "Never" +) + +type BroomSlackWebhookSecret struct { + Namespace string 
`json:"namespace"` + Name string `json:"name"` + Key string `json:"key"` +} + +type BroomSlackWebhook struct { + Secret BroomSlackWebhookSecret `json:"secret"` + Channel string `json:"channel,omitempty"` +} + // BroomSpec defines the desired state of Broom type BroomSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - // Foo is an example field of Broom. Edit broom_types.go to remove/update - Foo string `json:"foo,omitempty"` + Target BroomTarget `json:"target,omitempty"` + Adjustment BroomAdjustment `json:"adjustment"` + RestartPolicy BroomRestartPolicy `json:"restartPolicy"` + SlackWebhook BroomSlackWebhook `json:"slackWebhook"` } // BroomStatus defines the observed state of Broom diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 84b9cc8..176afb5 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -29,7 +29,7 @@ func (in *Broom) DeepCopyInto(out *Broom) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -51,6 +51,21 @@ func (in *Broom) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomAdjustment) DeepCopyInto(out *BroomAdjustment) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomAdjustment. +func (in *BroomAdjustment) DeepCopy() *BroomAdjustment { + if in == nil { + return nil + } + out := new(BroomAdjustment) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *BroomList) DeepCopyInto(out *BroomList) { *out = *in @@ -83,9 +98,43 @@ func (in *BroomList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomSlackWebhook) DeepCopyInto(out *BroomSlackWebhook) { + *out = *in + out.Secret = in.Secret +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSlackWebhook. +func (in *BroomSlackWebhook) DeepCopy() *BroomSlackWebhook { + if in == nil { + return nil + } + out := new(BroomSlackWebhook) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomSlackWebhookSecret) DeepCopyInto(out *BroomSlackWebhookSecret) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSlackWebhookSecret. +func (in *BroomSlackWebhookSecret) DeepCopy() *BroomSlackWebhookSecret { + if in == nil { + return nil + } + out := new(BroomSlackWebhookSecret) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BroomSpec) DeepCopyInto(out *BroomSpec) { *out = *in + in.Target.DeepCopyInto(&out.Target) + out.Adjustment = in.Adjustment + out.SlackWebhook = in.SlackWebhook } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSpec. @@ -112,3 +161,25 @@ func (in *BroomStatus) DeepCopy() *BroomStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *BroomTarget) DeepCopyInto(out *BroomTarget) { + *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomTarget. +func (in *BroomTarget) DeepCopy() *BroomTarget { + if in == nil { + return nil + } + out := new(BroomTarget) + in.DeepCopyInto(out) + return out +} diff --git a/cmd/main.go b/cmd/main.go index cb8a657..2b99b53 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -122,10 +122,7 @@ func main() { os.Exit(1) } - if err = (&controller.BroomReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = controller.New(mgr).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Broom") os.Exit(1) } diff --git a/config/crd/bases/ai.m3.com_brooms.yaml b/config/crd/bases/ai.m3.com_brooms.yaml index 5ddff0c..96de4c1 100644 --- a/config/crd/bases/ai.m3.com_brooms.yaml +++ b/config/crd/bases/ai.m3.com_brooms.yaml @@ -39,10 +39,53 @@ spec: spec: description: BroomSpec defines the desired state of Broom properties: - foo: - description: Foo is an example field of Broom. 
Edit broom_types.go - to remove/update + adjustment: + properties: + type: + type: string + value: + type: string + required: + - type + - value + type: object + restartPolicy: type: string + slackWebhook: + properties: + channel: + type: string + secret: + properties: + key: + type: string + name: + type: string + namespace: + type: string + required: + - key + - name + - namespace + type: object + required: + - secret + type: object + target: + properties: + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string + type: object + required: + - adjustment + - restartPolicy + - slackWebhook type: object status: description: BroomStatus defines the observed state of Broom diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 5c5f0b8..d3da005 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -1,2 +1,8 @@ resources: - manager.yaml +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +images: +- name: controller + newName: asia-northeast1-docker.pkg.dev/m3ai-ullman-dev/arbok/broom + newTag: latest diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 6eb1557..de0792b 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,34 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - "" + resources: + - events + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch - apiGroups: - ai.m3.com resources: @@ -30,3 +58,27 @@ rules: - get - patch - update +- apiGroups: + - batch + resources: + - cronjobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch diff 
--git a/config/samples/ai_v1alpha1_broom.yaml b/config/samples/ai_v1alpha1_broom.yaml index 2785617..a8645fa 100644 --- a/config/samples/ai_v1alpha1_broom.yaml +++ b/config/samples/ai_v1alpha1_broom.yaml @@ -9,4 +9,18 @@ metadata: app.kubernetes.io/created-by: broom name: broom-sample spec: - # TODO(user): Add fields here + target: + # name: oom-sample + labels: + m3.com/use-broom: "true" + # namespace: broom + adjustment: + type: Mul + value: "2" + restartPolicy: "OnOOM" + slackWebhook: + secret: + namespace: default + name: broom + key: SLACK_WEBHOOK_URL + # channel: "#alert" diff --git a/config/samples/cronjob.yaml b/config/samples/cronjob.yaml new file mode 100644 index 0000000..a9e9c7b --- /dev/null +++ b/config/samples/cronjob.yaml @@ -0,0 +1,34 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: oom-sample + labels: + m3.com/use-broom: "true" +spec: + schedule: "*/2 * * * *" + jobTemplate: + spec: + activeDeadlineSeconds: 60 + backoffLimit: 1 + template: + spec: + containers: + - name: oom-container + image: ubuntu:latest + command: + - /bin/bash + - -c + args: + - | + echo PID=$$ + for i in {0..9} + do + eval a$i'=$(head --bytes 5000000 /dev/zero | cat -v)' + echo $((i++)); + done + resources: + limits: + memory: "100Mi" + requests: + memory: "50Mi" + restartPolicy: Never diff --git a/config/samples/secret.yaml b/config/samples/secret.yaml new file mode 100644 index 0000000..f246aa1 --- /dev/null +++ b/config/samples/secret.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Secret +metadata: + name: broom +type: Opaque +data: + SLACK_WEBHOOK_URL: TO_BE_SPECIFIED \ No newline at end of file diff --git a/go.mod b/go.mod index f2d8dc4..55a5c30 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 + github.com/slack-go/slack v0.12.5 + k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 sigs.k8s.io/controller-runtime v0.17.0 @@ -31,6 +33,7 @@ require ( 
github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/imdario/mergo v0.3.6 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -61,7 +64,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.0 // indirect k8s.io/apiextensions-apiserver v0.29.0 // indirect k8s.io/component-base v0.29.0 // indirect k8s.io/klog/v2 v2.110.1 // indirect diff --git a/go.sum b/go.sum index 57b4fa9..37164a5 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/ github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -41,6 +43,7 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp 
v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -51,6 +54,9 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -96,11 +102,14 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/slack-go/slack v0.12.5 h1:ddZ6uz6XVaB+3MTDhoW04gG+Vc/M/X1ctC+wssy2cqs= +github.com/slack-go/slack v0.12.5/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx 
v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index 3f2dc16..77646e5 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -18,24 +18,51 @@ package controller import ( "context" + "fmt" + "slices" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" aiv1alpha1 "github.com/m3dev/broom/api/v1alpha1" + "github.com/m3dev/broom/internal/random" + "github.com/m3dev/broom/internal/slack" ) // BroomReconciler reconciles a Broom object type BroomReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + ResolvedJobs map[types.UID]struct{} +} + +func New(mgr ctrl.Manager) *BroomReconciler { + return &BroomReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ResolvedJobs: make(map[types.UID]struct{}, 0), + } } 
//+kubebuilder:rbac:groups=ai.m3.com,resources=brooms,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms/finalizers,verbs=update +//+kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch +//+kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -47,16 +74,280 @@ type BroomReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile func (r *BroomReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + log := log.FromContext(ctx) + var broom aiv1alpha1.Broom + if err := r.Get(ctx, req.NamespacedName, &broom); err != nil { + log.Error(err, "unable to fetch Broom") + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + br, err := r.listCandidateBatchResources(ctx, broom.Spec.Target.Namespace) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to fetch candidate resources: %w", err) + } + cronJobOwnedReferences := r.traceOOMKilledOwnerReference(br) + for uid, info := range cronJobOwnedReferences { + cronJob, ok := br.cronJobs[uid] + if !ok { + return ctrl.Result{}, fmt.Errorf("unable to fetch CronJob: UID=%s", uid) + } + if !isTargeted(cronJob, broom.Spec.Target) { + continue + } + oldSpec := cronJob.Spec.DeepCopy() + newSpec, err := r.getNewCronJobSpec(&cronJob, broom.Spec.Adjustment, info.OOMContainerNames) + if err 
!= nil { + return ctrl.Result{}, fmt.Errorf("unable to get updated CronJob spec: %w", err) + } - // TODO(user): your logic here + err = r.updateCronJob(ctx, &cronJob, newSpec) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to update CronJob: %w", err) + } + var restartedJobName string + if broom.Spec.RestartPolicy == aiv1alpha1.RestartOnOOMPolicy { + restartedJobName, err = r.restartUpdatedJob(ctx, &cronJob, info) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to restart updated Job: %w", err) + } + } + + if err := r.notifyResult(ctx, broom.Spec.SlackWebhook, &cronJob, *oldSpec, restartedJobName); err != nil { + return ctrl.Result{}, fmt.Errorf("unable to notify Slack: %w", err) + } + } return ctrl.Result{}, nil } +type batchResources struct { + pods map[types.UID]corev1.Pod + jobs map[types.UID]batchv1.Job + cronJobs map[types.UID]batchv1.CronJob +} + +// listCandidateResources lists all pods, jobs, cronjobs in the specified namespace +func (r *BroomReconciler) listCandidateBatchResources(ctx context.Context, namespace string) (*batchResources, error) { + b := &batchResources{ + pods: make(map[types.UID]corev1.Pod, 0), + jobs: make(map[types.UID]batchv1.Job, 0), + cronJobs: make(map[types.UID]batchv1.CronJob, 0), + } + opts := client.ListOptions{ + Namespace: namespace, + } + + var pods corev1.PodList + if err := r.List(ctx, &pods, &opts); err != nil { + return nil, fmt.Errorf("unable to fetch Pods: %w", err) + } + for _, pod := range pods.Items { + b.pods[pod.UID] = pod + } + + var jobs batchv1.JobList + if err := r.List(ctx, &jobs, &opts); err != nil { + return nil, fmt.Errorf("unable to fetch Jobs: %w", err) + } + for _, job := range jobs.Items { + b.jobs[job.UID] = job + } + + var cronJobs batchv1.CronJobList + if err := r.List(ctx, &cronJobs, &opts); err != nil { + return nil, fmt.Errorf("unable to fetch CronJobs: %w", err) + } + for _, cronJob := range cronJobs.Items { + b.cronJobs[cronJob.UID] = cronJob + } + + return b, nil +} + 
+type cronJobOOMInfo struct { + LastFailedJob *batchv1.Job + OOMContainerNames []string +} + +// traceOOMKilledOwnerReference returns a reference from CronJob UID to OOMKilled Pod and referenced Job information +func (r *BroomReconciler) traceOOMKilledOwnerReference(br *batchResources) map[types.UID]cronJobOOMInfo { + jobOwnedOOMContainers := make(map[metav1.OwnerReference][]string, 0) + for _, p := range br.pods { + for _, cs := range p.Status.ContainerStatuses { + if cs.State.Terminated == nil || cs.State.Terminated.Reason != "OOMKilled" { + continue + } + for _, ref := range p.OwnerReferences { + if ref.Kind == "Job" { + jobOwnedOOMContainers[ref] = append(jobOwnedOOMContainers[ref], cs.Name) + } + } + } + } + + cronJobOwnedReferences := make(map[types.UID]cronJobOOMInfo, 0) + for ref, containerNames := range jobOwnedOOMContainers { + job, ok := br.jobs[ref.UID] + if !ok { + continue // possibly deleted while reconciling + } + for _, ownerRef := range job.OwnerReferences { + if _, ok := r.ResolvedJobs[ref.UID]; !ok && ownerRef.Kind == "CronJob" { + if oomInfo, ok := cronJobOwnedReferences[ownerRef.UID]; !ok { + cronJobOwnedReferences[ownerRef.UID] = cronJobOOMInfo{ + LastFailedJob: &job, + OOMContainerNames: containerNames, + } + } else { + if job.CreationTimestamp.After(oomInfo.LastFailedJob.CreationTimestamp.Time) { + oomInfo.LastFailedJob = &job + } + } + r.ResolvedJobs[ref.UID] = struct{}{} + } + } + } + + return cronJobOwnedReferences +} + +// getNewCronJobSpec returns a spec with increased memory limit for CronJob +func (r *BroomReconciler) getNewCronJobSpec(cj *batchv1.CronJob, adj aiv1alpha1.BroomAdjustment, containers []string) (*batchv1.CronJobSpec, error) { + newSpec := cj.Spec.DeepCopy() + for i, c := range cj.Spec.JobTemplate.Spec.Template.Spec.Containers { + if !slices.Contains(containers, c.Name) { // Ignore non-OOM Pod containers + continue + } + if m := c.Resources.Limits.Memory(); m != nil { + if err := adj.IncreaseMemory(m); err != nil { + 
return &batchv1.CronJobSpec{}, fmt.Errorf("unable to increase memory: %w", err) + } + newSpec.JobTemplate.Spec.Template.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = *m + } + } + return newSpec, nil +} + +// updateCronJob updates the CronJob in the Kubernetes cluster with given spec +func (r *BroomReconciler) updateCronJob(ctx context.Context, cj *batchv1.CronJob, spec *batchv1.CronJobSpec) error { + log := log.FromContext(ctx) + cj.Spec = *spec + if err := r.Update(ctx, cj); err != nil { + return fmt.Errorf("unable to update CronJob: %w", err) + } + log.Info("Updated CronJob", "name", cj.Name) + return nil +} + +// restartUpdatedJob creates Job for the failed Job with updated CronJob jobTemplate spec +func (r *BroomReconciler) restartUpdatedJob(ctx context.Context, cj *batchv1.CronJob, info cronJobOOMInfo) (string, error) { + log := log.FromContext(ctx) + randomString := random.GetRandomString(5) + restartedJobName := fmt.Sprintf("%s-restart-%s", info.LastFailedJob.Name, randomString) + annotations := map[string]string{ + "m3.com/restarted-by-broom": "true", + } + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: cj.Namespace, + Name: restartedJobName, + Labels: info.LastFailedJob.Labels, + Annotations: annotations, + }, + Spec: cj.Spec.JobTemplate.Spec, + } + if err := r.Create(ctx, &job); err != nil { + return "", fmt.Errorf("unable to create restarted Job: %w", err) + } + log.Info("Restarted Job", "name", job.Name) + return job.Name, nil +} + +// isTargeted returns whether the Cronjob matches all the fields specified for the target or not +func isTargeted(cj batchv1.CronJob, target aiv1alpha1.BroomTarget) bool { + if target.Namespace != "" && cj.Namespace != target.Namespace { + return false + } + if target.Name != "" && cj.Name != target.Name { + return false + } + for k, v := range target.Labels { + val, ok := cj.Labels[k] + if !ok || val != v { + return false + } + } + return true +} + +// notifyResult notifies the result of 
changes with webhook information retrieved from Secret +func (r *BroomReconciler) notifyResult(ctx context.Context, w aiv1alpha1.BroomSlackWebhook, cj *batchv1.CronJob, oldSpec batchv1.CronJobSpec, rj string) error { + secret := &corev1.Secret{} + if err := r.Get(ctx, client.ObjectKey{Namespace: w.Secret.Namespace, Name: w.Secret.Name}, secret); err != nil { + return fmt.Errorf("unable to get Secret for webhook URL: %w", err) + } + webhookURL := string(secret.Data[w.Secret.Key]) + webhookChannel := w.Channel + + res := slack.UpdateResult{ + CronJobNamespace: cj.Namespace, + CronJobName: cj.Name, + ContainerUpdates: []slack.ContainerUpdate{}, + RestartedJobName: rj, + } + + for _, oc := range oldSpec.JobTemplate.Spec.Template.Spec.Containers { + for _, nc := range cj.Spec.JobTemplate.Spec.Template.Spec.Containers { + if oc.Name == nc.Name && !(oc.Resources.Limits.Memory().Equal(*nc.Resources.Limits.Memory())) { + res.ContainerUpdates = append(res.ContainerUpdates, slack.ContainerUpdate{ + Name: oc.Name, + BeforeMemory: oc.Resources.Limits.Memory().String(), + AfterMemory: nc.Resources.Limits.Memory().String(), + }) + } + } + } + + if err := slack.SendMessage(res, webhookURL, webhookChannel); err != nil { + return fmt.Errorf("unable to send message to Slack: %w", err) + } + return nil +} + +// findObjectsForPod finds Brooms to create a reconcile request +func (r *BroomReconciler) findObjectsForPod(ctx context.Context, pod client.Object) []reconcile.Request { + attachedBrooms := &aiv1alpha1.BroomList{} + if err := r.List(ctx, attachedBrooms); err != nil { + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(attachedBrooms.Items)) + for i, item := range attachedBrooms.Items { + requests[i] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: item.GetName(), + Namespace: item.GetNamespace(), + }, + } + } + return requests +} + // SetupWithManager sets up the controller with the Manager. 
// randomLetters is the alphabet GetRandomString draws from.
const randomLetters = "abcdefghijklmnopqrstuvwxyz0123456789"

// GetRandomString returns a string of n characters drawn from lowercase ASCII
// letters and digits using math/rand. It returns "" when n is zero or
// negative (a negative n used to panic while allocating the buffer).
//
// The result is not cryptographically secure; it is meant for name suffixes.
func GetRandomString(n int) string {
	if n <= 0 {
		return ""
	}
	// The alphabet is pure ASCII, so bytes are enough and no rune slice has
	// to be rebuilt on every call.
	s := make([]byte, n)
	for i := range s {
		s[i] = randomLetters[rand.Intn(len(randomLetters))]
	}
	return string(s)
}
attatchment := slack.Attachment{ + Text: fmt.Sprintf("Namespace: *%s*\nName: *%s*\nContainer memory changes:\n%sRestarted Job: *%s*\n", + res.CronJobNamespace, + res.CronJobName, + memoryChanges, + restartedJob, + ), + } + + msg := slack.WebhookMessage{ + Username: slackUserName, + IconEmoji: slackIconEmoji, + Channel: webhookChannel, + Text: ":broom: CronJob jobTemplate updated", + Attachments: []slack.Attachment{attatchment}, + } + err := slack.PostWebhook(webhookURL, &msg) + if err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +}