From 82523c327690e47c54ecda09f0d8abb7735757e3 Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Wed, 10 Jul 2024 10:39:41 +0200 Subject: [PATCH] FIXUP --- lib/xandra/cluster/control_connection.ex | 27 +++++++--------- lib/xandra/connection.ex | 41 +++++------------------- lib/xandra/connection/utils.ex | 12 +++++-- lib/xandra/frame.ex | 23 +++++++++---- test/xandra/frame_test.exs | 26 ++++++++++++++- test/xandra_test.exs | 36 ++++++--------------- 6 files changed, 79 insertions(+), 86 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index f57ce87c..c82cb405 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -305,24 +305,21 @@ defmodule Xandra.Cluster.ControlConnection do end end - rest_fun = & &1 - - function = + decode_fun = case state.protocol_module do - Xandra.Protocol.V5 -> :decode_v5 - Xandra.Protocol.V4 -> :decode_v4 - Xandra.Protocol.V3 -> :decode_v4 + Xandra.Protocol.V5 -> &Xandra.Frame.decode_v5/4 + Xandra.Protocol.V4 -> &Xandra.Frame.decode_v4/4 + Xandra.Protocol.V3 -> &Xandra.Frame.decode_v4/4 end - case apply(Xandra.Frame, function, [ - fetch_bytes_fun, - state.buffer, - _compressor = nil, - rest_fun - ]) do - {:ok, frame, rest} -> - change_event = state.protocol_module.decode_response(frame) - state = handle_change_event(state, change_event) + case decode_fun.(fetch_bytes_fun, state.buffer, _compressor = nil, _rest_fun = & &1) do + {:ok, frames, rest} -> + state = + Enum.reduce(frames, state, fn frame, acc -> + change_event = state.protocol_module.decode_response(frame) + handle_change_event(acc, change_event) + end) + consume_new_data(%__MODULE__{state | buffer: rest}) {:error, _reason} -> diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 784addda..e1794154 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -15,8 +15,6 @@ defmodule Xandra.Connection do alias Xandra.Simple alias Xandra.Transport - require Logger - @behaviour :gen_statem @forced_transport_options [packet: :raw, mode: :binary, active: false] @@ -182,7 +180,6 @@ defmodule Xandra.Connection do # This is in an anonymous function so that we can use it in a Telemetry span. fun = fn -> with :ok <- Transport.send(transport, payload), - Logger.debug("Sent query on stream ID #{stream_id}, will wait for an answer"), {:ok, %Frame{} = frame} <- receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do case protocol_module.decode_response(frame, query, options) do @@ -344,8 +341,7 @@ defmodule Xandra.Connection do in_flight_requests: %{}, timed_out_ids: %{}, current_keyspace: nil, - buffer: <<>>, - free_stream_ids: Enum.to_list(1..@max_cassandra_stream_id) + buffer: <<>> ] ## Callbacks @@ -559,7 +555,6 @@ defmodule Xandra.Connection do def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) - data = update_in(data.free_stream_ids, &[stream_id | &1]) {:keep_state, data} end @@ -595,7 +590,7 @@ defmodule Xandra.Connection do {:error, reason} -> disconnect(data, reason) end else - case Transport.setopts(data.transport, active: true) do + case Transport.setopts(data.transport, active: :once) do :ok -> {:keep_state_and_data, {{:timeout, :reconnect}, :infinity, nil}} {:error, reason} -> disconnect(data, reason) end @@ -615,8 +610,7 @@ defmodule Xandra.Connection do # sends us the corresponding response, we can still route that C* response to the # alias and it'll not go anywhere and not cause any issues. def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do - # stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids) - {stream_id, data} = next_stream_id(data) + stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids) response = checked_out_state( @@ -650,13 +644,9 @@ defmodule Xandra.Connection do end def connected(:info, message, data) when is_data_message(data.transport, message) do - :ok = Transport.setopts(data.transport, active: true) + :ok = Transport.setopts(data.transport, active: :once) {_mod, _socket, bytes} = message - Logger.debug( - "Received data:\n#{inspect(bytes, limit: :infinity, iexprintable_limit: :infinity)}" - ) - data = update_in(data.buffer, &(&1 <> bytes)) handle_new_bytes(data) end @@ -675,9 +665,7 @@ defmodule Xandra.Connection do end def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do - Logger.debug("Releasing stream ID #{stream_id}") data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) - data = update_in(data.free_stream_ids, &[stream_id | &1]) {:keep_state, data} end @@ -775,19 +763,16 @@ defmodule Xandra.Connection do data.compressor, _rest_fun = & &1 ) do - {:ok, frame, rest} -> - Logger.debug("Received full frame") - data = handle_frame(%__MODULE__{data | buffer: rest}, frame) + {:ok, frames, rest} -> + data = Enum.reduce(frames, %__MODULE__{data | buffer: rest}, &handle_frame/2) if rest != "" do handle_new_bytes(data) else - Logger.debug("No more frames to decode") {:keep_state, data} end {:error, :insufficient_data} -> - Logger.debug("Received a packet that did not contain a full frame") {:keep_state, data} {:error, reason} -> @@ -796,13 +781,9 @@ defmodule Xandra.Connection do end defp handle_frame( - %__MODULE__{timed_out_ids: timed_out_ids} = data, - %Frame{stream_id: stream_id} = frame + %Frame{stream_id: stream_id} = frame, + %__MODULE__{timed_out_ids: timed_out_ids} = data ) do - Logger.debug( - "Received frame with stream ID #{stream_id}, in_flight_req is: #{inspect(data.in_flight_requests[stream_id])}" - ) - case pop_in(data.in_flight_requests[stream_id]) do # There is no in-flight req for this response frame, BUT there is a request # for it that timed out on the caller's side. Let's just emit a @@ -939,12 +920,6 @@ defmodule Xandra.Connection do end end - defp next_stream_id(data) do - [next_id | rest] = data.free_stream_ids - data = %__MODULE__{data | free_stream_ids: rest} - {next_id, data} - end - defp random_free_stream_id(in_flight_requests, timed_out_ids) do random_id = Enum.random(1..@max_cassandra_stream_id) diff --git a/lib/xandra/connection/utils.ex b/lib/xandra/connection/utils.ex index 462dd9ae..43577681 100644 --- a/lib/xandra/connection/utils.ex +++ b/lib/xandra/connection/utils.ex @@ -13,9 +13,15 @@ defmodule Xandra.Connection.Utils do do: {:ok, binary, fetch_state} end - case protocol_format do - :v4_or_less -> Frame.decode_v4(fetch_bytes_fun, :no_fetch_state, compressor) - :v5_or_more -> Frame.decode_v5(fetch_bytes_fun, :no_fetch_state, compressor) + decode_fun = + case protocol_format do + :v4_or_less -> &Frame.decode_v4/3 + :v5_or_more -> &Frame.decode_v5/3 + end + + case decode_fun.(fetch_bytes_fun, :no_fetch_state, compressor) do + {:ok, [frame], rest} -> {:ok, frame, rest} + {:error, reason} -> {:error, reason} end end diff --git a/lib/xandra/frame.ex b/lib/xandra/frame.ex index fad397eb..8ca646cd 100644 --- a/lib/xandra/frame.ex +++ b/lib/xandra/frame.ex @@ -293,7 +293,7 @@ defmodule Xandra.Frame do (fetch_state, pos_integer() -> {:ok, binary(), fetch_state} | {:error, reason}), fetch_state, module() | nil - ) :: {:ok, t(), binary()} | {:error, reason} + ) :: {:ok, [t(), ...], binary()} | {:error, reason} when fetch_state: term(), reason: term() def decode( protocol_module, @@ -313,7 +313,7 @@ defmodule Xandra.Frame do fetch_state, module() | nil, (fetch_state -> binary()) - ) :: {:ok, t(), binary()} | {:error, reason} + ) :: {:ok, [t(), ...], binary()} | {:error, reason} when fetch_state: term(), reason: term() def decode_v4(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do @@ -322,11 +322,11 @@ defmodule Xandra.Frame do with {:ok, header, fetch_state} <- fetch_bytes_fun.(fetch_state, length) do case body_length(header) do 0 -> - {:ok, decode(header), rest_fun.(fetch_state)} + {:ok, [decode(header)], rest_fun.(fetch_state)} body_length -> with {:ok, body, bytes_state} <- fetch_bytes_fun.(fetch_state, body_length), - do: {:ok, decode(header, body, compressor), rest_fun.(bytes_state)} + do: {:ok, [decode(header, body, compressor)], rest_fun.(bytes_state)} end end end @@ -336,17 +336,26 @@ defmodule Xandra.Frame do fetch_state, module() | nil, (fetch_state -> binary()) - ) :: {:ok, t(), rest :: binary()} | {:error, reason} + ) :: {:ok, [t(), ...], rest :: binary()} | {:error, reason} when fetch_state: term(), reason: term() def decode_v5(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do with {:ok, envelope, rest} <- decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun) do - {frame, _ignored_rest} = decode_from_binary(envelope, compressor) - {:ok, frame, rest} + frames = decode_all_v5_frames_in_envelope(envelope, compressor, _acc = []) + {:ok, frames, rest} end end + defp decode_all_v5_frames_in_envelope("", _compressor, acc) do + Enum.reverse(acc) + end + + defp decode_all_v5_frames_in_envelope(envelope, compressor, acc) do + {frame, rest} = decode_from_binary(envelope, compressor) + decode_all_v5_frames_in_envelope(rest, compressor, [frame | acc]) + end + # Made public for testing. @doc false def decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) do diff --git a/test/xandra/frame_test.exs b/test/xandra/frame_test.exs index 19fb01ee..55e8f4fb 100644 --- a/test/xandra/frame_test.exs +++ b/test/xandra/frame_test.exs @@ -146,7 +146,7 @@ defmodule Xandra.FrameTest do |> Frame.encode(protocol_module) |> IO.iodata_to_binary() - assert {:ok, redecoded_frame, _rest = ""} = + assert {:ok, [redecoded_frame], _rest = ""} = Frame.decode_v5(&fetch_bytes_from_binary/2, encoded, _compressor = nil) assert redecoded_frame == frame @@ -240,6 +240,30 @@ defmodule Xandra.FrameTest do end end + describe "decode/5" do + # Regression for: https://issues.apache.org/jira/browse/CASSANDRA-19753 + @tag :regression + test "can decode multiple envelopes in a single frame" do + payload = + <<144, 0, 2, 138, 218, 155, 133, 0, 0, 2, 8, 0, 0, 0, 63, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, + 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, 108, 0, 12, 99, 108, 117, + 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, 0, 0, 0, 12, 116, 101, + 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 133, 0, 0, 1, 8, 0, 0, 0, 63, 0, 0, 0, + 2, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, + 108, 0, 12, 99, 108, 117, 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, + 0, 0, 0, 12, 116, 101, 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 40, 65, 100, 21>> + + assert {:ok, [%Frame{stream_id: 2}, %Frame{stream_id: 1}], _rest = ""} = + Frame.decode( + Xandra.Protocol.V5, + &fetch_bytes_from_binary/2, + payload, + _compressor = nil, + _rest_fun = & &1 + ) + end + end + defp kind_generator do member_of([ :startup, diff --git a/test/xandra_test.exs b/test/xandra_test.exs index baf0b060..854e76c9 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -346,35 +346,17 @@ defmodule XandraTest do end end - @tag start_conn: false + # Regression for timeouts on native protocol v5: + # https://github.com/whatyouhide/xandra/issues/356 @tag :regression - test "concurrent requests on a single connection", %{start_options: start_options} do - conn = - start_supervised!({Xandra, start_options ++ [max_concurrent_requests_per_connection: 2]}) - - max_requests = 5 - - Xandra.Telemetry.attach_default_handler() - Xandra.Telemetry.attach_debug_handler() - Logger.configure(level: :debug) - - results = - 1..max_requests - |> Task.async_stream( - fn _i -> - Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000) - end, - timeout: 15_000, - max_concurrency: 2 - ) - |> Enum.map(fn {:ok, result} -> result end) - - for result <- results do + test "concurrent requests on a single connection", %{conn: conn} do + 1..5 + |> Task.async_stream(fn _i -> + Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000) + end) + |> Enum.each(fn {:ok, result} -> assert {:ok, %Xandra.Page{}} = result - end - after - :telemetry.detach("xandra-default-telemetry-handler") - :telemetry.detach("xandra-debug-telemetry-handler") + end) end def configure_fun(options, original_start_options, pid, ref) do