Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add scaler for temporal #6191

Draft
wants to merge 26 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f5d7f78
add scaler for temporal
Prajithp Sep 26, 2024
2cfaa31
add option to filter based on build ids
Prajithp Sep 27, 2024
6018463
use typed config
Prajithp Sep 27, 2024
984d1de
support apiKey authentication
Prajithp Sep 30, 2024
58d8990
use context
Prajithp Oct 1, 2024
6766405
Merge branch 'main' into temporal
Prajithp Oct 3, 2024
9aad76c
add MTLS auth option and some fixes
Prajithp Oct 4, 2024
5159eb6
update e2e test to use official image
Prajithp Oct 7, 2024
5c8b3e6
rename metadata variables
Prajithp Oct 13, 2024
e946a54
fix temporal server override command
Prajithp Oct 13, 2024
7390335
remove namespace from cli args
Prajithp Oct 14, 2024
66a373b
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
7681beb
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
d1aa803
Update tests/scalers/temporal/temporal_test.go
Prajithp Oct 17, 2024
4cdecfb
add MinConnectTimeout option
Prajithp Oct 17, 2024
ba2049a
add test case for worker versioning
Prajithp Oct 17, 2024
9e08d57
Merge branch 'main' into temporal
Prajithp Oct 18, 2024
ec360cd
Merge branch 'main' into temporal
Prajithp Oct 30, 2024
79af8dc
add modules to vendor
Prajithp Nov 1, 2024
6e403df
Update tests/scalers/temporal/temporal_test.go
Prajithp Nov 1, 2024
4e7f89b
Update tests/scalers/temporal/temporal_test.go
Prajithp Nov 1, 2024
b716fa2
refactoring e2e test
Prajithp Nov 9, 2024
5c06b4e
update vendor modules
Prajithp Nov 9, 2024
73dae53
fix tests
Prajithp Nov 11, 2024
9dd849d
fix ci
Prajithp Nov 11, 2024
6ccc697
fix ci
Prajithp Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions pkg/scalers/temporal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
package scalers

import (
"context"
"fmt"
"log/slog"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
sdk "go.temporal.io/sdk/client"
sdklog "go.temporal.io/sdk/log"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"
)

var (
temporalDefauleQueueTypes = []sdk.TaskQueueType{
sdk.TaskQueueTypeActivity,
sdk.TaskQueueTypeWorkflow,
sdk.TaskQueueTypeNexus,
}
)

type temporalScaler struct {
metricType v2.MetricTargetType
metadata *temporalMetadata
tcl sdk.Client
logger logr.Logger
}

type temporalMetadata struct {
ActivationTargetQueueSize int64 `keda:"name=activationTargetQueueSize, order=triggerMetadata, default=0"`
Endpoint string `keda:"name=endpoint, order=triggerMetadata;resolvedEnv"`
Namespace string `keda:"name=namespace, order=triggerMetadata, default=default"`
TargetQueueSize int64 `keda:"name=targetQueueSize, order=triggerMetadata, default=5"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueTypes []string `keda:"name=queueTypes, order=triggerMetadata, optional"`
BuildIDs []string `keda:"name=buildIds, order=triggerMetadata, optional"`
AllActive bool `keda:"name=selectAllActive, order=triggerMetadata, default=true"`
Unversioned bool `keda:"name=selectUnversioned, order=triggerMetadata, default=true"`
APIKey string `keda:"name=apiKey, order=authParams;triggerMetadata, optional"`

triggerIndex int
}

func (a *temporalMetadata) Validate() error {
if a.TargetQueueSize <= 0 {
return fmt.Errorf("targetQueueSize must be a positive number")
}
if a.ActivationTargetQueueSize < 0 {
return fmt.Errorf("activationTargetQueueSize must be a positive number")
}

return nil
}

func NewTemporalScaler(ctx context.Context, config *scalersconfig.ScalerConfig) (Scaler, error) {
logger := InitializeLogger(config, "temporal_scaler")

metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("failed to get scaler metric type: %w", err)
}

meta, err := parseTemporalMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("failed to parse Temporal metadata: %w", err)
}

c, err := getTemporalClient(ctx, meta)
if err != nil {
return nil, fmt.Errorf("failed to create Temporal client connection: %w", err)
}

return &temporalScaler{
metricType: metricType,
metadata: meta,
tcl: c,
logger: logger,
}, nil
}

func (s *temporalScaler) Close(_ context.Context) error {
if s.tcl != nil {
s.tcl.Close()
}
return nil
}

func (s *temporalScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("temporal-%s-%s", s.metadata.Namespace, s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.TargetQueueSize),
}

metricSpec := v2.MetricSpec{
External: externalMetric,
Type: externalMetricType,
}

return []v2.MetricSpec{metricSpec}
}

func (s *temporalScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueSize, err := s.getQueueSize(ctx)
if err != nil {
return nil, false, fmt.Errorf("failed to get Temporal queue size: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueSize))

return []external_metrics.ExternalMetricValue{metric}, queueSize > s.metadata.ActivationTargetQueueSize, nil
}

func (s *temporalScaler) getQueueSize(ctx context.Context) (int64, error) {
var selection *sdk.TaskQueueVersionSelection
if s.metadata.AllActive || s.metadata.Unversioned || len(s.metadata.BuildIDs) > 0 {
selection = &sdk.TaskQueueVersionSelection{
AllActive: s.metadata.AllActive,
Unversioned: s.metadata.Unversioned,
BuildIDs: s.metadata.BuildIDs,
}
}

queueType := getQueueTypes(s.metadata.QueueTypes)

resp, err := s.tcl.DescribeTaskQueueEnhanced(ctx, sdk.DescribeTaskQueueEnhancedOptions{
TaskQueue: s.metadata.QueueName,
ReportStats: true,
Versions: selection,
TaskQueueTypes: queueType,
})
if err != nil {
return 0, fmt.Errorf("failed to get Temporal queue size: %w", err)
}

return getCombinedBacklogCount(resp), nil
}

func getQueueTypes(queueTypes []string) []sdk.TaskQueueType {
var taskQueueTypes []sdk.TaskQueueType
for _, t := range queueTypes {
var taskQueueType sdk.TaskQueueType
switch t {
case "workflow":
taskQueueType = sdk.TaskQueueTypeWorkflow
case "activity":
taskQueueType = sdk.TaskQueueTypeActivity
case "nexus":
taskQueueType = sdk.TaskQueueTypeNexus
}
taskQueueTypes = append(taskQueueTypes, taskQueueType)
}

if len(taskQueueTypes) == 0 {
return temporalDefauleQueueTypes
}
return taskQueueTypes
}

func getCombinedBacklogCount(description sdk.TaskQueueDescription) int64 {
var count int64
for _, versionInfo := range description.VersionsInfo {
for _, typeInfo := range versionInfo.TypesInfo {
if typeInfo.Stats != nil {
count += typeInfo.Stats.ApproximateBacklogCount
}
}
}
return count
}

func getTemporalClient(ctx context.Context, meta *temporalMetadata) (sdk.Client, error) {
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
options := sdk.Options{
HostPort: meta.Endpoint,
Namespace: meta.Namespace,
Logger: sdklog.NewStructuredLogger(slog.Default()),
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
}

dialOptions := []grpc.DialOption{
grpc.WithConnectParams(grpc.ConnectParams{
MinConnectTimeout: 5 * time.Second,
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
}),
}

if meta.APIKey != "" {
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(
func(ctx context.Context, method string, req any, reply any,
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(
metadata.AppendToOutgoingContext(ctx, "temporal-namespace", meta.Namespace),
method,
req,
reply,
cc,
opts...,
)
},
))
options.Credentials = sdk.NewAPIKeyStaticCredentials(meta.APIKey)
}

options.ConnectionOptions = sdk.ConnectionOptions{
DialOptions: dialOptions,
}

return sdk.DialContext(ctx, options)
}

func parseTemporalMetadata(config *scalersconfig.ScalerConfig, _ logr.Logger) (*temporalMetadata, error) {
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
meta := &temporalMetadata{triggerIndex: config.TriggerIndex}
Prajithp marked this conversation as resolved.
Show resolved Hide resolved
if err := config.TypedConfig(meta); err != nil {
return meta, fmt.Errorf("error parsing temporal metadata: %w", err)
}

return meta, nil
}
Loading
Loading