Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up timeouts in the conn and add :max_concurrent_requests_per_connection #358

Merged
merged 7 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/xandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,20 @@ defmodule Xandra do
*Available since v0.18.0*.
"""
],
max_concurrent_requests_per_connection: [
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is here so that we can test it.

type: :pos_integer,
default: 100,
doc: """
The maximum number of requests that can be in flight at any given time on a single
connection. Xandra "multiplexes" requests on a single connection, since that is allowed
by the Cassandra protocol (via the use of stream IDs to identify in-flight requests on
a particular connection). Increasing this option means that a single connection will
handle more requests, so you can potentially lower the number of total connections in
your connection pool. However, the more requests are in flight on a single connection,
the more work that connection will have to do to decode and route requests and responses.
*Available since 0.19.0*.
"""
],
name: [
type: :any,
doc: """
Expand Down
181 changes: 100 additions & 81 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@ defmodule Xandra.Connection do
@behaviour :gen_statem

@forced_transport_options [packet: :raw, mode: :binary, active: false]
@max_concurrent_requests 5000

# How old a timed-out stream ID can be before we flush it.
@max_timed_out_stream_id_age_in_millisec :timer.minutes(5)

# How often to clean up timed-out requests.
@flush_timed_out_stream_id_interval_millisec :timer.seconds(30)

# This is the max stream ID value that we can use (a [short] in Cassandra).
@max_cassandra_stream_id 32_768
@timed_out_stream_id_timeout_minutes 5
@flush_timed_out_stream_id_interval 30 * 1000

# This record is used internally when we check out a "view" of the state of
# the connection. This holds all the necessary info to encode queries and more.
Expand Down Expand Up @@ -91,7 +96,7 @@ defmodule Xandra.Connection do
:telemetry.span([:xandra, :prepare_query], metadata, fn ->
with :ok <- send_prepare_frame(state, prepared, options),
{:ok, %Frame{} = frame} <-
receive_response_frame(conn_pid, req_alias, state, timeout, metadata) do
receive_response_frame(conn_pid, req_alias, state, timeout) do
case protocol_module.decode_response(frame, prepared, options) do
{%Prepared{} = prepared, warnings} ->
Prepared.Cache.insert(prepared_cache, prepared)
Expand Down Expand Up @@ -172,13 +177,7 @@ defmodule Xandra.Connection do
fun = fn ->
with :ok <- Transport.send(transport, payload),
{:ok, %Frame{} = frame} <-
receive_response_frame(
conn_pid,
req_alias,
checked_out_state,
timeout,
telemetry_meta
) do
receive_response_frame(conn_pid, req_alias, checked_out_state, timeout) do
case protocol_module.decode_response(frame, query, options) do
{%_{} = response, warnings} ->
maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings)
Expand Down Expand Up @@ -250,8 +249,7 @@ defmodule Xandra.Connection do
conn_pid,
req_alias,
checked_out_state(atom_keys?: atom_keys?, stream_id: stream_id),
timeout,
telemetry_metadata
timeout
) do
receive do
{^req_alias, {:ok, %Frame{} = frame}} ->
Expand All @@ -265,8 +263,7 @@ defmodule Xandra.Connection do
{:error, {:connection_crashed, reason}}
after
timeout ->
:telemetry.execute([:xandra, :client_timeout], %{}, telemetry_metadata)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this, clients can measure this themselves for now if they want. I want to first run this code in your codebase and see how things go before making this a public Telemetry event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can test this. And yeah, it is measurable from the client ofc. but I thought that it might be an interesting metric.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harunzengin it probably will be in the future, but I want to avoid committing to an API for the telemetry event until we're sure.

:gen_statem.cast(conn_pid, {:timed_out_id, stream_id})
:gen_statem.cast(conn_pid, {:request_timed_out_at_caller, stream_id})
{:error, :timeout}
end
end
Expand All @@ -276,6 +273,11 @@ defmodule Xandra.Connection do
:gen_statem.call(conn, :get_transport)
end

# Made public for testing. Only meant to be used in tests.
def trigger_flush_timed_out_stream_ids(conn) do
:gen_statem.call(conn, :flush_timed_out_stream_ids)
end

## Data

# [short] - a 2-byte integer, which clients can only use as a *positive* integer (so
Expand All @@ -296,8 +298,9 @@ defmodule Xandra.Connection do
current_keyspace: String.t() | nil,
default_consistency: atom(),
disconnection_reason: term(),
max_concurrent_requests: pos_integer(),
in_flight_requests: %{optional(stream_id()) => term()},
timed_out_ids: %{optional(stream_id()) => DateTime.t()},
timed_out_ids: %{optional(stream_id()) => integer()},
options: keyword(),
original_options: keyword(),
peername: {:inet.ip_address(), :inet.port_number()},
Expand All @@ -319,6 +322,7 @@ defmodule Xandra.Connection do
:connection_name,
:default_consistency,
:disconnection_reason,
:max_concurrent_requests,
:options,
:original_options,
:peername,
Expand All @@ -344,7 +348,7 @@ defmodule Xandra.Connection do

actions = [
{:next_event, :internal, :connect},
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}
flush_timed_out_stream_ids_timeout_action()
]

{:ok, :disconnected, data, actions}
Expand All @@ -368,11 +372,22 @@ defmodule Xandra.Connection do
send_reply(req_alias, {:error, :disconnected})
end)

data = put_in(data.in_flight_requests, %{})
# Reset in-flight requests and timed out stream IDs. We just disconnected, so all the
# in-flight requests (including the timed-out ones) are now invalid and the server should
# have killed them anyway.
data = %__MODULE__{data | in_flight_requests: %{}, timed_out_ids: %{}}
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do this, we don't have to clean things up when disconnected.


if data.backoff do
{backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1)
{:keep_state, data, {{:timeout, :reconnect}, backoff_time, _content = nil}}

# Set a reconnection timer and cancel the timer that flushes timed out stream IDs,
# since we just emptied them.
actions = [
{{:timeout, :reconnect}, backoff_time, _content = nil},
{{:timeout, :flush_timed_out_stream_ids}, :infinity, nil}
]

{:keep_state, data, actions}
else
{:stop, reason}
end
Expand Down Expand Up @@ -408,6 +423,7 @@ defmodule Xandra.Connection do
address: address,
port: port,
connect_timeout: Keyword.fetch!(options, :connect_timeout),
max_concurrent_requests: Keyword.fetch!(options, :max_concurrent_requests_per_connection),
connection_name: Keyword.get(options, :name),
cluster_pid: Keyword.get(options, :cluster_pid),
protocol_version: data.protocol_version || Keyword.get(options, :protocol_version),
Expand Down Expand Up @@ -534,30 +550,11 @@ defmodule Xandra.Connection do
{:keep_state, data}
end

def disconnected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now()))

{:keep_state, data}
end

def disconnected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
now = DateTime.utc_now()

data =
update_in(
data.timed_out_ids,
&Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc ->
if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do
acc
else
Map.put(acc, stream_id, timestamp)
end
end)
)

{:keep_state, data,
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}}
# The caller notified the conn that, on its side, the request timed out. However,
# here we're disconnected so we really don't need to do anything as there are no
# in-flight requests or timed-out requests to clean up.
def disconnected(:cast, {:request_timed_out_at_caller, _stream_id}, %__MODULE__{}) do
:keep_state_and_data
end

## "Connected" state
Expand Down Expand Up @@ -592,13 +589,18 @@ defmodule Xandra.Connection do
end
end

def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{
in_flight_requests: in_flight_requests
})
when map_size(in_flight_requests) == @max_concurrent_requests do
# We reached the max number of in-flight requests, so we don't do anything and just
# return an error to the caller.
def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{} = data)
when map_size(data.in_flight_requests) == data.max_concurrent_requests do
{:keep_state_and_data, {:reply, from, {:error, :too_many_concurrent_requests}}}
end

# When we check out the state to a caller so that that caller can perform a request,
# we don't need to *monitor* that caller. This is because the caller is identified
# by an alias (Process.alias/1). Even if the caller dies, and only *after* C*
# sends us the corresponding response, we can still route that C* response to the
# alias and it'll not go anywhere and not cause any issues.
def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do
stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)

Expand All @@ -622,10 +624,17 @@ defmodule Xandra.Connection do
{:keep_state, data, {:reply, from, {:ok, response}}}
end

# Only used in tests.
def connected({:call, from}, :get_transport, %__MODULE__{transport: transport}) do
{:keep_state_and_data, {:reply, from, {:ok, transport}}}
end

# Only used in tests.
def connected({:call, from}, :flush_timed_out_stream_ids, %__MODULE__{} = data) do
{:keep_state, data, actions} = connected({:timeout, :flush_timed_out_stream_ids}, nil, data)
{:keep_state, data, List.wrap(actions) ++ [{:reply, from, :ok}]}
end

def connected(:info, message, data) when is_data_message(data.transport, message) do
:ok = Transport.setopts(data.transport, active: :once)
{_mod, _socket, bytes} = message
Expand All @@ -651,30 +660,31 @@ defmodule Xandra.Connection do
{:keep_state, data}
end

def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
# The caller is notifying the connection that the request (on stream_id) timed
# out on its side (that is, the caller reached its "after" in the receive block).
# We need to remove the stream ID from the in-flight requests but still keep
# track of it, because C* might still send us a response for that query (and when it
# does, we will throw it away and free the timed-out stream ID).
#
# C* does not support CANCELING queries, by the way, otherwise that's what we'd do here.
def connected(:cast, {:request_timed_out_at_caller, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now()))

data = put_in(data.timed_out_ids[stream_id], System.system_time(:millisecond))
{:keep_state, data}
end

def connected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
now = DateTime.utc_now()
def connected({:timeout, :flush_timed_out_stream_ids}, _content, %__MODULE__{} = data) do
now = System.system_time(:millisecond)
whatyouhide marked this conversation as resolved.
Show resolved Hide resolved

data =
update_in(
data.timed_out_ids,
&Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc ->
if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do
acc
else
Map.put(acc, stream_id, timestamp)
end
end)
)
new_timed_out_ids =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With 4e2ac5e, I get the following stacktrace, just trying to execute a query:

** (exit) exited in: :gen_statem.call(#PID<0.933.0>, {:checkout_state_for_next_request, #Reference<0.0.121219.3000101197.1395982337.31962>}, :infinity)
    ** (EXIT) an exception was raised:
        ** (BadMapError) expected a map, got: []
            (erts 14.2.1) :erlang.is_map_key(21374, [])
            (xandra 0.18.1) lib/xandra/connection.ex:900: Xandra.Connection.random_free_stream_id/2
            (xandra 0.18.1) lib/xandra/connection.ex:600: Xandra.Connection.connected/3
            (stdlib 5.2) gen_statem.erl:1395: :gen_statem.loop_state_callback/11
            (stdlib 5.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
    (stdlib 5.2) gen.erl:246: :gen.do_call/4
    (stdlib 5.2) gen_statem.erl:923: :gen_statem.call/3
    (xandra 0.18.1) lib/xandra/connection.ex:158: Xandra.Connection.execute/4
    (xandra 0.18.1) lib/xandra/retry_strategy.ex:309: Xandra.RetryStrategy.run_on_cluster/5

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it seems that preparing a query doesn't work and fails with :timeout

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harunzengin I fixed the bug and added a regression test for that, it wasn't the nicest test to add but I’m happy we have a test for this now 🙃

As for the prepare, I’m not sure what you mean. I have a test in there that exercises Xandra.prepare/3 and it's passing. Do you have an example of when preparing a query fails?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested this out, the timeouts in prepared were probably related to the first case, it is resolved now.

for {id, ts} <- data.timed_out_ids,
now - ts < @max_timed_out_stream_id_age_in_millisec,
into: %{},
do: {id, ts}

data = %__MODULE__{data | timed_out_ids: new_timed_out_ids}

{:keep_state, data,
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}}
{:keep_state, data, flush_timed_out_stream_ids_timeout_action()}
end

## Helpers
Expand Down Expand Up @@ -757,25 +767,30 @@ defmodule Xandra.Connection do
end
end

defp handle_frame(%__MODULE__{} = data, %Frame{stream_id: stream_id} = frame) do
defp handle_frame(
%__MODULE__{timed_out_ids: timed_out_ids} = data,
%Frame{stream_id: stream_id} = frame
) do
case pop_in(data.in_flight_requests[stream_id]) do
{nil, data} ->
if Map.has_key?(data.timed_out_ids, stream_id) do
:telemetry.execute(
[:xandra, :timed_out_response],
telemetry_meta(data, %{stream_id: stream_id})
)
# There is no in-flight req for this response frame, BUT there is a request
# for it that timed out on the caller's side. Let's just emit a
{nil, data} when is_map_key(timed_out_ids, stream_id) ->
:telemetry.execute(
[:xandra, :debug, :received_timed_out_response],
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, let's keep this a "debug" event so that you folks can measure this but we don't have to make it part of the public API.

%{},
telemetry_meta(data, %{stream_id: stream_id})
)
Comment on lines +777 to +782
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored to flatten the nesting.


update_in(data.timed_out_ids, &Map.delete(&1, stream_id))
else
raise """
internal error in Xandra connection, we received a frame from the server with \
stream ID #{stream_id}, but there was no in-flight request for this stream ID. \
The frame is:
%__MODULE__{data | timed_out_ids: Map.delete(timed_out_ids, stream_id)}

#{inspect(frame)}
"""
end
{nil, _data} ->
raise """
internal error in Xandra connection, we received a frame from the server with \
stream ID #{stream_id}, but there was no in-flight request for this stream ID. \
The frame is:

#{inspect(frame)}
"""

{req_alias, data} ->
send_reply(req_alias, {:ok, frame})
Expand Down Expand Up @@ -901,4 +916,8 @@ defmodule Xandra.Connection do
random_id
end
end

defp flush_timed_out_stream_ids_timeout_action do
{{:timeout, :flush_timed_out_stream_ids}, @flush_timed_out_stream_id_interval_millisec, nil}
end
end
13 changes: 13 additions & 0 deletions lib/xandra/connection_error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ defmodule Xandra.ConnectionError do
"request timeout"
end

defp format_reason(:too_many_concurrent_requests) do
"""
this connection has too many requests in flight; to fix this, consider:

1. increasing the size of the connection pool so that you'll have more
connections available and you'll be able to spread the request load
over more connections
2. increasing the maximum number of allowed in-flight requests per connection,
which can be configured through the :max_concurrent_requests_per_connection
option when starting connections (defaults to 100)
"""
end

defp format_reason({:cluster, :not_connected}) do
"not connected to any of the nodes"
end
Expand Down
Loading
Loading