diff --git a/src/Consumers/Consumer.php b/src/Consumers/Consumer.php index 09aa4b81..d3532972 100644 --- a/src/Consumers/Consumer.php +++ b/src/Consumers/Consumer.php @@ -434,7 +434,10 @@ private function handleMessage(Message $message): void if (! in_array($message->err, self::IGNORABLE_CONSUMER_ERRORS, true)) { $this->logger->error($message, null, 'CONSUMER'); - throw new ConsumerException($message->errstr(), $message->err); + $exception = new ConsumerException($message->errstr(), $message->err); + $exception->setKafkaMessage($message); + + throw $exception; } } diff --git a/src/Exceptions/ConsumerException.php b/src/Exceptions/ConsumerException.php index 4bbf2968..1674c068 100644 --- a/src/Exceptions/ConsumerException.php +++ b/src/Exceptions/ConsumerException.php @@ -2,10 +2,24 @@ namespace Junges\Kafka\Exceptions; +use RdKafka\Message; + class ConsumerException extends LaravelKafkaException { + protected Message $kafkaMessage; + public static function dlqCanNotBeSetWithoutSubscribingToAnyTopics(): self { return new static("You must subscribe to a kafka topic before specifying the DLQ."); } + + public function setKafkaMessage(Message $kafkaMessage) + { + $this->kafkaMessage = $kafkaMessage; + } + + public function getKafkaMessage(): Message + { + return $this->kafkaMessage; + } }