Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for failure message during serialization and handling #26

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ framework:
options:
commitAsync: true
receiveTimeout: 10000
ackFailedMessages: true
topic:
name: "events"
kafka_conf:
Expand Down Expand Up @@ -111,3 +112,14 @@ final class MySerializer extends Serializer
}
}
```

## Failing messages
By default this bundle supports the `MessageDecodingFailedException` (when failing deserialization),
a message will be acknowledged (because there is no reject within Kafka).

When an exception is thrown within a MessageHandlerInterface, a `WorkerMessageFailedEvent` will be
fired. The original message will be acknowledged and the retry process is started (creating a retry
message or move to failed transport).

If you want to disable the acknowledgement of a failing message, you can configure `ackFailedMessages
: false` within options.
56 changes: 47 additions & 9 deletions src/Messenger/KafkaTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
use const RD_KAFKA_PARTITION_UA;
use RdKafka\Conf as KafkaConf;
use RdKafka\KafkaConsumer;
use RdKafka\Message;
use RdKafka\Producer as KafkaProducer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand Down Expand Up @@ -50,6 +52,9 @@ class KafkaTransport implements TransportInterface
/** @var bool */
private $subscribed;

/** @var bool */
private $ackFailedMessages;

public function __construct(
LoggerInterface $logger,
SerializerInterface $serializer,
Expand All @@ -58,7 +63,8 @@ public function __construct(
string $topicName,
int $flushTimeoutMs,
int $receiveTimeoutMs,
bool $commitAsync
bool $commitAsync,
bool $ackFailedMessages
) {
$this->logger = $logger;
$this->serializer = $serializer;
Expand All @@ -68,6 +74,7 @@ public function __construct(
$this->flushTimeoutMs = $flushTimeoutMs;
$this->receiveTimeoutMs = $receiveTimeoutMs;
$this->commitAsync = $commitAsync;
$this->ackFailedMessages = $ackFailedMessages;

$this->subscribed = false;
}
Expand All @@ -79,13 +86,7 @@ public function get(): iterable
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$this->logger->info(sprintf('Kafka: Message %s %s %s received ', $message->topic_name, $message->partition, $message->offset));

$envelope = $this->serializer->decode([
'body' => $message->payload,
'headers' => $message->headers,
]);

return [$envelope->with(new KafkaMessageStamp($message))];
return $this->getEnvelope($message);
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
$this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...');
break;
Expand All @@ -102,6 +103,22 @@ public function get(): iterable
return [];
}

private function getEnvelope(Message $message): iterable
{
try {
$envelope = $this->serializer->decode([
'body' => $message->payload,
'headers' => $message->headers,
]);
} catch (MessageDecodingFailedException $exception) {
$this->rejectKafkaMessage($message);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default behavior would be that message offsets get committed if the serializer throws an exception.
This would probably be an unexpected side effect.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KonstantinCodes this is also done in other transports like RabbitMQ, all enqueue integrations.

The question is now: if serialization fails what should it do? Stop the consumer everytime and try serialization the message everytime (untill the code is fixed to accept this message)?

Or do what it does now: try to deserialize and if it fails throw this exception in the log and ack the message (so no hammering will occur).

Copy link
Owner

@KonstantinCodes KonstantinCodes Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The serializer throws an exception, either because of a programming error or because of an unexpected value. Maybe a new schema, maybe it's an avro message and the schema registry is temporarily down.

In that case, no messages will get processed but they will all be acked.

Once the schema registry is back online, messages will be processed again. We may just have lost a few hours of data. And the programmers have to manually reset the group.id to the last offset before the schema registry went offline, if they want to process the "lost" messages. Additionally, the messages that already got processed after the registry came back online, will be processed again after our group.id reset.

It is a very messy problem to fix.

On the other hand - if message processing waits until the schema registry has come online again, no manual intervention by the programmers is necessary. The system "healed" itself.

It seems to me that the program you have in mind, operates in a "at most once processing" scenario, where it is allowed to more or less randomly skip messages. You're probably dealing with tons of messages and also want to be careful, not to overload the brokers with too many requests.

What's the cause of hammering? In your deployment, how does the symfony consumer process get managed? Does it get restarted immediately after a failure? This could actually create chaos. If you have 3 consumers constantly restarting, they will really hammer Kafka with rebalance operations. You almost DOS your Kafka in that scenario. Maybe an incremental backoff would be the way to go?

Or, if you really operate in "atmost once", why throw an exception in your serializer at all? Just return an empty message. For example, you build a try-catch around your decode logic and in the case of an exception, return a new NullMessage(). Then configure the symfony messenger bus to default_middleware: allow_no_handlers.

In your serializer you could even send messages that failed to decode to a different bus for later re-processing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your reply!

Im still doubting what to do about the failing serialization, it could have multiple issues:

  • Registered schema does not completely match with code so errors could occur durring deserialization (logic thing that this should be fixed)
  • There is no registered schema so 'anything' can be send on producer side and consume can fail
  • Avro message and schema registry is down

By default the Symfony messenger setup is done with supervisord, so when a consumer stops it just restarts so when a message failed it can just consume the next if you setup the retry strategy correctly.

On all projects at the moment we do now;

  • When serialization fails -> stop consumer and retry (untill now this doesn't occured yet)
  • When consuming fails (during handling) -> move to failed db transport (able to retry)

But I see a problem arrises when serialization will fail for a single message and hold all the other thousands of messages in Kafka. Of course there is also a problem if all messages fails during serialization (because of down time schema registry or an incorrect code update on producer/consuming side).

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://github.com/symfony/symfony-docs/pull/13597/files
This PR looks rather interesting. There is a bash script that sleeps for 30 seconds to prevent hammering.
Not sure if this could help but maybe worth a try?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sorry, I didn't see your reply yet so I just edited my message #26 (comment)
Basically I'd try to catch the exception in the serializer and move the message the the failed bus.

Copy link
Author

@rdotter rdotter Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KonstantinCodes indeed true. Good thing to check out consuming and supervisord 👍

Idea to change this PR to not ack when serialization fails (logic should then be in serialization class if it should do something and if it fails it will be retried?) and ack the message if it during handling fails and after that the Kafka can be recreated through retry strategy (eg move to a queue where you can delay a message for retrying like SQS/db/Redis/RabbitMQ) or to the bottom of the Kafka topic (if this is enabled via config)
And set default ack when fail to false


throw $exception;
}

return [$envelope->with(new KafkaMessageStamp($message))];
}

public function getSubscribedConsumer(): KafkaConsumer
{
$consumer = $this->getConsumer();
Expand Down Expand Up @@ -135,7 +152,28 @@ public function ack(Envelope $envelope): void

public function reject(Envelope $envelope): void
{
// Do nothing. auto commit should be set to false!
/** @var KafkaMessageStamp $transportStamp */
$transportStamp = $envelope->last(KafkaMessageStamp::class);
$message = $transportStamp->getMessage();

$this->rejectKafkaMessage($message);
}

private function rejectKafkaMessage(Message $message): void
{
if (false === $this->ackFailedMessages) {
return;
}

$consumer = $this->getConsumer();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we just call ack() here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@KonstantinCodes this depends on the question below, if there is always an envelope with correct serialized class it can use the ack() if you think its a good idea to catch the serialization exception and ack the message + continue then its not possible.


if ($this->commitAsync) {
$consumer->commitAsync($message);
} else {
$consumer->commit($message);
}

$this->logger->warning(sprintf('Message %s %s %s rejected successful.', $message->topic_name, $message->partition, $message->offset));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log line here suggests that the offset hasn't been committed, but it just was committed.

}

public function send(Envelope $envelope): Envelope
Expand Down
3 changes: 2 additions & 1 deletion src/Messenger/KafkaTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public function createTransport(string $dsn, array $options, SerializerInterface
$options['topic']['name'],
$options['flushTimeout'] ?? 10000,
$options['receiveTimeout'] ?? 10000,
$options['commitAsync'] ?? false
$options['commitAsync'] ?? false,
$options['ackFailedMessages'] ?? true
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we actually set the default to false here? The reject() call is intended for messages that have failed to process after all.

The Exceptions that cause Messenger to call reject() should be caused by the applications message handlers.

Could the same effect be achieved by the application implementing a "CatchAllExceptionsMiddleware"?

);
}

Expand Down
10 changes: 8 additions & 2 deletions tests/Unit/Messenger/KafkaTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public function testConstruct()
'test',
10000,
10000,
false
false,
true
);

static::assertInstanceOf(TransportInterface::class, $transport);
Expand Down Expand Up @@ -104,6 +105,10 @@ public function testGet()
])
->willReturn(new Envelope(new TestMessage()));

$this->mockRdKafkaConsumer
->method('commitAsync')
->with($testMessage);

$transport = new KafkaTransport(
$this->mockLogger,
$this->mockSerializer,
Expand All @@ -112,7 +117,8 @@ public function testGet()
'test',
10000,
10000,
false
false,
true
);

$receivedMessages = $transport->get();
Expand Down