diff --git a/CHANGELOG.md b/CHANGELOG.md index 631d4379059..7c8b72c9492 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -88,6 +88,7 @@ Here is an overview of all new **experimental** features: - **General**: Prevented stuck status due to timeouts during scalers generation ([#5083](https://github.com/kedacore/keda/issues/5083)) - **General**: ScaledObject Validating Webhook should support dry-run=server requests ([#5306](https://github.com/kedacore/keda/issues/5306)) - **AWS Scalers**: Ensure session tokens are included when instantiating AWS credentials ([#5156](https://github.com/kedacore/keda/issues/5156)) +- **Azure Event Hub Scaler**: Improve unprocessedEventThreshold calculation ([#4250](https://github.com/kedacore/keda/issues/4250)) - **Azure Pipelines**: No more HTTP 400 errors produced by poolName with spaces ([#5107](https://github.com/kedacore/keda/issues/5107)) - **GCP pubsub scaler**: Added `project_id` to filter for metrics queries ([#5256](https://github.com/kedacore/keda/issues/5256)) - **GCP pubsub scaler**: Missing use of default value of `value` added ([#5093](https://github.com/kedacore/keda/issues/5093)) diff --git a/pkg/scalers/azure/azure_eventhub_checkpoint.go b/pkg/scalers/azure/azure_eventhub_checkpoint.go index ad2ba3d0a93..28f3048aa10 100644 --- a/pkg/scalers/azure/azure_eventhub_checkpoint.go +++ b/pkg/scalers/azure/azure_eventhub_checkpoint.go @@ -92,6 +92,10 @@ type defaultCheckpointer struct { containerName string } +func NewCheckpoint(offset string, sequenceNumber int64) Checkpoint { + return Checkpoint{baseCheckpoint: baseCheckpoint{Offset: offset}, SequenceNumber: sequenceNumber} +} + // GetCheckpointFromBlobStorage reads depending of the CheckpointStrategy the checkpoint from a azure storage func GetCheckpointFromBlobStorage(ctx context.Context, httpClient util.HTTPDoer, info EventHubInfo, partitionID string) (Checkpoint, error) { checkpointer := newCheckpointer(info, partitionID) diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 41503f7ba17..25d2ff29e7b 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -38,13 +38,14 @@ import ( ) const ( - defaultEventHubMessageThreshold = 64 - eventHubMetricType = "External" - thresholdMetricName = "unprocessedEventThreshold" - activationThresholdMetricName = "activationUnprocessedEventThreshold" - defaultEventHubConsumerGroup = "$Default" - defaultBlobContainer = "" - defaultCheckpointStrategy = "" + defaultEventHubMessageThreshold = 64 + eventHubMetricType = "External" + thresholdMetricName = "unprocessedEventThreshold" + activationThresholdMetricName = "activationUnprocessedEventThreshold" + defaultEventHubConsumerGroup = "$Default" + defaultBlobContainer = "" + defaultCheckpointStrategy = "" + defaultStalePartitionInfoThreshold = 10000 ) type azureEventHubScaler struct { @@ -56,10 +57,11 @@ type azureEventHubScaler struct { } type eventHubMetadata struct { - eventHubInfo azure.EventHubInfo - threshold int64 - activationThreshold int64 - scalerIndex int + eventHubInfo azure.EventHubInfo + threshold int64 + activationThreshold int64 + stalePartitionInfoThreshold int64 + scalerIndex int } // NewAzureEventHubScaler creates a new scaler for eventHub @@ -178,6 +180,15 @@ func parseCommonAzureEventHubMetadata(config *ScalerConfig, meta *eventHubMetada } meta.eventHubInfo.ActiveDirectoryEndpoint = activeDirectoryEndpoint + meta.stalePartitionInfoThreshold = defaultStalePartitionInfoThreshold + if val, ok := config.TriggerMetadata["stalePartitionInfoThreshold"]; ok { + stalePartitionInfoThreshold, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return fmt.Errorf("error parsing azure eventhub metadata stalePartitionInfoThreshold: %w", err) + } + meta.stalePartitionInfoThreshold = stalePartitionInfoThreshold + } + meta.scalerIndex = config.ScalerIndex return nil @@ -286,35 +297,45 @@ func (s *azureEventHubScaler) GetUnprocessedEventCountInPartition(ctx context.Co return -1, azure.Checkpoint{}, fmt.Errorf("unable to get checkpoint from storage: %w", err) } - unprocessedEventCountInPartition := int64(0) + unprocessedEventCountInPartition := calculateUnprocessedEvents(partitionInfo, checkpoint, s.metadata.stalePartitionInfoThreshold) + + return unprocessedEventCountInPartition, checkpoint, nil +} + +func calculateUnprocessedEvents(partitionInfo *eventhub.HubPartitionRuntimeInformation, checkpoint azure.Checkpoint, stalePartitionInfoThreshold int64) int64 { + unprocessedEventCount := int64(0) // If checkpoint.Offset is empty that means no messages has been processed from an event hub partition // And since partitionInfo.LastSequenceNumber = 0 for the very first message hence // total unprocessed message will be partitionInfo.LastSequenceNumber + 1 if checkpoint.Offset == "" { - unprocessedEventCountInPartition = partitionInfo.LastSequenceNumber + 1 - return unprocessedEventCountInPartition, checkpoint, nil + unprocessedEventCount = partitionInfo.LastSequenceNumber + 1 + return unprocessedEventCount } if partitionInfo.LastSequenceNumber >= checkpoint.SequenceNumber { - unprocessedEventCountInPartition = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber - return unprocessedEventCountInPartition, checkpoint, nil + unprocessedEventCount = partitionInfo.LastSequenceNumber - checkpoint.SequenceNumber + } else { + // Partition is a circular buffer, so it is possible that + // partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber + + // Checkpointing may or may not be always behind partition's LastSequenceNumber. + // The partition information read could be stale compared to checkpoint, + // especially when load is very small and checkpointing is happening often. + // This also results in partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber + // e.g., (9223372036854775807 - 15) + 10 = 9223372036854775802 + + // Calculate the unprocessed events + unprocessedEventCount = (math.MaxInt64 - checkpoint.SequenceNumber) + partitionInfo.LastSequenceNumber } - // Partition is a circular buffer, so it is possible that - // partitionInfo.LastSequenceNumber < blob checkpoint's SequenceNumber - unprocessedEventCountInPartition = (math.MaxInt64 - checkpoint.SequenceNumber) + partitionInfo.LastSequenceNumber - - // Checkpointing may or may not be always behind partition's LastSequenceNumber. - // The partition information read could be stale compared to checkpoint, - // especially when load is very small and checkpointing is happening often. - // e.g., (9223372036854775807 - 10) + 11 = -9223372036854775808 - // If unprocessedEventCountInPartition is negative that means there are 0 unprocessed messages in the partition - if unprocessedEventCountInPartition < 0 { - unprocessedEventCountInPartition = 0 + // If the result is greater than the buffer size - stale partition threshold + // we assume the partition info is stale. + if unprocessedEventCount > (math.MaxInt64 - stalePartitionInfoThreshold) { + return 0 } - return unprocessedEventCountInPartition, checkpoint, nil + return unprocessedEventCount } // GetUnprocessedEventCountWithoutCheckpoint returns the number of messages on the without a checkoutpoint info @@ -386,8 +407,14 @@ func (s *azureEventHubScaler) GetMetricsAndActivity(ctx context.Context, metricN totalUnprocessedEventCount += unprocessedEventCount - s.logger.V(1).Info(fmt.Sprintf("Partition ID: %s, Last Enqueued Offset: %s, Checkpoint Offset: %s, Total new events in partition: %d", - partitionRuntimeInfo.PartitionID, partitionRuntimeInfo.LastEnqueuedOffset, checkpoint.Offset, unprocessedEventCount)) + s.logger.V(1).Info(fmt.Sprintf("Partition ID: %s, Last SequenceNumber: %d, Checkpoint SequenceNumber: %d, Total new events in partition: %d", + partitionRuntimeInfo.PartitionID, partitionRuntimeInfo.LastSequenceNumber, checkpoint.SequenceNumber, unprocessedEventCount)) + } + + // set count to max if the sum is negative (Int64 overflow) to prevent negative metric values + // e.g., 9223372036854775797 (Partition 1) + 20 (Partition 2) = -9223372036854775799 + if totalUnprocessedEventCount < 0 { + totalUnprocessedEventCount = math.MaxInt64 } // don't scale out beyond the number of partitions diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 1d044e44083..b1e5d678deb 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -43,6 +43,12 @@ type eventHubMetricIdentifier struct { name string } +type calculateUnprocessedEventsTestData struct { + partitionInfo *eventhub.HubPartitionRuntimeInformation + checkpoint azure.Checkpoint + unprocessedEvents int64 +} + var sampleEventHubResolvedEnv = map[string]string{eventHubConnectionSetting: eventHubsConnection, storageConnectionSetting: "none"} var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ @@ -202,6 +208,57 @@ var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestDat }, } +var calculateUnprocessedEventsDataset = []calculateUnprocessedEventsTestData{ + { + checkpoint: azure.NewCheckpoint("1", 5), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"}, + unprocessedEvents: 5, + }, + { + checkpoint: azure.NewCheckpoint("1002", 4611686018427387903), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"}, + unprocessedEvents: 2, + }, + { + checkpoint: azure.NewCheckpoint("900", 4611686018427387900), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "1000"}, + unprocessedEvents: 5, + }, + { + checkpoint: azure.NewCheckpoint("800", 4000000000000200000), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4000000000000000000, LastEnqueuedOffset: "750"}, + unprocessedEvents: 9223372036854575807, + }, + // Empty checkpoint + { + checkpoint: azure.NewCheckpoint("", 0), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 1, LastEnqueuedOffset: "1"}, + unprocessedEvents: 2, + }, + // Stale PartitionInfo + { + checkpoint: azure.NewCheckpoint("5", 15), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 10, LastEnqueuedOffset: "2"}, + unprocessedEvents: 0, + }, + { + checkpoint: azure.NewCheckpoint("1000", 4611686018427387910), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 4611686018427387905, LastEnqueuedOffset: "900"}, + unprocessedEvents: 0, + }, + { + checkpoint: azure.NewCheckpoint("1", 5), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 9223372036854775797, LastEnqueuedOffset: "10000"}, + unprocessedEvents: 0, + }, + // Circular buffer reset + { + checkpoint: azure.NewCheckpoint("100000", 9223372036854775797), + partitionInfo: &eventhub.HubPartitionRuntimeInformation{LastSequenceNumber: 5, LastEnqueuedOffset: "1"}, + unprocessedEvents: 15, + }, +} + var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ {&parseEventHubMetadataDataset[1], 0, "s0-azure-eventhub-testEventHubConsumerGroup"}, {&parseEventHubMetadataDataset[1], 1, "s1-azure-eventhub-testEventHubConsumerGroup"}, @@ -606,3 +663,12 @@ func TestEventHubGetMetricSpecForScaling(t *testing.T) { } } } + +func TestCalculateUnprocessedEvents(t *testing.T) { + for _, testData := range calculateUnprocessedEventsDataset { + v := calculateUnprocessedEvents(testData.partitionInfo, testData.checkpoint, defaultStalePartitionInfoThreshold) + if v != testData.unprocessedEvents { + t.Errorf("Wrong calculation: expected %d, got %d", testData.unprocessedEvents, v) + } + } +}