Skip to content

Commit

Permalink
Merge pull request #8 from faisal2010/fix-message-count
Browse files Browse the repository at this point in the history
Fix getMessageCount to also count delayed queue messages
  • Loading branch information
ragboyjr authored Oct 27, 2020
2 parents f0d03fc + 0994794 commit bd5fbe2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/Transport/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ private function isDebounceStampExist(Envelope $env): bool

public function getMessageCount(): int {
$this->connect();
return (int) $this->redis->lLen($this->queue);
$pipe = $this->redis->multi(Redis::PIPELINE);
$pipe->lLen($this->queue);
$pipe->zCount($this->getDelayedSetName(), '-inf', '+inf');
return array_sum(array_map('intval', $pipe->exec()));
}

private function connect(): void {
Expand Down
35 changes: 32 additions & 3 deletions tests/Feature/TransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Symfony\Component\Messenger\Stamp\BusNameStamp;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
Expand Down Expand Up @@ -153,6 +154,24 @@ public function can_debounce_messages_and_wait_for_receiving() {
$this->then_the_queue_has_size(0);
}

/**
* @test
* @dataProvider provide_get_message_count_for_stamps
*/
public function get_message_count_for_stamps(StampInterface $stamp, int $count) {
$this->given_there_is_a_wrapped_message();
$this->given_there_is_a_stamp_on_the_message($stamp);
$this->when_the_message_is_sent_on_the_transport();
$this->then_message_count_in_the_queue($count);
}

public function provide_get_message_count_for_stamps()
{
yield 'unique stamp' => [new UniqueStamp(1), 1];
yield 'delay stamp' => [new DelayStamp(100), 1];
yield 'debounce stamp' => [new DebounceStamp(100, 1), 1];
}

private function given_there_is_a_message_on_the_queue_with_legacy_serialization() {
$this->redis->lPush('messenger', json_encode([
json_encode(['id' => null]),
Expand All @@ -165,15 +184,20 @@ private function given_there_is_a_wrapped_message() {
}

private function given_there_is_a_unique_stamp_on_the_message(?string $id = null) {
$this->envelope = $this->envelope->with(new UniqueStamp($id));
$this->given_there_is_a_stamp_on_the_message(new UniqueStamp($id));
}

private function given_there_is_a_delay_stamp_on_the_message(int $delayMs) {
$this->envelope = $this->envelope->with(new DelayStamp($delayMs));
$this->given_there_is_a_stamp_on_the_message(new DelayStamp($delayMs));
}

private function given_there_is_a_debounce_stamp_on_the_message(int $delay, ?string $id = null): void {
$this->envelope = $this->envelope->with(new DebounceStamp($delay, $id));
$this->given_there_is_a_stamp_on_the_message(new DebounceStamp($delay, $id));
}

private function given_there_is_a_stamp_on_the_message(StampInterface $stamp): void
{
$this->envelope = $this->envelope->with($stamp);
}

public function given_there_is_a_wrapped_message_in_the_queue() {
Expand Down Expand Up @@ -282,4 +306,9 @@ private function then_the_redis_transport_connect_params_use_tls() {
TransportTest::assertEquals(['tls://redis', 6379], $this->connectParams);
}, $this->transport, RedisTransport::class)();
}

private function then_message_count_in_the_queue(int $count): void
{
$this->assertEquals($count, $this->transport->getMessageCount());
}
}

0 comments on commit bd5fbe2

Please sign in to comment.