From 83d1fd50eec1c2168efeba8c06bab56aef052ca7 Mon Sep 17 00:00:00 2001 From: Guilherme Cassolato Date: Mon, 14 Oct 2024 23:31:06 +0200 Subject: [PATCH] envoy gateway rate limit cluster reconciler Signed-off-by: Guilherme Cassolato --- ...y_gateway_rate_limit_cluster_reconciler.go | 234 ++++++++++++++++++ ...voygateway_limitador_cluster_controller.go | 224 ----------------- .../istio_rate_limit_cluster_reconciler.go | 38 +-- controllers/ratelimit_workflow.go | 129 ++++++---- controllers/state_of_the_world.go | 4 +- controllers/test_common.go | 13 - main.go | 11 - pkg/envoygateway/utils.go | 36 +++ pkg/istio/utils.go | 10 +- ...teway_limitador_cluster_controller_test.go | 13 +- ...dor_cluster_envoyfilter_controller_test.go | 7 +- 11 files changed, 368 insertions(+), 351 deletions(-) create mode 100644 controllers/envoy_gateway_rate_limit_cluster_reconciler.go delete mode 100644 controllers/envoygateway_limitador_cluster_controller.go diff --git a/controllers/envoy_gateway_rate_limit_cluster_reconciler.go b/controllers/envoy_gateway_rate_limit_cluster_reconciler.go new file mode 100644 index 000000000..7329af324 --- /dev/null +++ b/controllers/envoy_gateway_rate_limit_cluster_reconciler.go @@ -0,0 +1,234 @@ +package controllers + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "sync" + + envoygatewayv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1" + limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1" + "github.com/samber/lo" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8stypes "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/dynamic" + gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/kuadrant/policy-machinery/controller" + "github.com/kuadrant/policy-machinery/machinery" + + kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1" + kuadrantv1beta3 "github.com/kuadrant/kuadrant-operator/api/v1beta3" + "github.com/kuadrant/kuadrant-operator/pkg/common" + kuadrantenvoygateway "github.com/kuadrant/kuadrant-operator/pkg/envoygateway" + "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" +) + +// envoyGatewayRateLimitClusterReconciler reconciles EnvoyGateway EnvoyPatchPolicy custom resources +type envoyGatewayRateLimitClusterReconciler struct { + *reconcilers.BaseReconciler + client *dynamic.DynamicClient +} + +func (r *envoyGatewayRateLimitClusterReconciler) Subscription() controller.Subscription { + return controller.Subscription{ + ReconcileFunc: r.Reconcile, + Events: []controller.ResourceEventMatcher{ // matches reconciliation events that change the rate limit definitions or status of rate limit policies + {Kind: &kuadrantv1beta1.KuadrantGroupKind}, + {Kind: &machinery.GatewayClassGroupKind}, + {Kind: &machinery.GatewayGroupKind}, + {Kind: &machinery.HTTPRouteGroupKind}, + {Kind: &kuadrantv1beta3.RateLimitPolicyGroupKind}, + {Kind: &kuadrantenvoygateway.EnvoyPatchPolicyGroupKind}, + }, + } +} + +func (r *envoyGatewayRateLimitClusterReconciler) Reconcile(ctx context.Context, _ []controller.ResourceEvent, topology *machinery.Topology, _ error, state *sync.Map) error { + logger := controller.LoggerFromContext(ctx).WithName("envoyGatewayRateLimitClusterReconciler") + + logger.V(1).Info("building envoy gateway rate limit clusters") + defer logger.V(1).Info("finished building envoy gateway rate limit clusters") + + kuadrant, err := GetKuadrantFromTopology(topology) + if err != nil { + if errors.Is(err, ErrMissingKuadrant) { + logger.V(1).Info(err.Error()) + return nil + } + return err + } + + limitadorObj, found := lo.Find(topology.Objects().Children(kuadrant), func(child machinery.Object) bool { + return child.GroupVersionKind().GroupKind() == kuadrantv1beta1.LimitadorGroupKind + }) + if !found { + logger.V(1).Info(ErrMissingLimitador.Error()) + return nil + } + limitador := limitadorObj.(*controller.RuntimeObject).Object.(*limitadorv1alpha1.Limitador) + + effectivePolicies, ok := state.Load(StateEffectiveRateLimitPolicies) + if !ok { + logger.Error(ErrMissingStateEffectiveRateLimitPolicies, "failed to get effective rate limit policies from state") + return nil + } + + gateways := lo.UniqBy(lo.FilterMap(lo.Values(effectivePolicies.(EffectiveRateLimitPolicies)), func(effectivePolicy EffectiveRateLimitPolicy, _ int) (*machinery.Gateway, bool) { + // assumes the path is always [gatewayclass, gateway, listener, httproute, httprouterule] + gatewayClass, _ := effectivePolicy.Path[0].(*machinery.GatewayClass) + gateway, _ := effectivePolicy.Path[1].(*machinery.Gateway) + return gateway, gatewayClass.Spec.ControllerName == envoyGatewayGatewayControllerName + }), func(gateway *machinery.Gateway) string { + return gateway.GetLocator() + }) + + desiredEnvoyPatchPolicies := make(map[k8stypes.NamespacedName]struct{}) + + // reconcile envoy gateway cluster for gateway + for _, gateway := range gateways { + gatewayKey := k8stypes.NamespacedName{Name: gateway.GetName(), Namespace: gateway.GetNamespace()} + + desiredEnvoyPatchPolicy, err := r.buildDesiredEnvoyPatchPolicy(limitador, gateway) + if err != nil { + logger.Error(err, "failed to build desired envoy filter") + continue + } + desiredEnvoyPatchPolicies[k8stypes.NamespacedName{Name: desiredEnvoyPatchPolicy.GetName(), Namespace: desiredEnvoyPatchPolicy.GetNamespace()}] = struct{}{} + + resource := r.client.Resource(kuadrantenvoygateway.EnvoyPatchPoliciesResource).Namespace(desiredEnvoyPatchPolicy.GetNamespace()) + + existingEnvoyPatchPolicyObj, found := lo.Find(topology.Objects().Children(gateway), func(child machinery.Object) bool { + return child.GroupVersionKind().GroupKind() == kuadrantenvoygateway.EnvoyPatchPolicyGroupKind && child.GetName() == desiredEnvoyPatchPolicy.GetName() && child.GetNamespace() == desiredEnvoyPatchPolicy.GetNamespace() + }) + + // create + if !found { + desiredEnvoyPatchPolicyUnstructured, err := controller.Destruct(desiredEnvoyPatchPolicy) + if err != nil { + logger.Error(err, "failed to destruct envoyfilter object", "gateway", gatewayKey.String(), "envoyfilter", desiredEnvoyPatchPolicy) + continue + } + if _, err = resource.Create(ctx, desiredEnvoyPatchPolicyUnstructured, metav1.CreateOptions{}); err != nil { + logger.Error(err, "failed to create envoyfilter object", "gateway", gatewayKey.String(), "envoyfilter", desiredEnvoyPatchPolicyUnstructured.Object) + // TODO: handle error + } + continue + } + + existingEnvoyPatchPolicy := existingEnvoyPatchPolicyObj.(*controller.RuntimeObject).Object.(*envoygatewayv1alpha1.EnvoyPatchPolicy) + + if equalEnvoyPatchPolicies(existingEnvoyPatchPolicy, desiredEnvoyPatchPolicy) { + logger.V(1).Info("envoyfilter object is up to date, nothing to do") + continue + } + + // update + existingEnvoyPatchPolicy.Spec = envoygatewayv1alpha1.EnvoyPatchPolicySpec{ + TargetRef: desiredEnvoyPatchPolicy.Spec.TargetRef, + Type: desiredEnvoyPatchPolicy.Spec.Type, + JSONPatches: desiredEnvoyPatchPolicy.Spec.JSONPatches, + Priority: desiredEnvoyPatchPolicy.Spec.Priority, + } + + existingEnvoyPatchPolicyUnstructured, err := controller.Destruct(existingEnvoyPatchPolicy) + if err != nil { + logger.Error(err, "failed to destruct envoyfilter object", "gateway", gatewayKey.String(), "envoyfilter", existingEnvoyPatchPolicy) + continue + } + if _, err = resource.Update(ctx, existingEnvoyPatchPolicyUnstructured, metav1.UpdateOptions{}); err != nil { + logger.Error(err, "failed to update envoyfilter object", "gateway", gatewayKey.String(), "envoyfilter", existingEnvoyPatchPolicyUnstructured.Object) + // TODO: handle error + } + } + + // cleanup envoy gateway clusters for gateways that are not in the effective policies + staleEnvoyPatchPolicies := topology.Objects().Items(func(o machinery.Object) bool { + _, desired := desiredEnvoyPatchPolicies[k8stypes.NamespacedName{Name: o.GetName(), Namespace: o.GetNamespace()}] + return o.GroupVersionKind().GroupKind() == kuadrantenvoygateway.EnvoyPatchPolicyGroupKind && !desired + }) + + for _, envoyPatchPolicy := range staleEnvoyPatchPolicies { + if err := r.client.Resource(kuadrantenvoygateway.EnvoyPatchPoliciesResource).Namespace(envoyPatchPolicy.GetNamespace()).Delete(ctx, envoyPatchPolicy.GetName(), metav1.DeleteOptions{}); err != nil { + logger.Error(err, "failed to delete envoyfilter object", "envoyfilter", fmt.Sprintf("%s/%s", envoyPatchPolicy.GetNamespace(), envoyPatchPolicy.GetName())) + // TODO: handle error + } + } + + return nil +} + +func (r *envoyGatewayRateLimitClusterReconciler) buildDesiredEnvoyPatchPolicy(limitador *limitadorv1alpha1.Limitador, gateway *machinery.Gateway) (*envoygatewayv1alpha1.EnvoyPatchPolicy, error) { + envoyPatchPolicy := &envoygatewayv1alpha1.EnvoyPatchPolicy{ + TypeMeta: metav1.TypeMeta{ + Kind: kuadrantenvoygateway.EnvoyPatchPolicyGroupKind.Kind, + APIVersion: envoygatewayv1alpha1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: RateLimitClusterName(gateway.GetName()), + Namespace: gateway.GetNamespace(), + Labels: map[string]string{rateLimitClusterLabelKey: "true"}, + }, + Spec: envoygatewayv1alpha1.EnvoyPatchPolicySpec{ + TargetRef: gatewayapiv1alpha2.LocalPolicyTargetReference{ + Group: gatewayapiv1alpha2.Group(machinery.GatewayGroupKind.Group), + Kind: gatewayapiv1alpha2.Kind(machinery.GatewayGroupKind.Kind), + Name: gatewayapiv1alpha2.ObjectName(gateway.GetName()), + }, + Type: envoygatewayv1alpha1.JSONPatchEnvoyPatchType, + }, + } + + jsonPatches, err := envoyGatewayEnvoyPatchPolicyClusterPatch(limitador.Status.Service.Host, int(limitador.Status.Service.Ports.GRPC)) + if err != nil { + return nil, err + } + envoyPatchPolicy.Spec.JSONPatches = jsonPatches + + if err := r.SetOwnerReference(gateway.Gateway, envoyPatchPolicy); err != nil { + return nil, err + } + + return envoyPatchPolicy, nil +} + +// envoyGatewayEnvoyPatchPolicyClusterPatch returns a set envoy config patch that defines the rate limit cluster for the gateway. +// The rate limit cluster configures the endpoint of the external rate limit service. +func envoyGatewayEnvoyPatchPolicyClusterPatch(host string, port int) ([]envoygatewayv1alpha1.EnvoyJSONPatchConfig, error) { + patchRaw, _ := json.Marshal(rateLimitClusterPatch(host, port)) + patch := &apiextensionsv1.JSON{} + if err := patch.UnmarshalJSON(patchRaw); err != nil { + return nil, err + } + + return []envoygatewayv1alpha1.EnvoyJSONPatchConfig{ + { + Type: envoygatewayv1alpha1.ClusterEnvoyResourceType, + Name: common.KuadrantRateLimitClusterName, + Operation: envoygatewayv1alpha1.JSONPatchOperation{ + Op: envoygatewayv1alpha1.JSONPatchOperationType("add"), + Path: "", + Value: patch, + }, + }, + }, nil +} + +func equalEnvoyPatchPolicies(a, b *envoygatewayv1alpha1.EnvoyPatchPolicy) bool { + if a.Spec.Priority != b.Spec.Priority || a.Spec.TargetRef != b.Spec.TargetRef { + return false + } + + aJSONPatches := a.Spec.JSONPatches + bJSONPatches := b.Spec.JSONPatches + if len(aJSONPatches) != len(bJSONPatches) { + return false + } + return lo.EveryBy(aJSONPatches, func(aJSONPatch envoygatewayv1alpha1.EnvoyJSONPatchConfig) bool { + return lo.SomeBy(bJSONPatches, func(bJSONPatch envoygatewayv1alpha1.EnvoyJSONPatchConfig) bool { + return aJSONPatch.Type == bJSONPatch.Type && aJSONPatch.Name == bJSONPatch.Name && aJSONPatch.Operation == bJSONPatch.Operation + }) + }) +} diff --git a/controllers/envoygateway_limitador_cluster_controller.go b/controllers/envoygateway_limitador_cluster_controller.go deleted file mode 100644 index 9d9eb7637..000000000 --- a/controllers/envoygateway_limitador_cluster_controller.go +++ /dev/null @@ -1,224 +0,0 @@ -package controllers - -import ( - "context" - "encoding/json" - "fmt" - - egv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1" - "github.com/go-logr/logr" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1" - - kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1" - "github.com/kuadrant/kuadrant-operator/pkg/common" - kuadrantenvoygateway "github.com/kuadrant/kuadrant-operator/pkg/envoygateway" - kuadrantgatewayapi "github.com/kuadrant/kuadrant-operator/pkg/library/gatewayapi" - "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" -) - -// EnvoyGatewayLimitadorClusterReconciler reconciles an EnvoyGateway EnvoyPatchPolicy object -// to setup limitador's cluster on the gateway. It is a requirement for the wasm module to work. -// https://gateway.envoyproxy.io/latest/api/extension_types/#envoypatchpolicy -type EnvoyGatewayLimitadorClusterReconciler struct { - *reconcilers.BaseReconciler -} - -//+kubebuilder:rbac:groups=gateway.envoyproxy.io,resources=envoypatchpolicies,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=gateway.envoyproxy.io,resources=envoyextensionpolicies,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=gateways,verbs=get;list;watch;update;patch - -// For more details, check Reconcile and its Result here: -// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.10.0/pkg/reconcile -func (r *EnvoyGatewayLimitadorClusterReconciler) Reconcile(eventCtx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := r.Logger().WithValues("envoyExtensionPolicy", req.NamespacedName) - logger.V(1).Info("Reconciling limitador cluster") - ctx := logr.NewContext(eventCtx, logger) - - extPolicy := &egv1alpha1.EnvoyExtensionPolicy{} - if err := r.Client().Get(ctx, req.NamespacedName, extPolicy); err != nil { - if apierrors.IsNotFound(err) { - logger.Info("no envoygateway extension policy object found") - return ctrl.Result{}, nil - } - logger.Error(err, "failed to get envoygateway extension policy object") - return ctrl.Result{}, err - } - - if logger.V(1).Enabled() { - jsonData, err := json.MarshalIndent(extPolicy.Spec.PolicyTargetReferences, "", " ") - if err != nil { - return ctrl.Result{}, err - } - logger.V(1).Info(string(jsonData)) - } - - if extPolicy.DeletionTimestamp != nil { - // no need to handle deletion - // ownerrefs will do the job - return ctrl.Result{}, nil - } - - // - // Get kuadrant - // - kuadrantList := &kuadrantv1beta1.KuadrantList{} - err := r.Client().List(ctx, kuadrantList) - if err != nil { - return ctrl.Result{}, err - } - if len(kuadrantList.Items) == 0 { - logger.Info("kuadrant object not found. Nothing to do") - return ctrl.Result{}, nil - } - - kObj := kuadrantList.Items[0] - - // - // Get limitador - // - limitadorKey := client.ObjectKey{Name: common.LimitadorName, Namespace: kObj.Namespace} - limitador := &limitadorv1alpha1.Limitador{} - err = r.Client().Get(ctx, limitadorKey, limitador) - logger.V(1).Info("read limitador", "key", limitadorKey, "err", err) - if err != nil { - if apierrors.IsNotFound(err) { - logger.Info("limitador object not found. Nothing to do") - return ctrl.Result{}, nil - } - return ctrl.Result{}, err - } - - if !meta.IsStatusConditionTrue(limitador.Status.Conditions, "Ready") { - logger.Info("limitador status reports not ready. Retrying") - return ctrl.Result{Requeue: true}, nil - } - - limitadorClusterPatchPolicy, err := r.desiredLimitadorClusterPatchPolicy(extPolicy, limitador) - if err != nil { - return ctrl.Result{}, err - } - err = r.ReconcileResource(ctx, &egv1alpha1.EnvoyPatchPolicy{}, limitadorClusterPatchPolicy, reconcilers.CreateOnlyMutator) - if err != nil { - return ctrl.Result{}, err - } - - logger.V(1).Info("Envoygateway limitador cluster reconciled successfully") - - return ctrl.Result{}, nil -} - -func (r *EnvoyGatewayLimitadorClusterReconciler) desiredLimitadorClusterPatchPolicy( - extPolicy *egv1alpha1.EnvoyExtensionPolicy, - limitador *limitadorv1alpha1.Limitador) (*egv1alpha1.EnvoyPatchPolicy, error) { - patchPolicy := &egv1alpha1.EnvoyPatchPolicy{ - TypeMeta: metav1.TypeMeta{ - Kind: egv1alpha1.KindEnvoyPatchPolicy, - APIVersion: egv1alpha1.GroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: LimitadorClusterEnvoyPatchPolicyName(extPolicy.GetName()), - Namespace: extPolicy.Namespace, - }, - Spec: egv1alpha1.EnvoyPatchPolicySpec{ - // Same target ref as the associated extension policy - TargetRef: extPolicy.Spec.PolicyTargetReferences.TargetRefs[0].LocalPolicyTargetReference, - Type: egv1alpha1.JSONPatchEnvoyPatchType, - JSONPatches: []egv1alpha1.EnvoyJSONPatchConfig{ - limitadorClusterPatch( - limitador.Status.Service.Host, - int(limitador.Status.Service.Ports.GRPC), - ), - }, - }, - } - - // controller reference - // patchPolicy has ownerref to extension policy - if err := r.SetOwnerReference(extPolicy, patchPolicy); err != nil { - return nil, err - } - - return patchPolicy, nil -} - -func LimitadorClusterEnvoyPatchPolicyName(targetName string) string { - return fmt.Sprintf("patch-for-%s", targetName) -} - -func limitadorClusterPatch(limitadorSvcHost string, limitadorGRPCPort int) egv1alpha1.EnvoyJSONPatchConfig { - // The patch defines the rate_limit_cluster, which provides the endpoint location of the external rate limit service. - // TODO(eguzki): Istio EnvoyFilter uses almost the same structure. DRY - patchUnstructured := map[string]any{ - "name": common.KuadrantRateLimitClusterName, - "type": "STRICT_DNS", - "connect_timeout": "1s", - "lb_policy": "ROUND_ROBIN", - "http2_protocol_options": map[string]any{}, - "load_assignment": map[string]any{ - "cluster_name": common.KuadrantRateLimitClusterName, - "endpoints": []map[string]any{ - { - "lb_endpoints": []map[string]any{ - { - "endpoint": map[string]any{ - "address": map[string]any{ - "socket_address": map[string]any{ - "address": limitadorSvcHost, - "port_value": limitadorGRPCPort, - }, - }, - }, - }, - }, - }, - }, - }, - } - - patchRaw, _ := json.Marshal(patchUnstructured) - value := &apiextensionsv1.JSON{} - value.UnmarshalJSON(patchRaw) - - return egv1alpha1.EnvoyJSONPatchConfig{ - Type: egv1alpha1.ClusterEnvoyResourceType, - Name: common.KuadrantRateLimitClusterName, - Operation: egv1alpha1.JSONPatchOperation{ - Op: egv1alpha1.JSONPatchOperationType("add"), - Path: "", - Value: value, - }, - } -} - -// SetupWithManager sets up the controller with the Manager. -func (r *EnvoyGatewayLimitadorClusterReconciler) SetupWithManager(mgr ctrl.Manager) error { - ok, err := kuadrantenvoygateway.IsEnvoyGatewayInstalled(mgr.GetRESTMapper()) - if err != nil { - return err - } - if !ok { - r.Logger().Info("EnvoyGateway limitador cluster controller disabled. EnvoyGateway API was not found") - return nil - } - - ok, err = kuadrantgatewayapi.IsGatewayAPIInstalled(mgr.GetRESTMapper()) - if err != nil { - return err - } - if !ok { - r.Logger().Info("EnvoyGateway limitador cluster disabled. GatewayAPI was not found") - return nil - } - - return ctrl.NewControllerManagedBy(mgr). - For(&egv1alpha1.EnvoyExtensionPolicy{}). - Owns(&egv1alpha1.EnvoyPatchPolicy{}). - Complete(r) -} diff --git a/controllers/istio_rate_limit_cluster_reconciler.go b/controllers/istio_rate_limit_cluster_reconciler.go index 0c5871458..0f61e023f 100644 --- a/controllers/istio_rate_limit_cluster_reconciler.go +++ b/controllers/istio_rate_limit_cluster_reconciler.go @@ -21,7 +21,6 @@ import ( kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1" kuadrantv1beta3 "github.com/kuadrant/kuadrant-operator/api/v1beta3" - "github.com/kuadrant/kuadrant-operator/pkg/common" kuadrantistio "github.com/kuadrant/kuadrant-operator/pkg/istio" "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" ) @@ -166,7 +165,7 @@ func (r *istioRateLimitClusterReconciler) buildDesiredEnvoyFilter(limitador *lim APIVersion: istioclientgonetworkingv1alpha3.SchemeGroupVersion.String(), }, ObjectMeta: metav1.ObjectMeta{ - Name: rateLimitClusterName(gateway.GetName()), + Name: RateLimitClusterName(gateway.GetName()), Namespace: gateway.GetNamespace(), Labels: map[string]string{rateLimitClusterLabelKey: "true"}, }, @@ -197,40 +196,9 @@ func (r *istioRateLimitClusterReconciler) buildDesiredEnvoyFilter(limitador *lim // istioEnvoyFilterClusterPatch returns an envoy config patch that defines the rate limit cluster for the gateway. // The rate limit cluster configures the endpoint of the external rate limit service. func istioEnvoyFilterClusterPatch(host string, port int) ([]*istioapinetworkingv1alpha3.EnvoyFilter_EnvoyConfigObjectPatch, error) { - patchUnstructured := map[string]any{ - "operation": "ADD", - "value": map[string]any{ - "name": common.KuadrantRateLimitClusterName, - "type": "STRICT_DNS", - "connect_timeout": "1s", - "lb_policy": "ROUND_ROBIN", - "http2_protocol_options": map[string]any{}, - "load_assignment": map[string]any{ - "cluster_name": common.KuadrantRateLimitClusterName, - "endpoints": []map[string]any{ - { - "lb_endpoints": []map[string]any{ - { - "endpoint": map[string]any{ - "address": map[string]any{ - "socket_address": map[string]any{ - "address": host, - "port_value": port, - }, - }, - }, - }, - }, - }, - }, - }, - }, - } - - patchRaw, _ := json.Marshal(patchUnstructured) + patchRaw, _ := json.Marshal(map[string]any{"operation": "ADD", "value": rateLimitClusterPatch(host, port)}) patch := &istioapinetworkingv1alpha3.EnvoyFilter_Patch{} - err := patch.UnmarshalJSON(patchRaw) - if err != nil { + if err := patch.UnmarshalJSON(patchRaw); err != nil { return nil, err } diff --git a/controllers/ratelimit_workflow.go b/controllers/ratelimit_workflow.go index ad02c5e12..69b0e1182 100644 --- a/controllers/ratelimit_workflow.go +++ b/controllers/ratelimit_workflow.go @@ -23,6 +23,7 @@ import ( kuadrantv1 "github.com/kuadrant/kuadrant-operator/api/v1" kuadrantv1beta1 "github.com/kuadrant/kuadrant-operator/api/v1beta1" kuadrantv1beta3 "github.com/kuadrant/kuadrant-operator/api/v1beta3" + "github.com/kuadrant/kuadrant-operator/pkg/common" kuadrantenvoygateway "github.com/kuadrant/kuadrant-operator/pkg/envoygateway" kuadrantistio "github.com/kuadrant/kuadrant-operator/pkg/istio" "github.com/kuadrant/kuadrant-operator/pkg/library/reconcilers" @@ -33,7 +34,9 @@ import ( const ( rateLimitClusterLabelKey = "kuadrant.io/rate-limit-cluster" - istioGatewayControllerName = "istio.io/gateway-controller" // make this configurable? + // make these configurable? + istioGatewayControllerName = "istio.io/gateway-controller" + envoyGatewayGatewayControllerName = "gateway.envoyproxy.io/gatewayclass-controller" ) var ( @@ -73,14 +76,15 @@ func NewRateLimitWorkflow(manager ctrlruntime.Manager, client *dynamic.DynamicCl }, } + baseReconciler := reconcilers.NewBaseReconciler(manager.GetClient(), manager.GetScheme(), manager.GetAPIReader(), log.Log.WithName("ratelimit")) + if isIstioInstalled { - baseReconciler := reconcilers.NewBaseReconciler(manager.GetClient(), manager.GetScheme(), manager.GetAPIReader(), log.Log.WithName("ratelimit")) effectiveRateLimitPoliciesWorkflow.Tasks = append(effectiveRateLimitPoliciesWorkflow.Tasks, (&istioRateLimitClusterReconciler{BaseReconciler: baseReconciler, client: client}).Subscription().Reconcile) effectiveRateLimitPoliciesWorkflow.Tasks = append(effectiveRateLimitPoliciesWorkflow.Tasks, (&istioExtensionReconciler{client: client}).Subscription().Reconcile) } if isEnvoyGatewayInstalled { - // TODO: reconcile envoy cluster (EnvoyPatchPolicy) + effectiveRateLimitPoliciesWorkflow.Tasks = append(effectiveRateLimitPoliciesWorkflow.Tasks, (&envoyGatewayRateLimitClusterReconciler{BaseReconciler: baseReconciler, client: client}).Subscription().Reconcile) // TODO: reconcile envoy extension (EnvoyExtensionPolicy) } @@ -114,56 +118,39 @@ func LimitNameToLimitadorIdentifier(rlpKey k8stypes.NamespacedName, uniqueLimitN return identifier } -func rateLimitPolicyAcceptedStatus(policy machinery.Policy) (accepted bool, err error) { - p, ok := policy.(*kuadrantv1beta3.RateLimitPolicy) - if !ok { - return - } - if condition := meta.FindStatusCondition(p.Status.Conditions, string(gatewayapiv1alpha2.PolicyConditionAccepted)); condition != nil { - accepted = condition.Status == metav1.ConditionTrue - if !accepted { - err = fmt.Errorf(condition.Message) - } - return - } - return -} - -func rateLimitPolicyAcceptedStatusFunc(state *sync.Map) func(policy machinery.Policy) (bool, error) { - validatedPolicies, validated := state.Load(StateRateLimitPolicyValid) - if !validated { - return rateLimitPolicyAcceptedStatus - } - validatedPoliciesMap := validatedPolicies.(map[string]error) - return func(policy machinery.Policy) (bool, error) { - err, validated := validatedPoliciesMap[policy.GetLocator()] - if validated { - return err == nil, err - } - return rateLimitPolicyAcceptedStatus(policy) - } -} - -func isRateLimitPolicyAcceptedFunc(state *sync.Map) func(machinery.Policy) bool { - f := rateLimitPolicyAcceptedStatusFunc(state) - return func(policy machinery.Policy) bool { - accepted, _ := f(policy) - return accepted - } +func RateLimitClusterName(gatewayName string) string { + return fmt.Sprintf("kuadrant-ratelimiting-%s", gatewayName) } -func isRateLimitPolicyAcceptedAndNotDeletedFunc(state *sync.Map) func(machinery.Policy) bool { - f := isRateLimitPolicyAcceptedFunc(state) - return func(policy machinery.Policy) bool { - p, object := policy.(metav1.Object) - return object && f(policy) && p.GetDeletionTimestamp() == nil +func rateLimitClusterPatch(host string, port int) map[string]any { + return map[string]any{ + "name": common.KuadrantRateLimitClusterName, + "type": "STRICT_DNS", + "connect_timeout": "1s", + "lb_policy": "ROUND_ROBIN", + "http2_protocol_options": map[string]any{}, + "load_assignment": map[string]any{ + "cluster_name": common.KuadrantRateLimitClusterName, + "endpoints": []map[string]any{ + { + "lb_endpoints": []map[string]any{ + { + "endpoint": map[string]any{ + "address": map[string]any{ + "socket_address": map[string]any{ + "address": host, + "port_value": port, + }, + }, + }, + }, + }, + }, + }, + }, } } -func rateLimitClusterName(gatewayName string) string { - return fmt.Sprintf("kuadrant-ratelimiting-%s", gatewayName) -} - func rateLimitWasmRuleBuilder(pathID string, effectivePolicy EffectiveRateLimitPolicy, state *sync.Map) wasm.WasmRuleBuilderFunc { policiesInPath := kuadrantv1.PoliciesInPath(effectivePolicy.Path, isRateLimitPolicyAcceptedAndNotDeletedFunc(state)) @@ -233,3 +220,49 @@ func wasmDataFromLimit(limitIdentifier string, limit kuadrantv1beta3.Limit) (dat return data } + +func isRateLimitPolicyAcceptedAndNotDeletedFunc(state *sync.Map) func(machinery.Policy) bool { + f := isRateLimitPolicyAcceptedFunc(state) + return func(policy machinery.Policy) bool { + p, object := policy.(metav1.Object) + return object && f(policy) && p.GetDeletionTimestamp() == nil + } +} + +func isRateLimitPolicyAcceptedFunc(state *sync.Map) func(machinery.Policy) bool { + f := rateLimitPolicyAcceptedStatusFunc(state) + return func(policy machinery.Policy) bool { + accepted, _ := f(policy) + return accepted + } +} + +func rateLimitPolicyAcceptedStatusFunc(state *sync.Map) func(policy machinery.Policy) (bool, error) { + validatedPolicies, validated := state.Load(StateRateLimitPolicyValid) + if !validated { + return rateLimitPolicyAcceptedStatus + } + validatedPoliciesMap := validatedPolicies.(map[string]error) + return func(policy machinery.Policy) (bool, error) { + err, validated := validatedPoliciesMap[policy.GetLocator()] + if validated { + return err == nil, err + } + return rateLimitPolicyAcceptedStatus(policy) + } +} + +func rateLimitPolicyAcceptedStatus(policy machinery.Policy) (accepted bool, err error) { + p, ok := policy.(*kuadrantv1beta3.RateLimitPolicy) + if !ok { + return + } + if condition := meta.FindStatusCondition(p.Status.Conditions, string(gatewayapiv1alpha2.PolicyConditionAccepted)); condition != nil { + accepted = condition.Status == metav1.ConditionTrue + if !accepted { + err = fmt.Errorf(condition.Message) + } + return + } + return +} diff --git a/controllers/state_of_the_world.go b/controllers/state_of_the_world.go index b7db271b0..d697ed183 100644 --- a/controllers/state_of_the_world.go +++ b/controllers/state_of_the_world.go @@ -228,7 +228,9 @@ func (b *BootOptionsBuilder) getEnvoyGatewayOptions() []controller.ControllerOpt envoygateway.EnvoyExtensionPolicyGroupKind, envoygateway.SecurityPolicyGroupKind, ), - // TODO: add object links + controller.WithObjectLinks( + envoygateway.LinkGatewayToEnvoyPatchPolicy, + ), ) // TODO: add specific tasks to workflow } diff --git a/controllers/test_common.go b/controllers/test_common.go index 6e942a546..e1a978d10 100644 --- a/controllers/test_common.go +++ b/controllers/test_common.go @@ -196,19 +196,6 @@ func SetupKuadrantOperatorForTest(s *runtime.Scheme, cfg *rest.Config) { Expect(err).NotTo(HaveOccurred()) - envoyGatewayLimitadorClusterReconciler := reconcilers.NewBaseReconciler( - mgr.GetClient(), - mgr.GetScheme(), - mgr.GetAPIReader(), - log.Log.WithName("envoyGatewayLimitadorClusterReconciler"), - ) - - err = (&EnvoyGatewayLimitadorClusterReconciler{ - BaseReconciler: envoyGatewayLimitadorClusterReconciler, - }).SetupWithManager(mgr) - - Expect(err).NotTo(HaveOccurred()) - dClient, err := dynamic.NewForConfig(mgr.GetConfig()) Expect(err).NotTo(HaveOccurred()) diff --git a/main.go b/main.go index 8c1eb6fd6..6aef9b119 100644 --- a/main.go +++ b/main.go @@ -269,17 +269,6 @@ func main() { os.Exit(1) } - envoyGatewayLimitadorClusterReconciler := reconcilers.NewBaseReconciler( - mgr.GetClient(), mgr.GetScheme(), mgr.GetAPIReader(), - log.Log.WithName("envoyGatewayLimitadorClusterReconciler"), - ) - if err = (&controllers.EnvoyGatewayLimitadorClusterReconciler{ - BaseReconciler: envoyGatewayLimitadorClusterReconciler, - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "EnvoyGatewayLimitadorClusterReconciler") - os.Exit(1) - } - //+kubebuilder:scaffold:builder if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/pkg/envoygateway/utils.go b/pkg/envoygateway/utils.go index 629bc3eff..7e4a20109 100644 --- a/pkg/envoygateway/utils.go +++ b/pkg/envoygateway/utils.go @@ -2,8 +2,13 @@ package envoygateway import ( egv1alpha1 "github.com/envoyproxy/gateway/api/v1alpha1" + "github.com/samber/lo" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" + gatewayapiv1 "sigs.k8s.io/gateway-api/apis/v1" + + "github.com/kuadrant/policy-machinery/controller" + "github.com/kuadrant/policy-machinery/machinery" "github.com/kuadrant/kuadrant-operator/pkg/library/utils" ) @@ -70,3 +75,34 @@ func IsEnvoyGatewayInstalled(restMapper meta.RESTMapper) (bool, error) { // EnvoyGateway found return true, nil } + +func LinkGatewayToEnvoyPatchPolicy(objs controller.Store) machinery.LinkFunc { + gateways := lo.Map(objs.FilterByGroupKind(machinery.GatewayGroupKind), func(obj controller.Object, _ int) machinery.Object { + return &machinery.Gateway{Gateway: obj.(*gatewayapiv1.Gateway)} + }) + + return machinery.LinkFunc{ + From: machinery.GatewayGroupKind, + To: EnvoyPatchPolicyGroupKind, + Func: func(child machinery.Object) []machinery.Object { + envoyPatchPolicy := child.(*controller.RuntimeObject).Object.(*egv1alpha1.EnvoyPatchPolicy) + namespace := envoyPatchPolicy.GetNamespace() + targetRef := envoyPatchPolicy.Spec.TargetRef + group := string(targetRef.Group) + if group == "" { + group = machinery.GatewayGroupKind.Group + } + kind := string(targetRef.Kind) + if kind == "" { + kind = machinery.GatewayGroupKind.Kind + } + name := string(targetRef.Name) + if group != machinery.GatewayGroupKind.Group || kind != machinery.GatewayGroupKind.Kind || name == "" { + return []machinery.Object{} + } + return lo.Filter(gateways, func(gateway machinery.Object, _ int) bool { + return gateway.GetName() == name && gateway.GetNamespace() == namespace + }) + }, + } +} diff --git a/pkg/istio/utils.go b/pkg/istio/utils.go index b943847c4..bf1217d44 100644 --- a/pkg/istio/utils.go +++ b/pkg/istio/utils.go @@ -104,7 +104,7 @@ func LinkGatewayToWasmPlugin(objs controller.Store) machinery.LinkFunc { To: WasmPluginGroupKind, Func: func(child machinery.Object) []machinery.Object { wasmPlugin := child.(*controller.RuntimeObject).Object.(*istioclientgoextensionv1alpha1.WasmPlugin) - return lo.FilterMap(gateways, istioTargetRefsIncludeObjectFunc(wasmPlugin.Spec.TargetRefs, wasmPlugin.GetNamespace())) + return lo.Filter(gateways, istioTargetRefsIncludeObjectFunc(wasmPlugin.Spec.TargetRefs, wasmPlugin.GetNamespace())) }, } } @@ -119,15 +119,15 @@ func LinkGatewayToEnvoyFilter(objs controller.Store) machinery.LinkFunc { To: EnvoyFilterGroupKind, Func: func(child machinery.Object) []machinery.Object { envoyFilter := child.(*controller.RuntimeObject).Object.(*istioclientgonetworkingv1alpha3.EnvoyFilter) - return lo.FilterMap(gateways, istioTargetRefsIncludeObjectFunc(envoyFilter.Spec.TargetRefs, envoyFilter.GetNamespace())) + return lo.Filter(gateways, istioTargetRefsIncludeObjectFunc(envoyFilter.Spec.TargetRefs, envoyFilter.GetNamespace())) }, } } -func istioTargetRefsIncludeObjectFunc(targetRefs []*istioapiv1beta1.PolicyTargetReference, defaultNamespace string) func(machinery.Object, int) (machinery.Object, bool) { - return func(obj machinery.Object, _ int) (machinery.Object, bool) { +func istioTargetRefsIncludeObjectFunc(targetRefs []*istioapiv1beta1.PolicyTargetReference, defaultNamespace string) func(machinery.Object, int) bool { + return func(obj machinery.Object, _ int) bool { groupKind := obj.GroupVersionKind().GroupKind() - return obj, lo.SomeBy(targetRefs, func(targetRef *istioapiv1beta1.PolicyTargetReference) bool { + return lo.SomeBy(targetRefs, func(targetRef *istioapiv1beta1.PolicyTargetReference) bool { if targetRef == nil { return false } diff --git a/tests/envoygateway/envoygateway_limitador_cluster_controller_test.go b/tests/envoygateway/envoygateway_limitador_cluster_controller_test.go index 4eb5a6472..39b498dca 100644 --- a/tests/envoygateway/envoygateway_limitador_cluster_controller_test.go +++ b/tests/envoygateway/envoygateway_limitador_cluster_controller_test.go @@ -23,7 +23,6 @@ import ( kuadrantv1beta3 "github.com/kuadrant/kuadrant-operator/api/v1beta3" "github.com/kuadrant/kuadrant-operator/controllers" "github.com/kuadrant/kuadrant-operator/pkg/common" - "github.com/kuadrant/kuadrant-operator/pkg/wasm" "github.com/kuadrant/kuadrant-operator/tests" ) @@ -133,9 +132,7 @@ var _ = Describe("limitador cluster controller", func() { It("Creates envoypatchpolicy for limitador cluster", func(ctx SpecContext) { patchKey := client.ObjectKey{ - Name: controllers.LimitadorClusterEnvoyPatchPolicyName( - wasm.WasmExtensionName(TestGatewayName), - ), + Name: controllers.RateLimitClusterName(TestGatewayName), Namespace: testNamespace, } @@ -205,9 +202,7 @@ var _ = Describe("limitador cluster controller", func() { Expect(err).ToNot(HaveOccurred()) patchKey := client.ObjectKey{ - Name: controllers.LimitadorClusterEnvoyPatchPolicyName( - wasm.WasmExtensionName(TestGatewayName), - ), + Name: controllers.RateLimitClusterName(TestGatewayName), Namespace: testNamespace, } @@ -224,9 +219,7 @@ var _ = Describe("limitador cluster controller", func() { Expect(err).ToNot(HaveOccurred()) patchKey := client.ObjectKey{ - Name: controllers.LimitadorClusterEnvoyPatchPolicyName( - wasm.WasmExtensionName(TestGatewayName), - ), + Name: controllers.RateLimitClusterName(TestGatewayName), Namespace: testNamespace, } diff --git a/tests/istio/limitador_cluster_envoyfilter_controller_test.go b/tests/istio/limitador_cluster_envoyfilter_controller_test.go index 7527e2d72..a74a3de90 100644 --- a/tests/istio/limitador_cluster_envoyfilter_controller_test.go +++ b/tests/istio/limitador_cluster_envoyfilter_controller_test.go @@ -3,7 +3,6 @@ package istio_test import ( - "fmt" "time" limitadorv1alpha1 "github.com/kuadrant/limitador-operator/api/v1alpha1" @@ -19,6 +18,7 @@ import ( gatewayapiv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" kuadrantv1beta3 "github.com/kuadrant/kuadrant-operator/api/v1beta3" + "github.com/kuadrant/kuadrant-operator/controllers" "github.com/kuadrant/kuadrant-operator/pkg/common" "github.com/kuadrant/kuadrant-operator/pkg/library/kuadrant" "github.com/kuadrant/kuadrant-operator/tests" @@ -32,7 +32,6 @@ var _ = Describe("Limitador Cluster EnvoyFilter controller", func() { var ( testNamespace string rlpName = "toystore-rlp" - efName = fmt.Sprintf("kuadrant-ratelimiting-cluster-%s", TestGatewayName) ) beforeEachCallback := func(ctx SpecContext) { @@ -120,7 +119,7 @@ var _ = Describe("Limitador Cluster EnvoyFilter controller", func() { // Check envoy filter Eventually(func() bool { existingEF := &istioclientnetworkingv1alpha3.EnvoyFilter{} - efKey := client.ObjectKey{Name: efName, Namespace: testNamespace} + efKey := client.ObjectKey{Name: controllers.RateLimitClusterName(TestGatewayName), Namespace: testNamespace} err = testClient().Get(ctx, efKey, existingEF) if err != nil { return false @@ -134,7 +133,7 @@ var _ = Describe("Limitador Cluster EnvoyFilter controller", func() { // Check envoy filter is gone Eventually(func() bool { existingEF := &istioclientnetworkingv1alpha3.EnvoyFilter{} - efKey := client.ObjectKey{Name: efName, Namespace: testNamespace} + efKey := client.ObjectKey{Name: controllers.RateLimitClusterName(TestGatewayName), Namespace: testNamespace} err = testClient().Get(ctx, efKey, existingEF) return apierrors.IsNotFound(err) }).WithContext(ctx).Should(BeTrue())