[KEDA][AzureEventHub] App not scaling down #972
Comments
Can you share testing code?
using Azure.Identity;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

var ehNamespace = "<EventHub NamespaceName>";
var ehSharedKeyName = "<KeyName>";
var ehSharedKey = "<KeyValue>";
var ehName = "<EventHubName>";
var consumerGroup = "<ConsumerGroupName>";
var storageAccountUrl = "<StorageAccountUrl>";
var storageContainerName = "<ContainerName>";

// EH client
var ehConsumerClient = new EventHubConsumerClient(
    consumerGroup,
    $"Endpoint=sb://{ehNamespace}.servicebus.windows.net/;SharedAccessKeyName={ehSharedKeyName};SharedAccessKey={ehSharedKey};EntityPath={ehName}");
var partitionIds = await ehConsumerClient.GetPartitionIdsAsync();

// Checkpoint blobs from the storage account
var checkpointBlobs = await GetCheckpointBlobClients(new Uri(storageAccountUrl), storageContainerName, ehNamespace, ehName, consumerGroup);

// Get the unprocessed event count every 30 seconds
while (true)
{
    var checkpoints = await GetCheckpoints(checkpointBlobs);
    long unprocessed = 0;
    foreach (var partitionId in partitionIds)
    {
        var props = await ehConsumerClient.GetPartitionPropertiesAsync(partitionId);
        // Lag per partition = last enqueued sequence number - checkpointed sequence number
        unprocessed += props.LastEnqueuedSequenceNumber - checkpoints[partitionId].sequencenumber;
    }
    Console.WriteLine($"{DateTime.UtcNow} - Unprocessed: {unprocessed}");
    await Task.Delay(TimeSpan.FromSeconds(30));
}

static async Task<Dictionary<string, (long offset, long sequencenumber)>> GetCheckpoints(IList<(string partitionId, BlobClient blobClient)> checkpointBlobClients)
{
    var checkpoints = new Dictionary<string, (long offset, long sequencenumber)>();
    foreach (var checkpoint in checkpointBlobClients)
    {
        // The Azure SDK checkpoint store keeps offset/sequence number in blob metadata
        var props = await checkpoint.blobClient.GetPropertiesAsync();
        var offset = long.Parse(props.Value.Metadata["offset"]);
        var sequenceNumber = long.Parse(props.Value.Metadata["sequencenumber"]);
        checkpoints[checkpoint.partitionId] = (offset, sequenceNumber);
    }
    return checkpoints;
}

static async Task<IList<(string partitionId, BlobClient blobClient)>> GetCheckpointBlobClients(Uri storageAccountUrl, string containerName, string ehNamespace, string ehName, string consumerGroup)
{
    var blobServiceClient = new BlobServiceClient(storageAccountUrl, new DefaultAzureCredential());
    var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
    var checkpointBlobs = new List<(string partitionId, BlobClient blobClient)>();
    await foreach (BlobItem blobItem in containerClient.GetBlobsAsync(prefix: $"{ehNamespace}.servicebus.windows.net/{ehName}/{consumerGroup}/checkpoint"))
    {
        // Checkpoint blob names end with the partition id
        var partitionId = blobItem.Name.Substring(blobItem.Name.LastIndexOf('/') + 1);
        var blobClient = containerClient.GetBlobClient(blobItem.Name);
        checkpointBlobs.Add((partitionId, blobClient));
    }
    return checkpointBlobs;
}
We are seeing the same issue with Redis Streams. The app successfully scaled to 10 replicas but didn't scale down after all messages had been acked.
In our case we had screwed up one of the secrets, but without verbose logging we had no idea KEDA was rejecting its inputs and scaling out because of that. So I suspect that will be your issue too; it's hard to tell what or why without visibility unless you carefully inspect every value and secret given to KEDA via ACA.
I am encountering the same issue. I created the same
Any updates on this? I am also facing the same issue.
I'm facing a similar issue... running 4 container apps with the KEDA scaler. I've reviewed the configuration and found that the two apps that appeared fine were in reality not properly configured; once the configuration was adjusted, they started suffering from the same issue.
Are there any updates on this?
Alright... sorted out my own issue. Looking at the latest version of the scaler (2.14), I found this new parameter (or at least, I don't remember seeing it before).
And a bit further it says:
It came as a surprise that the default would be legacy checkpointing; to be honest, I expected it the other way around. Nonetheless, after setting it, the app scales down as expected. Hopefully this will help someone in a similar situation.
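To illustrate, here is a minimal sketch of what an ACA scale rule with that parameter might look like. This is an assumption, not the poster's actual configuration: the unnamed parameter above is presumably checkpointStrategy, which KEDA documents for the azure-eventhub scaler, and blobMetadata is the strategy that reads checkpoints stored as blob metadata (the same metadata the testing code earlier in the thread reads). Rule name, secret names, and the threshold below are placeholders.

```yaml
# Hypothetical scale section of a Container App template; all values are placeholders.
scale:
  minReplicas: 1
  maxReplicas: 10
  rules:
    - name: eventhub-scale-rule
      custom:
        type: azure-eventhub
        metadata:
          consumerGroup: <ConsumerGroupName>
          blobContainer: <ContainerName>
          unprocessedEventThreshold: "64"
          # Assumed fix: read the Azure SDK's blob-metadata checkpoints instead of the legacy format
          checkpointStrategy: blobMetadata
        auth:
          - secretRef: eventhub-connection-string
            triggerParameter: connection
          - secretRef: storage-connection-string
            triggerParameter: storageConnection
```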
This is related to KEDA issue kedacore/keda#6084. The KEDA team fixed it and released the fix in v2.16 (kedacore/keda#6260). Can we upgrade ACA to use this version of KEDA? @tomkerkhove, could you please take a look? Thanks.
I don't work on Azure Container Apps so I can't help, sorry. This usually takes some time though; KEDA releases need to mature before an SLA-based service is built on top of them.
Thanks @tomkerkhove for the information.
This issue is a: (mark with an x)
Issue description
The Azure Container App is processing data from an Event Hub and is configured as follows:
It is using a KEDA scale rule of type azure-eventhub with the following settings:
When there was a high count of unprocessed messages, the app scaled to its defined maximum of 10 replicas. However, even though the unprocessed count has been low for a long time now, the app is not scaling down and stays at 10 replicas.
I created testing code to detect the unprocessed count in the Event Hub and ran it approximately every 30 seconds (a similar interval to the scale rule evaluation), with the following results:
In the KEDA source code I found that the actual metrics are being logged (keda/pkg/scalers/azure_eventhub_scaler.go, lines 389 and 396), but these logs are not available in Azure Log Analytics. How can verbose logging be enabled for KEDA?
Steps to reproduce
Configure an azure-eventhub scale rule as described above.
Expected behavior
When the unprocessed count is lower than the defined threshold, the app should scale back down (gradually, to the minimum replica count).
Actual behavior
The replica count stays at the maximum.