-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor liklus scaler #6433
base: main
Are you sure you want to change the base?
refactor liklus scaler #6433
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -3,7 +3,6 @@ package scalers | |||||||
import ( | ||||||||
"context" | ||||||||
"fmt" | ||||||||
"strconv" | ||||||||
"time" | ||||||||
|
||||||||
"github.com/go-logr/logr" | ||||||||
|
@@ -27,12 +26,12 @@ type liiklusScaler struct { | |||||||
} | ||||||||
|
||||||||
type liiklusMetadata struct { | ||||||||
lagThreshold int64 | ||||||||
activationLagThreshold int64 | ||||||||
address string | ||||||||
topic string | ||||||||
group string | ||||||||
groupVersion uint32 | ||||||||
LagThreshold int64 `keda:"name=lagThreshold,order=triggerMetadata,default=10"` | ||||||||
ActivationLagThreshold int64 `keda:"name=activationLagThreshold,order=triggerMetadata,default=0"` | ||||||||
Address string `keda:"name=address,order=triggerMetadata"` | ||||||||
Topic string `keda:"name=topic,order=triggerMetadata"` | ||||||||
Group string `keda:"name=group,order=triggerMetadata"` | ||||||||
GroupVersion uint32 `keda:"name=groupVersion,order=triggerMetadata,default=0"` | ||||||||
triggerIndex int | ||||||||
} | ||||||||
|
||||||||
|
@@ -70,7 +69,7 @@ func NewLiiklusScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { | |||||||
return nil, err | ||||||||
} | ||||||||
|
||||||||
conn, err := grpc.NewClient(lm.address, | ||||||||
conn, err := grpc.NewClient(lm.Address, | ||||||||
grpc.WithDefaultServiceConfig(grpcConfig), | ||||||||
grpc.WithTransportCredentials(insecure.NewCredentials())) | ||||||||
if err != nil { | ||||||||
|
@@ -94,21 +93,21 @@ func (s *liiklusScaler) GetMetricsAndActivity(ctx context.Context, metricName st | |||||||
return nil, false, err | ||||||||
} | ||||||||
|
||||||||
if totalLag/uint64(s.metadata.lagThreshold) > uint64(len(lags)) { | ||||||||
totalLag = uint64(s.metadata.lagThreshold) * uint64(len(lags)) | ||||||||
if totalLag/uint64(s.metadata.LagThreshold) > uint64(len(lags)) { | ||||||||
totalLag = uint64(s.metadata.LagThreshold) * uint64(len(lags)) | ||||||||
} | ||||||||
|
||||||||
metric := GenerateMetricInMili(metricName, float64(totalLag)) | ||||||||
|
||||||||
return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.activationLagThreshold), nil | ||||||||
return []external_metrics.ExternalMetricValue{metric}, totalLag > uint64(s.metadata.ActivationLagThreshold), nil | ||||||||
} | ||||||||
|
||||||||
func (s *liiklusScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { | ||||||||
externalMetric := &v2.ExternalMetricSource{ | ||||||||
Metric: v2.MetricIdentifier{ | ||||||||
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.topic))), | ||||||||
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("liiklus-%s", s.metadata.Topic))), | ||||||||
}, | ||||||||
Target: GetMetricTarget(s.metricType, s.metadata.lagThreshold), | ||||||||
Target: GetMetricTarget(s.metricType, s.metadata.LagThreshold), | ||||||||
} | ||||||||
metricSpec := v2.MetricSpec{External: externalMetric, Type: liiklusMetricType} | ||||||||
return []v2.MetricSpec{metricSpec} | ||||||||
|
@@ -131,9 +130,9 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, | |||||||
ctx1, cancel1 := context.WithTimeout(ctx, 10*time.Second) | ||||||||
defer cancel1() | ||||||||
gor, err := s.client.GetOffsets(ctx1, &liiklus_service.GetOffsetsRequest{ | ||||||||
Topic: s.metadata.topic, | ||||||||
Group: s.metadata.group, | ||||||||
GroupVersion: s.metadata.groupVersion, | ||||||||
Topic: s.metadata.Topic, | ||||||||
Group: s.metadata.Group, | ||||||||
GroupVersion: s.metadata.GroupVersion, | ||||||||
}) | ||||||||
if err != nil { | ||||||||
return 0, nil, err | ||||||||
|
@@ -142,7 +141,7 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, | |||||||
ctx2, cancel2 := context.WithTimeout(ctx, 10*time.Second) | ||||||||
defer cancel2() | ||||||||
geor, err := s.client.GetEndOffsets(ctx2, &liiklus_service.GetEndOffsetsRequest{ | ||||||||
Topic: s.metadata.topic, | ||||||||
Topic: s.metadata.Topic, | ||||||||
}) | ||||||||
if err != nil { | ||||||||
return 0, nil, err | ||||||||
|
@@ -159,50 +158,17 @@ func (s *liiklusScaler) getLag(ctx context.Context) (uint64, map[uint32]uint64, | |||||||
} | ||||||||
|
||||||||
func parseLiiklusMetadata(config *scalersconfig.ScalerConfig) (*liiklusMetadata, error) { | ||||||||
lagThreshold := defaultLiiklusLagThreshold | ||||||||
activationLagThreshold := defaultLiiklusActivationLagThreshold | ||||||||
|
||||||||
if val, ok := config.TriggerMetadata[liiklusLagThresholdMetricName]; ok { | ||||||||
t, err := strconv.ParseInt(val, 10, 64) | ||||||||
if err != nil { | ||||||||
return nil, fmt.Errorf("error parsing %s: %w", liiklusLagThresholdMetricName, err) | ||||||||
} | ||||||||
lagThreshold = t | ||||||||
meta := &liiklusMetadata{} | ||||||||
if err := config.TypedConfig(meta); err != nil { | ||||||||
return nil, fmt.Errorf("error parsing liiklus metadata: %w", err) | ||||||||
} | ||||||||
|
||||||||
if val, ok := config.TriggerMetadata[liiklusActivationLagThresholdMetricName]; ok { | ||||||||
t, err := strconv.ParseInt(val, 10, 64) | ||||||||
if err != nil { | ||||||||
return nil, fmt.Errorf("error parsing %s: %w", liiklusActivationLagThresholdMetricName, err) | ||||||||
} | ||||||||
activationLagThreshold = t | ||||||||
} | ||||||||
|
||||||||
groupVersion := uint32(0) | ||||||||
if val, ok := config.TriggerMetadata["groupVersion"]; ok { | ||||||||
t, err := strconv.ParseUint(val, 10, 32) | ||||||||
if err != nil { | ||||||||
return nil, fmt.Errorf("error parsing groupVersion: %w", err) | ||||||||
} | ||||||||
groupVersion = uint32(t) | ||||||||
} | ||||||||
|
||||||||
switch { | ||||||||
case config.TriggerMetadata["topic"] == "": | ||||||||
case meta.Topic == "": | ||||||||
return nil, ErrLiiklusNoTopic | ||||||||
case config.TriggerMetadata["address"] == "": | ||||||||
case meta.Address == "": | ||||||||
return nil, ErrLiiklusNoAddress | ||||||||
case config.TriggerMetadata["group"] == "": | ||||||||
case meta.Group == "": | ||||||||
return nil, ErrLiiklusNoGroup | ||||||||
} | ||||||||
|
||||||||
return &liiklusMetadata{ | ||||||||
topic: config.TriggerMetadata["topic"], | ||||||||
address: config.TriggerMetadata["address"], | ||||||||
group: config.TriggerMetadata["group"], | ||||||||
groupVersion: groupVersion, | ||||||||
lagThreshold: lagThreshold, | ||||||||
activationLagThreshold: activationLagThreshold, | ||||||||
triggerIndex: config.TriggerIndex, | ||||||||
}, nil | ||||||||
return meta, nil | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is the place that should populate
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added in: 4934fd0 |
||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,7 @@ package scalers | |
|
||
import ( | ||
"context" | ||
"errors" | ||
"strconv" | ||
"fmt" | ||
"testing" | ||
|
||
"github.com/go-logr/logr" | ||
|
@@ -15,12 +14,10 @@ import ( | |
) | ||
|
||
type parseLiiklusMetadataTestData struct { | ||
metadata map[string]string | ||
err error | ||
liiklusAddress string | ||
group string | ||
topic string | ||
threshold int64 | ||
name string | ||
metadata map[string]string | ||
ExpectedErr error | ||
ExpectedMetatada *liiklusMetadata | ||
} | ||
|
||
type liiklusMetricIdentifier struct { | ||
|
@@ -30,12 +27,64 @@ type liiklusMetricIdentifier struct { | |
} | ||
|
||
var parseLiiklusMetadataTestDataset = []parseLiiklusMetadataTestData{ | ||
{map[string]string{}, ErrLiiklusNoTopic, "", "", "", 0}, | ||
{map[string]string{"topic": "foo"}, ErrLiiklusNoAddress, "", "", "", 0}, | ||
{map[string]string{"topic": "foo", "address": "bar:6565"}, ErrLiiklusNoGroup, "", "", "", 0}, | ||
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup"}, nil, "bar:6565", "mygroup", "foo", 10}, | ||
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "activationLagThreshold": "aa"}, strconv.ErrSyntax, "bar:6565", "mygroup", "foo", 10}, | ||
{map[string]string{"topic": "foo", "address": "bar:6565", "group": "mygroup", "lagThreshold": "15"}, nil, "bar:6565", "mygroup", "foo", 15}, | ||
{ | ||
name: "Empty metadata", | ||
metadata: map[string]string{}, | ||
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + | ||
"missing required parameter \"address\" in [triggerMetadata]\n" + | ||
"missing required parameter \"topic\" in [triggerMetadata]\n" + | ||
"missing required parameter \"group\" in [triggerMetadata]"), | ||
ExpectedMetatada: nil, | ||
}, | ||
{ | ||
name: "Empty address", | ||
metadata: map[string]string{"topic": "foo"}, | ||
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + | ||
"missing required parameter \"address\" in [triggerMetadata]\n" + | ||
"missing required parameter \"group\" in [triggerMetadata]"), | ||
ExpectedMetatada: nil, | ||
}, | ||
{ | ||
name: "Empty group", | ||
metadata: map[string]string{"topic": "foo", "address": "using-mock"}, | ||
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: " + | ||
"missing required parameter \"group\" in [triggerMetadata]"), | ||
ExpectedMetatada: nil, | ||
}, | ||
{ | ||
name: "Valid", | ||
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup"}, | ||
ExpectedErr: nil, | ||
ExpectedMetatada: &liiklusMetadata{ | ||
LagThreshold: defaultLiiklusLagThreshold, | ||
ActivationLagThreshold: defaultLiiklusActivationLagThreshold, | ||
Address: "using-mock", | ||
Topic: "foo", | ||
Group: "mygroup", | ||
GroupVersion: 0, | ||
triggerIndex: 0, | ||
}, | ||
}, | ||
{ | ||
name: "Invalid activationLagThreshold", | ||
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "activationLagThreshold": "invalid"}, | ||
ExpectedErr: fmt.Errorf("error parsing liiklus metadata: unable to set param \"activationLagThreshold\" value \"invalid\": unable to unmarshal to field type int64: invalid character 'i' looking for beginning of value"), | ||
ExpectedMetatada: nil, | ||
}, | ||
{ | ||
name: "Custom lagThreshold", | ||
metadata: map[string]string{"topic": "foo", "address": "using-mock", "group": "mygroup", "lagThreshold": "20"}, | ||
ExpectedErr: nil, | ||
ExpectedMetatada: &liiklusMetadata{ | ||
LagThreshold: 20, | ||
ActivationLagThreshold: defaultLiiklusActivationLagThreshold, | ||
Address: "using-mock", | ||
Topic: "foo", | ||
Group: "mygroup", | ||
GroupVersion: 0, | ||
triggerIndex: 0, | ||
}, | ||
}, | ||
} | ||
|
||
var liiklusMetricIdentifiers = []liiklusMetricIdentifier{ | ||
|
@@ -45,38 +94,44 @@ var liiklusMetricIdentifiers = []liiklusMetricIdentifier{ | |
|
||
func TestLiiklusParseMetadata(t *testing.T) { | ||
for _, testData := range parseLiiklusMetadataTestDataset { | ||
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) | ||
if err != nil && testData.err == nil { | ||
t.Error("Expected success but got error", err) | ||
continue | ||
} | ||
if testData.err != nil && err == nil { | ||
t.Error("Expected error but got success") | ||
continue | ||
} | ||
if testData.err != nil && err != nil && !errors.Is(err, testData.err) { | ||
t.Errorf("Expected error %v but got %v", testData.err, err) | ||
continue | ||
} | ||
if err != nil { | ||
continue | ||
} | ||
if testData.liiklusAddress != meta.address { | ||
t.Errorf("Expected address %q but got %q\n", testData.liiklusAddress, meta.address) | ||
continue | ||
} | ||
if meta.group != testData.group { | ||
t.Errorf("Expected group %q but got %q\n", testData.group, meta.group) | ||
continue | ||
} | ||
if meta.topic != testData.topic { | ||
t.Errorf("Expected topic %q but got %q\n", testData.topic, meta.topic) | ||
continue | ||
} | ||
if meta.lagThreshold != testData.threshold { | ||
t.Errorf("Expected threshold %d but got %d\n", testData.threshold, meta.lagThreshold) | ||
continue | ||
} | ||
t.Run(testData.name, func(t *testing.T) { | ||
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata}) | ||
|
||
// error cases | ||
if testData.ExpectedErr != nil { | ||
if err == nil { | ||
t.Errorf("Expected error %v but got success", testData.ExpectedErr) | ||
} else if err.Error() != testData.ExpectedErr.Error() { | ||
t.Errorf("Expected error %v but got %v", testData.ExpectedErr, err) | ||
} | ||
return // Skip the rest of the checks for error cases | ||
} | ||
|
||
// success cases | ||
if err != nil { | ||
t.Errorf("Expected success but got error %v", err) | ||
} | ||
if testData.ExpectedMetatada != nil { | ||
if testData.ExpectedMetatada.Address != meta.Address { | ||
t.Errorf("Expected address %q but got %q", testData.ExpectedMetatada.Address, meta.Address) | ||
} | ||
if meta.Group != testData.ExpectedMetatada.Group { | ||
t.Errorf("Expected group %q but got %q", testData.ExpectedMetatada.Group, meta.Group) | ||
} | ||
if meta.Topic != testData.ExpectedMetatada.Topic { | ||
t.Errorf("Expected topic %q but got %q", testData.ExpectedMetatada.Topic, meta.Topic) | ||
} | ||
if meta.LagThreshold != testData.ExpectedMetatada.LagThreshold { | ||
t.Errorf("Expected threshold %d but got %d", testData.ExpectedMetatada.LagThreshold, meta.LagThreshold) | ||
} | ||
if meta.ActivationLagThreshold != testData.ExpectedMetatada.ActivationLagThreshold { | ||
t.Errorf("Expected activation threshold %d but got %d", testData.ExpectedMetatada.ActivationLagThreshold, meta.ActivationLagThreshold) | ||
} | ||
if meta.GroupVersion != testData.ExpectedMetatada.GroupVersion { | ||
t.Errorf("Expected group version %d but got %d", testData.ExpectedMetatada.GroupVersion, meta.GroupVersion) | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
|
@@ -172,16 +227,17 @@ func TestLiiklusScalerGetMetricsBehavior(t *testing.T) { | |
|
||
func TestLiiklusGetMetricSpecForScaling(t *testing.T) { | ||
for _, testData := range liiklusMetricIdentifiers { | ||
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex}) | ||
if err != nil { | ||
t.Fatal("Could not parse metadata:", err) | ||
} | ||
mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()} | ||
|
||
metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background()) | ||
metricName := metricSpec[0].External.Metric.Name | ||
if metricName != testData.name { | ||
t.Error("Wrong External metric source name:", metricName) | ||
} | ||
t.Run(testData.name, func(t *testing.T) { | ||
meta, err := parseLiiklusMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata}) | ||
if err != nil { | ||
t.Fatal("Could not parse metadata:", err) | ||
} | ||
meta.triggerIndex = testData.triggerIndex | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tests shouldn't modify the meta from |
||
mockLiiklusScaler := liiklusScaler{"", meta, nil, nil, logr.Discard()} | ||
metricSpec := mockLiiklusScaler.GetMetricSpecForScaling(context.Background()) | ||
if metricSpec[0].External.Metric.Name != testData.name { | ||
t.Errorf("Wrong External metric source name: %s", metricSpec[0].External.Metric.Name) | ||
} | ||
}) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be safe to remove because
Topic
,Address
, andGroup
are not marked asoptional
in the field tags so theTypedConfig
parsing will throw an errorThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I agree. fixed in: 4934fd0