diff --git a/README.md b/README.md index be022eb..e7cf687 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ framework: options: commitAsync: true receiveTimeout: 10000 + ackFailedMessages: true topic: name: "events" kafka_conf: @@ -111,3 +112,14 @@ final class MySerializer extends Serializer } } ``` + +## Failing messages +By default this bundle supports the `MessageDecodingFailedException` (when failing deserialization), +a message will be acknowledged (because there is no reject within Kafka). + +When an exception is thrown within a MessageHandlerInterface, a `WorkerMessageFailedEvent` will be + fired. The original message will be acknowledged and the retry process is started (creating a retry + message or move to failed transport). + +If you want to disable the acknowledgement of a failing message, you can configure `ackFailedMessages +: false` within options. diff --git a/src/Messenger/KafkaTransport.php b/src/Messenger/KafkaTransport.php index bd697fe..c5260e1 100644 --- a/src/Messenger/KafkaTransport.php +++ b/src/Messenger/KafkaTransport.php @@ -9,8 +9,10 @@ use const RD_KAFKA_PARTITION_UA; use RdKafka\Conf as KafkaConf; use RdKafka\KafkaConsumer; +use RdKafka\Message; use RdKafka\Producer as KafkaProducer; use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportInterface; @@ -50,6 +52,9 @@ class KafkaTransport implements TransportInterface /** @var bool */ private $subscribed; + /** @var bool */ + private $ackFailedMessages; + public function __construct( LoggerInterface $logger, SerializerInterface $serializer, @@ -58,7 +63,8 @@ public function __construct( string $topicName, int $flushTimeoutMs, int $receiveTimeoutMs, - bool $commitAsync + bool $commitAsync, + bool $ackFailedMessages ) { $this->logger = $logger; $this->serializer = $serializer; @@ -68,6 +74,7 @@ public function __construct( $this->flushTimeoutMs = $flushTimeoutMs; $this->receiveTimeoutMs = $receiveTimeoutMs; $this->commitAsync = $commitAsync; + $this->ackFailedMessages = $ackFailedMessages; $this->subscribed = false; } @@ -79,13 +86,7 @@ public function get(): iterable switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $this->logger->info(sprintf('Kafka: Message %s %s %s received ', $message->topic_name, $message->partition, $message->offset)); - - $envelope = $this->serializer->decode([ - 'body' => $message->payload, - 'headers' => $message->headers, - ]); - - return [$envelope->with(new KafkaMessageStamp($message))]; + return $this->getEnvelope($message); case RD_KAFKA_RESP_ERR__PARTITION_EOF: $this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...'); break; @@ -102,6 +103,22 @@ public function get(): iterable return []; } + private function getEnvelope(Message $message): iterable + { + try { + $envelope = $this->serializer->decode([ + 'body' => $message->payload, + 'headers' => $message->headers, + ]); + } catch (MessageDecodingFailedException $exception) { + $this->rejectKafkaMessage($message); + + throw $exception; + } + + return [$envelope->with(new KafkaMessageStamp($message))]; + } + public function getSubscribedConsumer(): KafkaConsumer { $consumer = $this->getConsumer(); @@ -135,7 +152,28 @@ public function ack(Envelope $envelope): void public function reject(Envelope $envelope): void { - // Do nothing. auto commit should be set to false! + /** @var KafkaMessageStamp $transportStamp */ + $transportStamp = $envelope->last(KafkaMessageStamp::class); + $message = $transportStamp->getMessage(); + + $this->rejectKafkaMessage($message); + } + + private function rejectKafkaMessage(Message $message): void + { + if (false === $this->ackFailedMessages) { + return; + } + + $consumer = $this->getConsumer(); + + if ($this->commitAsync) { + $consumer->commitAsync($message); + } else { + $consumer->commit($message); + } + + $this->logger->warning(sprintf('Message %s %s %s rejected successful.', $message->topic_name, $message->partition, $message->offset)); } public function send(Envelope $envelope): Envelope diff --git a/src/Messenger/KafkaTransportFactory.php b/src/Messenger/KafkaTransportFactory.php index dc84794..1042399 100644 --- a/src/Messenger/KafkaTransportFactory.php +++ b/src/Messenger/KafkaTransportFactory.php @@ -70,7 +70,8 @@ public function createTransport(string $dsn, array $options, SerializerInterface $options['topic']['name'], $options['flushTimeout'] ?? 10000, $options['receiveTimeout'] ?? 10000, - $options['commitAsync'] ?? false + $options['commitAsync'] ?? false, + $options['ackFailedMessages'] ?? true ); } diff --git a/tests/Unit/Messenger/KafkaTransportTest.php b/tests/Unit/Messenger/KafkaTransportTest.php index e0db16c..73020da 100644 --- a/tests/Unit/Messenger/KafkaTransportTest.php +++ b/tests/Unit/Messenger/KafkaTransportTest.php @@ -65,7 +65,8 @@ public function testConstruct() 'test', 10000, 10000, - false + false, + true ); static::assertInstanceOf(TransportInterface::class, $transport); @@ -104,6 +105,10 @@ public function testGet() ]) ->willReturn(new Envelope(new TestMessage())); + $this->mockRdKafkaConsumer + ->method('commitAsync') + ->with($testMessage); + $transport = new KafkaTransport( $this->mockLogger, $this->mockSerializer, @@ -112,7 +117,8 @@ public function testGet() 'test', 10000, 10000, - false + false, + true ); $receivedMessages = $transport->get();