Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak in Xandra.Connection #355

Merged
merged 27 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
06e89b0
demonitor processes on failure
harunzengin Jan 31, 2024
94ac336
Release stream id on timeout
harunzengin Feb 7, 2024
8346b77
remove todo
harunzengin Feb 9, 2024
73b685a
wait for timed out stream ids
harunzengin Feb 15, 2024
a71b67f
fix match error
harunzengin Feb 15, 2024
465fa6d
finish experiment
harunzengin Feb 15, 2024
bc22c4a
hold free stream ids in state
harunzengin Feb 19, 2024
0bac294
release stream id
harunzengin Feb 19, 2024
d619ad8
Decrease possible ids
harunzengin Feb 20, 2024
4d4f58e
Decrease possible ids further
harunzengin Feb 20, 2024
0d90baa
try getting first stream id
harunzengin Feb 20, 2024
f61b7c2
use first id to improve performance
harunzengin Feb 20, 2024
3cedab5
Use Enum.random with range to run in constant time
harunzengin Feb 21, 2024
f78afff
Remove free_stream_id_reference
harunzengin Feb 21, 2024
3c9dc73
Release timed out stream ids after 30 minutes
harunzengin Feb 21, 2024
9211985
Remove uuid lib
harunzengin Feb 21, 2024
63dc88d
Merge branch 'attempt-fix-memory-leak-experiments' into attempt-fix-m…
harunzengin Feb 21, 2024
13c8dce
Add telemetry event for timed out response
harunzengin Feb 21, 2024
6025706
remove uuid from mix.lock
harunzengin Feb 21, 2024
47bce50
fix timeout to 30 minutes
harunzengin Feb 21, 2024
db323fb
reduce timeout to 5 minutes
harunzengin Feb 26, 2024
760e250
semantically-correct error message
harunzengin Feb 26, 2024
c04eee9
Flush timed out stream ids in 1 minute intervals
harunzengin Feb 26, 2024
bfbefb2
Flush every 30 seconds
harunzengin Feb 26, 2024
3a5f2b2
Add client_timeout telemetry event
harunzengin Feb 27, 2024
a0e23c6
Empty-Commit to retrigger pipeline
harunzengin Feb 27, 2024
ece50de
Fix older MapSet reference
harunzengin Feb 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 125 additions & 32 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ defmodule Xandra.Connection do

@forced_transport_options [packet: :raw, mode: :binary, active: false]

# Upper bound on the number of entries in `in_flight_requests` before checkouts
# are rejected with `{:error, :too_many_concurrent_requests}`.
@max_concurrent_requests 5000

# Largest stream ID the client is allowed to pick. The native protocol carries
# the stream ID in 2 bytes and reserves negative values for server-initiated
# streams, so the highest usable ID is 32_767 (32_768 would wrap to a negative
# value when read as a signed 16-bit integer).
@max_cassandra_stream_id 32_767

# Number of minutes a timed-out stream ID is kept out of circulation before the
# periodic flush makes it available again.
@timed_out_stream_id_timeout_minutes 5

# Period, in milliseconds, of the `:flush_timed_out_stream_id` timeout.
@flush_timed_out_stream_id_interval 30 * 1000

# This record is used internally when we check out a "view" of the state of
# the connection. This holds all the necessary info to encode queries and more.
Expand Down Expand Up @@ -87,7 +90,8 @@ defmodule Xandra.Connection do

:telemetry.span([:xandra, :prepare_query], metadata, fn ->
with :ok <- send_prepare_frame(state, prepared, options),
{:ok, %Frame{} = frame} <- receive_response_frame(req_alias, state, timeout) do
{:ok, %Frame{} = frame} <-
receive_response_frame(conn_pid, req_alias, state, timeout, metadata) do
case protocol_module.decode_response(frame, prepared, options) do
{%Prepared{} = prepared, warnings} ->
Prepared.Cache.insert(prepared_cache, prepared)
Expand Down Expand Up @@ -159,11 +163,22 @@ defmodule Xandra.Connection do
timeout = Keyword.fetch!(options, :timeout)
payload = query_mod.encode(query, params, options)

telemetry_meta =
checked_out_state
|> telemetry_meta(conn_pid, %{query: query})
|> Map.put(:extra_metadata, options[:telemetry_metadata])

# This is in an anonymous function so that we can use it in a Telemetry span.
fun = fn ->
with :ok <- Transport.send(transport, payload),
{:ok, %Frame{} = frame} <-
receive_response_frame(req_alias, checked_out_state, timeout) do
receive_response_frame(
conn_pid,
req_alias,
checked_out_state,
timeout,
telemetry_meta
) do
case protocol_module.decode_response(frame, query, options) do
{%_{} = response, warnings} ->
maybe_execute_telemetry_for_warnings(checked_out_state, conn_pid, query, warnings)
Expand All @@ -190,11 +205,6 @@ defmodule Xandra.Connection do
end
end

telemetry_meta =
checked_out_state
|> telemetry_meta(conn_pid, %{query: query})
|> Map.put(:extra_metadata, options[:telemetry_metadata])

:telemetry.span([:xandra, :execute_query], telemetry_meta, fn ->
{fun.(), telemetry_meta}
end)
Expand Down Expand Up @@ -236,7 +246,13 @@ defmodule Xandra.Connection do
}
end

defp receive_response_frame(req_alias, checked_out_state(atom_keys?: atom_keys?), timeout) do
defp receive_response_frame(
conn_pid,
req_alias,
checked_out_state(atom_keys?: atom_keys?, stream_id: stream_id),
timeout,
telemetry_metadata
) do
receive do
{^req_alias, {:ok, %Frame{} = frame}} ->
frame = %Frame{frame | atom_keys?: atom_keys?}
Expand All @@ -249,7 +265,8 @@ defmodule Xandra.Connection do
{:error, {:connection_crashed, reason}}
after
timeout ->
Process.demonitor(req_alias, [:flush])
:telemetry.execute([:xandra, :client_timeout], %{}, telemetry_metadata)
:gen_statem.cast(conn_pid, {:timed_out_id, stream_id})
{:error, :timeout}
end
end
Expand Down Expand Up @@ -279,8 +296,8 @@ defmodule Xandra.Connection do
current_keyspace: String.t() | nil,
default_consistency: atom(),
disconnection_reason: term(),
free_stream_ids: MapSet.t(stream_id()),
in_flight_requests: %{optional(stream_id()) => term()},
timed_out_ids: %{optional(stream_id()) => DateTime.t()},
options: keyword(),
original_options: keyword(),
peername: {:inet.ip_address(), :inet.port_number()},
Expand Down Expand Up @@ -310,8 +327,8 @@ defmodule Xandra.Connection do
:protocol_module,
:protocol_version,
:transport,
free_stream_ids: MapSet.new(1..@max_concurrent_requests),
in_flight_requests: %{},
timed_out_ids: %{},
current_keyspace: nil,
buffer: <<>>
]
Expand All @@ -324,7 +341,13 @@ defmodule Xandra.Connection do
@impl true
def init(options) do
  data = %__MODULE__{original_options: options, configure: Keyword.get(options, :configure)}

  # Two start-up actions: kick off the first connection attempt right away, and
  # arm the recurring timeout that sweeps expired entries out of `timed_out_ids`.
  # The flush handlers re-arm this timeout each time it fires.
  actions = [
    {:next_event, :internal, :connect},
    {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}
  ]

  {:ok, :disconnected, data, actions}
end

## "Disconnected" state
Expand All @@ -341,11 +364,9 @@ defmodule Xandra.Connection do
send(data.cluster_pid, {:xandra, :disconnected, data.peername, self()})
end

data =
Enum.reduce(data.in_flight_requests, data, fn {stream_id, req_alias}, data_acc ->
send_reply(req_alias, {:error, :disconnected})
update_in(data_acc.free_stream_ids, &MapSet.put(&1, stream_id))
end)
Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} ->
send_reply(req_alias, {:error, :disconnected})
end)
Comment on lines +367 to +369
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m confused: now, we send the reply to the caller but we don't update the in_flight_requests, which we were resetting to %{} before. How does this work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, I must have overlooked this; I deleted it in another iteration.


data = put_in(data.in_flight_requests, %{})

Expand Down Expand Up @@ -509,11 +530,36 @@ defmodule Xandra.Connection do
end

# Releases `stream_id` while disconnected by dropping its entry from
# `in_flight_requests`. Stream IDs are allocated at random and checked against
# the in-flight and timed-out maps, so there is no separate free-ID pool to
# put the ID back into.
def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
  data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
  {:keep_state, data}
end

# A caller gave up waiting for the response on `stream_id`. The request is no
# longer in flight, but the ID is parked in `timed_out_ids` (stamped with the
# current UTC time) so it is not handed out again until it has been flushed or
# a late response for it has arrived.
def disconnected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
  still_in_flight = Map.delete(data.in_flight_requests, stream_id)
  timed_out = Map.put(data.timed_out_ids, stream_id, DateTime.utc_now())

  {:keep_state, %__MODULE__{data | in_flight_requests: still_in_flight, timed_out_ids: timed_out}}
end

# Periodic sweep of `timed_out_ids` while disconnected. Entries whose age, in
# whole minutes, exceeds `@timed_out_stream_id_timeout_minutes` are dropped;
# everything else is kept. The timeout is then re-armed for the next sweep.
def disconnected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
  now = DateTime.utc_now()

  kept =
    for {stream_id, timed_out_at} <- data.timed_out_ids,
        DateTime.diff(now, timed_out_at, :minute) <= @timed_out_stream_id_timeout_minutes,
        into: %{} do
      {stream_id, timed_out_at}
    end

  reschedule =
    {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}

  {:keep_state, %__MODULE__{data | timed_out_ids: kept}, reschedule}
end

## "Connected" state

def connected(:enter, :disconnected, %__MODULE__{} = data) do
Expand Down Expand Up @@ -546,12 +592,15 @@ defmodule Xandra.Connection do
end
end

# Back-pressure clause: when the number of in-flight requests has reached
# `@max_concurrent_requests`, refuse the checkout and leave the state untouched.
# NOTE(review): only `in_flight_requests` is counted here; IDs parked in
# `timed_out_ids` do not contribute to the limit.
def connected(
      {:call, from},
      {:checkout_state_for_next_request, _req_alias},
      %__MODULE__{in_flight_requests: in_flight}
    )
    when map_size(in_flight) == @max_concurrent_requests do
  reply = {:reply, from, {:error, :too_many_concurrent_requests}}
  {:keep_state_and_data, reply}
end

def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do
{stream_id, data} =
get_and_update_in(data.free_stream_ids, fn ids ->
id = Enum.at(ids, 0)
{id, MapSet.delete(ids, id)}
end)
stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)

response =
checked_out_state(
Expand Down Expand Up @@ -598,11 +647,36 @@ defmodule Xandra.Connection do
end

# Releases `stream_id` while connected by dropping its entry from
# `in_flight_requests`. Stream IDs are allocated at random and checked against
# the in-flight and timed-out maps, so there is no separate free-ID pool to
# put the ID back into.
def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
  data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
  {:keep_state, data}
end

# A caller gave up waiting for the response on `stream_id`. The request is no
# longer in flight, but the ID is parked in `timed_out_ids` (stamped with the
# current UTC time) so it is not handed out again until it has been flushed or
# a late response for it has arrived.
def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
  still_in_flight = Map.delete(data.in_flight_requests, stream_id)
  timed_out = Map.put(data.timed_out_ids, stream_id, DateTime.utc_now())

  {:keep_state, %__MODULE__{data | in_flight_requests: still_in_flight, timed_out_ids: timed_out}}
end

# Periodic sweep of `timed_out_ids` while connected. Entries whose age, in
# whole minutes, exceeds `@timed_out_stream_id_timeout_minutes` are dropped;
# everything else is kept. The timeout is then re-armed for the next sweep.
def connected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
  now = DateTime.utc_now()

  kept =
    for {stream_id, timed_out_at} <- data.timed_out_ids,
        DateTime.diff(now, timed_out_at, :minute) <= @timed_out_stream_id_timeout_minutes,
        into: %{} do
      {stream_id, timed_out_at}
    end

  reschedule =
    {{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}

  {:keep_state, %__MODULE__{data | timed_out_ids: kept}, reschedule}
end

## Helpers

defp startup_connection(
Expand Down Expand Up @@ -685,18 +759,27 @@ defmodule Xandra.Connection do

# Routes a decoded response frame to whoever is waiting for it and returns the
# updated connection data.
#
#   * If the frame's stream ID has an in-flight request, the frame is sent to
#     that request's alias and the entry is removed from `in_flight_requests`.
#   * If the stream ID is instead found in `timed_out_ids`, the caller already
#     gave up: a `[:xandra, :timed_out_response]` telemetry event is emitted and
#     the ID is removed from `timed_out_ids` so it can be reused.
#   * Otherwise the frame is unexpected and a `RuntimeError` is raised.
defp handle_frame(%__MODULE__{} = data, %Frame{stream_id: stream_id} = frame) do
  case pop_in(data.in_flight_requests[stream_id]) do
    {nil, data} ->
      if Map.has_key?(data.timed_out_ids, stream_id) do
        # The stream ID goes in the *metadata* (third argument), which is where
        # the event handler reads it from; measurements are empty.
        :telemetry.execute(
          [:xandra, :timed_out_response],
          %{},
          telemetry_meta(data, %{stream_id: stream_id})
        )

        update_in(data.timed_out_ids, &Map.delete(&1, stream_id))
      else
        raise """
        internal error in Xandra connection, we received a frame from the server with \
        stream ID #{stream_id}, but there was no in-flight request for this stream ID. \
        The frame is:

          #{inspect(frame)}
        """
      end

    {req_alias, data} ->
      send_reply(req_alias, {:ok, frame})
      data
  end
end

Expand Down Expand Up @@ -808,4 +891,14 @@ defmodule Xandra.Connection do
:telemetry.execute([:xandra, :server_warnings], %{warnings: warnings}, metadata)
end
end

# Picks a stream ID uniformly at random from `1..@max_cassandra_stream_id` and
# retries until it finds one that is a key in neither `timed_out_ids` nor
# `in_flight_requests`.
# NOTE(review): there is no retry limit, so this does not return if every ID in
# the range is present in one of the two maps.
defp random_free_stream_id(in_flight_requests, timed_out_ids) do
  candidate = Enum.random(1..@max_cassandra_stream_id)

  taken? =
    Map.has_key?(timed_out_ids, candidate) or Map.has_key?(in_flight_requests, candidate)

  case taken? do
    true -> random_free_stream_id(in_flight_requests, timed_out_ids)
    false -> candidate
  end
end
end
11 changes: 11 additions & 0 deletions lib/xandra/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ defmodule Xandra.Telemetry do
[:xandra, :prepared_cache, :miss],
[:xandra, :prepare_query, :stop],
[:xandra, :execute_query, :stop],
[:xandra, :client_timeout],
[:xandra, :timed_out_response],
[:xandra, :server_warnings],
[:xandra, :cluster, :change_event],
[:xandra, :cluster, :control_connection, :connected],
Expand Down Expand Up @@ -170,6 +172,15 @@ defmodule Xandra.Telemetry do
[:server_warnings] ->
Logger.warning("Received warnings: #{inspect(measurements.warnings)}", logger_meta)

[:client_timeout] ->
Logger.error("Client timeout for query: #{inspect(metadata.query)}")

[:timed_out_response] ->
Logger.warning(
"Received response for stream id #{metadata.stream_id}, but request had already timed out",
logger_meta
)

[:prepared_cache, status] when status in [:hit, :miss] ->
query = inspect(metadata.query)
Logger.debug("Prepared cache #{status} for query: #{query}", logger_meta)
Expand Down
Loading