[Bug Report]: A TopicPartitionException occurred while calling the consumer.Pause method in PartitionsAssignedHandler #595

Open
DmitryParfonov opened this issue Sep 12, 2024 · 0 comments
Labels
bug Something isn't working

Comments


Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

Hello,

I'm trying to handle consumer group rebalancing when a new consumer joins the group, using PartitionsAssignedHandler and PartitionsRevokedHandler. However, calling the Pause or Resume method inside the handler throws a TopicPartitionException.

Steps to reproduce

  1. Configure a KafkaFlow consumer
builder
    .Topic("product-events")
    .WithGroupId("testGroup")
    .WithName("testConsumer")
    .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest)
    .WithBufferSize(25)
    .WithConsumerLagWorkerBalancer(totalWorkers: 100, minInstanceWorkers: 1, maxInstanceWorkers: 20, TimeSpan.FromMinutes(5))
    .WithWorkerDistributionStrategy<FreeWorkerDistributionStrategy>()
    .WithPartitionsAssignedHandler((resolver, topicPartitions) =>
    {
        try
        {
            var consumerAccessor = resolver.Resolve<IConsumerAccessor>();
            var consumer = consumerAccessor.GetConsumer("testConsumer");
            consumer.Pause(topicPartitions);
        }
        catch (TopicPartitionException ex)
        {
            Console.WriteLine("Error [{0}]: {1} |\n {2}", ex.Error.Code, ex.Error.Reason, ex.StackTrace);
            return;
        }

        Console.WriteLine("Consumer '{0}' Paused", "testConsumer");
    })
  2. Run your application instance.

Expected behavior

No errors. Consumer is paused.

Actual behavior

TopicPartitionException occurred:

Kafka Error Code: Local_Partial (-158):
Message: An error occurred for topic partitions: [product-events [[0]], product-events [[1]], product-events [[2]], product-events [[3]], product-events [[4]]].

StackTrace:

   at Confluent.Kafka.Impl.SafeKafkaHandle.Pause(IEnumerable`1 partitions)
   at Confluent.Kafka.Consumer`2.Pause(IEnumerable`1 partitions)
   at KafkaFlow.Consumers.ConsumerFlowManager.Pause(IReadOnlyCollection`1 topicPartitions)
   at KafkaFlow.Consumers.MessageConsumer.Pause(IReadOnlyCollection`1 topicPartitions)

KafkaFlow version

v3.0.10

DmitryParfonov added the bug label on Sep 12, 2024