Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Aug 9, 2024
1 parent 3031da7 commit cb07735
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 7 deletions.
4 changes: 3 additions & 1 deletion docs/producing-messages/1-producing-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ This method returns a `ProducerBuilder` instance, which contains a few methods t
The following lines describes these methods.

If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class

```php
use Junges\Kafka\Facades\Kafka;

Kafka::asyncPublish('broker')->onTopic('topic-name')
```

The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send or batch send. That reduces the overhead when you want to send a lot of messages in your request handlers.
The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send or batch send.
This reduces the overhead when you want to send a lot of messages in your request handlers.
4 changes: 2 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ public function publish(string $broker = null): MessageProducer
);
}

/** Creates a new ProducerBuilder instance, setting brokers and topic. The producer will be flushed only when the application terminates, and doing SEND does not mean that the message was flushed! */
/** Creates a new ProducerBuilder instance, optionally setting the brokers. The producer will be flushed only when the application terminates, and doing SEND does not mean that the message was flushed! */
public function asyncPublish(string $broker = null): MessageProducer
{
if ($this->shouldFake) {
return Kafka::fake()->publish();
return Kafka::fake()->publish($broker);
}

return new ProducerBuilder(
Expand Down
9 changes: 6 additions & 3 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ public function build(): Producer
if ($this->asyncProducer && $this->producer){
return $this->producer;
}

$conf = new Config(
broker: $this->broker,
topics: [],
Expand All @@ -204,14 +205,16 @@ public function build(): Producer
callbacks: $this->callbacks,
);

$res = app(Producer::class, [
$producer = app(Producer::class, [
'config' => $conf,
'serializer' => $this->serializer,
'async' => $this->asyncProducer,
]);

if ($this->asyncProducer) {
$this->producer = $res;
$this->producer = $producer;
}
return $res;

return $producer;
}
}
3 changes: 2 additions & 1 deletion src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public function __construct(
'conf' => $this->setConf($this->config->getProducerOptions()),
]);
$this->dispatcher = App::make(Dispatcher::class);

if ($this->async) {
app()->terminating(function () {
$this->flush();
Expand Down Expand Up @@ -114,7 +115,7 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced++;
}

if (!$this->async) {
if (! $this->async) {
$this->flush();
}

Expand Down

0 comments on commit cb07735

Please sign in to comment.