Skip to content

Commit

Permalink
FIXUP
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Jul 10, 2024
1 parent 70a6f52 commit 82523c3
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 86 deletions.
27 changes: 12 additions & 15 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -305,24 +305,21 @@ defmodule Xandra.Cluster.ControlConnection do
end
end

rest_fun = & &1

function =
decode_fun =
case state.protocol_module do
Xandra.Protocol.V5 -> :decode_v5
Xandra.Protocol.V4 -> :decode_v4
Xandra.Protocol.V3 -> :decode_v4
Xandra.Protocol.V5 -> &Xandra.Frame.decode_v5/4
Xandra.Protocol.V4 -> &Xandra.Frame.decode_v4/4
Xandra.Protocol.V3 -> &Xandra.Frame.decode_v4/4
end

case apply(Xandra.Frame, function, [
fetch_bytes_fun,
state.buffer,
_compressor = nil,
rest_fun
]) do
{:ok, frame, rest} ->
change_event = state.protocol_module.decode_response(frame)
state = handle_change_event(state, change_event)
case decode_fun.(fetch_bytes_fun, state.buffer, _compressor = nil, _rest_fun = & &1) do
{:ok, frames, rest} ->
state =
Enum.reduce(frames, state, fn frame, acc ->
change_event = state.protocol_module.decode_response(frame)
handle_change_event(acc, change_event)
end)

consume_new_data(%__MODULE__{state | buffer: rest})

{:error, _reason} ->
Expand Down
41 changes: 8 additions & 33 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ defmodule Xandra.Connection do
alias Xandra.Simple
alias Xandra.Transport

require Logger

@behaviour :gen_statem

@forced_transport_options [packet: :raw, mode: :binary, active: false]
Expand Down Expand Up @@ -182,7 +180,6 @@ defmodule Xandra.Connection do
# This is in an anonymous function so that we can use it in a Telemetry span.
fun = fn ->
with :ok <- Transport.send(transport, payload),
Logger.debug("Sent query on stream ID #{stream_id}, will wait for an answer"),
{:ok, %Frame{} = frame} <-
receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do
case protocol_module.decode_response(frame, query, options) do
Expand Down Expand Up @@ -344,8 +341,7 @@ defmodule Xandra.Connection do
in_flight_requests: %{},
timed_out_ids: %{},
current_keyspace: nil,
buffer: <<>>,
free_stream_ids: Enum.to_list(1..@max_cassandra_stream_id)
buffer: <<>>
]

## Callbacks
Expand Down Expand Up @@ -559,7 +555,6 @@ defmodule Xandra.Connection do

def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.free_stream_ids, &[stream_id | &1])
{:keep_state, data}
end

Expand Down Expand Up @@ -595,7 +590,7 @@ defmodule Xandra.Connection do
{:error, reason} -> disconnect(data, reason)
end
else
case Transport.setopts(data.transport, active: true) do
case Transport.setopts(data.transport, active: :once) do
:ok -> {:keep_state_and_data, {{:timeout, :reconnect}, :infinity, nil}}
{:error, reason} -> disconnect(data, reason)
end
Expand All @@ -615,8 +610,7 @@ defmodule Xandra.Connection do
# sends us the corresponding response, we can still route that C* response to the
# alias and it'll not go anywhere and not cause any issues.
def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do
# stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)
{stream_id, data} = next_stream_id(data)
stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)

response =
checked_out_state(
Expand Down Expand Up @@ -650,13 +644,9 @@ defmodule Xandra.Connection do
end

def connected(:info, message, data) when is_data_message(data.transport, message) do
:ok = Transport.setopts(data.transport, active: true)
:ok = Transport.setopts(data.transport, active: :once)
{_mod, _socket, bytes} = message

Logger.debug(
"Received data:\n#{inspect(bytes, limit: :infinity, iexprintable_limit: :infinity)}"
)

data = update_in(data.buffer, &(&1 <> bytes))
handle_new_bytes(data)
end
Expand All @@ -675,9 +665,7 @@ defmodule Xandra.Connection do
end

def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
Logger.debug("Releasing stream ID #{stream_id}")
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.free_stream_ids, &[stream_id | &1])
{:keep_state, data}
end

Expand Down Expand Up @@ -775,19 +763,16 @@ defmodule Xandra.Connection do
data.compressor,
_rest_fun = & &1
) do
{:ok, frame, rest} ->
Logger.debug("Received full frame")
data = handle_frame(%__MODULE__{data | buffer: rest}, frame)
{:ok, frames, rest} ->
data = Enum.reduce(frames, %__MODULE__{data | buffer: rest}, &handle_frame/2)

if rest != "" do
handle_new_bytes(data)
else
Logger.debug("No more frames to decode")
{:keep_state, data}
end

{:error, :insufficient_data} ->
Logger.debug("Received a packet that did not contain a full frame")
{:keep_state, data}

{:error, reason} ->
Expand All @@ -796,13 +781,9 @@ defmodule Xandra.Connection do
end

defp handle_frame(
%__MODULE__{timed_out_ids: timed_out_ids} = data,
%Frame{stream_id: stream_id} = frame
%Frame{stream_id: stream_id} = frame,
%__MODULE__{timed_out_ids: timed_out_ids} = data
) do
Logger.debug(
"Received frame with stream ID #{stream_id}, in_flight_req is: #{inspect(data.in_flight_requests[stream_id])}"
)

case pop_in(data.in_flight_requests[stream_id]) do
# There is no in-flight req for this response frame, BUT there is a request
# for it that timed out on the caller's side. Let's just emit a
Expand Down Expand Up @@ -939,12 +920,6 @@ defmodule Xandra.Connection do
end
end

defp next_stream_id(data) do
[next_id | rest] = data.free_stream_ids
data = %__MODULE__{data | free_stream_ids: rest}
{next_id, data}
end

defp random_free_stream_id(in_flight_requests, timed_out_ids) do
random_id = Enum.random(1..@max_cassandra_stream_id)

Expand Down
12 changes: 9 additions & 3 deletions lib/xandra/connection/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ defmodule Xandra.Connection.Utils do
do: {:ok, binary, fetch_state}
end

case protocol_format do
:v4_or_less -> Frame.decode_v4(fetch_bytes_fun, :no_fetch_state, compressor)
:v5_or_more -> Frame.decode_v5(fetch_bytes_fun, :no_fetch_state, compressor)
decode_fun =
case protocol_format do
:v4_or_less -> &Frame.decode_v4/3
:v5_or_more -> &Frame.decode_v5/3
end

case decode_fun.(fetch_bytes_fun, :no_fetch_state, compressor) do
{:ok, [frame], rest} -> {:ok, frame, rest}
{:error, reason} -> {:error, reason}
end
end

Expand Down
23 changes: 16 additions & 7 deletions lib/xandra/frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ defmodule Xandra.Frame do
(fetch_state, pos_integer() -> {:ok, binary(), fetch_state} | {:error, reason}),
fetch_state,
module() | nil
) :: {:ok, t(), binary()} | {:error, reason}
) :: {:ok, [t(), ...], binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode(
protocol_module,
Expand All @@ -313,7 +313,7 @@ defmodule Xandra.Frame do
fetch_state,
module() | nil,
(fetch_state -> binary())
) :: {:ok, t(), binary()} | {:error, reason}
) :: {:ok, [t(), ...], binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode_v4(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end)
when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do
Expand All @@ -322,11 +322,11 @@ defmodule Xandra.Frame do
with {:ok, header, fetch_state} <- fetch_bytes_fun.(fetch_state, length) do
case body_length(header) do
0 ->
{:ok, decode(header), rest_fun.(fetch_state)}
{:ok, [decode(header)], rest_fun.(fetch_state)}

body_length ->
with {:ok, body, bytes_state} <- fetch_bytes_fun.(fetch_state, body_length),
do: {:ok, decode(header, body, compressor), rest_fun.(bytes_state)}
do: {:ok, [decode(header, body, compressor)], rest_fun.(bytes_state)}
end
end
end
Expand All @@ -336,17 +336,26 @@ defmodule Xandra.Frame do
fetch_state,
module() | nil,
(fetch_state -> binary())
) :: {:ok, t(), rest :: binary()} | {:error, reason}
) :: {:ok, [t(), ...], rest :: binary()} | {:error, reason}
when fetch_state: term(), reason: term()
def decode_v5(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end)
when is_function(fetch_bytes_fun, 2) and is_atom(compressor) do
with {:ok, envelope, rest} <-
decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun) do
{frame, _ignored_rest} = decode_from_binary(envelope, compressor)
{:ok, frame, rest}
frames = decode_all_v5_frames_in_envelope(envelope, compressor, _acc = [])
{:ok, frames, rest}
end
end

defp decode_all_v5_frames_in_envelope("", _compressor, acc) do
Enum.reverse(acc)
end

defp decode_all_v5_frames_in_envelope(envelope, compressor, acc) do
{frame, rest} = decode_from_binary(envelope, compressor)
decode_all_v5_frames_in_envelope(rest, compressor, [frame | acc])
end

# Made public for testing.
@doc false
def decode_v5_wrapper(fetch_bytes_fun, fetch_state, compressor, rest_fun \\ fn _ -> "" end) do
Expand Down
26 changes: 25 additions & 1 deletion test/xandra/frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ defmodule Xandra.FrameTest do
|> Frame.encode(protocol_module)
|> IO.iodata_to_binary()

assert {:ok, redecoded_frame, _rest = ""} =
assert {:ok, [redecoded_frame], _rest = ""} =
Frame.decode_v5(&fetch_bytes_from_binary/2, encoded, _compressor = nil)

assert redecoded_frame == frame
Expand Down Expand Up @@ -240,6 +240,30 @@ defmodule Xandra.FrameTest do
end
end

describe "decode/5" do
# Regression for: https://issues.apache.org/jira/browse/CASSANDRA-19753
@tag :regression
test "can decode multiple envelopes in a single frame" do
payload =
<<144, 0, 2, 138, 218, 155, 133, 0, 0, 2, 8, 0, 0, 0, 63, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0,
1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97, 108, 0, 12, 99, 108, 117,
115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1, 0, 0, 0, 12, 116, 101,
115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 133, 0, 0, 1, 8, 0, 0, 0, 63, 0, 0, 0,
2, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 115, 121, 115, 116, 101, 109, 0, 5, 108, 111, 99, 97,
108, 0, 12, 99, 108, 117, 115, 116, 101, 114, 95, 110, 97, 109, 101, 0, 13, 0, 0, 0, 1,
0, 0, 0, 12, 116, 101, 115, 116, 95, 99, 108, 117, 115, 116, 101, 114, 40, 65, 100, 21>>

assert {:ok, [%Frame{stream_id: 2}, %Frame{stream_id: 1}], _rest = ""} =
Frame.decode(
Xandra.Protocol.V5,
&fetch_bytes_from_binary/2,
payload,
_compressor = nil,
_rest_fun = & &1
)
end
end

defp kind_generator do
member_of([
:startup,
Expand Down
36 changes: 9 additions & 27 deletions test/xandra_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -346,35 +346,17 @@ defmodule XandraTest do
end
end

@tag start_conn: false
# Regression for timeouts on native protocol v5:
# https://github.com/whatyouhide/xandra/issues/356
@tag :regression
test "concurrent requests on a single connection", %{start_options: start_options} do
conn =
start_supervised!({Xandra, start_options ++ [max_concurrent_requests_per_connection: 2]})

max_requests = 5

Xandra.Telemetry.attach_default_handler()
Xandra.Telemetry.attach_debug_handler()
Logger.configure(level: :debug)

results =
1..max_requests
|> Task.async_stream(
fn _i ->
Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000)
end,
timeout: 15_000,
max_concurrency: 2
)
|> Enum.map(fn {:ok, result} -> result end)

for result <- results do
test "concurrent requests on a single connection", %{conn: conn} do
1..5
|> Task.async_stream(fn _i ->
Xandra.execute(conn, "SELECT cluster_name FROM system.local", [], timeout: 5000)
end)
|> Enum.each(fn {:ok, result} ->
assert {:ok, %Xandra.Page{}} = result
end
after
:telemetry.detach("xandra-default-telemetry-handler")
:telemetry.detach("xandra-debug-telemetry-handler")
end)
end

def configure_fun(options, original_start_options, pid, ref) do
Expand Down

0 comments on commit 82523c3

Please sign in to comment.