From 206d522ba3c6559ac5a37b066a6c5877591a4f09 Mon Sep 17 00:00:00 2001 From: Dimitar Ilkov Date: Wed, 3 Jul 2024 12:06:20 +0300 Subject: [PATCH] fix ack payload rendering --- src/Message/Ack.php | 7 ++--- src/Message/Msg.php | 10 ++---- src/Message/Nak.php | 21 +++++++++++++ src/Message/Progress.php | 16 ++++++++++ tests/Functional/StreamTest.php | 2 +- tests/Unit/Message/AckTest.php | 19 ++++++++++++ tests/Unit/Message/NakTest.php | 47 +++++++++++++++++++++++++++++ tests/Unit/Message/ProgressTest.php | 19 ++++++++++++ 8 files changed, 128 insertions(+), 13 deletions(-) create mode 100644 src/Message/Nak.php create mode 100644 src/Message/Progress.php create mode 100644 tests/Unit/Message/AckTest.php create mode 100644 tests/Unit/Message/NakTest.php create mode 100644 tests/Unit/Message/ProgressTest.php diff --git a/src/Message/Ack.php b/src/Message/Ack.php index 5e2c428..bd8765d 100644 --- a/src/Message/Ack.php +++ b/src/Message/Ack.php @@ -7,13 +7,10 @@ class Ack extends Prototype { public string $subject; - public string $command = '+ACK'; - - public ?Payload $payload = null; public function render(): string { - $payload = ($this->payload ?: Payload::parse(''))->render(); - return "PUB $this->subject $this->command $payload"; + $payload = Payload::parse('')->render(); + return "PUB $this->subject $payload"; } } diff --git a/src/Message/Msg.php b/src/Message/Msg.php index 3800d7d..4c54ab5 100644 --- a/src/Message/Msg.php +++ b/src/Message/Msg.php @@ -77,12 +77,9 @@ public function getClient(): ?Client public function nack(float $delay = 0): void { - $this->reply(new Ack([ - 'command' => '-NAK', + $this->reply(new Nak([ 'subject' => $this->replyTo, - 'payload' => Payload::parse([ - 'delay' => $delay, - ]), + 'delay' => $delay, ])); } @@ -125,8 +122,7 @@ public function parse($payload): self public function progress(): void { - $this->reply(new Ack([ - 'command' => '+WPI', + $this->reply(new Progress([ 'subject' => $this->replyTo, ])); } diff --git a/src/Message/Nak.php b/src/Message/Nak.php new file mode 100644 index 0000000..f0c9519 --- /dev/null +++ b/src/Message/Nak.php @@ -0,0 +1,21 @@ +delay) && $this->delay > 0) { + $data[] = json_encode(['delay' => $this->delay * 10 ** 9]); + } + $payload = Payload::parse(implode(' ', $data))->render(); + return "PUB $this->subject $payload"; + } +} diff --git a/src/Message/Progress.php b/src/Message/Progress.php new file mode 100644 index 0000000..72b5228 --- /dev/null +++ b/src/Message/Progress.php @@ -0,0 +1,16 @@ +render(); + return "PUB $this->subject $payload"; + } +} diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index 261f5e5..f8bc1bf 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -44,7 +44,7 @@ public function testNack() $message = $queue->fetch(); $this->assertNotNull($message); $this->assertSame((string) $message->payload, 'first'); - $message->nack(1); + $message->nack(30); $this->assertSame(1, $consumer->info()->num_ack_pending); $this->assertSame(1, $consumer->info()->num_pending); diff --git a/tests/Unit/Message/AckTest.php b/tests/Unit/Message/AckTest.php new file mode 100644 index 0000000..22b9d4f --- /dev/null +++ b/tests/Unit/Message/AckTest.php @@ -0,0 +1,19 @@ + '$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0' + ]); + + $this->assertEquals("PUB \$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0 0\r\n", $ack->render()); + } + +} diff --git a/tests/Unit/Message/NakTest.php b/tests/Unit/Message/NakTest.php new file mode 100644 index 0000000..e1ef196 --- /dev/null +++ b/tests/Unit/Message/NakTest.php @@ -0,0 +1,47 @@ + '$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0' + ]); + + $this->assertEquals("PUB \$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0 4\r\n-NAK", $nak->render()); + } + + public function testNakDelay() + { + $nak = new Nak([ + 'subject' => '$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0', + 'delay' => 10 + ]); + + $this->assertEquals("PUB \$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0 26\r\n-NAK {\"delay\":10000000000}", $nak->render()); + } + + public function testNakFloatDelay() + { + $nak = new Nak([ + 'subject' => '$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0', + 'delay' => 1.1 + ]); + + $this->assertEquals("PUB \$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0 25\r\n-NAK {\"delay\":1100000000}", $nak->render()); + } + public function testNakZeroDelay() + { + $nak = new Nak([ + 'subject' => '$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0', + 'delay' => 0 + ]); + + $this->assertEquals("PUB \$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0 4\r\n-NAK", $nak->render()); + } +} diff --git a/tests/Unit/Message/ProgressTest.php b/tests/Unit/Message/ProgressTest.php new file mode 100644 index 0000000..7982b59 --- /dev/null +++ b/tests/Unit/Message/ProgressTest.php @@ -0,0 +1,19 @@ + '$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0' + ]); + + $this->assertEquals("PUB \$JS.ACK.stream.consumer.1.3.18.1719992702186105579.0 4\r\n+WPI", $progress->render()); + } + +}