From 379fccec78e4a902c0cfc60318a13ab9daecdd8e Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Wed, 10 Jul 2024 10:44:23 +0200 Subject: [PATCH] Fix decoding multiple frames in a single envelope in native protocol v5 (#368) --- lib/xandra/cluster/control_connection.ex | 27 +++++++++++------------- lib/xandra/connection.ex | 16 ++++++++------ lib/xandra/connection/utils.ex | 12 ++++++++--- lib/xandra/frame.ex | 23 ++++++++++++++------ test/xandra/frame_test.exs | 26 ++++++++++++++++++++++- test/xandra_test.exs | 13 ++++++++++++ 6 files changed, 85 insertions(+), 32 deletions(-) diff --git a/lib/xandra/cluster/control_connection.ex b/lib/xandra/cluster/control_connection.ex index f57ce87c..c82cb405 100644 --- a/lib/xandra/cluster/control_connection.ex +++ b/lib/xandra/cluster/control_connection.ex @@ -305,24 +305,21 @@ defmodule Xandra.Cluster.ControlConnection do end end - rest_fun = & &1 - - function = + decode_fun = case state.protocol_module do - Xandra.Protocol.V5 -> :decode_v5 - Xandra.Protocol.V4 -> :decode_v4 - Xandra.Protocol.V3 -> :decode_v4 + Xandra.Protocol.V5 -> &Xandra.Frame.decode_v5/4 + Xandra.Protocol.V4 -> &Xandra.Frame.decode_v4/4 + Xandra.Protocol.V3 -> &Xandra.Frame.decode_v4/4 end - case apply(Xandra.Frame, function, [ - fetch_bytes_fun, - state.buffer, - _compressor = nil, - rest_fun - ]) do - {:ok, frame, rest} -> - change_event = state.protocol_module.decode_response(frame) - state = handle_change_event(state, change_event) + case decode_fun.(fetch_bytes_fun, state.buffer, _compressor = nil, _rest_fun = & &1) do + {:ok, frames, rest} -> + state = + Enum.reduce(frames, state, fn frame, acc -> + change_event = state.protocol_module.decode_response(frame) + handle_change_event(acc, change_event) + end) + consume_new_data(%__MODULE__{state | buffer: rest}) {:error, _reason} -> diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 87eb28e5..1fcc733b 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -762,10 +762,14 @@ defmodule Xandra.Connection do data.compressor, _rest_fun = & &1 ) do - {:ok, frame, rest} -> - %__MODULE__{data | buffer: rest} - |> handle_frame(frame) - |> handle_new_bytes() + {:ok, frames, rest} -> + data = Enum.reduce(frames, %__MODULE__{data | buffer: rest}, &handle_frame/2) + + if rest != "" do + handle_new_bytes(data) + else + {:keep_state, data} + end {:error, :insufficient_data} -> {:keep_state, data} @@ -776,8 +780,8 @@ defmodule Xandra.Connection do end defp handle_frame( - %__MODULE__{timed_out_ids: timed_out_ids} = data, - %Frame{stream_id: stream_id} = frame + %Frame{stream_id: stream_id} = frame, + %__MODULE__{timed_out_ids: timed_out_ids} = data ) do case pop_in(data.in_flight_requests[stream_id]) do # There is no in-flight req for this response frame, BUT there is a request diff --git a/lib/xandra/connection/utils.ex b/lib/xandra/connection/utils.ex index 462dd9ae..43577681 100644 --- a/lib/xandra/connection/utils.ex +++ b/lib/xandra/connection/utils.ex @@ -13,9 +13,15 @@ defmodule Xandra.Connection.Utils do do: {:ok, binary, fetch_state} end - case protocol_format do - :v4_or_less -> Frame.decode_v4(fetch_bytes_fun, :no_fetch_state, compressor) - :v5_or_more -> Frame.decode_v5(fetch_bytes_fun, :no_fetch_state, compressor) + decode_fun = + case protocol_format do + :v4_or_less -> &Frame.decode_v4/3 + :v5_or_more -> &Frame.decode_v5/3 + end + + case decode_fun.(fetch_bytes_fun, :no_fetch_state, compressor) do + {:ok, [frame], rest} -> {:ok, frame, rest} + {:error, reason} -> {:error, reason} end end diff --git a/lib/xandra/frame.ex b/lib/xandra/frame.ex index fad397eb..8ca646cd 100644 --- a/lib/xandra/frame.ex +++ b/lib/xandra/frame.ex @@ -293,7 +293,7 @@ defmodule Xandra.Frame do (fetch_state, pos_integer() -> {:ok, binary(), fetch_state} | {:error, reason}), fetch_state, module() | nil - ) :: {:ok, t(), binary()} | {:error, reason} + ) :: {:ok, [t(), ...], binary()} | {:error, reason} when fetch_state: term(), reason: term() def decode( protocol_module, @@ -313,7 +313,7 @@ defmodule Xandra.Frame do fetch_state, module() | nil, (fetch_state -> binary()) - ) :: {:ok, t(), binary()} | {:error, reason} + ) :: {:ok, [t(), ...], binary()} | {:error, reason} when fetch_state: term(), reason: term() def decode_v4(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do @@ -322,11 +322,11 @@ defmodule Xandra.Frame do with {:ok, header, fetch_state} <- fetch_bytes_fun.(fetch_state, length) do case body_length(header) do 0 -> - {:ok, decode(header), rest_fun.(fetch_state)} + {:ok, [decode(header)], rest_fun.(fetch_state)} body_length -> with {:ok, body, bytes_state} <- fetch_bytes_fun.(fetch_state, body_length), - do: {:ok, decode(header, body, compressor), rest_fun.(bytes_state)} + do: {:ok, [decode(header, body, compressor)], rest_fun.(bytes_state)} end end end @@ -336,17 +336,26 @@ defmodule Xandra.Frame do fetch_state, module() | nil, (fetch_state -> binary()) - ) :: {:ok, t(), rest :: binary()} | {:error, reason} + ) :: {:ok, [t(), ...], rest :: binary()} | {:error, reason} when fetch_state: term(), reason: term() def decode_v5(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do with {:ok, envelope, rest} <- decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun) do - {frame, _ignored_rest} = decode_from_binary(envelope, compressor) - {:ok, frame, rest} + frames = decode_all_v5_frames_in_envelope(envelope, compressor, _acc = []) + {:ok, frames, rest} end end + defp decode_all_v5_frames_in_envelope("", _compressor, acc) do + Enum.reverse(acc) + end + + defp decode_all_v5_frames_in_envelope(envelope, compressor, acc) do + {frame, rest} = decode_from_binary(envelope, compressor) + decode_all_v5_frames_in_envelope(rest, compressor, [frame | acc]) + end + # Made public for testing. @doc false def decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) do diff --git a/test/xandra/frame_test.exs b/test/xandra/frame_test.exs index 19fb01ee..55e8f4fb 100644 --- a/test/xandra/frame_test.exs +++ b/test/xandra/frame_test.exs @@ -146,7 +146,7 @@ defmodule Xandra.FrameTest do |> Frame.encode(protocol_module) |> IO.iodata_to_binary() - assert {:ok, redecoded_frame, _rest = ""} = + assert {:ok, [redecoded_frame], _rest = ""} = Frame.decode_v5(&fetch_bytes_from_binary/2, encoded, _compressor = nil) assert redecoded_frame == frame @@ -240,6 +240,30 @@ defmodule Xandra.FrameTest do end end + describe "decode/5" do + # Regression for: https://issues.apache.org/jira/browse/CASSANDRA-19753 + @tag :regression + test "can decode multiple envelopes in a single frame" do + payload = + <<144, 0, 2, 138, 218, 155, 133, 0, 0, 2, 8, 0, 0, 0, 63, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, + 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, 108, 0, 12, 99, 108, 117, + 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, 0, 0, 0, 12, 116, 101, + 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 133, 0, 0, 1, 8, 0, 0, 0, 63, 0, 0, 0, + 2, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, + 108, 0, 12, 99, 108, 117, 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, + 0, 0, 0, 12, 116, 101, 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 40, 65, 100, 21>> + + assert {:ok, [%Frame{stream_id: 2}, %Frame{stream_id: 1}], _rest = ""} = + Frame.decode( + Xandra.Protocol.V5, + &fetch_bytes_from_binary/2, + payload, + _compressor = nil, + _rest_fun = & &1 + ) + end + end + defp kind_generator do member_of([ :startup, diff --git a/test/xandra_test.exs b/test/xandra_test.exs index 168f2eee..854e76c9 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -346,6 +346,19 @@ defmodule XandraTest do end end + # Regression for timeouts on native protocol v5: + # https://github.com/whatyouhide/xandra/issues/356 + @tag :regression + test "concurrent requests on a single connection", %{conn: conn} do + 1..5 + |> Task.async_stream(fn _i -> + Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000) + end) + |> Enum.each(fn {:ok, result} -> + assert {:ok, %Xandra.Page{}} = result + end) + end + def configure_fun(options, original_start_options, pid, ref) do send(pid, {ref, options}) Keyword.replace!(options, :nodes, original_start_options[:nodes])