Skip to content

Commit

Permalink
Fix decoding multiple frames in a single envelope in native protocol …
Browse files Browse the repository at this point in the history
…v5 (#368)
  • Loading branch information
whatyouhide committed Jul 10, 2024
1 parent 2714ef6 commit 379fcce
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 32 deletions.
27 changes: 12 additions & 15 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -305,24 +305,21 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

rest_fun = & &1

function =
decode_fun =
case state.protocol_module do
Xandra.Protocol.V5 -> :decode_v5
Xandra.Protocol.V4 -> :decode_v4
Xandra.Protocol.V3 -> :decode_v4
Xandra.Protocol.V5 -> &Xandra.Frame.decode_v5/4
Xandra.Protocol.V4 -> &Xandra.Frame.decode_v4/4
Xandra.Protocol.V3 -> &Xandra.Frame.decode_v4/4
end

case apply(Xandra.Frame, function, [
fetch_bytes_fun,
state.buffer,
_compressor = nil,
rest_fun
]) do
{:ok, frame, rest} ->
change_event = state.protocol_module.decode_response(frame)
state = handle_change_event(state, change_event)
case decode_fun.(fetch_bytes_fun, state.buffer, _compressor = nil, _rest_fun = & &1) do
{:ok, frames, rest} ->
state =
Enum.reduce(frames, state, fn frame, acc ->
change_event = state.protocol_module.decode_response(frame)
handle_change_event(acc, change_event)
end)

consume_new_data(%__MODULE__{state | buffer: rest})

{:error, _reason} ->
Expand Down
16 changes: 10 additions & 6 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -762,10 +762,14 @@ defmodule Xandra.Connection do
data.compressor,
_rest_fun = & &1
) do
{:ok, frame, rest} ->
%__MODULE__{data | buffer: rest}
|> handle_frame(frame)
|> handle_new_bytes()
{:ok, frames, rest} ->
data = Enum.reduce(frames, %__MODULE__{data | buffer: rest}, &handle_frame/2)

if rest != "" do
handle_new_bytes(data)
else
{:keep_state, data}
end

{:error, :insufficient_data} ->
{:keep_state, data}
Expand All @@ -776,8 +780,8 @@ defmodule Xandra.Connection do
end

defp handle_frame(
%__MODULE__{timed_out_ids: timed_out_ids} = data,
%Frame{stream_id: stream_id} = frame
%Frame{stream_id: stream_id} = frame,
%__MODULE__{timed_out_ids: timed_out_ids} = data
) do
case pop_in(data.in_flight_requests[stream_id]) do
# There is no in-flight req for this response frame, BUT there is a request
Expand Down
12 changes: 9 additions & 3 deletions lib/xandra/connection/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ defmodule Xandra.Connection.Utils do
do: {:ok, binary, fetch_state}
end

case protocol_format do
:v4_or_less -> Frame.decode_v4(fetch_bytes_fun, :no_fetch_state, compressor)
:v5_or_more -> Frame.decode_v5(fetch_bytes_fun, :no_fetch_state, compressor)
decode_fun =
case protocol_format do
:v4_or_less -> &Frame.decode_v4/3
:v5_or_more -> &Frame.decode_v5/3
end

case decode_fun.(fetch_bytes_fun, :no_fetch_state, compressor) do
{:ok, [frame], rest} -> {:ok, frame, rest}
{:error, reason} -> {:error, reason}
end
end

Expand Down
23 changes: 16 additions & 7 deletions lib/xandra/frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ defmodule Xandra.Frame do
(fetch_state, pos_integer() -> {:ok, binary(), fetch_state} | {:error, reason}),
fetch_state,
module() | nil
) :: {:ok, t(), binary()} | {:error, reason}
) :: {:ok, [t(), ...], binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode(
protocol_module,
Expand All @@ -313,7 +313,7 @@ defmodule Xandra.Frame do
fetch_state,
module() | nil,
(fetch_state -> binary())
) :: {:ok, t(), binary()} | {:error, reason}
) :: {:ok, [t(), ...], binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode_v4(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end)
when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do
Expand All @@ -322,11 +322,11 @@ defmodule Xandra.Frame do
with {:ok, header, fetch_state} <- fetch_bytes_fun.(fetch_state, length) do
case body_length(header) do
0 ->
{:ok, decode(header), rest_fun.(fetch_state)}
{:ok, [decode(header)], rest_fun.(fetch_state)}

body_length ->
with {:ok, body, bytes_state} <- fetch_bytes_fun.(fetch_state, body_length),
do: {:ok, decode(header, body, compressor), rest_fun.(bytes_state)}
do: {:ok, [decode(header, body, compressor)], rest_fun.(bytes_state)}
end
end
end
Expand All @@ -336,17 +336,26 @@ defmodule Xandra.Frame do
fetch_state,
module() | nil,
(fetch_state -> binary())
) :: {:ok, t(), rest :: binary()} | {:error, reason}
) :: {:ok, [t(), ...], rest :: binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode_v5(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end)
when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do
with {:ok, envelope, rest} <-
decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun) do
{frame, _ignored_rest} = decode_from_binary(envelope, compressor)
{:ok, frame, rest}
frames = decode_all_v5_frames_in_envelope(envelope, compressor, _acc = [])
{:ok, frames, rest}
end
end

defp decode_all_v5_frames_in_envelope("", _compressor, acc) do
Enum.reverse(acc)
end

defp decode_all_v5_frames_in_envelope(envelope, compressor, acc) do
{frame, rest} = decode_from_binary(envelope, compressor)
decode_all_v5_frames_in_envelope(rest, compressor, [frame | acc])
end

# Made public for testing.
@doc false
def decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) do
Expand Down
26 changes: 25 additions & 1 deletion test/xandra/frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ defmodule Xandra.FrameTest do
|> Frame.encode(protocol_module)
|> IO.iodata_to_binary()

assert {:ok, redecoded_frame, _rest = ""} =
assert {:ok, [redecoded_frame], _rest = ""} =
Frame.decode_v5(&fetch_bytes_from_binary/2, encoded, _compressor = nil)

assert redecoded_frame == frame
Expand Down Expand Up @@ -240,6 +240,30 @@ defmodule Xandra.FrameTest do
end
end

describe "decode/5" do
# Regression for: https://issues.apache.org/jira/browse/CASSANDRA-19753
@tag :regression
test "can decode multiple envelopes in a single frame" do
payload =
<<144, 0, 2, 138, 218, 155, 133, 0, 0, 2, 8, 0, 0, 0, 63, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0,
1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, 108, 0, 12, 99, 108, 117,
115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, 0, 0, 0, 12, 116, 101,
115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 133, 0, 0, 1, 8, 0, 0, 0, 63, 0, 0, 0,
2, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97,
108, 0, 12, 99, 108, 117, 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1,
0, 0, 0, 12, 116, 101, 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 40, 65, 100, 21>>

assert {:ok, [%Frame{stream_id: 2}, %Frame{stream_id: 1}], _rest = ""} =
Frame.decode(
Xandra.Protocol.V5,
&fetch_bytes_from_binary/2,
payload,
_compressor = nil,
_rest_fun = & &1
)
end
end

defp kind_generator do
member_of([
:startup,
Expand Down
13 changes: 13 additions & 0 deletions test/xandra_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,19 @@ defmodule XandraTest do
end
end

# Regression for timeouts on native protocol v5:
# https://github.com/whatyouhide/xandra/issues/356
@tag :regression
test "concurrent requests on a single connection", %{conn: conn} do
1..5
|> Task.async_stream(fn _i ->
Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000)
end)
|> Enum.each(fn {:ok, result} ->
assert {:ok, %Xandra.Page{}} = result
end)
end

def configure_fun(options, original_start_options, pid, ref) do
send(pid, {ref, options})
Keyword.replace!(options, :nodes, original_start_options[:nodes])
Expand Down

0 comments on commit 379fcce

Please sign in to comment.