Skip to content

Commit

Permalink
test: Tests the interop between AMQP Queues and Streams Protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
VictorGaiva committed Feb 8, 2024
1 parent 2da0fcf commit 37f8e5f
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 1 deletion.
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ defmodule RabbitMQStream.MixProject do
defp deps do
[
{:ex_doc, "~> 0.28.4", only: :dev, runtime: false},
{:jason, "~> 1.4.1", only: :test, runtime: false}
{:jason, "~> 1.4.1", only: :test, runtime: false},
{:amqp, "~> 3.2", only: :test, runtime: false},
{:amqp10_common, "~> 3.12", only: :test, runtime: false}
]
end

Expand Down Expand Up @@ -72,6 +74,7 @@ defmodule RabbitMQStream.MixProject do
"guides/concepts/streams.md",
"guides/concepts/super-streams.md",
"guides/concepts/single-active-consumer.md",
"guides/concepts/interop.md",
"guides/concepts/offset.md",
"guides/setup/getting-started.md",
"guides/setup/configuration.md",
Expand Down
7 changes: 7 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
%{
"amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"},
"amqp10_common": {:hex, :amqp10_common, "3.12.12", "501d769078031f3f39e787620a8cd771815e74e493000f0768b448258d3d90ee", [:make, :rebar3], [], "hexpm", "d38133676450a1128d83a66b06bbb39a1c8c6cb65f7d4e33885786ce8e96022a"},
"amqp_client": {:hex, :amqp_client, "3.12.12", "e7065dc769e2ddec11b66422b131377e656bf656125e526b8fece72d8b4e6fe9", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.12", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "8b4d591ae0dc8938dcbb49c77df9b19e63e7c3c92d69844babc4fcf7c3184a9d"},
"credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"ex_doc": {:hex, :ex_doc, "0.28.6", "2bbd7a143d3014fc26de9056793e97600ae8978af2ced82c2575f130b7c0d7d7", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bca1441614654710ba37a0e173079273d619f9160cbcc8cd04e6bd59f1ad0e29"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"rabbit_common": {:hex, :rabbit_common, "3.12.12", "b87525eb02bcd738463d9c64b00f380c30825b6dcfb93c2e1d9168cad239012c", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:recon, "2.5.3", [hex: :recon, repo: "hexpm", optional: false]}, {:thoas, "1.0.0", [hex: :thoas, repo: "hexpm", optional: false]}], "hexpm", "027b5bf9de22d04198b1c02bc82fc01c9ed4129673218365d728db93a982f424"},
"recon": {:hex, :recon, "2.5.3", "739107b9050ea683c30e96de050bc59248fd27ec147696f79a8797ff9fa17153", [:mix, :rebar3], [], "hexpm", "6c6683f46fd4a1dfd98404b9f78dcabc7fcd8826613a89dcb984727a8c3099d7"},
"thoas": {:hex, :thoas, "1.0.0", "567c03902920827a18a89f05b79a37b5bf93553154b883e0131801600cf02ce0", [:rebar3], [], "hexpm", "fc763185b932ecb32a554fb735ee03c3b6b1b31366077a2427d2a97f3bd26735"},
}
6 changes: 6 additions & 0 deletions services/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ services:
ports:
- 5552:5552
- 5551:5551
- 5672:5672
- 15672:15672
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
Expand All @@ -27,6 +29,8 @@ services:
ports:
- 5552:5552
- 5551:5551
- 5672:5672
- 15672:15672
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
Expand All @@ -42,6 +46,8 @@ services:
ports:
- 5552:5552
- 5551:5551
- 5672:5672
- 15672:15672
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
73 changes: 73 additions & 0 deletions test/interop_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
defmodule RabbitMQStreamTest.Interop do
use ExUnit.Case, async: false
alias RabbitMQStream.OsirisChunk

@moduletag :v3_11
@moduletag :v3_12
@moduletag :v3_13

defmodule MyConnection do
use RabbitMQStream.Connection
end

defmodule MyConsumer do
use RabbitMQStream.Consumer,
connection: MyConnection

@impl true
def handle_chunk(%OsirisChunk{} = chunk, %{private: parent}) do
send(parent, {:chunk, chunk})

:ok
end

def decode!(message) do

Check warning on line 24 in test/interop_test.exs

View workflow job for this annotation

GitHub Actions / test (3_13, 1.15, 26)

module attribute @impl was not set for function decode!/1 callback (specified in RabbitMQStream.Consumer). This either means you forgot to add the "@impl true" annotation before the definition or that you are accidentally overriding this callback

Check warning on line 24 in test/interop_test.exs

View workflow job for this annotation

GitHub Actions / test (3_12, 1.15, 26)

module attribute @impl was not set for function decode!/1 callback (specified in RabbitMQStream.Consumer). This either means you forgot to add the "@impl true" annotation before the definition or that you are accidentally overriding this callback

Check warning on line 24 in test/interop_test.exs

View workflow job for this annotation

GitHub Actions / test (3_11, 1.15, 26)

module attribute @impl was not set for function decode!/1 callback (specified in RabbitMQStream.Consumer). This either means you forgot to add the "@impl true" annotation before the definition or that you are accidentally overriding this callback
:amqp10_framing.decode_bin(message)[:"v1_0.data"]
end
end

setup do
{:ok, _conn} = MyConnection.start_link(host: "localhost", vhost: "/")
:ok = MyConnection.connect()

:ok
end

@exchange "interop-exchange"
@stream "interop-stream"
test "should consume from a stream that was declared by amqp" do
{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)

# Since we are reading from ':first', we should make sure the queue is empty
AMQP.Queue.delete(channel, @stream)

{:ok, _} =
AMQP.Queue.declare(
channel,
@stream,
durable: true,
arguments: [{"x-queue-type", "stream"}]
)

:ok = AMQP.Exchange.declare(channel, @exchange)
AMQP.Queue.bind(channel, @stream, @exchange)

message = "Hello, World!"

:ok = AMQP.Basic.publish(channel, @exchange, "", message)

{:ok, _consumer} =
MyConsumer.start_link(
initial_offset: :first,
stream_name: @stream,
private: self(),
offset_tracking: [count: [store_after: 1]]
)

# We are pattern matching with the content itself because we have already decoded
# the message from the `:amqp` binary format, in the `decode!/1` callback defined
# in the `MyConsumer`
assert_receive {:chunk, %OsirisChunk{data_entries: [^message]}}, 1000
end
end

0 comments on commit 37f8e5f

Please sign in to comment.