Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.AzureServiceBus] Create subscriptions in parallel #368

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 103 additions & 103 deletions src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
namespace SlimMessageBus.Host.AzureServiceBus;

using System;
using System.Data;

public class ServiceBusTopologyService
{
private readonly ILogger<ServiceBusTopologyService> _logger;
Expand Down Expand Up @@ -119,7 +116,7 @@ private Task<TopologyCreationStatus> TryCreateRule(string path, string subscript
return TopologyCreationStatus.NotExists;
}

_logger.LogInformation("Creating options: {RuleName} on subscription {SubscriptionName} on topic: {Path} ...", options.Name, subscriptionName, path);
_logger.LogInformation("Creating rule: {RuleName} on subscription {SubscriptionName} on topic: {Path} ...", options.Name, subscriptionName, path);
await _adminClient.CreateRuleAsync(path, subscriptionName, options);

return TopologyCreationStatus.Exists | TopologyCreationStatus.Created;
Expand All @@ -133,7 +130,7 @@ private Task<TopologyCreationStatus> TryDeleteRule(string path, string subscript
return TopologyCreationStatus.Exists;
}

_logger.LogInformation("Replacing options: removing {RuleName} on subscription {SubscriptionName} on topic: {Path} ...", name, subscriptionName, path);
_logger.LogInformation("Replacing rule: removing {RuleName} on subscription {SubscriptionName} on topic: {Path} ...", name, subscriptionName, path);
await _adminClient.DeleteRuleAsync(path, subscriptionName, name);

return TopologyCreationStatus.Exists | TopologyCreationStatus.Created;
Expand All @@ -147,12 +144,105 @@ private Task<TopologyCreationStatus> TryUpdateRule(string path, string subscript
return TopologyCreationStatus.NotExists;
}

_logger.LogInformation("Updating options: {RuleName} on subscription {SubscriptionName} on topic: {Path} ...", options.Name, subscriptionName, path);
_logger.LogInformation("Updating rule: {RuleName} on subscription {SubscriptionName} on topic: {Path} ...", options.Name, subscriptionName, path);
await _adminClient.UpdateRuleAsync(path, subscriptionName, options);

return TopologyCreationStatus.Exists | TopologyCreationStatus.Updated;
});

private async Task PrepareSubscription(ServiceBusTopologySettings topologyProvisioning, string path, string subscriptionName, IList<AbstractConsumerSettings> consumerSettingsGroup)
{
var subscriptionStatus = await TryCreateSubscription(path, subscriptionName, () =>
{
void ThrowOnFalse(bool value, string settingName)
{
if (value)
{
return;
}

var topicSubscription = new TopicSubscriptionParams(path, subscriptionName);
throw new ConfigurationMessageBusException($"All {nameof(CreateSubscriptionOptions)} instances across the same path/subscription {topicSubscription} must have the same {settingName} settings.");
}

var options = consumerSettingsGroup.Aggregate((CreateSubscriptionOptions)null, (acc, consumerSettings) =>
{
var options = new CreateSubscriptionOptions(path, subscriptionName);
_providerSettings.TopologyProvisioning?.CreateSubscriptionOptions?.Invoke(options);
options.RequiresSession = consumerSettings.GetEnableSession();

consumerSettings.GetSubscriptionOptions()?.Invoke(options);
if (acc != null && !ReferenceEquals(acc, options))
{
ThrowOnFalse(acc.AutoDeleteOnIdle.Equals(options.AutoDeleteOnIdle), nameof(options.AutoDeleteOnIdle));
ThrowOnFalse(acc.DefaultMessageTimeToLive.Equals(options.DefaultMessageTimeToLive), nameof(options.DefaultMessageTimeToLive));
ThrowOnFalse(acc.EnableBatchedOperations == options.EnableBatchedOperations, nameof(options.EnableBatchedOperations));
ThrowOnFalse(acc.DeadLetteringOnMessageExpiration == options.DeadLetteringOnMessageExpiration, nameof(options.DeadLetteringOnMessageExpiration));
ThrowOnFalse(acc.EnableDeadLetteringOnFilterEvaluationExceptions == options.EnableDeadLetteringOnFilterEvaluationExceptions, nameof(options.EnableDeadLetteringOnFilterEvaluationExceptions));
ThrowOnFalse(string.Equals(acc.ForwardDeadLetteredMessagesTo, options.ForwardDeadLetteredMessagesTo, StringComparison.OrdinalIgnoreCase), nameof(options.ForwardDeadLetteredMessagesTo));
ThrowOnFalse(string.Equals(acc.ForwardTo, options.ForwardTo, StringComparison.OrdinalIgnoreCase), nameof(options.ForwardTo));
ThrowOnFalse(acc.LockDuration.Equals(options.LockDuration), nameof(options.LockDuration));
ThrowOnFalse(acc.MaxDeliveryCount == options.MaxDeliveryCount, nameof(options.MaxDeliveryCount));
ThrowOnFalse(acc.RequiresSession.Equals(options.RequiresSession), nameof(options.RequiresSession));
ThrowOnFalse(acc.Status.Equals(options.Status), nameof(options.Status));
ThrowOnFalse(string.Equals(acc.UserMetadata, options.UserMetadata, StringComparison.OrdinalIgnoreCase), nameof(options.UserMetadata));
}

return options;
});

return options;
}).ConfigureAwait(false);

if ((subscriptionStatus & TopologyCreationStatus.Exists) != 0 && (topologyProvisioning.CanConsumerValidateSubscriptionFilters || topologyProvisioning.CanConsumerCreateSubscriptionFilter || topologyProvisioning.CanConsumerReplaceSubscriptionFilters))
{
var tasks = new List<Task>();
var rules = MergeFilters(path, subscriptionName, consumerSettingsGroup).ToDictionary(x => x.Name, x => x);

await foreach (var page in _adminClient.GetRulesAsync(path, subscriptionName).AsPages())
{
foreach (var serviceRule in page.Values)
{
if (!rules.TryGetValue(serviceRule.Name, out var rule))
{
// server rule was not defined in SMB
if ((rules.Count > 0 || serviceRule.Name != RuleProperties.DefaultRuleName) && ((subscriptionStatus & TopologyCreationStatus.Created) != 0 || topologyProvisioning.CanConsumerReplaceSubscriptionFilters))
{
// Note: for a newly created subscription, ASB creates a $Default filter automatically
// We need to remove the filter if its not matching what is declared in SMB and let the user defined rules take over
// On the other hand if there are no user defined rules, we need to preserve the $Default filter
tasks.Add(TryDeleteRule(path, subscriptionName, serviceRule.Name));
}
continue;
}

if (rule.Filter.Equals(serviceRule.Filter) && ((rule.Action == null && serviceRule.Action == null) || (rule.Action.Equals(serviceRule.Action))))
{
// server rule matched what is defined in SMB
rules.Remove(serviceRule.Name);
continue;
}

if (topologyProvisioning.CanConsumerReplaceSubscriptionFilters)
{
// update existing rule
serviceRule.Filter = rule.Filter;
serviceRule.Action = rule.Action;
tasks.Add(TryUpdateRule(path, subscriptionName, serviceRule));
rules.Remove(serviceRule.Name);
}
}

if (topologyProvisioning.CanConsumerCreateSubscriptionFilter)
{
tasks.AddRange(rules.Values.Select(options => TryCreateRule(path, subscriptionName, options)));
}
}

await Task.WhenAll(tasks).ConfigureAwait(false);
}
}

public Task ProvisionTopology() => _providerSettings.TopologyProvisioning.OnProvisionTopology(_adminClient, DoProvisionTopology);

protected async Task DoProvisionTopology()
Expand Down Expand Up @@ -182,7 +272,7 @@ await TryCreateQueue(path, topologyProvisioning.CanConsumerCreateQueue, options

consumerSettings.GetQueueOptions()?.Invoke(options);
}
});
}).ConfigureAwait(false);
}
if (pathKind == PathKind.Topic)
{
Expand All @@ -192,107 +282,17 @@ await TryCreateQueue(path, topologyProvisioning.CanConsumerCreateQueue, options
{
consumerSettings.GetTopicOptions()?.Invoke(options);
}
});
}).ConfigureAwait(false);

if ((topicStatus & TopologyCreationStatus.Exists) != 0)
{
var consumerSettingsBySubscription = consumerSettingsList
var subscriptionPreperationTasks = consumerSettingsList
.Select(x => (ConsumerSettings: x, SubscriptionName: x.GetSubscriptionName(_providerSettings)))
.Where(x => x.SubscriptionName != null)
.GroupBy(x => x.SubscriptionName)
.ToDictionary(x => x.Key, x => x.Select(z => z.ConsumerSettings).ToList());
.Select(x => PrepareSubscription(topologyProvisioning, path, x.Key, [.. x.Select(z => z.ConsumerSettings)]));

foreach (var (subscriptionName, consumerSettingsGroup) in consumerSettingsBySubscription)
{
var subscriptionStatus = await TryCreateSubscription(path, subscriptionName, () =>
{
void ThrowOnFalse(bool value, string settingName)
{
if (value)
{
return;
}

var topicSubscription = new TopicSubscriptionParams(path, subscriptionName);
throw new ConfigurationMessageBusException($"All {nameof(CreateSubscriptionOptions)} instances across the same path/subscription {topicSubscription} must have the same {settingName} settings.");
}

var options = consumerSettingsGroup.Aggregate((CreateSubscriptionOptions)null, (acc, consumerSettings) =>
{
var options = new CreateSubscriptionOptions(path, subscriptionName);
_providerSettings.TopologyProvisioning?.CreateSubscriptionOptions?.Invoke(options);
options.RequiresSession = consumerSettings.GetEnableSession();

consumerSettings.GetSubscriptionOptions()?.Invoke(options);
if (acc != null && !ReferenceEquals(acc, options))
{
ThrowOnFalse(acc.AutoDeleteOnIdle.Equals(options.AutoDeleteOnIdle), nameof(options.AutoDeleteOnIdle));
ThrowOnFalse(acc.DefaultMessageTimeToLive.Equals(options.DefaultMessageTimeToLive), nameof(options.DefaultMessageTimeToLive));
ThrowOnFalse(acc.EnableBatchedOperations == options.EnableBatchedOperations, nameof(options.EnableBatchedOperations));
ThrowOnFalse(acc.DeadLetteringOnMessageExpiration == options.DeadLetteringOnMessageExpiration, nameof(options.DeadLetteringOnMessageExpiration));
ThrowOnFalse(acc.EnableDeadLetteringOnFilterEvaluationExceptions == options.EnableDeadLetteringOnFilterEvaluationExceptions, nameof(options.EnableDeadLetteringOnFilterEvaluationExceptions));
ThrowOnFalse(string.Equals(acc.ForwardDeadLetteredMessagesTo, options.ForwardDeadLetteredMessagesTo, StringComparison.OrdinalIgnoreCase), nameof(options.ForwardDeadLetteredMessagesTo));
ThrowOnFalse(string.Equals(acc.ForwardTo, options.ForwardTo, StringComparison.OrdinalIgnoreCase), nameof(options.ForwardTo));
ThrowOnFalse(acc.LockDuration.Equals(options.LockDuration), nameof(options.LockDuration));
ThrowOnFalse(acc.MaxDeliveryCount == options.MaxDeliveryCount, nameof(options.MaxDeliveryCount));
ThrowOnFalse(acc.RequiresSession.Equals(options.RequiresSession), nameof(options.RequiresSession));
ThrowOnFalse(acc.Status.Equals(options.Status), nameof(options.Status));
ThrowOnFalse(string.Equals(acc.UserMetadata, options.UserMetadata, StringComparison.OrdinalIgnoreCase), nameof(options.UserMetadata));
}

return options;
});

return options;
});

if ((subscriptionStatus & TopologyCreationStatus.Exists) != 0 && (topologyProvisioning.CanConsumerValidateSubscriptionFilters || topologyProvisioning.CanConsumerCreateSubscriptionFilter || topologyProvisioning.CanConsumerReplaceSubscriptionFilters))
{
var tasks = new List<Task>();
var rules = MergeFilters(path, subscriptionName, consumerSettingsGroup).ToDictionary(x => x.Name, x => x);

await foreach (var page in _adminClient.GetRulesAsync(path, subscriptionName).AsPages())
{
foreach (var serviceRule in page.Values)
{
if (!rules.TryGetValue(serviceRule.Name, out var rule))
{
// server rule was not defined in SMB
if ((rules.Count > 0 || serviceRule.Name != RuleProperties.DefaultRuleName) && ((subscriptionStatus & TopologyCreationStatus.Created) != 0 || topologyProvisioning.CanConsumerReplaceSubscriptionFilters))
{
// Note: for a newly created subscription, ASB creates a $Default filter automatically
// We need to remove the filter if its not matching what is declared in SMB and let the user defined rules take over
// On the other hand if there are no user defined rules, we need to preserve the $Default filter
tasks.Add(TryDeleteRule(path, subscriptionName, serviceRule.Name));
}
continue;
}

if (rule.Filter.Equals(serviceRule.Filter) && ((rule.Action == null && serviceRule.Action == null) || (rule.Action.Equals(serviceRule.Action))))
{
// server rule matched what is defined in SMB
rules.Remove(serviceRule.Name);
continue;
}

if (topologyProvisioning.CanConsumerReplaceSubscriptionFilters)
{
// update existing rule
serviceRule.Filter = rule.Filter;
serviceRule.Action = rule.Action;
tasks.Add(TryUpdateRule(path, subscriptionName, serviceRule));
rules.Remove(serviceRule.Name);
}
}

if (topologyProvisioning.CanConsumerCreateSubscriptionFilter)
{
tasks.AddRange(rules.Values.Select(options => TryCreateRule(path, subscriptionName, options)));
}
}
await Task.WhenAll(tasks);
}
}
await Task.WhenAll(subscriptionPreperationTasks).ConfigureAwait(false);
}
}
}
Expand All @@ -301,11 +301,11 @@ void ThrowOnFalse(bool value, string settingName)
{
if (producerSettings.PathKind == PathKind.Queue)
{
await TryCreateQueue(producerSettings.DefaultPath, topologyProvisioning.CanProducerCreateQueue, options => producerSettings.GetQueueOptions()?.Invoke(options));
await TryCreateQueue(producerSettings.DefaultPath, topologyProvisioning.CanProducerCreateQueue, options => producerSettings.GetQueueOptions()?.Invoke(options)).ConfigureAwait(false);
}
if (producerSettings.PathKind == PathKind.Topic)
{
await TryCreateTopic(producerSettings.DefaultPath, topologyProvisioning.CanProducerCreateTopic, options => producerSettings.GetTopicOptions()?.Invoke(options));
await TryCreateTopic(producerSettings.DefaultPath, topologyProvisioning.CanProducerCreateTopic, options => producerSettings.GetTopicOptions()?.Invoke(options)).ConfigureAwait(false);
}
}
}
Expand Down
Loading