Skip to content

Commit

Permalink
Flush timed out stream ids in 1 minute intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
harunzengin committed Feb 26, 2024
1 parent 760e250 commit c04eee9
Showing 1 changed file with 49 additions and 32 deletions.
81 changes: 49 additions & 32 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ defmodule Xandra.Connection do
@forced_transport_options [packet: :raw, mode: :binary, active: false]
@max_concurrent_requests 5000
@max_cassandra_stream_id 32_768
@restore_timed_out_stream_id_timeout 5 * 60 * 1000
@timed_out_stream_id_timeout_minutes 5
@flush_timed_out_stream_id_interval 60 * 1000

# This record is used internally when we check out a "view" of the state of
# the connection. This holds all the necessary info to encode queries and more.
Expand Down Expand Up @@ -319,7 +320,7 @@ defmodule Xandra.Connection do
:protocol_version,
:transport,
in_flight_requests: %{},
timed_out_ids: MapSet.new(),
timed_out_ids: %{},
current_keyspace: nil,
buffer: <<>>
]
Expand All @@ -332,7 +333,13 @@ defmodule Xandra.Connection do
@impl true
def init(options) do
data = %__MODULE__{original_options: options, configure: Keyword.get(options, :configure)}
{:ok, :disconnected, data, {:next_event, :internal, :connect}}

actions = [
{:next_event, :internal, :connect},
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}
]

{:ok, :disconnected, data, actions}
end

## "Disconnected" state
Expand All @@ -353,6 +360,8 @@ defmodule Xandra.Connection do
send_reply(req_alias, {:error, :disconnected})
end)

data = put_in(data.in_flight_requests, %{})

if data.backoff do
{backoff_time, data} = get_and_update_in(data.backoff, &Backoff.backoff/1)
{:keep_state, data, {{:timeout, :reconnect}, backoff_time, _content = nil}}
Expand Down Expand Up @@ -519,24 +528,28 @@ defmodule Xandra.Connection do

def disconnected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id))
data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now()))

actions = [
{{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout,
:restore_timed_out_stream_id}
]

{:keep_state, data, actions}
{:keep_state, data}
end

def disconnected(
{:timeout, {:stream_id, stream_id}},
:restore_timed_out_stream_id,
%__MODULE__{} = data
) do
data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id))
def disconnected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
now = DateTime.utc_now()

{:keep_state, data}
data =
update_in(
data.timed_out_ids,
&Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc ->
if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do
acc
else
Map.put(acc, stream_id, timestamp)
end
end)
)

{:keep_state, data,
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}}
end

## "Connected" state
Expand Down Expand Up @@ -632,24 +645,28 @@ defmodule Xandra.Connection do

def connected(:cast, {:timed_out_id, stream_id}, %__MODULE__{} = data) do
data = update_in(data.in_flight_requests, &Map.delete(&1, stream_id))
data = update_in(data.timed_out_ids, &MapSet.put(&1, stream_id))

actions = [
{{:timeout, {:stream_id, stream_id}}, @restore_timed_out_stream_id_timeout,
:restore_timed_out_stream_id}
]
data = update_in(data.timed_out_ids, &Map.put(&1, stream_id, DateTime.utc_now()))

{:keep_state, data, actions}
{:keep_state, data}
end

def connected(
{:timeout, {:stream_id, stream_id}},
:restore_timed_out_stream_id,
%__MODULE__{} = data
) do
data = update_in(data.timed_out_ids, &MapSet.delete(&1, stream_id))
def connected({:timeout, :flush_timed_out_stream_id}, :flush, %__MODULE__{} = data) do
now = DateTime.utc_now()

{:keep_state, data}
data =
update_in(
data.timed_out_ids,
&Enum.reduce(&1, %{}, fn {stream_id, timestamp}, acc ->
if DateTime.diff(now, timestamp, :minute) > @timed_out_stream_id_timeout_minutes do
acc
else
Map.put(acc, stream_id, timestamp)
end
end)
)

{:keep_state, data,
{{:timeout, :flush_timed_out_stream_id}, @flush_timed_out_stream_id_interval, :flush}}
end

## Helpers
Expand Down Expand Up @@ -870,7 +887,7 @@ defmodule Xandra.Connection do
defp random_free_stream_id(in_flight_requests, timed_out_ids) do
random_id = Enum.random(1..@max_cassandra_stream_id)

if MapSet.member?(timed_out_ids, random_id) or Map.has_key?(in_flight_requests, random_id) do
if Map.has_key?(timed_out_ids, random_id) or Map.has_key?(in_flight_requests, random_id) do
random_free_stream_id(in_flight_requests, timed_out_ids)
else
random_id
Expand Down

0 comments on commit c04eee9

Please sign in to comment.