Skip to content

Commit

Permalink
Refactor Xandra.Cluster.Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Oct 16, 2023
1 parent 62626cc commit cd58f64
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 23 deletions.
53 changes: 30 additions & 23 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ defmodule Xandra.Cluster.Pool do
## Callbacks

@impl true
def callback_mode, do: :handle_event_function
def callback_mode, do: [:handle_event_function, :state_enter]

@impl true
def init({cluster_opts, pool_opts, sync_connect_alias_or_nil}) do
Expand Down Expand Up @@ -178,26 +178,24 @@ defmodule Xandra.Cluster.Pool do
@impl true
def handle_event(type, event, state, data)

def handle_event(:enter, :never_connected, :never_connected, _data) do
:keep_state_and_data
end

# This is the only state transition we can do: :never_connected -> :has_connected_once.
# We can never go the other way, of course.
def handle_event(:enter, _from = :never_connected, _to = :has_connected_once, data) do
{data, actions} = flush_checkout_queue(data)
{:keep_state, data, actions}
end

def handle_event(:internal, :start_control_connection, _state, data) do
case start_control_connection(data) do
{:ok, data} -> {:keep_state, data}
:error -> {:keep_state_and_data, timeout_action(:reconnect_control_connection, 1000)}
end
end

def handle_event(:internal, :flush_checkout_queue, :has_connected_once, %__MODULE__{} = data) do
{checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil})

{reply_actions, data} =
Enum.map_reduce(:queue.to_list(queue), data, fn from, data ->
{data, reply_action} = checkout_connection(data, from)
{reply_action, data}
end)

cancel_timeout_action = timeout_action(:flush_checkout_queue, :infinity)
{:keep_state, data, [cancel_timeout_action] ++ reply_actions}
end

def handle_event({:timeout, :flush_checkout_queue}, nil, :never_connected, %__MODULE__{} = data) do
{checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil})
reply_actions = for from <- :queue.to_list(queue), do: {:reply, from, {:error, :empty}}
Expand Down Expand Up @@ -297,15 +295,17 @@ defmodule Xandra.Cluster.Pool do
end

# Sent by the connection itself.
# Whatever the state is, we move to the :has_connected_once state. If the current state
# is :never_connected, that means that this will result in a state transition, which
# triggers other events and stuff.
def handle_event(
:info,
{:xandra, :connected, peername, _pid},
state,
_state,
%__MODULE__{} = data
)
when is_peername(peername) do
data = put_in(data.peers[peername].status, :connected)

host = data.peers[peername].host

data =
Expand All @@ -315,13 +315,7 @@ defmodule Xandra.Cluster.Pool do
send(alias, {alias, :connected})
end

actions =
case state do
:has_connected_once -> []
:never_connected -> [{:next_event, :internal, :flush_checkout_queue}]
end

{:next_state, :has_connected_once, data, actions}
{:next_state, :has_connected_once, data}
end

# Sent by the connection itself.
Expand Down Expand Up @@ -593,6 +587,19 @@ defmodule Xandra.Cluster.Pool do
end
end

defp flush_checkout_queue(%__MODULE__{} = data) do
{checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil})

{reply_actions, data} =
Enum.map_reduce(:queue.to_list(queue), data, fn from, data ->
{data, reply_action} = checkout_connection(data, from)
{reply_action, data}
end)

cancel_timeout_action = timeout_action(:flush_checkout_queue, :infinity)
{data, [cancel_timeout_action] ++ reply_actions}
end

defp timeout_action(name, time) do
{{:timeout, name}, time, _event_content = nil}
end
Expand Down
19 changes: 19 additions & 0 deletions test/xandra/cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,25 @@ defmodule Xandra.ClusterTest do
cluster_state = get_state(pid)
assert %{status: :down} = cluster_state.peers[{{127, 0, 0, 1}, 8092}]
end

@tag telemetry_events: [
[:xandra, :connected]
]
test "starts multiple connections to each node through the :pool_size option",
%{base_options: opts, telemetry_ref: telemetry_ref} do
opts = Keyword.merge(opts, pool_size: 3)
pid = start_link_supervised!({Cluster, opts})

assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{connection: pid1}}
assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{connection: pid2}}
assert_receive {[:xandra, :connected], ^telemetry_ref, %{}, %{connection: pid3}}

# Assert that the connections are different.
assert Enum.uniq([pid1, pid2, pid3]) == [pid1, pid2, pid3]

# Get the state to make sure the process is alive.
get_state(pid)
end
end

describe "child_spec/1" do
Expand Down

0 comments on commit cd58f64

Please sign in to comment.