diff --git a/src/Connectors/Consumer/Manager.php b/src/Connectors/Consumer/Manager.php index cc1b0de0..3e7b7d42 100644 --- a/src/Connectors/Consumer/Manager.php +++ b/src/Connectors/Consumer/Manager.php @@ -1,7 +1,6 @@ consumerHandler->warning($exception); return; - } catch (Exception $exception) { - $this->consumerHandler->failed($exception); + } catch (Throwable $throwable) { + $this->consumerHandler->failed($throwable); return; } diff --git a/src/TopicHandler/Consumer/AbstractHandler.php b/src/TopicHandler/Consumer/AbstractHandler.php index 73ad2307..d284b7aa 100644 --- a/src/TopicHandler/Consumer/AbstractHandler.php +++ b/src/TopicHandler/Consumer/AbstractHandler.php @@ -1,9 +1,9 @@ $exception, + 'exception' => $throwable, ]); } } diff --git a/tests/Unit/Connectors/Consumer/ManagerTest.php b/tests/Unit/Connectors/Consumer/ManagerTest.php index 1b740360..e47d996c 100644 --- a/tests/Unit/Connectors/Consumer/ManagerTest.php +++ b/tests/Unit/Connectors/Consumer/ManagerTest.php @@ -1,7 +1,9 @@ expects() + ->consume() + ->andThrow($throwable); + + $consumerHandler->expects() + ->failed($throwable); + + // Actions + $runner->handleMessage(); + } + public function testShouldHandleMultiplesMessages(): void { // Set @@ -58,25 +86,11 @@ public function testShouldHandleMultiplesMessages(): void $kafkaMessage3->payload = 'original message 3'; $kafkaMessage3->err = RD_KAFKA_RESP_ERR_NO_ERROR; - $messages = [$kafkaMessage1, $kafkaMessage2, $kafkaMessage3]; - $count = 0; - $exception = new Exception('Exception occurs when consuming.'); - // Expectations - $consumer->shouldReceive('consume') - ->times(4) - ->andReturnUsing(function () use ($messages, &$count, $exception) { - $message = $messages[$count] ?? null; - if (!$message) { - throw $exception; - } - $count++; - - return $message; - }); - - $consumerHandler->expects() - ->failed($exception); + $consumer->shouldReceive() + ->consume() + ->times(3) + ->andReturn($kafkaMessage1, $kafkaMessage2, $kafkaMessage3); $dispatcher->expects() ->handle($consumerRecord) @@ -86,7 +100,6 @@ public function testShouldHandleMultiplesMessages(): void $runner->handleMessage(); $runner->handleMessage(); $runner->handleMessage(); - $runner->handleMessage(); } public function testShouldCallWarningWhenErrorOccurs(): void @@ -171,4 +184,22 @@ public function testShouldHandleAsyncCommit(): void $runner->handleMessage(); $runner->handleMessage(); } + + public function getThrowableScenarios(): array + { + return [ + 'Exception' => [ + 'throwable' => new Exception(), + ], + 'Error' => [ + 'throwable' => new Error(), + ], + 'InvalidArgumentException' => [ + 'throwable' => new InvalidArgumentException(), + ], + 'TypeError' => [ + 'throwable' => new TypeError(), + ], + ]; + } } diff --git a/tests/Unit/Dummies/ConsumerHandlerDummy.php b/tests/Unit/Dummies/ConsumerHandlerDummy.php index 08eb14a0..52ffc955 100644 --- a/tests/Unit/Dummies/ConsumerHandlerDummy.php +++ b/tests/Unit/Dummies/ConsumerHandlerDummy.php @@ -1,9 +1,9 @@