-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: evolve worker distribution strategy
- Loading branch information
1 parent
35aafb5
commit c9f6244
Showing
11 changed files
with
165 additions
and
104 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -136,19 +136,19 @@ IConsumerConfigurationBuilder WithWorkersCount( | |
/// <summary> | ||
/// Sets the strategy to choose a worker when a message arrives | ||
/// </summary> | ||
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam> | ||
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam> | ||
/// <param name="factory">A factory to create the instance</param> | ||
/// <returns></returns> | ||
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>(Factory<T> factory) | ||
where T : class, IDistributionStrategy; | ||
IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>(Factory<T> factory) | ||
where T : class, IWorkerDistributionStrategy; | ||
|
||
/// <summary> | ||
/// Sets the strategy to choose a worker when a message arrives | ||
/// </summary> | ||
/// <typeparam name="T">A class that implements the <see cref="IDistributionStrategy"/> interface</typeparam> | ||
/// <typeparam name="T">A class that implements the <see cref="IWorkerDistributionStrategy"/> interface</typeparam> | ||
/// <returns></returns> | ||
IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>() | ||
where T : class, IDistributionStrategy; | ||
IConsumerConfigurationBuilder WithWorkerDistributionStrategy<T>() | ||
where T : class, IWorkerDistributionStrategy; | ||
|
||
/// <summary> | ||
/// Configures the consumer for manual message completion. | ||
|
@@ -191,4 +191,4 @@ IConsumerConfigurationBuilder WithWorkDistributionStrategy<T>() | |
/// <returns></returns> | ||
IConsumerConfigurationBuilder WithStatisticsIntervalMs(int statisticsIntervalMs); | ||
} | ||
} | ||
} | ||
Check warning on line 194 in src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs GitHub Actions / Test deployment
|
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
namespace KafkaFlow; | ||
|
||
using System.Collections.Generic; | ||
using System.Threading.Tasks; | ||
|
||
/// <summary> | ||
/// An interface used to create a distribution strategy | ||
/// </summary> | ||
public interface IWorkerDistributionStrategy | ||
{ | ||
/// <summary> | ||
/// Initializes the distribution strategy, this method is called when a consumer is started | ||
/// </summary> | ||
/// <param name="workers">List of workers to be initialized</param> | ||
void Initialize(IReadOnlyList<IWorker> workers); | ||
|
||
/// <summary> | ||
/// Retrieves an available worker based on the provided distribution strategy context. | ||
/// </summary> | ||
/// <param name="context">The distribution strategy context containing message and consumer details.</param> | ||
/// <returns>The selected <see cref="IWorker"/> instance.</returns> | ||
ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
namespace KafkaFlow; | ||
|
||
using System; | ||
using System.Threading; | ||
|
||
/// <summary> | ||
/// Represents a strategy context for distributing workers based on specific message and consumer details. | ||
/// </summary> | ||
public ref struct WorkerDistributionContext | ||
{ | ||
/// <summary> | ||
/// Initializes a new instance of the <see cref="WorkerDistributionContext"/> struct. | ||
/// </summary> | ||
/// <param name="consumerName">Name of the consumer.</param> | ||
/// <param name="topic">Topic associated with the message.</param> | ||
/// <param name="partition">Partition of the topic.</param> | ||
/// <param name="rawMessageKey">Raw key of the message.</param> | ||
/// <param name="consumerStoppedCancellationToken">A cancellation token that is cancelled when the consumer has stopped</param> | ||
public WorkerDistributionContext( | ||
string consumerName, | ||
string topic, | ||
int partition, | ||
ReadOnlyMemory<byte>? rawMessageKey, | ||
CancellationToken consumerStoppedCancellationToken) | ||
{ | ||
this.ConsumerName = consumerName; | ||
this.Topic = topic; | ||
this.Partition = partition; | ||
this.RawMessageKey = rawMessageKey; | ||
this.ConsumerStoppedCancellationToken = consumerStoppedCancellationToken; | ||
} | ||
|
||
/// <summary> | ||
/// Gets the name of the consumer. | ||
/// </summary> | ||
public string ConsumerName { get; } | ||
|
||
/// <summary> | ||
/// Gets the topic associated with the message. | ||
/// </summary> | ||
public string Topic { get; } | ||
|
||
/// <summary> | ||
/// Gets the partition number of the topic. | ||
/// </summary> | ||
public int Partition { get; } | ||
|
||
/// <summary> | ||
/// Gets the raw key of the message. | ||
/// </summary> | ||
public ReadOnlyMemory<byte>? RawMessageKey { get; } | ||
|
||
/// <summary> | ||
/// Gets the cancellation token that is cancelled when the consumer has stopped | ||
/// </summary> | ||
public CancellationToken ConsumerStoppedCancellationToken { get; } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
64 changes: 31 additions & 33 deletions
64
src/KafkaFlow/Consumers/DistributionStrategies/BytesSumDistributionStrategy.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,44 +1,42 @@ | ||
namespace KafkaFlow.Consumers.DistributionStrategies | ||
namespace KafkaFlow.Consumers.DistributionStrategies; | ||
|
||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading.Tasks; | ||
|
||
/// <summary> | ||
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen | ||
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed | ||
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed) | ||
/// </summary> | ||
public class BytesSumDistributionStrategy : IWorkerDistributionStrategy | ||
{ | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
private IReadOnlyList<IWorker> workers; | ||
|
||
/// <summary> | ||
/// This strategy sums all bytes in the partition key and apply a mod operator with the total number of workers, the resulting number is the worker ID to be chosen | ||
/// This algorithm is fast and creates a good work balance. Messages with the same partition key are always delivered in the same worker, so, message order is guaranteed | ||
/// Set an optimal message buffer value to avoid idle workers (it will depends how many messages with the same partition key are consumed) | ||
/// </summary> | ||
public class BytesSumDistributionStrategy : IDistributionStrategy | ||
/// <inheritdoc /> | ||
public void Initialize(IReadOnlyList<IWorker> workers) | ||
{ | ||
private IReadOnlyList<IWorker> workers; | ||
this.workers = workers; | ||
} | ||
|
||
/// <inheritdoc /> | ||
public void Init(IReadOnlyList<IWorker> workers) | ||
/// <inheritdoc /> | ||
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context) | ||
{ | ||
if (context.RawMessageKey is null || this.workers.Count == 1) | ||
{ | ||
this.workers = workers; | ||
return new ValueTask<IWorker>(this.workers[0]); | ||
} | ||
|
||
/// <inheritdoc /> | ||
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken) | ||
{ | ||
if (partitionKey is null || this.workers.Count == 1) | ||
{ | ||
return Task.FromResult(this.workers[0]); | ||
} | ||
var bytesSum = 0; | ||
|
||
var bytesSum = 0; | ||
|
||
for (int i = 0; i < partitionKey.Length; i++) | ||
{ | ||
bytesSum += partitionKey[i]; | ||
} | ||
|
||
return Task.FromResult( | ||
cancellationToken.IsCancellationRequested | ||
? null | ||
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count)); | ||
for (var i = 0; i < context.RawMessageKey.Value.Length; i++) | ||
{ | ||
bytesSum += context.RawMessageKey.Value.Span[i]; | ||
} | ||
|
||
return new ValueTask<IWorker>( | ||
context.ConsumerStoppedCancellationToken.IsCancellationRequested | ||
? null | ||
: this.workers.ElementAtOrDefault(bytesSum % this.workers.Count)); | ||
} | ||
} |
48 changes: 23 additions & 25 deletions
48
src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,32 +1,30 @@ | ||
namespace KafkaFlow.Consumers.DistributionStrategies | ||
namespace KafkaFlow.Consumers.DistributionStrategies; | ||
|
||
using System.Collections.Generic; | ||
using System.Threading.Channels; | ||
using System.Threading.Tasks; | ||
|
||
/// <summary> | ||
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message | ||
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee | ||
/// </summary> | ||
public class FreeWorkerDistributionStrategy : IWorkerDistributionStrategy | ||
{ | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Channels; | ||
using System.Threading.Tasks; | ||
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>(); | ||
|
||
/// <summary> | ||
/// This strategy chooses the first free worker to process the message. When a worker finishes the processing, it notifies the worker pool that it is free to get a new message | ||
/// This is the fastest and resource-friendly strategy (the message buffer is not used) but messages with the same partition key can be delivered in different workers, so, no message order guarantee | ||
/// </summary> | ||
public class FreeWorkerDistributionStrategy : IDistributionStrategy | ||
/// <inheritdoc /> | ||
public void Initialize(IReadOnlyList<IWorker> workers) | ||
{ | ||
private readonly Channel<IWorker> freeWorkers = Channel.CreateUnbounded<IWorker>(); | ||
|
||
/// <inheritdoc /> | ||
public void Init(IReadOnlyList<IWorker> workers) | ||
foreach (var worker in workers) | ||
{ | ||
foreach (var worker in workers) | ||
{ | ||
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker))); | ||
this.freeWorkers.Writer.TryWrite(worker); | ||
} | ||
worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker))); | ||
this.freeWorkers.Writer.TryWrite(worker); | ||
} | ||
} | ||
|
||
/// <inheritdoc /> | ||
public Task<IWorker> GetWorkerAsync(byte[] partitionKey, CancellationToken cancellationToken) | ||
{ | ||
return this.freeWorkers.Reader.ReadAsync(cancellationToken).AsTask(); | ||
} | ||
/// <inheritdoc /> | ||
public ValueTask<IWorker> GetWorkerAsync(WorkerDistributionContext context) | ||
{ | ||
return this.freeWorkers.Reader.ReadAsync(context.ConsumerStoppedCancellationToken); | ||
} | ||
} | ||
} |