diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index ed48a5c7..cbd4d2d7 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -19,7 +19,8 @@ defmodule Xandra.Connection do @forced_transport_options [packet: :raw, mode: :binary, active: false] @max_concurrent_requests 5000 - @possible_ids MapSet.new(1..6000) + @max_cassandra_stream_id 32_768 + @restore_timed_out_stream_id_timeout 30 * 60 * 60 * 1000 require Logger @@ -321,7 +322,6 @@ defmodule Xandra.Connection do :transport, in_flight_requests: %{}, timed_out_ids: MapSet.new(), - free_stream_ids: MapSet.new(1..32768), current_keyspace: nil, buffer: <<>> ] @@ -351,13 +351,9 @@ defmodule Xandra.Connection do send(data.cluster_pid, {:xandra, :disconnected, data.peername, self()}) end - data = - Enum.reduce(data.in_flight_requests, data, fn {stream_id, req_alias}, data_acc -> - send_reply(req_alias, {:error, :disconnected}) - update_in(data_acc.free_stream_ids, &MapSet.put(&1, stream_id)) - end) - - data = put_in(data.in_flight_requests, %{}) + Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} -> + send_reply(req_alias, {:error, :disconnected}) + end) if data.backoff do {backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1) @@ -519,7 +515,6 @@ defmodule Xandra.Connection do end def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do - data = update_in(data.free_stream_ids, &MapSet.put(&1, stream_id)) data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) {:keep_state, data} end @@ -528,6 +523,21 @@ defmodule Xandra.Connection do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id)) + actions = [ + {{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout, + :restore_timed_out_stream_id} + ] + + {:keep_state, data, actions} + end + + def disconnected( + {:timeout, {:stream_id, stream_id}}, + :restore_timed_out_stream_id, + %__MODULE__{} = data + ) do + data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id)) + {:keep_state, data} end @@ -563,7 +573,7 @@ defmodule Xandra.Connection do end end - def connected({:call, from}, {:checkout_state_for_next_request, _}, %{ + def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{ in_flight_requests: in_flight_requests }) when map_size(in_flight_requests) == @max_concurrent_requests do @@ -571,11 +581,7 @@ defmodule Xandra.Connection do end def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do - {stream_id, data} = - get_and_update_in(data.free_stream_ids, fn ids -> - id = Enum.at(ids, 0) - {id, MapSet.delete(ids, id)} - end) + stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids) response = checked_out_state( @@ -623,13 +629,28 @@ defmodule Xandra.Connection do def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) - data = update_in(data.free_stream_ids, &MapSet.put(&1, stream_id)) {:keep_state, data} end def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id)) + + actions = [ + {{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout, + :restore_timed_out_stream_id} + ] + + {:keep_state, data, actions} + end + + def connected( + {:timeout, {:stream_id, stream_id}}, + :restore_timed_out_stream_id, + %__MODULE__{} = data + ) do + data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id)) + {:keep_state, data} end @@ -731,7 +752,7 @@ defmodule Xandra.Connection do {req_alias, data} -> send_reply(req_alias, {:ok, frame}) - update_in(data.free_stream_ids, &MapSet.put(&1, stream_id)) + data end end @@ -843,4 +864,14 @@ defmodule Xandra.Connection do :telemetry.execute([:xandra, :server_warnings], %{warnings: warnings}, metadata) end end + + defp random_free_stream_id(in_flight_requests, timed_out_ids) do + random_id = Enum.random(1..@max_cassandra_stream_id) + + if MapSet.member?(timed_out_ids, random_id) or Map.has_key?(in_flight_requests, random_id) do + random_free_stream_id(in_flight_requests, timed_out_ids) + else + random_id + end + end end diff --git a/mix.lock b/mix.lock index 375009d2..67d3e2b3 100644 --- a/mix.lock +++ b/mix.lock @@ -24,4 +24,5 @@ "tesla": {:hex, :tesla, "1.7.0", "a62dda2f80d4f8a925eb7b8c5b78c461e0eb996672719fe1a63b26321a5f8b4e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2e64f01ebfdb026209b47bc651a0e65203fcff4ae79c11efb73c4852b00dc313"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "toxiproxy_ex": {:git, "https://github.com/whatyouhide/toxiproxy_ex.git", "f77d7371bd0d58c3c6206589fba8c419e331113e", []}, + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, }