From 5cbe424668646c00addb586ec4c84142f194fb6a Mon Sep 17 00:00:00 2001 From: SpiritZhou Date: Wed, 10 Jul 2024 20:39:25 +0800 Subject: [PATCH] Provide CloudEvents around the management of ScaledObjects resources (#5953) --- CHANGELOG.md | 1 + apis/eventing/v1alpha1/cloudevent_types.go | 3 + cmd/operator/main.go | 1 - controllers/keda/scaledobject_controller.go | 20 ++-- controllers/keda/scaledobject_finalizer.go | 4 +- controllers/keda/suite_test.go | 1 - pkg/common/message/message.go | 2 + pkg/eventemitter/eventemitter.go | 6 +- .../cloudevent_source_test.go | 107 +++++++++++++++++- 9 files changed, 127 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fab886aea19..ea05a6a3b6e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **General**: Add --ca-dir flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860)) - **General**: Declarative parsing of scaler config ([#5037](https://github.com/kedacore/keda/issues/5037)|[#5797](https://github.com/kedacore/keda/issues/5797)) - **General**: Introduce new Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904)) +- **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522)) - **General**: Remove deprecated Kustomize commonLabels ([#5888](https://github.com/kedacore/keda/pull/5888)) - **General**: Support for Kubernetes v1.30 ([#5828](https://github.com/kedacore/keda/issues/5828)) diff --git a/apis/eventing/v1alpha1/cloudevent_types.go b/apis/eventing/v1alpha1/cloudevent_types.go index 89e14109882..fdab5229c96 100644 --- a/apis/eventing/v1alpha1/cloudevent_types.go +++ b/apis/eventing/v1alpha1/cloudevent_types.go @@ -26,6 +26,9 @@ const ( // ScaledObjectFailedType is for event when creating ScaledObject failed ScaledObjectFailedType CloudEventType = "keda.scaledobject.failed.v1" + + // ScaledObjectFailedType is for event when removed ScaledObject + ScaledObjectRemovedType CloudEventType = "keda.scaledobject.removed.v1" ) var AllEventTypes = []CloudEventType{ScaledObjectFailedType, ScaledObjectReadyType} diff --git a/cmd/operator/main.go b/cmd/operator/main.go index 88617aa8523..c9172cb5971 100644 --- a/cmd/operator/main.go +++ b/cmd/operator/main.go @@ -223,7 +223,6 @@ func main() { if err = (&kedacontrollers.ScaledObjectReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Recorder: eventRecorder, ScaleClient: scaleClient, ScaleHandler: scaledHandler, EventEmitter: eventEmitter, diff --git a/controllers/keda/scaledobject_controller.go b/controllers/keda/scaledobject_controller.go index 27625f6f10a..1032aae5b6d 100755 --- a/controllers/keda/scaledobject_controller.go +++ b/controllers/keda/scaledobject_controller.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" @@ -70,7 +69,6 @@ import ( type ScaledObjectReconciler struct { Client client.Client Scheme *runtime.Scheme - Recorder record.EventRecorder ScaleClient scale.ScalesGetter ScaleHandler scaling.ScaleHandler EventEmitter eventemitter.EventHandler @@ -119,8 +117,8 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont if r.Scheme == nil { return fmt.Errorf("ScaledObjectReconciler.Scheme is not initialized") } - if r.Recorder == nil { - return fmt.Errorf("ScaledObjectReconciler.Recorder is not initialized") + if r.EventEmitter == nil { + return fmt.Errorf("ScaledObjectReconciler.EventEmitter is not initialized") } // Start controller return ctrl.NewControllerManagedBy(mgr). @@ -184,7 +182,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request if !scaledObject.Status.Conditions.AreInitialized() { conditions := kedav1alpha1.GetInitializedConditions() if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return ctrl.Result{}, err } } @@ -196,18 +194,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledObjectCheckFailed", msg) conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledObject check failed") - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg) + r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, msg) } else { wasReady := conditions.GetReadyCondition() if wasReady.IsFalse() || wasReady.IsUnknown() { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg) + r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeNormal, eventingv1alpha1.ScaledObjectReadyType, eventreason.ScaledObjectReady, message.ScalerReadyMsg) } reqLogger.V(1).Info(msg) conditions.SetReadyCondition(metav1.ConditionTrue, kedav1alpha1.ScaledObjectConditionReadySuccessReason, msg) } if err := kedastatus.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil { - r.EventEmitter.Emit(scaledObject, req.NamespacedName, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) + r.EventEmitter.Emit(scaledObject, req.NamespacedName.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return ctrl.Result{}, err } @@ -359,7 +357,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte if err != nil { msg := "Failed to parse Group, Version, Kind, Resource" logger.Error(err, msg, "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind) - r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectUpdateFailed, msg) + r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectUpdateFailed, err.Error()) return gvkr, err } gvkString := gvkr.GVKString() @@ -396,12 +394,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil { // resource doesn't exist logger.Error(err, message.ScaleTargetNotFoundMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) - r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg) + r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNotFoundMsg) return gvkr, err } // resource exist but doesn't expose /scale subresource logger.Error(errScale, message.ScaleTargetNoSubresourceMsg, "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name) - r.Recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg) + r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectFailedType, eventreason.ScaledObjectCheckFailed, message.ScaleTargetNoSubresourceMsg) return gvkr, errScale } isScalableCache.Store(gr.String(), true) diff --git a/controllers/keda/scaledobject_finalizer.go b/controllers/keda/scaledobject_finalizer.go index b3d48adbd77..70fe12e7d27 100644 --- a/controllers/keda/scaledobject_finalizer.go +++ b/controllers/keda/scaledobject_finalizer.go @@ -24,8 +24,10 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingv1alpha1 "github.com/kedacore/keda/v2/apis/eventing/v1alpha1" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" "github.com/kedacore/keda/v2/controllers/keda/util" + "github.com/kedacore/keda/v2/pkg/common/message" "github.com/kedacore/keda/v2/pkg/eventreason" ) @@ -86,7 +88,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logge } logger.Info("Successfully finalized ScaledObject") - r.Recorder.Event(scaledObject, corev1.EventTypeNormal, eventreason.ScaledObjectDeleted, "ScaledObject was deleted") + r.EventEmitter.Emit(scaledObject, scaledObject.Namespace, corev1.EventTypeWarning, eventingv1alpha1.ScaledObjectRemovedType, eventreason.ScaledObjectDeleted, message.ScaledObjectRemoved) return nil } diff --git a/controllers/keda/suite_test.go b/controllers/keda/suite_test.go index 54c318b8fdb..8659742f950 100644 --- a/controllers/keda/suite_test.go +++ b/controllers/keda/suite_test.go @@ -94,7 +94,6 @@ var _ = BeforeSuite(func() { err = (&ScaledObjectReconciler{ Client: k8sManager.GetClient(), Scheme: k8sManager.GetScheme(), - Recorder: k8sManager.GetEventRecorderFor("keda-operator"), ScaleHandler: scaling.NewScaleHandler(k8sManager.GetClient(), scaleClient, k8sManager.GetScheme(), time.Duration(10), k8sManager.GetEventRecorderFor("keda-operator"), nil), ScaleClient: scaleClient, EventEmitter: eventemitter.NewEventEmitter(k8sManager.GetClient(), k8sManager.GetEventRecorderFor("keda-operator"), "kubernetes-default", nil), diff --git a/pkg/common/message/message.go b/pkg/common/message/message.go index a63d7fa69f3..490b77a6f0e 100644 --- a/pkg/common/message/message.go +++ b/pkg/common/message/message.go @@ -28,4 +28,6 @@ const ( ScaleTargetNotFoundMsg = "Target resource doesn't exist" ScaleTargetNoSubresourceMsg = "Target resource doesn't expose /scale subresource" + + ScaledObjectRemoved = "ScaledObject was deleted" ) diff --git a/pkg/eventemitter/eventemitter.go b/pkg/eventemitter/eventemitter.go index 959156efc1a..a46480ff537 100644 --- a/pkg/eventemitter/eventemitter.go +++ b/pkg/eventemitter/eventemitter.go @@ -73,7 +73,7 @@ type EventEmitter struct { type EventHandler interface { DeleteCloudEventSource(cloudEventSource *eventingv1alpha1.CloudEventSource) error HandleCloudEventSource(ctx context.Context, cloudEventSource *eventingv1alpha1.CloudEventSource) error - Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string) + Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason string, message string) } // EventDataHandler defines the behavior for different event handlers @@ -325,7 +325,7 @@ func (e *EventEmitter) checkEventHandlers(ctx context.Context, cloudEventSource } // Emit is emitting event to both local kubernetes and custom CloudEventSource handler. After emit event to local kubernetes, event will inqueue and waitng for handler's consuming. -func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedName, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) { +func (e *EventEmitter) Emit(object runtime.Object, namesapce string, eventType string, cloudeventType eventingv1alpha1.CloudEventType, reason, message string) { e.recorder.Event(object, eventType, reason, message) e.eventHandlersCacheLock.RLock() @@ -337,7 +337,7 @@ func (e *EventEmitter) Emit(object runtime.Object, namesapce types.NamespacedNam objectName, _ := meta.NewAccessor().Name(object) objectType, _ := meta.NewAccessor().Kind(object) eventData := eventdata.EventData{ - Namespace: namesapce.Namespace, + Namespace: namesapce, CloudEventType: cloudeventType, ObjectName: strings.ToLower(objectName), ObjectType: strings.ToLower(objectType), diff --git a/tests/internals/cloudevent_source/cloudevent_source_test.go b/tests/internals/cloudevent_source/cloudevent_source_test.go index e56579207a3..a5fe794c209 100644 --- a/tests/internals/cloudevent_source/cloudevent_source_test.go +++ b/tests/internals/cloudevent_source/cloudevent_source_test.go @@ -27,6 +27,7 @@ var _ = godotenv.Load("../../.env") var ( namespace = fmt.Sprintf("%s-ns", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) + deploymentName = fmt.Sprintf("%s-d", testName) clientName = fmt.Sprintf("%s-client", testName) cloudeventSourceName = fmt.Sprintf("%s-ce", testName) cloudeventSourceErrName = fmt.Sprintf("%s-ce-err", testName) @@ -43,6 +44,7 @@ var ( type templateData struct { TestNamespace string ScaledObject string + DeploymentName string ClientName string CloudEventSourceName string CloudeventSourceErrName string @@ -210,6 +212,56 @@ spec: excludedEventTypes: - keda.scaledobject.failed.v1 ` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + deploy: {{.DeploymentName}} +spec: + replicas: 1 + selector: + matchLabels: + pod: {{.DeploymentName}} + template: + metadata: + labels: + pod: {{.DeploymentName}} + spec: + containers: + - name: nginx + image: 'nginxinc/nginx-unprivileged' +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObject}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 5 + minReplicaCount: 1 + maxReplicaCount: 10 + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: cron + metadata: + timezone: Etc/UTC + start: 3 * * * * + end: 5 * * * * + desiredReplicas: '4' +` ) func TestScaledObjectGeneral(t *testing.T) { @@ -223,6 +275,7 @@ func TestScaledObjectGeneral(t *testing.T) { assert.True(t, WaitForAllPodRunningInNamespace(t, kc, namespace, 5, 20), "all pods should be running") testErrEventSourceEmitValue(t, kc, data) + testEventSourceEmitValue(t, kc, data) testErrEventSourceExcludeValue(t, kc, data) testErrEventSourceIncludeValue(t, kc, data) testErrEventSourceCreation(t, kc, data) @@ -258,8 +311,16 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem foundEvents = append(foundEvents, cloudEvent) data := map[string]string{} err := cloudEvent.DataAs(&data) + t.Log("--- test emitting eventsource about scaledobject err---", "message", data["message"]) + assert.NoError(t, err) - assert.Equal(t, data["message"], "ScaledObject doesn't have correct scaleTargetRef specification") + assert.Condition(t, func() bool { + if data["message"] == "ScaledObject doesn't have correct scaleTargetRef specification" || data["message"] == "Target resource doesn't exist" { + return true + } + return false + }, "get filtered event") + assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.failed.v1") assert.Equal(t, cloudEvent.Source(), expectedSource) assert.Equal(t, cloudEvent.DataContentType(), "application/json") @@ -272,6 +333,49 @@ func testErrEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data tem assert.NotEmpty(t, foundEvents) } +func testEventSourceEmitValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { + t.Log("--- test emitting eventsource about scaledobject removed---") + KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + KubectlApplyWithTemplate(t, data, "deploymentTemplate", deploymentTemplate) + + // wait 15 seconds to ensure event propagation + time.Sleep(5 * time.Second) + KubectlDeleteWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) + time.Sleep(10 * time.Second) + + out, outErr, err := ExecCommandOnSpecificPod(t, clientName, namespace, fmt.Sprintf("curl -X GET %s/getCloudEvent/%s", cloudEventHTTPServiceURL, "ScaledObjectDeleted")) + assert.NotEmpty(t, out) + assert.Empty(t, outErr) + assert.NoError(t, err, "dont expect error requesting ") + + cloudEvents := []cloudevents.Event{} + err = json.Unmarshal([]byte(out), &cloudEvents) + + assert.NoError(t, err, "dont expect error unmarshaling the cloudEvents") + assert.Greater(t, len(cloudEvents), 0, "cloudEvents should have at least 1 item") + + foundEvents := []cloudevents.Event{} + + for _, cloudEvent := range cloudEvents { + if cloudEvent.Subject() == expectedSubject { + foundEvents = append(foundEvents, cloudEvent) + data := map[string]string{} + err := cloudEvent.DataAs(&data) + + assert.NoError(t, err) + assert.Equal(t, data["message"], "ScaledObject was deleted") + assert.Equal(t, cloudEvent.Type(), "keda.scaledobject.removed.v1") + assert.Equal(t, cloudEvent.Source(), expectedSource) + assert.Equal(t, cloudEvent.DataContentType(), "application/json") + + if lastCloudEventTime.Before(cloudEvent.Time()) { + lastCloudEventTime = cloudEvent.Time() + } + } + } + assert.NotEmpty(t, foundEvents) +} + // tests error events not emitted by func testErrEventSourceExcludeValue(t *testing.T, _ *kubernetes.Clientset, data templateData) { t.Log("--- test emitting eventsource about scaledobject err with exclude filter---") @@ -362,6 +466,7 @@ func getTemplateData() (templateData, []Template) { return templateData{ TestNamespace: namespace, ScaledObject: scaledObjectName, + DeploymentName: deploymentName, ClientName: clientName, CloudEventSourceName: cloudeventSourceName, CloudeventSourceErrName: cloudeventSourceErrName,