Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make retry strategies cluster aware #329

Closed
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
d704b07
Cluster aware retry logic
harunzengin Sep 1, 2023
c9c578c
rename retry in Xandra
harunzengin Sep 1, 2023
73d812c
format
harunzengin Sep 1, 2023
72c3b00
remove unused alias
harunzengin Sep 1, 2023
946e1f0
format
harunzengin Sep 1, 2023
cf0b482
format
harunzengin Sep 1, 2023
fc5eb12
Update lib/xandra/cluster/pool.ex
harunzengin Sep 4, 2023
e6370ae
Implement suggested changes
harunzengin Sep 4, 2023
0d00473
Preserve old behaviour, implement cluster retry logic seperately
harunzengin Sep 5, 2023
17d7818
Update docs
harunzengin Sep 5, 2023
291e7eb
fix warnings
harunzengin Sep 5, 2023
0c4ca24
fix comment
harunzengin Sep 5, 2023
12ab189
Add tests for returning multiple hosts
harunzengin Sep 6, 2023
e3b3cd8
wait until status is not nil
harunzengin Sep 6, 2023
9c5348a
fix test
harunzengin Sep 6, 2023
05d8947
format
harunzengin Sep 6, 2023
3156e02
Empty commit for CI
harunzengin Sep 6, 2023
db11031
Don't update state exlicitly
harunzengin Sep 6, 2023
784960e
remove inspect
harunzengin Sep 6, 2023
2d8d093
fix warning
harunzengin Sep 7, 2023
93801a8
One liner for apply/3
harunzengin Sep 26, 2023
5eda5ad
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
b23202e
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
d3b629a
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
c99628f
previous phrasing
harunzengin Sep 26, 2023
a515a74
Merge branch 'main' into cluster-aware-retry-strategy
harunzengin Sep 26, 2023
d54ab5b
use nil instead of :random
harunzengin Sep 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,19 @@ defmodule Xandra.Cluster do
end

defp with_conn_and_retrying(cluster, options, fun) do
RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end)
case Pool.checkout(cluster) do
{:error, :empty} ->
action = "checkout from cluster #{inspect(cluster)}"
{:error, ConnectionError.new(action, {:cluster, :not_connected})}

{:ok, connected_hosts} ->
RetryStrategy.run_cluster_with_retrying(options, connected_hosts, fun)
end
end

defp with_conn(cluster, fun) do
case Pool.checkout(cluster) do
{:ok, pool} ->
{:ok, [{pool, _host} | _connected_hosts]} ->
fun.(pool)

{:error, :empty} ->
Expand Down
49 changes: 41 additions & 8 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ defmodule Xandra.Cluster.Pool do
:gen_statem.stop(pid, reason, timeout)
end

@spec checkout(:gen_statem.server_ref()) :: {:ok, pid()} | {:error, :empty}
@spec checkout(:gen_statem.server_ref()) ::
{:ok, [{pid(), Host.t()}, ...]} | {:error, :empty}
def checkout(pid) do
:gen_statem.call(pid, :checkout)
end
Expand Down Expand Up @@ -333,6 +334,33 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data}
end

# For testing purposes
def handle_event(:info, {:add_test_hosts, hosts_with_status}, _state, %__MODULE__{} = data) do
data =
Enum.reduce(hosts_with_status, data, fn {%Host{} = host, status}, data_acc ->
data_acc =
update_in(data_acc.load_balancing_state, fn current_state ->
current_state = data_acc.load_balancing_module.host_added(current_state, host)
m = data_acc.load_balancing_module
f = String.to_existing_atom("host_" <> Atom.to_string(status))
a = [current_state, host]
apply(m, f, a)
harunzengin marked this conversation as resolved.
Show resolved Hide resolved
end)

update_in(
data_acc.peers,
&Map.put(&1, Host.to_peername(host), %{
host: host,
status: status,
pool_pid: Process.spawn(fn -> nil end, []),
pool_ref: make_ref()
})
)
end)

{:keep_state, data}
end

# Sent by the connection itself.
def handle_event(
:info,
Expand Down Expand Up @@ -412,14 +440,19 @@ defmodule Xandra.Cluster.Pool do
data.load_balancing_module.query_plan(lb_state)
end)

# Find the first host in the plan for which we have a pool.
# Find all connected hosts
connected_hosts =
for host <- query_plan,
%{pool_pid: pid, host: host} = Map.get(data.peers, Host.to_peername(host)),
not is_nil(host),
is_pid(pid),
do: {pid, host}

reply =
query_plan
|> Stream.map(fn %Host{} = host -> Map.fetch(data.peers, Host.to_peername(host)) end)
|> Enum.find_value(_default = {:error, :empty}, fn
{:ok, %{pool_pid: pid}} when is_pid(pid) -> {:ok, pid}
_other -> nil
end)
case connected_hosts do
[] -> {:error, :empty}
connected_hosts -> {:ok, connected_hosts}
end

{data, {:reply, from, reply}}
end
Expand Down
145 changes: 130 additions & 15 deletions lib/xandra/retry_strategy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,26 @@ defmodule Xandra.RetryStrategy do
When a query fails and a retry strategy module was passed as an option, Xandra
will:

1. invoke the `c:new/1` callback with the options passed to the failing
function to initialize the given retry strategy
1. invoke the `c:new/1` callback with options passed to the failing function
to initialize the given retry strategy
harunzengin marked this conversation as resolved.
Show resolved Hide resolved

1. ask the retry strategy whether to retry or error out (`c:retry/3`) until
2. ask the retry strategy whether to retry or error out (`c:retry/3`) until
harunzengin marked this conversation as resolved.
Show resolved Hide resolved
either the query succeeds or `c:retry/3` says to error out

The `c:new/1` and `c:retry/3` callbacks will be invoked in the same
process that executed the original query.

If `c:retry/3` says to retry a query, such query will be retried on a
different Xandra connection than the one the query was first executed
through. For more information, see the documentation for `c:retry/3`.
There are two levels where RetryStrategy is invoked, distinguishable with the
`:execution_level` key in the options passed to `c:new/1` and `c:retry/3`,
namely `:cluster` level and `:xandra` level. On `:cluster` level, you have the option
to select a `:target_connection` from the list of `:connected_hosts`, in order to
retry on a different node for instance. The `:connected_hosts` in `options` is a
list of tuples, where the first element is the Xandra connection pid and the
second is of `Host.t()` describing the host.

If on `:cluster` level `c:retry/3` says to retry a query, such query can be retried on the
Xandra connection that is returned in the new `option` by `c:retry/3` under the `:target_connection`
key.

## Examples

Expand Down Expand Up @@ -75,12 +83,54 @@ defmodule Xandra.RetryStrategy do
end
end

A particularly useful application is to retry on queries on different hosts
when using `Xandra.Cluster`. We can even choose not to execute on certain `Host.t()`s
(because they may be in a different datacenter). Following example retries on all hosts
after the first `:connected_node` has failed:

defmodule AllNodesStrategy do
@behaviour Xandra.RetryStrategy

alias Xandra.Cluster.Host

def new(options) do
if options[:execution_level] == :cluster do
[_already_tried_node | rest_of_nodes] = options[:connected_hosts]

rest_of_nodes
end
end

def retry(_error, options, state) do
case options[:execution_level] do
:xandra ->
:error

:cluster ->
case state do
[] ->
:error

[{conn, %Host{}} | rest_of_nodes] ->
options = Keyword.put(options, :target_connection, conn)
{:retry, options, rest_of_nodes}
end
end
end
end
"""

alias Xandra.Cluster.Host

@type state :: term

@doc """
Initializes the state of a retry strategy based on the given `options`.

`connected_hosts` is a list of tuples with Xandra connection pids as its first
element and the `Host.t()` information as second. You would need to save the connection
information to the state as applicable to your retry logic in order to select the next
host in `c:retry/3`. See ##Examples about an example.
"""
@callback new(options :: keyword) :: state

Expand All @@ -105,25 +155,31 @@ defmodule Xandra.RetryStrategy do
third argument. This process will continue until either the query is executed
successfully or this callback returns `:error`.

Note that when `{:retry, new_options, new_state}` is returned, the query will
be executed again *on a different Xandra connection*. This behaviour is
particularly useful with pooled connections and especially when using
`Xandra.Cluster` as the pool, since it will mean that there's a chance the
retried query will be executed on a different node altogether.
Note that when `execution_level: :cluster` if we would return a `:target_connection` pid,
the query would be retried on the specified `Xandra` connection. To select a connection pid,
we can use `:connected_hosts` key in `options`.

When retrying on `execution_level: :xandra`, we are retrying with the exact same connection.
"""
@callback retry(error :: term, options :: keyword, state) ::
:error | {:retry, new_options :: keyword, new_state :: state}

@doc false
@spec run_with_retrying(keyword, (-> result)) :: result when result: var
@spec run_with_retrying(keyword, (-> result)) :: result
when result: var
harunzengin marked this conversation as resolved.
Show resolved Hide resolved
def run_with_retrying(options, fun) do
options = Keyword.put(options, :execution_level, :xandra)

case Keyword.pop(options, :retry_strategy) do
{nil, _options} -> fun.()
{retry_strategy, options} -> run_with_retrying(options, retry_strategy, fun)
{nil, _options} ->
fun.()

{retry_strategy, options} ->
run_with_retrying(options, retry_strategy, fun)
harunzengin marked this conversation as resolved.
Show resolved Hide resolved
end
end

defp run_with_retrying(options, retry_strategy, fun) do
def run_with_retrying(options, retry_strategy, fun) do
with {:error, reason} <- fun.() do
{retry_state, options} =
Keyword.pop_lazy(options, :retrying_state, fn ->
Expand All @@ -146,4 +202,63 @@ defmodule Xandra.RetryStrategy do
end
end
end

@spec run_cluster_with_retrying(Keyword.t(), [{pid(), Host.t()}, ...], (pid() -> result)) ::
result
when result: var
def run_cluster_with_retrying(options, connected_hosts, fun) do
[{conn, _host} | _connected_hosts] = connected_hosts

options =
Keyword.merge(options,
execution_level: :cluster,
connected_hosts: connected_hosts,
target_connection: conn
)

case Keyword.pop(options, :retry_strategy) do
{nil, _options} ->
fun.(conn)

{retry_strategy, options} ->
run_cluster_with_retrying(options, connected_hosts, retry_strategy, fun)
end
end

defp run_cluster_with_retrying(options, connected_hosts, retry_strategy, fun) do
{conn, options} =
case Keyword.pop(options, :target_connection) do
{conn, options} when is_pid(conn) ->
{conn, options}

{:random, options} ->
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we add :random here? Can't a retry strategy implement it by itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only for internal use, not exposed to the outside. It's basically a fallback for when a :target_connection doesn't exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether I understood "implement it by itself"

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, a retry strategy can implement choosing a random node if :target_connection doesn't exist, right? Unless we have a strong reason, let's just remove this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, exactly, we pop the :target_connection before calling retry_strategy.retryand then do

            |> Keyword.put_new(:target_connection, :random)

after calling retry_strategy.retry, so that if it is empty, we'll be assigning it a random node. This is to have a mechanism to initially select the first node from load balancing strategy.

Copy link
Contributor Author

@harunzengin harunzengin Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@whatyouhide But using nil instead of :random also works, as in d54ab5b, if that's what you meant

[{conn, _host}] = Enum.take_random(connected_hosts, 1)
{conn, options}
end

with {:error, reason} <- fun.(conn) do
{retry_state, options} =
Keyword.pop_lazy(options, :retrying_state, fn ->
retry_strategy.new(options)
end)

case retry_strategy.retry(reason, options, retry_state) do
:error ->
{:error, reason}

{:retry, new_options, new_retry_state} ->
new_options =
Keyword.put(new_options, :retrying_state, new_retry_state)
|> Keyword.put_new(:target_connection, :random)

run_cluster_with_retrying(new_options, connected_hosts, retry_strategy, fun)

other ->
raise ArgumentError,
"invalid return value #{inspect(other)} from " <>
"retry strategy #{inspect(retry_strategy)} " <>
"with state #{inspect(retry_state)}"
end
end
end
end
Loading
Loading