diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1a18ad8..3bf5d23 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -55,15 +55,15 @@ jobs: - uses: actions/checkout@v3 - - uses: isbang/compose-action@v1.5.1 + - uses: hoverkraft-tech/compose-action@2.0.1 with: compose-file: "./services/docker-compose.yaml" services: "rabbitmq_stream_${{ matrix.version }}" - - - name: Wait RabbitMQ is Up + + - name: Wait until RabbitMQ is Up run: sleep 10s shell: bash - + - name: Create 'invoices' SuperStream run: docker exec rabbitmq_stream rabbitmq-streams add_super_stream invoices --partitions 3 @@ -79,4 +79,44 @@ jobs: run: mix compile - name: Run tests - run: mix test --exclude test --include v${{ matrix.version }} + run: mix test --exclude test --include v${{ matrix.version }} + test-clustered: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + version: ["3_13"] + elixir: ["1.15", "1.16"] + otp: ["26"] + steps: + - uses: erlef/setup-beam@v1 + with: + otp-version: ${{ matrix.otp }} + elixir-version: ${{ matrix.elixir }} + + - uses: actions/checkout@v3 + + - uses: hoverkraft-tech/compose-action@2.0.1 + with: + compose-file: "./services/cluster/docker-compose.yaml" + + - name: Wait util RabbitMQ is Up + run: sleep 20s + shell: bash + + - name: Create 'invoices' SuperStream + run: docker exec rabbitmq1 rabbitmq-streams add_super_stream invoices --partitions 3 + + - name: Install Dependencies + run: | + mix local.rebar --force + mix local.hex --force + mix deps.unlock --all + mix deps.get + mix deps.compile + + - name: Compile app + run: mix compile + + - name: Run tests + run: mix test --exclude test --include v${{ matrix.version }} diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 54d1df9..f7a0204 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -39,13 +39,24 @@ Make your change. Add tests for your change. Make the tests pass: To run the Clustered test, run: # Start the cluster - docker compose --project-directory services/cluster up -d + docker-compose --project-directory services/cluster up -d # or for proxied - docker compose --project-directory services/proxied-cluster up -d + # docker-compose --project-directory services/proxied-cluster up -d + + docker run --network cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" --rm -it --entrypoint bash elixir:1.17.2-otp-26 + + cd /home/rabbitmq-stream + + mix local.rebar --force + mix local.hex --force + mix deps.unlock --all + mix deps.get + mix deps.compile + mix compile # Run the tests - mix test --exclude test --include v3_13_proxied_cluster + mix test --exclude test --include v3_13_cluster - docker run --network proxied-cluster_rabbitmq -v "${PWD}:/home/rabbitmq-stream" -P elixir:1.17.2-otp-26 /bin/sh -c 'cd /home/rabbitmq-stream; ./services/cluster/test.sh' + - + diff --git a/lib/client/client.ex b/lib/client/client.ex new file mode 100644 index 0000000..017a86f --- /dev/null +++ b/lib/client/client.ex @@ -0,0 +1,88 @@ +defmodule RabbitMQStream.Client do + @moduledoc """ + Client for connection to a RabbitMQ Stream Cluster. + + This module provides the API for setting up and managing a connection to multiple RabbitMQ Stream + nodes in a cluster. It implements RabbitMQStream.Connection.Behavior, but with added functionality + related to managing multiple connections. + + + ## Lifecycle + + At startup, the client creates a`RabbitMQStream.Connection` to the provided host. It is used to + discover other nodes, mainly using the `query_metadata` command. + + + """ + + defmacro __using__(opts) do + quote location: :keep do + @opts unquote(opts) + + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}} + end + + def start_link(opts \\ []) when is_list(opts) do + opts = + Application.get_env(:rabbitmq_stream, __MODULE__, []) + |> Keyword.merge(@opts) + |> Keyword.merge(opts) + |> Keyword.put(:name, __MODULE__) + + RabbitMQStream.Client.start_link(opts) + end + + def stop(reason \\ :normal, timeout \\ :infinity) do + GenServer.stop(__MODULE__, reason, timeout) + end + end + end + + def start_link(opts \\ []) when is_list(opts) do + opts = + Application.get_env(:rabbitmq_stream, :defaults, []) + |> Keyword.get(:client, []) + |> Keyword.merge(opts) + + GenServer.start_link(RabbitMQStream.Client.Lifecycle, opts, name: opts[:name]) + end + + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}} + end + + defdelegate subscribe(server, stream_name, pid, offset, credit, properties \\ []), to: RabbitMQStream.Connection + + @type client_option :: + {:auto_discovery, boolean()} + | {:max_retries, non_neg_integer()} + | {:proxied?, boolean()} + + defstruct [ + :max_retries, + status: :setup, + opts: [], + control: nil, + proxied?: false, + monitors: %{}, + clients: %{}, + client_sequence: 0 + ] + + @type t :: %__MODULE__{ + control: pid() | nil, + status: :open | :setup | :closed, + client_sequence: non_neg_integer(), + proxied?: boolean(), + # Maps each subscriber to the connection's PID, so that we can garbage collect it when the subscriber dies + monitors: %{ + reference() => {type :: :brooker | :subscriber | :producer, other :: reference(), id :: non_neg_integer()} + }, + clients: %{ + reference() => {type :: :subscriber | :producer, subscriber :: pid(), connection :: pid(), args :: term()} + }, + max_retries: non_neg_integer() | nil, + opts: [RabbitMQStream.Connection.connection_options() | client_option()] + } +end diff --git a/lib/client/lifecycle.ex b/lib/client/lifecycle.ex new file mode 100644 index 0000000..f598d81 --- /dev/null +++ b/lib/client/lifecycle.ex @@ -0,0 +1,286 @@ +defmodule RabbitMQStream.Client.Lifecycle do + use GenServer + require Logger + alias RabbitMQStream.Client + alias RabbitMQStream.Message + + @moduledoc """ + This module defines the lifecycle of the RabbitMQStream.Client. + + It is responsible for setting up the connections and routing requests to one or more RabbitMQ + servers within a cluster. It uses the protocol's nodes disovery mechanisms, mainly `query_metadata` + to resolve the leader of each stream. + + ### Subscription + + A subscribe request is handled by creating a new `RabbitMQStream.Connection` process, being + responsible for only that subscriber. If the subscriber dies, the connection is also killed. + + + How each command is handled by the Client + - `:connect`: NOOP + - `:close`: NOOP + - `:create_stream`: Forwarded to control (Issue: The stream's leader end's up being in the control's node) + - `:delete_stream`: Forwarded to control + - `:query_offset`: Forwarded to control + - `:delete_producer`: Forwarded to broker + - `:query_metadata`: Forwarded to control + - `:unsubscribe`: Forwarded to broker + - `:subscribe`: Spawns a new connection + - `:credit`: Forwarded to broker + - `:stream_stats`: Forwarded to control + - `:partitions`: Forwarded to control + - `:route`: Forwarded to control + - `:delete_super_stream`: Forwarded to control + - `:respond`: Forwarded to broker + - `:supports?`: Forwarded to control + - `:query_producer_sequence`: Forwarded to control + - `:create_super_stream`: Forwarded to control + - `:declare_producer`: Spawns a new connection + - `:publish`: Forwarded to broker + + """ + + @impl true + def init(opts) do + {client_opts, connection_opts} = Keyword.split(opts, [:auto_discovery, :name, :max_retries, :proxied?]) + + # We specifically want to ignore the 'lazy' option when setting up a child connection + connection_opts = Keyword.drop(connection_opts, [:lazy]) + + client_opts = Keyword.put(client_opts, :connection_opts, connection_opts) + + conn = + struct( + RabbitMQStream.Client, + Keyword.put(client_opts, :opts, client_opts) + ) + + {:ok, conn, {:continue, :setup}} + end + + @impl true + def handle_continue(:setup, conn) do + connection_opts = Keyword.get(conn.opts, :connection_opts, []) + # First we start the control connection with the provided options as default + {:ok, control} = RabbitMQStream.Connection.start_link(connection_opts) + + # Ensure it is connected + :ok = RabbitMQStream.Connection.connect(control) + + conn = + conn + |> Map.put(:control, control) + |> Map.put(:status, :open) + + {:noreply, conn} + end + + # 1. Should client's commands other than 'subscribe' be sent to control or its own connection? + # + + @impl true + def handle_call({type, opts} = args, _from, %Client{} = conn) when type in [:subscribe, :declare_producer] do + stream = Keyword.fetch!(opts, :stream_name) + client_pid = Keyword.fetch!(opts, :pid) + + # We start a new conneciton on every 'subscribe' request. + if Keyword.has_key?(opts, :subscription_id) || Keyword.has_key?(opts, :producer_id) do + Logger.error("Manually passing `producer_id` or `subscription_id` to a Client is not allowed") + {:reply, {:ok, :not_allowed}, conn} + else + case new_leader_connection(conn, stream) do + {:ok, broker_pid} -> + {id, conn} = Map.get_and_update!(conn, :client_sequence, &{&1, &1 + 1}) + # Listen to each process's lifecycle + client_ref = Process.monitor(client_pid) + brooker_ref = Process.monitor(broker_pid) + + opts = + case type do + :subscribe -> + Keyword.put(opts, :subscription_id, id) + + :declare_producer -> + Keyword.put(opts, :producer_id, id) + end + + # Forward the request to the new connection + case GenServer.call(broker_pid, {type, opts}) do + {:ok, ^id} -> + # Adds it to the subscriptions list + monitors = + conn.monitors + # And we add both so we can get each other + |> Map.put(client_ref, {:client, brooker_ref, id}) + |> Map.put(brooker_ref, {:brooker, client_ref, id}) + + clients = + conn.clients + |> Map.put(id, {client_pid, broker_pid, args}) + + {:reply, {:ok, id}, %{conn | monitors: monitors, clients: clients}} + + {:error, error} -> + {:reply, error, conn} + end + + reply -> + {:reply, reply, conn} + end + end + end + + def handle_call({type, opts}, _from, %Client{} = conn) + when type in [ + :query_offset, + :store_offset, + :query_metadata, + :query_producer_sequence, + :stream_stats, + :partitions, + :route, + :delete_super_stream, + :create_stream, + :delete_stream, + :create_super_stream + ] do + {:reply, GenServer.call(conn.control, {type, opts}), conn} + end + + def handle_call({:unsubscribe, opts}, _from, %Client{} = conn) do + broker_pid = + conn.clients + |> Map.get(opts[:subscription_id]) + |> then(&elem(&1, 1)) + + # TODO: Cleanup + {:reply, GenServer.call(broker_pid, {:unsubscribe, opts}), conn} + end + + def handle_call({:delete_producer, opts}, _from, %Client{} = conn) do + broker_pid = + conn.clients + |> Map.get(opts[:producer_id]) + |> then(&elem(&1, 1)) + + # TODO: Cleanup + {:reply, GenServer.call(broker_pid, {:delete_producer, opts}), conn} + end + + # Noop + def handle_call({type, _opts}, _from, %Client{} = conn) + when type in [:connect, :close] do + Logger.warning("Calling \"#{type}\" on a Client has no effect.") + {:reply, :ok, conn} + end + + @impl true + def handle_cast({:credit, opts}, %Client{} = conn) do + broker_pid = + conn.clients + |> Map.get(opts[:subscription_id]) + |> then(&elem(&1, 1)) + + GenServer.cast(broker_pid, {:credit, opts}) + {:noreply, conn} + end + + def handle_cast({:respond, opts}, %Client{} = conn) do + # We only accept one type of 'respond' command + %Message.Request{ + command: :consumer_update, + data: %Message.Types.ConsumerUpdateRequestData{subscription_id: subscription_id} + } = Keyword.fetch!(opts, :request) + + broker_pid = + conn.clients + |> Map.get(subscription_id) + |> then(&elem(&1, 1)) + + GenServer.cast(broker_pid, {:respond, opts}) + {:noreply, conn} + end + + # An issue with only forwarding the messages to the broker is that it adds an extra message pass. + # To workaround this issue we could buffer messages so that it offsets the possible performance hit. + # Or we could attempt to use ':ets' in some way to prevent work around this, but there doesn't + # seem to be a way to do this while keeping the exact same interface as a Connection. + def handle_cast({:publish, opts}, %Client{} = conn) do + broker_pid = + conn.clients + |> Map.get(opts[:producer_id]) + |> then(&elem(&1, 1)) + + GenServer.cast(broker_pid, {:publish, opts}) + {:noreply, conn} + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, _reason}, %Client{} = conn) do + case Map.get(conn.clients, ref) do + {type, other_ref, id} -> + {client_pid, broker_pid, args} = Map.fetch!(conn.clients, id) + + case type do + # If the process that has shut down is Client process + :client -> + # We should shut down the connection as it is not needed anymore + :ok = GenServer.stop(broker_pid, :normal, 1000) + + # If it was the broker that shutdown + :brooker -> + # We tell the client process that the connection has exited, so it can handle + # it the way it sees fit. Meaning that a Client might re-run the offset-fetching + + # subscribing flow, while a producer has to redeclare itself + case args do + {:subscribe, opts} -> + # We forward the 'opts' so that the user's process can use it if needed. + send(client_pid, {:resubscribe, opts}) + + {:declare_producer, opts} -> + # We forward the 'opts' so that the user's process can use it if needed. + send(client_pid, {:redeclare_producer, opts}) + end + end + + # We always do this cleanup as the 'user' process is responsible for any 'init' or 're-init' + # flow. It also applies to the 'clients' tracker + monitors = Map.drop(conn.monitors, [ref, other_ref]) + clients = Map.drop(conn.clients, [id]) + + {:noreply, %{conn | monitors: monitors, clients: clients}} + + nil -> + Logger.warning("Received :DOWN for #{ref}, but it was not found in 'clients'") + {:noreply, conn} + end + end + + # Creates a new connection to the leader of the stream + defp new_leader_connection(%Client{} = conn, stream, _attempts \\ nil) do + # Query metadata to identify brokers + case RabbitMQStream.Connection.query_metadata(conn.control, [stream]) do + {:ok, %{streams: [%{code: :ok, name: ^stream, leader: leader}]} = metadata} -> + %{host: host, port: port} = Enum.find(metadata.brokers, &(&1.reference == leader)) + + # Start a new connection to the leader + {:ok, broker_conn} = + RabbitMQStream.Connection.start_link( + Keyword.merge( + conn.opts[:connection_opts], + host: host, + port: port + ) + ) + + # Ensure the connection is up + :ok = RabbitMQStream.Connection.connect(broker_conn) + + {:ok, broker_conn} + + _ -> + {:error, :no_broker_available} + end + end +end diff --git a/lib/connection/behavior.ex b/lib/connection/behavior.ex index 6959719..eef35b2 100644 --- a/lib/connection/behavior.ex +++ b/lib/connection/behavior.ex @@ -27,10 +27,18 @@ defmodule RabbitMQStream.Connection.Behavior do @callback declare_producer(GenServer.server(), stream_name :: String.t(), producer_reference :: String.t()) :: {:ok, producer_id :: integer()} | {:error, reason :: atom()} + @callback declare_producer( + GenServer.server(), + stream_name :: String.t(), + producer_reference :: String.t(), + producer_id :: integer() + ) :: + {:ok, producer_id :: integer()} | {:error, reason :: atom()} + @callback delete_producer(GenServer.server(), producer_id :: integer()) :: :ok | {:error, reason :: atom()} - @callback query_producer_sequence(GenServer.server(), String.t(), String.t()) :: + @callback query_producer_sequence(GenServer.server(), stream_name :: String.t(), producer_reference :: String.t()) :: {:ok, sequence :: integer()} | {:error, reason :: atom()} @callback query_metadata(GenServer.server(), streams :: [String.t()]) :: @@ -58,7 +66,12 @@ defmodule RabbitMQStream.Connection.Behavior do @callback unsubscribe(GenServer.server(), subscription_id :: non_neg_integer()) :: :ok | {:error, reason :: atom()} - @callback respond(GenServer.server(), request :: RabbitMQStream.Message.Request.t(), opts :: Keyword.t()) :: :ok + @callback respond( + GenServer.server(), + subscription_id :: non_neg_integer(), + request :: RabbitMQStream.Message.Request.t(), + opts :: Keyword.t() + ) :: :ok @callback credit(GenServer.server(), subscription_id :: non_neg_integer(), credit :: non_neg_integer()) :: :ok diff --git a/lib/connection/connection.ex b/lib/connection/connection.ex index c0e8afc..319b520 100644 --- a/lib/connection/connection.ex +++ b/lib/connection/connection.ex @@ -80,8 +80,8 @@ defmodule RabbitMQStream.Connection do """ defmacro __using__(opts) do - quote bind_quoted: [opts: opts], location: :keep do - @opts opts + quote location: :keep do + @opts unquote(opts) def start_link(opts \\ []) when is_list(opts) do opts = @@ -133,6 +133,15 @@ defmodule RabbitMQStream.Connection do ) end + def declare_producer(stream_name, producer_reference, producer_id) do + RabbitMQStream.Connection.declare_producer( + __MODULE__, + stream_name, + producer_reference, + producer_id + ) + end + def delete_producer(producer_id) do RabbitMQStream.Connection.delete_producer(__MODULE__, producer_id) end @@ -203,8 +212,8 @@ defmodule RabbitMQStream.Connection do RabbitMQStream.Connection.delete_super_stream(__MODULE__, name) end - def respond(request, opts) do - RabbitMQStream.Connection.respond(__MODULE__, request, opts) + def respond(subscription_id, request, opts) do + RabbitMQStream.Connection.respond(__MODULE__, subscription_id, request, opts) end end end @@ -303,7 +312,16 @@ defmodule RabbitMQStream.Connection do is_binary(stream_name) do GenServer.call( server, - {:declare_producer, stream_name: stream_name, producer_reference: producer_reference} + {:declare_producer, stream_name: stream_name, producer_reference: producer_reference, pid: self()} + ) + end + + def declare_producer(server, stream_name, producer_reference, producer_id) + when is_binary(producer_reference) and + is_binary(stream_name) and is_integer(producer_id) do + GenServer.call( + server, + {:declare_producer, stream_name: stream_name, producer_reference: producer_reference, producer_id: producer_id} ) end @@ -396,7 +414,6 @@ defmodule RabbitMQStream.Connection do is_integer(credit) and is_offset(offset) and is_list(properties) and - is_pid(pid) and credit >= 0 do GenServer.call( server, @@ -489,15 +506,15 @@ defmodule RabbitMQStream.Connection do offset. So the connection sends the request to the subscription handler, which then calls this function to send the response back to the server. """ - def respond(server, request, opts) when is_list(opts) do - GenServer.cast(server, {:respond, request, opts}) + def respond(server, subscription_id, request, opts) when is_list(opts) do + GenServer.cast(server, {:respond, subscription_id: subscription_id, request: request, opts: opts}) end @doc """ Checks if the connected server supports the given command. """ def supports?(server, command, version \\ 1) do - GenServer.call(server, {:supports?, command, version}) + GenServer.call(server, {:supports?, command: command, version: version}) end @type offset :: :first | :last | :next | {:offset, non_neg_integer()} | {:timestamp, integer()} @@ -519,7 +536,7 @@ defmodule RabbitMQStream.Connection do producer_sequence: non_neg_integer(), subscriber_sequence: non_neg_integer(), peer_properties: %{String.t() => term()}, - connection_properties: Keyword.t(), + connection_properties: %{String.t() => String.t()}, mechanisms: [String.t()], connect_requests: [pid()], request_tracker: %{{atom(), integer()} => {pid(), any()}}, @@ -545,7 +562,7 @@ defmodule RabbitMQStream.Connection do subscriber_sequence: 1, subscriptions: %{}, state: :closed, - peer_properties: [], + peer_properties: %{}, connection_properties: [], mechanisms: [], connect_requests: [], diff --git a/lib/connection/handler.ex b/lib/connection/handler.ex index ecd9221..fbd49e6 100644 --- a/lib/connection/handler.ex +++ b/lib/connection/handler.ex @@ -10,8 +10,10 @@ defmodule RabbitMQStream.Connection.Handler do Logger.debug("Connection close requested by server: #{request.data.code} #{request.data.reason}") Logger.debug("Connection closed") - %{conn | state: :closing, close_reason: request.data.reason} + conn + |> Map.put(:close_reason, request.data.reason) |> Helpers.push_internal(:response, :close, correlation_id: request.correlation_id, code: :ok) + |> transition(:closing) end def handle_message(%Connection{} = conn, %Request{command: :tune} = request) do @@ -21,9 +23,11 @@ defmodule RabbitMQStream.Connection.Handler do options = Keyword.merge(conn.options, frame_max: request.data.frame_max, heartbeat: request.data.heartbeat) - %{conn | options: options, state: :opening} + conn + |> Map.put(:options, options) |> Helpers.push_internal(:response, :tune, correlation_id: 0) |> Helpers.push_internal(:request, :open) + |> transition(:opening) end def handle_message(%Connection{} = conn, %Request{command: :heartbeat}) do @@ -57,7 +61,7 @@ defmodule RabbitMQStream.Connection.Handler do GenServer.reply(pid, :ok) - %{conn | state: :closing} + transition(conn, :closed) end def handle_message(%Connection{} = conn, %Response{code: code}) @@ -75,7 +79,9 @@ defmodule RabbitMQStream.Connection.Handler do GenServer.reply(request, {:error, code}) end - %{conn | state: :closing, close_reason: code} + conn + |> Map.put(:close_reason, code) + |> transition(:closing) end def handle_message(%Connection{} = conn, %Response{command: :credit, code: code}) @@ -151,7 +157,7 @@ defmodule RabbitMQStream.Connection.Handler do conn |> Helpers.push_internal(:request, :open) - |> Map.put(:state, :opening) + |> transition(:opening) end def handle_message(%Connection{} = conn, %Response{command: :tune} = response) do @@ -161,8 +167,8 @@ defmodule RabbitMQStream.Connection.Handler do options = Keyword.merge(conn.options, frame_max: response.data.frame_max, heartbeat: response.data.heartbeat) %{conn | options: options} - |> Map.put(:state, :opening) |> Helpers.push_internal(:request, :open) + |> transition(:opening) end # If the server has a version lower than 3.13, this is the 'terminating' response. @@ -179,7 +185,10 @@ defmodule RabbitMQStream.Connection.Handler do send(self(), :flush_request_buffer) - %{conn | state: :open, connect_requests: [], connection_properties: response.data.connection_properties} + conn + |> Map.put(:connect_requests, []) + |> Map.put(:connection_properties, response.data.connection_properties) + |> transition(:open) end def handle_message( @@ -275,7 +284,10 @@ defmodule RabbitMQStream.Connection.Handler do end send(self(), :flush_request_buffer) - %{conn | state: :open, connect_requests: [], commands: commands} + + conn + |> Map.merge(%{connect_requests: [], commands: commands}) + |> transition(:open) end def handle_message(%Connection{} = conn, %Response{command: command, data: data} = response) @@ -316,4 +328,14 @@ defmodule RabbitMQStream.Connection.Handler do ) end end + + @doc """ + Transitions the lifecycle of the connection to the given state, while notifying all monitors. + + Always call this function after all the state manipulation of the transition is done. + """ + def transition(%Connection{} = conn, to) when is_atom(to) do + # TODO: Telemetry events + %{conn | state: to} + end end diff --git a/lib/connection/lifecycle.ex b/lib/connection/lifecycle.ex index d767ad1..a0817ff 100644 --- a/lib/connection/lifecycle.ex +++ b/lib/connection/lifecycle.ex @@ -14,8 +14,6 @@ defmodule RabbitMQStream.Connection.Lifecycle do @impl GenServer def init(opts) do - {transport, opts} = Keyword.pop(opts, :transport, :tcp) - opts = opts |> Keyword.put_new(:host, "localhost") @@ -27,6 +25,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do |> Keyword.put_new(:heartbeat, 60) |> Keyword.put_new(:transport, :tcp) + {transport, opts} = Keyword.pop(opts, :transport, :tcp) + transport = case transport do :tcp -> RabbitMQStream.Connection.Transport.TCP @@ -92,12 +92,21 @@ defmodule RabbitMQStream.Connection.Lifecycle do end def handle_call({:subscribe, opts}, from, %Connection{} = conn) do - {id, conn} = Map.get_and_update!(conn, :subscriber_sequence, &{&1, &1 + 1}) + {opts, conn} = + if Keyword.has_key?(opts, :subscription_id) do + {opts, conn} + else + {id, conn} = Map.get_and_update!(conn, :subscriber_sequence, &{&1, &1 + 1}) + + opts = Keyword.put(opts, :subscription_id, id) + + {opts, conn} + end conn = conn - |> Helpers.push_tracker(:subscribe, from, {id, opts[:pid]}) - |> send_request(:subscribe, opts ++ [subscription_id: id]) + |> Helpers.push_tracker(:subscribe, from, {opts[:subscription_id], opts[:pid]}) + |> send_request(:subscribe, opts) {:noreply, conn} end @@ -111,7 +120,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do {:noreply, conn} end - def handle_call({:supports?, command, version}, _from, %Connection{} = conn) do + def handle_call({:supports?, opts}, _from, %Connection{} = conn) do + command = Keyword.get(opts, :command) + version = Keyword.get(opts, :version) + flag = case conn.commands do %{^command => %{max: max}} when max <= version -> @@ -161,17 +173,27 @@ defmodule RabbitMQStream.Connection.Lifecycle do end def handle_call({:declare_producer, opts}, from, %Connection{} = conn) do - {id, conn} = Map.get_and_update!(conn, :producer_sequence, &{&1, &1 + 1}) + {opts, conn} = + if Keyword.has_key?(opts, :producer_id) do + {opts, conn} + else + {id, conn} = Map.get_and_update!(conn, :producer_sequence, &{&1, &1 + 1}) + + opts = Keyword.put(opts, :producer_id, id) + + {opts, conn} + end conn = conn - |> Helpers.push_tracker(:declare_producer, from, id) - |> send_request(:declare_producer, opts ++ [id: id]) + |> Helpers.push_tracker(:declare_producer, from, opts[:producer_id]) + |> send_request(:declare_producer, opts) {:noreply, conn} end @impl GenServer + # User facing events should be handled only when the connection is open. def handle_cast(action, %Connection{state: state} = conn) when state != :open do {:noreply, %{conn | request_buffer: :queue.in({:cast, action}, conn.request_buffer)}} end @@ -208,7 +230,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do {:noreply, conn} end - def handle_cast({:respond, %Request{} = request, opts}, %Connection{} = conn) do + def handle_cast({:respond, opts}, %Connection{} = conn) do + %Request{} = request = Keyword.fetch!(opts, :request) + opts = Keyword.fetch!(opts, :opts) + conn = conn |> send_response(request.command, [correlation_id: request.correlation_id] ++ opts) @@ -310,7 +335,12 @@ defmodule RabbitMQStream.Connection.Lifecycle do defp connect(%Connection{} = conn) do with {:ok, socket} <- conn.transport.connect(conn.options) do - {:ok, %{conn | socket: socket, state: :connecting}} + conn = + conn + |> Map.put(:socket, socket) + |> RabbitMQStream.Connection.Handler.transition(:connecting) + + {:ok, conn} end end @@ -327,7 +357,9 @@ defmodule RabbitMQStream.Connection.Lifecycle do GenServer.reply(client, {:error, {:closed, conn.close_reason}}) end - conn = %{conn | request_tracker: %{}, connect_requests: [], socket: nil, state: :closed, close_reason: nil} + conn = + %{conn | request_tracker: %{}, connect_requests: [], socket: nil, close_reason: nil} + |> RabbitMQStream.Connection.Handler.transition(:closed) {:noreply, conn, :hibernate} end diff --git a/lib/consumer/consumer.ex b/lib/consumer/consumer.ex index 1d36de0..abd7ed5 100644 --- a/lib/consumer/consumer.ex +++ b/lib/consumer/consumer.ex @@ -308,7 +308,7 @@ defmodule RabbitMQStream.Consumer do credits: non_neg_integer(), initial_credit: non_neg_integer(), initial_offset: RabbitMQStream.Connection.offset(), - properties: [RabbitMQStream.Message.Types.ConsumerequestData.property()], + properties: [RabbitMQStream.Message.Types.SubscribeRequestData.property()], consumer_module: module() } @@ -321,7 +321,7 @@ defmodule RabbitMQStream.Consumer do | {:offset_tracking, [{RabbitMQStream.Consumer.OffsetTracking.t(), term()}]} | {:flow_control, {RabbitMQStream.Consumer.FlowControl.t(), term()}} | {:private, any()} - | {:properties, [RabbitMQStream.Message.Types.ConsumerequestData.property()]} + | {:properties, [RabbitMQStream.Message.Types.SubscribeRequestData.property()]} @type opts :: [option()] end diff --git a/lib/consumer/lifecycle.ex b/lib/consumer/lifecycle.ex index a2e92b8..da6585b 100644 --- a/lib/consumer/lifecycle.ex +++ b/lib/consumer/lifecycle.ex @@ -88,6 +88,9 @@ defmodule RabbitMQStream.Consumer.LifeCycle do offset _ -> + # We don't need to default to anything other than '0' here since any possibly duplicate + # messages received after a 'resubscribe' would be filtered by the ':deliver' handler + # bellow, since it persists the 'last_offset' of each chunk. 0 end @@ -122,56 +125,113 @@ defmodule RabbitMQStream.Consumer.LifeCycle do end @impl true - def handle_info({:deliver, %DeliverData{osiris_chunk: %RabbitMQStream.OsirisChunk{} = chunk}}, state) do - if function_exported?(state.consumer_module, :handle_chunk, 2) do - apply(state.consumer_module, :handle_chunk, [chunk, state]) - end + # The 'resubscribe' flow is not necessarily the same as 'init'. + def handle_info({:resubscribe, _args}, state) do + # The 'args' we receive are the ones we + Logger.info("Connection has shut down. Re-executing 'subscribe' flow") - for message <- Enum.slice(chunk.data_entries, (state.last_offset - chunk.chunk_id)..chunk.num_entries) do - message = - if function_exported?(state.consumer_module, :decode!, 1) do - apply(state.consumer_module, :decode!, [message]) - else - message - end + # If we had a 'last_offset' set, we should to use it. + last_offset = + if state.last_offset != 0 do + {:offset, state.last_offset} + else + # Or else, we should follow the same setup flow of fetching the offset from the stream + case RabbitMQStream.Connection.query_offset(state.connection, state.stream_name, state.offset_reference) do + {:ok, offset} -> + {:offset, offset} - if filtered?(message, state) do - if function_exported?(state.consumer_module, :handle_message, 3) do - apply(state.consumer_module, :handle_message, [message, chunk, state]) + _ -> + state.initial_offset end + end - if function_exported?(state.consumer_module, :handle_message, 2) do - apply(state.consumer_module, :handle_message, [message, state]) - end + case RabbitMQStream.Connection.subscribe( + state.connection, + state.stream_name, + self(), + last_offset, + state.initial_credit, + state.properties + ) do + {:ok, id} -> + last_offset = + case last_offset do + {:offset, offset} -> + offset - if function_exported?(state.consumer_module, :handle_message, 1) do - apply(state.consumer_module, :handle_message, [message]) - end - end + _ -> + 0 + end + + # A consumer should always update its own id + {:noreply, %{state | id: id, last_offset: last_offset}} + + err -> + {:stop, err, state} end + end - # Based on the [Python implementation](https://github.com/qweeze/rstream/blob/a81176a5c7cf4accaee25ca7725bd7bd94bf0ce8/rstream/consumer.py#L327), - # it seems to be OK to sum the amount of messages received to the chunk_id, which represents - # the offset of the first message, to get the new `last_offset` for the messages. - # During the second iteration of this logic to get the `last_offset`, I attempted to use the - # `commited_offset` of the DeliveryData, thinking it was already the offset for the `next` messsage, - # but it isn't. - state = %{state | last_offset: chunk.chunk_id + chunk.num_entries} - - offset_tracking = - for {strategy, track_state} <- state.offset_tracking do - if function_exported?(strategy, :after_chunk, 3) do - {strategy, strategy.after_chunk(track_state, chunk, state)} - else - {strategy, track_state} + def handle_info({:deliver, %DeliverData{osiris_chunk: %RabbitMQStream.OsirisChunk{} = chunk}}, state) do + # If some of the messages in the chunk we haven't yet processed, we can go forward with the processing. + # We do this check since a connection can fail, and be automatically reset by the client. In this + # case we fast forward any repeated messages since the last offset store + if chunk.chunk_id + chunk.num_entries > state.last_offset do + if function_exported?(state.consumer_module, :handle_chunk, 2) do + apply(state.consumer_module, :handle_chunk, [chunk, state]) + end + + for message <- Enum.slice(chunk.data_entries, (state.last_offset - chunk.chunk_id)..chunk.num_entries) do + message = + if function_exported?(state.consumer_module, :decode!, 1) do + apply(state.consumer_module, :decode!, [message]) + else + message + end + + if filtered?(message, state) do + if function_exported?(state.consumer_module, :handle_message, 3) do + apply(state.consumer_module, :handle_message, [message, chunk, state]) + end + + if function_exported?(state.consumer_module, :handle_message, 2) do + apply(state.consumer_module, :handle_message, [message, state]) + end + + if function_exported?(state.consumer_module, :handle_message, 1) do + apply(state.consumer_module, :handle_message, [message]) + end end end - state = %{state | offset_tracking: offset_tracking, credits: state.credits - chunk.num_entries} + # Based on the [Python implementation](https://github.com/qweeze/rstream/blob/a81176a5c7cf4accaee25ca7725bd7bd94bf0ce8/rstream/consumer.py#L327), + # it seems to be OK to sum the amount of messages received to the chunk_id, which represents + # the offset of the first message, to get the new `last_offset` for the messages. + # During the second iteration of this logic to get the `last_offset`, I attempted to use the + # `commited_offset` of the DeliveryData, thinking it was already the offset for the `next` messsage, + # but it isn't. + state = %{state | last_offset: chunk.chunk_id + chunk.num_entries} + + offset_tracking = + for {strategy, track_state} <- state.offset_tracking do + if function_exported?(strategy, :after_chunk, 3) do + {strategy, strategy.after_chunk(track_state, chunk, state)} + else + {strategy, track_state} + end + end + + state = %{state | offset_tracking: offset_tracking, credits: state.credits - chunk.num_entries} - state = state |> OffsetTracking.run() |> FlowControl.run() + state = state |> OffsetTracking.run() |> FlowControl.run() - {:noreply, state} + {:noreply, state} + else + Logger.debug( + "#{state.consumer_module}: Received already processed chunk, possibly due to reconnection. Fast fowarding." + ) + + {:noreply, state} + end end def handle_info(:run_offset_tracking, state) do @@ -182,7 +242,10 @@ defmodule RabbitMQStream.Consumer.LifeCycle do {:noreply, FlowControl.run(state)} end - def handle_info({:command, %Request{command: :consumer_update, data: data} = request}, state) do + def handle_info( + {:command, %Request{command: :consumer_update, data: data} = request}, + %RabbitMQStream.Consumer{} = state + ) do if function_exported?(state.consumer_module, :handle_update, 2) do action = if data.active, do: :upgrade, else: :downgrade @@ -194,15 +257,15 @@ defmodule RabbitMQStream.Consumer.LifeCycle do case apply(state.consumer_module, :handle_update, [state, action]) do {:ok, offset} -> Logger.debug("Consumer upgraded to active consumer") - RabbitMQStream.Connection.respond(state.connection, request, offset: offset, code: :ok) + RabbitMQStream.Connection.respond(state.connection, state.id, request, offset: offset, code: :ok) {:error, reason} -> Logger.error("Error updating consumer: #{inspect(reason)}") - RabbitMQStream.Connection.respond(state.connection, request, code: :internal_error) + RabbitMQStream.Connection.respond(state.connection, state.id, request, code: :internal_error) end else Logger.error("handle_update/2 must be implemented when using single-active-consumer property") - RabbitMQStream.Connection.respond(state.connection, request, code: :internal_error) + RabbitMQStream.Connection.respond(state.connection, state.id, request, code: :internal_error) end {:noreply, state} @@ -210,6 +273,7 @@ defmodule RabbitMQStream.Consumer.LifeCycle do @impl true def handle_cast({:credit, amount}, state) do + # Should be sent to same connection RabbitMQStream.Connection.credit(state.connection, state.id, amount) {:noreply, %{state | credits: state.credits + amount}} end diff --git a/lib/message/data/data.ex b/lib/message/data/data.ex index 0f4d828..623d054 100644 --- a/lib/message/data/data.ex +++ b/lib/message/data/data.ex @@ -179,7 +179,7 @@ defmodule RabbitMQStream.Message.Data do {buffer, [{key, value} | acc]} end) - %Types.OpenResponseData{connection_properties: connection_properties} + %Types.OpenResponseData{connection_properties: Map.new(connection_properties)} end def decode(%Response{command: :route}, buffer) do @@ -330,7 +330,7 @@ defmodule RabbitMQStream.Message.Data do stream_name = encode_string(data.stream_name) << - data.id::unsigned-integer-size(8), + data.producer_id::unsigned-integer-size(8), producer_reference::binary, stream_name::binary >> diff --git a/lib/message/data/types.ex b/lib/message/data/types.ex index bf0c6bc..aa5ca8f 100644 --- a/lib/message/data/types.ex +++ b/lib/message/data/types.ex @@ -60,7 +60,7 @@ defmodule RabbitMQStream.Message.Types do defmodule OpenResponseData do @moduledoc false @enforce_keys [:connection_properties] - @type t :: %__MODULE__{connection_properties: Keyword.t()} + @type t :: %__MODULE__{connection_properties: %{String.t() => String.t()}} defstruct [:connection_properties] end @@ -167,14 +167,6 @@ defmodule RabbitMQStream.Message.Types do end defmodule QueryMetadataResponseData do - @moduledoc false - @type t :: %__MODULE__{ - streams: [StreamData.t()], - brokers: [BrokerData.t()] - } - - defstruct [:streams, :brokers] - defmodule BrokerData do @moduledoc false @enforce_keys [:reference, :host, :port] @@ -208,6 +200,14 @@ defmodule RabbitMQStream.Message.Types do :replicas ] end + + @moduledoc false + @type t :: %__MODULE__{ + streams: [StreamData.t()], + brokers: [BrokerData.t()] + } + + defstruct [:streams, :brokers] end defmodule MetadataUpdateData do @@ -222,15 +222,15 @@ defmodule RabbitMQStream.Message.Types do defmodule DeclareProducerRequestData do @moduledoc false - @enforce_keys [:id, :producer_reference, :stream_name] + @enforce_keys [:producer_id, :producer_reference, :stream_name] @type t :: %__MODULE__{ - id: non_neg_integer(), + producer_id: non_neg_integer(), producer_reference: String.t(), stream_name: String.t() } defstruct [ - :id, + :producer_id, :producer_reference, :stream_name ] @@ -285,14 +285,6 @@ defmodule RabbitMQStream.Message.Types do end defmodule PublishErrorData do - @moduledoc false - @enforce_keys [:producer_id, :errors] - @type t :: %__MODULE__{ - producer_id: non_neg_integer(), - errors: [Error.t()] - } - defstruct [:producer_id, :errors] - defmodule Error do @moduledoc false @enforce_keys [:publishing_id, :code] @@ -303,6 +295,14 @@ defmodule RabbitMQStream.Message.Types do defstruct [:publishing_id, :code] end + + @moduledoc false + @enforce_keys [:producer_id, :errors] + @type t :: %__MODULE__{ + producer_id: non_neg_integer(), + errors: [Error.t()] + } + defstruct [:producer_id, :errors] end defmodule PublishConfirmData do @@ -460,11 +460,6 @@ defmodule RabbitMQStream.Message.Types do end defmodule ExchangeCommandVersionsData do - @moduledoc false - @enforce_keys [:commands] - @type t :: %__MODULE__{commands: [Command.t()]} - defstruct [:commands] - defmodule Command do @moduledoc false @enforce_keys [:key, :min_version, :max_version] @@ -476,6 +471,11 @@ defmodule RabbitMQStream.Message.Types do defstruct [:key, :min_version, :max_version] end + @moduledoc false + @enforce_keys [:commands] + @type t :: %__MODULE__{commands: [Command.t()]} + defstruct [:commands] + def new!(_opts \\ []) do %__MODULE__{ commands: [ @@ -557,4 +557,55 @@ defmodule RabbitMQStream.Message.Types do @type t :: %__MODULE__{} defstruct [] end + + @type t :: + TuneData.t() + | PeerPropertiesData.t() + | SaslHandshakeData.t() + | SaslAuthenticateData.t() + | OpenRequestData.t() + | OpenResponseData.t() + | HeartbeatData.t() + | CloseRequestData.t() + | CloseResponseData.t() + | CreateStreamRequestData.t() + | CreateStreamResponseData.t() + | DeleteStreamRequestData.t() + | DeleteStreamResponseData.t() + | StoreOffsetRequestData.t() + | StoreOffsetResponseData.t() + | QueryOffsetRequestData.t() + | QueryOffsetResponseData.t() + | QueryMetadataRequestData.t() + | QueryMetadataResponseData.t() + | MetadataUpdateData.t() + | DeclareProducerRequestData.t() + | DeclareProducerResponseData.t() + | DeleteProducerRequestData.t() + | DeleteProducerResponseData.t() + | QueryProducerSequenceRequestData.t() + | QueryProducerSequenceResponseData.t() + | PublishData.t() + | PublishErrorData.t() + | PublishConfirmData.t() + | SubscribeRequestData.t() + | ConsumerUpdateRequestData.t() + | ConsumerUpdateResponseData.t() + | UnsubscribeRequestData.t() + | CreditRequestData.t() + | SubscribeResponseData.t() + | UnsubscribeResponseData.t() + | CreditResponseData.t() + | RouteRequestData.t() + | RouteResponseData.t() + | PartitionsQueryRequestData.t() + | PartitionsQueryResponseData.t() + | DeliverData.t() + | ExchangeCommandVersionsData.t() + | StreamStatsRequestData.t() + | StreamStatsResponseData.t() + | CreateSuperStreamRequestData.t() + | CreateSuperStreamResponseData.t() + | DeleteSuperStreamRequestData.t() + | DeleteSuperStreamResponseData.t() end diff --git a/lib/message/message.ex b/lib/message/message.ex index 6eb76a2..4ca11e4 100644 --- a/lib/message/message.ex +++ b/lib/message/message.ex @@ -192,7 +192,7 @@ defmodule RabbitMQStream.Message do command: :declare_producer, correlation_id: conn.correlation_sequence, data: %Types.DeclareProducerRequestData{ - id: opts[:id], + producer_id: opts[:producer_id], producer_reference: opts[:producer_reference], stream_name: opts[:stream_name] } diff --git a/lib/producer/lifecycle.ex b/lib/producer/lifecycle.ex index bc1ec53..e66350f 100644 --- a/lib/producer/lifecycle.ex +++ b/lib/producer/lifecycle.ex @@ -48,6 +48,20 @@ defmodule RabbitMQStream.Producer.LifeCycle do {:noreply, %{state | sequence: state.sequence + 1}} end + @impl GenServer + def handle_info({:redeclare_producer, _opts}, state) do + # Since the connection has changed, we need to update the producer's own ID. This shouldn't be + # an issue. + with {:ok, id} <- + RabbitMQStream.Connection.declare_producer(state.connection, state.stream_name, state.reference_name) do + # The sequence number should have changed + {:noreply, %{state | id: id}} + else + err -> + {:stop, err, state} + end + end + @impl GenServer def terminate(_reason, %{id: nil}), do: :ok diff --git a/mix.lock b/mix.lock index aae08c3..d50b61a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ - "amqp": {:hex, :amqp, "3.3.0", "056d9f4bac96c3ab5a904b321e70e78b91ba594766a1fc2f32afd9c016d9f43b", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "8d3ae139d2646c630d674a1b8d68c7f85134f9e8b2a1c3dd5621616994b10a8b"}, - "amqp10_common": {:hex, :amqp10_common, "3.13.6", "4aa7bce71522f6f56898218e013a7bba52600185a84b8c3066e950c84549154e", [:make, :rebar3], [], "hexpm", "defd497445e8bd6ac31f968b9aea68b54ec619ce92686fca9eec59fd76c4b32c"}, + "amqp": {:hex, :amqp, "3.3.2", "6cad7469957b29c517a26a27474828f1db28278a13bcc2e7970db9854a3d3080", [:mix], [{:amqp_client, "~> 3.9", [hex: :amqp_client, repo: "hexpm", optional: false]}], "hexpm", "f977c41d81b65a21234a9158e6491b2296f8bd5bda48d5b611a64b6e0d2c3f31"}, + "amqp10_common": {:hex, :amqp10_common, "3.13.7", "43b6205df5fd1e2e6d967d908dd70289b247e14cb85d57edd072d460cba4a44c", [:make, :rebar3], [], "hexpm", "593331d33f8415a926ada5fe30a87544f1bb9b89047123b6d7b081dcc3d4f635"}, "amqp_client": {:hex, :amqp_client, "3.12.14", "2b677bc3f2e2234ba7517042b25d72071a79735042e91f9116bd3c176854b622", [:make, :rebar3], [{:credentials_obfuscation, "3.4.0", [hex: :credentials_obfuscation, repo: "hexpm", optional: false]}, {:rabbit_common, "3.12.14", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "5f70b6c3b1a739790080da4fddc94a867e99f033c4b1edc20d6ff8b8fb4bd160"}, "credentials_obfuscation": {:hex, :credentials_obfuscation, "3.4.0", "34e18b126b3aefd6e8143776fbe1ceceea6792307c99ac5ee8687911f048cfd7", [:rebar3], [], "hexpm", "738ace0ed5545d2710d3f7383906fc6f6b582d019036e5269c4dbd85dbced566"}, "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, diff --git a/services/docker-compose.yaml b/services/docker-compose.yaml index 6496782..b3e4771 100644 --- a/services/docker-compose.yaml +++ b/services/docker-compose.yaml @@ -3,7 +3,7 @@ services: container_name: rabbitmq_stream image: rabbitmq:3.13-management restart: always - hostname: rabbitmq_stream + hostname: localhost volumes: - ./enabled_plugins:/etc/rabbitmq/enabled_plugins - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf @@ -20,7 +20,7 @@ services: container_name: rabbitmq_stream image: rabbitmq:3.12-management restart: always - hostname: rabbitmq_stream + hostname: localhost volumes: - ./enabled_plugins:/etc/rabbitmq/enabled_plugins - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf @@ -37,7 +37,7 @@ services: container_name: rabbitmq_stream image: rabbitmq:3.11-management restart: always - hostname: rabbitmq_stream + hostname: localhost volumes: - ./enabled_plugins:/etc/rabbitmq/enabled_plugins - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf diff --git a/test/client_test.exs b/test/client_test.exs new file mode 100644 index 0000000..135694c --- /dev/null +++ b/test/client_test.exs @@ -0,0 +1,41 @@ +defmodule RabbitMQStreamTest.ClientTest do + use ExUnit.Case, async: false + + # @tag :v3_13_proxied_cluster + # test "should auto discover and connect to all node when behind a loadbalancer" do + # end + + defmodule TheClient do + use RabbitMQStream.Client, host: "localhost" + end + + defmodule ClientProducer do + use RabbitMQStream.Producer + end + + # @tag :v3_13_cluster + test "should create a stream" do + {:ok, conn} = RabbitMQStream.Connection.start_link(host: "localhost") + + assert :ok = RabbitMQStream.Connection.connect(conn) + + {:ok, client} = RabbitMQStream.Client.start_link(host: "localhost") + + RabbitMQStream.Connection.create_stream(conn, "stream1") + + {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream1", self(), :next, 999) + + {:ok, _} = + ClientProducer.start_link( + connection: client, + reference_name: "client-producer", + stream_name: "stream1" + ) + + message = Jason.encode!(%{message: "Hello, world2!"}) + + ClientProducer.publish(message) + + assert_receive {:deliver, %{osiris_chunk: %{data_entries: [^message]}}}, 500 + end +end diff --git a/test/clustered_test.exs b/test/clustered_test.exs index 18281d8..1b8cba0 100644 --- a/test/clustered_test.exs +++ b/test/clustered_test.exs @@ -1,12 +1,16 @@ defmodule RabbitMQStreamTest.Clustered do use ExUnit.Case, async: false - @tag :v3_13_proxied_cluster - test "should auto discover and connect to all node when behind a loadbalancer" do + # @tag :v3_13_proxied_cluster + # test "should auto discover and connect to all node when behind a loadbalancer" do + # end + + defmodule TheClient do + use RabbitMQStream.Client, host: "rabbitmq1" end - @tag :v3_13_cluster - test "should auto discover and connect to all nodes" do + # @tag :v3_13_cluster + test "should connect to all the replicated nodes" do {:ok, conn1} = RabbitMQStream.Connection.start_link(host: "rabbitmq1") {:ok, conn2} = RabbitMQStream.Connection.start_link(host: "rabbitmq2") {:ok, conn3} = RabbitMQStream.Connection.start_link(host: "rabbitmq3") @@ -20,9 +24,26 @@ defmodule RabbitMQStreamTest.Clustered do RabbitMQStream.Connection.create_stream(conn3, "stream3") dbg(RabbitMQStream.Connection.query_metadata(conn1, ["stream1", "stream2", "stream3"])) + end + + @tag :v3_13_cluster + test "should auto discover and connect to all nodes" do + {:ok, conn1} = RabbitMQStream.Connection.start_link(host: "rabbitmq1") + {:ok, conn2} = RabbitMQStream.Connection.start_link(host: "rabbitmq2") + {:ok, conn3} = RabbitMQStream.Connection.start_link(host: "rabbitmq3") + + {:ok, client} = RabbitMQStream.Client.start_link(host: "rabbitmq1") + + RabbitMQStream.Connection.create_stream(conn1, "stream1") + RabbitMQStream.Connection.create_stream(conn2, "stream2") + RabbitMQStream.Connection.create_stream(conn3, "stream3") + + assert {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream1", self(), :next, 999) + assert {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream2", self(), :next, 999) + assert {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream3", self(), :next, 999) - dbg(:sys.get_state(conn1)) - dbg(:sys.get_state(conn2)) - dbg(:sys.get_state(conn3)) + assert {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream1", self(), :next, 999) + assert {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream2", self(), :next, 999) + assert {:ok, _subscription_id} = RabbitMQStream.Connection.subscribe(client, "stream3", self(), :next, 999) end end diff --git a/test/producer_test.exs b/test/producer_test.exs index 2226e4d..dd6ce46 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -22,24 +22,24 @@ defmodule RabbitMQStreamTest.Producer do end setup do - {:ok, _conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/") + {:ok, conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/") :ok = SupervisedConnection.connect() - :ok + [conn: conn] end @stream "producer-test-01" @reference_name "producer-test-reference-01" - test "should declare itself and its stream" do + test "should declare itself and its stream", %{conn: conn} do assert {:ok, _} = SupervisorProducer.start_link(reference_name: @reference_name, stream_name: @stream) - SupervisedConnection.delete_stream(@stream) + RabbitMQStream.Connection.delete_stream(conn, @stream) end @stream "producer-test-02" @reference_name "producer-test-reference-02" - test "should query its sequence when declaring" do + test "should query its sequence when declaring", %{conn: conn} do {:ok, _} = SupervisorProducer.start_link( reference_name: @reference_name, @@ -47,14 +47,15 @@ defmodule RabbitMQStreamTest.Producer do ) assert %{sequence: 1} = :sys.get_state(Process.whereis(SupervisorProducer)) - SupervisedConnection.delete_stream(@stream) + RabbitMQStream.Connection.delete_stream(conn, @stream) end @stream "producer-test-03" @reference_name "producer-test-reference-03" - test "should publish a message" do + test "should publish a message", %{conn: conn} do {:ok, _} = SupervisorProducer.start_link( + connection: conn, reference_name: @reference_name, stream_name: @stream ) @@ -73,6 +74,6 @@ defmodule RabbitMQStreamTest.Producer do assert %{sequence: ^sequence} = :sys.get_state(Process.whereis(SupervisorProducer)) - SupervisedConnection.delete_stream(@stream) + RabbitMQStream.Connection.delete_stream(conn, @stream) end end