Skip to content

Commit

Permalink
Improve Azure Event Hub scaling (#5125)
Browse files Browse the repository at this point in the history
* Update Azure EventHub scaling

Signed-off-by: Troy <[email protected]>

* Update changelog

Signed-off-by: Troy <[email protected]>

* Remove unused context

Signed-off-by: Troy <[email protected]>

---------

Signed-off-by: Troy <[email protected]>
  • Loading branch information
troydn authored Jan 4, 2024
1 parent eec9b26 commit 9be8ee6
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Here is an overview of all new **experimental** features:
- **General**: Prevented stuck status due to timeouts during scalers generation ([#5083](https://github.com/kedacore/keda/issues/5083))
- **General**: ScaledObject Validating Webhook should support dry-run=server requests ([#5306](https://github.com/kedacore/keda/issues/5306))
- **AWS Scalers**: Ensure session tokens are included when instantiating AWS credentials ([#5156](https://github.com/kedacore/keda/issues/5156))
- **Azure Event Hub Scaler**: Improve unprocessedEventThreshold calculation ([#4250](https://github.com/kedacore/keda/issues/4250))
- **Azure Pipelines**: No more HTTP 400 errors produced by poolName with spaces ([#5107](https://github.com/kedacore/keda/issues/5107))
- **GCP pubsub scaler**: Added `project_id` to filter for metrics queries ([#5256](https://github.com/kedacore/keda/issues/5256))
- **GCP pubsub scaler**: Missing use of default value of `value` added ([#5093](https://github.com/kedacore/keda/issues/5093))
Expand Down
4 changes: 4 additions & 0 deletions pkg/scalers/azure/azure_eventhub_checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ type defaultCheckpointer struct {
containerName string
}

func NewCheckpoint(offset string, sequenceNumber int64) Checkpoint {
return Checkpoint{baseCheckpoint: baseCheckpoint{Offset: offset}, SequenceNumber: sequenceNumber}
}

// GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage
func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) {
checkpointer := newCheckpointer(info, partitionID)
Expand Down
87 changes: 57 additions & 30 deletions pkg/scalers/azure_eventhub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,14 @@ import (
)

const (
defaultEventHubMessageThreshold = 64
eventHubMetricType = "External"
thresholdMetricName = "unprocessedEventThreshold"
activationThresholdMetricName = "activationUnprocessedEventThreshold"
defaultEventHubConsumerGroup = "$Default"
defaultBlobContainer = ""
defaultCheckpointStrategy = ""
defaultEventHubMessageThreshold = 64
eventHubMetricType = "External"
thresholdMetricName = "unprocessedEventThreshold"
activationThresholdMetricName = "activationUnprocessedEventThreshold"
defaultEventHubConsumerGroup = "$Default"
defaultBlobContainer = ""
defaultCheckpointStrategy = ""
defaultStalePartitionInfoThreshold = 10000
)

type azureEventHubScaler struct {
Expand All @@ -56,10 +57,11 @@ type azureEventHubScaler struct {
}

type eventHubMetadata struct {
eventHubInfo azure.EventHubInfo
threshold int64
activationThreshold int64
scalerIndex int
eventHubInfo azure.EventHubInfo
threshold int64
activationThreshold int64
stalePartitionInfoThreshold int64
scalerIndex int
}

// NewAzureEventHubScaler creates a new scaler for eventHub
Expand Down Expand Up @@ -178,6 +180,15 @@ func parseCommonAzureEventHubMetadata(config *ScalerConfig, meta *eventHubMetada
}
meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint

meta.stalePartitionInfoThreshold = defaultStalePartitionInfoThreshold
if val, ok := config.TriggerMetadata["stalePartitionInfoThreshold"]; ok {
stalePartitionInfoThreshold, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return fmt.Errorf("error parsing azure eventhub metadata stalePartitionInfoThreshold: %w", err)
}
meta.stalePartitionInfoThreshold = stalePartitionInfoThreshold
}

meta.scalerIndex = config.ScalerIndex

return nil
Expand Down Expand Up @@ -286,35 +297,45 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co
return -1, azure.Checkpoint{}, fmt.Errorf("unable to get checkpoint from storage: %w", err)
}

unprocessedEventCountInPartition := int64(0)
unprocessedEventCountInPartition := calculateUnprocessedEvents(partitionInfo, checkpoint, s.metadata.stalePartitionInfoThreshold)

return unprocessedEventCountInPartition, checkpoint, nil
}

func calculateUnprocessedEvents(partitionInfo *eventhub.HubPartitionRuntimeInformation, checkpoint azure.Checkpoint, stalePartitionInfoThreshold int64) int64 {
unprocessedEventCount := int64(0)

// If checkpoint.Offset is empty that means no messages has been processed from an event hub partition
// And since partitionInfo.LastSequenceNumber = 0 for the very first message hence
// total unprocessed message will be partitionInfo.LastSequenceNumber + 1
if checkpoint.Offset == "" {
unprocessedEventCountInPartition = partitionInfo.LastSequenceNumber + 1
return unprocessedEventCountInPartition, checkpoint, nil
unprocessedEventCount = partitionInfo.LastSequenceNumber + 1
return unprocessedEventCount
}

if partitionInfo.LastSequenceNumber >= checkpoint.SequenceNumber {
unprocessedEventCountInPartition = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber
return unprocessedEventCountInPartition, checkpoint, nil
unprocessedEventCount = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber
} else {
// Partition is a circular buffer, so it is possible that
// partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber

// Checkpointing may or may not be always behind partition's LastSequenceNumber.
// The partition information read could be stale compared to checkpoint,
// especially when load is very small and checkpointing is happening often.
// This also results in partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber
// e.g., (9223372036854775807 - 15) + 10 = 9223372036854775802

// Calculate the unprocessed events
unprocessedEventCount = (math.MaxInt64 - checkpoint.SequenceNumber) + partitionInfo.LastSequenceNumber
}

// Partition is a circular buffer, so it is possible that
// partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber
unprocessedEventCountInPartition = (math.MaxInt64 - checkpoint.SequenceNumber) + partitionInfo.LastSequenceNumber

// Checkpointing may or may not be always behind partition's LastSequenceNumber.
// The partition information read could be stale compared to checkpoint,
// especially when load is very small and checkpointing is happening often.
// e.g., (9223372036854775807 - 10) + 11 = -9223372036854775808
// If unprocessedEventCountInPartition is negative that means there are 0 unprocessed messages in the partition
if unprocessedEventCountInPartition < 0 {
unprocessedEventCountInPartition = 0
// If the result is greater than the buffer size - stale partition threshold
// we assume the partition info is stale.
if unprocessedEventCount > (math.MaxInt64 - stalePartitionInfoThreshold) {
return 0
}

return unprocessedEventCountInPartition, checkpoint, nil
return unprocessedEventCount
}

// GetUnprocessedEventCountWithoutCheckpoint returns the number of messages on the without a checkoutpoint info
Expand Down Expand Up @@ -386,8 +407,14 @@ func (s *azureEventHubScaler) GetMetricsAndActivity(ctx context.Context, metricN

totalUnprocessedEventCount += unprocessedEventCount

s.logger.V(1).Info(fmt.Sprintf("Partition ID: %s, Last Enqueued Offset: %s, Checkpoint Offset: %s, Total new events in partition: %d",
partitionRuntimeInfo.PartitionID, partitionRuntimeInfo.LastEnqueuedOffset, checkpoint.Offset, unprocessedEventCount))
s.logger.V(1).Info(fmt.Sprintf("Partition ID: %s, Last SequenceNumber: %d, Checkpoint SequenceNumber: %d, Total new events in partition: %d",
partitionRuntimeInfo.PartitionID, partitionRuntimeInfo.LastSequenceNumber, checkpoint.SequenceNumber, unprocessedEventCount))
}

// set count to max if the sum is negative (Int64 overflow) to prevent negative metric values
// e.g., 9223372036854775797 (Partition 1) + 20 (Partition 2) = -9223372036854775799
if totalUnprocessedEventCount < 0 {
totalUnprocessedEventCount = math.MaxInt64
}

// don't scale out beyond the number of partitions
Expand Down
66 changes: 66 additions & 0 deletions pkg/scalers/azure_eventhub_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type eventHubMetricIdentifier struct {
name string
}

type calculateUnprocessedEventsTestData struct {
partitionInfo *eventhub.HubPartitionRuntimeInformation
checkpoint azure.Checkpoint
unprocessedEvents int64
}

var sampleEventHubResolvedEnv = map[string]string{eventHubConnectionSetting: eventHubsConnection, storageConnectionSetting: "none"}

var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{
Expand Down Expand Up @@ -202,6 +208,57 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat
},
}

var calculateUnprocessedEventsDataset = []calculateUnprocessedEventsTestData{
{
checkpoint: azure.NewCheckpoint("1", 5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("1002", 4611686018427387903),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"},
unprocessedEvents: 2,
},
{
checkpoint: azure.NewCheckpoint("900", 4611686018427387900),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"},
unprocessedEvents: 5,
},
{
checkpoint: azure.NewCheckpoint("800", 4000000000000200000),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000, LastEnqueuedOffset: "750"},
unprocessedEvents: 9223372036854575807,
},
// Empty checkpoint
{
checkpoint: azure.NewCheckpoint("", 0),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1, LastEnqueuedOffset: "1"},
unprocessedEvents: 2,
},
// Stale PartitionInfo
{
checkpoint: azure.NewCheckpoint("5", 15),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1000", 4611686018427387910),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "900"},
unprocessedEvents: 0,
},
{
checkpoint: azure.NewCheckpoint("1", 5),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797, LastEnqueuedOffset: "10000"},
unprocessedEvents: 0,
},
// Circular buffer reset
{
checkpoint: azure.NewCheckpoint("100000", 9223372036854775797),
partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5, LastEnqueuedOffset: "1"},
unprocessedEvents: 15,
},
}

var eventHubMetricIdentifiers = []eventHubMetricIdentifier{
{&parseEventHubMetadataDataset[1], 0, "s0-azure-eventhub-testEventHubConsumerGroup"},
{&parseEventHubMetadataDataset[1], 1, "s1-azure-eventhub-testEventHubConsumerGroup"},
Expand Down Expand Up @@ -606,3 +663,12 @@ func TestEventHubGetMetricSpecForScaling(t *testing.T) {
}
}
}

func TestCalculateUnprocessedEvents(t *testing.T) {
for _, testData := range calculateUnprocessedEventsDataset {
v := calculateUnprocessedEvents(testData.partitionInfo, testData.checkpoint, defaultStalePartitionInfoThreshold)
if v != testData.unprocessedEvents {
t.Errorf("Wrong calculation: expected %d, got %d", testData.unprocessedEvents, v)
}
}
}

0 comments on commit 9be8ee6

Please sign in to comment.