From bee398403a82140779fd49664cc6dc560ac9bc8d Mon Sep 17 00:00:00 2001 From: Andrea Leopardi Date: Wed, 6 Mar 2024 13:25:58 +0100 Subject: [PATCH] Clean up timeouts in the conn and add :max_concurrent_requests_per_connection (#358) --- lib/xandra.ex | 14 +++ lib/xandra/connection.ex | 181 ++++++++++++++++++--------------- lib/xandra/connection_error.ex | 13 +++ test/xandra_test.exs | 110 ++++++++++++++++++++ 4 files changed, 237 insertions(+), 81 deletions(-) diff --git a/lib/xandra.ex b/lib/xandra.ex index 33dae586..3b49474e 100644 --- a/lib/xandra.ex +++ b/lib/xandra.ex @@ -392,6 +392,20 @@ defmodule Xandra do *Available since v0.18.0*. """ ], + max_concurrent_requests_per_connection: [ + type: :pos_integer, + default: 100, + doc: """ + The maximum number of requests that can be in flight at any given time on a single + connection. Xandra "multiplexes" requests on a single connection, since that is allowed + by the Cassandra protocol (via the use of stream IDs to identify in-flight requests on + a particular connection). Increasing this option means that a single connection will + handle more requests, so you can potentially lower the number of total connections in + your connection pool. However, the more requests are in flight on a single connection, + the more work that connection will have to do to decode and route requests and responses. + *Available since 0.19.0*. + """ + ], name: [ type: :any, doc: """ diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index d469b74f..1c284f23 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -18,10 +18,15 @@ defmodule Xandra.Connection do @behaviour :gen_statem @forced_transport_options [packet: :raw, mode: :binary, active: false] - @max_concurrent_requests 5000 + + # How old a timed-out stream ID can be before we flush it. + @max_timed_out_stream_id_age_in_millisec :timer.minutes(5) + + # How often to clean up timed-out requests. 
+ @flush_timed_out_stream_id_interval_millisec :timer.seconds(30) + + # This is the max stream ID value that we can use (a [short] in Cassandra). @max_cassandra_stream_id 32_768 - @timed_out_stream_id_timeout_minutes 5 - @flush_timed_out_stream_id_interval 30 * 1000 # This record is used internally when we check out a "view" of the state of # the connection. This holds all the necessary info to encode queries and more. @@ -91,7 +96,7 @@ defmodule Xandra.Connection do :telemetry.span([:xandra, :prepare_query], metadata, fn -> with :ok <- send_prepare_frame(state, prepared, options), {:ok, %Frame{} = frame} <- - receive_response_frame(conn_pid, req_alias, state, timeout, metadata) do + receive_response_frame(conn_pid, req_alias, state, timeout) do case protocol_module.decode_response(frame, prepared, options) do {%Prepared{} = prepared, warnings} -> Prepared.Cache.insert(prepared_cache, prepared) @@ -172,13 +177,7 @@ defmodule Xandra.Connection do fun = fn -> with :ok <- Transport.send(transport, payload), {:ok, %Frame{} = frame} <- - receive_response_frame( - conn_pid, - req_alias, - checked_out_state, - timeout, - telemetry_meta - ) do + receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do case protocol_module.decode_response(frame, query, options) do {%_{} = response, warnings} -> maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings) @@ -250,8 +249,7 @@ defmodule Xandra.Connection do conn_pid, req_alias, checked_out_state(atom_keys?: atom_keys?, stream_id: stream_id), - timeout, - telemetry_metadata + timeout ) do receive do {^req_alias, {:ok, %Frame{} = frame}} -> @@ -265,8 +263,7 @@ defmodule Xandra.Connection do {:error, {:connection_crashed, reason}} after timeout -> - :telemetry.execute([:xandra, :client_timeout], %{}, telemetry_metadata) - :gen_statem.cast(conn_pid, {:timed_out_id, stream_id}) + :gen_statem.cast(conn_pid, {:request_timed_out_at_caller, stream_id}) {:error, :timeout} end end @@ -276,6 +273,11 
@@ defmodule Xandra.Connection do :gen_statem.call(conn, :get_transport) end + # Made public for testing. Only meant to be used in tests. + def trigger_flush_timed_out_stream_ids(conn) do + :gen_statem.call(conn, :flush_timed_out_stream_ids) + end + ## Data # [short] - a 2-byte integer, which clients can only use as a *positive* integer (so @@ -296,8 +298,9 @@ defmodule Xandra.Connection do current_keyspace: String.t() | nil, default_consistency: atom(), disconnection_reason: term(), + max_concurrent_requests: pos_integer(), in_flight_requests: %{optional(stream_id()) => term()}, - timed_out_ids: %{optional(stream_id()) => DateTime.t()}, + timed_out_ids: %{optional(stream_id()) => integer()}, options: keyword(), original_options: keyword(), peername: {:inet.ip_address(), :inet.port_number()}, @@ -319,6 +322,7 @@ defmodule Xandra.Connection do :connection_name, :default_consistency, :disconnection_reason, + :max_concurrent_requests, :options, :original_options, :peername, @@ -344,7 +348,7 @@ defmodule Xandra.Connection do actions = [ {:next_event, :internal, :connect}, - {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush} + flush_timed_out_stream_ids_timeout_action() ] {:ok, :disconnected, data, actions} @@ -368,11 +372,22 @@ defmodule Xandra.Connection do send_reply(req_alias, {:error, :disconnected}) end) - data = put_in(data.in_flight_requests, %{}) + # Reset in-flight requests and timed out stream IDs. We just disconnected, so all the + # in-flight requests (including the timed-out ones) are now invalid and the server should + # have killed them anyway. + data = %__MODULE__{data | in_flight_requests: %{}, timed_out_ids: %{}} if data.backoff do {backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1) - {:keep_state, data, {{:timeout, :reconnect}, backoff_time, _content = nil}} + + # Set a reconnection timer and cancel the timer that flushes timed out stream IDs, + # since we just emptied them. 
+ actions = [ + {{:timeout, :reconnect}, backoff_time, _content = nil}, + {{:timeout, :flush_timed_out_stream_ids}, :infinity, nil} + ] + + {:keep_state, data, actions} else {:stop, reason} end @@ -408,6 +423,7 @@ defmodule Xandra.Connection do address: address, port: port, connect_timeout: Keyword.fetch!(options, :connect_timeout), + max_concurrent_requests: Keyword.fetch!(options, :max_concurrent_requests_per_connection), connection_name: Keyword.get(options, :name), cluster_pid: Keyword.get(options, :cluster_pid), protocol_version: data.protocol_version || Keyword.get(options, :protocol_version), @@ -534,30 +550,11 @@ defmodule Xandra.Connection do {:keep_state, data} end - def disconnected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do - data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) - data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now())) - - {:keep_state, data} - end - - def disconnected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do - now = DateTime.utc_now() - - data = - update_in( - data.timed_out_ids, - &Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc -> - if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do - acc - else - Map.put(acc, stream_id, timestamp) - end - end) - ) - - {:keep_state, data, - {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}} + # The caller notified the conn that, on its side, the request timed out. However, + # here we're disconnected so we really don't need to do anything as there are no + # in-flight requests or timed-out requests to clean up. 
+ def disconnected(:cast, {:request_timed_out_at_caller, _stream_id}, %__MODULE__{}) do + :keep_state_and_data end ## "Connected" state @@ -592,13 +589,18 @@ defmodule Xandra.Connection do end end - def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{ - in_flight_requests: in_flight_requests - }) - when map_size(in_flight_requests) == @max_concurrent_requests do + # We reached the max number of in-flight requests, so we don't do anything and just + # return an error to the caller. + def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{} = data) + when map_size(data.in_flight_requests) == data.max_concurrent_requests do {:keep_state_and_data, {:reply, from, {:error, :too_many_concurrent_requests}}} end + # When we check out the state to a caller so that that caller can perform a request, + # we don't need to *monitor* that caller. This is because the caller is identified + # by an alias (Process.alias/1). Even if the caller dies, and only *after* C* + # sends us the corresponding response, we can still route that C* response to the + # alias and it'll not go anywhere and not cause any issues. def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids) @@ -622,10 +624,17 @@ defmodule Xandra.Connection do {:keep_state, data, {:reply, from, {:ok, response}}} end + # Only used in tests. def connected({:call, from}, :get_transport, %__MODULE__{transport: transport}) do {:keep_state_and_data, {:reply, from, {:ok, transport}}} end + # Only used in tests. 
+ def connected({:call, from}, :flush_timed_out_stream_ids, %__MODULE__{} = data) do + {:keep_state, data, actions} = connected({:timeout, :flush_timed_out_stream_ids}, nil, data) + {:keep_state, data, List.wrap(actions) ++ [{:reply, from, :ok}]} + end + def connected(:info, message, data) when is_data_message(data.transport, message) do :ok = Transport.setopts(data.transport, active: :once) {_mod, _socket, bytes} = message @@ -651,30 +660,31 @@ defmodule Xandra.Connection do {:keep_state, data} end - def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do + # The caller is notifying the connection that the request (on stream_id) timed + # out on its side (that is, the caller reached its "after" in the receive block). + # We need to remove the stream ID from the in-flight requests but still keep + # track of it, because C* might still send us a response for that query (and when it + # does, we will throw it away and free the timed-out stream ID). + # + # C* does not support CANCELING queries, by the way, otherwise that's what we'd do here. 
+ def connected(:cast, {:request_timed_out_at_caller, stream_id}, %__MODULE__{} = data) do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) - data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now())) - + data = put_in(data.timed_out_ids[stream_id], System.system_time(:millisecond)) {:keep_state, data} end - def connected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do - now = DateTime.utc_now() + def connected({:timeout, :flush_timed_out_stream_ids}, _content, %__MODULE__{} = data) do + now = System.system_time(:millisecond) - data = - update_in( - data.timed_out_ids, - &Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc -> - if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do - acc - else - Map.put(acc, stream_id, timestamp) - end - end) - ) + new_timed_out_ids = + for {id, ts} <- data.timed_out_ids, + now - ts < @max_timed_out_stream_id_age_in_millisec, + into: %{}, + do: {id, ts} + + data = %__MODULE__{data | timed_out_ids: new_timed_out_ids} - {:keep_state, data, - {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}} + {:keep_state, data, flush_timed_out_stream_ids_timeout_action()} end ## Helpers @@ -757,25 +767,30 @@ defmodule Xandra.Connection do end end - defp handle_frame(%__MODULE__{} = data, %Frame{stream_id: stream_id} = frame) do + defp handle_frame( + %__MODULE__{timed_out_ids: timed_out_ids} = data, + %Frame{stream_id: stream_id} = frame + ) do case pop_in(data.in_flight_requests[stream_id]) do - {nil, data} -> - if Map.has_key?(data.timed_out_ids, stream_id) do - :telemetry.execute( - [:xandra, :timed_out_response], - telemetry_meta(data, %{stream_id: stream_id}) - ) + # There is no in-flight req for this response frame, BUT there is a request + # for it that timed out on the caller's side. 
Let's just emit a debug telemetry event and free the timed-out stream ID. + {nil, data} when is_map_key(timed_out_ids, stream_id) -> + :telemetry.execute( + [:xandra, :debug, :received_timed_out_response], + %{}, + telemetry_meta(data, %{stream_id: stream_id}) + ) - update_in(data.timed_out_ids, &Map.delete(&1, stream_id)) - else - raise """ - internal error in Xandra connection, we received a frame from the server with \ - stream ID #{stream_id}, but there was no in-flight request for this stream ID. \ - The frame is: + %__MODULE__{data | timed_out_ids: Map.delete(timed_out_ids, stream_id)} - #{inspect(frame)} - """ - end + {nil, _data} -> + raise """ + internal error in Xandra connection, we received a frame from the server with \ + stream ID #{stream_id}, but there was no in-flight request for this stream ID. \ + The frame is: + + #{inspect(frame)} + """ {req_alias, data} -> send_reply(req_alias, {:ok, frame}) @@ -901,4 +916,8 @@ defmodule Xandra.Connection do random_id end end + + defp flush_timed_out_stream_ids_timeout_action do + {{:timeout, :flush_timed_out_stream_ids}, @flush_timed_out_stream_id_interval_millisec, nil} + end end diff --git a/lib/xandra/connection_error.ex b/lib/xandra/connection_error.ex index 910db5c3..3dafbb8d 100644 --- a/lib/xandra/connection_error.ex +++ b/lib/xandra/connection_error.ex @@ -75,6 +75,19 @@ defmodule Xandra.ConnectionError do "request timeout" end + defp format_reason(:too_many_concurrent_requests) do + """ + this connection has too many requests in flight; to fix this, consider: + + 1. increasing the size of the connection pool so that you'll have more + connections available and you'll be able to spread the request load + over more connections + 2. 
increasing the maximum number of allowed in-flight requests per connection, + which can be configured through the :max_concurrent_requests_per_connection + option when starting connections (defaults to 100) + """ + end + defp format_reason({:cluster, :not_connected}) do "not connected to any of the nodes" end diff --git a/test/xandra_test.exs b/test/xandra_test.exs index 2c323481..d22fc9be 100644 --- a/test/xandra_test.exs +++ b/test/xandra_test.exs @@ -56,6 +56,14 @@ defmodule XandraTest do end end + test "validates the :max_concurrent_requests_per_connection option" do + message = ~r{invalid value for :max_concurrent_requests_per_connection option} + + assert_raise NimbleOptions.ValidationError, message, fn -> + Xandra.start_link(max_concurrent_requests_per_connection: 0) + end + end + test "returns an error if the connection is not established" do telemetry_ref = :telemetry_test.attach_event_handlers(self(), [[:xandra, :failed_to_connect]]) @@ -193,6 +201,108 @@ defmodule XandraTest do assert error.reason == :timeout end + + # It's an annoyance to set up support for UDFs in Scylla in CI. + @tag :cassandra_specific + @tag start_conn: false + test "returns an error if the max number of concurrent requests is reached", + %{start_options: start_options, keyspace: keyspace} do + modified_start_options = + Keyword.merge(start_options, + max_concurrent_requests_per_connection: 1, + keyspace: keyspace + ) + + assert {:ok, conn} = start_supervised({Xandra, modified_start_options}) + + # Not ideal, but here it is. 
+ # https://stackoverflow.com/questions/55497473/does-cassandra-have-a-sleep-cql-query + Xandra.execute!(conn, """ + CREATE OR REPLACE FUNCTION #{keyspace}.sleep (time int) + CALLED ON NULL INPUT RETURNS int LANGUAGE java AS + ' + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() < start + time); + return time; + '; + """) + + results = + 1..2 + |> Task.async_stream(fn index -> + {index, Xandra.execute(conn, "SELECT #{keyspace}.sleep(200) FROM system.local")} + end) + |> Enum.map(fn {:ok, result} -> result end) + + # The first call succeeds, but the second call fails because it goes over + # the max number of concurrent requests allowed on the connection. + assert [ + {1, {:ok, %Xandra.Page{}}}, + {2, {:error, %ConnectionError{reason: :too_many_concurrent_requests} = error}} + ] = results + + assert Exception.message(error) =~ "this connection has too many requests in flight" + end + + # It's an annoyance to set up support for UDFs in Scylla in CI. + @tag :cassandra_specific + test "returns an error for requests that time out on the caller but only later on the server", + %{conn: conn, keyspace: keyspace} do + telemetry_ref = + :telemetry_test.attach_event_handlers(self(), [ + [:xandra, :debug, :received_timed_out_response] + ]) + + Xandra.execute!(conn, """ + CREATE OR REPLACE FUNCTION #{keyspace}.sleep (time int) + CALLED ON NULL INPUT RETURNS int LANGUAGE java AS + ' + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() < start + time); + return time; + '; + """) + + :erlang.trace(conn, true, [:receive]) + + server_timeout = 200 + + assert {:error, %ConnectionError{reason: :timeout}} = + Xandra.execute( + conn, + "SELECT #{keyspace}.sleep(#{server_timeout}) FROM system.local", + [], + timeout: div(server_timeout, 5) + ) + + assert_receive {:trace, ^conn, :receive, + {:"$gen_cast", {:request_timed_out_at_caller, stream_id}}} + + assert {:connected, data} = :sys.get_state(conn) + assert map_size(data.timed_out_ids) == 1 + assert 
%{^stream_id => _ts} = data.timed_out_ids + assert data.in_flight_requests == %{} + + # Now trigger a flush. + assert :ok = Xandra.Connection.trigger_flush_timed_out_stream_ids(conn) + assert {:connected, data_after_flush} = :sys.get_state(conn) + assert data_after_flush.timed_out_ids == data.timed_out_ids + + # Now actually wait for the original request to finish. + assert_receive {[:xandra, :debug, :received_timed_out_response], ^telemetry_ref, %{}, + %{connection: ^conn, stream_id: ^stream_id}}, + 1000 + + assert {:connected, data} = :sys.get_state(conn) + assert data.timed_out_ids == %{} + end + end + + describe "prepare/3" do + test "works as expected", %{conn: conn} do + assert {:ok, prepared} = Xandra.prepare(conn, "SELECT * FROM system.local") + assert {:ok, %Xandra.Page{}} = Xandra.execute(conn, prepared, []) + end end describe "failure handling" do