Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make retry strategies cluster aware #329

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d704b07
Cluster aware retry logic
harunzengin Sep 1, 2023
c9c578c
rename retry in Xandra
harunzengin Sep 1, 2023
73d812c
format
harunzengin Sep 1, 2023
72c3b00
remove unused alias
harunzengin Sep 1, 2023
946e1f0
format
harunzengin Sep 1, 2023
cf0b482
format
harunzengin Sep 1, 2023
fc5eb12
Update lib/xandra/cluster/pool.ex
harunzengin Sep 4, 2023
e6370ae
Implement suggested changes
harunzengin Sep 4, 2023
0d00473
Preserve old behaviour, implement cluster retry logic seperately
harunzengin Sep 5, 2023
17d7818
Update docs
harunzengin Sep 5, 2023
291e7eb
fix warnings
harunzengin Sep 5, 2023
0c4ca24
fix comment
harunzengin Sep 5, 2023
12ab189
Add tests for returning multiple hosts
harunzengin Sep 6, 2023
e3b3cd8
wait until status is not nil
harunzengin Sep 6, 2023
9c5348a
fix test
harunzengin Sep 6, 2023
05d8947
format
harunzengin Sep 6, 2023
3156e02
Empty commit for CI
harunzengin Sep 6, 2023
db11031
Don't update state exlicitly
harunzengin Sep 6, 2023
784960e
remove inspect
harunzengin Sep 6, 2023
2d8d093
fix warning
harunzengin Sep 7, 2023
93801a8
One liner for apply/3
harunzengin Sep 26, 2023
5eda5ad
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
b23202e
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
d3b629a
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
c99628f
previous phrasing
harunzengin Sep 26, 2023
a515a74
Merge branch 'main' into cluster-aware-retry-strategy
harunzengin Sep 26, 2023
d54ab5b
use nil instead of :random
harunzengin Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,19 @@ defmodule Xandra.Cluster do
end

defp with_conn_and_retrying(cluster, options, fun) do
RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end)
case Pool.checkout(cluster) do
{:error, :empty} ->
action = "checkout from cluster #{inspect(cluster)}"
{:error, ConnectionError.new(action, {:cluster, :not_connected})}

{:ok, connected_hosts} ->
RetryStrategy.run_cluster_with_retrying(options, connected_hosts, fun)
end
end

defp with_conn(cluster, fun) do
case Pool.checkout(cluster) do
{:ok, pool} ->
{:ok, [{pool, _host} | _connected_hosts]} ->
fun.(pool)

{:error, :empty} ->
Expand Down
46 changes: 38 additions & 8 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ defmodule Xandra.Cluster.Pool do
:gen_statem.stop(pid, reason, timeout)
end

@spec checkout(:gen_statem.server_ref()) :: {:ok, pid()} | {:error, :empty}
@spec checkout(:gen_statem.server_ref()) ::
{:ok, [{pid(), Host.t()}, ...]} | {:error, :empty}
def checkout(pid) do
:gen_statem.call(pid, :checkout)
end
Expand Down Expand Up @@ -348,6 +349,30 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data}
end

# For testing purposes
def handle_event(:info, {:add_test_hosts, hosts_with_status}, _state, %__MODULE__{} = data) do
data =
Enum.reduce(hosts_with_status, data, fn {%Host{} = host, status}, data_acc ->
data_acc =
update_in(data_acc.load_balancing_state, fn current_state ->
current_state = data_acc.load_balancing_module.host_added(current_state, host)
apply(data_acc.load_balancing_module, :"host_#{status}", [current_state, host])
end)

update_in(
data_acc.peers,
&Map.put(&1, Host.to_peername(host), %{
host: host,
status: status,
pool_pid: Process.spawn(fn -> nil end, []),
pool_ref: make_ref()
})
)
end)

{:keep_state, data}
end

# Sent by the connection itself.
def handle_event(
:info,
Expand Down Expand Up @@ -457,14 +482,19 @@ defmodule Xandra.Cluster.Pool do
data.load_balancing_module.query_plan(lb_state)
end)

# Find the first host in the plan for which we have a pool.
# Find all connected hosts
connected_hosts =
for host <- query_plan,
%{pool_pid: pid, host: host} = Map.get(data.peers, Host.to_peername(host)),
not is_nil(host),
is_pid(pid),
do: {pid, host}

reply =
query_plan
|> Stream.map(fn %Host{} = host -> Map.fetch(data.peers, Host.to_peername(host)) end)
|> Enum.find_value(_default = {:error, :empty}, fn
{:ok, %{pool_pid: pid}} when is_pid(pid) -> {:ok, pid}
_other -> nil
end)
case connected_hosts do
[] -> {:error, :empty}
connected_hosts -> {:ok, connected_hosts}
end

{data, {:reply, from, reply}}
end
Expand Down
132 changes: 121 additions & 11 deletions lib/xandra/retry_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@ defmodule Xandra.RetryStrategy do
When a query fails and a retry strategy module was passed as an option, Xandra
will:

1. invoke the `c:new/1` callback with the options passed to the failing
function to initialize the given retry strategy
1. invoke the `c:new/1` callback with the options passed to the failing function
to initialize the given retry strategy

1. ask the retry strategy whether to retry or error out (`c:retry/3`) until
either the query succeeds or `c:retry/3` says to error out

The `c:new/1` and `c:retry/3` callbacks will be invoked in the same
process that executed the original query.

If `c:retry/3` says to retry a query, such query will be retried on a
different Xandra connection than the one the query was first executed
through. For more information, see the documentation for `c:retry/3`.
There are two levels where RetryStrategy is invoked, distinguishable with the
`:execution_level` key in the options passed to `c:new/1` and `c:retry/3`,
namely `:cluster` level and `:xandra` level. On `:cluster` level, you have the option
to select a `:target_connection` from the list of `:connected_hosts`, in order to
retry on a different node for instance. The `:connected_hosts` in `options` is a
list of tuples, where the first element is the Xandra connection pid and the
second is of `Host.t()` describing the host.

If on `:cluster` level `c:retry/3` says to retry a query, such query can be retried on the
Xandra connection that is returned in the new `option` by `c:retry/3` under the `:target_connection`
key.

## Examples

Expand Down Expand Up @@ -75,12 +83,54 @@ defmodule Xandra.RetryStrategy do
end
end

A particularly useful application is to retry on queries on different hosts
when using `Xandra.Cluster`. We can even choose not to execute on certain `Host.t()`s
(because they may be in a different datacenter). Following example retries on all hosts
after the first `:connected_node` has failed:

defmodule AllNodesStrategy do
@behaviour Xandra.RetryStrategy

alias Xandra.Cluster.Host

def new(options) do
if options[:execution_level] == :cluster do
[_already_tried_node | rest_of_nodes] = options[:connected_hosts]

rest_of_nodes
end
end

def retry(_error, options, state) do
case options[:execution_level] do
:xandra ->
:error

:cluster ->
case state do
[] ->
:error

[{conn, %Host{}} | rest_of_nodes] ->
options = Keyword.put(options, :target_connection, conn)
{:retry, options, rest_of_nodes}
end
end
end
end
"""

alias Xandra.Cluster.Host

@type state :: term

@doc """
Initializes the state of a retry strategy based on the given `options`.

`connected_hosts` is a list of tuples with Xandra connection pids as its first
element and the `Host.t()` information as second. You would need to save the connection
information to the state as applicable to your retry logic in order to select the next
host in `c:retry/3`. See ##Examples about an example.
"""
@callback new(options :: keyword) :: state

Expand All @@ -105,25 +155,27 @@ defmodule Xandra.RetryStrategy do
third argument. This process will continue until either the query is executed
successfully or this callback returns `:error`.

Note that when `{:retry, new_options, new_state}` is returned, the query will
be executed again *on a different Xandra connection*. This behaviour is
particularly useful with pooled connections and especially when using
`Xandra.Cluster` as the pool, since it will mean that there's a chance the
retried query will be executed on a different node altogether.
Note that when `execution_level: :cluster` if we would return a `:target_connection` pid,
the query would be retried on the specified `Xandra` connection. To select a connection pid,
we can use `:connected_hosts` key in `options`.

When retrying on `execution_level: :xandra`, we are retrying with the exact same connection.
"""
@callback retry(error :: term, options :: keyword, state) ::
:error | {:retry, new_options :: keyword, new_state :: state}

@doc false
@spec run_with_retrying(keyword, (-> result)) :: result when result: var
def run_with_retrying(options, fun) do
options = Keyword.put(options, :execution_level, :xandra)

case Keyword.pop(options, :retry_strategy) do
{nil, _options} -> fun.()
{retry_strategy, options} -> run_with_retrying(options, retry_strategy, fun)
end
end

defp run_with_retrying(options, retry_strategy, fun) do
def run_with_retrying(options, retry_strategy, fun) do
with {:error, reason} <- fun.() do
{retry_state, options} =
Keyword.pop_lazy(options, :retrying_state, fn ->
Expand All @@ -146,4 +198,62 @@ defmodule Xandra.RetryStrategy do
end
end
end

@spec run_cluster_with_retrying(Keyword.t(), [{pid(), Host.t()}, ...], (pid() -> result)) ::
result
when result: var
def run_cluster_with_retrying(options, connected_hosts, fun) do
[{conn, _host} | _connected_hosts] = connected_hosts

options =
Keyword.merge(options,
execution_level: :cluster,
connected_hosts: connected_hosts,
target_connection: conn
)

case Keyword.pop(options, :retry_strategy) do
{nil, _options} ->
fun.(conn)

{retry_strategy, options} ->
run_cluster_with_retrying(options, connected_hosts, retry_strategy, fun)
end
end

defp run_cluster_with_retrying(options, connected_hosts, retry_strategy, fun) do
{conn, options} =
case Keyword.pop(options, :target_connection) do
{conn, options} when is_pid(conn) ->
{conn, options}

{nil, _options} ->
[{conn, _host}] = Enum.take_random(connected_hosts, 1)
{conn, options}
end

with {:error, reason} <- fun.(conn) do
{retry_state, options} =
Keyword.pop_lazy(options, :retrying_state, fn ->
retry_strategy.new(options)
end)

case retry_strategy.retry(reason, options, retry_state) do
:error ->
{:error, reason}

{:retry, new_options, new_retry_state} ->
new_options =
Keyword.put(new_options, :retrying_state, new_retry_state)

run_cluster_with_retrying(new_options, connected_hosts, retry_strategy, fun)

other ->
raise ArgumentError,
"invalid return value #{inspect(other)} from " <>
"retry strategy #{inspect(retry_strategy)} " <>
"with state #{inspect(retry_state)}"
end
end
end
end
Loading
Loading