diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs index e20bf6258..b7cc3c798 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs @@ -136,19 +136,19 @@ IConsumerConfigurationBuilder WithWorkersCount( /// /// Sets the strategy to choose a worker when a message arrives /// - /// A class that implements the interface + /// A class that implements the interface /// A factory to create the instance /// - IConsumerConfigurationBuilder WithWorkDistributionStrategy(Factory factory) - where T : class, IDistributionStrategy; + IConsumerConfigurationBuilder WithWorkerDistributionStrategy(Factory factory) + where T : class, IWorkerDistributionStrategy; /// /// Sets the strategy to choose a worker when a message arrives /// - /// A class that implements the interface + /// A class that implements the interface /// - IConsumerConfigurationBuilder WithWorkDistributionStrategy() - where T : class, IDistributionStrategy; + IConsumerConfigurationBuilder WithWorkerDistributionStrategy() + where T : class, IWorkerDistributionStrategy; /// /// Configures the consumer for manual message completion. @@ -191,4 +191,4 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy() /// IConsumerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs); } -} +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/IDistributionStrategy.cs b/src/KafkaFlow.Abstractions/IDistributionStrategy.cs deleted file mode 100644 index 591ade02d..000000000 --- a/src/KafkaFlow.Abstractions/IDistributionStrategy.cs +++ /dev/null @@ -1,26 +0,0 @@ -namespace KafkaFlow -{ - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - - /// - /// An interface used to create a distribution strategy - /// - public interface IDistributionStrategy - { - /// - /// Initializes the distribution strategy, this method is called when a consumer is started - /// - /// List of workers to be initialized - void Init(IReadOnlyList workers); - - /// - /// Gets an available worker to process the message - /// - /// Message partition key - /// A that is cancelled when the consumers stops - /// - Task GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken); - } -} diff --git a/src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs b/src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs new file mode 100644 index 000000000..c883b2f7a --- /dev/null +++ b/src/KafkaFlow.Abstractions/IWorkerDistributionStrategy.cs @@ -0,0 +1,23 @@ +namespace KafkaFlow; + +using System.Collections.Generic; +using System.Threading.Tasks; + +/// +/// An interface used to create a distribution strategy +/// +public interface IWorkerDistributionStrategy +{ + /// + /// Initializes the distribution strategy, this method is called when a consumer is started + /// + /// List of workers to be initialized + void Initialize(IReadOnlyList workers); + + /// + /// Retrieves an available worker based on the provided distribution strategy context. + /// + /// The distribution strategy context containing message and consumer details. + /// The selected instance. + ValueTask GetWorkerAsync(WorkerDistributionContext context); +} diff --git a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj index 50e3dd839..283ef9c6b 100644 --- a/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj +++ b/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj @@ -7,4 +7,9 @@ Contains all KafkaFlow extendable interfaces + + + + + diff --git a/src/KafkaFlow.Abstractions/WorkerDistributionContext.cs b/src/KafkaFlow.Abstractions/WorkerDistributionContext.cs new file mode 100644 index 000000000..43c3ce04d --- /dev/null +++ b/src/KafkaFlow.Abstractions/WorkerDistributionContext.cs @@ -0,0 +1,57 @@ +namespace KafkaFlow; + +using System; +using System.Threading; + +/// +/// Represents a strategy context for distributing workers based on specific message and consumer details. +/// +public ref struct WorkerDistributionContext +{ + /// + /// Initializes a new instance of the struct. + /// + /// Name of the consumer. + /// Topic associated with the message. + /// Partition of the topic. + /// Raw key of the message. + /// A cancellation token that is cancelled when the consumer has stopped + public WorkerDistributionContext( + string consumerName, + string topic, + int partition, + ReadOnlyMemory? rawMessageKey, + CancellationToken consumerStoppedCancellationToken) + { + this.ConsumerName = consumerName; + this.Topic = topic; + this.Partition = partition; + this.RawMessageKey = rawMessageKey; + this.ConsumerStoppedCancellationToken = consumerStoppedCancellationToken; + } + + /// + /// Gets the name of the consumer. + /// + public string ConsumerName { get; } + + /// + /// Gets the topic associated with the message. + /// + public string Topic { get; } + + /// + /// Gets the partition number of the topic. + /// + public int Partition { get; } + + /// + /// Gets the raw key of the message. + /// + public ReadOnlyMemory? RawMessageKey { get; } + + /// + /// Gets the cancellation token that is cancelled when the consumer has stopped + /// + public CancellationToken ConsumerStoppedCancellationToken { get; } +} diff --git a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs index a74de9d43..01eca3090 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfiguration.cs @@ -20,7 +20,7 @@ public ConsumerConfiguration( TimeSpan workersCountEvaluationInterval, int bufferSize, TimeSpan workerStopTimeout, - Factory distributionStrategyFactory, + Factory distributionStrategyFactory, IReadOnlyList middlewaresConfigurations, bool autoMessageCompletion, bool noStoreOffsets, @@ -69,7 +69,7 @@ public ConsumerConfiguration( "The value must be greater than 0"); } - public Factory DistributionStrategyFactory { get; } + public Factory DistributionStrategyFactory { get; } public IReadOnlyList MiddlewaresConfigurations { get; } diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index e1c027151..b0544080b 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -36,7 +36,7 @@ internal sealed class ConsumerConfigurationBuilder : IConsumerConfigurationBuild private ConsumerInitialState initialState = ConsumerInitialState.Running; private int statisticsInterval; - private Factory distributionStrategyFactory = _ => new BytesSumDistributionStrategy(); + private Factory distributionStrategyFactory = _ => new BytesSumDistributionStrategy(); private TimeSpan autoCommitInterval = TimeSpan.FromSeconds(5); private ConsumerCustomFactory customFactory = (consumer, _) => consumer; @@ -157,15 +157,15 @@ public IConsumerConfigurationBuilder WithWorkerStopTimeout(TimeSpan timeout) return this; } - public IConsumerConfigurationBuilder WithWorkDistributionStrategy(Factory factory) - where T : class, IDistributionStrategy + public IConsumerConfigurationBuilder WithWorkerDistributionStrategy(Factory factory) + where T : class, IWorkerDistributionStrategy { this.distributionStrategyFactory = factory; return this; } - public IConsumerConfigurationBuilder WithWorkDistributionStrategy() - where T : class, IDistributionStrategy + public IConsumerConfigurationBuilder WithWorkerDistributionStrategy() + where T : class, IWorkerDistributionStrategy { this.DependencyConfigurator.AddTransient(); this.distributionStrategyFactory = resolver => resolver.Resolve(); diff --git a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs index 4099867b9..21f5b6896 100644 --- a/src/KafkaFlow/Configuration/IConsumerConfiguration.cs +++ b/src/KafkaFlow/Configuration/IConsumerConfiguration.cs @@ -13,7 +13,7 @@ public interface IConsumerConfiguration /// /// Gets the consumer worker distribution strategy /// - Factory DistributionStrategyFactory { get; } + Factory DistributionStrategyFactory { get; } /// /// Gets the consumer middlewares configurations diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index 587d6aaaf..2c45bc2fe 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -14,7 +14,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool private readonly IDependencyResolver consumerDependencyResolver; private readonly IMiddlewareExecutor middlewareExecutor; private readonly ILogHandler logHandler; - private readonly Factory distributionStrategyFactory; + private readonly Factory distributionStrategyFactory; private readonly IOffsetCommitter offsetCommitter; private readonly Event workerPoolStoppedSubject; @@ -22,7 +22,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool private TaskCompletionSource startedTaskSource = new(); private List workers = new(); - private IDistributionStrategy distributionStrategy; + private IWorkerDistributionStrategy distributionStrategy; private IOffsetManager offsetManager; public ConsumerWorkerPool( @@ -85,7 +85,7 @@ await Task.WhenAll( .ConfigureAwait(false); this.distributionStrategy = this.distributionStrategyFactory(this.consumerDependencyResolver); - this.distributionStrategy.Init(this.workers.AsReadOnly()); + this.distributionStrategy.Initialize(this.workers.AsReadOnly()); this.startedTaskSource.TrySetResult(null); } @@ -130,7 +130,13 @@ public async Task EnqueueAsync(ConsumeResult message, Cancellati await this.startedTaskSource.Task.ConfigureAwait(false); var worker = (IConsumerWorker)await this.distributionStrategy - .GetWorkerAsync(message.Message.Key, stopCancellationToken) + .GetWorkerAsync( + new WorkerDistributionContext( + this.consumer.Configuration.ConsumerName, + message.Topic, + message.Partition.Value, + message.Message.Key, + stopCancellationToken)) .ConfigureAwait(false); if (worker is null) @@ -167,4 +173,4 @@ private MessageContext CreateMessageContext(ConsumeResult messag return context; } } -} +} \ No newline at end of file diff --git a/src/KafkaFlow/Consumers/DistributionStrategies/BytesSumDistributionStrategy.cs b/src/KafkaFlow/Consumers/DistributionStrategies/BytesSumDistributionStrategy.cs index 5c5409839..6581cbb6d 100644 --- a/src/KafkaFlow/Consumers/DistributionStrategies/BytesSumDistributionStrategy.cs +++ b/src/KafkaFlow/Consumers/DistributionStrategies/BytesSumDistributionStrategy.cs @@ -1,44 +1,42 @@ -namespace KafkaFlow.Consumers.DistributionStrategies +namespace KafkaFlow.Consumers.DistributionStrategies; + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +/// +/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen +/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed +/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed) +/// +public class BytesSumDistributionStrategy : IWorkerDistributionStrategy { - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; + private IReadOnlyList workers; - /// - /// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen - /// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed - /// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed) - /// - public class BytesSumDistributionStrategy : IDistributionStrategy + /// + public void Initialize(IReadOnlyList workers) { - private IReadOnlyList workers; + this.workers = workers; + } - /// - public void Init(IReadOnlyList workers) + /// + public ValueTask GetWorkerAsync(WorkerDistributionContext context) + { + if (context.RawMessageKey is null || this.workers.Count == 1) { - this.workers = workers; + return new ValueTask(this.workers[0]); } - /// - public Task GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken) - { - if (partitionKey is null || this.workers.Count == 1) - { - return Task.FromResult(this.workers[0]); - } + var bytesSum = 0; - var bytesSum = 0; - - for (int i = 0; i < partitionKey.Length; i++) - { - bytesSum += partitionKey[i]; - } - - return Task.FromResult( - cancellationToken.IsCancellationRequested - ? null - : this.workers.ElementAtOrDefault(bytesSum % this.workers.Count)); + for (var i = 0; i < context.RawMessageKey.Value.Length; i++) + { + bytesSum += context.RawMessageKey.Value.Span[i]; } + + return new ValueTask( + context.ConsumerStoppedCancellationToken.IsCancellationRequested + ? null + : this.workers.ElementAtOrDefault(bytesSum % this.workers.Count)); } } diff --git a/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs b/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs index e9761ae90..9511a868a 100644 --- a/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs +++ b/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs @@ -1,32 +1,30 @@ -namespace KafkaFlow.Consumers.DistributionStrategies +namespace KafkaFlow.Consumers.DistributionStrategies; + +using System.Collections.Generic; +using System.Threading.Channels; +using System.Threading.Tasks; + +/// +/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message +/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee +/// +public class FreeWorkerDistributionStrategy : IWorkerDistributionStrategy { - using System.Collections.Generic; - using System.Threading; - using System.Threading.Channels; - using System.Threading.Tasks; + private readonly Channel freeWorkers = Channel.CreateUnbounded(); - /// - /// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message - /// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee - /// - public class FreeWorkerDistributionStrategy : IDistributionStrategy + /// + public void Initialize(IReadOnlyList workers) { - private readonly Channel freeWorkers = Channel.CreateUnbounded(); - - /// - public void Init(IReadOnlyList workers) + foreach (var worker in workers) { - foreach (var worker in workers) - { - worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker))); - this.freeWorkers.Writer.TryWrite(worker); - } + worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker))); + this.freeWorkers.Writer.TryWrite(worker); } + } - /// - public Task GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken) - { - return this.freeWorkers.Reader.ReadAsync(cancellationToken).AsTask(); - } + /// + public ValueTask GetWorkerAsync(WorkerDistributionContext context) + { + return this.freeWorkers.Reader.ReadAsync(context.ConsumerStoppedCancellationToken); } -} +} \ No newline at end of file