From 7b6f0ee75ef10f05c2bb635cb96c418b84285500 Mon Sep 17 00:00:00 2001 From: golanbz Date: Mon, 2 Sep 2024 14:47:32 +0300 Subject: [PATCH] * Added support for consumer strategies that do not act as "stop the world" scenarios (e.g., cooperative sticky). Fixes issue #557 and Fixes issue #456 * Enabled automatic committing with `confluent auto commit: true` instead of relying solely on manual commits, but only when the consumer strategy is cooperative sticky. (Refer to the open librdkafka issue at https://github.com/confluentinc/librdkafka/issues/4059). --- .../ConsumerConfigurationBuilder.cs | 4 +- src/KafkaFlow/Consumers/Consumer.cs | 70 +++++++++++++++---- src/KafkaFlow/Consumers/ConsumerManager.cs | 41 +++++++++-- .../Extensions/StrategyExtensions.cs | 17 +++++ 4 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 src/KafkaFlow/Extensions/StrategyExtensions.cs diff --git a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs index 304fe2d34..a87f58be8 100644 --- a/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow/Configuration/ConsumerConfigurationBuilder.cs @@ -4,6 +4,7 @@ using System.Linq; using System.Threading.Tasks; using KafkaFlow.Consumers.DistributionStrategies; +using KafkaFlow.Extensions; namespace KafkaFlow.Configuration; @@ -251,7 +252,8 @@ public IConsumerConfiguration Build(ClusterConfiguration clusterConfiguration) consumerConfigCopy.StatisticsIntervalMs = _consumerConfig.StatisticsIntervalMs ?? _statisticsInterval; consumerConfigCopy.EnableAutoOffsetStore = false; - consumerConfigCopy.EnableAutoCommit = false; + consumerConfigCopy.EnableAutoCommit = _consumerConfig.PartitionAssignmentStrategy.IsStopTheWorldStrategy() is false; + consumerConfigCopy.AutoCommitIntervalMs = _consumerConfig.AutoCommitIntervalMs ?? 5000; consumerConfigCopy.ReadSecurityInformationFrom(clusterConfiguration); diff --git a/src/KafkaFlow/Consumers/Consumer.cs b/src/KafkaFlow/Consumers/Consumer.cs index 7f478f6e5..77f74e07b 100644 --- a/src/KafkaFlow/Consumers/Consumer.cs +++ b/src/KafkaFlow/Consumers/Consumer.cs @@ -7,6 +7,7 @@ using Confluent.Kafka; using KafkaFlow.Authentication; using KafkaFlow.Configuration; +using KafkaFlow.Extensions; namespace KafkaFlow.Consumers; @@ -14,6 +15,7 @@ internal class Consumer : IConsumer { private readonly IDependencyResolver _dependencyResolver; private readonly ILogHandler _logHandler; + private readonly bool _stopTheWorldStrategy; private readonly List, List>> _partitionsAssignedHandlers = new(); @@ -40,6 +42,7 @@ public Consumer( this.Configuration = configuration; _flowManager = new ConsumerFlowManager(this, _logHandler); _maxPollIntervalExceeded = new(_logHandler); + _stopTheWorldStrategy = Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy(); foreach (var handler in this.Configuration.StatisticsHandlers) { @@ -148,7 +151,17 @@ public void Commit(IReadOnlyCollection off return; } - _consumer.Commit(validOffsets); + if (_stopTheWorldStrategy) + { + _consumer.Commit(validOffsets); + } + else + { + foreach (var topicPartitionOffset in validOffsets) + { + _consumer.StoreOffset(topicPartitionOffset); + } + } foreach (var offset in validOffsets) { @@ -237,17 +250,8 @@ private void EnsureConsumer() var kafkaConfig = this.Configuration.GetKafkaConfig(); var consumerBuilder = new ConsumerBuilder(kafkaConfig) - .SetPartitionsAssignedHandler( - (consumer, partitions) => this.FirePartitionsAssignedHandlers(consumer, partitions)) - .SetPartitionsRevokedHandler( - (consumer, partitions) => - { - _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); - this.Assignment = new List(); - this.Subscription = new List(); - _currentPartitionsOffsets.Clear(); - _flowManager.Stop(); - }) + .SetPartitionsAssignedHandler(FirePartitionsAssignedHandlers) + .SetPartitionsRevokedHandler(FirePartitionRevokedHandlers) .SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error))) .SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics))); @@ -293,13 +297,51 @@ private void FirePartitionsAssignedHandlers( IConsumer consumer, List partitions) { - this.Assignment = partitions; + if (_stopTheWorldStrategy) + { + this.Assignment = partitions; + this.Subscription = consumer.Subscription; + _flowManager.Start(consumer); + _partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + return; + } + + if (partitions.Count == 0) + { + return; + } + + this.Assignment = this.Assignment.Union(partitions).ToArray(); this.Subscription = consumer.Subscription; + _flowManager.Stop(); _flowManager.Start(consumer); - _partitionsAssignedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); } + private void FirePartitionRevokedHandlers(IConsumer consumer, List partitions) + { + if (_stopTheWorldStrategy) + { + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + this.Assignment = new List(); + this.Subscription = new List(); + _currentPartitionsOffsets.Clear(); + _flowManager.Stop(); + return; + } + + this.Assignment = this.Assignment.Except(partitions.Select(x => x.TopicPartition)).ToArray(); + this.Subscription = consumer.Subscription; + foreach (var partition in partitions) + { + _currentPartitionsOffsets.TryRemove(partition.TopicPartition, out _); + } + + _flowManager.Stop(); + _flowManager.Start(consumer); + _partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions)); + } + private void InvalidateConsumer() { _consumer?.Close(); diff --git a/src/KafkaFlow/Consumers/ConsumerManager.cs b/src/KafkaFlow/Consumers/ConsumerManager.cs index 9d627cbcd..0dacccbfc 100644 --- a/src/KafkaFlow/Consumers/ConsumerManager.cs +++ b/src/KafkaFlow/Consumers/ConsumerManager.cs @@ -3,7 +3,9 @@ using System.Linq; using System.Threading; using System.Threading.Tasks; +using Confluent.Kafka; using KafkaFlow.Configuration; +using KafkaFlow.Extensions; namespace KafkaFlow.Consumers; @@ -11,6 +13,7 @@ internal class ConsumerManager : IConsumerManager { private readonly IDependencyResolver _dependencyResolver; private readonly ILogHandler _logHandler; + private readonly bool _stopTheWorldStrategy; private Timer _evaluateWorkersCountTimer; @@ -26,7 +29,7 @@ public ConsumerManager( this.Consumer = consumer; this.WorkerPool = consumerWorkerPool; this.Feeder = feeder; - + _stopTheWorldStrategy = consumer.Configuration.GetKafkaConfig().PartitionAssignmentStrategy.IsStopTheWorldStrategy(); this.Consumer.OnPartitionsAssigned((_, _, partitions) => this.OnPartitionAssigned(partitions)); this.Consumer.OnPartitionsRevoked((_, _, partitions) => this.OnPartitionRevoked(partitions)); } @@ -104,9 +107,23 @@ private void OnPartitionRevoked(IEnumerable x.TopicPartition))); + this.GetConsumerLogInfo(topicPartitions?.Select(x => x.TopicPartition).ToArray() + ?? Array.Empty())); this.WorkerPool.StopAsync().GetAwaiter().GetResult(); + + if (_stopTheWorldStrategy) + { + return; + } + + var assignedPartitions = Consumer.Assignment; + var workersCount = this.CalculateWorkersCount(assignedPartitions).GetAwaiter().GetResult(); + + this.WorkerPool + .StartAsync(assignedPartitions, workersCount) + .GetAwaiter() + .GetResult(); } private void OnPartitionAssigned(IReadOnlyCollection partitions) @@ -115,10 +132,16 @@ private void OnPartitionAssigned(IReadOnlyCollection x.Topic) .Select( x => new @@ -136,6 +159,14 @@ private void OnPartitionAssigned(IReadOnlyCollection y.Partition.Value), }), + CurrentTopics = Consumer.Assignment.GroupBy(x => x.Topic) + .Select( + x => new + { + x.First().Topic, + PartitionsCount = x.Count(), + Partitions = x.Select(y => y.Partition.Value), + }), }; private async Task CalculateWorkersCount(IEnumerable partitions) diff --git a/src/KafkaFlow/Extensions/StrategyExtensions.cs b/src/KafkaFlow/Extensions/StrategyExtensions.cs new file mode 100644 index 000000000..82e5057e3 --- /dev/null +++ b/src/KafkaFlow/Extensions/StrategyExtensions.cs @@ -0,0 +1,17 @@ +using Confluent.Kafka; + +namespace KafkaFlow.Extensions; + +/// +/// Strategy extension methods. +/// +public static class StrategyExtensions +{ + /// + /// Determine if the strategy is a "stop the world" behavior. + /// + /// Strategy + /// + public static bool IsStopTheWorldStrategy(this PartitionAssignmentStrategy? strategy) => + strategy is null or PartitionAssignmentStrategy.Range or PartitionAssignmentStrategy.RoundRobin; +}