Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump RabbitMQ.Client from 6.8.1 to 7.0.0 #1222

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public record QueueState(string QueueName, long ReadyMessages, long ConsumerCoun

public interface IQueueMonitor
{
Task<QueueState> GetCurrentState();
Task<QueueState> GetCurrentStateAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public QueueMonitorMetricsCollector(IEnumerable<IQueueMonitor> queueMonitors)
public void Collect(IMetricsWriter writer)
{
var tasks = _queueMonitors
.Select(m => m.GetCurrentState());
.Select(m => m.GetCurrentStateAsync());
var states = Task.WhenAll(tasks).Result;
WriteMessagesReadyMetric(writer, states);
WriteActiveConsumersMetric(writer, states);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static void WriteCloudEventIntoHeader(this IBasicProperties self, MotorCl
self.ContentEncoding = cloudEvent.GetEncoding();
self.ContentType = cloudEvent.ContentType;

var headers = new Dictionary<string, object>();
var headers = new Dictionary<string, object?>();

var attributesToConsider = cloudEvent.GetPopulatedAttributes()
.Where(t => !IgnoredAttributes.Contains(t.Key));
Expand All @@ -53,11 +53,11 @@ public static void WriteCloudEventIntoHeader(this IBasicProperties self, MotorCl
self.Headers = headers;
}

public static MotorCloudEvent<byte[]> ExtractCloudEvent(this IBasicProperties self,
public static MotorCloudEvent<byte[]> ExtractCloudEvent(this IReadOnlyBasicProperties self,
IApplicationNameService applicationNameService, ReadOnlyMemory<byte> body)
{
var attributes = new Dictionary<string, object>();
IDictionary<string, object> headers = new Dictionary<string, object>();
var attributes = new Dictionary<string, object?>();
IDictionary<string, object?> headers = new Dictionary<string, object?>();
if (self.IsHeadersPresent() && self.Headers is not null)
{
headers = self.Headers;
Expand Down
3 changes: 3 additions & 0 deletions src/Motor.Extensions.Hosting.RabbitMQ/LogEvents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ public static class LogEvents
public static readonly EventId CriticalFailureOnConsume = new(1, nameof(CriticalFailureOnConsume));
public static readonly EventId QueueStateRetrieval = new(2, nameof(QueueStateRetrieval));
public static readonly EventId QueueStateRetrievalFailed = new(3, nameof(QueueStateRetrievalFailed));
public static readonly EventId ChannelNullAfterProcessingComplete =
new(4, nameof(ChannelNullAfterProcessingComplete));
public static readonly EventId ConsumerNotStarted = new(5, nameof(ConsumerNotStarted));
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="9.0.1" />
<PackageReference Include="CloudNative.CloudEvents.SystemTextJson" Version="2.8.0" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ public record RabbitMQBindingOptions
[NotWhitespaceOrEmpty]
public string Exchange { get; set; } = string.Empty;

public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
public IDictionary<string, object?> Arguments { get; set; } = new Dictionary<string, object?>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ public record RabbitMQQueueOptions : RabbitMQQueueLimitOptions
public bool Durable { get; set; } = true;
public bool AutoDelete { get; set; }
public QueueMode Mode { get; set; } = QueueMode.Default;
public IDictionary<string, object> Arguments { get; set; } = new Dictionary<string, object>();
public IDictionary<string, object?> Arguments { get; set; } = new Dictionary<string, object?>();
public RabbitMQDeadLetterExchangeOptions? DeadLetterExchange { get; set; }
}
29 changes: 15 additions & 14 deletions src/Motor.Extensions.Hosting.RabbitMQ/RabbitMQConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.ComponentModel.DataAnnotations;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Motor.Extensions.Hosting.RabbitMQ.Options;
using RabbitMQ.Client;

Expand All @@ -24,27 +25,27 @@ public interface IRabbitMQConnectionFactory<T> : IDisposable
/// </summary>
string VirtualHost { get; }

IConnection CurrentConnection { get; }
IModel CurrentChannel { get; }
IConnection CreateConnection();
IModel CreateModel();
Task<IConnection> CurrentConnectionAsync();
Task<IChannel> CurrentChannelAsync();
Task<IConnection> CreateConnectionAsync(CancellationToken ct);
Task<IChannel> CreateChannelAsync(CancellationToken ct);
}

public sealed class RabbitMQConnectionFactory<TC> : IRabbitMQConnectionFactory<TC>
{
private readonly IConnectionFactory _connectionFactory;
private readonly Lazy<IConnection> _lazyConnection;
private readonly Lazy<IModel> _lazyChannel;
private readonly Lazy<Task<IConnection>> _lazyConnection;
private readonly Lazy<Task<IChannel>> _lazyChannel;

public RabbitMQConnectionFactory(IConnectionFactory connectionFactory)
{
_connectionFactory = connectionFactory;
_lazyConnection = new Lazy<IConnection>(
() => _connectionFactory.CreateConnection(),
_lazyConnection = new Lazy<Task<IConnection>>(
async () => await _connectionFactory.CreateConnectionAsync(),
LazyThreadSafetyMode.ExecutionAndPublication
);
_lazyChannel = new Lazy<IModel>(
() => CurrentConnection.CreateModel(),
_lazyChannel = new Lazy<Task<IChannel>>(
async () => await (await CurrentConnectionAsync()).CreateChannelAsync(),
LazyThreadSafetyMode.ExecutionAndPublication
);
}
Expand Down Expand Up @@ -80,13 +81,13 @@ private static IConnectionFactory FromConfig(RabbitMQBaseOptions baseOptions)
public string Password => _connectionFactory.Password;
public string VirtualHost => _connectionFactory.VirtualHost;

public IConnection CurrentConnection => _lazyConnection.Value;
public async Task<IConnection> CurrentConnectionAsync() => await _lazyConnection.Value;

public IModel CurrentChannel => _lazyChannel.Value;
public async Task<IChannel> CurrentChannelAsync() => await _lazyChannel.Value;

public IConnection CreateConnection() => _connectionFactory.CreateConnection();
public async Task<IConnection> CreateConnectionAsync(CancellationToken ct) => await _connectionFactory.CreateConnectionAsync(ct);

public IModel CreateModel() => CurrentConnection.CreateModel();
public async Task<IChannel> CreateChannelAsync(CancellationToken ct) => await (await CurrentConnectionAsync()).CreateChannelAsync(cancellationToken: ct);

public void Dispose()
{
Expand Down
Loading
Loading