From 3adfe1c72358fe80ec0a2d5ea5d7c8c49c8b42ce Mon Sep 17 00:00:00 2001 From: Sam Barnes-Thornton Date: Thu, 15 Aug 2024 15:32:38 +0000 Subject: [PATCH 1/8] Add beanstalkd scaler Signed-off-by: Sam Barnes-Thornton Adds a new scaler for beanstalkd queues, along with necessary tests. Signed-off-by: Sam Barnes-Thornton --- CHANGELOG.md | 2 +- go.mod | 1 + go.sum | 2 + pkg/scalers/beanstalkd_scaler.go | 246 +++++++++++++++ pkg/scalers/beanstalkd_scaler_test.go | 257 +++++++++++++++ pkg/scaling/scalers_builder.go | 2 + tests/scalers/beanstalkd/beanstalkd_test.go | 227 ++++++++++++++ .../beanstalkd/go-beanstalk/License | 22 ++ .../beanstalkd/go-beanstalk/Readme.md | 19 ++ .../beanstalkd/go-beanstalk/conn.go | 295 ++++++++++++++++++ .../github.com/beanstalkd/go-beanstalk/doc.go | 6 + .../github.com/beanstalkd/go-beanstalk/err.go | 63 ++++ .../beanstalkd/go-beanstalk/name.go | 55 ++++ .../beanstalkd/go-beanstalk/parse.go | 54 ++++ .../beanstalkd/go-beanstalk/time.go | 12 + .../beanstalkd/go-beanstalk/tube.go | 112 +++++++ .../beanstalkd/go-beanstalk/tubeset.go | 39 +++ vendor/modules.txt | 3 + 18 files changed, 1416 insertions(+), 1 deletion(-) create mode 100644 pkg/scalers/beanstalkd_scaler.go create mode 100644 pkg/scalers/beanstalkd_scaler_test.go create mode 100644 tests/scalers/beanstalkd/beanstalkd_test.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/License create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/Readme.md create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/conn.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/doc.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/err.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/name.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/parse.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/time.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/tube.go create mode 100644 vendor/github.com/beanstalkd/go-beanstalk/tubeset.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e7d28889e2..d58f6d186ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,8 +112,8 @@ New deprecation(s): - **General**: Add `--ca-dir` flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860)) - **General**: Add Dynatrace Scaler ([#5685](https://github.com/kedacore/keda/pull/5685)) -- **General**: Add Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904)) - **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114)) +- **General**: Add Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904)) - **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522)) - **General**: Support for Kubernetes v1.30 ([#5828](https://github.com/kedacore/keda/issues/5828)) diff --git a/go.mod b/go.mod index b6eff2b66cc..4ce193149c3 100644 --- a/go.mod +++ b/go.mod @@ -117,6 +117,7 @@ require ( sigs.k8s.io/controller-tools v0.15.0 sigs.k8s.io/custom-metrics-apiserver v1.29.0 sigs.k8s.io/kustomize/kustomize/v5 v5.4.3 + github.com/beanstalkd/go-beanstalk v0.2.0 ) // Remove this when they merge the PR and cut a release https://github.com/open-policy-agent/cert-controller/pull/202 diff --git a/go.sum b/go.sum index 8ecdbc874fd..5c501c6b833 100644 --- a/go.sum +++ b/go.sum @@ -947,6 +947,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzx github.com/aws/smithy-go v1.13.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE= github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E= +github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA= +github.com/beanstalkd/go-beanstalk v0.2.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= diff --git a/pkg/scalers/beanstalkd_scaler.go b/pkg/scalers/beanstalkd_scaler.go new file mode 100644 index 00000000000..73976ac1274 --- /dev/null +++ b/pkg/scalers/beanstalkd_scaler.go @@ -0,0 +1,246 @@ +package scalers + +import ( + "context" + "errors" + "fmt" + "net/url" + "strconv" + "time" + + beanstalk "github.com/beanstalkd/go-beanstalk" + "github.com/go-logr/logr" + "github.com/mitchellh/mapstructure" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + "github.com/kedacore/keda/v2/pkg/util" +) + +const ( + beanstalkdJobsMetricName = "jobs" + beanstalkdValueConfigName = "value" + beanstalkdActivationValueTriggerConfigName = "activationValue" + beanstalkdMetricType = "External" + beanstalkdNetworkProtocol = "tcp" +) + +type beanstalkdScaler struct { + metricType v2.MetricTargetType + metadata *beanstalkdMetadata + connection *beanstalk.Conn + tube *beanstalk.Tube + logger logr.Logger +} + +type beanstalkdMetadata struct { + server string + tube string + value float64 + activationValue float64 + includeDelayed bool + timeout time.Duration + triggerIndex int +} + +// TubeStats represents a set of tube statistics. +type tubeStats struct { + TotalJobs int64 `mapstructure:"total-jobs"` + JobsReady int64 `mapstructure:"current-jobs-ready"` + JobsReserved int64 `mapstructure:"current-jobs-reserved"` + JobsUrgent int64 `mapstructure:"current-jobs-urgent"` + JobsBuried int64 `mapstructure:"current-jobs-buried"` + JobsDelayed int64 `mapstructure:"current-jobs-delayed"` +} + +func NewBeanstalkdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + s := &beanstalkdScaler{} + + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + s.metricType = metricType + + s.logger = InitializeLogger(config, "beanstalkd_scaler") + + meta, err := parseBeanstalkdMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing beanstalkd metadata: %w", err) + } + s.metadata = meta + + conn, err := beanstalk.DialTimeout(beanstalkdNetworkProtocol, s.metadata.server, s.metadata.timeout) + if err != nil { + return nil, fmt.Errorf("error connecting to beanstalkd: %w", err) + } + + s.connection = conn + + s.tube = beanstalk.NewTube(s.connection, meta.tube) + + return s, nil +} + +func parseBeanstalkdMetadata(config *scalersconfig.ScalerConfig) (*beanstalkdMetadata, error) { + meta := beanstalkdMetadata{} + + if err := parseServerValue(config, &meta); err != nil { + return nil, err + } + + if val, ok := config.TriggerMetadata["tube"]; ok { + meta.tube = val + } else { + return nil, fmt.Errorf("no queue name given") + } + + if err := parseTimeout(config, &meta); err != nil { + return nil, err + } + + meta.includeDelayed = false + if val, ok := config.TriggerMetadata["includeDelayed"]; ok { + boolVal, err := strconv.ParseBool(val) + if err != nil { + return nil, fmt.Errorf("failed to parse includeDelayed value. Must be either true or false") + } + meta.includeDelayed = boolVal + } + + value, valuePresent := config.TriggerMetadata[beanstalkdValueConfigName] + activationValue, activationValuePresent := config.TriggerMetadata[beanstalkdActivationValueTriggerConfigName] + + if activationValuePresent { + activation, err := strconv.ParseFloat(activationValue, 64) + if err != nil { + return nil, fmt.Errorf("can't parse %s: %w", beanstalkdActivationValueTriggerConfigName, err) + } + meta.activationValue = activation + } + + if !valuePresent { + return nil, fmt.Errorf("%s must be specified", beanstalkdValueConfigName) + } + triggerValue, err := strconv.ParseFloat(value, 64) + if err != nil { + return nil, fmt.Errorf("can't parse %s: %w", beanstalkdValueConfigName, err) + } + meta.value = triggerValue + + meta.triggerIndex = config.TriggerIndex + + return &meta, nil +} + +func parseServerValue(config *scalersconfig.ScalerConfig, meta *beanstalkdMetadata) error { + switch { + case config.AuthParams["server"] != "": + meta.server = config.AuthParams["server"] + case config.TriggerMetadata["server"] != "": + meta.server = config.TriggerMetadata["server"] + default: + return fmt.Errorf("no server setting given") + } + return nil +} + +func parseTimeout(config *scalersconfig.ScalerConfig, meta *beanstalkdMetadata) error { + if val, ok := config.TriggerMetadata["timeout"]; ok { + timeoutMS, err := strconv.Atoi(val) + if err != nil { + return fmt.Errorf("unable to parse timeout: %w", err) + } + if timeoutMS <= 0 { + return fmt.Errorf("timeout must be greater than 0: %w", err) + } + meta.timeout = time.Duration(timeoutMS) * time.Millisecond + } else { + meta.timeout = config.GlobalHTTPTimeout + } + return nil +} + +func (s *beanstalkdScaler) getTubeStats(ctx context.Context) (*tubeStats, error) { + errCh := make(chan error) + statsCh := make(chan *tubeStats) + + go func() { + rawStats, err := s.tube.Stats() + if err != nil { + errCh <- fmt.Errorf("error retrieving stats from beanstalkd: %w", err) + } + + var stats tubeStats + err = mapstructure.WeakDecode(rawStats, &stats) + if err != nil { + errCh <- fmt.Errorf("error decoding stats from beanstalkd: %w", err) + } + + statsCh <- &stats + }() + + select { + case err := <-errCh: + if errors.Is(err, beanstalk.ErrNotFound) { + s.logger.Info("tube not found, setting stats to 0") + return &tubeStats{ + TotalJobs: 0, + JobsReady: 0, + JobsDelayed: 0, + JobsReserved: 0, + JobsUrgent: 0, + JobsBuried: 0, + }, nil + } + return nil, err + case tubeStats := <-statsCh: + return tubeStats, nil + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (s *beanstalkdScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + stats, err := s.getTubeStats(ctx) + if err != nil { + return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error interacting with beanstalkd: %w", err) + } + + totalJobs := stats.JobsReady + stats.JobsReserved + + if s.metadata.includeDelayed { + totalJobs += stats.JobsDelayed + } + + metric := GenerateMetricInMili(metricName, float64(totalJobs)) + isActive := float64(totalJobs) > s.metadata.activationValue + + return []external_metrics.ExternalMetricValue{metric}, isActive, nil +} + +func (s *beanstalkdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("beanstalkd-%s", url.QueryEscape(s.metadata.tube)))), + }, + Target: GetMetricTargetMili(s.metricType, s.metadata.value), + } + metricSpec := v2.MetricSpec{ + External: externalMetric, Type: beanstalkdMetricType, + } + + return []v2.MetricSpec{metricSpec} +} + +func (s *beanstalkdScaler) Close(context.Context) error { + if s.connection != nil { + err := s.connection.Close() + if err != nil { + s.logger.Error(err, "Error closing beanstalkd connection") + return err + } + } + return nil +} diff --git a/pkg/scalers/beanstalkd_scaler_test.go b/pkg/scalers/beanstalkd_scaler_test.go new file mode 100644 index 00000000000..e93a553a28d --- /dev/null +++ b/pkg/scalers/beanstalkd_scaler_test.go @@ -0,0 +1,257 @@ +package scalers + +import ( + "context" + "fmt" + "net" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +const ( + beanstalkdServer = "localhost:3000" +) + +type parseBeanstalkdMetadataTestData struct { + metadata map[string]string + isError bool +} + +type beanstalkdMetricIdentifier struct { + metadataTestData *parseBeanstalkdMetadataTestData + index int + name string +} + +type tubeStatsTestData struct { + response map[string]interface{} + metadata map[string]string + isActive bool +} + +var testBeanstalkdMetadata = []parseBeanstalkdMetadataTestData{ + // nothing passed + {map[string]string{}, true}, + // properly formed + {map[string]string{"server": beanstalkdServer, "tube": "delayed", "value": "1", "includeDelayed": "true"}, false}, + // no includeDelayed + {map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "1"}, false}, + // missing server + {map[string]string{"tube": "stats-tube", "value": "1", "includeDelayed": "true"}, true}, + // missing tube + {map[string]string{"server": beanstalkdServer, "value": "1", "includeDelayed": "true"}, true}, + // missing value + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "includeDelayed": "true"}, true}, + // invalid value + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "lots", "includeDelayed": "true"}, true}, + // valid timeout + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "includeDelayed": "true", "timeout": "1000"}, false}, + // invalid timeout + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "includeDelayed": "true", "timeout": "-1"}, true}, + // activationValue passed + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "activationValue": "10"}, false}, + // invalid activationValue passed + {map[string]string{"server": beanstalkdServer, "tube": "stats-tube", "value": "1", "activationValue": "AA"}, true}, +} + +var beanstalkdMetricIdentifiers = []beanstalkdMetricIdentifier{ + {&testBeanstalkdMetadata[2], 0, "s0-beanstalkd-no-delayed"}, + {&testBeanstalkdMetadata[1], 1, "s1-beanstalkd-delayed"}, +} + +var testTubeStatsTestData = []tubeStatsTestData{ + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 6, + "current-jobs-delayed": 0, + "current-jobs-ready": 10, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "2"}, + isActive: true, + }, + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 0, + "current-jobs-delayed": 0, + "current-jobs-ready": 1, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "3", "activationValue": "2"}, + isActive: false, + }, + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 0, + "current-jobs-delayed": 10, + "current-jobs-ready": 0, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "2"}, + isActive: false, + }, + { + response: map[string]interface{}{ + "cmd-delete": 18, + "cmd-pause-tube": 0, + "current-jobs-buried": 0, + "current-jobs-delayed": 10, + "current-jobs-ready": 0, + "current-jobs-reserved": 0, + "current-jobs-urgent": 0, + "current-using": 3, + "current-waiting": 3, + "current-watching": 3, + "name": "form-crawler-notifications", + "pause": 0, + "pause-time-left": 0, + "total-jobs": 24, + }, + metadata: map[string]string{"server": beanstalkdServer, "tube": "no-delayed", "value": "2", "includeDelayed": "true"}, + isActive: true, + }, +} + +func TestBeanstalkdParseMetadata(t *testing.T) { + for idx, testData := range testBeanstalkdMetadata { + meta, err := parseBeanstalkdMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Errorf("Expected error but got success in test case %d", idx) + } + if err == nil { + if val, ok := testData.metadata["includeDelayed"]; !ok { + assert.Equal(t, false, meta.includeDelayed) + } else { + boolVal, err := strconv.ParseBool(val) + if err != nil { + assert.Equal(t, boolVal, meta.includeDelayed) + } + } + } + } +} + +func TestBeanstalkdGetMetricSpecForScaling(t *testing.T) { + for _, testData := range beanstalkdMetricIdentifiers { + meta, err := parseBeanstalkdMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: nil, TriggerIndex: testData.index}) + if err != nil { + t.Fatal("could not parse metadata", err) + } + mockBeanstalkdScaler := beanstalkdScaler{ + metadata: meta, + connection: nil, + tube: nil, + } + + metricSpec := mockBeanstalkdScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + assert.Equal(t, testData.name, metricName, "correct external source name") + } +} + +func TestGetTubeStats(t *testing.T) { + for _, testData := range testTubeStatsTestData { + testData := testData + yamlData, err := yaml.Marshal(testData.response) + if err != nil { + t.Fatal(err) + } + + response := []byte(fmt.Sprintf("OK %d\r\n", len(yamlData))) + response = append(response, yamlData...) + response = append(response, []byte("\r\n")...) + createTestServer(t, response) + + s, err := NewBeanstalkdScaler( + &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + GlobalHTTPTimeout: 1000 * time.Millisecond, + }, + ) + + assert.NoError(t, err) + + ctx := context.TODO() + _, active, err := s.GetMetricsAndActivity(ctx, "Metric") + + assert.NoError(t, err) + + assert.Equal(t, testData.isActive, active) + } +} + +func TestGetTubeStatsNotFound(t *testing.T) { + testData := testTubeStatsTestData[0] + createTestServer(t, []byte("NOT_FOUND\r\n")) + s, err := NewBeanstalkdScaler( + &scalersconfig.ScalerConfig{ + TriggerMetadata: testData.metadata, + GlobalHTTPTimeout: 1000 * time.Millisecond, + }, + ) + + assert.NoError(t, err) + + ctx := context.TODO() + _, active, err := s.GetMetricsAndActivity(ctx, "Metric") + + assert.NoError(t, err) + assert.False(t, active) +} + +func createTestServer(t *testing.T, response []byte) { + list, err := net.Listen("tcp", ":3000") + if err != nil { + t.Fatal(err) + } + go func() { + defer list.Close() + conn, err := list.Accept() + if err != nil { + return + } + + _, err = conn.Write(response) + assert.NoError(t, err) + conn.Close() + }() +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 1f4549c7ffa..80bfb40658f 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -152,6 +152,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewAzureQueueScaler(config) case "azure-servicebus": return scalers.NewAzureServiceBusScaler(ctx, config) + case "beanstalkd": + return scalers.NewBeanstalkdScaler(config) case "cassandra": return scalers.NewCassandraScaler(config) case "couchdb": diff --git a/tests/scalers/beanstalkd/beanstalkd_test.go b/tests/scalers/beanstalkd/beanstalkd_test.go new file mode 100644 index 00000000000..db108a06d34 --- /dev/null +++ b/tests/scalers/beanstalkd/beanstalkd_test.go @@ -0,0 +1,227 @@ +//go:build e2e +// +build e2e + +package beanstalkd_test + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file +var _ = godotenv.Load("../../.env") + +const ( + testName = "beanstalkd-test" + deploymentName = "beanstalkd-consumer-deployment" + beanstalkdPutJobName = "beanstalkd-put-job" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + beanstalkdDeploymentName = fmt.Sprintf("%s-beanstalkd-deployment", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + beanstalkdTubeName = "default" +) + +type templateData struct { + TestNamespace string + BeanstalkdDeploymentName string + BeanstalkdPutJobName string + ScaledObjectName string + DeploymentName string + BeanstalkdTubeName string +} + +const ( + beanstalkdDeploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: beanstalkd + name: {{.BeanstalkdDeploymentName}} + namespace: {{.TestNamespace}} +spec: + selector: + matchLabels: + app: beanstalkd + template: + metadata: + labels: + app: beanstalkd + spec: + containers: + - image: docker.io/schickling/beanstalkd + name: beanstalkd + ports: + - containerPort: 11300 + name: beanstalkd + readinessProbe: + tcpSocket: + port: 11300 + initialDelaySeconds: 5 + periodSeconds: 10 +--- +apiVersion: v1 +kind: Service +metadata: + name: beanstalkd + namespace: {{.TestNamespace}} +spec: + ports: + - name: beanstalkd + port: 11300 + targetPort: 11300 + selector: + app: beanstalkd + type: ClusterIP +` + + scaledObjectActivationTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 3 + triggers: + - type: beanstalkd + metadata: + server: beanstalkd.{{.TestNamespace}}:11300 + value: "5" + activationValue: "15" + tube: {{.BeanstalkdTubeName}} +` + + beanstalkdPutJobsTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.BeanstalkdPutJobName}} + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - name: beanstalkd-put-job + image: docker.io/sitecrafting/beanstalkd-cli + command: ["/bin/sh"] + args: ["-c", "for run in $(seq 1 10); do beanstalkd-cli --host=beanstalkd put \"Test Job\"; done;"] + restartPolicy: OnFailure +` + + scaledObjectDelayedTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} +spec: + scaleTargetRef: + name: {{.DeploymentName}} + maxReplicaCount: 5 + minReplicaCount: 1 + triggers: + - type: beanstalkd + metadata: + server: beanstalkd.{{.TestNamespace}}:11300 + value: "5" + tube: {{.BeanstalkdTubeName}} + includeDelayed: "true" +` + + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: nginx-deployment +spec: + replicas: 0 + selector: + matchLabels: + app: nginx-deployment + template: + metadata: + labels: + app: nginx-deployment + spec: + containers: + - name: nginx-deployment + image: nginxinc/nginx-unprivileged + ports: + - containerPort: 80 +` +) + +func TestBeanstalkdScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + // Create kubernetes resources + kc := GetKubernetesClient(t) + data, templates := getTemplateData() + t.Cleanup(func() { + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, beanstalkdDeploymentName, testNamespace, 1, 60, 1), + "replica count should be 0 after a minute") + + // Add beanstalkd jobs + addBeanstalkdJobs(t, kc, &data) + + // test activation + testActivation(t, kc, data) + + // test scaling + testScale(t, kc, data) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + ScaledObjectName: scaledObjectName, + DeploymentName: deploymentName, + BeanstalkdDeploymentName: beanstalkdDeploymentName, + BeanstalkdTubeName: beanstalkdTubeName, + BeanstalkdPutJobName: beanstalkdPutJobName, + }, []Template{ + {Name: "beanstalkdDeploymentTemplate", Config: beanstalkdDeploymentTemplate}, + {Name: "deploymentTemplate", Config: deploymentTemplate}, + } +} + +func addBeanstalkdJobs(t *testing.T, kc *kubernetes.Clientset, data *templateData) { + // run putJob + KubectlReplaceWithTemplate(t, data, "beanstalkdPutJobsTemplate", beanstalkdPutJobsTemplate) + require.True(t, WaitForJobSuccess(t, kc, beanstalkdPutJobName, testNamespace, 30, 2), "Job should run successfully") +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation---") + + KubectlApplyWithTemplate(t, data, "scaledObjectActivationTemplate", scaledObjectActivationTemplate) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) +} + +func testScale(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scaling---") + KubectlApplyWithTemplate(t, data, "scaledObjectDelayedTemplate", scaledObjectDelayedTemplate) + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 1), + "replica count should be 2 after a minute") +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/License b/vendor/github.com/beanstalkd/go-beanstalk/License new file mode 100644 index 00000000000..183c3898c36 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/License @@ -0,0 +1,22 @@ +Copyright 2012 Keith Rarick + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without +restriction, including without limitation the rights to use, +copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the +Software is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/beanstalkd/go-beanstalk/Readme.md b/vendor/github.com/beanstalkd/go-beanstalk/Readme.md new file mode 100644 index 00000000000..15fd3b96330 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/Readme.md @@ -0,0 +1,19 @@ +# Beanstalk + +Go client for [beanstalkd](https://beanstalkd.github.io). + +## Install + + $ go get github.com/beanstalkd/go-beanstalk + +## Use + +Produce jobs: + + c, err := beanstalk.Dial("tcp", "127.0.0.1:11300") + id, err := c.Put([]byte("hello"), 1, 0, 120*time.Second) + +Consume jobs: + + c, err := beanstalk.Dial("tcp", "127.0.0.1:11300") + id, body, err := c.Reserve(5 * time.Second) diff --git a/vendor/github.com/beanstalkd/go-beanstalk/conn.go b/vendor/github.com/beanstalkd/go-beanstalk/conn.go new file mode 100644 index 00000000000..7c3eb40d83c --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/conn.go @@ -0,0 +1,295 @@ +package beanstalk + +import ( + "fmt" + "io" + "net" + "net/textproto" + "strings" + "time" +) + +// DefaultDialTimeout is the time to wait for a connection to the beanstalk server. +const DefaultDialTimeout = 10 * time.Second + +// DefaultKeepAlivePeriod is the default period between TCP keepalive messages. +const DefaultKeepAlivePeriod = 10 * time.Second + +// A Conn represents a connection to a beanstalkd server. It consists +// of a default Tube and TubeSet as well as the underlying network +// connection. The embedded types carry methods with them; see the +// documentation of those types for details. +type Conn struct { + c *textproto.Conn + used string + watched map[string]bool + Tube + TubeSet +} + +var ( + space = []byte{' '} + crnl = []byte{'\r', '\n'} + yamlHead = []byte{'-', '-', '-', '\n'} + nl = []byte{'\n'} + colonSpace = []byte{':', ' '} + minusSpace = []byte{'-', ' '} +) + +// NewConn returns a new Conn using conn for I/O. +func NewConn(conn io.ReadWriteCloser) *Conn { + c := new(Conn) + c.c = textproto.NewConn(conn) + c.Tube = *NewTube(c, "default") + c.TubeSet = *NewTubeSet(c, "default") + c.used = "default" + c.watched = map[string]bool{"default": true} + return c +} + +// Dial connects addr on the given network using net.DialTimeout +// with a default timeout of 10s and then returns a new Conn for the connection. +func Dial(network, addr string) (*Conn, error) { + return DialTimeout(network, addr, DefaultDialTimeout) +} + +// DialTimeout connects addr on the given network using net.DialTimeout +// with a supplied timeout and then returns a new Conn for the connection. +func DialTimeout(network, addr string, timeout time.Duration) (*Conn, error) { + dialer := &net.Dialer{ + Timeout: timeout, + KeepAlive: DefaultKeepAlivePeriod, + } + c, err := dialer.Dial(network, addr) + if err != nil { + return nil, err + } + return NewConn(c), nil +} + +// Close closes the underlying network connection. +func (c *Conn) Close() error { + return c.c.Close() +} + +func (c *Conn) cmd(t *Tube, ts *TubeSet, body []byte, op string, args ...interface{}) (req, error) { + // negative dur checking + for _, arg := range args { + if d, _ := arg.(dur); d < 0 { + return req{}, fmt.Errorf("duration must be non-negative, got %v", time.Duration(d)) + } + } + + r := req{c.c.Next(), op} + c.c.StartRequest(r.id) + defer c.c.EndRequest(r.id) + err := c.adjustTubes(t, ts) + if err != nil { + return req{}, err + } + if body != nil { + args = append(args, len(body)) + } + c.printLine(op, args...) + if body != nil { + c.c.W.Write(body) + c.c.W.Write(crnl) + } + err = c.c.W.Flush() + if err != nil { + return req{}, ConnError{c, op, err} + } + return r, nil +} + +func (c *Conn) adjustTubes(t *Tube, ts *TubeSet) error { + if t != nil && t.Name != c.used { + if err := checkName(t.Name); err != nil { + return err + } + c.printLine("use", t.Name) + c.used = t.Name + } + if ts != nil { + for s := range ts.Name { + if !c.watched[s] { + if err := checkName(s); err != nil { + return err + } + c.printLine("watch", s) + } + } + for s := range c.watched { + if !ts.Name[s] { + c.printLine("ignore", s) + } + } + c.watched = make(map[string]bool) + for s := range ts.Name { + c.watched[s] = true + } + } + return nil +} + +// does not flush +func (c *Conn) printLine(cmd string, args ...interface{}) { + io.WriteString(c.c.W, cmd) + for _, a := range args { + c.c.W.Write(space) + fmt.Fprint(c.c.W, a) + } + c.c.W.Write(crnl) +} + +func (c *Conn) readResp(r req, readBody bool, f string, a ...interface{}) (body []byte, err error) { + c.c.StartResponse(r.id) + defer c.c.EndResponse(r.id) + line, err := c.c.ReadLine() + for strings.HasPrefix(line, "WATCHING ") || strings.HasPrefix(line, "USING ") { + line, err = c.c.ReadLine() + } + if err != nil { + return nil, ConnError{c, r.op, err} + } + toScan := line + if readBody { + var size int + toScan, size, err = parseSize(toScan) + if err != nil { + return nil, ConnError{c, r.op, err} + } + body = make([]byte, size+2) // include trailing CR NL + _, err = io.ReadFull(c.c.R, body) + if err != nil { + return nil, ConnError{c, r.op, err} + } + body = body[:size] // exclude trailing CR NL + } + + err = scan(toScan, f, a...) + if err != nil { + return nil, ConnError{c, r.op, err} + } + return body, nil +} + +// Delete deletes the given job. +func (c *Conn) Delete(id uint64) error { + r, err := c.cmd(nil, nil, nil, "delete", id) + if err != nil { + return err + } + _, err = c.readResp(r, false, "DELETED") + return err +} + +// Release tells the server to perform the following actions: +// set the priority of the given job to pri, remove it from the list of +// jobs reserved by c, wait delay seconds, then place the job in the +// ready queue, which makes it available for reservation by any client. +func (c *Conn) Release(id uint64, pri uint32, delay time.Duration) error { + r, err := c.cmd(nil, nil, nil, "release", id, pri, dur(delay)) + if err != nil { + return err + } + _, err = c.readResp(r, false, "RELEASED") + return err +} + +// Bury places the given job in a holding area in the job's tube and +// sets its priority to pri. The job will not be scheduled again until it +// has been kicked; see also the documentation of Kick. +func (c *Conn) Bury(id uint64, pri uint32) error { + r, err := c.cmd(nil, nil, nil, "bury", id, pri) + if err != nil { + return err + } + _, err = c.readResp(r, false, "BURIED") + return err +} + +// KickJob places the given job to the ready queue of the same tube where it currently belongs +// when the given job id exists and is in a buried or delayed state. +func (c *Conn) KickJob(id uint64) error { + r, err := c.cmd(nil, nil, nil, "kick-job", id) + if err != nil { + return err + } + _, err = c.readResp(r, false, "KICKED") + return err +} + +// Touch resets the reservation timer for the given job. +// It is an error if the job isn't currently reserved by c. +// See the documentation of Reserve for more details. +func (c *Conn) Touch(id uint64) error { + r, err := c.cmd(nil, nil, nil, "touch", id) + if err != nil { + return err + } + _, err = c.readResp(r, false, "TOUCHED") + return err +} + +// Peek gets a copy of the specified job from the server. +func (c *Conn) Peek(id uint64) (body []byte, err error) { + r, err := c.cmd(nil, nil, nil, "peek", id) + if err != nil { + return nil, err + } + return c.readResp(r, true, "FOUND %d", &id) +} + +// ReserveJob reserves the specified job by id from the server. +func (c *Conn) ReserveJob(id uint64) (body []byte, err error) { + r, err := c.cmd(nil, nil, nil, "reserve-job", id) + if err != nil { + return nil, err + } + return c.readResp(r, true, "RESERVED %d", &id) +} + +// Stats retrieves global statistics from the server. +func (c *Conn) Stats() (map[string]string, error) { + r, err := c.cmd(nil, nil, nil, "stats") + if err != nil { + return nil, err + } + body, err := c.readResp(r, true, "OK") + return parseDict(body), err +} + +// StatsJob retrieves statistics about the given job. +func (c *Conn) StatsJob(id uint64) (map[string]string, error) { + r, err := c.cmd(nil, nil, nil, "stats-job", id) + if err != nil { + return nil, err + } + body, err := c.readResp(r, true, "OK") + return parseDict(body), err +} + +// ListTubes returns the names of the tubes that currently +// exist on the server. +func (c *Conn) ListTubes() ([]string, error) { + r, err := c.cmd(nil, nil, nil, "list-tubes") + if err != nil { + return nil, err + } + body, err := c.readResp(r, true, "OK") + return parseList(body), err +} + +func scan(input, format string, a ...interface{}) error { + _, err := fmt.Sscanf(input, format, a...) + if err != nil { + return findRespError(input) + } + return nil +} + +type req struct { + id uint + op string +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/doc.go b/vendor/github.com/beanstalkd/go-beanstalk/doc.go new file mode 100644 index 00000000000..7bb685e008a --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/doc.go @@ -0,0 +1,6 @@ +// Package beanstalk provides a client for the beanstalk protocol. +// See http://kr.github.com/beanstalkd/ for the server. +// +// This package is synchronized internally and safe to use from +// multiple goroutines without other coordination. +package beanstalk diff --git a/vendor/github.com/beanstalkd/go-beanstalk/err.go b/vendor/github.com/beanstalkd/go-beanstalk/err.go new file mode 100644 index 00000000000..66a38512a1b --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/err.go @@ -0,0 +1,63 @@ +package beanstalk + +import "errors" + +// ConnError records an error message from the server and the operation +// and connection that caused it. +type ConnError struct { + Conn *Conn + Op string + Err error +} + +func (e ConnError) Error() string { + return e.Op + ": " + e.Err.Error() +} + +func (e ConnError) Unwrap() error { + return e.Err +} + +// Error messages returned by the server. +var ( + ErrBadFormat = errors.New("bad command format") + ErrBuried = errors.New("buried") + ErrDeadline = errors.New("deadline soon") + ErrDraining = errors.New("draining") + ErrInternal = errors.New("internal error") + ErrJobTooBig = errors.New("job too big") + ErrNoCRLF = errors.New("expected CR LF") + ErrNotFound = errors.New("not found") + ErrNotIgnored = errors.New("not ignored") + ErrOOM = errors.New("server is out of memory") + ErrTimeout = errors.New("timeout") + ErrUnknown = errors.New("unknown command") +) + +var respError = map[string]error{ + "BAD_FORMAT": ErrBadFormat, + "BURIED": ErrBuried, + "DEADLINE_SOON": ErrDeadline, + "DRAINING": ErrDraining, + "EXPECTED_CRLF": ErrNoCRLF, + "INTERNAL_ERROR": ErrInternal, + "JOB_TOO_BIG": ErrJobTooBig, + "NOT_FOUND": ErrNotFound, + "NOT_IGNORED": ErrNotIgnored, + "OUT_OF_MEMORY": ErrOOM, + "TIMED_OUT": ErrTimeout, + "UNKNOWN_COMMAND": ErrUnknown, +} + +type unknownRespError string + +func (e unknownRespError) Error() string { + return "unknown response: " + string(e) +} + +func findRespError(s string) error { + if err := respError[s]; err != nil { + return err + } + return unknownRespError(s) +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/name.go b/vendor/github.com/beanstalkd/go-beanstalk/name.go new file mode 100644 index 00000000000..5a85b2d41c6 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/name.go @@ -0,0 +1,55 @@ +package beanstalk + +import ( + "errors" +) + +// NameChars are the allowed name characters in the beanstalkd protocol. +const NameChars = `\-+/;.$_()0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz` + +// NameError indicates that a name was malformed and the specific error +// describing how. +type NameError struct { + Name string + Err error +} + +func (e NameError) Error() string { + return e.Err.Error() + ": " + e.Name +} + +func (e NameError) Unwrap() error { + return e.Err +} + +// Name format errors. The Err field of NameError contains one of these. +var ( + ErrEmpty = errors.New("name is empty") + ErrBadChar = errors.New("name has bad char") // contains a character not in NameChars + ErrTooLong = errors.New("name is too long") +) + +func checkName(s string) error { + switch { + case len(s) == 0: + return NameError{s, ErrEmpty} + case len(s) >= 200: + return NameError{s, ErrTooLong} + case !containsOnly(s, NameChars): + return NameError{s, ErrBadChar} + } + return nil +} + +func containsOnly(s, chars string) bool { +outer: + for _, c := range s { + for _, m := range chars { + if c == m { + continue outer + } + } + return false + } + return true +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/parse.go b/vendor/github.com/beanstalkd/go-beanstalk/parse.go new file mode 100644 index 00000000000..091ab86e8f3 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/parse.go @@ -0,0 +1,54 @@ +package beanstalk + +import ( + "bytes" + "strconv" + "strings" +) + +func parseDict(dat []byte) map[string]string { + if dat == nil { + return nil + } + d := make(map[string]string) + if bytes.HasPrefix(dat, yamlHead) { + dat = dat[4:] + } + for _, s := range bytes.Split(dat, nl) { + kv := bytes.SplitN(s, colonSpace, 2) + if len(kv) != 2 { + continue + } + d[string(kv[0])] = string(kv[1]) + } + return d +} + +func parseList(dat []byte) []string { + if dat == nil { + return nil + } + l := []string{} + if bytes.HasPrefix(dat, yamlHead) { + dat = dat[4:] + } + for _, s := range bytes.Split(dat, nl) { + if !bytes.HasPrefix(s, minusSpace) { + continue + } + l = append(l, string(s[2:])) + } + return l +} + +func parseSize(s string) (string, int, error) { + i := strings.LastIndex(s, " ") + if i == -1 { + return "", 0, findRespError(s) + } + n, err := strconv.Atoi(s[i+1:]) + if err != nil { + return "", 0, err + } + return s[:i], n, nil +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/time.go b/vendor/github.com/beanstalkd/go-beanstalk/time.go new file mode 100644 index 00000000000..fd128cbd849 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/time.go @@ -0,0 +1,12 @@ +package beanstalk + +import ( + "strconv" + "time" +) + +type dur time.Duration + +func (d dur) String() string { + return strconv.FormatInt(int64(time.Duration(d)/time.Second), 10) +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/tube.go b/vendor/github.com/beanstalkd/go-beanstalk/tube.go new file mode 100644 index 00000000000..fe7baf7e536 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/tube.go @@ -0,0 +1,112 @@ +package beanstalk + +import ( + "time" +) + +// Tube represents tube Name on the server connected to by Conn. +// It has methods for commands that operate on a single tube. +type Tube struct { + Conn *Conn + Name string +} + +// NewTube returns a new Tube representing the given name. +func NewTube(c *Conn, name string) *Tube { + return &Tube{c, name} +} + +// Put puts a job into tube t with priority pri and TTR ttr, and returns +// the id of the newly-created job. If delay is nonzero, the server will +// wait the given amount of time after returning to the client and before +// putting the job into the ready queue. +func (t *Tube) Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error) { + r, err := t.Conn.cmd(t, nil, body, "put", pri, dur(delay), dur(ttr)) + if err != nil { + return 0, err + } + _, err = t.Conn.readResp(r, false, "INSERTED %d", &id) + if err != nil { + return 0, err + } + return id, nil +} + +// PeekReady gets a copy of the job at the front of t's ready queue. +func (t *Tube) PeekReady() (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(t, nil, nil, "peek-ready") + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "FOUND %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} + +// PeekDelayed gets a copy of the delayed job that is next to be +// put in t's ready queue. +func (t *Tube) PeekDelayed() (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(t, nil, nil, "peek-delayed") + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "FOUND %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} + +// PeekBuried gets a copy of the job in the holding area that would +// be kicked next by Kick. +func (t *Tube) PeekBuried() (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(t, nil, nil, "peek-buried") + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "FOUND %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} + +// Kick takes up to bound jobs from the holding area and moves them into +// the ready queue, then returns the number of jobs moved. Jobs will be +// taken in the order in which they were last buried. +func (t *Tube) Kick(bound int) (n int, err error) { + r, err := t.Conn.cmd(t, nil, nil, "kick", bound) + if err != nil { + return 0, err + } + _, err = t.Conn.readResp(r, false, "KICKED %d", &n) + if err != nil { + return 0, err + } + return n, nil +} + +// Stats retrieves statistics about tube t. +func (t *Tube) Stats() (map[string]string, error) { + r, err := t.Conn.cmd(nil, nil, nil, "stats-tube", t.Name) + if err != nil { + return nil, err + } + body, err := t.Conn.readResp(r, true, "OK") + return parseDict(body), err +} + +// Pause pauses new reservations in t for time d. +func (t *Tube) Pause(d time.Duration) error { + r, err := t.Conn.cmd(nil, nil, nil, "pause-tube", t.Name, dur(d)) + if err != nil { + return err + } + _, err = t.Conn.readResp(r, false, "PAUSED") + if err != nil { + return err + } + return nil +} diff --git a/vendor/github.com/beanstalkd/go-beanstalk/tubeset.go b/vendor/github.com/beanstalkd/go-beanstalk/tubeset.go new file mode 100644 index 00000000000..0b431e011c4 --- /dev/null +++ b/vendor/github.com/beanstalkd/go-beanstalk/tubeset.go @@ -0,0 +1,39 @@ +package beanstalk + +import ( + "time" +) + +// TubeSet represents a set of tubes on the server connected to by Conn. +// Name names the tubes represented. +type TubeSet struct { + Conn *Conn + Name map[string]bool +} + +// NewTubeSet returns a new TubeSet representing the given names. +func NewTubeSet(c *Conn, name ...string) *TubeSet { + ts := &TubeSet{c, make(map[string]bool)} + for _, s := range name { + ts.Name[s] = true + } + return ts +} + +// Reserve reserves and returns a job from one of the tubes in t. If no +// job is available before time timeout has passed, Reserve returns a +// ConnError recording ErrTimeout. +// +// Typically, a client will reserve a job, perform some work, then delete +// the job with Conn.Delete. +func (t *TubeSet) Reserve(timeout time.Duration) (id uint64, body []byte, err error) { + r, err := t.Conn.cmd(nil, t, nil, "reserve-with-timeout", dur(timeout)) + if err != nil { + return 0, nil, err + } + body, err = t.Conn.readResp(r, true, "RESERVED %d", &id) + if err != nil { + return 0, nil, err + } + return id, body, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b2138587ef9..e343c3ad1dc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -456,6 +456,9 @@ github.com/aws/smithy-go/time github.com/aws/smithy-go/transport/http github.com/aws/smithy-go/transport/http/internal/io github.com/aws/smithy-go/waiter +# github.com/beanstalkd/go-beanstalk v0.2.0 +## explicit; go 1.14 +github.com/beanstalkd/go-beanstalk # github.com/beorn7/perks v1.0.1 ## explicit; go 1.11 github.com/beorn7/perks/quantile From c83b920d50bac673992eb8a6c1221043e31eb1bf Mon Sep 17 00:00:00 2001 From: Sam Barnes-Thornton Date: Thu, 15 Aug 2024 15:39:12 +0000 Subject: [PATCH 2/8] Update changelog Signed-off by: Sam Barnes-Thornton Signed-off-by: Sam Barnes-Thornton --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d58f6d186ba..0598cb00121 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New +- **Beanstalkd**: Introduce new beanstalkd scaler ([#5901](https://github.com/kedacore/keda/issues/5901)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) #### Experimental From 0445b4642c4b498821f79862419cb218bb422d40 Mon Sep 17 00:00:00 2001 From: Sam Barnes-Thornton Date: Thu, 15 Aug 2024 15:52:18 +0000 Subject: [PATCH 3/8] Fix semgrep issues Signed-off-by: Sam Barnes-Thornton --- pkg/scalers/beanstalkd_scaler_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/beanstalkd_scaler_test.go b/pkg/scalers/beanstalkd_scaler_test.go index e93a553a28d..b5de6f0df0f 100644 --- a/pkg/scalers/beanstalkd_scaler_test.go +++ b/pkg/scalers/beanstalkd_scaler_test.go @@ -210,7 +210,7 @@ func TestGetTubeStats(t *testing.T) { assert.NoError(t, err) - ctx := context.TODO() + ctx := context.Background() _, active, err := s.GetMetricsAndActivity(ctx, "Metric") assert.NoError(t, err) @@ -231,7 +231,7 @@ func TestGetTubeStatsNotFound(t *testing.T) { assert.NoError(t, err) - ctx := context.TODO() + ctx := context.Background() _, active, err := s.GetMetricsAndActivity(ctx, "Metric") assert.NoError(t, err) @@ -239,7 +239,7 @@ func TestGetTubeStatsNotFound(t *testing.T) { } func createTestServer(t *testing.T, response []byte) { - list, err := net.Listen("tcp", ":3000") + list, err := net.Listen("tcp", "localhost:3000") if err != nil { t.Fatal(err) } From 1ccb18ab599209f3107e3dc5a42078ad319f32a3 Mon Sep 17 00:00:00 2001 From: Sam Barnes-Thornton Date: Thu, 15 Aug 2024 16:23:48 +0000 Subject: [PATCH 4/8] Fix ordering of changelog Signed-off-by: Sam Barnes-Thornton --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0598cb00121..99b09f93b51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -113,8 +113,8 @@ New deprecation(s): - **General**: Add `--ca-dir` flag to KEDA operator to specify directories with CA certificates for scalers to authenticate TLS connections (defaults to /custom/ca) ([#5860](https://github.com/kedacore/keda/issues/5860)) - **General**: Add Dynatrace Scaler ([#5685](https://github.com/kedacore/keda/pull/5685)) -- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114)) - **General**: Add Splunk Scaler ([#5904](https://github.com/kedacore/keda/issues/5904)) +- **General**: Added `eagerScalingStrategy` for `ScaledJob` ([#5114](https://github.com/kedacore/keda/issues/5114)) - **General**: Provide CloudEvents around the management of ScaledObjects resources ([#3522](https://github.com/kedacore/keda/issues/3522)) - **General**: Support for Kubernetes v1.30 ([#5828](https://github.com/kedacore/keda/issues/5828)) From 342c6afa840a40f827dfeb70dd3cbd0c798f7685 Mon Sep 17 00:00:00 2001 From: Sam Barnes-Thornton Date: Mon, 19 Aug 2024 08:31:20 +0000 Subject: [PATCH 5/8] Update metadata parsing Signed-off-by: Sam Barnes-Thornton --- pkg/scalers/beanstalkd_scaler.go | 125 ++++++-------------------- pkg/scalers/beanstalkd_scaler_test.go | 6 +- 2 files changed, 32 insertions(+), 99 deletions(-) diff --git a/pkg/scalers/beanstalkd_scaler.go b/pkg/scalers/beanstalkd_scaler.go index 73976ac1274..a658b4f104f 100644 --- a/pkg/scalers/beanstalkd_scaler.go +++ b/pkg/scalers/beanstalkd_scaler.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "net/url" - "strconv" "time" beanstalk "github.com/beanstalkd/go-beanstalk" @@ -26,22 +25,22 @@ const ( beanstalkdNetworkProtocol = "tcp" ) -type beanstalkdScaler struct { +type BeanstalkdScaler struct { metricType v2.MetricTargetType - metadata *beanstalkdMetadata + metadata *BeanstalkdMetadata connection *beanstalk.Conn tube *beanstalk.Tube logger logr.Logger } -type beanstalkdMetadata struct { - server string - tube string - value float64 - activationValue float64 - includeDelayed bool - timeout time.Duration - triggerIndex int +type BeanstalkdMetadata struct { + Server string `keda:"name=server, order=triggerMetadata"` + Tube string `keda:"name=tube, order=triggerMetadata"` + Value float64 `keda:"name=value, order=triggerMetadata"` + ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional"` + IncludeDelayed bool `keda:"name=includeDelayed, order=triggerMetadata, optional"` + Timeout uint `keda:"name=timeout, order=triggerMetadata, optional, default=30"` + TriggerIndex int } // TubeStats represents a set of tube statistics. @@ -55,7 +54,7 @@ type tubeStats struct { } func NewBeanstalkdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { - s := &beanstalkdScaler{} + s := &BeanstalkdScaler{} metricType, err := GetMetricTargetType(config) if err != nil { @@ -71,98 +70,32 @@ func NewBeanstalkdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { } s.metadata = meta - conn, err := beanstalk.DialTimeout(beanstalkdNetworkProtocol, s.metadata.server, s.metadata.timeout) + timeout := time.Duration(s.metadata.Timeout) * time.Second + + conn, err := beanstalk.DialTimeout(beanstalkdNetworkProtocol, s.metadata.Server, timeout) if err != nil { return nil, fmt.Errorf("error connecting to beanstalkd: %w", err) } s.connection = conn - s.tube = beanstalk.NewTube(s.connection, meta.tube) + s.tube = beanstalk.NewTube(s.connection, meta.Tube) return s, nil } -func parseBeanstalkdMetadata(config *scalersconfig.ScalerConfig) (*beanstalkdMetadata, error) { - meta := beanstalkdMetadata{} - - if err := parseServerValue(config, &meta); err != nil { - return nil, err - } - - if val, ok := config.TriggerMetadata["tube"]; ok { - meta.tube = val - } else { - return nil, fmt.Errorf("no queue name given") - } - - if err := parseTimeout(config, &meta); err != nil { - return nil, err - } +func parseBeanstalkdMetadata(config *scalersconfig.ScalerConfig) (*BeanstalkdMetadata, error) { + meta := &BeanstalkdMetadata{} - meta.includeDelayed = false - if val, ok := config.TriggerMetadata["includeDelayed"]; ok { - boolVal, err := strconv.ParseBool(val) - if err != nil { - return nil, fmt.Errorf("failed to parse includeDelayed value. Must be either true or false") - } - meta.includeDelayed = boolVal - } - - value, valuePresent := config.TriggerMetadata[beanstalkdValueConfigName] - activationValue, activationValuePresent := config.TriggerMetadata[beanstalkdActivationValueTriggerConfigName] - - if activationValuePresent { - activation, err := strconv.ParseFloat(activationValue, 64) - if err != nil { - return nil, fmt.Errorf("can't parse %s: %w", beanstalkdActivationValueTriggerConfigName, err) - } - meta.activationValue = activation - } - - if !valuePresent { - return nil, fmt.Errorf("%s must be specified", beanstalkdValueConfigName) - } - triggerValue, err := strconv.ParseFloat(value, 64) - if err != nil { - return nil, fmt.Errorf("can't parse %s: %w", beanstalkdValueConfigName, err) - } - meta.value = triggerValue - - meta.triggerIndex = config.TriggerIndex - - return &meta, nil -} - -func parseServerValue(config *scalersconfig.ScalerConfig, meta *beanstalkdMetadata) error { - switch { - case config.AuthParams["server"] != "": - meta.server = config.AuthParams["server"] - case config.TriggerMetadata["server"] != "": - meta.server = config.TriggerMetadata["server"] - default: - return fmt.Errorf("no server setting given") + meta.TriggerIndex = config.TriggerIndex + if err := config.TypedConfig(meta); err != nil { + return nil, fmt.Errorf("error parsing beanstalkd metadata: %w", err) } - return nil -} -func parseTimeout(config *scalersconfig.ScalerConfig, meta *beanstalkdMetadata) error { - if val, ok := config.TriggerMetadata["timeout"]; ok { - timeoutMS, err := strconv.Atoi(val) - if err != nil { - return fmt.Errorf("unable to parse timeout: %w", err) - } - if timeoutMS <= 0 { - return fmt.Errorf("timeout must be greater than 0: %w", err) - } - meta.timeout = time.Duration(timeoutMS) * time.Millisecond - } else { - meta.timeout = config.GlobalHTTPTimeout - } - return nil + return meta, nil } -func (s *beanstalkdScaler) getTubeStats(ctx context.Context) (*tubeStats, error) { +func (s *BeanstalkdScaler) getTubeStats(ctx context.Context) (*tubeStats, error) { errCh := make(chan error) statsCh := make(chan *tubeStats) @@ -202,7 +135,7 @@ func (s *beanstalkdScaler) getTubeStats(ctx context.Context) (*tubeStats, error) } } -func (s *beanstalkdScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { +func (s *BeanstalkdScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { stats, err := s.getTubeStats(ctx) if err != nil { return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error interacting with beanstalkd: %w", err) @@ -210,22 +143,22 @@ func (s *beanstalkdScaler) GetMetricsAndActivity(ctx context.Context, metricName totalJobs := stats.JobsReady + stats.JobsReserved - if s.metadata.includeDelayed { + if s.metadata.IncludeDelayed { totalJobs += stats.JobsDelayed } metric := GenerateMetricInMili(metricName, float64(totalJobs)) - isActive := float64(totalJobs) > s.metadata.activationValue + isActive := float64(totalJobs) > s.metadata.ActivationValue return []external_metrics.ExternalMetricValue{metric}, isActive, nil } -func (s *beanstalkdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { +func (s *BeanstalkdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ - Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, util.NormalizeString(fmt.Sprintf("beanstalkd-%s", url.QueryEscape(s.metadata.tube)))), + Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, util.NormalizeString(fmt.Sprintf("beanstalkd-%s", url.QueryEscape(s.metadata.Tube)))), }, - Target: GetMetricTargetMili(s.metricType, s.metadata.value), + Target: GetMetricTargetMili(s.metricType, s.metadata.Value), } metricSpec := v2.MetricSpec{ External: externalMetric, Type: beanstalkdMetricType, @@ -234,7 +167,7 @@ func (s *beanstalkdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricS return []v2.MetricSpec{metricSpec} } -func (s *beanstalkdScaler) Close(context.Context) error { +func (s *BeanstalkdScaler) Close(context.Context) error { if s.connection != nil { err := s.connection.Close() if err != nil { diff --git a/pkg/scalers/beanstalkd_scaler_test.go b/pkg/scalers/beanstalkd_scaler_test.go index b5de6f0df0f..11f77176b5c 100644 --- a/pkg/scalers/beanstalkd_scaler_test.go +++ b/pkg/scalers/beanstalkd_scaler_test.go @@ -159,11 +159,11 @@ func TestBeanstalkdParseMetadata(t *testing.T) { } if err == nil { if val, ok := testData.metadata["includeDelayed"]; !ok { - assert.Equal(t, false, meta.includeDelayed) + assert.Equal(t, false, meta.IncludeDelayed) } else { boolVal, err := strconv.ParseBool(val) if err != nil { - assert.Equal(t, boolVal, meta.includeDelayed) + assert.Equal(t, boolVal, meta.IncludeDelayed) } } } @@ -176,7 +176,7 @@ func TestBeanstalkdGetMetricSpecForScaling(t *testing.T) { if err != nil { t.Fatal("could not parse metadata", err) } - mockBeanstalkdScaler := beanstalkdScaler{ + mockBeanstalkdScaler := BeanstalkdScaler{ metadata: meta, connection: nil, tube: nil, From e57c2c268ca294b2d23c862f706b9a4d68d60b2e Mon Sep 17 00:00:00 2001 From: sbarnesthornton <56411235+sbarnesthornton@users.noreply.github.com> Date: Wed, 21 Aug 2024 08:50:24 +0100 Subject: [PATCH 6/8] Update CHANGELOG.md Co-authored-by: Jorge Turrado Ferrero Signed-off-by: sbarnesthornton <56411235+sbarnesthornton@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f9f6d1ec87..2d9baca1825 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,7 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- **Beanstalkd**: Introduce new beanstalkd scaler ([#5901](https://github.com/kedacore/keda/issues/5901)) +- **General**: Introduce new beanstalkd scaler ([#5901](https://github.com/kedacore/keda/issues/5901)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) - **CloudEventSource**: Provide CloudEvents around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523)) From 7ee5f7744ac0013b2a76acb67d7b8075e938ec95 Mon Sep 17 00:00:00 2001 From: Sam Barnes-Thornton Date: Thu, 22 Aug 2024 14:40:39 +0000 Subject: [PATCH 7/8] Use one scaledobject in tests Signed-off-by: Sam Barnes-Thornton --- tests/scalers/beanstalkd/beanstalkd_test.go | 96 +++++++++++++++------ 1 file changed, 68 insertions(+), 28 deletions(-) diff --git a/tests/scalers/beanstalkd/beanstalkd_test.go b/tests/scalers/beanstalkd/beanstalkd_test.go index db108a06d34..69857682f80 100644 --- a/tests/scalers/beanstalkd/beanstalkd_test.go +++ b/tests/scalers/beanstalkd/beanstalkd_test.go @@ -22,6 +22,7 @@ const ( testName = "beanstalkd-test" deploymentName = "beanstalkd-consumer-deployment" beanstalkdPutJobName = "beanstalkd-put-job" + beanstalkdPopJobName = "beanstalkd-pop-job" ) var ( @@ -29,15 +30,18 @@ var ( beanstalkdDeploymentName = fmt.Sprintf("%s-beanstalkd-deployment", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) beanstalkdTubeName = "default" + activationJobCount = 5 ) type templateData struct { TestNamespace string BeanstalkdDeploymentName string BeanstalkdPutJobName string + BeanstalkdPopJobName string ScaledObjectName string DeploymentName string BeanstalkdTubeName string + JobCount int } const ( @@ -95,12 +99,14 @@ spec: scaleTargetRef: name: {{.DeploymentName}} maxReplicaCount: 3 + pollingInterval: 5 + cooldownPeriod: 10 triggers: - type: beanstalkd metadata: server: beanstalkd.{{.TestNamespace}}:11300 - value: "5" - activationValue: "15" + value: "15" + activationValue: "10" tube: {{.BeanstalkdTubeName}} ` @@ -117,28 +123,25 @@ spec: - name: beanstalkd-put-job image: docker.io/sitecrafting/beanstalkd-cli command: ["/bin/sh"] - args: ["-c", "for run in $(seq 1 10); do beanstalkd-cli --host=beanstalkd put \"Test Job\"; done;"] + args: ["-c", "for run in $(seq 1 {{.JobCount}}); do beanstalkd-cli --host=beanstalkd put \"Test Job\"; done;"] restartPolicy: OnFailure ` - scaledObjectDelayedTemplate = ` -apiVersion: keda.sh/v1alpha1 -kind: ScaledObject + beanstalkdPopJobsTemplate = ` +apiVersion: batch/v1 +kind: Job metadata: - name: {{.ScaledObjectName}} + name: {{.BeanstalkdPopJobName}} namespace: {{.TestNamespace}} spec: - scaleTargetRef: - name: {{.DeploymentName}} - maxReplicaCount: 5 - minReplicaCount: 1 - triggers: - - type: beanstalkd - metadata: - server: beanstalkd.{{.TestNamespace}}:11300 - value: "5" - tube: {{.BeanstalkdTubeName}} - includeDelayed: "true" + template: + spec: + containers: + - name: beanstalkd-pop-job + image: docker.io/sitecrafting/beanstalkd-cli + command: ["/bin/sh"] + args: ["-c", "for run in $(seq 1 {{.JobCount}}); do beanstalkd-cli --host=beanstalkd pop; done;"] + restartPolicy: OnFailure ` deploymentTemplate = ` @@ -182,14 +185,14 @@ func TestBeanstalkdScaler(t *testing.T) { assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, beanstalkdDeploymentName, testNamespace, 1, 60, 1), "replica count should be 0 after a minute") - // Add beanstalkd jobs - addBeanstalkdJobs(t, kc, &data) - // test activation testActivation(t, kc, data) - // test scaling - testScale(t, kc, data) + // test scaling in + testScaleOut(t, kc, data) + + // scaling out + testScaleIn(t, kc, data) } func getTemplateData() (templateData, []Template) { @@ -200,28 +203,65 @@ func getTemplateData() (templateData, []Template) { BeanstalkdDeploymentName: beanstalkdDeploymentName, BeanstalkdTubeName: beanstalkdTubeName, BeanstalkdPutJobName: beanstalkdPutJobName, + BeanstalkdPopJobName: beanstalkdPopJobName, + JobCount: activationJobCount, }, []Template{ {Name: "beanstalkdDeploymentTemplate", Config: beanstalkdDeploymentTemplate}, {Name: "deploymentTemplate", Config: deploymentTemplate}, } } +// Adds five beanstalkd jobs to the default tube func addBeanstalkdJobs(t *testing.T, kc *kubernetes.Clientset, data *templateData) { // run putJob KubectlReplaceWithTemplate(t, data, "beanstalkdPutJobsTemplate", beanstalkdPutJobsTemplate) require.True(t, WaitForJobSuccess(t, kc, beanstalkdPutJobName, testNamespace, 30, 2), "Job should run successfully") } +// Removes five beanstalkd jobs from the default tube +func removeBeanstalkdJobs(t *testing.T, kc *kubernetes.Clientset, data *templateData) { + // run putJob + KubectlReplaceWithTemplate(t, data, "beanstalkdPopJobsTemplate", beanstalkdPopJobsTemplate) + require.True(t, WaitForJobSuccess(t, kc, beanstalkdPopJobName, testNamespace, 30, 2), "Job should run successfully") +} + func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing activation---") KubectlApplyWithTemplate(t, data, "scaledObjectActivationTemplate", scaledObjectActivationTemplate) + + // Add 5 beanstalkd jobs + data.JobCount = 5 + addBeanstalkdJobs(t, kc, &data) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) } -func testScale(t *testing.T, kc *kubernetes.Clientset, data templateData) { - t.Log("--- testing scaling---") - KubectlApplyWithTemplate(t, data, "scaledObjectDelayedTemplate", scaledObjectDelayedTemplate) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 1), - "replica count should be 2 after a minute") +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scaling out ---") + + // Add 100 beanstalkd jobs + data.JobCount = 100 + addBeanstalkdJobs(t, kc, &data) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 3, 60, 1), + "replica count should be 3 after a minute") +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scaling in ---") + + // Remove 80 beanstalkd jobs + data.JobCount = 80 + removeBeanstalkdJobs(t, kc, &data) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 3000, 1), + "replica count should be 2 after 5 minutes") + + // Remove remaining beanstalkd jobs + data.JobCount = 25 + removeBeanstalkdJobs(t, kc, &data) + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after a minute") } From 39bc2c5a17781dc7e57f5dda01e02a73219245e9 Mon Sep 17 00:00:00 2001 From: sbarnesthornton <56411235+sbarnesthornton@users.noreply.github.com> Date: Tue, 3 Sep 2024 16:29:55 +0100 Subject: [PATCH 8/8] Update replica wait on beanstalkd scale in test Co-authored-by: Jorge Turrado Ferrero Signed-off-by: sbarnesthornton <56411235+sbarnesthornton@users.noreply.github.com> --- tests/scalers/beanstalkd/beanstalkd_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scalers/beanstalkd/beanstalkd_test.go b/tests/scalers/beanstalkd/beanstalkd_test.go index 69857682f80..7565d3fa0c6 100644 --- a/tests/scalers/beanstalkd/beanstalkd_test.go +++ b/tests/scalers/beanstalkd/beanstalkd_test.go @@ -255,7 +255,7 @@ func testScaleIn(t *testing.T, kc *kubernetes.Clientset, data templateData) { data.JobCount = 80 removeBeanstalkdJobs(t, kc, &data) - assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 3000, 1), + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 5), "replica count should be 2 after 5 minutes") // Remove remaining beanstalkd jobs