Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make retry strategies cluster-aware #335

Merged
merged 28 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
441e7d5
Cluster aware retry logic
harunzengin Sep 1, 2023
95ceb33
rename retry in Xandra
harunzengin Sep 1, 2023
81e7cc8
format
harunzengin Sep 1, 2023
dbcc3f7
remove unused alias
harunzengin Sep 1, 2023
74e6561
format
harunzengin Sep 1, 2023
ab95700
format
harunzengin Sep 1, 2023
783a51c
Update lib/xandra/cluster/pool.ex
harunzengin Sep 4, 2023
6261039
Implement suggested changes
harunzengin Sep 4, 2023
1d088e8
Preserve old behaviour, implement cluster retry logic seperately
harunzengin Sep 5, 2023
fdfd30f
Update docs
harunzengin Sep 5, 2023
62a1d10
fix warnings
harunzengin Sep 5, 2023
b07672d
fix comment
harunzengin Sep 5, 2023
187dcce
Add tests for returning multiple hosts
harunzengin Sep 6, 2023
562fa7d
wait until status is not nil
harunzengin Sep 6, 2023
0885946
fix test
harunzengin Sep 6, 2023
60005ff
format
harunzengin Sep 6, 2023
e203344
Empty commit for CI
harunzengin Sep 6, 2023
d2b8d2a
Don't update state exlicitly
harunzengin Sep 6, 2023
1cd7116
remove inspect
harunzengin Sep 6, 2023
e6e2ac9
fix warning
harunzengin Sep 7, 2023
2194f6b
One liner for apply/3
harunzengin Sep 26, 2023
e5f7a8f
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
8769e98
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
7027dac
Update lib/xandra/retry_strategy.ex
harunzengin Sep 26, 2023
e85f97d
previous phrasing
harunzengin Sep 26, 2023
7f86aa7
use nil instead of :random
harunzengin Sep 27, 2023
0efca39
Docs
whatyouhide Oct 4, 2023
0a1bf5b
Changes
whatyouhide Oct 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions lib/xandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ defmodule Xandra do
exactly the same as calling `execute(conn, query, params_or_options, [])`.

When `query` is a batch query, successful results will always be `Xandra.Void`
structs.
structs. See `execute/4` for full documentation on all supported options.

When `{:error, error}` is returned, `error` can be either a `Xandra.Error` or
a `Xandra.ConnectionError` struct. See the module documentation for more
Expand All @@ -730,6 +730,9 @@ defmodule Xandra do
timestamp will apply to all the statements in the batch that do not
explicitly specify a timestamp.

See `execute/4` for full documentation on all supported options if `query` is not a batch
query.

## Examples

For examples on executing simple queries or prepared queries, see the
Expand Down Expand Up @@ -949,6 +952,17 @@ defmodule Xandra do

## Options

This function supports any arbitrary option, since Xandra passes those down
to the `Xandra.RetryStrategy` module passed in `:retry_strategy`. However, below
is a list of the options that are specific to Xandra and that Xandra uses when executing
the query. Note that we might *add* options to this list in the future, which could
potentially change the meaning of custom options you use to implement your own retry
strategy, and we wouldn't consider this a breaking change. Because of this, we recommend
*scoping* custom options in your retry strategy module (for example, by prefixing them
with `<my_module>_<option_name>`).

Here are the Xandra-specific options:

#{NimbleOptions.docs(@execute_opts_schema)}

## Parameters
Expand Down Expand Up @@ -1177,7 +1191,7 @@ defmodule Xandra do
{xandra_opts, other_opts} = Keyword.split(options, @execute_opts_keys)
options = NimbleOptions.validate!(xandra_opts, @execute_opts_schema) ++ other_opts

RetryStrategy.run_with_retrying(options, fn ->
RetryStrategy.run_on_single_conn(options, fn ->
execute_without_retrying(conn, query, params, options)
end)
end
Expand Down
29 changes: 24 additions & 5 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,13 @@ defmodule Xandra.Cluster do
end

def execute(cluster, %Batch{} = batch, options) when is_list(options) do
with_conn_and_retrying(cluster, options, &Xandra.execute(&1, batch, options))
options_without_retry_strategy = Keyword.delete(options, :retry_strategy)

with_conn_and_retrying(
cluster,
options,
&Xandra.execute(&1, batch, options_without_retry_strategy)
)
end

@doc """
Expand All @@ -433,7 +439,13 @@ defmodule Xandra.Cluster do
@spec execute(cluster, Xandra.statement() | Xandra.Prepared.t(), Xandra.values(), keyword) ::
{:ok, Xandra.result()} | {:error, Xandra.error()}
def execute(cluster, query, params, options) do
with_conn_and_retrying(cluster, options, &Xandra.execute(&1, query, params, options))
options_without_retry_strategy = Keyword.delete(options, :retry_strategy)

with_conn_and_retrying(
cluster,
options,
&Xandra.execute(&1, query, params, options_without_retry_strategy)
)
end

@doc """
Expand Down Expand Up @@ -507,13 +519,20 @@ defmodule Xandra.Cluster do
Pool.connected_hosts(cluster)
end

defp with_conn_and_retrying(cluster, options, fun) do
RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end)
# Checks out the connected hosts from the cluster pool and hands them, together with
# `options` and `fun`, to `RetryStrategy.run_on_cluster/3`. The guard requires `fun`
# to be a 1-arity function.
defp with_conn_and_retrying(cluster, options, fun) when is_function(fun, 1) do
case Pool.checkout(cluster) do
# The pool reported no hosts: return a connection error right away, without
# invoking the retry strategy.
{:error, :empty} ->
action = "checkout from cluster #{inspect(cluster)}"
{:error, ConnectionError.new(action, {:cluster, :not_connected})}

# The whole list of connected hosts is passed on, not only the first one.
{:ok, connected_hosts} ->
RetryStrategy.run_on_cluster(options, connected_hosts, fun)
end
end

defp with_conn(cluster, fun) do
case Pool.checkout(cluster) do
{:ok, pool} ->
{:ok, [{pool, _host} | _connected_hosts]} ->
fun.(pool)

{:error, :empty} ->
Expand Down
49 changes: 40 additions & 9 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ defmodule Xandra.Cluster.Pool do
:gen_statem.stop(pid, reason, timeout)
end

@spec checkout(:gen_statem.server_ref()) :: {:ok, pid()} | {:error, :empty}
@spec checkout(:gen_statem.server_ref()) ::
{:ok, [{pid(), Host.t()}, ...]} | {:error, :empty}
# Issues a synchronous `:gen_statem.call/2` with the `:checkout` request to the given
# server and returns its reply as-is.
def checkout(pid) do
:gen_statem.call(pid, :checkout)
end
Expand Down Expand Up @@ -348,6 +349,30 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data}
end

# For testing purposes.
#
# Handles an `{:add_test_hosts, hosts_with_status}` info message, where
# `hosts_with_status` is an enumerable of `{%Host{}, status}` tuples. Every host is
# registered with the load-balancing module and stored in `data.peers`; the state
# machine stays in its current state with the updated data.
def handle_event(:info, {:add_test_hosts, hosts_with_status}, _state, %__MODULE__{} = data) do
data =
Enum.reduce(hosts_with_status, data, fn {%Host{} = host, status}, data_acc ->
data_acc =
update_in(data_acc.load_balancing_state, fn current_state ->
# Register the host first, then call the status-specific callback on the
# load-balancing module; its name is built dynamically as :"host_<status>".
current_state = data_acc.load_balancing_module.host_added(current_state, host)
apply(data_acc.load_balancing_module, :"host_#{status}", [current_state, host])
end)

# Store the peer under its peername. `pool_pid` is the pid of a freshly spawned
# process whose function only returns nil (presumably a stand-in for a real
# connection pool; confirm against the tests), and `pool_ref` is a fresh reference.
update_in(
data_acc.peers,
&Map.put(&1, Host.to_peername(host), %{
host: host,
status: status,
pool_pid: Process.spawn(fn -> nil end, []),
pool_ref: make_ref()
})
)
end)

{:keep_state, data}
end

# Sent by the connection itself.
def handle_event(
:info,
Expand Down Expand Up @@ -457,20 +482,26 @@ defmodule Xandra.Cluster.Pool do
data.load_balancing_module.query_plan(lb_state)
end)

# Find the first host in the plan for which we have a pool.
# Find all connected hosts
connected_hosts =
for host <- query_plan,
%{pool_pid: pid, host: host} = Map.get(data.peers, Host.to_peername(host)),
not is_nil(host),
is_pid(pid),
do: {pid, host}

reply =
query_plan
|> Stream.map(fn %Host{} = host -> Map.fetch(data.peers, Host.to_peername(host)) end)
|> Enum.find_value(_default = {:error, :empty}, fn
{:ok, %{pool_pid: pid}} when is_pid(pid) -> {:ok, pid}
_other -> nil
end)
case connected_hosts do
[] -> {:error, :empty}
connected_hosts -> {:ok, connected_hosts}
end

{data, {:reply, from, reply}}
end

defp handle_host_added(%__MODULE__{} = data, %Host{} = host) do
data = update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))
data =
update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))

data =
put_in(data.peers[Host.to_peername(host)], %{
Expand Down
Loading
Loading