From fb268802a450dc449dc80d32cc099f66a9fee075 Mon Sep 17 00:00:00 2001 From: Victor Gaiva <13839490+VictorGaiva@users.noreply.github.com> Date: Sat, 30 Mar 2024 14:44:59 -0300 Subject: [PATCH] feat: Split user and internal commands buffers --- lib/connection/connection.ex | 6 +++-- lib/connection/handler.ex | 23 +++++++++-------- lib/connection/helpers.ex | 12 ++++++--- lib/connection/lifecycle.ex | 48 +++++++++++++++++++++++++++--------- test/connection_test.exs | 8 ++++++ 5 files changed, 70 insertions(+), 27 deletions(-) diff --git a/lib/connection/connection.ex b/lib/connection/connection.ex index cbbb951..c0e8afc 100644 --- a/lib/connection/connection.ex +++ b/lib/connection/connection.ex @@ -529,7 +529,8 @@ defmodule RabbitMQStream.Connection do }, frames_buffer: RabbitMQStream.Message.Buffer.t(), request_buffer: :queue.queue({term(), pid()}), - commands_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}), + internal_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}), + user_buffer: :queue.queue({atom(), atom(), list({atom(), term()})}), # this should not be here. Should find a better way to return the close reason from the 'handler' module close_reason: String.t() | atom() | nil, transport: RabbitMQStream.Connection.Transport.t() @@ -552,7 +553,8 @@ defmodule RabbitMQStream.Connection do commands: %{}, request_buffer: :queue.new(), frames_buffer: RabbitMQStream.Message.Buffer.init(), - commands_buffer: :queue.new(), + internal_buffer: :queue.new(), + user_buffer: :queue.new(), close_reason: nil ] end diff --git a/lib/connection/handler.ex b/lib/connection/handler.ex index 1c0668f..ecd9221 100644 --- a/lib/connection/handler.ex +++ b/lib/connection/handler.ex @@ -11,7 +11,7 @@ defmodule RabbitMQStream.Connection.Handler do Logger.debug("Connection closed") %{conn | state: :closing, close_reason: request.data.reason} - |> Helpers.push(:response, :close, correlation_id: request.correlation_id, code: :ok) + |> Helpers.push_internal(:response, :close, correlation_id: request.correlation_id, code: :ok) end def handle_message(%Connection{} = conn, %Request{command: :tune} = request) do @@ -22,8 +22,8 @@ defmodule RabbitMQStream.Connection.Handler do options = Keyword.merge(conn.options, frame_max: request.data.frame_max, heartbeat: request.data.heartbeat) %{conn | options: options, state: :opening} - |> Helpers.push(:response, :tune, correlation_id: 0) - |> Helpers.push(:request, :open) + |> Helpers.push_internal(:response, :tune, correlation_id: 0) + |> Helpers.push_internal(:request, :open) end def handle_message(%Connection{} = conn, %Request{command: :heartbeat}) do @@ -32,7 +32,7 @@ defmodule RabbitMQStream.Connection.Handler do def handle_message(%Connection{} = conn, %Request{command: :metadata_update} = request) do conn - |> Helpers.push(:request, :query_metadata, streams: [request.data.stream_name]) + |> Helpers.push_internal(:request, :query_metadata, streams: [request.data.stream_name]) end def handle_message(%Connection{} = conn, %Request{command: :deliver} = response) do @@ -129,14 +129,14 @@ defmodule RabbitMQStream.Connection.Handler do peer_properties = Map.put(response.data.peer_properties, "base-version", version) %{conn | peer_properties: peer_properties} - |> Helpers.push(:request, :sasl_handshake) + |> Helpers.push_internal(:request, :sasl_handshake) end def handle_message(%Connection{} = conn, %Response{command: :sasl_handshake} = response) do Logger.debug("SASL handshake successful. Initiating authentication.") %{conn | mechanisms: response.data.mechanisms} - |> Helpers.push(:request, :sasl_authenticate) + |> Helpers.push_internal(:request, :sasl_authenticate) end def handle_message(%Connection{} = conn, %Response{command: :sasl_authenticate, data: %{sasl_opaque_data: ""}}) do @@ -150,7 +150,7 @@ defmodule RabbitMQStream.Connection.Handler do Logger.debug("Opening connection to vhost: \"#{conn.options[:vhost]}\"") conn - |> Helpers.push(:request, :open) + |> Helpers.push_internal(:request, :open) |> Map.put(:state, :opening) end @@ -162,7 +162,7 @@ defmodule RabbitMQStream.Connection.Handler do %{conn | options: options} |> Map.put(:state, :opening) - |> Helpers.push(:request, :open) + |> Helpers.push_internal(:request, :open) end # If the server has a version lower than 3.13, this is the 'terminating' response. @@ -192,7 +192,7 @@ defmodule RabbitMQStream.Connection.Handler do ) %{conn | connection_properties: response.data.connection_properties} - |> Helpers.push(:request, :exchange_command_versions) + |> Helpers.push_internal(:request, :exchange_command_versions) end def handle_message(%Connection{} = conn, %Response{command: :query_metadata} = response) do @@ -310,7 +310,10 @@ defmodule RabbitMQStream.Connection.Handler do conn else conn - |> Helpers.push(:response, :consumer_update, correlation_id: request.correlation_id, code: :internal_error) + |> Helpers.push_internal(:response, :consumer_update, + correlation_id: request.correlation_id, + code: :internal_error + ) end end end diff --git a/lib/connection/helpers.ex b/lib/connection/helpers.ex index a4a129b..1f65ad9 100644 --- a/lib/connection/helpers.ex +++ b/lib/connection/helpers.ex @@ -13,10 +13,16 @@ defmodule RabbitMQStream.Connection.Helpers do {entry, %{conn | request_tracker: request_tracker}} end - def push(conn, action, command, opts \\ []) do - commands_buffer = :queue.in({action, command, opts}, conn.commands_buffer) + def push_user(conn, action, command, opts \\ []) do + user_buffer = :queue.in({action, command, opts}, conn.user_buffer) - %{conn | commands_buffer: commands_buffer} + %{conn | user_buffer: user_buffer} + end + + def push_internal(conn, action, command, opts \\ []) do + internal_buffer = :queue.in({action, command, opts}, conn.internal_buffer) + + %{conn | internal_buffer: internal_buffer} end defguard is_offset(offset) diff --git a/lib/connection/lifecycle.ex b/lib/connection/lifecycle.ex index 46d78a8..d767ad1 100644 --- a/lib/connection/lifecycle.ex +++ b/lib/connection/lifecycle.ex @@ -52,7 +52,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do conn = %{conn | connect_requests: [from | conn.connect_requests]} - |> send_request(:peer_properties) + |> Helpers.push_internal(:request, :peer_properties) + |> flush_buffer(:internal) {:noreply, conn} else @@ -229,7 +230,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do # command to the socket. This also would allow us to better test the 'handler' logic. commands |> Enum.reduce(conn, &Handler.handle_message(&2, &1)) - |> flush_commands() + |> flush_buffer(:internal) + |> flush_buffer(:user) |> handle_closing() end @@ -252,7 +254,10 @@ defmodule RabbitMQStream.Connection.Lifecycle do def handle_info({:heartbeat}, conn) do Process.send_after(self(), {:heartbeat}, conn.options[:heartbeat] * 1000) - conn = send_request(conn, :heartbeat, correlation_sum: 0) + conn = + conn + |> Helpers.push_internal(:request, :heartbeat, correlation_sum: 0) + |> flush_buffer(:internal) {:noreply, conn} end @@ -292,7 +297,8 @@ defmodule RabbitMQStream.Connection.Lifecycle do conn = conn - |> send_request(:peer_properties) + |> Helpers.push_internal(:request, :peer_properties) + |> flush_buffer(:internal) {:noreply, conn} else @@ -328,19 +334,33 @@ defmodule RabbitMQStream.Connection.Lifecycle do defp handle_closing(conn), do: {:noreply, conn} - defp send_request(%Connection{} = conn, command, opts \\ []) do + defp send_request(%Connection{} = conn, command, opts) do conn - |> Helpers.push(:request, command, opts) - |> flush_commands() + |> Helpers.push_user(:request, command, opts) + |> flush_buffer(:user) end defp send_response(%Connection{} = conn, command, opts) do conn - |> Helpers.push(:response, command, opts) - |> flush_commands() + |> Helpers.push_user(:response, command, opts) + |> flush_buffer(:user) + end + + defp flush_buffer(%Connection{} = conn, :internal) do + conn = + :queue.fold( + fn + command, conn -> + send_command(conn, command) + end, + conn, + conn.internal_buffer + ) + + %{conn | internal_buffer: :queue.new()} end - defp flush_commands(%Connection{} = conn) do + defp flush_buffer(%Connection{state: :open} = conn, :user) do conn = :queue.fold( fn @@ -348,10 +368,14 @@ defmodule RabbitMQStream.Connection.Lifecycle do send_command(conn, command) end, conn, - conn.commands_buffer + conn.user_buffer ) - %{conn | commands_buffer: :queue.new()} + %{conn | user_buffer: :queue.new()} + end + + defp flush_buffer(%Connection{} = conn, :user) do + conn end defp send_command(%Connection{} = conn, {:request, command, opts}) do diff --git a/test/connection_test.exs b/test/connection_test.exs index e301066..3203034 100644 --- a/test/connection_test.exs +++ b/test/connection_test.exs @@ -168,4 +168,12 @@ defmodule RabbitMQStreamTest.Connection do assert {:ok, _data} = SupervisedConnection.stream_stats(@stream) assert {:error, :stream_does_not_exist} = SupervisedConnection.stream_stats("#{@stream}-NON-EXISTENT") end + + # I'm not really sure how to test this. + # @stream "consumer-test-stream-11" + # test "should buffer user commands before the connection is open" do + # {:ok, _conn} = SupervisedConnection.start_link(host: "localhost", vhost: "/") + # :ok = SupervisedConnection.connect() + + # end end