From 0d90baaa27ca579d90e751ff5d09435d60c6dd02 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Tue, 20 Feb 2024 17:34:05 +0100 Subject: [PATCH 1/5] try getting first stream id --- lib/xandra/connection.ex | 2 +- mix.exs | 3 ++- mix.lock | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index dcdef84f..ed48a5c7 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -573,7 +573,7 @@ defmodule Xandra.Connection do def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do {stream_id, data} = get_and_update_in(data.free_stream_ids, fn ids -> - id = Enum.random(ids) + id = Enum.at(ids, 0) {id, MapSet.delete(ids, id)} end) diff --git a/mix.exs b/mix.exs index c6751df3..f79e79fa 100644 --- a/mix.exs +++ b/mix.exs @@ -162,7 +162,8 @@ defmodule Xandra.Mixfile do {:mox, "~> 1.0", only: :test}, {:stream_data, "~> 0.6.0", only: [:dev, :test]}, {:nimble_lz4, "~> 0.1.3", only: [:dev, :test]}, - {:toxiproxy_ex, github: "whatyouhide/toxiproxy_ex", only: :test} + {:toxiproxy_ex, github: "whatyouhide/toxiproxy_ex", only: :test}, + { :uuid, "~> 1.1" } ] end diff --git a/mix.lock b/mix.lock index 375009d2..67d3e2b3 100644 --- a/mix.lock +++ b/mix.lock @@ -24,4 +24,5 @@ "tesla": {:hex, :tesla, "1.7.0", "a62dda2f80d4f8a925eb7b8c5b78c461e0eb996672719fe1a63b26321a5f8b4e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2e64f01ebfdb026209b47bc651a0e65203fcff4ae79c11efb73c4852b00dc313"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "toxiproxy_ex": {:git, "https://github.com/whatyouhide/toxiproxy_ex.git", "f77d7371bd0d58c3c6206589fba8c419e331113e", []}, + "uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"}, } From 3cedab5d3da00314bcf8f66193d9558b00a7b58f Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Wed, 21 Feb 2024 13:05:48 +0100 Subject: [PATCH 2/5] Use Enum.random with range to run in constant time --- lib/xandra/connection.ex | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index ed48a5c7..92baf744 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -19,7 +19,7 @@ defmodule Xandra.Connection do @forced_transport_options [packet: :raw, mode: :binary, active: false] @max_concurrent_requests 5000 - @possible_ids MapSet.new(1..6000) + @max_cassandra_stream_id 32768 require Logger @@ -321,7 +321,6 @@ defmodule Xandra.Connection do :transport, in_flight_requests: %{}, timed_out_ids: MapSet.new(), - free_stream_ids: MapSet.new(1..32768), current_keyspace: nil, buffer: <<>> ] @@ -351,13 +350,9 @@ defmodule Xandra.Connection do send(data.cluster_pid, {:xandra, :disconnected, data.peername, self()}) end - data = - Enum.reduce(data.in_flight_requests, data, fn {stream_id, req_alias}, data_acc -> - send_reply(req_alias, {:error, :disconnected}) - update_in(data_acc.free_stream_ids, &MapSet.put(&1, stream_id)) - end) - - data = put_in(data.in_flight_requests, %{}) + Enum.each(data.in_flight_requests, fn {_stream_id, req_alias} -> + send_reply(req_alias, {:error, :disconnected}) + end) if data.backoff do {backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1) @@ -571,11 +566,7 @@ defmodule Xandra.Connection do end def connected({:call, from}, {:checkout_state_for_next_request, req_alias}, data) do - {stream_id, data} = - get_and_update_in(data.free_stream_ids, fn ids -> - id = Enum.at(ids, 0) - {id, MapSet.delete(ids, id)} - end) + stream_id = random_free_stream_id(data.in_flight_requests, data.timed_out_ids) response = checked_out_state( @@ -623,7 +614,6 @@ defmodule Xandra.Connection do def connected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) - data = update_in(data.free_stream_ids, &MapSet.put(&1, stream_id)) {:keep_state, data} end @@ -731,7 +721,7 @@ defmodule Xandra.Connection do {req_alias, data} -> send_reply(req_alias, {:ok, frame}) - update_in(data.free_stream_ids, &MapSet.put(&1, stream_id)) + data end end @@ -843,4 +833,14 @@ defmodule Xandra.Connection do :telemetry.execute([:xandra, :server_warnings], %{warnings: warnings}, metadata) end end + + defp random_free_stream_id(in_flight_requests, timed_out_ids) do + random_id = Enum.random(1..@max_cassandra_stream_id) + + if MapSet.member?(timed_out_ids, random_id) or Map.has_key?(in_flight_requests, random_id) do + random_free_stream_id(in_flight_requests, timed_out_ids) + else + random_id + end + end end From f78afffe900791642d1e5a4210f6f6f268eb9d31 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Wed, 21 Feb 2024 13:49:21 +0100 Subject: [PATCH 3/5] Remove free_stream_id_reference --- lib/xandra/connection.ex | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 92baf744..3d6ef4b8 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -514,7 +514,6 @@ defmodule Xandra.Connection do end def disconnected(:cast, {:release_stream_id, stream_id}, %__MODULE__{} = data) do - data = update_in(data.free_stream_ids, &MapSet.put(&1, stream_id)) data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) {:keep_state, data} end From 3c9dc7369af0fc52527c54331d19366642c3c747 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Wed, 21 Feb 2024 14:24:20 +0100 Subject: [PATCH 4/5] Release timed out stream ids after 30 minutes --- lib/xandra/connection.ex | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/lib/xandra/connection.ex b/lib/xandra/connection.ex index 3d6ef4b8..cbd4d2d7 100644 --- a/lib/xandra/connection.ex +++ b/lib/xandra/connection.ex @@ -19,7 +19,8 @@ defmodule Xandra.Connection do @forced_transport_options [packet: :raw, mode: :binary, active: false] @max_concurrent_requests 5000 - @max_cassandra_stream_id 32768 + @max_cassandra_stream_id 32_768 + @restore_timed_out_stream_id_timeout 30 * 60 * 60 * 1000 require Logger @@ -522,6 +523,21 @@ defmodule Xandra.Connection do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id)) + actions = [ + {{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout, + :restore_timed_out_stream_id} + ] + + {:keep_state, data, actions} + end + + def disconnected( + {:timeout, {:stream_id, stream_id}}, + :restore_timed_out_stream_id, + %__MODULE__{} = data + ) do + data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id)) + {:keep_state, data} end @@ -557,7 +573,7 @@ defmodule Xandra.Connection do end end - def connected({:call, from}, {:checkout_state_for_next_request, _}, %{ + def connected({:call, from}, {:checkout_state_for_next_request, _}, %__MODULE__{ in_flight_requests: in_flight_requests }) when map_size(in_flight_requests) == @max_concurrent_requests do @@ -619,6 +635,22 @@ defmodule Xandra.Connection do def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id)) data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id)) + + actions = [ + {{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout, + :restore_timed_out_stream_id} + ] + + {:keep_state, data, actions} + end + + def connected( + {:timeout, {:stream_id, stream_id}}, + :restore_timed_out_stream_id, + %__MODULE__{} = data + ) do + data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id)) + {:keep_state, data} end From 92119852225c2af6b69a244f7e1098e11a13aea5 Mon Sep 17 00:00:00 2001 From: Harun Zengin Date: Wed, 21 Feb 2024 14:47:16 +0100 Subject: [PATCH 5/5] Remove uuid lib --- mix.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index f79e79fa..c6751df3 100644 --- a/mix.exs +++ b/mix.exs @@ -162,8 +162,7 @@ defmodule Xandra.Mixfile do {:mox, "~> 1.0", only: :test}, {:stream_data, "~> 0.6.0", only: [:dev, :test]}, {:nimble_lz4, "~> 0.1.3", only: [:dev, :test]}, - {:toxiproxy_ex, github: "whatyouhide/toxiproxy_ex", only: :test}, - { :uuid, "~> 1.1" } + {:toxiproxy_ex, github: "whatyouhide/toxiproxy_ex", only: :test} ] end