
Problems with partition rebalancing #328

Closed
dasilvaKevin opened this issue Jun 10, 2024 · 23 comments · Fixed by #352

@dasilvaKevin

Description

We are having problems with partition rebalancing. We use Kubernetes and we have two replicas for a service with a stream.
One of the problems is message loss.
We have this problem when one of the two pods is down and the second has not yet taken over. If a message is produced at that time and the pod restarts, the message is not processed.
In some cases, messages are published in duplicate, but we have not identified when this happens.
In other cases, both replicas get stuck and we have to restart a single pod to unblock the situation. Here is our configuration:

 Stream property:
                client.id:      product
                num.stream.threads:     1
                default.key.serdes:     Streamiz.Kafka.Net.SerDes.StringSerDes
                default.value.serdes:   Streamiz.Kafka.Net.SerDes.StringSerDes
                default.timestamp.extractor:    Streamiz.Kafka.Net.Processors.Internal.FailOnInvalidTimestamp
                commit.interval.ms:     30000
                processing.guarantee:   AT_LEAST_ONCE
                transaction.timeout:    00:00:10
                poll.ms:        100
                max.poll.records:       500
                max.poll.restoring.records:     1000
                max.task.idle.ms:       0
                buffered.records.per.partition:         2147483647
                inner.exception.handler:        System.Func`2[System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
                production.exception.handler:   System.Func`2[Confluent.Kafka.DeliveryReport`2[System.Byte[],System.Byte[]],Streamiz.Kafka.Net.ExceptionHandlerResponse]
                deserialization.exception.handler:      System.Func`4[Streamiz.Kafka.Net.ProcessorContext,Confluent.Kafka.ConsumeResult`2[System.Byte[],System.Byte[]],System.Exception,Streamiz.Kafka.Net.ExceptionHandlerResponse]
                rocksdb.config.setter:  System.Action`2[System.String,Streamiz.Kafka.Net.State.RocksDb.RocksDbOptions]
                follow.metadata:        False
                state.dir:      C:\Users\****\AppData\Local\Temp\streamiz-kafka-net
                replication.factor:     1
                windowstore.changelog.additional.retention.ms:  86400000
                offset.checkpoint.manager:
                metrics.interval.ms:    30000
                metrics.recording.level:        INFO
                log.processing.summary:         00:01:00
                metrics.reporter:       System.Action`1[System.Collections.Generic.IEnumerable`1[Streamiz.Kafka.Net.Metrics.Sensor]]
                expose.librdkafka.stats:        False
                start.task.delay.ms:    5000
                parallel.processing:    False
                max.degree.of.parallelism:      8
                application.id:         product
                schema.registry.url:    10.10.10.10
                schema.registry.request.timeout.ms:     30000
                schema.registry.max.cached.schemas:     10
                avro.serializer.buffer.bytes:   2048
                protobuf.serializer.buffer.bytes:       2048
                avro.serializer.auto.register.schemas:  True
                protobuf.serializer.auto.register.schemas:      True
        Client property:
                bootstrap.servers:      10.10.10.10
                security.protocol:      plaintext
                debug:
        Consumer property:
                max.poll.interval.ms:   300000
                enable.auto.commit:     False
                enable.auto.offset.store:       False
                partition.assignment.strategy:  roundrobin
                auto.offset.reset:      earliest
                allow.auto.create.topics:       True
        Producer property:
                partitioner:    consistent_random
                enable.idempotence:     True
                message.timeout.ms:     2147483647
                message.send.max.retries:       1000000
                retry.backoff.ms:       100
        Admin client property:
                None

How to reproduce

Run the stream on two pods.

@LGouellec
Owner

Hi @dasilvaKevin,

Why did you override the consumer property partition.assignment.strategy: roundrobin?
Can you share the logs of the two pods when the issue appears?

Thanks,

@dasilvaKevin
Author

Hi @LGouellec,

Thank you for your reply.

We have overridden the consumer property partition.assignment.strategy to roundrobin for testing because we have the same problem with range.
We have logs for the case when a message is not handled.
We started with one pod (log0) and we produced "product11". This message is handled.
We started a second pod (log1) and we produced "product12". This message is not handled and we have a lag on the source topic.

log0.txt
log1.txt

@LGouellec
Owner

@dasilvaKevin

Product12 has been handled by your pod0.
Can you retry the same test, but change the log4net pattern to include the timestamp? That would let me compare the different actions and understand the timeline better.

@dasilvaKevin
Author

Hi @LGouellec,

We have not reproduced the same problem, but we do have two other cases.

First problem:

We started with one pod (log0) and produced "product13"; this message was processed.
We started a second pod (log1) and produced "product14"; this message was processed.
We produced "product15"; this message was processed eight minutes later.
Second problem:

We started two pods and produced "product15" to "product19"; all messages were processed.
We killed a pod and produced "product20"; this message was not processed and we have a lag on the source topic.

The log2 file contains the logs of the remaining pod.

log0.txt
log1.txt
log2.txt

@LGouellec
Owner

Hi @dasilvaKevin,

1st Problem:
As you can see in log1.txt, product15 was processed at 2024-06-13T08:12:22.9014142Z, and the timestamp of the message produced to Kafka is 1718266342600 = Thursday, 13 June 2024 08:12:22.600, i.e. about 300 ms before the processing, not 8 minutes. If you see a delay between producing and processing, it may be due to multiple retries during processing, because on the processing side the processing timestamp and the timestamp of the message persisted in Kafka are very close.

{ timestamp = 2024-06-13T08:12:22.9014142Z, log = { level = Debug, logger = Streamiz.Kafka.Net.Processors.StreamTask, original = stream-task[0|2] Start processing one record [Topic:product|Partition:2|Offset:6|Timestamp:1718266342600] }, message = stream-task[0|2] Start processing one record [Topic:product|Partition:2|Offset:6|Timestamp:1718266342600], metadata = { message_template = stream-task[0|2] Start processing one record [Topic:product|Partition:2|Offset:6|Timestamp:1718266342600] }, ecs = { version = 1.5.0 }, event = { severity = 1, timezone = Coordinated Universal Time, created = 2024-06-13T08:12:22.9014216Z }, process = { thread = { id = 16 }, pid = 1, name = ProductCategoryStream, executable = /app/Shared.dll } }
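
For reference, you can double-check this with standard .NET APIs; a quick sketch, where the two values are taken from the log line above:

using System;

class TimestampCheck
{
    static void Main()
    {
        // Timestamp of the record persisted in Kafka, in milliseconds since the Unix epoch.
        var produced = DateTimeOffset.FromUnixTimeMilliseconds(1718266342600);
        // Timestamp at which the stream task started processing the record.
        var processed = DateTimeOffset.Parse("2024-06-13T08:12:22.9014142Z");

        Console.WriteLine(produced.UtcDateTime.ToString("o")); // 2024-06-13T08:12:22.6000000Z
        Console.WriteLine(processed - produced);               // ~301 ms between producing and processing
    }
}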

2nd Problem:
It seems Product20 has been processed. Are you sure there is no record with key Product20 in the output topic?

{ timestamp = 2024-06-13T08:56:49.3788938Z, log = { level = Debug, logger = Streamiz.Kafka.Net.Processors.SinkProcessor, original = stream-task[0|0]|processor[KSTREAM-SINK-0000000006]- Process<String,ProductWithCategory> message with key Product20 and stream.ProductWithCategory with record metadata [topic:product|partition:0|offset:4] }, message = stream-task[0|0]|processor[KSTREAM-SINK-0000000006]- Process<String,ProductWithCategory> message with key Product20 and stream.ProductWithCategory with record metadata [topic:product|partition:0|offset:4], metadata = { message_template = stream-task[0|0]|processor[KSTREAM-SINK-0000000006]- Process<String,ProductWithCategory> message with key Product20 and stream.ProductWithCategory with record metadata [topic:product|partition:0|offset:4] }, ecs = { version = 1.5.0 }, event = { severity = 1, timezone = Coordinated Universal Time, created = 2024-06-13T08:56:49.3788968Z }, process = { thread = { id = 16 }, pid = 1, name = ProductCategoryStream, executable = /app/Shared.dll } }

If not, please enable the librdkafka debug logs (config.Debug = "broker,topic,msg"). The Kafka producer is asynchronous, so if a produce does not succeed it will retry multiple times.
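
Something like this, for example (a minimal sketch to adapt to your existing StreamConfig; only the Debug line is the relevant addition):

using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    // ... your existing settings ...
    // Forward librdkafka broker, topic and message-level debug logs,
    // so producer retries and delivery errors become visible.
    Debug = "broker,topic,msg",
};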

@dasilvaKevin
Author

Hi @LGouellec,

Thanks for the clarification on the first problem.

For the second problem, when we enable the librdkafka log (config.Debug = "broker,topic,msg"), we encounter a System.NullReferenceException: 'Object reference not set to an instance of an object.' at Confluent.Kafka.Consumer`2.get_Name() in Confluent.Kafka\Consumer.cs: line 134.

image

@LGouellec
Owner

Hi @dasilvaKevin,

Can you share the full stack trace of the exception, please?

@dasilvaKevin
Author

dasilvaKevin commented Jun 21, 2024

Hi @LGouellec,

The full stack trace:
image

The call stack:
image

Confluent.Kafka.dll!Confluent.Kafka.Consumer<byte[], byte[]>.Name.get() Line 134
Streamiz.Kafka.Net.dll!Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter.GetName(Confluent.Kafka.IClient client) Line 63
Streamiz.Kafka.Net.dll!Streamiz.Kafka.Net.Kafka.Internal.KafkaLoggerAdapter.LogConsume(Confluent.Kafka.IConsumer<byte[], byte[]> consumer, Confluent.Kafka.LogMessage message) Line 25
Confluent.Kafka.dll!Confluent.Kafka.ConsumerBuilder<System.__Canon, System.__Canon>.ConstructBaseConfig.AnonymousMethod__1(Confluent.Kafka.LogMessage logMessage) Line 45
Confluent.Kafka.dll!Confluent.Kafka.Consumer<byte[], byte[]>.LogCallback(nint rk, Confluent.Kafka.SyslogLevel level, string fac, string buf) Line 218
[Native to Managed Transition]	
[Managed to Native Transition]	
Confluent.Kafka.dll!Confluent.Kafka.Impl.Librdkafka.kafka_new(Confluent.Kafka.Impl.RdKafkaType type, nint conf, System.Text.StringBuilder errstr, nuint errstr_size) Line 1671
Confluent.Kafka.dll!Confluent.Kafka.Impl.SafeKafkaHandle.Create(Confluent.Kafka.Impl.RdKafkaType type, nint config, Confluent.Kafka.IClient owner) Line 113
Confluent.Kafka.dll!Confluent.Kafka.Consumer<byte[], byte[]>.Consumer(Confluent.Kafka.ConsumerBuilder<byte[], byte[]> builder) Line 697
Confluent.Kafka.dll!Confluent.Kafka.ConsumerBuilder<byte[], byte[]>.Build() Line 216
Streamiz.Kafka.Net.dll!Streamiz.Kafka.Net.Kafka.Internal.DefaultKafkaClientSupplier.GetGlobalConsumer(Confluent.Kafka.ConsumerConfig config) Line 107
Streamiz.Kafka.Net.dll!Streamiz.Kafka.Net.KafkaStream.KafkaStream(Streamiz.Kafka.Net.Stream.Topology topology, Streamiz.Kafka.Net.IStreamConfig configuration, Streamiz.Kafka.Net.Kafka.IKafkaSupplier kafkaSupplier) Line 182
Streamiz.Kafka.Net.dll!Streamiz.Kafka.Net.KafkaStream.KafkaStream(Streamiz.Kafka.Net.Stream.Topology topology, Streamiz.Kafka.Net.IStreamConfig configuration) Line 147
Shared.dll!Shared.KStream<ProductCategoryStream.Infrastructure.Kafka.ProductCatStream>.StartAsync(Streamiz.Kafka.Net.StreamBuilder builder, System.Threading.CancellationToken cancellationToken) Line 65
ProductCategoryStream.dll!ProductCategoryStream.Infrastructure.Kafka.ProductCatStream.BuildAndStartStream(System.Threading.CancellationToken cancellationToken) Line 30
ProductCategoryStream.dll!Program.<Main>$(string[] args) Line 20

@LGouellec
Owner

@dasilvaKevin

Really weird, this exception is caught: https://github.com/LGouellec/kafka-streams-dotnet/blob/0635ab5795dceb86f5a92b2b4ce68dee5d4d5346/core/Kafka/Internal/KafkaLoggerAdapter.cs#L72

Which version are you using?

Can you try to override the config:

config.ClientId = "your client ID";

@dasilvaKevin
Author

dasilvaKevin commented Jun 21, 2024

Hi @LGouellec,

We use version 1.5.1.

For configuration, we override the ClientId.

Here's our configuration:

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = _options.ClientId,
    BootstrapServers = _options.Servers,
    SecurityProtocol = _options.SecurityProtocol,
    AutoOffsetReset = _options.AutoOffsetReset,
    EnableIdempotence = true,
    MessageTimeoutMs = _options.ProducerMessageTimeoutMs,
    MessageSendMaxRetries = _options.ProducerMessageMaxRetries,
    RetryBackoffMs = _options.ProducerRetryBackOffMs,
    AllowAutoCreateTopics = true,
    SchemaRegistryUrl = _options.SchemaRegistryUrl,
    SchemaRegistryRequestTimeoutMs = _options.SchemaRegistryRequestTimeoutMs,
    SchemaRegistryMaxCachedSchemas = _options.SchemaRegistryMaxCachedSchemas,
    BufferBytes = _options.BufferBytes,
    AutoRegisterSchemas = _options.AutoRegisterSchemas,
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.RoundRobin,
    Partitioner = Partitioner.ConsistentRandom,
    ClientId = _options.ClientId,
    Debug = "broker,topic,msg",
};

image

@LGouellec
Owner

Please also add the following to get more debug logs:

// requires Microsoft.Extensions.Logging
config.Logger = LoggerFactory.Create(b =>
{
    b.AddConsole();
    b.SetMinimumLevel(LogLevel.Debug);
});

@dasilvaKevin
Author

Hi @LGouellec,
We already have that.
The System.NullReferenceException occurs when we override the Debug property.
For the first log files I sent you, we used this loggerFactory.

@LGouellec
Owner

Hi @dasilvaKevin,
Can you upgrade to the latest RC version, 1.6.0-RC2, and try it?

@dasilvaKevin
Author

Hi @LGouellec,

We have the same error with the latest RC.

@LGouellec
Owner

LGouellec commented Jun 26, 2024

Hi @dasilvaKevin,

Can you open a ticket on confluent-kafka-dotnet asking why client.Name throws a NullReferenceException even though you have configured the ClientId property? Thanks

@dasilvaKevin
Author

dasilvaKevin commented Jul 8, 2024

Hi @LGouellec,

I found a workaround.
I added an if statement to this method in the KafkaLoggerAdapter class:

private string GetName(IClient client)
{
    // Workaround: client.Name throws a NullReferenceException when the
    // underlying librdkafka handle has not been created yet or is invalid.
    var name = "";
    try
    {
        if (client.Handle == null || client.Handle.IsInvalid)
            return "Unknown";

        name = client.Name;
    }
    catch (NullReferenceException)
    {
        name = "Unknown";
    }

    return name;
}

I reproduced the issue where a message is not consumed and there is lag in my consumer group.
The message was produced at 15:46:23.068.
category1.txt
category2.txt

@LGouellec LGouellec added this to the 1.7.0 milestone Jul 15, 2024
@LGouellec LGouellec linked a pull request Jul 18, 2024 that will close this issue
EmanueleAlbero pushed a commit to ppilev/kafka-streams-dotnet that referenced this issue Jul 26, 2024
@EmanueleAlbero

Hi, I've encountered a similar issue in my tests.
I have an application with a topology that reads data from two topics (let's say TA and TB), each with N partitions.
If I deploy the app on its own, everything works fine.
If I add another instance, rebalancing kicks in and the following can happen:
Only X partitions of TA are assigned to the consumer, so public void CreateTasks(ICollection<TopicPartition> assignment) is executed without any TB partition in the assignment.
Tasks are created, but only the partitions from TA are associated with them.

A few seconds later the related partitions from TB are also assigned to the consumer, and public void CreateTasks(ICollection<TopicPartition> assignment) is executed again.
However, the tasks are already present in the activeTasks list and the TB partitions are ignored.

When this happens, the consumer appears to be consuming messages from TB, but nothing actually happens because the partitions are not associated with any task.
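
To make the sequence concrete, here is a simplified sketch of the behaviour I'm describing (illustrative only, not the actual Streamiz code; only the Confluent.Kafka TopicPartition type is real):

using System;
using System.Collections.Generic;
using System.Linq;
using Confluent.Kafka;

// Illustrative only: a task keyed by partition number keeps the partitions it was created with.
class RebalanceIllustration
{
    static readonly Dictionary<int, List<TopicPartition>> activeTasks = new();

    static void CreateTasks(ICollection<TopicPartition> assignment)
    {
        foreach (var group in assignment.GroupBy(tp => tp.Partition.Value))
        {
            if (activeTasks.ContainsKey(group.Key))
                continue; // task already created by an earlier call: the newly arrived partitions are dropped

            activeTasks[group.Key] = group.ToList();
        }
    }

    static void Main()
    {
        // First assignment: only the TA partition arrives.
        CreateTasks(new[] { new TopicPartition("TA", 0) });

        // A few seconds later the TB partition arrives too, but task 0 already exists,
        // so TB [0] is never attached to it.
        CreateTasks(new[] { new TopicPartition("TA", 0), new TopicPartition("TB", 0) });

        Console.WriteLine(string.Join(", ", activeTasks[0])); // prints only TA [0]
    }
}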

@LGouellec
Owner

Hey @EmanueleAlbero,

Can you share your logs from when this issue appears? It seems to be another problem, not directly related to this original issue, but maybe I'm wrong.
With more logs (please include the logs of both instances; dates and times will help a lot), I can troubleshoot better.

Appreciate it,

@EmanueleAlbero

EmanueleAlbero commented Jul 28, 2024

CI.Processor.Debug.JustPublished.Reducted.log

Attached are the logs of the app just after it was published on ESK.
The previous running instance gets deleted once the new app's liveness probe passes.
This is the event that actually triggers the issue.

The issue is usually with topic-root.source1.v1

If I delete the running pod and launch it again, everything works fine.
I get:

[07-26 10:34:00 DBG][Streamiz.Kafka.Net.Processors.Internal.TaskCreator][TraceId:] Created task 0-0 with assigned partition topic-root.source2.v1 [[0]],topic-root.source1.v1 [[0]]
[07-26 10:34:00 DBG][Streamiz.Kafka.Net.Processors.Internal.TaskCreator][TraceId:] Created task 0-1 with assigned partition topic-root.source2.v1 [[1]],topic-root.source1.v1 [[1]]
[07-26 10:34:00 DBG][Streamiz.Kafka.Net.Processors.Internal.TaskCreator][TraceId:] Created task 0-1 with assigned partition topic-root.source2.v1 [[1]],topic-root.source1.v1 [[1]]

and so on.

Otherwise, as in the attached logs, if it receives source2 first (line 244), it creates the task with only one partition, and when it receives source1 it is ignored (line 3810).

I've tried to apply the fix you wrote for this issue, but it didn't solve my problem.

Thanks a lot for your incredible effort in this library!

@EmanueleAlbero

EmanueleAlbero commented Jul 29, 2024

Hi, I found out that the issue of not receiving all the required topics at once was due to the consumer partition assignment strategy being set to CooperativeSticky.
Using the Range or RoundRobin strategy allows all the topics to be received at once.
Now the only remaining issue is that sometimes, after the initial rebalancing, the application does not consume messages from the topics (and I see the consumer group lag grow in the Kafka UI). I'm still investigating this, and I haven't yet tried the fix you made for this issue (I will try it and report back).
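
For reference, this is how the strategy override looks on the stream config (a minimal sketch; ApplicationId and BootstrapServers are placeholders):

using Confluent.Kafka;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "my-app",               // placeholder
    BootstrapServers = "localhost:9092",    // placeholder
    // Range keeps co-partitioned topics on the same instance, which avoids the
    // partial assignments we saw with CooperativeSticky.
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.Range,
};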

@EmanueleAlbero

EmanueleAlbero commented Jul 30, 2024

Hi, I have found a possible cause for the missing consumed/sent messages.

The application I'm working on uses the default value configuration.Guarantee = ProcessingGuarantee.AT_LEAST_ONCE.
With this setting the producer is shared:

{
    log.LogInformation("{LogPrefix}Creating shared producer client", logPrefix);
    producer = kafkaSupplier.GetProducer(configuration.ToProducerConfig(GetThreadProducerClientId(threadId))
        .Wrap(threadId));
}

But when rebalancing occurs, the record collector tries to dispose of it:

{
    log.LogDebug("{LogPrefix}Closing producer", logPrefix);
    if (producer != null)
    {
        lock (_lock)
        {
            string producerName = producer.Name.Split('#')[0];
            if (instanceProducer.ContainsKey(producerName) && --instanceProducer[producerName] <= 0)
            {
                if (retryRecordContext.HasNext)
                    log.LogWarning(
                        "There are messages still pending to retry in the backpressure queue. These messages won't longer flush into the corresponding topic !");
                retryRecordContext.Clear();
                producer.Dispose();
                producer = null;
                CheckForException();
            }
        }
    }
    _adminClient?.Dispose();
}

Once disposed, at the next partition assignment the disposed producer is handed out again, causing issues and access violations.
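
To illustrate the kind of guard I would expect (a sketch only, not the library's actual fix; it only uses standard Confluent.Kafka types):

using System;
using Confluent.Kafka;

// Illustrative sketch: a reference-counted shared producer that is rebuilt
// instead of being reused after it has been disposed.
class SharedProducerHolder
{
    private readonly object _lock = new();
    private readonly ProducerConfig _config;
    private IProducer<byte[], byte[]> _producer;
    private int _refCount;

    public SharedProducerHolder(ProducerConfig config) => _config = config;

    public IProducer<byte[], byte[]> Acquire()
    {
        lock (_lock)
        {
            // Recreate the producer if it was disposed during a previous rebalance.
            _producer ??= new ProducerBuilder<byte[], byte[]>(_config).Build();
            _refCount++;
            return _producer;
        }
    }

    public void Release()
    {
        lock (_lock)
        {
            if (--_refCount <= 0 && _producer != null)
            {
                _producer.Flush(TimeSpan.FromSeconds(10)); // flush pending records before closing
                _producer.Dispose();
                _producer = null; // the next Acquire() builds a fresh instance
            }
        }
    }
}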

@LGouellec
Owner

@EmanueleAlbero Will take a look next week

@LGouellec
Owner

Hey @EmanueleAlbero,

Hi, I found out that the issue of not receiving all the required topics at once was due to the consumer partition assignment strategy being set to CooperativeSticky. Using the Range or RoundRobin strategy allows all the topics to be received at once. Now the only remaining issue is that sometimes, after the initial rebalancing, the application does not consume messages from the topics (and I see the consumer group lag grow in the Kafka UI). I'm still investigating this, and I haven't yet tried the fix you made for this issue (I will try it and report back).

If you join multiple topics, Range is better to be sure all co-partitioned tasks are assigned to the same instance.

When rebalancing occurs, the record collector is flushed first and then closed, but only if no other partitions use the producer instance.
I encourage you to create another ticket, because this one was a synchronization issue between KStream and GlobalKTable and is not directly related to your issue.

It will be easier and better to separate the two issues.

Btw, I'm currently conducting a satisfaction survey to understand how I can better serve you, and I would love to get your feedback on the product.

Your insights are invaluable and will help us shape the future of our product to better meet your needs. The survey will only take a few minutes, and your responses will be completely confidential.

Survey

Thank you for your time and feedback!
