diff --git a/src/Contracts/MessageProducer.php b/src/Contracts/MessageProducer.php index 6317ce75..277bd946 100644 --- a/src/Contracts/MessageProducer.php +++ b/src/Contracts/MessageProducer.php @@ -59,7 +59,7 @@ public function withDebugEnabled(bool $enabled = true): self; * * @throws \Exception */ - public function send(): bool; + public function send(bool $shouldFlush = false): bool; public function build(): Producer|ProducerFake; @@ -68,5 +68,5 @@ public function build(): Producer|ProducerFake; * * @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage */ - public function sendBatch(MessageBatch $messageBatch): int; + public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int; } diff --git a/src/Contracts/Producer.php b/src/Contracts/Producer.php index 6f8489da..20a321f2 100644 --- a/src/Contracts/Producer.php +++ b/src/Contracts/Producer.php @@ -13,13 +13,13 @@ interface Producer * @return mixed * @throws \Exception */ - public function produce(ProducerMessage $message): bool; + public function produce(ProducerMessage $message, bool $shouldFlush = false): bool; /** * @throws CouldNotPublishMessage * @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch */ - public function produceBatch(MessageBatch $messageBatch): int; + public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int; /** * @throws \Junges\Kafka\Exceptions\Transactions\TransactionShouldBeRetriedException diff --git a/src/Producers/Builder.php b/src/Producers/Builder.php index 1a601543..fa96eb03 100644 --- a/src/Producers/Builder.php +++ b/src/Producers/Builder.php @@ -162,11 +162,11 @@ public function withDebugDisabled(): self * * @throws \Exception */ - public function send(): bool + public function send(bool $shouldFlush = false): bool { $producer = $this->build(); - return $producer->produce($this->message); + return $producer->produce($this->message, $shouldFlush); } /** @@ -174,7 +174,7 @@ public function send(): bool * * @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage */ - public function sendBatch(MessageBatch $messageBatch): int + public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int { $producer = $this->build(); @@ -182,7 +182,7 @@ public function sendBatch(MessageBatch $messageBatch): int $messageBatch->onTopic($this->topic); } - return $producer->produceBatch($messageBatch); + return $producer->produceBatch($messageBatch, $shouldFlush); } public function build(): Producer diff --git a/src/Producers/Producer.php b/src/Producers/Producer.php index 6cbbac22..b9df8f24 100644 --- a/src/Producers/Producer.php +++ b/src/Producers/Producer.php @@ -4,6 +4,7 @@ use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Support\Facades\App; +use Illuminate\Support\Lottery; use Junges\Kafka\Concerns\ManagesTransactions; use Junges\Kafka\Config\Config; use Junges\Kafka\Contracts\MessageSerializer; @@ -58,7 +59,7 @@ private function setConf(array $options): Conf } /** @inheritDoc */ - public function produce(ProducerMessage $message): bool + public function produce(ProducerMessage $message, bool $shouldFlush = false): bool { $this->dispatcher->dispatch(new PublishingMessage($message)); @@ -72,11 +73,11 @@ public function produce(ProducerMessage $message): bool $this->producer->poll(0); - return $this->flush(); + return $this->flush($shouldFlush); } /** @inheritDoc */ - public function produceBatch(MessageBatch $messageBatch): int + public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int { if ($messageBatch->getTopicName() === '') { throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName()); @@ -91,6 +92,7 @@ public function produceBatch(MessageBatch $messageBatch): int $produced = 0; $this->dispatcher->dispatch(new PublishingMessageBatch($messageBatch)); + foreach ($messagesIterator as $message) { assert($message instanceof Message); $message->onTopic($messageBatch->getTopicName()); @@ -103,7 +105,7 @@ public function produceBatch(MessageBatch $messageBatch): int $produced++; } - $this->flush(); + $this->flush($shouldFlush); $this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced)); @@ -140,30 +142,44 @@ private function produceMessageBatch(ProducerTopic $topic, ProducerMessage $mess * @throws CouldNotPublishMessage * @throws \Exception */ - private function flush(): mixed + private function flush(bool $shouldFlush = true): mixed { - $sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100); - - try { - return retry(10, function () { - $result = $this->producer->flush(1000); - - if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { - return true; - } - - $message = rd_kafka_err2str($result); - - throw CouldNotPublishMessage::withMessage($message, $result); - }, $sleepMilliseconds); - } catch (CouldNotPublishMessage $exception) { - $this->dispatcher->dispatch(new \Junges\Kafka\Events\CouldNotPublishMessage( - $exception->getKafkaErrorCode(), - $exception->getMessage(), - $exception, - )); - - throw $exception; + // Here we define the flush callback that is called when flush is requested by + // the developer or when the lottery wins. Flush is not needed in after all + // messages, but is recommended to be called once in a while on kafka. + $flush = function () { + $sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100); + + try { + return retry(10, function () { + $result = $this->producer->flush(1000); + + if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { + return true; + } + + $message = rd_kafka_err2str($result); + + throw CouldNotPublishMessage::withMessage($message, $result); + }, $sleepMilliseconds); + } catch (CouldNotPublishMessage $exception) { + $this->dispatcher->dispatch(new \Junges\Kafka\Events\CouldNotPublishMessage( + $exception->getKafkaErrorCode(), + $exception->getMessage(), + $exception, + )); + + throw $exception; + } + }; + + if ($shouldFlush) { + return $flush(); } + + return Lottery::odds(1, 200) + ->winner($flush) + ->loser(fn () => true) + ->choose(); } } diff --git a/src/Support/Testing/Fakes/ProducerBuilderFake.php b/src/Support/Testing/Fakes/ProducerBuilderFake.php index 223c27f3..96b8bcde 100644 --- a/src/Support/Testing/Fakes/ProducerBuilderFake.php +++ b/src/Support/Testing/Fakes/ProducerBuilderFake.php @@ -153,7 +153,7 @@ public function usingSerializer(MessageSerializer $serializer): MessageProducer } /** Send the message to the producer to be published on kafka. */ - public function send(): bool + public function send(bool $shouldFlush = false): bool { $producer = $this->build(); @@ -161,7 +161,7 @@ public function send(): bool } /** Send a message batch to Kafka. */ - public function sendBatch(MessageBatch $messageBatch): int + public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int { $producer = $this->build(); diff --git a/src/Support/Testing/Fakes/ProducerFake.php b/src/Support/Testing/Fakes/ProducerFake.php index 36f67069..378345da 100644 --- a/src/Support/Testing/Fakes/ProducerFake.php +++ b/src/Support/Testing/Fakes/ProducerFake.php @@ -35,7 +35,7 @@ public function withProduceCallback(callable $callback): self return $this; } - public function produce(ProducerMessage $message): bool + public function produce(ProducerMessage $message, bool $shouldFlush = false): bool { if ($this->producerCallback !== null) { $callback = $this->producerCallback; @@ -46,7 +46,7 @@ public function produce(ProducerMessage $message): bool } /** @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch */ - public function produceBatch(MessageBatch $messageBatch): int + public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int { if ($messageBatch->getTopicName() === '') { throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName()); diff --git a/tests/KafkaTest.php b/tests/KafkaTest.php index b3e1cf19..f023ef23 100644 --- a/tests/KafkaTest.php +++ b/tests/KafkaTest.php @@ -274,7 +274,7 @@ public function testProducerThrowsExceptionIfMessageCouldNotBePublished(): void $this->app->bind(Producer::class, fn () => $mockedProducer); - Kafka::publish()->onTopic('test')->withBodyKey('foo', 'bar')->send(); + Kafka::publish()->onTopic('test')->withBodyKey('foo', 'bar')->send(shouldFlush: true); Event::assertDispatched(CouldNotPublishMessageEvent::class, function (CouldNotPublishMessageEvent $event) use ($expectedMessage) { return $event->throwable instanceof CouldNotPublishMessage @@ -314,7 +314,7 @@ public function testSendMessageBatch(): void Event::fake(); - Kafka::publish()->withBodyKey('foo', 'bar')->onTopic('test')->sendBatch($messageBatch); + Kafka::publish()->withBodyKey('foo', 'bar')->onTopic('test')->sendBatch($messageBatch, shouldFlush: true); Event::assertDispatched(PublishingMessageBatch::class, function (PublishingMessageBatch $event) use ($messageBatch) { return $event->batch === $messageBatch;