From 37f8e5f5b934245b974caedf930c4ceff0930816 Mon Sep 17 00:00:00 2001 From: Victor Gaiva <13839490+VictorGaiva@users.noreply.github.com> Date: Wed, 7 Feb 2024 23:50:56 -0300 Subject: [PATCH] test: Tests the interop between AMQP Queues and Streams Protocol --- mix.exs | 5 ++- mix.lock | 7 ++++ services/docker-compose.yaml | 6 +++ test/interop_test.exs | 73 ++++++++++++++++++++++++++++++++++++ 4 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 test/interop_test.exs diff --git a/mix.exs b/mix.exs index 0403347..2ff72cc 100644 --- a/mix.exs +++ b/mix.exs @@ -41,7 +41,9 @@ defmodule RabbitMQStream.MixProject do defp deps do [ {:ex_doc, "~> 0.28.4", only: :dev, runtime: false}, - {:jason, "~> 1.4.1", only: :test, runtime: false} + {:jason, "~> 1.4.1", only: :test, runtime: false}, + {:amqp, "~> 3.2", only: :test, runtime: false}, + {:amqp10_common, "~> 3.12", only: :test, runtime: false} ] end @@ -72,6 +74,7 @@ defmodule RabbitMQStream.MixProject do "guides/concepts/streams.md", "guides/concepts/super-streams.md", "guides/concepts/single-active-consumer.md", + "guides/concepts/interop.md", "guides/concepts/offset.md", "guides/setup/getting-started.md", "guides/setup/configuration.md", diff --git a/mix.lock b/mix.lock index d290b1a..8b54a82 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,8 @@ %{ + "amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"}, + "amqp10_common": {:hex, :amqp10_common, "3.12.12", "501d769078031f3f39e787620a8cd771815e74e493000f0768b448258d3d90ee", [:make, :rebar3], [], "hexpm", "d38133676450a1128d83a66b06bbb39a1c8c6cb65f7d4e33885786ce8e96022a"}, + "amqp_client": {:hex, :amqp_client, "3.12.12", "e7065dc769e2ddec11b66422b131377e656bf656125e526b8fece72d8b4e6fe9", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.12", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "8b4d591ae0dc8938dcbb49c77df9b19e63e7c3c92d69844babc4fcf7c3184a9d"}, + "credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "ex_doc": {:hex, :ex_doc, "0.28.6", "2bbd7a143d3014fc26de9056793e97600ae8978af2ced82c2575f130b7c0d7d7", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bca1441614654710ba37a0e173079273d619f9160cbcc8cd04e6bd59f1ad0e29"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, @@ -6,4 +10,7 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "rabbit_common": {:hex, :rabbit_common, "3.12.12", "b87525eb02bcd738463d9c64b00f380c30825b6dcfb93c2e1d9168cad239012c", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "027b5bf9de22d04198b1c02bc82fc01c9ed4129673218365d728db93a982f424"}, + "recon": {:hex, :recon, "2.5.3", "739107b9050ea683c30e96de050bc59248fd27ec147696f79a8797ff9fa17153", [:mix, :rebar3], [], "hexpm", "6c6683f46fd4a1dfd98404b9f78dcabc7fcd8826613a89dcb984727a8c3099d7"}, + "thoas": {:hex, :thoas, "1.0.0", "567c03902920827a18a89f05b79a37b5bf93553154b883e0131801600cf02ce0", [:rebar3], [], "hexpm", "fc763185b932ecb32a554fb735ee03c3b6b1b31366077a2427d2a97f3bd26735"}, } diff --git a/services/docker-compose.yaml b/services/docker-compose.yaml index e05bfe1..dbb3191 100644 --- a/services/docker-compose.yaml +++ b/services/docker-compose.yaml @@ -12,6 +12,8 @@ services: ports: - 5552:5552 - 5551:5551 + - 5672:5672 + - 15672:15672 environment: - RABBITMQ_DEFAULT_USER=guest - RABBITMQ_DEFAULT_PASS=guest @@ -27,6 +29,8 @@ services: ports: - 5552:5552 - 5551:5551 + - 5672:5672 + - 15672:15672 environment: - RABBITMQ_DEFAULT_USER=guest - RABBITMQ_DEFAULT_PASS=guest @@ -42,6 +46,8 @@ services: ports: - 5552:5552 - 5551:5551 + - 5672:5672 + - 15672:15672 environment: - RABBITMQ_DEFAULT_USER=guest - RABBITMQ_DEFAULT_PASS=guest \ No newline at end of file diff --git a/test/interop_test.exs b/test/interop_test.exs new file mode 100644 index 0000000..1421554 --- /dev/null +++ b/test/interop_test.exs @@ -0,0 +1,73 @@ +defmodule RabbitMQStreamTest.Interop do + use ExUnit.Case, async: false + alias RabbitMQStream.OsirisChunk + + @moduletag :v3_11 + @moduletag :v3_12 + @moduletag :v3_13 + + defmodule MyConnection do + use RabbitMQStream.Connection + end + + defmodule MyConsumer do + use RabbitMQStream.Consumer, + connection: MyConnection + + @impl true + def handle_chunk(%OsirisChunk{} = chunk, %{private: parent}) do + send(parent, {:chunk, chunk}) + + :ok + end + + def decode!(message) do + :amqp10_framing.decode_bin(message)[:"v1_0.data"] + end + end + + setup do + {:ok, _conn} = MyConnection.start_link(host: "localhost", vhost: "/") + :ok = MyConnection.connect() + + :ok + end + + @exchange "interop-exchange" + @stream "interop-stream" + test "should consume from a stream that was declared by amqp" do + {:ok, conn} = AMQP.Connection.open() + {:ok, channel} = AMQP.Channel.open(conn) + + # Since we are reading from ':first', we should make sure the queue is empty + AMQP.Queue.delete(channel, @stream) + + {:ok, _} = + AMQP.Queue.declare( + channel, + @stream, + durable: true, + arguments: [{"x-queue-type", "stream"}] + ) + + :ok = AMQP.Exchange.declare(channel, @exchange) + AMQP.Queue.bind(channel, @stream, @exchange) + + message = "Hello, World!" + + :ok = AMQP.Basic.publish(channel, @exchange, "", message) + + {:ok, _consumer} = + MyConsumer.start_link( + initial_offset: :first, + stream_name: @stream, + private: self(), + offset_tracking: [count: [store_after: 1]] + ) + + # We are pattern matching with the content itself because we have already decoded + # the message from the `:amqp` binary format, in the `decode!/1` callback defined + # in the `MyConsumer` + assert_receive {:chunk, %OsirisChunk{data_entries: [^message]}}, 1000 + end +end