Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for failure message during serialization and handling #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

rdotter
Copy link

@rdotter rdotter commented Nov 10, 2020

This solves: #22

When a message is received and there is an MessageDecodingFailedException thrown during serialization or an exception during handling a WorkerMessageFailedEvent will be dispatched.

@KonstantinCodes

return;
}

$consumer = $this->getConsumer();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just call ack() here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KonstantinCodes this depends on the question below, if there is always an envelope with correct serialized class it can use the ack() if you think its a good idea to catch the serialization exception and ack the message + continue then its not possible.

'headers' => $message->headers,
]);
} catch (MessageDecodingFailedException $exception) {
$this->rejectKafkaMessage($message);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default behavior would be that message offsets get committed if the serializer throws an exception.
This would probably be an unexpected side effect.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KonstantinCodes this is also done in other transports like RabbitMQ, all enqueue integrations.

The question is now: if serialization fails what should it do? Stop the consumer everytime and try serialization the message everytime (untill the code is fixed to accept this message)?

Or do what it does now: try to deserialize and if it fails throw this exception in the log and ack the message (so no hammering will occur).

Copy link
Owner

@KonstantinCodes KonstantinCodes Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serializer throws an exception, either because of a programming error or because of an unexpected value. Maybe a new schema, maybe it's an avro message and the schema registry is temporarily down.

In that case, no messages will get processed but they will all be acked.

Once the schema registry is back online, messages will be processed again. We may just have lost a few hours of data. And the programmers have to manually reset the group.id to the last offset before the schema registry went offline, if they want to process the "lost" messages. Additionally, the messages that already got processed after the registry came back online, will be processed again after our group.id reset.

It is a very messy problem to fix.

On the other hand - if message processing waits until the schema registry has come online again, no manual intervention by the programmers is necessary. The system "healed" itself.

It seems to me that the program you have in mind, operates in a "at most once processing" scenario, where it is allowed to more or less randomly skip messages. You're probably dealing with tons of messages and also want to be careful, not to overload the brokers with too many requests.

What's the cause of hammering? In your deployment, how does the symfony consumer process get managed? Does it get restarted immediately after a failure? This could actually create chaos. If you have 3 consumers constantly restarting, they will really hammer Kafka with rebalance operations. You almost DOS your Kafka in that scenario. Maybe an incremental backoff would be the way to go?

Or, if you really operate in "atmost once", why throw an exception in your serializer at all? Just return an empty message. For example, you build a try-catch around your decode logic and in the case of an exception, return a new NullMessage(). Then configure the symfony messenger bus to default_middleware: allow_no_handlers.

In your serializer you could even send messages that failed to decode to a different bus for later re-processing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reply!

Im still doubting what to do about the failing serialization, it could have multiple issues:

  • Registered schema does not completely match with code so errors could occur durring deserialization (logic thing that this should be fixed)
  • There is no registered schema so 'anything' can be send on producer side and consume can fail
  • Avro message and schema registry is down

By default the Symfony messenger setup is done with supervisord, so when a consumer stops it just restarts so when a message failed it can just consume the next if you setup the retry strategy correctly.

On all projects at the moment we do now;

  • When serialization fails -> stop consumer and retry (untill now this doesn't occured yet)
  • When consuming fails (during handling) -> move to failed db transport (able to retry)

But I see a problem arrises when serialization will fail for a single message and hold all the other thousands of messages in Kafka. Of course there is also a problem if all messages fails during serialization (because of down time schema registry or an incorrect code update on producer/consuming side).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/symfony/symfony-docs/pull/13597/files
This PR looks rather interesting. There is a bash script that sleeps for 30 seconds to prevent hammering.
Not sure if this could help but maybe worth a try?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry, I didn't see your reply yet so I just edited my message #26 (comment)
Basically I'd try to catch the exception in the serializer and move the message the the failed bus.

Copy link
Author

@rdotter rdotter Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KonstantinCodes indeed true. Good thing to check out consuming and supervisord 👍

Idea to change this PR to not ack when serialization fails (logic should then be in serialization class if it should do something and if it fails it will be retried?) and ack the message if it during handling fails and after that the Kafka can be recreated through retry strategy (eg move to a queue where you can delay a message for retrying like SQS/db/Redis/RabbitMQ) or to the bottom of the Kafka topic (if this is enabled via config)
And set default ack when fail to false

$consumer->commit($message);
}

$this->logger->warning(sprintf('Message %s %s %s rejected successful.', $message->topic_name, $message->partition, $message->offset));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log line here suggests that the offset hasn't been committed, but it just was committed.

@@ -70,7 +70,8 @@ public function createTransport(string $dsn, array $options, SerializerInterface
$options['topic']['name'],
$options['flushTimeout'] ?? 10000,
$options['receiveTimeout'] ?? 10000,
$options['commitAsync'] ?? false
$options['commitAsync'] ?? false,
$options['ackFailedMessages'] ?? true
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually set the default to false here? The reject() call is intended for messages that have failed to process after all.

The Exceptions that cause Messenger to call reject() should be caused by the applications message handlers.

Could the same effect be achieved by the application implementing a "CatchAllExceptionsMiddleware"?

@KonstantinCodes
Copy link
Owner

Hi @rdotter! Great to see your PR :)

I know we talked about this feature a while ago, and your case definitely makes sense.

Still, I'm a bit unsure about basically making reject() do the same as ack().
It kind of feels like a workaround for code that throws too many Exceptions. (And blocks message processing that way.)

It really strikes me like the same could be achieved with a CatchAllExceptionsMiddleware.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants