Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Kafka::asyncPublish() that will not flush on each send/batchSend but only once when the application is terminating #310

Merged
merged 3 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion docs/producing-messages/1-producing-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@ Kafka::publish('broker')->onTopic('topic-name')
```

This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer.
The following lines describes these methods.
The following lines describes these methods.

mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class
```php
use Junges\Kafka\Facades\Kafka;

mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
Kafka::asyncPublish('broker')->onTopic('topic-name')
```

The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send or batch send. That reduces the overhead when you want to send a lot of messages in your request handlers.
1 change: 1 addition & 0 deletions src/Facades/Kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

/**
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
* @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null)
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
* @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedTimes(int $times = 1, ProducerMessage $expectedMessage = null, callable $callback = null)
Expand Down
13 changes: 13 additions & 0 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ public function publish(string $broker = null): MessageProducer
);
}

/** Creates a new ProducerBuilder instance, setting brokers and topic. The producer will be flushed only when the application terminates, and doing SEND does not mean that the message was flushed! */
public function asyncPublish(string $broker = null): MessageProducer
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
{
if ($this->shouldFake) {
return Kafka::fake()->publish();
}

return new ProducerBuilder(
broker: $broker ?? config('kafka.brokers'),
asyncProducer: true
);
}

/** Return a ConsumerBuilder instance. */
public function consumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder
{
Expand Down
13 changes: 11 additions & 2 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Builder implements MessageProducer
private array $options = [];
private ProducerMessage $message;
private MessageSerializer $serializer;
private Producer $producer;
private ?Producer $producer = null;
private string $topic = '';
private ?Sasl $saslConfig = null;
private readonly string $broker;
Expand All @@ -27,6 +27,7 @@ class Builder implements MessageProducer

public function __construct(
?string $broker = null,
private readonly bool $asyncProducer = false,
) {
/** @var ProducerMessage $message */
$message = app(ProducerMessage::class);
Expand Down Expand Up @@ -191,6 +192,9 @@ public function sendBatch(MessageBatch $messageBatch): int

public function build(): Producer
{
if ($this->asyncProducer && $this->producer){
return $this->producer;
}
$conf = new Config(
broker: $this->broker,
topics: [],
Expand All @@ -200,9 +204,14 @@ public function build(): Producer
callbacks: $this->callbacks,
);

return app(Producer::class, [
$res = app(Producer::class, [
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
'config' => $conf,
'serializer' => $this->serializer,
'async' => $this->asyncProducer,
]);
if ($this->asyncProducer) {
$this->producer = $res;
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
}
return $res;
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
}
}
14 changes: 13 additions & 1 deletion src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ class Producer implements ProducerContract
public function __construct(
private readonly Config $config,
private readonly MessageSerializer $serializer,
private readonly bool $async = false,
) {
$this->producer = app(KafkaProducer::class, [
'conf' => $this->setConf($this->config->getProducerOptions()),
]);
$this->dispatcher = App::make(Dispatcher::class);
if ($this->async) {
app()->terminating(function () {
$this->flush();
});
}
}

/** Set the Kafka Configuration. */
Expand Down Expand Up @@ -72,6 +78,10 @@ public function produce(ProducerMessage $message): bool

$this->producer->poll(0);

if ($this->async) {
return true;
}

return $this->flush();
}

Expand Down Expand Up @@ -104,7 +114,9 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced++;
}

$this->flush();
if (!$this->async) {
mateusjunges marked this conversation as resolved.
Show resolved Hide resolved
$this->flush();
}

$this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));

Expand Down
Loading