From 09947940aa53eb9e9ac738940c4ae9c839ad67e9 Mon Sep 17 00:00:00 2001 From: faisal2010 Date: Tue, 27 Oct 2020 21:03:45 +0500 Subject: [PATCH] Fix getMessageCount to also count delayed queue messages - adding up messages queued in delay set with main queue in getMessageCount() method. --- src/Transport/RedisTransport.php | 5 ++++- tests/Feature/TransportTest.php | 35 +++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/Transport/RedisTransport.php b/src/Transport/RedisTransport.php index 6e1894b..fe6adc5 100644 --- a/src/Transport/RedisTransport.php +++ b/src/Transport/RedisTransport.php @@ -162,7 +162,10 @@ private function isDebounceStampExist(Envelope $env): bool public function getMessageCount(): int { $this->connect(); - return (int) $this->redis->lLen($this->queue); + $pipe = $this->redis->multi(Redis::PIPELINE); + $pipe->lLen($this->queue); + $pipe->zCount($this->getDelayedSetName(), '-inf', '+inf'); + return array_sum(array_map('intval', $pipe->exec())); } private function connect(): void { diff --git a/tests/Feature/TransportTest.php b/tests/Feature/TransportTest.php index 4714c3d..1ffa1ed 100644 --- a/tests/Feature/TransportTest.php +++ b/tests/Feature/TransportTest.php @@ -11,6 +11,7 @@ use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\Serializer; use Symfony\Component\Messenger\Transport\TransportFactoryInterface; @@ -153,6 +154,24 @@ public function can_debounce_messages_and_wait_for_receiving() { $this->then_the_queue_has_size(0); } + /** + * @test + * @dataProvider provide_get_message_count_for_stamps + */ + public function get_message_count_for_stamps(StampInterface $stamp, int $count) { + $this->given_there_is_a_wrapped_message(); + $this->given_there_is_a_stamp_on_the_message($stamp); + $this->when_the_message_is_sent_on_the_transport(); + $this->then_message_count_in_the_queue($count); + } + + public function provide_get_message_count_for_stamps() + { + yield 'unique stamp' => [new UniqueStamp(1), 1]; + yield 'delay stamp' => [new DelayStamp(100), 1]; + yield 'debounce stamp' => [new DebounceStamp(100, 1), 1]; + } + private function given_there_is_a_message_on_the_queue_with_legacy_serialization() { $this->redis->lPush('messenger', json_encode([ json_encode(['id' => null]), @@ -165,15 +184,20 @@ private function given_there_is_a_wrapped_message() { } private function given_there_is_a_unique_stamp_on_the_message(?string $id = null) { - $this->envelope = $this->envelope->with(new UniqueStamp($id)); + $this->given_there_is_a_stamp_on_the_message(new UniqueStamp($id)); } private function given_there_is_a_delay_stamp_on_the_message(int $delayMs) { - $this->envelope = $this->envelope->with(new DelayStamp($delayMs)); + $this->given_there_is_a_stamp_on_the_message(new DelayStamp($delayMs)); } private function given_there_is_a_debounce_stamp_on_the_message(int $delay, ?string $id = null): void { - $this->envelope = $this->envelope->with(new DebounceStamp($delay, $id)); + $this->given_there_is_a_stamp_on_the_message(new DebounceStamp($delay, $id)); + } + + private function given_there_is_a_stamp_on_the_message(StampInterface $stamp): void + { + $this->envelope = $this->envelope->with($stamp); } public function given_there_is_a_wrapped_message_in_the_queue() { @@ -282,4 +306,9 @@ private function then_the_redis_transport_connect_params_use_tls() { TransportTest::assertEquals(['tls://redis', 6379], $this->connectParams); }, $this->transport, RedisTransport::class)(); } + + private function then_message_count_in_the_queue(int $count): void + { + $this->assertEquals($count, $this->transport->getMessageCount()); + } }