
Deserialization error #46

Open
ParthShirolawala opened this issue Feb 21, 2020 · 7 comments

Comments

@ParthShirolawala

Error: A response could not be decoded for the node - Kafka.Protocol.ProtocolException: The record batch says it has length that stops at 1049715 but the list of all batches stop at 1048641.

Throws: Could not deserialize a message for topic xyz / partition 2. Because our configuration is set to 'ConsumerErrorStrategy == Discard', we will now read from latest offset.

In my consumer, I am seeing the above error intermittently, and whenever it happens, a whole lot of messages from that partition are discarded.

For example, if I receive this error on partition 2 at offset 5100, when the consumer resumes on that partition it starts from the latest offset, say 9000. The roughly 4000 messages in between are discarded and I lose them.

Can you help me by explaining what may be wrong, what your library does in this case, and any ideas on how to overcome such a scenario?

@ychuzevi
Contributor

Hello,
The consumer is not able to deserialize the batch of messages because the batch length does not seem to match. This should not happen. How are those messages produced?
In such a case it is usually not a good idea to retry from the culprit offset, because if the batch stored on the server is permanently broken, you will keep reading the failing batch in a loop.
The current discard strategy is quite basic: since the driver does not know how many messages are corrupted, it simply discards all of them and goes to the latest offset.
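
For reference, the strategy is picked on the client configuration. A minimal sketch, assuming the Kafka.Public API surface shown in the configs later in this thread (Retry stays on the failing offset, with the looping risk described above):

    using Kafka.Public;          // namespace assumed for ClusterClient/Configuration
    using Kafka.Public.Loggers;  // namespace assumed for ConsoleLogger

    // Minimal sketch: picking the consumer error strategy.
    // ErrorStrategy.Discard (the reporter's setting) jumps to the latest
    // offset after a failed deserialization; ErrorStrategy.Retry stays on
    // the failing offset, which can loop forever on a permanently broken batch.
    var configuration = new Configuration
    {
        Seeds = "broker1:9092", // hypothetical seed broker
        ErrorStrategy = ErrorStrategy.Retry,
        ClientId = "K#-example"
    };
    var clusterClient = new ClusterClient(configuration, new ConsoleLogger());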

@verdie-g
Contributor

verdie-g commented Feb 24, 2020

This is probably because of the following optimization in the Kafka fetch protocol:

> the server is allowed to return a partial message at the end of the message set

This was fixed internally. We need to make a new release.
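
Concretely, a fetch response may legally end with a truncated record batch, so the deserializer has to treat "this batch claims to extend past the fetched bytes" as end-of-data rather than as a protocol error. An illustrative sketch of that boundary check (not the library's actual code):

    using System;
    using System.Buffers.Binary;

    static class BatchBounds
    {
        // Illustrative sketch (not the library's actual fix): a record
        // batch starts with baseOffset (8 bytes) + batchLength (4 bytes),
        // where batchLength counts the bytes that follow it. If the
        // declared end lands past the fetched data, the batch is the
        // partial tail the protocol allows, and reading should just stop.
        public static bool TryReadBatchEnd(ReadOnlySpan<byte> fetched, int pos, out int batchEnd)
        {
            batchEnd = 0;
            const int HeaderPrefix = 12; // baseOffset + batchLength
            if (pos + HeaderPrefix > fetched.Length)
                return false; // not even a full header: truncated tail

            int batchLength = BinaryPrimitives.ReadInt32BigEndian(fetched.Slice(pos + 8, 4));
            batchEnd = pos + HeaderPrefix + batchLength;

            // The reporter's error ("stops at 1049715 but ... stop at
            // 1048641") is exactly the case where this returns false.
            return batchEnd <= fetched.Length;
        }
    }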

@ParthShirolawala
Author

@verdie-g when can I expect the new release? I hope it will be soon. Please fill me in on that.

@verdie-g
Contributor

I just need to merge #47

@sdanzan
Contributor

sdanzan commented Feb 26, 2020

The partial message thing was already taken care of when deserialising message sets. The new problem was introduced when supporting record batches?

@spocthier
Contributor

> The partial message thing was already taken care of when deserialising message sets. The new problem was introduced when supporting record batches?

Precisely. Should be OK with the fix.
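
For context, the older message set format achieved that tolerance by dropping a partial trailing entry while scanning; an equivalent illustrative sketch (again, not the library's code):

    using System;
    using System.Buffers.Binary;

    static class MessageSetScan
    {
        // Sketch of the pre-record-batch behaviour: a message set entry is
        // offset (8 bytes) + messageSize (4 bytes) + message bytes, and a
        // partial trailing entry is silently dropped instead of throwing.
        public static int CountCompleteMessages(ReadOnlySpan<byte> messageSet)
        {
            int count = 0, pos = 0;
            while (pos + 12 <= messageSet.Length)
            {
                int size = BinaryPrimitives.ReadInt32BigEndian(messageSet.Slice(pos + 8, 4));
                if (pos + 12 + size > messageSet.Length)
                    break; // truncated trailing message: stop, don't throw
                pos += 12 + size;
                count++;
            }
            return count;
        }
    }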

@oguzhaneren

oguzhaneren commented Jul 24, 2020

Hi,

When I subscribe to a topic, K# throws an exception like the one below:

[2020-07-24 14:45:48] ERROR A response could not be decoded for the node (Id:2 Host:localhostt Port:9092): Kafka.Protocol.UncompressException: Invalid compressed data.
 ---> System.IO.InvalidDataException: Input is not a valid snappy-compressed block
   at Snappy.SnappyCodec.Uncompress(Byte[] input, Int32 offset, Int32 length, Byte[] output, Int32 outOffset)
   at Kafka.Protocol.Basics.Uncompress(ReusableMemoryStream uncompressed, Byte[] body, Int32 offset, Int32 length, CompressionCodec codec)
   --- End of inner exception stack trace ---
   at Kafka.Protocol.Basics.Uncompress(ReusableMemoryStream uncompressed, Byte[] body, Int32 offset, Int32 length, CompressionCodec codec)
   at Kafka.Protocol.FetchPartitionResponse.LazyDeserializeMessageSet(ReusableMemoryStream stream, Int32 messageSetSize, Tuple`2 deserializers)+MoveNext()
   at System.Collections.Generic.List`1.InsertRange(Int32 index, IEnumerable`1 collection)
   at System.Collections.Generic.List`1.AddRange(IEnumerable`1 collection)
   at Kafka.Protocol.FetchPartitionResponse.DeserializeMessageSet(ReusableMemoryStream stream, Tuple`2 deserializers)
   at Kafka.Protocol.FetchPartitionResponse.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
   at Kafka.Protocol.TopicData`1.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
   at Kafka.Protocol.Basics.DeserializeArrayExtra[TData](ReusableMemoryStream stream, Object extra, ApiVersion version)
   at Kafka.Protocol.CommonResponse`1.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
   at Kafka.Protocol.FetchResponse.Deserialize(ReusableMemoryStream stream, Object extra, ApiVersion version)
   at Kafka.Cluster.Node.Serialization.DeserializeResponse[TResponse](Int32 correlationId, ReusableMemoryStream data, ApiVersion version)
   at Kafka.Cluster.Node.ProcessFetchResponse(Int32 correlationId, ReusableMemoryStream responseData, IBatchByTopic`1 originalRequest, ApiVersion version)

[2020-07-24 14:45:48] ERROR Could not deserialize a message for topic mytopic / partition 0. Because our configuration is set to 'ConsumerErrorStrategy == Discard', we will now read from latest offset.

Here are my configs:

Producer:

    var serializationConfig = new SerializationConfig()
    {
        SerializeOnProduce = true
    };
    var serializer = new StringSerializer();
    serializationConfig.SetDefaultSerializers(serializer, serializer);

    var clusterClient = new ClusterClient(new Configuration
    {
        Seeds = _kafkaBrokerOptions.Servers,
        SerializationConfig = serializationConfig,
        ClientRequestTimeoutMs = (int)_kafkaBrokerOptions.RequestTimeout.TotalMilliseconds,
        ClientId = $"K#-{Guid.NewGuid()}__producer"
    }, new ConsoleLogger());

    clusterClient.Produce("mytopic", "hello world");

Consumer:

    var serializationConfig = new SerializationConfig()
    {
        SerializeOnProduce = true
    };
    var deserializer = new StringDeserializer();
    serializationConfig.SetDefaultDeserializers(deserializer, deserializer);

    var clusterClient = new ClusterClient(new Configuration
    {
        ErrorStrategy = ErrorStrategy.Retry,
        Seeds = _kafkaBrokerOptions.Servers,
        SerializationConfig = serializationConfig,
        ClientRequestTimeoutMs = (int)_kafkaBrokerOptions.RequestTimeout.TotalMilliseconds,
        ClientId = $"K#-{Guid.NewGuid()}__consumer"
    }, new ConsoleLogger());

    clusterClient.Messages.Where(kr => kr.Topic == "mytopic")
        .Subscribe(kr => Console.WriteLine("{0}/{1} {2}: {3}", kr.Topic, kr.Partition, kr.Offset, kr.Value as string));

    var consConf = new ConsumerGroupConfiguration
    {
        SessionTimeoutMs = 30000,
        RebalanceTimeoutMs = 20000,
        DefaultOffsetToReadFrom = Offset.Lastest,
        AutoCommitEveryMs = 5000
    };
    clusterClient.Subscribe("test-cgid", new[] { "mytopic" }, consConf);
