Skip to content

Commit

Permalink
Improve producer performance by reducing flush calls (#252)
Browse files Browse the repository at this point in the history
* Add `shouldFlush` flag to `send` and `sendBatch` methods

* Add `shouldFlush` flag to `send` and `sendBatch` methods

* Only flush when requested or for 0.5 percent of published messages

* Add shouldFlush flag

* Fix tests
  • Loading branch information
mateusjunges authored Feb 19, 2024
1 parent e32843b commit 5d3bc64
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 41 deletions.
4 changes: 2 additions & 2 deletions src/Contracts/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function withDebugEnabled(bool $enabled = true): self;
*
* @throws \Exception
*/
public function send(): bool;
public function send(bool $shouldFlush = false): bool;

public function build(): Producer|ProducerFake;

Expand All @@ -68,5 +68,5 @@ public function build(): Producer|ProducerFake;
*
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage
*/
public function sendBatch(MessageBatch $messageBatch): int;
public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int;
}
4 changes: 2 additions & 2 deletions src/Contracts/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ interface Producer
* @return mixed
* @throws \Exception
*/
public function produce(ProducerMessage $message): bool;
public function produce(ProducerMessage $message, bool $shouldFlush = false): bool;

/**
* @throws CouldNotPublishMessage
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch
*/
public function produceBatch(MessageBatch $messageBatch): int;
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int;

/**
* @throws \Junges\Kafka\Exceptions\Transactions\TransactionShouldBeRetriedException
Expand Down
8 changes: 4 additions & 4 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,27 @@ public function withDebugDisabled(): self
*
* @throws \Exception
*/
public function send(): bool
public function send(bool $shouldFlush = false): bool
{
$producer = $this->build();

return $producer->produce($this->message);
return $producer->produce($this->message, $shouldFlush);
}

/**
* Send a message batch to Kafka.
*
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage
*/
public function sendBatch(MessageBatch $messageBatch): int
public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
$producer = $this->build();

if ($this->topic !== '' && $messageBatch->getTopicName() === '') {
$messageBatch->onTopic($this->topic);
}

return $producer->produceBatch($messageBatch);
return $producer->produceBatch($messageBatch, $shouldFlush);
}

public function build(): Producer
Expand Down
70 changes: 43 additions & 27 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Support\Facades\App;
use Illuminate\Support\Lottery;
use Junges\Kafka\Concerns\ManagesTransactions;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Contracts\MessageSerializer;
Expand Down Expand Up @@ -58,7 +59,7 @@ private function setConf(array $options): Conf
}

/** @inheritDoc */
public function produce(ProducerMessage $message): bool
public function produce(ProducerMessage $message, bool $shouldFlush = false): bool
{
$this->dispatcher->dispatch(new PublishingMessage($message));

Expand All @@ -72,11 +73,11 @@ public function produce(ProducerMessage $message): bool

$this->producer->poll(0);

return $this->flush();
return $this->flush($shouldFlush);
}

/** @inheritDoc */
public function produceBatch(MessageBatch $messageBatch): int
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
Expand All @@ -91,6 +92,7 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced = 0;

$this->dispatcher->dispatch(new PublishingMessageBatch($messageBatch));

foreach ($messagesIterator as $message) {
assert($message instanceof Message);
$message->onTopic($messageBatch->getTopicName());
Expand All @@ -103,7 +105,7 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced++;
}

$this->flush();
$this->flush($shouldFlush);

$this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));

Expand Down Expand Up @@ -140,30 +142,44 @@ private function produceMessageBatch(ProducerTopic $topic, ProducerMessage $mess
* @throws CouldNotPublishMessage
* @throws \Exception
*/
private function flush(): mixed
private function flush(bool $shouldFlush = true): mixed
{
$sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100);

try {
return retry(10, function () {
$result = $this->producer->flush(1000);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
return true;
}

$message = rd_kafka_err2str($result);

throw CouldNotPublishMessage::withMessage($message, $result);
}, $sleepMilliseconds);
} catch (CouldNotPublishMessage $exception) {
$this->dispatcher->dispatch(new \Junges\Kafka\Events\CouldNotPublishMessage(
$exception->getKafkaErrorCode(),
$exception->getMessage(),
$exception,
));

throw $exception;
// Here we define the flush callback that is called when flush is requested by
// the developer or when the lottery wins. Flush is not needed in after all
// messages, but is recommended to be called once in a while on kafka.
$flush = function () {
$sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100);

try {
return retry(10, function () {
$result = $this->producer->flush(1000);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
return true;
}

$message = rd_kafka_err2str($result);

throw CouldNotPublishMessage::withMessage($message, $result);
}, $sleepMilliseconds);
} catch (CouldNotPublishMessage $exception) {
$this->dispatcher->dispatch(new \Junges\Kafka\Events\CouldNotPublishMessage(
$exception->getKafkaErrorCode(),
$exception->getMessage(),
$exception,
));

throw $exception;
}
};

if ($shouldFlush) {
return $flush();
}

return Lottery::odds(1, 200)
->winner($flush)
->loser(fn () => true)
->choose();
}
}
4 changes: 2 additions & 2 deletions src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ public function usingSerializer(MessageSerializer $serializer): MessageProducer
}

/** Send the message to the producer to be published on kafka. */
public function send(): bool
public function send(bool $shouldFlush = false): bool
{
$producer = $this->build();

return $producer->produce($this->getMessage());
}

/** Send a message batch to Kafka. */
public function sendBatch(MessageBatch $messageBatch): int
public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
$producer = $this->build();

Expand Down
4 changes: 2 additions & 2 deletions src/Support/Testing/Fakes/ProducerFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function withProduceCallback(callable $callback): self
return $this;
}

public function produce(ProducerMessage $message): bool
public function produce(ProducerMessage $message, bool $shouldFlush = false): bool
{
if ($this->producerCallback !== null) {
$callback = $this->producerCallback;
Expand All @@ -46,7 +46,7 @@ public function produce(ProducerMessage $message): bool
}

/** @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch */
public function produceBatch(MessageBatch $messageBatch): int
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
Expand Down
4 changes: 2 additions & 2 deletions tests/KafkaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public function testProducerThrowsExceptionIfMessageCouldNotBePublished(): void

$this->app->bind(Producer::class, fn () => $mockedProducer);

Kafka::publish()->onTopic('test')->withBodyKey('foo', 'bar')->send();
Kafka::publish()->onTopic('test')->withBodyKey('foo', 'bar')->send(shouldFlush: true);

Event::assertDispatched(CouldNotPublishMessageEvent::class, function (CouldNotPublishMessageEvent $event) use ($expectedMessage) {
return $event->throwable instanceof CouldNotPublishMessage
Expand Down Expand Up @@ -314,7 +314,7 @@ public function testSendMessageBatch(): void

Event::fake();

Kafka::publish()->withBodyKey('foo', 'bar')->onTopic('test')->sendBatch($messageBatch);
Kafka::publish()->withBodyKey('foo', 'bar')->onTopic('test')->sendBatch($messageBatch, shouldFlush: true);

Event::assertDispatched(PublishingMessageBatch::class, function (PublishingMessageBatch $event) use ($messageBatch) {
return $event->batch === $messageBatch;
Expand Down

0 comments on commit 5d3bc64

Please sign in to comment.