From ebaa1a769597ce81876c6150f32e4e0ca5b9783d Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Mon, 1 Apr 2024 18:38:54 +0900 Subject: [PATCH 1/9] feat: add basic features --- Dockerfile | 2 +- api/v1alpha1/broom_types.go | 64 +++++- api/v1alpha1/zz_generated.deepcopy.go | 73 +++++- config/crd/bases/ai.m3.com_brooms.yaml | 49 +++- config/manager/kustomization.yaml | 6 + config/rbac/role.yaml | 52 +++++ config/samples/ai_v1alpha1_broom.yaml | 16 +- config/samples/cronjob.yaml | 34 +++ config/samples/secret.yaml | 7 + go.mod | 4 +- go.sum | 9 + internal/controller/broom_controller.go | 293 +++++++++++++++++++++++- internal/random/random.go | 15 ++ internal/slack/slack.go | 64 ++++++ 14 files changed, 676 insertions(+), 12 deletions(-) create mode 100644 config/samples/cronjob.yaml create mode 100644 config/samples/secret.yaml create mode 100644 internal/random/random.go create mode 100644 internal/slack/slack.go diff --git a/Dockerfile b/Dockerfile index aca26f9..fa0ed5d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN go mod download # Copy the go source COPY cmd/main.go cmd/main.go COPY api/ api/ -COPY internal/controller/ internal/controller/ +COPY internal/ internal/ # Build # the GOARCH has not a default value to allow the binary be built according to the host where the command diff --git a/api/v1alpha1/broom_types.go b/api/v1alpha1/broom_types.go index becf16d..12b1a48 100644 --- a/api/v1alpha1/broom_types.go +++ b/api/v1alpha1/broom_types.go @@ -17,19 +17,79 @@ limitations under the License. package v1alpha1 import ( + "fmt" + "strconv" + + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +type BroomTarget struct { + Name string `json:"name,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + Namespace string `json:"namespace,omitempty"` +} + +type BroomAdjustmentType string + +const ( + AddAdjustment BroomAdjustmentType = "Add" + MulAdjustment BroomAdjustmentType = "Mul" +) + +type BroomAdjustment struct { + Type BroomAdjustmentType `json:"type"` + Value string `json:"value"` +} + +func (adj BroomAdjustment) IncreaseMemory(m *resource.Quantity) error { + switch adj.Type { + case AddAdjustment: + y, err := resource.ParseQuantity(adj.Value) + if err != nil { + return fmt.Errorf("unable to parse value to resource.Quantity: %w", err) + } + m.Add(y) + case MulAdjustment: + y, err := strconv.Atoi(adj.Value) + if err != nil { + return fmt.Errorf("unable to parse value to int: %w", err) + } + m.Mul(int64(y)) + } + return nil +} + +type BroomRetryPolicy string + +const ( + RetryAllowPolicy BroomRetryPolicy = "Allow" + RetryForbidPolicy BroomRetryPolicy = "Forbid" +) + +type BroomWebhookSecret struct { + Namespace string `json:"namespace"` + Name string `json:"name"` + Key string `json:"key"` +} + +type BroomWebhook struct { + Secret BroomWebhookSecret `json:"secret"` + Channel string `json:"channel,omitempty"` +} + // BroomSpec defines the desired state of Broom type BroomSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - // Foo is an example field of Broom. Edit broom_types.go to remove/update - Foo string `json:"foo,omitempty"` + Target BroomTarget `json:"target,omitempty"` + Adjustment BroomAdjustment `json:"adjustment"` + RetryPolicy BroomRetryPolicy `json:"retryPolicy"` + Webhook BroomWebhook `json:"webhook"` } // BroomStatus defines the observed state of Broom diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 84b9cc8..1bc6313 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -29,7 +29,7 @@ func (in *Broom) DeepCopyInto(out *Broom) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) - out.Spec = in.Spec + in.Spec.DeepCopyInto(&out.Spec) out.Status = in.Status } @@ -51,6 +51,21 @@ func (in *Broom) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomAdjustment) DeepCopyInto(out *BroomAdjustment) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomAdjustment. +func (in *BroomAdjustment) DeepCopy() *BroomAdjustment { + if in == nil { + return nil + } + out := new(BroomAdjustment) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BroomList) DeepCopyInto(out *BroomList) { *out = *in @@ -86,6 +101,9 @@ func (in *BroomList) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *BroomSpec) DeepCopyInto(out *BroomSpec) { *out = *in + in.Target.DeepCopyInto(&out.Target) + out.Adjustment = in.Adjustment + out.Webhook = in.Webhook } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSpec. @@ -112,3 +130,56 @@ func (in *BroomStatus) DeepCopy() *BroomStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomTarget) DeepCopyInto(out *BroomTarget) { + *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomTarget. +func (in *BroomTarget) DeepCopy() *BroomTarget { + if in == nil { + return nil + } + out := new(BroomTarget) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomWebhook) DeepCopyInto(out *BroomWebhook) { + *out = *in + out.Secret = in.Secret +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomWebhook. +func (in *BroomWebhook) DeepCopy() *BroomWebhook { + if in == nil { + return nil + } + out := new(BroomWebhook) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BroomWebhookSecret) DeepCopyInto(out *BroomWebhookSecret) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomWebhookSecret. +func (in *BroomWebhookSecret) DeepCopy() *BroomWebhookSecret { + if in == nil { + return nil + } + out := new(BroomWebhookSecret) + in.DeepCopyInto(out) + return out +} diff --git a/config/crd/bases/ai.m3.com_brooms.yaml b/config/crd/bases/ai.m3.com_brooms.yaml index 5ddff0c..9f94034 100644 --- a/config/crd/bases/ai.m3.com_brooms.yaml +++ b/config/crd/bases/ai.m3.com_brooms.yaml @@ -39,10 +39,53 @@ spec: spec: description: BroomSpec defines the desired state of Broom properties: - foo: - description: Foo is an example field of Broom. Edit broom_types.go - to remove/update + adjustment: + properties: + type: + type: string + value: + type: string + required: + - type + - value + type: object + retryPolicy: type: string + target: + properties: + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string + type: object + webhook: + properties: + channel: + type: string + secret: + properties: + key: + type: string + name: + type: string + namespace: + type: string + required: + - key + - name + - namespace + type: object + required: + - secret + type: object + required: + - adjustment + - retryPolicy + - webhook type: object status: description: BroomStatus defines the observed state of Broom diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 5c5f0b8..d3da005 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -1,2 +1,8 @@ resources: - manager.yaml +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization +images: +- name: controller + newName: asia-northeast1-docker.pkg.dev/m3ai-ullman-dev/arbok/broom + newTag: latest diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 6eb1557..de0792b 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -4,6 +4,34 @@ kind: ClusterRole metadata: name: manager-role rules: +- apiGroups: + - "" + resources: + - events + verbs: + - get + - list + - watch +- apiGroups: + - "" + resources: + - pods + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list + - watch - apiGroups: - ai.m3.com resources: @@ -30,3 +58,27 @@ rules: - get - patch - update +- apiGroups: + - batch + resources: + - cronjobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch +- apiGroups: + - batch + resources: + - jobs + verbs: + - create + - delete + - get + - list + - patch + - update + - watch diff --git a/config/samples/ai_v1alpha1_broom.yaml b/config/samples/ai_v1alpha1_broom.yaml index 2785617..205dfa6 100644 --- a/config/samples/ai_v1alpha1_broom.yaml +++ b/config/samples/ai_v1alpha1_broom.yaml @@ -9,4 +9,18 @@ metadata: app.kubernetes.io/created-by: broom name: broom-sample spec: - # TODO(user): Add fields here + target: + # name: oom-sample + labels: + rendezvous.m3.com/use-broom: "true" + # namespace: broom + adjustment: + type: Mul + value: "2" + retryPolicy: "Allow" + webhook: + secret: + namespace: default + name: broom + key: SLACK_WEBHOOK_URL + # channel: "#alert" diff --git a/config/samples/cronjob.yaml b/config/samples/cronjob.yaml new file mode 100644 index 0000000..80caac2 --- /dev/null +++ b/config/samples/cronjob.yaml @@ -0,0 +1,34 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: oom-sample + labels: + rendezvous.m3.com/use-broom: "true" +spec: + schedule: "*/2 * * * *" + jobTemplate: + spec: + activeDeadlineSeconds: 60 + backoffLimit: 1 + template: + spec: + containers: + - name: oom-container + image: ubuntu:latest + command: + - /bin/bash + - -c + args: + - | + echo PID=$$ + for i in {0..9} + do + eval a$i'=$(head --bytes 5000000 /dev/zero | cat -v)' + echo $((i++)); + done + resources: + limits: + memory: "100Mi" + requests: + memory: "50Mi" + restartPolicy: Never diff --git a/config/samples/secret.yaml b/config/samples/secret.yaml new file mode 100644 index 0000000..f246aa1 --- /dev/null +++ b/config/samples/secret.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Secret +metadata: + name: broom +type: Opaque +data: + SLACK_WEBHOOK_URL: TO_BE_SPECIFIED \ No newline at end of file diff --git a/go.mod b/go.mod index f2d8dc4..55a5c30 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,8 @@ go 1.21 require ( github.com/onsi/ginkgo/v2 v2.14.0 github.com/onsi/gomega v1.30.0 + github.com/slack-go/slack v0.12.5 + k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 k8s.io/client-go v0.29.0 sigs.k8s.io/controller-runtime v0.17.0 @@ -31,6 +33,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 // indirect github.com/google/uuid v1.3.0 // indirect + github.com/gorilla/websocket v1.5.0 // indirect github.com/imdario/mergo v0.3.6 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect @@ -61,7 +64,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.29.0 // indirect k8s.io/apiextensions-apiserver v0.29.0 // indirect k8s.io/component-base v0.29.0 // indirect k8s.io/klog/v2 v2.110.1 // indirect diff --git a/go.sum b/go.sum index 57b4fa9..37164a5 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,8 @@ github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/ github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= +github.com/go-test/deep v1.0.4 h1:u2CU3YKy9I2pmu9pX0eq50wCgjfGIt539SqR7FbHiho= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -41,6 +43,7 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -51,6 +54,9 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJY github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -96,11 +102,14 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/slack-go/slack v0.12.5 h1:ddZ6uz6XVaB+3MTDhoW04gG+Vc/M/X1ctC+wssy2cqs= +github.com/slack-go/slack v0.12.5/go.mod h1:hlGi5oXA+Gt+yWTPP0plCdRKmjsDxecdHxYQdlMQKOw= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index 3f2dc16..e7264ee 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -18,24 +18,43 @@ package controller import ( "context" + "fmt" + "slices" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" aiv1alpha1 "github.com/m3dev/broom/api/v1alpha1" + "github.com/m3dev/broom/internal/random" + "github.com/m3dev/broom/internal/slack" ) // BroomReconciler reconciles a Broom object type BroomReconciler struct { client.Client - Scheme *runtime.Scheme + Scheme *runtime.Scheme + ResolvedJobs map[types.UID]struct{} } //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms/finalizers,verbs=update +//+kubebuilder:rbac:groups="",resources=events,verbs=get;list;watch +//+kubebuilder:rbac:groups=batch,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. @@ -47,16 +66,284 @@ type BroomReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.0/pkg/reconcile func (r *BroomReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - _ = log.FromContext(ctx) + log := log.FromContext(ctx) + var broom aiv1alpha1.Broom + if err := r.Get(ctx, req.NamespacedName, &broom); err != nil { + log.Error(err, "unable to fetch Broom") + return ctrl.Result{}, client.IgnoreNotFound(err) + } - // TODO(user): your logic here + br, err := r.listCandidateBatchResources(ctx, broom.Spec.Target.Namespace) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to fetch candidate resources: %w", err) + } + cronJobOwnedReferences := r.traceOOMKilledOwnerReference(br) + for uid, info := range cronJobOwnedReferences { + cronJob, ok := br.cronJobs[uid] + if !ok { + return ctrl.Result{}, fmt.Errorf("unable to fetch CronJob: UID=%s", uid) + } + if !isTargeted(cronJob, broom.Spec.Target) { + continue + } + oldSpec := cronJob.Spec.DeepCopy() + newSpec, err := r.getNewCronJobSpec(&cronJob, broom.Spec.Adjustment, info.OOMContainerNames) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to get updated CronJob spec: %w", err) + } + err = r.updateCronJob(ctx, &cronJob, newSpec) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to update CronJob: %w", err) + } + + var retriedJobName string + if broom.Spec.RetryPolicy == aiv1alpha1.RetryAllowPolicy { + retriedJobName, err = r.retryJob(ctx, &cronJob, info) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to retry Job: %w", err) + } + } + + if err := r.notifyResult(ctx, broom.Spec.Webhook, &cronJob, *oldSpec, retriedJobName); err != nil { + return ctrl.Result{}, fmt.Errorf("unable to notify Slack: %w", err) + } + } return ctrl.Result{}, nil } +type batchResources struct { + pods map[types.UID]corev1.Pod + jobs map[types.UID]batchv1.Job + cronJobs map[types.UID]batchv1.CronJob +} + +// listCandidateResources lists all pods, jobs, cronjobs in the specified namespace +func (r *BroomReconciler) listCandidateBatchResources(ctx context.Context, namespace string) (*batchResources, error) { + b := &batchResources{ + pods: make(map[types.UID]corev1.Pod, 0), + jobs: make(map[types.UID]batchv1.Job, 0), + cronJobs: make(map[types.UID]batchv1.CronJob, 0), + } + opts := client.ListOptions{ + Namespace: namespace, + } + + var pods corev1.PodList + if err := r.List(ctx, &pods, &opts); err != nil { + return nil, fmt.Errorf("unable to fetch Pods: %w", err) + } + for _, pod := range pods.Items { + b.pods[pod.UID] = pod + } + + var jobs batchv1.JobList + if err := r.List(ctx, &jobs, &opts); err != nil { + return nil, fmt.Errorf("unable to fetch Jobs: %w", err) + } + for _, job := range jobs.Items { + b.jobs[job.UID] = job + } + + var cronJobs batchv1.CronJobList + if err := r.List(ctx, &cronJobs, &opts); err != nil { + return nil, fmt.Errorf("unable to fetch CronJobs: %w", err) + } + for _, cronJob := range cronJobs.Items { + b.cronJobs[cronJob.UID] = cronJob + } + + return b, nil +} + +type cronJobOOMInfo struct { + LastFailedJob *batchv1.Job + OOMContainerNames []string +} + +// traceOOMKilledOwnerReference returns a reference from CronJob UID to OOMKilled Pod and referenced Job information +func (r *BroomReconciler) traceOOMKilledOwnerReference(br *batchResources) map[types.UID]cronJobOOMInfo { + jobOwnedOOMContainers := make(map[metav1.OwnerReference][]string, 0) + for _, p := range br.pods { + for _, cs := range p.Status.ContainerStatuses { + if cs.State.Terminated == nil || cs.State.Terminated.Reason != "OOMKilled" { + continue + } + for _, ref := range p.OwnerReferences { + if ref.Kind == "Job" { + jobOwnedOOMContainers[ref] = append(jobOwnedOOMContainers[ref], cs.Name) + } + } + } + } + + if r.ResolvedJobs == nil { + r.ResolvedJobs = make(map[types.UID]struct{}, 0) + } + + cronJobOwnedReferences := make(map[types.UID]cronJobOOMInfo, 0) + for ref, containerNames := range jobOwnedOOMContainers { + job, ok := br.jobs[ref.UID] + if !ok { + continue // possibly deleted while reconciling + } + for _, ownerRef := range job.OwnerReferences { + if _, ok := r.ResolvedJobs[ref.UID]; !ok && ownerRef.Kind == "CronJob" { + if oomInfo, ok := cronJobOwnedReferences[ownerRef.UID]; !ok { + cronJobOwnedReferences[ownerRef.UID] = cronJobOOMInfo{ + LastFailedJob: &job, + OOMContainerNames: containerNames, + } + } else { + if job.CreationTimestamp.After(oomInfo.LastFailedJob.CreationTimestamp.Time) { + oomInfo.LastFailedJob = &job + } + } + r.ResolvedJobs[ref.UID] = struct{}{} + } + } + } + + return cronJobOwnedReferences +} + +// getNewCronJobSpec returns a spec with increased memory limit for CronJob +func (r *BroomReconciler) getNewCronJobSpec(cj *batchv1.CronJob, adj aiv1alpha1.BroomAdjustment, containers []string) (*batchv1.CronJobSpec, error) { + newSpec := cj.Spec.DeepCopy() + for i, c := range cj.Spec.JobTemplate.Spec.Template.Spec.Containers { + if !slices.Contains(containers, c.Name) { // Ignore non-OOM Pod containers + continue + } + if m := c.Resources.Limits.Memory(); m != nil { + if err := adj.IncreaseMemory(m); err != nil { + return &batchv1.CronJobSpec{}, fmt.Errorf("unable to increase memory: %w", err) + } + newSpec.JobTemplate.Spec.Template.Spec.Containers[i].Resources.Limits[corev1.ResourceMemory] = *m + } + } + return newSpec, nil +} + +// updateCronJob updates the CronJob in the Kubernetes cluster with given spec +func (r *BroomReconciler) updateCronJob(ctx context.Context, cj *batchv1.CronJob, spec *batchv1.CronJobSpec) error { + log := log.FromContext(ctx) + cj.Spec = *spec + if err := r.Update(ctx, cj); err != nil { + return fmt.Errorf("unable to update CronJob: %w", err) + } + log.Info("Updated CronJob", "name", cj.Name) + return nil +} + +// retryJob creates Job for the failed Job with updated CronJob jobTemplate spec +func (r *BroomReconciler) retryJob(ctx context.Context, cj *batchv1.CronJob, info cronJobOOMInfo) (string, error) { + log := log.FromContext(ctx) + randomString := random.GetRandomString(5) + retriedJobName := fmt.Sprintf("%s-retry-%s", info.LastFailedJob.Name, randomString) + annotations := map[string]string{ + "rendezvous.m3.com/retried-by-broom": "true", + } + job := batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: cj.Namespace, + Name: retriedJobName, + Labels: info.LastFailedJob.Labels, + Annotations: annotations, + }, + Spec: cj.Spec.JobTemplate.Spec, + } + if err := r.Create(ctx, &job); err != nil { + return "", fmt.Errorf("unable to create retried Job: %w", err) + } + log.Info("Retried Job", "name", job.Name) + return job.Name, nil +} + +// isTargeted returns whether the Cronjob matches all the fields specified for the target or not +func isTargeted(cj batchv1.CronJob, target aiv1alpha1.BroomTarget) bool { + if target.Namespace != "" && cj.Namespace != target.Namespace { + return false + } + if target.Name != "" && cj.Name != target.Name { + return false + } + for k, v := range target.Labels { + val, ok := cj.Labels[k] + if !ok || val != v { + return false + } + } + return true +} + +// notifyResult notifies the result of changes with webhook information retrieved from Secret +func (r *BroomReconciler) notifyResult(ctx context.Context, w aiv1alpha1.BroomWebhook, cj *batchv1.CronJob, oldSpec batchv1.CronJobSpec, rj string) error { + secret := &corev1.Secret{} + if err := r.Get(ctx, client.ObjectKey{Namespace: w.Secret.Namespace, Name: w.Secret.Name}, secret); err != nil { + return fmt.Errorf("unable to get Secret for webhook URL: %w", err) + } + webhookURL := string(secret.Data[w.Secret.Key]) + webhookChannel := w.Channel + + res := slack.UpdateResult{ + CronJobNamespace: cj.Namespace, + CronJobName: cj.Name, + ContainerUpdates: []slack.ContainerUpdate{}, + RetriedJobName: rj, + } + + for _, oc := range oldSpec.JobTemplate.Spec.Template.Spec.Containers { + for _, nc := range cj.Spec.JobTemplate.Spec.Template.Spec.Containers { + if oc.Name == nc.Name && !(oc.Resources.Limits.Memory().Equal(*nc.Resources.Limits.Memory())) { + res.ContainerUpdates = append(res.ContainerUpdates, slack.ContainerUpdate{ + Name: oc.Name, + BeforeMemory: oc.Resources.Limits.Memory().String(), + AfterMemory: nc.Resources.Limits.Memory().String(), + }) + } + } + } + + if err := slack.SendMessage(res, webhookURL, webhookChannel); err != nil { + return fmt.Errorf("unable to send message to Slack: %w", err) + } + return nil +} + +// findObjectsForPod finds Brooms to create a reconcile request +func (r *BroomReconciler) findObjectsForPod(ctx context.Context, pod client.Object) []reconcile.Request { + attachedBrooms := &aiv1alpha1.BroomList{} + if err := r.List(ctx, attachedBrooms); err != nil { + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, len(attachedBrooms.Items)) + for i, item := range attachedBrooms.Items { + requests[i] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: item.GetName(), + Namespace: item.GetNamespace(), + }, + } + } + return requests +} + // SetupWithManager sets up the controller with the Manager. func (r *BroomReconciler) SetupWithManager(mgr ctrl.Manager) error { + p := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + new := e.ObjectNew.(*corev1.Pod) + return new.Status.Phase == "Failed" + }, + } + return ctrl.NewControllerManagedBy(mgr). For(&aiv1alpha1.Broom{}). + Watches( + &corev1.Pod{}, + handler.EnqueueRequestsFromMapFunc(r.findObjectsForPod), + builder.WithPredicates(p), + ). Complete(r) } diff --git a/internal/random/random.go b/internal/random/random.go new file mode 100644 index 0000000..edf9b41 --- /dev/null +++ b/internal/random/random.go @@ -0,0 +1,15 @@ +package random + +import ( + "math/rand" +) + +func GetRandomString(n int) string { + var letters = []rune("abcdefghijklmnopqrstuvwxyz0123456789") + + s := make([]rune, n) + for i := range s { + s[i] = letters[rand.Intn(len(letters))] + } + return string(s) +} diff --git a/internal/slack/slack.go b/internal/slack/slack.go new file mode 100644 index 0000000..9167cbf --- /dev/null +++ b/internal/slack/slack.go @@ -0,0 +1,64 @@ +package slack + +import ( + "fmt" + + "github.com/slack-go/slack" +) + +const ( + slackUserName = "Broom" + slackIconEmoji = ":broom:" +) + +type ContainerUpdate struct { + Name string `json:"name"` + BeforeMemory string `json:"before_memory"` + AfterMemory string `json:"after_memory"` +} + +type UpdateResult struct { + CronJobNamespace string `json:"cronjob_namespace"` + CronJobName string `json:"cronjob_name"` + ContainerUpdates []ContainerUpdate `json:"container_updates"` + RetriedJobName string `json:"retried_job_name"` +} + +func SendMessage(res UpdateResult, webhookURL string, webhookChannel string) error { + if len(res.ContainerUpdates) == 0 { + return nil + } + + var memoryChanges string + for _, u := range res.ContainerUpdates { + memoryChanges += fmt.Sprintf("\t:sparkles: *%s (%s → %s)*\n", u.Name, u.BeforeMemory, u.AfterMemory) + } + + retriedJob := res.RetriedJobName + if retriedJob == "" { + retriedJob = "None" + } + + attatchment := slack.Attachment{ + Text: fmt.Sprintf("Namespace: *%s*\nName: *%s*\nContainer memory changes:\n%sRetried Job: *%s*\n", + res.CronJobNamespace, + res.CronJobName, + memoryChanges, + retriedJob, + ), + } + + msg := slack.WebhookMessage{ + Username: slackUserName, + IconEmoji: slackIconEmoji, + Channel: webhookChannel, + Text: ":broom: CronJob jobTemplate updated", + Attachments: []slack.Attachment{attatchment}, + } + err := slack.PostWebhook(webhookURL, &msg) + if err != nil { + return fmt.Errorf("failed to send message: %w", err) + } + + return nil +} From 18c95fffa4863a71adb238abf022db756012ea05 Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Mon, 1 Apr 2024 22:12:57 +0900 Subject: [PATCH 2/9] fix: initialize Reconciler with initialized map field --- cmd/main.go | 5 +---- internal/controller/broom_controller.go | 12 ++++++++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index cb8a657..2b99b53 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -122,10 +122,7 @@ func main() { os.Exit(1) } - if err = (&controller.BroomReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { + if err = controller.New(mgr).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Broom") os.Exit(1) } diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index e7264ee..97a0bb4 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -47,6 +47,14 @@ type BroomReconciler struct { ResolvedJobs map[types.UID]struct{} } +func New(mgr ctrl.Manager) *BroomReconciler { + return &BroomReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + ResolvedJobs: make(map[types.UID]struct{}, 0), + } +} + //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms/status,verbs=get;update;patch //+kubebuilder:rbac:groups=ai.m3.com,resources=brooms/finalizers,verbs=update @@ -177,10 +185,6 @@ func (r *BroomReconciler) traceOOMKilledOwnerReference(br *batchResources) map[t } } - if r.ResolvedJobs == nil { - r.ResolvedJobs = make(map[types.UID]struct{}, 0) - } - cronJobOwnedReferences := make(map[types.UID]cronJobOOMInfo, 0) for ref, containerNames := range jobOwnedOOMContainers { job, ok := br.jobs[ref.UID] From 4f1e1ff3f246584a6492c01d187592936d6943cb Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Mon, 1 Apr 2024 22:41:27 +0900 Subject: [PATCH 3/9] fix: change restartPolicy --- api/v1alpha1/broom_types.go | 14 +++++++------- config/crd/bases/ai.m3.com_brooms.yaml | 4 ++-- internal/controller/broom_controller.go | 20 ++++++++++---------- internal/slack/slack.go | 10 +++++----- 4 files changed, 24 insertions(+), 24 deletions(-) diff --git a/api/v1alpha1/broom_types.go b/api/v1alpha1/broom_types.go index 12b1a48..9e9975e 100644 --- a/api/v1alpha1/broom_types.go +++ b/api/v1alpha1/broom_types.go @@ -63,11 +63,11 @@ func (adj BroomAdjustment) IncreaseMemory(m *resource.Quantity) error { return nil } -type BroomRetryPolicy string +type BroomRestartPolicy string const ( - RetryAllowPolicy BroomRetryPolicy = "Allow" - RetryForbidPolicy BroomRetryPolicy = "Forbid" + RestartOnOOMPolicy BroomRestartPolicy = "OnOOM" + RestartNeverPolicy BroomRestartPolicy = "Never" ) type BroomWebhookSecret struct { @@ -86,10 +86,10 @@ type BroomSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster // Important: Run "make" to regenerate code after modifying this file - Target BroomTarget `json:"target,omitempty"` - Adjustment BroomAdjustment `json:"adjustment"` - RetryPolicy BroomRetryPolicy `json:"retryPolicy"` - Webhook BroomWebhook `json:"webhook"` + Target BroomTarget `json:"target,omitempty"` + Adjustment BroomAdjustment `json:"adjustment"` + RestartPolicy BroomRestartPolicy `json:"restartPolicy"` + Webhook BroomWebhook `json:"webhook"` } // BroomStatus defines the observed state of Broom diff --git a/config/crd/bases/ai.m3.com_brooms.yaml b/config/crd/bases/ai.m3.com_brooms.yaml index 9f94034..9bb76c7 100644 --- a/config/crd/bases/ai.m3.com_brooms.yaml +++ b/config/crd/bases/ai.m3.com_brooms.yaml @@ -49,7 +49,7 @@ spec: - type - value type: object - retryPolicy: + restartPolicy: type: string target: properties: @@ -84,7 +84,7 @@ spec: type: object required: - adjustment - - retryPolicy + - restartPolicy - webhook type: object status: diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index 97a0bb4..e4f4fe5 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -105,15 +105,15 @@ func (r *BroomReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, fmt.Errorf("unable to update CronJob: %w", err) } - var retriedJobName string - if broom.Spec.RetryPolicy == aiv1alpha1.RetryAllowPolicy { - retriedJobName, err = r.retryJob(ctx, &cronJob, info) + var restartedJobName string + if broom.Spec.RestartPolicy == aiv1alpha1.RestartOnOOMPolicy { + restartedJobName, err = r.restartUpdatedJob(ctx, &cronJob, info) if err != nil { - return ctrl.Result{}, fmt.Errorf("unable to retry Job: %w", err) + return ctrl.Result{}, fmt.Errorf("unable to restart updated Job: %w", err) } } - if err := r.notifyResult(ctx, broom.Spec.Webhook, &cronJob, *oldSpec, retriedJobName); err != nil { + if err := r.notifyResult(ctx, broom.Spec.Webhook, &cronJob, *oldSpec, restartedJobName); err != nil { return ctrl.Result{}, fmt.Errorf("unable to notify Slack: %w", err) } } @@ -239,13 +239,13 @@ func (r *BroomReconciler) updateCronJob(ctx context.Context, cj *batchv1.CronJob return nil } -// retryJob creates Job for the failed Job with updated CronJob jobTemplate spec -func (r *BroomReconciler) retryJob(ctx context.Context, cj *batchv1.CronJob, info cronJobOOMInfo) (string, error) { +// restartUpdatedJob creates Job for the failed Job with updated CronJob jobTemplate spec +func (r *BroomReconciler) restartUpdatedJob(ctx context.Context, cj *batchv1.CronJob, info cronJobOOMInfo) (string, error) { log := log.FromContext(ctx) randomString := random.GetRandomString(5) - retriedJobName := fmt.Sprintf("%s-retry-%s", info.LastFailedJob.Name, randomString) + retriedJobName := fmt.Sprintf("%s-restart-%s", info.LastFailedJob.Name, randomString) annotations := map[string]string{ - "rendezvous.m3.com/retried-by-broom": "true", + "rendezvous.m3.com/restarted-by-broom": "true", } job := batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -293,7 +293,7 @@ func (r *BroomReconciler) notifyResult(ctx context.Context, w aiv1alpha1.BroomWe CronJobNamespace: cj.Namespace, CronJobName: cj.Name, ContainerUpdates: []slack.ContainerUpdate{}, - RetriedJobName: rj, + RestartedJobName: rj, } for _, oc := range oldSpec.JobTemplate.Spec.Template.Spec.Containers { diff --git a/internal/slack/slack.go b/internal/slack/slack.go index 9167cbf..88c30be 100644 --- a/internal/slack/slack.go +++ b/internal/slack/slack.go @@ -21,7 +21,7 @@ type UpdateResult struct { CronJobNamespace string `json:"cronjob_namespace"` CronJobName string `json:"cronjob_name"` ContainerUpdates []ContainerUpdate `json:"container_updates"` - RetriedJobName string `json:"retried_job_name"` + RestartedJobName string `json:"restarted_job_name"` } func SendMessage(res UpdateResult, webhookURL string, webhookChannel string) error { @@ -34,9 +34,9 @@ func SendMessage(res UpdateResult, webhookURL string, webhookChannel string) err memoryChanges += fmt.Sprintf("\t:sparkles: *%s (%s → %s)*\n", u.Name, u.BeforeMemory, u.AfterMemory) } - retriedJob := res.RetriedJobName - if retriedJob == "" { - retriedJob = "None" + restartedJob := res.RestartedJobName + if restartedJob == "" { + restartedJob = "None" } attatchment := slack.Attachment{ @@ -44,7 +44,7 @@ func SendMessage(res UpdateResult, webhookURL string, webhookChannel string) err res.CronJobNamespace, res.CronJobName, memoryChanges, - retriedJob, + restartedJob, ), } From 6ef1660dfdeaffaf0c62b6b269e3aeee36f2b271 Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Mon, 1 Apr 2024 22:51:18 +0900 Subject: [PATCH 4/9] fix: add the word 'slack' to 'webhook' --- api/v1alpha1/broom_types.go | 10 ++-- api/v1alpha1/zz_generated.deepcopy.go | 62 ++++++++++++------------- config/samples/ai_v1alpha1_broom.yaml | 2 +- internal/controller/broom_controller.go | 4 +- 4 files changed, 39 insertions(+), 39 deletions(-) diff --git a/api/v1alpha1/broom_types.go b/api/v1alpha1/broom_types.go index 9e9975e..76cfb1e 100644 --- a/api/v1alpha1/broom_types.go +++ b/api/v1alpha1/broom_types.go @@ -70,15 +70,15 @@ const ( RestartNeverPolicy BroomRestartPolicy = "Never" ) -type BroomWebhookSecret struct { +type BroomSlackWebhookSecret struct { Namespace string `json:"namespace"` Name string `json:"name"` Key string `json:"key"` } -type BroomWebhook struct { - Secret BroomWebhookSecret `json:"secret"` - Channel string `json:"channel,omitempty"` +type BroomSlackWebhook struct { + Secret BroomSlackWebhookSecret `json:"secret"` + Channel string `json:"channel,omitempty"` } // BroomSpec defines the desired state of Broom @@ -89,7 +89,7 @@ type BroomSpec struct { Target BroomTarget `json:"target,omitempty"` Adjustment BroomAdjustment `json:"adjustment"` RestartPolicy BroomRestartPolicy `json:"restartPolicy"` - Webhook BroomWebhook `json:"webhook"` + SlackWebhook BroomSlackWebhook `json:"slackWebhook"` } // BroomStatus defines the observed state of Broom diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 1bc6313..176afb5 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -99,87 +99,87 @@ func (in *BroomList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BroomSpec) DeepCopyInto(out *BroomSpec) { +func (in *BroomSlackWebhook) DeepCopyInto(out *BroomSlackWebhook) { *out = *in - in.Target.DeepCopyInto(&out.Target) - out.Adjustment = in.Adjustment - out.Webhook = in.Webhook + out.Secret = in.Secret } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSpec. -func (in *BroomSpec) DeepCopy() *BroomSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSlackWebhook. +func (in *BroomSlackWebhook) DeepCopy() *BroomSlackWebhook { if in == nil { return nil } - out := new(BroomSpec) + out := new(BroomSlackWebhook) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BroomStatus) DeepCopyInto(out *BroomStatus) { +func (in *BroomSlackWebhookSecret) DeepCopyInto(out *BroomSlackWebhookSecret) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomStatus. -func (in *BroomStatus) DeepCopy() *BroomStatus { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSlackWebhookSecret. +func (in *BroomSlackWebhookSecret) DeepCopy() *BroomSlackWebhookSecret { if in == nil { return nil } - out := new(BroomStatus) + out := new(BroomSlackWebhookSecret) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BroomTarget) DeepCopyInto(out *BroomTarget) { +func (in *BroomSpec) DeepCopyInto(out *BroomSpec) { *out = *in - if in.Labels != nil { - in, out := &in.Labels, &out.Labels - *out = make(map[string]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } + in.Target.DeepCopyInto(&out.Target) + out.Adjustment = in.Adjustment + out.SlackWebhook = in.SlackWebhook } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomTarget. -func (in *BroomTarget) DeepCopy() *BroomTarget { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomSpec. +func (in *BroomSpec) DeepCopy() *BroomSpec { if in == nil { return nil } - out := new(BroomTarget) + out := new(BroomSpec) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BroomWebhook) DeepCopyInto(out *BroomWebhook) { +func (in *BroomStatus) DeepCopyInto(out *BroomStatus) { *out = *in - out.Secret = in.Secret } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomWebhook. -func (in *BroomWebhook) DeepCopy() *BroomWebhook { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomStatus. +func (in *BroomStatus) DeepCopy() *BroomStatus { if in == nil { return nil } - out := new(BroomWebhook) + out := new(BroomStatus) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *BroomWebhookSecret) DeepCopyInto(out *BroomWebhookSecret) { +func (in *BroomTarget) DeepCopyInto(out *BroomTarget) { *out = *in + if in.Labels != nil { + in, out := &in.Labels, &out.Labels + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomWebhookSecret. -func (in *BroomWebhookSecret) DeepCopy() *BroomWebhookSecret { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BroomTarget. +func (in *BroomTarget) DeepCopy() *BroomTarget { if in == nil { return nil } - out := new(BroomWebhookSecret) + out := new(BroomTarget) in.DeepCopyInto(out) return out } diff --git a/config/samples/ai_v1alpha1_broom.yaml b/config/samples/ai_v1alpha1_broom.yaml index 205dfa6..c661926 100644 --- a/config/samples/ai_v1alpha1_broom.yaml +++ b/config/samples/ai_v1alpha1_broom.yaml @@ -18,7 +18,7 @@ spec: type: Mul value: "2" retryPolicy: "Allow" - webhook: + slackWebhook: secret: namespace: default name: broom diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index e4f4fe5..460c28a 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -113,7 +113,7 @@ func (r *BroomReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl } } - if err := r.notifyResult(ctx, broom.Spec.Webhook, &cronJob, *oldSpec, restartedJobName); err != nil { + if err := r.notifyResult(ctx, broom.Spec.SlackWebhook, &cronJob, *oldSpec, restartedJobName); err != nil { return ctrl.Result{}, fmt.Errorf("unable to notify Slack: %w", err) } } @@ -281,7 +281,7 @@ func isTargeted(cj batchv1.CronJob, target aiv1alpha1.BroomTarget) bool { } // notifyResult notifies the result of changes with webhook information retrieved from Secret -func (r *BroomReconciler) notifyResult(ctx context.Context, w aiv1alpha1.BroomWebhook, cj *batchv1.CronJob, oldSpec batchv1.CronJobSpec, rj string) error { +func (r *BroomReconciler) notifyResult(ctx context.Context, w aiv1alpha1.BroomSlackWebhook, cj *batchv1.CronJob, oldSpec batchv1.CronJobSpec, rj string) error { secret := &corev1.Secret{} if err := r.Get(ctx, client.ObjectKey{Namespace: w.Secret.Namespace, Name: w.Secret.Name}, secret); err != nil { return fmt.Errorf("unable to get Secret for webhook URL: %w", err) From 027c7d2b6c4348c8afc6f94cc2d643215c4f9c70 Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Mon, 1 Apr 2024 22:54:50 +0900 Subject: [PATCH 5/9] fix: change labels and annotations to m3.com/* --- config/samples/ai_v1alpha1_broom.yaml | 2 +- config/samples/cronjob.yaml | 2 +- internal/controller/broom_controller.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/config/samples/ai_v1alpha1_broom.yaml b/config/samples/ai_v1alpha1_broom.yaml index c661926..c4aa4b2 100644 --- a/config/samples/ai_v1alpha1_broom.yaml +++ b/config/samples/ai_v1alpha1_broom.yaml @@ -12,7 +12,7 @@ spec: target: # name: oom-sample labels: - rendezvous.m3.com/use-broom: "true" + m3.com/use-broom: "true" # namespace: broom adjustment: type: Mul diff --git a/config/samples/cronjob.yaml b/config/samples/cronjob.yaml index 80caac2..a9e9c7b 100644 --- a/config/samples/cronjob.yaml +++ b/config/samples/cronjob.yaml @@ -3,7 +3,7 @@ kind: CronJob metadata: name: oom-sample labels: - rendezvous.m3.com/use-broom: "true" + m3.com/use-broom: "true" spec: schedule: "*/2 * * * *" jobTemplate: diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index 460c28a..44e8101 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -245,7 +245,7 @@ func (r *BroomReconciler) restartUpdatedJob(ctx context.Context, cj *batchv1.Cro randomString := random.GetRandomString(5) retriedJobName := fmt.Sprintf("%s-restart-%s", info.LastFailedJob.Name, randomString) annotations := map[string]string{ - "rendezvous.m3.com/restarted-by-broom": "true", + "m3.com/restarted-by-broom": "true", } job := batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ From 6bfb7dbaaf765b7a23aa0da4f084778cc5ab9686 Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Tue, 2 Apr 2024 00:31:05 +0900 Subject: [PATCH 6/9] test: add test for isTargeted() --- config/crd/bases/ai.m3.com_brooms.yaml | 26 ++--- internal/controller/broom_controller_test.go | 108 ++++++++++++++++++- 2 files changed, 120 insertions(+), 14 deletions(-) diff --git a/config/crd/bases/ai.m3.com_brooms.yaml b/config/crd/bases/ai.m3.com_brooms.yaml index 9bb76c7..96de4c1 100644 --- a/config/crd/bases/ai.m3.com_brooms.yaml +++ b/config/crd/bases/ai.m3.com_brooms.yaml @@ -51,18 +51,7 @@ spec: type: object restartPolicy: type: string - target: - properties: - labels: - additionalProperties: - type: string - type: object - name: - type: string - namespace: - type: string - type: object - webhook: + slackWebhook: properties: channel: type: string @@ -82,10 +71,21 @@ spec: required: - secret type: object + target: + properties: + labels: + additionalProperties: + type: string + type: object + name: + type: string + namespace: + type: string + type: object required: - adjustment - restartPolicy - - webhook + - slackWebhook type: object status: description: BroomStatus defines the observed state of Broom diff --git a/internal/controller/broom_controller_test.go b/internal/controller/broom_controller_test.go index c0ae092..68c2a75 100644 --- a/internal/controller/broom_controller_test.go +++ b/internal/controller/broom_controller_test.go @@ -21,7 +21,10 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -32,7 +35,11 @@ import ( var _ = Describe("Broom Controller", func() { Context("When reconciling a resource", func() { - const resourceName = "test-resource" + const ( + resourceName = "broom-sample" + cronJobName = "oom-sample" + cronJobNamespace = "broom" + ) ctx := context.Background() @@ -55,6 +62,56 @@ var _ = Describe("Broom Controller", func() { } Expect(k8sClient.Create(ctx, resource)).To(Succeed()) } + + By("creating the Namespace") + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: cronJobNamespace, + }, + } + Expect(k8sClient.Create(ctx, ns)).To(Succeed()) + + By("creating the CronJob") + cronJob := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: cronJobName, + Namespace: cronJobNamespace, + Labels: map[string]string{ + "m3.com/use-broom": "true", + }, + }, + Spec: batchv1.CronJobSpec{ + Schedule: "*/2 * * * *", + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "oom-container", + Image: "ubuntu:latest", + Command: []string{"/bin/bash", "-c"}, + Args: []string{ + "echo PID=$$; for i in {0..9}; do eval a$i'=$(head --bytes 5000000 /dev/zero | cat -v)'; echo $((i++)); done", + }, + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("100Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("50Mi"), + }, + }, + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, cronJob)).To(Succeed()) }) AfterEach(func() { @@ -65,6 +122,23 @@ var _ = Describe("Broom Controller", func() { By("Cleanup the specific resource instance Broom") Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) + + By("Cleanup the CronJob") + cronJob := &batchv1.CronJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: cronJobName, + Namespace: cronJobNamespace, + }, + } + Expect(k8sClient.Delete(ctx, cronJob)).To(Succeed()) + + By("Cleanup the Namespace") + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: cronJobNamespace, + }, + } + Expect(k8sClient.Delete(ctx, ns)).To(Succeed()) }) It("should successfully reconcile the resource", func() { By("Reconciling the created resource") @@ -79,6 +153,38 @@ var _ = Describe("Broom Controller", func() { Expect(err).NotTo(HaveOccurred()) // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. // Example: If you expect a certain status condition after reconciliation, verify it here. + + By("Checking if the CronJob is targeted correctly") + cronJob := &batchv1.CronJob{} + Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cronJobName, Namespace: cronJobNamespace}, cronJob)).To(Succeed()) + + target := aiv1alpha1.BroomTarget{ + Namespace: cronJobNamespace, + Name: cronJobName, + Labels: map[string]string{ + "m3.com/use-broom": "true", + }, + } + res := isTargeted(*cronJob, target) + Expect(res).To(BeTrue(), "Expected CronJob to be targeted but it is not") + + emptyTarget := aiv1alpha1.BroomTarget{} + res = isTargeted(*cronJob, emptyTarget) + Expect(res).To(BeTrue(), "Expected CronJob to be targeted but it is not") + + wrongNamespaceTarget := aiv1alpha1.BroomTarget{ + Namespace: "default", + } + res = isTargeted(*cronJob, wrongNamespaceTarget) + Expect(res).To(BeFalse(), "Expected CronJob not to be targeted but it is") + + wrongLabelTarget := aiv1alpha1.BroomTarget{ + Labels: map[string]string{ + "m3.com/foo": "bar", + }, + } + res = isTargeted(*cronJob, wrongLabelTarget) + Expect(res).To(BeFalse(), "Expected CronJob not to be targeted but it is") }) }) }) From 2eca08fd9abf635ee7d417c3025499b6eee30878 Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Tue, 2 Apr 2024 00:48:23 +0900 Subject: [PATCH 7/9] fix: change retry to restart --- internal/controller/broom_controller.go | 8 ++++---- internal/slack/slack.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/controller/broom_controller.go b/internal/controller/broom_controller.go index 44e8101..77646e5 100644 --- a/internal/controller/broom_controller.go +++ b/internal/controller/broom_controller.go @@ -243,23 +243,23 @@ func (r *BroomReconciler) updateCronJob(ctx context.Context, cj *batchv1.CronJob func (r *BroomReconciler) restartUpdatedJob(ctx context.Context, cj *batchv1.CronJob, info cronJobOOMInfo) (string, error) { log := log.FromContext(ctx) randomString := random.GetRandomString(5) - retriedJobName := fmt.Sprintf("%s-restart-%s", info.LastFailedJob.Name, randomString) + restartedJobName := fmt.Sprintf("%s-restart-%s", info.LastFailedJob.Name, randomString) annotations := map[string]string{ "m3.com/restarted-by-broom": "true", } job := batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Namespace: cj.Namespace, - Name: retriedJobName, + Name: restartedJobName, Labels: info.LastFailedJob.Labels, Annotations: annotations, }, Spec: cj.Spec.JobTemplate.Spec, } if err := r.Create(ctx, &job); err != nil { - return "", fmt.Errorf("unable to create retried Job: %w", err) + return "", fmt.Errorf("unable to create restarted Job: %w", err) } - log.Info("Retried Job", "name", job.Name) + log.Info("Restarted Job", "name", job.Name) return job.Name, nil } diff --git a/internal/slack/slack.go b/internal/slack/slack.go index 88c30be..e7e4dac 100644 --- a/internal/slack/slack.go +++ b/internal/slack/slack.go @@ -40,7 +40,7 @@ func SendMessage(res UpdateResult, webhookURL string, webhookChannel string) err } attatchment := slack.Attachment{ - Text: fmt.Sprintf("Namespace: *%s*\nName: *%s*\nContainer memory changes:\n%sRetried Job: *%s*\n", + Text: fmt.Sprintf("Namespace: *%s*\nName: *%s*\nContainer memory changes:\n%sRestarted Job: *%s*\n", res.CronJobNamespace, res.CronJobName, memoryChanges, From 22f42b7eae0f51344dc6364f9aba6caa99b5b807 Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Tue, 2 Apr 2024 00:56:59 +0900 Subject: [PATCH 8/9] fix: change restartPolicy on manifest --- config/samples/ai_v1alpha1_broom.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/samples/ai_v1alpha1_broom.yaml b/config/samples/ai_v1alpha1_broom.yaml index c4aa4b2..a8645fa 100644 --- a/config/samples/ai_v1alpha1_broom.yaml +++ b/config/samples/ai_v1alpha1_broom.yaml @@ -17,7 +17,7 @@ spec: adjustment: type: Mul value: "2" - retryPolicy: "Allow" + restartPolicy: "OnOOM" slackWebhook: secret: namespace: default From 394ba962ff314c8d6d58910c4d2ce6fa92ad6a8c Mon Sep 17 00:00:00 2001 From: Kitsuya0828 Date: Tue, 2 Apr 2024 13:53:02 +0900 Subject: [PATCH 9/9] revert: delete test for isTargeted --- internal/controller/broom_controller_test.go | 108 +------------------ 1 file changed, 1 insertion(+), 107 deletions(-) diff --git a/internal/controller/broom_controller_test.go b/internal/controller/broom_controller_test.go index 68c2a75..c0ae092 100644 --- a/internal/controller/broom_controller_test.go +++ b/internal/controller/broom_controller_test.go @@ -21,10 +21,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -35,11 +32,7 @@ import ( var _ = Describe("Broom Controller", func() { Context("When reconciling a resource", func() { - const ( - resourceName = "broom-sample" - cronJobName = "oom-sample" - cronJobNamespace = "broom" - ) + const resourceName = "test-resource" ctx := context.Background() @@ -62,56 +55,6 @@ var _ = Describe("Broom Controller", func() { } Expect(k8sClient.Create(ctx, resource)).To(Succeed()) } - - By("creating the Namespace") - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: cronJobNamespace, - }, - } - Expect(k8sClient.Create(ctx, ns)).To(Succeed()) - - By("creating the CronJob") - cronJob := &batchv1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: cronJobName, - Namespace: cronJobNamespace, - Labels: map[string]string{ - "m3.com/use-broom": "true", - }, - }, - Spec: batchv1.CronJobSpec{ - Schedule: "*/2 * * * *", - JobTemplate: batchv1.JobTemplateSpec{ - Spec: batchv1.JobSpec{ - Template: corev1.PodTemplateSpec{ - Spec: corev1.PodSpec{ - Containers: []corev1.Container{ - { - Name: "oom-container", - Image: "ubuntu:latest", - Command: []string{"/bin/bash", "-c"}, - Args: []string{ - "echo PID=$$; for i in {0..9}; do eval a$i'=$(head --bytes 5000000 /dev/zero | cat -v)'; echo $((i++)); done", - }, - Resources: corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("100Mi"), - }, - Requests: corev1.ResourceList{ - corev1.ResourceMemory: resource.MustParse("50Mi"), - }, - }, - }, - }, - RestartPolicy: corev1.RestartPolicyNever, - }, - }, - }, - }, - }, - } - Expect(k8sClient.Create(ctx, cronJob)).To(Succeed()) }) AfterEach(func() { @@ -122,23 +65,6 @@ var _ = Describe("Broom Controller", func() { By("Cleanup the specific resource instance Broom") Expect(k8sClient.Delete(ctx, resource)).To(Succeed()) - - By("Cleanup the CronJob") - cronJob := &batchv1.CronJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: cronJobName, - Namespace: cronJobNamespace, - }, - } - Expect(k8sClient.Delete(ctx, cronJob)).To(Succeed()) - - By("Cleanup the Namespace") - ns := &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: cronJobNamespace, - }, - } - Expect(k8sClient.Delete(ctx, ns)).To(Succeed()) }) It("should successfully reconcile the resource", func() { By("Reconciling the created resource") @@ -153,38 +79,6 @@ var _ = Describe("Broom Controller", func() { Expect(err).NotTo(HaveOccurred()) // TODO(user): Add more specific assertions depending on your controller's reconciliation logic. // Example: If you expect a certain status condition after reconciliation, verify it here. - - By("Checking if the CronJob is targeted correctly") - cronJob := &batchv1.CronJob{} - Expect(k8sClient.Get(ctx, types.NamespacedName{Name: cronJobName, Namespace: cronJobNamespace}, cronJob)).To(Succeed()) - - target := aiv1alpha1.BroomTarget{ - Namespace: cronJobNamespace, - Name: cronJobName, - Labels: map[string]string{ - "m3.com/use-broom": "true", - }, - } - res := isTargeted(*cronJob, target) - Expect(res).To(BeTrue(), "Expected CronJob to be targeted but it is not") - - emptyTarget := aiv1alpha1.BroomTarget{} - res = isTargeted(*cronJob, emptyTarget) - Expect(res).To(BeTrue(), "Expected CronJob to be targeted but it is not") - - wrongNamespaceTarget := aiv1alpha1.BroomTarget{ - Namespace: "default", - } - res = isTargeted(*cronJob, wrongNamespaceTarget) - Expect(res).To(BeFalse(), "Expected CronJob not to be targeted but it is") - - wrongLabelTarget := aiv1alpha1.BroomTarget{ - Labels: map[string]string{ - "m3.com/foo": "bar", - }, - } - res = isTargeted(*cronJob, wrongLabelTarget) - Expect(res).To(BeFalse(), "Expected CronJob not to be targeted but it is") }) }) })