From dac69cf95f3543897bb497a256ad5c76e56a297c Mon Sep 17 00:00:00 2001 From: Dmytro Rakhmanov Date: Wed, 6 Mar 2024 15:44:04 +0200 Subject: [PATCH] Adding default correlation headers to configure additional message correlation for Azure Service Bus. (#1497) * Adding option to disable automation message correlation * Added correlation properties collection --- .../en/transport/azure-service-bus.md | 23 ++++++++++--------- .../AzureServiceBusConsumerClient.cs | 11 +++++++-- .../CAP.AzureServiceBusOptions.cs | 6 +++++ 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/docs/content/user-guide/en/transport/azure-service-bus.md b/docs/content/user-guide/en/transport/azure-service-bus.md index 5ff71baa7..4d1f16e0c 100644 --- a/docs/content/user-guide/en/transport/azure-service-bus.md +++ b/docs/content/user-guide/en/transport/azure-service-bus.md @@ -39,17 +39,18 @@ public void ConfigureServices(IServiceCollection services) The AzureServiceBus configuration options provided directly by the CAP: -| NAME | DESCRIPTION | TYPE | DEFAULT | -| :---------------------- | :------------------------------------------------------------------------------------------------------------------------------------------ | ---------------------------------------------------- | :---------------- | -| ConnectionString | Endpoint address | string | | -| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false | -| TopicPath | Topic entity path | string | cap | -| SubscriptionAutoDeleteOnIdle | Automatically delete subscription after a certain idle interval. | TimeSpan | TimeSpan.MaxValue | -| ManagementTokenProvider | Token provider | ITokenProvider | null | -| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | false | -| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func>>?` | null | -| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null | -| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List> | null | +| NAME | DESCRIPTION | TYPE | DEFAULT | +|:------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|:----------------------------------| +| ConnectionString | Endpoint address | string | | +| EnableSessions | Enable [Service bus sessions](https://docs.microsoft.com/en-us/azure/service-bus-messaging/message-sessions) | bool | false | +| TopicPath | Topic entity path | string | cap | +| SubscriptionAutoDeleteOnIdle | Automatically delete subscription after a certain idle interval. | TimeSpan | TimeSpan.MaxValue | +| ManagementTokenProvider | Token provider | ITokenProvider | null | +| AutoCompleteMessages | Gets a value that indicates whether the processor should automatically complete messages after the message handler has completed processing | bool | false | +| CustomHeaders | Adds custom and/or mandatory Headers for incoming messages from heterogeneous systems. | `Func>>?` | null | +| Namespace | Namespace of Servicebus , Needs to be set when using with TokenCredential Property | string | null | +| DefaultCorrelationHeaders | Adds additional correlation properties to all [correlation filters](https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters). | IDictionary | Dictionary.Empty | +| SQLFilters | Custom SQL Filters by name and expression on Topic Subscribtion | List> | null | #### Sessions diff --git a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs index 4875a5610..71b2cd4ba 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs @@ -54,10 +54,10 @@ public void Subscribe(IEnumerable topics) ConnectAsync().GetAwaiter().GetResult(); topics = topics.Concat(_asbOptions!.SQLFilters?.Select(o => o.Key) ?? Enumerable.Empty()); + var allRules = _administrationClient!.GetRulesAsync(_asbOptions!.TopicPath, _subscriptionName).ToEnumerable(); var allRuleNames = allRules.Select(o => o.Name); - foreach (var newRule in topics.Except(allRuleNames)) { var isSqlRule = _asbOptions.SQLFilters?.FirstOrDefault(o => o.Key == newRule).Value is not null; @@ -71,10 +71,17 @@ public void Subscribe(IEnumerable topics) } else { - currentRuleToAdd = new CorrelationRuleFilter + var correlationRule = new CorrelationRuleFilter { Subject = newRule }; + + foreach (var correlationHeader in _asbOptions.DefaultCorrelationHeaders) + { + correlationRule.ApplicationProperties.Add(correlationHeader.Key, correlationHeader.Value); + } + + currentRuleToAdd = correlationRule; } _administrationClient.CreateRuleAsync(_asbOptions.TopicPath, _subscriptionName, diff --git a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs index 38aeabbd7..0e7eedd9d 100644 --- a/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs +++ b/src/DotNetCore.CAP.AzureServiceBus/CAP.AzureServiceBusOptions.cs @@ -55,6 +55,12 @@ public class AzureServiceBusOptions /// public bool AutoCompleteMessages { get; set; } + /// + /// Adds additional correlation properties to all correlation filters. + /// https://learn.microsoft.com/en-us/azure/service-bus-messaging/topic-filters#correlation-filters + /// + public IDictionary DefaultCorrelationHeaders { get; } = new Dictionary(); + /// /// Gets the maximum number of concurrent calls to the ProcessMessageAsync message handler the processor should initiate. ///