Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to run a callback before deserializing a message #258

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ jobs:
strategy:
matrix:
php: [8.3, 8.2]
laravel: [10.*]
laravel: [11.*, 10.*]
dependency-version: [prefer-stable]
include:
- laravel: 10.*
testbench: 8.*
- laravel: 11.*
testbench: 9.*

name: CI - PHP ${{ matrix.php }} - Laravel ${{ matrix.laravel }} - Testbench ${{ matrix.testbench }} (${{ matrix.dependency-version }})

Expand Down
8 changes: 4 additions & 4 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
"ext-rdkafka": "^6.0",
"monolog/monolog": "^3.4",
"mateusjunges/avro-serde-php": "^3.0",
"illuminate/support": "^10.0",
"illuminate/contracts": "^10.0"
"illuminate/support": "^10.0|^11.0",
"illuminate/contracts": "^10.0|^11.0"
},
"require-dev": {
"phpunit/phpunit": "^9.5",
"phpunit/phpunit": "^10.5",
"orchestra/testbench": "^7.16|^8.0",
"predis/predis": "^1.1",
"friendsofphp/php-cs-fixer": "^3.0",
"kwn/php-rdkafka-stubs": "^2.2",
"rector/rector": "^0.15.1"
"rector/rector": "^0.19.1"
},
"minimum-stability": "dev",
"autoload": {
Expand Down
7 changes: 7 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Junges\Kafka\Config;

use Closure;
use JetBrains\PhpStorm\Pure;
use Junges\Kafka\Contracts\Consumer;
use Junges\Kafka\Contracts\HandlesBatchConfiguration;
Expand Down Expand Up @@ -80,6 +81,7 @@ public function __construct(
private readonly int $restartInterval = 1000,
private readonly array $callbacks = [],
private readonly array $beforeConsumingCallbacks = [],
private readonly ?Closure $beforeDeserializing = null,
private readonly array $afterConsumingCallbacks = [],
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
Expand Down Expand Up @@ -205,6 +207,11 @@ public function getBeforeConsumingCallbacks(): array
return $this->beforeConsumingCallbacks;
}

public function getBeforeDeserializingCallback(): ?Closure
{
return $this->beforeDeserializing;
}

public function getAfterConsumingCallbacks(): array
{
return $this->afterConsumingCallbacks;
Expand Down
10 changes: 10 additions & 0 deletions src/Consumers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class Builder implements ConsumerBuilderContract
/** @var list<callable> */
protected array $beforeConsumingCallbacks = [];

protected ?Closure $beforeDeserializing = null;

/** @var list<callable> */
protected array $afterConsumingCallbacks = [];

Expand Down Expand Up @@ -314,6 +316,13 @@ public function assignPartitions(array $partitionAssignment): self
return $this;
}

public function beforeDeserializing(callable $beforeDeserializing): self
{
$this->beforeDeserializing = $beforeDeserializing(...);

return $this;
}

/** @inheritDoc */
public function build(): MessageConsumer
{
Expand All @@ -334,6 +343,7 @@ public function build(): MessageConsumer
stopAfterLastMessage: $this->stopAfterLastMessage,
callbacks: $this->callbacks,
beforeConsumingCallbacks: $this->beforeConsumingCallbacks,
beforeDeserializing: $this->beforeDeserializing,
afterConsumingCallbacks: $this->afterConsumingCallbacks,
maxTime: $this->maxTime,
partitionAssignment: $this->partitionAssignment,
Expand Down
29 changes: 28 additions & 1 deletion src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class Consumer implements MessageConsumer
private readonly CommitterFactory $committerFactory;
private bool $stopRequested = false;
private ?Closure $whenStopConsuming = null;

private ?Closure $beforeDeserializing;
protected int $lastRestart = 0;
protected Timer $restartTimer;
private Dispatcher $dispatcher;
Expand All @@ -74,6 +76,7 @@ public function __construct(private readonly Config $config, private readonly Me

$this->committerFactory = $committerFactory ?? new DefaultCommitterFactory($this->messageCounter);
$this->dispatcher = App::make(Dispatcher::class);
$this->beforeDeserializing = $this->config->getBeforeDeserializingCallback();
}

/**
Expand Down Expand Up @@ -232,7 +235,11 @@ private function executeMessage(Message $message): void
// was received and will be consumed as soon as a consumer is available to process it.
$this->dispatcher->dispatch(new StartedConsumingMessage($consumedMessage));

$this->config->getConsumer()->handle($consumedMessage = $this->deserializer->deserialize($consumedMessage));
if (($consumedMessage = $this->deserialize($consumedMessage)) === null) {
return;
}

$this->config->getConsumer()->handle($consumedMessage);
$success = true;

// Dispatch an event informing that a message was consumed.
Expand All @@ -245,6 +252,26 @@ private function executeMessage(Message $message): void
$this->commit($message, $success);
}

private function deserialize(ConsumerMessage $message): ?ConsumerMessage
{
if ($this->runBeforeDeserializingCallback($message) === false) {
return null;
}

return $this->deserializer->deserialize($message);
}

private function runBeforeDeserializingCallback(ConsumerMessage $message): bool
{
if ($this->beforeDeserializing === null) {
return true;
}

$beforeDeserializingCallback = $this->beforeDeserializing;

return $beforeDeserializingCallback($message);
}

/**
* Handles batch of consumed messages by checking two conditions:
* 1) if current batch size is greater than or equals to batch size limit
Expand Down
4 changes: 2 additions & 2 deletions src/Contracts/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public function withDebugEnabled(bool $enabled = true): self;
*
* @throws \Exception
*/
public function send(): bool;
public function send(bool $shouldFlush = false): bool;

public function build(): Producer|ProducerFake;

Expand All @@ -68,5 +68,5 @@ public function build(): Producer|ProducerFake;
*
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage
*/
public function sendBatch(MessageBatch $messageBatch): int;
public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int;
}
4 changes: 2 additions & 2 deletions src/Contracts/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ interface Producer
* @return mixed
* @throws \Exception
*/
public function produce(ProducerMessage $message): bool;
public function produce(ProducerMessage $message, bool $shouldFlush = false): bool;

/**
* @throws CouldNotPublishMessage
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch
*/
public function produceBatch(MessageBatch $messageBatch): int;
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int;

/**
* @throws \Junges\Kafka\Exceptions\Transactions\TransactionShouldBeRetriedException
Expand Down
8 changes: 4 additions & 4 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,27 +162,27 @@ public function withDebugDisabled(): self
*
* @throws \Exception
*/
public function send(): bool
public function send(bool $shouldFlush = false): bool
{
$producer = $this->build();

return $producer->produce($this->message);
return $producer->produce($this->message, $shouldFlush);
}

/**
* Send a message batch to Kafka.
*
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessage
*/
public function sendBatch(MessageBatch $messageBatch): int
public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
$producer = $this->build();

if ($this->topic !== '' && $messageBatch->getTopicName() === '') {
$messageBatch->onTopic($this->topic);
}

return $producer->produceBatch($messageBatch);
return $producer->produceBatch($messageBatch, $shouldFlush);
}

public function build(): Producer
Expand Down
70 changes: 43 additions & 27 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Support\Facades\App;
use Illuminate\Support\Lottery;
use Junges\Kafka\Concerns\ManagesTransactions;
use Junges\Kafka\Config\Config;
use Junges\Kafka\Contracts\MessageSerializer;
Expand Down Expand Up @@ -58,7 +59,7 @@ private function setConf(array $options): Conf
}

/** @inheritDoc */
public function produce(ProducerMessage $message): bool
public function produce(ProducerMessage $message, bool $shouldFlush = false): bool
{
$this->dispatcher->dispatch(new PublishingMessage($message));

Expand All @@ -72,11 +73,11 @@ public function produce(ProducerMessage $message): bool

$this->producer->poll(0);

return $this->flush();
return $this->flush($shouldFlush);
}

/** @inheritDoc */
public function produceBatch(MessageBatch $messageBatch): int
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
Expand All @@ -91,6 +92,7 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced = 0;

$this->dispatcher->dispatch(new PublishingMessageBatch($messageBatch));

foreach ($messagesIterator as $message) {
assert($message instanceof Message);
$message->onTopic($messageBatch->getTopicName());
Expand All @@ -103,7 +105,7 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced++;
}

$this->flush();
$this->flush($shouldFlush);

$this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));

Expand Down Expand Up @@ -140,30 +142,44 @@ private function produceMessageBatch(ProducerTopic $topic, ProducerMessage $mess
* @throws CouldNotPublishMessage
* @throws \Exception
*/
private function flush(): mixed
private function flush(bool $shouldFlush = true): mixed
{
$sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100);

try {
return retry(10, function () {
$result = $this->producer->flush(1000);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
return true;
}

$message = rd_kafka_err2str($result);

throw CouldNotPublishMessage::withMessage($message, $result);
}, $sleepMilliseconds);
} catch (CouldNotPublishMessage $exception) {
$this->dispatcher->dispatch(new \Junges\Kafka\Events\CouldNotPublishMessage(
$exception->getKafkaErrorCode(),
$exception->getMessage(),
$exception,
));

throw $exception;
// Here we define the flush callback that is called when flush is requested by
// the developer or when the lottery wins. Flush is not needed in after all
// messages, but is recommended to be called once in a while on kafka.
$flush = function () {
$sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100);

try {
return retry(10, function () {
$result = $this->producer->flush(1000);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
return true;
}

$message = rd_kafka_err2str($result);

throw CouldNotPublishMessage::withMessage($message, $result);
}, $sleepMilliseconds);
} catch (CouldNotPublishMessage $exception) {
$this->dispatcher->dispatch(new \Junges\Kafka\Events\CouldNotPublishMessage(
$exception->getKafkaErrorCode(),
$exception->getMessage(),
$exception,
));

throw $exception;
}
};

if ($shouldFlush) {
return $flush();
}

return Lottery::odds(1, 200)
->winner($flush)
->loser(fn () => true)
->choose();
}
}
4 changes: 2 additions & 2 deletions src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ public function usingSerializer(MessageSerializer $serializer): MessageProducer
}

/** Send the message to the producer to be published on kafka. */
public function send(): bool
public function send(bool $shouldFlush = false): bool
{
$producer = $this->build();

return $producer->produce($this->getMessage());
}

/** Send a message batch to Kafka. */
public function sendBatch(MessageBatch $messageBatch): int
public function sendBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
$producer = $this->build();

Expand Down
4 changes: 2 additions & 2 deletions src/Support/Testing/Fakes/ProducerFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function withProduceCallback(callable $callback): self
return $this;
}

public function produce(ProducerMessage $message): bool
public function produce(ProducerMessage $message, bool $shouldFlush = false): bool
{
if ($this->producerCallback !== null) {
$callback = $this->producerCallback;
Expand All @@ -46,7 +46,7 @@ public function produce(ProducerMessage $message): bool
}

/** @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch */
public function produceBatch(MessageBatch $messageBatch): int
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
Expand Down
6 changes: 1 addition & 5 deletions tests/Commit/BatchCommitterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ public function testShouldCommitMessageOnlyAfterTheBatchSizeIsReached(): void
$committer = $this->createMock(Committer::class);
$committer
->expects($this->exactly(2))
->method('commitMessage')
->withConsecutive(
[$this->isInstanceOf(Message::class), true],
[$this->isInstanceOf(Message::class), true]
);
->method('commitMessage');

$batchSize = 3;
$messageCounter = new MessageCounter(42);
Expand Down
Loading
Loading