Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Host.Core] Introduce multi-targeting for net8, net6 and netstandard2.0 #212

Merged
merged 1 commit into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Common.NuGet.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>net8.0;net6.0;netstandard2.0</TargetFrameworks>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageReadmeFile>NuGet.md</PackageReadmeFile>
<PackageIcon>icon.png</PackageIcon>
Expand Down
2 changes: 1 addition & 1 deletion src/Common.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Product>SlimMessageBus</Product>
<Company>zarusz</Company>
<Authors>zarusz</Authors>
<Copyright>Copyright © 2023</Copyright>
<Copyright>Copyright © 2024</Copyright>
<PackageReleaseNotes>See https://github.com/zarusz/SlimMessageBus/releases</PackageReleaseNotes>
<RepositoryUrl>https://github.com/zarusz/SlimMessageBus</RepositoryUrl>
<PackageProjectUrl>https://github.com/zarusz/SlimMessageBus</PackageProjectUrl>
Expand Down
4 changes: 2 additions & 2 deletions src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>netstandard2.1;net6.0;net8.0</TargetFrameworks>
<Version>2.2.1</Version>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.2.2</Version>
</PropertyGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.AzureEventHub;

using System.Diagnostics.CodeAnalysis;

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Processor;

Expand Down Expand Up @@ -85,7 +83,7 @@ private async Task Checkpoint(ProcessEventArgs args)
public Task TryCheckpoint()
=> Checkpoint(_lastMessage);

protected static IReadOnlyDictionary<string, object> GetHeadersFromTransportMessage([NotNull] EventData e)
protected static IReadOnlyDictionary<string, object> GetHeadersFromTransportMessage(EventData e)
// Note: Try to see if the Properties are already IReadOnlyDictionary or Dictionary prior allocating a new collection
=> e.Properties as IReadOnlyDictionary<string, object> ?? new Dictionary<string, object>(e.Properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFrameworks>netstandard2.0;net6.0;net8.0</TargetFrameworks>
<Version>2.0.6</Version>
<Description>Core configuration interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
<RootNamespace>SlimMessageBus.Host</RootNamespace>
<Version>2.0.5</Version>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<Version>2.0.1</Version>
<Version>2.0.2</Version>
<Description>Core interceptor interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;
using ConsumeResult = Confluent.Kafka.ConsumeResult<Confluent.Kafka.Ignore, byte[]>;

/// <summary>
Expand All @@ -10,9 +9,9 @@ public interface IKafkaPartitionConsumer : IDisposable
{
TopicPartition TopicPartition { get; }

void OnPartitionAssigned([NotNull] TopicPartition partition);
Task OnMessage([NotNull] ConsumeResult message);
void OnPartitionEndReached([NotNull] TopicPartitionOffset offset);
void OnPartitionAssigned(TopicPartition partition);
Task OnMessage(ConsumeResult message);
void OnPartitionEndReached(TopicPartitionOffset offset);
void OnPartitionRevoked();

void OnClose();
Expand Down
5 changes: 1 addition & 4 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

public static class KafkaExtensions
{
public static TopicPartitionOffset AddOffset([NotNull] this TopicPartitionOffset topicPartitionOffset, int addOffset)
public static TopicPartitionOffset AddOffset(this TopicPartitionOffset topicPartitionOffset, int addOffset)
=> new(topicPartitionOffset.TopicPartition, topicPartitionOffset.Offset + addOffset);

public static IReadOnlyDictionary<string, object> ToHeaders(this ConsumeResult<Ignore, byte[]> consumeResult, IMessageSerializer headerSerializer)
Expand Down
12 changes: 5 additions & 7 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using ConsumeResult = ConsumeResult<Ignore, byte[]>;
using IConsumer = IConsumer<Ignore, byte[]>;

Expand Down Expand Up @@ -175,7 +173,7 @@ protected override async Task OnStop()
}
}

protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition> partitions)
protected virtual void OnPartitionAssigned(ICollection<TopicPartition> partitions)
{
// Ensure processors exist for each assigned topic-partition
foreach (var partition in partitions)
Expand All @@ -187,7 +185,7 @@ protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition>
}
}

protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOffset> partitions)
protected virtual void OnPartitionRevoked(ICollection<TopicPartitionOffset> partitions)
{
foreach (var partition in partitions)
{
Expand All @@ -198,23 +196,23 @@ protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOf
}
}

protected virtual void OnPartitionEndReached([NotNull] TopicPartitionOffset offset)
protected virtual void OnPartitionEndReached(TopicPartitionOffset offset)
{
Logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
}

protected async virtual ValueTask OnMessage([NotNull] ConsumeResult message)
protected async virtual ValueTask OnMessage(ConsumeResult message)
{
Logger.LogDebug("Group [{Group}]: Received message with Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, payload size: {MessageSize}", Group, message.Topic, message.Partition, message.Offset, message.Message.Value?.Length ?? 0);

var processor = _processors[message.TopicPartition];
await processor.OnMessage(message).ConfigureAwait(false);
}

protected virtual void OnOffsetsCommitted([NotNull] CommittedOffsets e)
protected virtual void OnOffsetsCommitted(CommittedOffsets e)
{
if (e.Error.IsError || e.Error.IsFatal)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using ConsumeResult = ConsumeResult<Ignore, byte[]>;

public abstract class KafkaPartitionConsumer : IKafkaPartitionConsumer
Expand Down Expand Up @@ -30,7 +28,7 @@ protected KafkaPartitionConsumer(ILoggerFactory loggerFactory, AbstractConsumerS
_logger = loggerFactory.CreateLogger<KafkaPartitionConsumer>();

_logger.LogInformation("Creating consumer for Group: {Group}, Topic: {Topic}, Partition: {Partition}", group, topicPartition.Topic, topicPartition.Partition);

ConsumerSettings = consumerSettings ?? throw new ArgumentNullException(nameof(consumerSettings));
Group = group;
TopicPartition = topicPartition;
Expand Down Expand Up @@ -76,7 +74,7 @@ public void Dispose()

#region Implementation of IKafkaTopicPartitionProcessor

public void OnPartitionAssigned([NotNull] TopicPartition partition)
public void OnPartitionAssigned(TopicPartition partition)
{
_lastCheckpointOffset = null;
_lastOffset = null;
Expand All @@ -91,7 +89,7 @@ public void OnPartitionAssigned([NotNull] TopicPartition partition)
}
}

public async Task OnMessage([NotNull] ConsumeResult message)
public async Task OnMessage(ConsumeResult message)
{
if (_cancellationTokenSource.IsCancellationRequested)
{
Expand Down
21 changes: 8 additions & 13 deletions src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.Kafka;

using System.Diagnostics.CodeAnalysis;

using IProducer = Confluent.Kafka.IProducer<byte[], byte[]>;
using Message = Confluent.Kafka.Message<byte[], byte[]>;

Expand Down Expand Up @@ -156,15 +154,15 @@ protected override async Task ProduceToTransport(object message, string path, by
var deliveryResult = await task.ConfigureAwait(false);
if (deliveryResult.Status == PersistenceStatus.NotPersisted)
{
throw new ProducerMessageBusException($"Error while publish message {message} of type {messageType.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}");
throw new ProducerMessageBusException($"Error while publish message {message} of type {messageType?.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}");
}

// log some debug information
_logger.LogDebug("Message {Message} of type {MessageType} delivered to topic {Topic}, partition {Partition}, offset: {Offset}",
message, messageType?.Name, deliveryResult.Topic, deliveryResult.Partition, deliveryResult.Offset);
}

protected byte[] GetMessageKey(ProducerSettings producerSettings, [NotNull] Type messageType, object message, string topic)
protected byte[] GetMessageKey(ProducerSettings producerSettings, Type messageType, object message, string topic)
{
var keyProvider = producerSettings?.GetKeyProvider();
if (keyProvider != null)
Expand All @@ -173,27 +171,24 @@ protected byte[] GetMessageKey(ProducerSettings producerSettings, [NotNull] Type

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("The message {Message} type {MessageType} calculated key is {Key} (Base64)", message, messageType.Name, Convert.ToBase64String(key));
_logger.LogDebug("The message {Message} type {MessageType} calculated key is {Key} (Base64)", message, messageType?.Name, Convert.ToBase64String(key));
}

return key;
}
return Array.Empty<byte>();
return [];
}

private const int NoPartition = -1;

protected int GetMessagePartition(ProducerSettings producerSettings, [NotNull] Type messageType, object message, string topic)
protected int GetMessagePartition(ProducerSettings producerSettings, Type messageType, object message, string topic)
{
var partitionProvider = producerSettings.GetPartitionProvider();
if (partitionProvider != null)
{
var partition = partitionProvider(message, topic);

if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug("The Message {Message} type {MessageType} calculated partition is {Partition}", message, messageType.Name, partition);
}
var partition = partitionProvider(message, topic);

_logger.LogDebug("The Message {Message} type {MessageType} calculated partition is {Partition}", message, messageType?.Name, partition);

return partition;
}
Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected override async Task ProduceToTransport(object message, string path, by
{
var m = new MqttApplicationMessage
{
PayloadSegment = messagePayload,
PayloadSegment = new ArraySegment<byte>(messagePayload),
Topic = path
};

Expand Down
18 changes: 17 additions & 1 deletion src/SlimMessageBus.Host.Outbox.Sql/SqlOutboxRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,47 @@ protected virtual SqlCommand CreateCommand()
return cmd;
}

#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
public virtual SqlTransaction CurrentTransaction => _transaction;

public async virtual ValueTask BeginTransaction()
{
ValidateNoTransactionStarted();
#if NETSTANDARD2_0
_transaction = Connection.BeginTransaction(Settings.TransactionIsolationLevel);
#else
_transaction = (SqlTransaction)await Connection.BeginTransactionAsync(Settings.TransactionIsolationLevel);
#endif
}

public async virtual ValueTask CommitTransaction()
{
ValidateTransactionStarted();

#if NETSTANDARD2_0
_transaction.Commit();
_transaction.Dispose();
#else
await _transaction.CommitAsync();
await _transaction.DisposeAsync();
#endif
_transaction = null;
}

public async virtual ValueTask RollbackTransaction()
{
ValidateTransactionStarted();

#if NETSTANDARD2_0
_transaction.Rollback();
_transaction.Dispose();
#else
await _transaction.RollbackAsync();
await _transaction.DisposeAsync();
#endif
_transaction = null;
}
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

protected void ValidateNoTransactionStarted()
{
Expand Down Expand Up @@ -186,7 +202,7 @@ await ExecuteNonQuery(Settings.SchemaCreationRetry,
BEGIN
CREATE NONCLUSTERED INDEX [{indexName}] ON {_sqlTemplate.TableNameQualified}
(
{string.Join(',', columns.Select(c => $"{c} ASC"))}
{string.Join(",", columns.Select(c => $"{c} ASC"))}
)
END", token: token);
}
Expand Down
12 changes: 8 additions & 4 deletions src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
namespace SlimMessageBus.Host.RabbitMQ;

using System.Runtime;

using global::RabbitMQ.Client;

public class RabbitMqTopologyService
Expand Down Expand Up @@ -117,8 +115,14 @@ private string DeclareQueue(HasProviderExtensions settings, string queueName, Ac
_logger.LogInformation("Declaring queue {QueueName}, Durable: {Durable}, AutoDelete: {AutoDelete}, Exclusive: {Exclusive}", queueName, queueDurable, queueAutoDelete, queueExclusive);
try
{
var arguments = new Dictionary<string, object>(queueArguments ?? Enumerable.Empty<KeyValuePair<string, object>>());
argumentModifier?.Invoke(arguments);
var arguments = queueArguments;
if (argumentModifier != null)
{
arguments = arguments != null
? new Dictionary<string, object>(queueArguments) // make a copy of the arguments for the modifier to mutate
: [];
argumentModifier(arguments);
}

_channel.QueueDeclare(queueName, durable: queueDurable, exclusive: queueExclusive, autoDelete: queueAutoDelete, arguments: arguments);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
<Import Project="../Common.NuGet.Properties.xml" />

<PropertyGroup>
<TargetFramework>netstandard1.3</TargetFramework>
<Version>2.0.1</Version>
<Version>2.0.2</Version>
<Description>Core serialization interfaces of SlimMessageBus</Description>
<PackageTags>SlimMessageBus</PackageTags>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,27 @@ public class CheckpointTrigger : ICheckpointTrigger
private int _lastCheckpointCount;
private readonly Stopwatch _lastCheckpointDuration;

public CheckpointTrigger(int countLimit, TimeSpan durationlimit, ILoggerFactory loggerFactory)
public CheckpointTrigger(CheckpointValue checkpointValue, ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<CheckpointTrigger>();

_checkpointCount = countLimit;
_checkpointDuration = (int)durationlimit.TotalMilliseconds;
_checkpointCount = checkpointValue.CheckpointCount;
_checkpointDuration = (int)checkpointValue.CheckpointDuration.TotalMilliseconds;

_lastCheckpointCount = 0;
_lastCheckpointDuration = new Stopwatch();
}

public CheckpointTrigger(HasProviderExtensions settings, ILoggerFactory loggerFactory)
: this(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault),
loggerFactory)
: this(GetCheckpointValue(settings), loggerFactory)
{
}

public static bool IsConfigured(HasProviderExtensions settings)
=> settings.GetOrDefault<int?>(CheckpointSettings.CheckpointCount, null) != null || settings.GetOrDefault<TimeSpan?>(CheckpointSettings.CheckpointDuration, null) != null;

public static (int CheckpontCount, TimeSpan CheckpointDuration) GetConfiguration(HasProviderExtensions settings)
=> (settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
public static CheckpointValue GetCheckpointValue(HasProviderExtensions settings)
=> new(settings.GetOrDefault(CheckpointSettings.CheckpointCount, CheckpointSettings.CheckpointCountDefault),
settings.GetOrDefault(CheckpointSettings.CheckpointDuration, CheckpointSettings.CheckpointDurationDefault));


Expand Down Expand Up @@ -68,4 +66,4 @@ public void Reset()
}

#endregion
}
}
Loading
Loading