Skip to content

Commit

Permalink
Merge branch 'attempt-fix-memory-leak-experiments' into attempt-fix-m…
Browse files Browse the repository at this point in the history
…emory-leak
  • Loading branch information
harunzengin committed Feb 21, 2024
2 parents f61b7c2 + 9211985 commit 63dc88d
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 18 deletions.
67 changes: 49 additions & 18 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ defmodule Xandra.Connection do

@forced_transport_options [packet: :raw, mode: :binary, active: false]
@max_concurrent_requests 5000
@possible_ids MapSet.new(1..6000)
@max_cassandra_stream_id 32_768
@restore_timed_out_stream_id_timeout 30 * 60 * 60 * 1000

require Logger

Expand Down Expand Up @@ -321,7 +322,6 @@ defmodule Xandra.Connection do
:transport,
in_flight_requests: %{},
timed_out_ids: MapSet.new(),
free_stream_ids: MapSet.new(1..32768),
current_keyspace: nil,
buffer: <<>>
]
Expand Down Expand Up @@ -351,13 +351,9 @@ defmodule Xandra.Connection do
send(data.cluster_pid, {:xandra, :disconnected, data.peername, self()})
end

data =
Enum.reduce(data.in_flight_requests, data, fn {stream_id, req_alias}, data_acc ->
send_reply(req_alias, {:error, :disconnected})
update_in(data_acc.free_stream_ids, &MapSet.put(&1, stream_id))
end)

data = put_in(data.in_flight_requests, %{})
Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} ->
send_reply(req_alias, {:error, :disconnected})
end)

if data.backoff do
{backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1)
Expand Down Expand Up @@ -519,7 +515,6 @@ defmodule Xandra.Connection do
end

def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.free_stream_ids, &MapSet.put(&1, stream_id))
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
{:keep_state, data}
end
Expand All @@ -528,6 +523,21 @@ defmodule Xandra.Connection do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id))

actions = [
{{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout,
:restore_timed_out_stream_id}
]

{:keep_state, data, actions}
end

def disconnected(
{:timeout, {:stream_id, stream_id}},
:restore_timed_out_stream_id,
%__MODULE__{} = data
) do
data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id))

{:keep_state, data}
end

Expand Down Expand Up @@ -563,19 +573,15 @@ defmodule Xandra.Connection do
end
end

def connected({:call, from}, {:checkout_state_for_next_request, _}, %{
def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{
in_flight_requests: in_flight_requests
})
when map_size(in_flight_requests) == @max_concurrent_requests do
{:keep_state_and_data, {:reply, from, {:error, :too_many_concurrent_connections}}}
end

def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do
{stream_id, data} =
get_and_update_in(data.free_stream_ids, fn ids ->
id = Enum.at(ids, 0)
{id, MapSet.delete(ids, id)}
end)
stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids)

response =
checked_out_state(
Expand Down Expand Up @@ -623,13 +629,28 @@ defmodule Xandra.Connection do

def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.free_stream_ids, &MapSet.put(&1, stream_id))
{:keep_state, data}
end

def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id))

actions = [
{{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout,
:restore_timed_out_stream_id}
]

{:keep_state, data, actions}
end

def connected(
{:timeout, {:stream_id, stream_id}},
:restore_timed_out_stream_id,
%__MODULE__{} = data
) do
data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id))

{:keep_state, data}
end

Expand Down Expand Up @@ -731,7 +752,7 @@ defmodule Xandra.Connection do

{req_alias, data} ->
send_reply(req_alias, {:ok, frame})
update_in(data.free_stream_ids, &MapSet.put(&1, stream_id))
data
end
end

Expand Down Expand Up @@ -843,4 +864,14 @@ defmodule Xandra.Connection do
:telemetry.execute([:xandra, :server_warnings], %{warnings: warnings}, metadata)
end
end

defp random_free_stream_id(in_flight_requests, timed_out_ids) do
random_id = Enum.random(1..@max_cassandra_stream_id)

if MapSet.member?(timed_out_ids, random_id) or Map.has_key?(in_flight_requests, random_id) do
random_free_stream_id(in_flight_requests, timed_out_ids)
else
random_id
end
end
end
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@
"tesla": {:hex, :tesla, "1.7.0", "a62dda2f80d4f8a925eb7b8c5b78c461e0eb996672719fe1a63b26321a5f8b4e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2e64f01ebfdb026209b47bc651a0e65203fcff4ae79c11efb73c4852b00dc313"},
"toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"},
"toxiproxy_ex": {:git, "https://github.com/whatyouhide/toxiproxy_ex.git", "f77d7371bd0d58c3c6206589fba8c419e331113e", []},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
}

0 comments on commit 63dc88d

Please sign in to comment.