From 3e028af3b084f673f42d085020d565a44a56f8fa Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Wed, 7 Jun 2023 21:02:09 +0200 Subject: [PATCH 1/3] Enable Azure Workload Identity to authorize against RabbitMQ management API Signed-off-by: Jakub Adamus --- CHANGELOG.md | 1 + pkg/scalers/rabbitmq_scaler.go | 54 ++++++++++++++++++++++++----- pkg/scalers/rabbitmq_scaler_test.go | 38 +++++++++++++------- 3 files changed, 73 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ecfb8153acf..46a3e0d0560 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Pulsar Scaler**: Improve error messages for unsuccessful connections ([#4563](https://github.com/kedacore/keda/issues/4563)) - **Security**: Enable secret scanning in GitHub repo - **RabbitMQ Scaler**: Add support for `unsafeSsl` in trigger metadata ([#4448](https://github.com/kedacore/keda/issues/4448)) +- **RabbitMQ Scaler**: Add support for `workloadIdentityResource` and utilize AzureAD Workload Identity for HTTP authorization - **Prometheus Metrics**: Add new metric with KEDA build info ([#4647](https://github.com/kedacore/keda/issues/4647)) - **Prometheus Scaler**: Add support for Google Managed Prometheus ([#4675](https://github.com/kedacore/keda/pull/4675)) diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 719fd7fee4a..abdaea3ea02 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -17,6 +17,8 @@ import ( v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" + "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scalers/azure" kedautil "github.com/kedacore/keda/v2/pkg/util" ) @@ -59,6 +61,7 @@ type rabbitMQScaler struct { connection *amqp.Connection channel *amqp.Channel httpClient *http.Client + azureOAuth *azure.ADWorkloadIdentityTokenProvider logger logr.Logger } @@ -85,6 +88,10 @@ type rabbitMQMetadata struct { keyPassword string enableTLS bool unsafeSsl bool + + // token provider for azure AD + workloadIdentityClientID string + workloadIdentityResource string } type queueInfo struct { @@ -233,6 +240,13 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { meta.keyPassword = config.AuthParams["keyPassword"] + if config.PodIdentity.Provider == v1alpha1.PodIdentityProviderAzureWorkload { + if config.AuthParams["workloadIdentityResource"] != "" { + meta.workloadIdentityClientID = config.PodIdentity.IdentityID + meta.workloadIdentityResource = config.AuthParams["workloadIdentityResource"] + } + } + certGiven := meta.cert != "" keyGiven := meta.key != "" if certGiven != keyGiven { @@ -264,6 +278,10 @@ func parseRabbitMQMetadata(config *ScalerConfig) (*rabbitMQMetadata, error) { } } + if meta.protocol == amqpProtocol && config.AuthParams["workloadIdentityResource"] != "" { + return nil, fmt.Errorf("workload identity is not supported for amqp protocol currently") + } + // Resolve queueName if val, ok := config.TriggerMetadata["queueName"]; ok { meta.queueName = val @@ -464,9 +482,9 @@ func (s *rabbitMQScaler) Close(context.Context) error { return nil } -func (s *rabbitMQScaler) getQueueStatus() (int64, float64, error) { +func (s *rabbitMQScaler) getQueueStatus(ctx context.Context) (int64, float64, error) { if s.metadata.protocol == httpProtocol { - info, err := s.getQueueInfoViaHTTP() + info, err := s.getQueueInfoViaHTTP(ctx) if err != nil { return -1, -1, err } @@ -488,12 +506,32 @@ func (s *rabbitMQScaler) getQueueStatus() (int64, float64, error) { return int64(items.Messages), 0, nil } -func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) { +func getJSON(ctx context.Context, s *rabbitMQScaler, url string) (queueInfo, error) { var result queueInfo - r, err := s.httpClient.Get(url) + + request, err := http.NewRequest("GET", url, nil) + if err != nil { + return result, err + } + + if s.metadata.workloadIdentityResource != "" { + if s.azureOAuth == nil { + s.azureOAuth = azure.NewAzureADWorkloadIdentityTokenProvider(ctx, s.metadata.workloadIdentityClientID, s.metadata.workloadIdentityResource) + } + + err = s.azureOAuth.Refresh() + if err != nil { + return result, err + } + + request.Header.Set("Authorization", "Bearer "+s.azureOAuth.OAuthToken()) + } + + r, err := s.httpClient.Do(request) if err != nil { return result, err } + defer r.Body.Close() if r.StatusCode == 200 { @@ -518,7 +556,7 @@ func getJSON(s *rabbitMQScaler, url string) (queueInfo, error) { return result, fmt.Errorf("error requesting rabbitMQ API status: %s, response: %s, from: %s", r.Status, body, url) } -func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) { +func (s *rabbitMQScaler) getQueueInfoViaHTTP(ctx context.Context) (*queueInfo, error) { parsedURL, err := url.Parse(s.metadata.host) if err != nil { @@ -547,7 +585,7 @@ func (s *rabbitMQScaler) getQueueInfoViaHTTP() (*queueInfo, error) { } var info queueInfo - info, err = getJSON(s, getQueueInfoManagementURI) + info, err = getJSON(ctx, s, getQueueInfoManagementURI) if err != nil { return nil, err @@ -572,8 +610,8 @@ func (s *rabbitMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpe } // GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric -func (s *rabbitMQScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - messages, publishRate, err := s.getQueueStatus() +func (s *rabbitMQScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + messages, publishRate, err := s.getQueueStatus(ctx) if err != nil { return []external_metrics.ExternalMetricValue{}, false, s.anonymizeRabbitMQError(err) } diff --git a/pkg/scalers/rabbitmq_scaler_test.go b/pkg/scalers/rabbitmq_scaler_test.go index 90cc9294251..c1dcf1f301e 100644 --- a/pkg/scalers/rabbitmq_scaler_test.go +++ b/pkg/scalers/rabbitmq_scaler_test.go @@ -11,6 +11,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/kedacore/keda/v2/apis/keda/v1alpha1" ) const ( @@ -24,10 +26,12 @@ type parseRabbitMQMetadataTestData struct { } type parseRabbitMQAuthParamTestData struct { - metadata map[string]string - authParams map[string]string - isError bool - enableTLS bool + metadata map[string]string + podIdentity v1alpha1.AuthPodIdentity + authParams map[string]string + isError bool + enableTLS bool + workloadIdentity bool } type rabbitMQMetricIdentifier struct { @@ -134,19 +138,23 @@ var testRabbitMQMetadata = []parseRabbitMQMetadataTestData{ } var testRabbitMQAuthParamData = []parseRabbitMQAuthParamTestData{ - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true, false}, // success, TLS cert/key and assumed public CA - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "cert": "ceert", "key": "keey"}, false, true, false}, // success, TLS cert/key + key password and assumed public CA - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true, false}, // success, TLS CA only - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa"}, false, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "ca": "caaa"}, false, true, false}, // failure, TLS missing cert - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa", "key": "kee"}, true, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "ca": "caaa", "key": "kee"}, true, true, false}, // failure, TLS missing key - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "enable", "ca": "caaa", "cert": "ceert"}, true, true, false}, // failure, TLS invalid - {map[string]string{"queueName": "sample", "hostFromEnv": host}, map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "kee"}, true, true}, + {map[string]string{"queueName": "sample", "hostFromEnv": host}, v1alpha1.AuthPodIdentity{}, map[string]string{"tls": "yes", "ca": "caaa", "cert": "ceert", "key": "kee"}, true, true, false}, + // success, WorkloadIdentity + {map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "http"}, v1alpha1.AuthPodIdentity{Provider: v1alpha1.PodIdentityProviderAzureWorkload, IdentityID: "client-id"}, map[string]string{"workloadIdentityResource": "rabbitmq-resource-id"}, false, false, true}, + // failure, WoekloadIdentity not supported for amqp + {map[string]string{"queueName": "sample", "hostFromEnv": host, "protocol": "amqp"}, v1alpha1.AuthPodIdentity{Provider: v1alpha1.PodIdentityProviderAzureWorkload, IdentityID: "client-id"}, map[string]string{"workloadIdentityResource": "rabbitmq-resource-id"}, true, false, false}, } var rabbitMQMetricIdentifiers = []rabbitMQMetricIdentifier{ {&testRabbitMQMetadata[1], 0, "s0-rabbitmq-sample"}, @@ -177,7 +185,7 @@ func TestRabbitMQParseMetadata(t *testing.T) { func TestRabbitMQParseAuthParamData(t *testing.T) { for _, testData := range testRabbitMQAuthParamData { - metadata, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams}) + metadata, err := parseRabbitMQMetadata(&ScalerConfig{ResolvedEnv: sampleRabbitMqResolvedEnv, TriggerMetadata: testData.metadata, AuthParams: testData.authParams, PodIdentity: testData.podIdentity}) if err != nil && !testData.isError { t.Error("Expected success but got error", err) } @@ -201,6 +209,12 @@ func TestRabbitMQParseAuthParamData(t *testing.T) { t.Errorf("Expected key to be set to %v but got %v\n", testData.authParams["keyPassword"], metadata.key) } } + if metadata != nil && metadata.workloadIdentityClientID != "" && !testData.workloadIdentity { + t.Errorf("Expected workloadIdentity to be disabled but got %v as client ID and %v as resource\n", metadata.workloadIdentityClientID, metadata.workloadIdentityResource) + } + if metadata != nil && metadata.workloadIdentityClientID == "" && testData.workloadIdentity { + t.Error("Expected workloadIdentity to be enabled but was not\n") + } } } From c9c6f1b9502c0242a22b4ce8ea269311fe5a17b9 Mon Sep 17 00:00:00 2001 From: Jakub Adamus Date: Fri, 16 Jun 2023 00:30:37 +0200 Subject: [PATCH 2/3] Add E2E tests for RabbitMQ with Azure Workload Identity auth & fix rabbitmq generic tests Signed-off-by: Jakub Adamus --- tests/scalers/rabbitmq/rabbitmq_helper.go | 63 ++++++- .../rabbitmq_queue_amqp_test.go | 6 +- .../rabbitmq_queue_amqp_vhost_test.go | 6 +- .../rabbitmq_queue_http_test.go | 6 +- .../rabbitmq_queue_http_aad_wi_test.go | 172 +++++++++++++++++ .../rabbitmq_queue_http_regex_test.go | 6 +- .../rabbitmq_queue_http_regex_aad_wi_test.go | 178 ++++++++++++++++++ .../rabbitmq_queue_http_regex_vhost_test.go | 6 +- .../rabbitmq_queue_http_vhost_test.go | 6 +- 9 files changed, 421 insertions(+), 28 deletions(-) create mode 100644 tests/scalers/rabbitmq/rabbitmq_queue_http_aad_wi/rabbitmq_queue_http_aad_wi_test.go create mode 100644 tests/scalers/rabbitmq/rabbitmq_queue_http_regex_aad_wi/rabbitmq_queue_http_regex_aad_wi_test.go diff --git a/tests/scalers/rabbitmq/rabbitmq_helper.go b/tests/scalers/rabbitmq/rabbitmq_helper.go index a3ad2e2d6e2..b44c4ce135b 100644 --- a/tests/scalers/rabbitmq/rabbitmq_helper.go +++ b/tests/scalers/rabbitmq/rabbitmq_helper.go @@ -60,6 +60,15 @@ data: default_vhost = {{.VHostName}} management.tcp.port = 15672 management.tcp.ip = 0.0.0.0 + {{if .EnableOAuth}} + auth_backends.1 = rabbit_auth_backend_internal + auth_backends.2 = rabbit_auth_backend_oauth2 + auth_backends.3 = rabbit_auth_backend_amqp + auth_oauth2.resource_server_id = {{.OAuthClientID}} + auth_oauth2.scope_prefix = rabbitmq. + auth_oauth2.additional_scopes_key = {{.OAuthScopesKey}} + auth_oauth2.jwks_url = {{.OAuthJwksURI}} + {{end}} enabled_plugins: | [rabbitmq_management]. --- @@ -158,6 +167,28 @@ spec: ` ) +type RabbitOAuthConfig struct { + Enable bool + ClientID string + ScopesKey string + JwksURI string +} + +func WithoutOAuth() RabbitOAuthConfig { + return RabbitOAuthConfig{ + Enable: false, + } +} + +func WithAzureADOAuth(tenantID string, clientID string) RabbitOAuthConfig { + return RabbitOAuthConfig{ + Enable: true, + ClientID: clientID, + ScopesKey: "roles", + JwksURI: fmt.Sprintf("https://login.microsoftonline.com/%s/discovery/keys", tenantID), + } +} + type templateData struct { Namespace string Connection string @@ -165,26 +196,38 @@ type templateData struct { HostName, VHostName string Username, Password string MessageCount int + EnableOAuth bool + OAuthClientID string + OAuthScopesKey string + OAuthJwksURI string } -func RMQInstall(t *testing.T, kc *kubernetes.Clientset, namespace, user, password, vhost string) { +func RMQInstall(t *testing.T, kc *kubernetes.Clientset, namespace, user, password, vhost string, oauth RabbitOAuthConfig) { helper.CreateNamespace(t, kc, namespace) data := templateData{ - Namespace: namespace, - VHostName: vhost, - Username: user, - Password: password, + Namespace: namespace, + VHostName: vhost, + Username: user, + Password: password, + EnableOAuth: oauth.Enable, + OAuthClientID: oauth.ClientID, + OAuthScopesKey: oauth.ScopesKey, + OAuthJwksURI: oauth.JwksURI, } helper.KubectlApplyWithTemplate(t, data, "rmqDeploymentTemplate", deploymentTemplate) } -func RMQUninstall(t *testing.T, namespace, user, password, vhost string) { +func RMQUninstall(t *testing.T, namespace, user, password, vhost string, oauth RabbitOAuthConfig) { data := templateData{ - Namespace: namespace, - VHostName: vhost, - Username: user, - Password: password, + Namespace: namespace, + VHostName: vhost, + Username: user, + Password: password, + EnableOAuth: oauth.Enable, + OAuthClientID: oauth.ClientID, + OAuthScopesKey: oauth.ScopesKey, + OAuthJwksURI: oauth.JwksURI, } helper.KubectlDeleteWithTemplate(t, data, "rmqDeploymentTemplate", deploymentTemplate) diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go index fd3fc89ffc6..be16d0a480b 100644 --- a/tests/scalers/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go +++ b/tests/scalers/rabbitmq/rabbitmq_queue_amqp/rabbitmq_queue_amqp_test.go @@ -17,7 +17,7 @@ import ( ) // Load environment variables from .env file -var _ = godotenv.Load("../../.env") +var _ = godotenv.Load("../../../.env") const ( testName = "rmq-queue-amqp-test" @@ -79,7 +79,7 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - RMQInstall(t, kc, rmqNamespace, user, password, vhost) + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) CreateKubernetesResources(t, kc, testNamespace, data, templates) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), @@ -92,7 +92,7 @@ func TestScaler(t *testing.T) { // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, testNamespace, data, templates) - RMQUninstall(t, rmqNamespace, user, password, vhost) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) } func getTemplateData() (templateData, []Template) { diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_amqp_vhost/rabbitmq_queue_amqp_vhost_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_amqp_vhost/rabbitmq_queue_amqp_vhost_test.go index 299c9a6a74a..7ae59414a0d 100644 --- a/tests/scalers/rabbitmq/rabbitmq_queue_amqp_vhost/rabbitmq_queue_amqp_vhost_test.go +++ b/tests/scalers/rabbitmq/rabbitmq_queue_amqp_vhost/rabbitmq_queue_amqp_vhost_test.go @@ -17,7 +17,7 @@ import ( ) // Load environment variables from .env file -var _ = godotenv.Load("../../.env") +var _ = godotenv.Load("../../../.env") const ( testName = "rmq-queue-amqp-vhost-test" @@ -79,7 +79,7 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - RMQInstall(t, kc, rmqNamespace, user, password, vhost) + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) CreateKubernetesResources(t, kc, testNamespace, data, templates) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), @@ -92,7 +92,7 @@ func TestScaler(t *testing.T) { // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, testNamespace, data, templates) - RMQUninstall(t, rmqNamespace, user, password, vhost) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) } func getTemplateData() (templateData, []Template) { diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http/rabbitmq_queue_http_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http/rabbitmq_queue_http_test.go index 26749ddd9af..d19c5138956 100644 --- a/tests/scalers/rabbitmq/rabbitmq_queue_http/rabbitmq_queue_http_test.go +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http/rabbitmq_queue_http_test.go @@ -17,7 +17,7 @@ import ( ) // Load environment variables from .env file -var _ = godotenv.Load("../../.env") +var _ = godotenv.Load("../../../.env") const ( testName = "rmq-queue-http-test" @@ -80,7 +80,7 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - RMQInstall(t, kc, rmqNamespace, user, password, vhost) + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) CreateKubernetesResources(t, kc, testNamespace, data, templates) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), @@ -91,7 +91,7 @@ func TestScaler(t *testing.T) { // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, testNamespace, data, templates) - RMQUninstall(t, rmqNamespace, user, password, vhost) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) } func getTemplateData() (templateData, []Template) { diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http_aad_wi/rabbitmq_queue_http_aad_wi_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http_aad_wi/rabbitmq_queue_http_aad_wi_test.go new file mode 100644 index 00000000000..865ff77b1b2 --- /dev/null +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http_aad_wi/rabbitmq_queue_http_aad_wi_test.go @@ -0,0 +1,172 @@ +//go:build e2e +// +build e2e + +package rabbitmq_queue_http_add_wi_test + +import ( + "encoding/base64" + "fmt" + "os" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + . "github.com/kedacore/keda/v2/tests/scalers/rabbitmq" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "rmq-queue-http-test-aad-wi" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + rmqNamespace = fmt.Sprintf("%s-rmq", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + secretName = fmt.Sprintf("%s-secret", testName) + triggerAuthName = fmt.Sprintf("%s-ta", testName) + triggerSecretName = fmt.Sprintf("%s-ta-secret", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + queueName = "hello" + user = fmt.Sprintf("%s-user", testName) + password = fmt.Sprintf("%s-password", testName) + vhost = "/" + connectionString = fmt.Sprintf("amqp://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace) + httpConnectionString = fmt.Sprintf("http://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace) + httpNoAuthConnectionString = fmt.Sprintf("http://rabbitmq.%s.svc.cluster.local/", rmqNamespace) + rabbitAppClientID = os.Getenv("TF_AZURE_RABBIT_API_APPLICATION_ID") + azureADTenantID = os.Getenv("TF_AZURE_SP_TENANT") + messageCount = 100 +) + +const ( + secretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.TriggerSecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + workloadIdentityResource: {{.Base64RabbitAppClientID}} +` + + triggerAuthTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthName}} + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: azure-workload + + secretTargetRef: + - parameter: workloadIdentityResource + name: {{.TriggerSecretName}} + key: workloadIdentityResource +` + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 4 + triggers: + - type: rabbitmq + metadata: + queueName: {{.QueueName}} + vhostName: {{.VHost}} + host: {{.ConnectionNoAuth}} + protocol: http + mode: QueueLength + value: '10' + authenticationRef: + name: {{.TriggerAuthName}} +` +) + +type templateData struct { + TestNamespace string + DeploymentName string + TriggerSecretName string + TriggerAuthName string + ScaledObjectName string + SecretName string + QueueName string + VHost string + Connection, Base64Connection string + ConnectionNoAuth string + RabbitAppClientID, Base64RabbitAppClientID string +} + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + require.NotEmpty(t, rabbitAppClientID, "TF_AZURE_RABBIT_API_APPLICATION_ID env variable is required for rabbitmq workload identity tests") + require.NotEmpty(t, azureADTenantID, "TF_AZURE_SP_TENANT env variable is required for rabbitmq workload identity tests") + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithAzureADOAuth(azureADTenantID, rabbitAppClientID)) + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") + + testScaling(t, kc) + + // cleanup + t.Log("--- cleaning up ---") + DeleteKubernetesResources(t, testNamespace, data, templates) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithAzureADOAuth(azureADTenantID, rabbitAppClientID)) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + VHost: vhost, + QueueName: queueName, + Connection: connectionString, + Base64Connection: base64.StdEncoding.EncodeToString([]byte(httpConnectionString)), + TriggerAuthName: triggerAuthName, + TriggerSecretName: triggerSecretName, + ConnectionNoAuth: httpNoAuthConnectionString, + RabbitAppClientID: rabbitAppClientID, + Base64RabbitAppClientID: base64.StdEncoding.EncodeToString([]byte(rabbitAppClientID)), + }, []Template{ + {Name: "deploymentTemplate", Config: RMQTargetDeploymentTemplate}, + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "triggerAuthTemplate", Config: triggerAuthTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func testScaling(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messageCount) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 1), + "replica count should be 4 after 1 minute") + + t.Log("--- testing scale in ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http_regex/rabbitmq_queue_http_regex_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http_regex/rabbitmq_queue_http_regex_test.go index 16551e57655..da128a85519 100644 --- a/tests/scalers/rabbitmq/rabbitmq_queue_http_regex/rabbitmq_queue_http_regex_test.go +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http_regex/rabbitmq_queue_http_regex_test.go @@ -17,7 +17,7 @@ import ( ) // Load environment variables from .env file -var _ = godotenv.Load("../../.env") +var _ = godotenv.Load("../../../.env") const ( testName = "rmq-queue-http-regex-test" @@ -83,7 +83,7 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - RMQInstall(t, kc, rmqNamespace, user, password, vhost) + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) CreateKubernetesResources(t, kc, testNamespace, data, templates) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), @@ -94,7 +94,7 @@ func TestScaler(t *testing.T) { // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, testNamespace, data, templates) - RMQUninstall(t, rmqNamespace, user, password, vhost) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) } func getTemplateData() (templateData, []Template) { diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_aad_wi/rabbitmq_queue_http_regex_aad_wi_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_aad_wi/rabbitmq_queue_http_regex_aad_wi_test.go new file mode 100644 index 00000000000..db40a384f24 --- /dev/null +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_aad_wi/rabbitmq_queue_http_regex_aad_wi_test.go @@ -0,0 +1,178 @@ +//go:build e2e +// +build e2e + +package rabbitmq_queue_http_regex_aad_wi_test + +import ( + "encoding/base64" + "fmt" + "os" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" + . "github.com/kedacore/keda/v2/tests/scalers/rabbitmq" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../../.env") + +const ( + testName = "rmq-queue-http-regex-aad-wi-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + rmqNamespace = fmt.Sprintf("%s-rmq", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + secretName = fmt.Sprintf("%s-secret", testName) + triggerAuthName = fmt.Sprintf("%s-ta", testName) + triggerSecretName = fmt.Sprintf("%s-ta-secret", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + queueName = "hello" + queueRegex = "^hell.{1}$" + user = fmt.Sprintf("%s-user", testName) + password = fmt.Sprintf("%s-password", testName) + vhost = "/" + connectionString = fmt.Sprintf("amqp://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace) + httpConnectionString = fmt.Sprintf("http://%s:%s@rabbitmq.%s.svc.cluster.local/", user, password, rmqNamespace) + httpNoAuthConnectionString = fmt.Sprintf("http://rabbitmq.%s.svc.cluster.local/", rmqNamespace) + rabbitAppClientID = os.Getenv("TF_AZURE_RABBIT_API_APPLICATION_ID") + azureADTenantID = os.Getenv("TF_AZURE_SP_TENANT") + messageCount = 100 +) + +const ( + secretTemplate = ` +apiVersion: v1 +kind: Secret +metadata: + name: {{.TriggerSecretName}} + namespace: {{.TestNamespace}} +type: Opaque +data: + workloadIdentityResource: {{.Base64RabbitAppClientID}} +` + triggerAuthTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: TriggerAuthentication +metadata: + name: {{.TriggerAuthName}} + namespace: {{.TestNamespace}} +spec: + podIdentity: + provider: azure-workload + + secretTargetRef: + - parameter: workloadIdentityResource + name: {{.TriggerSecretName}} + key: workloadIdentityResource +` + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + pollingInterval: 5 + cooldownPeriod: 10 + minReplicaCount: 0 + maxReplicaCount: 4 + triggers: + - type: rabbitmq + metadata: + queueName: {{.QueueName}} + vhostName: {{.VHost}} + host: {{.ConnectionNoAuth}} + protocol: http + mode: QueueLength + value: '10' + useRegex: 'true' + operation: sum + authenticationRef: + name: {{.TriggerAuthName}} +` +) + +type templateData struct { + TestNamespace string + DeploymentName string + TriggerSecretName string + TriggerAuthName string + ScaledObjectName string + SecretName string + QueueName string + VHost string + Connection, Base64Connection string + ConnectionNoAuth string + RabbitAppClientID, Base64RabbitAppClientID string +} + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + require.NotEmpty(t, rabbitAppClientID, "TF_AZURE_RABBIT_API_APPLICATION_ID env variable is required for rabbitmq workload identity tests") + require.NotEmpty(t, azureADTenantID, "TF_AZURE_SP_TENANT env variable is required for rabbitmq workload identity tests") + + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithAzureADOAuth(azureADTenantID, rabbitAppClientID)) + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") + + testScaling(t, kc) + + // cleanup + t.Log("--- cleaning up ---") + DeleteKubernetesResources(t, testNamespace, data, templates) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithAzureADOAuth(azureADTenantID, rabbitAppClientID)) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + DeploymentName: deploymentName, + ScaledObjectName: scaledObjectName, + SecretName: secretName, + VHost: vhost, + QueueName: queueRegex, + Connection: connectionString, + Base64Connection: base64.StdEncoding.EncodeToString([]byte(httpConnectionString)), + TriggerAuthName: triggerAuthName, + TriggerSecretName: triggerSecretName, + ConnectionNoAuth: httpNoAuthConnectionString, + RabbitAppClientID: rabbitAppClientID, + Base64RabbitAppClientID: base64.StdEncoding.EncodeToString([]byte(rabbitAppClientID)), + }, []Template{ + {Name: "deploymentTemplate", Config: RMQTargetDeploymentTemplate}, + {Name: "secretTemplate", Config: secretTemplate}, + {Name: "triggerAuthTemplate", Config: triggerAuthTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func testScaling(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale out ---") + RMQPublishMessages(t, rmqNamespace, connectionString, queueName, messageCount) + // dummies + RMQPublishMessages(t, rmqNamespace, connectionString, fmt.Sprintf("%s-1", queueName), messageCount) + RMQPublishMessages(t, rmqNamespace, connectionString, fmt.Sprintf("%s-%s", queueName, queueName), messageCount) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 4, 60, 2), + "replica count should be 4 after 2 minute") + + t.Log("--- testing scale in ---") + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_vhost/rabbitmq_queue_http_regex_vhost_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_vhost/rabbitmq_queue_http_regex_vhost_test.go index 5e8a7f932bd..113d92d3cff 100644 --- a/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_vhost/rabbitmq_queue_http_regex_vhost_test.go +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http_regex_vhost/rabbitmq_queue_http_regex_vhost_test.go @@ -17,7 +17,7 @@ import ( ) // Load environment variables from .env file -var _ = godotenv.Load("../../.env") +var _ = godotenv.Load("../../../.env") const ( testName = "rmq-queue-http-regex-vhost-test" @@ -88,7 +88,7 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - RMQInstall(t, kc, rmqNamespace, user, password, vhost) + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) CreateKubernetesResources(t, kc, testNamespace, data, templates) RMQCreateVHost(t, rmqNamespace, connectionHost, user, password, dummyVhost1) @@ -102,7 +102,7 @@ func TestScaler(t *testing.T) { // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, testNamespace, data, templates) - RMQUninstall(t, rmqNamespace, user, password, vhost) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) } func getTemplateData() (templateData, []Template) { diff --git a/tests/scalers/rabbitmq/rabbitmq_queue_http_vhost/rabbitmq_queue_http_vhost_test.go b/tests/scalers/rabbitmq/rabbitmq_queue_http_vhost/rabbitmq_queue_http_vhost_test.go index f7ed7c6a8e6..a6c6963841b 100644 --- a/tests/scalers/rabbitmq/rabbitmq_queue_http_vhost/rabbitmq_queue_http_vhost_test.go +++ b/tests/scalers/rabbitmq/rabbitmq_queue_http_vhost/rabbitmq_queue_http_vhost_test.go @@ -17,7 +17,7 @@ import ( ) // Load environment variables from .env file -var _ = godotenv.Load("../../.env") +var _ = godotenv.Load("../../../.env") const ( testName = "rmq-queue-http-vhost-test" @@ -80,7 +80,7 @@ func TestScaler(t *testing.T) { kc := GetKubernetesClient(t) data, templates := getTemplateData() - RMQInstall(t, kc, rmqNamespace, user, password, vhost) + RMQInstall(t, kc, rmqNamespace, user, password, vhost, WithoutOAuth()) CreateKubernetesResources(t, kc, testNamespace, data, templates) assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), @@ -91,7 +91,7 @@ func TestScaler(t *testing.T) { // cleanup t.Log("--- cleaning up ---") DeleteKubernetesResources(t, testNamespace, data, templates) - RMQUninstall(t, rmqNamespace, user, password, vhost) + RMQUninstall(t, rmqNamespace, user, password, vhost, WithoutOAuth()) } func getTemplateData() (templateData, []Template) { From 5baa142fd68038a8b635bf24fdeeb6254660e4a0 Mon Sep 17 00:00:00 2001 From: KratkyZobak Date: Wed, 21 Jun 2023 09:50:35 +0200 Subject: [PATCH 3/3] Update CHANGELOG.md Add missing issue link Signed-off-by: KratkyZobak --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46a3e0d0560..dd40c58888b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,7 +62,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Pulsar Scaler**: Improve error messages for unsuccessful connections ([#4563](https://github.com/kedacore/keda/issues/4563)) - **Security**: Enable secret scanning in GitHub repo - **RabbitMQ Scaler**: Add support for `unsafeSsl` in trigger metadata ([#4448](https://github.com/kedacore/keda/issues/4448)) -- **RabbitMQ Scaler**: Add support for `workloadIdentityResource` and utilize AzureAD Workload Identity for HTTP authorization +- **RabbitMQ Scaler**: Add support for `workloadIdentityResource` and utilize AzureAD Workload Identity for HTTP authorization ([#4716](https://github.com/kedacore/keda/issues/4716)) - **Prometheus Metrics**: Add new metric with KEDA build info ([#4647](https://github.com/kedacore/keda/issues/4647)) - **Prometheus Scaler**: Add support for Google Managed Prometheus ([#4675](https://github.com/kedacore/keda/pull/4675))