Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scaler for temporal #6191

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f5d7f78
add scaler for temporal
Prajithp Sep 26, 2024
2cfaa31
add option to filter based on build ids
Prajithp Sep 27, 2024
6018463
use typed config
Prajithp Sep 27, 2024
984d1de
support apiKey authentication
Prajithp Sep 30, 2024
58d8990
use context
Prajithp Oct 1, 2024
6766405
Merge branch 'main' into temporal
Prajithp Oct 3, 2024
9aad76c
add MTLS auth option and some fixes
Prajithp Oct 4, 2024
5159eb6
update e2e test to use official image
Prajithp Oct 7, 2024
5c8b3e6
rename metadata variables
Prajithp Oct 13, 2024
e946a54
fix temporal server override command
Prajithp Oct 13, 2024
7390335
remove namespace from cli args
Prajithp Oct 14, 2024
66a373b
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
7681beb
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
d1aa803
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
4cdecfb
add MinConnectTimeout option
Prajithp Oct 17, 2024
ba2049a
add test case for worker versioning
Prajithp Oct 17, 2024
9e08d57
Merge branch 'main' into temporal
Prajithp Oct 18, 2024
ec360cd
Merge branch 'main' into temporal
Prajithp Oct 30, 2024
79af8dc
add modules to vendor
Prajithp Nov 1, 2024
6e403df
Update tests/scalers/temporal/temporal_test.go
Prajithp Nov 1, 2024
4e7f89b
Update tests/scalers/temporal/temporal_test.go
Prajithp Nov 1, 2024
b716fa2
refactoring e2e test
Prajithp Nov 9, 2024
5c06b4e
update vendor modules
Prajithp Nov 9, 2024
73dae53
fix tests
Prajithp Nov 11, 2024
9dd849d
fix ci
Prajithp Nov 11, 2024
6ccc697
fix ci
Prajithp Nov 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.31.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.31.0
go.opentelemetry.io/otel/metric v1.31.0
go.temporal.io/sdk v1.30.0
go.uber.org/mock v0.5.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
Expand All @@ -118,6 +119,15 @@ require (
sigs.k8s.io/kustomize/kustomize/v5 v5.5.0
)

require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/nexus-rpc/sdk-go v0.0.11 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/robfig/cron v1.2.0 // indirect
go.temporal.io/api v1.40.0 // indirect
)

replace (
// pin k8s.io to v0.31.2 & sigs.k8s.io/controller-runtime to v0.19.1
github.com/google/cel-go => github.com/google/cel-go v0.20.1
Expand Down
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,8 @@ github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0
github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ=
github.com/expr-lang/expr v1.16.9 h1:WUAzmR0JNI9JCiF0/ewwHB1gmcGw5wW7nWt8gc6PpCI=
github.com/expr-lang/expr v1.16.9/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
Expand Down Expand Up @@ -1879,6 +1881,7 @@ github.com/google/s2a-go v0.1.8 h1:zZDs9gcbt9ZPLV0ndSyQk6Kacx2g/X+SKYovpnz3SMM=
github.com/google/s2a-go v0.1.8/go.mod h1:6iNWHTpQ+nfNRN5E00MSdfDwVesa8hhS32PhPO8deJA=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -2173,6 +2176,8 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/newrelic/newrelic-client-go v1.1.0 h1:aflNjzQ21c+2GwBVh+UbAf9lznkRfCcVABoc5UM4IXw=
github.com/newrelic/newrelic-client-go v1.1.0/go.mod h1:RYMXt7hgYw7nzuXIGd2BH0F1AivgWw7WrBhNBQZEB4k=
github.com/nexus-rpc/sdk-go v0.0.11 h1:qH3Us3spfp50t5ca775V1va2eE6z1zMQDZY4mvbw0CI=
github.com/nexus-rpc/sdk-go v0.0.11/go.mod h1:TpfkM2Cw0Rlk9drGkoiSMpFqflKTiQLWUNyKJjF8mKQ=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oapi-codegen/runtime v1.1.1 h1:EXLHh0DXIJnWhdRPN2w4MXAzFyE4CskzhNLUmtpMYro=
Expand Down Expand Up @@ -2203,6 +2208,8 @@ github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT9
github.com/otiai10/mint v1.3.3/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pborman/uuid v1.2.1 h1:+ZZIw58t/ozdjRaXh/3awHfmWRbzYxJoAdNJxe/3pvw=
github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI=
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE=
Expand Down Expand Up @@ -2256,6 +2263,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qq
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis=
github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -2482,6 +2491,10 @@ go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0=
go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8=
go.temporal.io/api v1.40.0 h1:rH3HvUUCFr0oecQTBW5tI6DdDQsX2Xb6OFVgt/bvLto=
go.temporal.io/api v1.40.0/go.mod h1:1WwYUMo6lao8yl0371xWUm13paHExN5ATYT/B7QtFis=
go.temporal.io/sdk v1.30.0 h1:7jzSFZYk+tQ2kIYEP+dvrM7AW9EsCEP52JHCjVGuwbI=
go.temporal.io/sdk v1.30.0/go.mod h1:Pv45F/fVDgWKx+jhix5t/dGgqROVaI+VjPLd3CHWqq0=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
Expand Down
249 changes: 249 additions & 0 deletions pkg/scalers/temporal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package scalers

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/go-logr/logr"
sdk "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

var (
temporalDefauleQueueTypes = []sdk.TaskQueueType{
sdk.TaskQueueTypeActivity,
sdk.TaskQueueTypeWorkflow,
sdk.TaskQueueTypeNexus,
}
)

type temporalScaler struct {
metricType v2.MetricTargetType
metadata *temporalMetadata
tcl sdk.Client
logger logr.Logger
}

type temporalMetadata struct {
Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"`
Namespace string `keda:"name=namespace, order=triggerMetadata;resolvedEnv, default=default"`
ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"`
TaskQueue string `keda:"name=taskQueue, order=triggerMetadata;resolvedEnv"`
QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"`
BuildID string `keda:"name=buildId, order=triggerMetadata;resolvedEnv, optional"`
AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=false"`
Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=false"`
APIKey string `keda:"name=apiKey, order=authParams;resolvedEnv;triggerMetadata, optional"`
MinConnectTimeout int `keda:"name=minConnectTimeout, order=triggerMetadata, default=5"`

UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
CA string `keda:"name=ca, order=authParams, optional"`

triggerIndex int
}

func (a *temporalMetadata) Validate() error {
if a.TargetQueueSize <= 0 {
return fmt.Errorf("targetQueueSize must be a positive number")
}
if a.ActivationTargetQueueSize < 0 {
return fmt.Errorf("activationTargetQueueSize must be a positive number")
}

if (a.Cert == "") != (a.Key == "") {
return fmt.Errorf("both cert and key must be provided when using TLS")
}

if a.MinConnectTimeout < 0 {
return fmt.Errorf("minConnectTimeout must be a positive number")
}

return nil
}

func NewTemporalScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
logger := InitializeLogger(config, "temporal_scaler")

metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("failed to get scaler metric type: %w", err)
}

meta, err := parseTemporalMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err)
}

c, err := getTemporalClient(ctx, meta, logger)
if err != nil {
return nil, fmt.Errorf("failed to create Temporal client connection: %w", err)
}

return &temporalScaler{
metricType: metricType,
metadata: meta,
tcl: c,
logger: logger,
}, nil
}

func (s *temporalScaler) Close(_ context.Context) error {
if s.tcl != nil {
s.tcl.Close()
}
return nil
}

func (s *temporalScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("temporal-%s-%s", s.metadata.Namespace, s.metadata.TaskQueue))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueSize),
}

metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2.MetricSpec{metricSpec}
}

func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueSize, err := s.getQueueSize(ctx)
if err != nil {
return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil
}

func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) {
var selection *sdk.TaskQueueVersionSelection
if s.metadata.AllActive || s.metadata.Unversioned || s.metadata.BuildID != "" {
selection = &sdk.TaskQueueVersionSelection{
AllActive: s.metadata.AllActive,
Unversioned: s.metadata.Unversioned,
BuildIDs: []string{s.metadata.BuildID},
}
}

queueType := getQueueTypes(s.metadata.QueueTypes)

resp, err := s.tcl.DescribeTaskQueueEnhanced(ctx, sdk.DescribeTaskQueueEnhancedOptions{
TaskQueue: s.metadata.TaskQueue,
ReportStats: true,
Versions: selection,
TaskQueueTypes: queueType,
})
if err != nil {
return 0, fmt.Errorf("failed to get Temporal queue size: %w", err)
}

return getCombinedBacklogCount(resp), nil
}

func getQueueTypes(queueTypes []string) []sdk.TaskQueueType {
var taskQueueTypes []sdk.TaskQueueType
for _, t := range queueTypes {
var taskQueueType sdk.TaskQueueType
switch t {
case "workflow":
taskQueueType = sdk.TaskQueueTypeWorkflow
case "activity":
taskQueueType = sdk.TaskQueueTypeActivity
case "nexus":
taskQueueType = sdk.TaskQueueTypeNexus
}
taskQueueTypes = append(taskQueueTypes, taskQueueType)
}

if len(taskQueueTypes) == 0 {
return temporalDefauleQueueTypes
}
return taskQueueTypes
}

func getCombinedBacklogCount(description sdk.TaskQueueDescription) int64 {
var count int64
for _, versionInfo := range description.VersionsInfo {
for _, typeInfo := range versionInfo.TypesInfo {
if typeInfo.Stats != nil {
count += typeInfo.Stats.ApproximateBacklogCount
}
}
}
return count
}

func getTemporalClient(ctx context.Context, meta *temporalMetadata, log logr.Logger) (sdk.Client, error) {
logHandler := logr.ToSlogHandler(log)
options := sdk.Options{
HostPort: meta.Endpoint,
Namespace: meta.Namespace,
Logger: sdklog.NewStructuredLogger(slog.New(logHandler)),
}

dialOptions := []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: time.Duration(meta.MinConnectTimeout) * time.Second,
}),
}

if meta.APIKey != "" {
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req any, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(
metadata.AppendToOutgoingContext(ctx, "temporal-namespace", meta.Namespace),
method,
req,
reply,
cc,
opts...,
)
},
))
options.Credentials = sdk.NewAPIKeyStaticCredentials(meta.APIKey)
}

options.ConnectionOptions = sdk.ConnectionOptions{
DialOptions: dialOptions,
}

if meta.Cert != "" && meta.Key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.CA, meta.UnsafeSsl)
if err != nil {
return nil, err
}
options.ConnectionOptions.TLS = tlsConfig
}

return sdk.DialContext(ctx, options)
}

func parseTemporalMetadata(config *scalersconfig.ScalerConfig, _ logr.Logger) (*temporalMetadata, error) {
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
meta := &temporalMetadata{triggerIndex: config.TriggerIndex}
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
if err := config.TypedConfig(meta); err != nil {
return meta, fmt.Errorf("error parsing temporal metadata: %w", err)
}

return meta, nil
}
Loading
Loading