diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index cd3de39b..94f00ac5 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -508,12 +508,19 @@ defmodule Xandra.Cluster do end defp with_conn_and_retrying(cluster, options, fun) do - RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end) + case Pool.checkout(cluster) do + {:error, :empty} -> + action = "checkout from cluster #{inspect(cluster)}" + {:error, ConnectionError.new(action, {:cluster, :not_connected})} + + {:ok, connected_hosts} -> + RetryStrategy.run_cluster_with_retrying(options, connected_hosts, fun) + end end defp with_conn(cluster, fun) do case Pool.checkout(cluster) do - {:ok, pool} -> + {:ok, [{pool, _host} | _connected_hosts]} -> fun.(pool) {:error, :empty} -> diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index d5042a6c..ff776010 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -75,7 +75,8 @@ defmodule Xandra.Cluster.Pool do :gen_statem.stop(pid, reason, timeout) end - @spec checkout(:gen_statem.server_ref()) :: {:ok, pid()} | {:error, :empty} + @spec checkout(:gen_statem.server_ref()) :: + {:ok, [{pid(), Host.t()}, ...]} | {:error, :empty} def checkout(pid) do :gen_statem.call(pid, :checkout) end @@ -348,6 +349,30 @@ defmodule Xandra.Cluster.Pool do {:keep_state, data} end + # For testing purposes + def handle_event(:info, {:add_test_hosts, hosts_with_status}, _state, %__MODULE__{} = data) do + data = + Enum.reduce(hosts_with_status, data, fn {%Host{} = host, status}, data_acc -> + data_acc = + update_in(data_acc.load_balancing_state, fn current_state -> + current_state = data_acc.load_balancing_module.host_added(current_state, host) + apply(data_acc.load_balancing_module, :"host_#{status}", [current_state, host]) + end) + + update_in( + data_acc.peers, + &Map.put(&1, Host.to_peername(host), %{ + host: host, + status: status, + pool_pid: Process.spawn(fn -> nil end, []), + pool_ref: make_ref() + }) + ) + 
end) + + {:keep_state, data} + end + # Sent by the connection itself. def handle_event( :info, @@ -457,14 +482,19 @@ defmodule Xandra.Cluster.Pool do data.load_balancing_module.query_plan(lb_state) end) - # Find the first host in the plan for which we have a pool. + # Find all connected hosts + connected_hosts = + for host <- query_plan, + %{pool_pid: pid, host: host} = Map.get(data.peers, Host.to_peername(host)), + not is_nil(host), + is_pid(pid), + do: {pid, host} + reply = - query_plan - |> Stream.map(fn %Host{} = host -> Map.fetch(data.peers, Host.to_peername(host)) end) - |> Enum.find_value(_default = {:error, :empty}, fn - {:ok, %{pool_pid: pid}} when is_pid(pid) -> {:ok, pid} - _other -> nil - end) + case connected_hosts do + [] -> {:error, :empty} + connected_hosts -> {:ok, connected_hosts} + end {data, {:reply, from, reply}} end diff --git a/lib/xandra/retry_strategy.ex b/lib/xandra/retry_strategy.ex index f7f17a51..f24ab297 100644 --- a/lib/xandra/retry_strategy.ex +++ b/lib/xandra/retry_strategy.ex @@ -16,8 +16,8 @@ defmodule Xandra.RetryStrategy do When a query fails and a retry strategy module was passed as an option, Xandra will: - 1. invoke the `c:new/1` callback with the options passed to the failing - function to initialize the given retry strategy + 1. invoke the `c:new/1` callback with the options passed to the failing function + to initialize the given retry strategy 1. ask the retry strategy whether to retry or error out (`c:retry/3`) until either the query succeeds or `c:retry/3` says to error out @@ -25,9 +25,17 @@ defmodule Xandra.RetryStrategy do The `c:new/1` and `c:retry/3` callbacks will be invoked in the same process that executed the original query. - If `c:retry/3` says to retry a query, such query will be retried on a - different Xandra connection than the one the query was first executed - through. For more information, see the documentation for `c:retry/3`. 
+ There are two levels where RetryStrategy is invoked, distinguishable by the + `:execution_level` key in the options passed to `c:new/1` and `c:retry/3`, + namely `:cluster` level and `:xandra` level. On `:cluster` level, you have the option + to select a `:target_connection` from the list of `:connected_hosts`, in order to + retry on a different node for instance. The `:connected_hosts` in `options` is a + list of tuples, where the first element is the Xandra connection pid and the + second is a `Host.t()` describing the host. + + If on `:cluster` level `c:retry/3` says to retry a query, such query can be retried on the + Xandra connection that is returned in the new `options` by `c:retry/3` under the `:target_connection` + key. ## Examples @@ -75,12 +83,54 @@ defmodule Xandra.RetryStrategy do end end + A particularly useful application is to retry queries on different hosts + when using `Xandra.Cluster`. We can even choose not to execute on certain `Host.t()`s + (because they may be in a different datacenter). The following example retries on all the other hosts + after the first of the `:connected_hosts` has failed: + + defmodule AllNodesStrategy do + @behaviour Xandra.RetryStrategy + + alias Xandra.Cluster.Host + + def new(options) do + if options[:execution_level] == :cluster do + [_already_tried_node | rest_of_nodes] = options[:connected_hosts] + + rest_of_nodes + end + end + + def retry(_error, options, state) do + case options[:execution_level] do + :xandra -> + :error + + :cluster -> + case state do + [] -> + :error + + [{conn, %Host{}} | rest_of_nodes] -> + options = Keyword.put(options, :target_connection, conn) + {:retry, options, rest_of_nodes} + end + end + end + end """ + alias Xandra.Cluster.Host + @type state :: term @doc """ Initializes the state of a retry strategy based on the given `options`. + + `connected_hosts` is a list of tuples with Xandra connection pids as its first + element and the `Host.t()` information as second. 
You would need to save the connection + information to the state as applicable to your retry logic in order to select the next + host in `c:retry/3`. See the "Examples" section of the module documentation for an example. """ @callback new(options :: keyword) :: state @@ -105,11 +155,11 @@ defmodule Xandra.RetryStrategy do third argument. This process will continue until either the query is executed successfully or this callback returns `:error`. - Note that when `{:retry, new_options, new_state}` is returned, the query will - be executed again *on a different Xandra connection*. This behaviour is - particularly useful with pooled connections and especially when using - `Xandra.Cluster` as the pool, since it will mean that there's a chance the - retried query will be executed on a different node altogether. + Note that on `execution_level: :cluster`, if the returned options contain a `:target_connection` pid, + the query is retried on the specified `Xandra` connection. To select a connection pid, + you can use the `:connected_hosts` key in `options`. + + When retrying on `execution_level: :xandra`, we are retrying with the exact same connection. 
""" @callback retry(error :: term, options :: keyword, state) :: :error | {:retry, new_options :: keyword, new_state :: state} @@ -117,13 +167,15 @@ defmodule Xandra.RetryStrategy do @doc false @spec run_with_retrying(keyword, (-> result)) :: result when result: var def run_with_retrying(options, fun) do + options = Keyword.put(options, :execution_level, :xandra) + case Keyword.pop(options, :retry_strategy) do {nil, _options} -> fun.() {retry_strategy, options} -> run_with_retrying(options, retry_strategy, fun) end end - defp run_with_retrying(options, retry_strategy, fun) do + def run_with_retrying(options, retry_strategy, fun) do with {:error, reason} <- fun.() do {retry_state, options} = Keyword.pop_lazy(options, :retrying_state, fn -> @@ -146,4 +198,62 @@ defmodule Xandra.RetryStrategy do end end end + + @spec run_cluster_with_retrying(Keyword.t(), [{pid(), Host.t()}, ...], (pid() -> result)) :: + result + when result: var + def run_cluster_with_retrying(options, connected_hosts, fun) do + [{conn, _host} | _connected_hosts] = connected_hosts + + options = + Keyword.merge(options, + execution_level: :cluster, + connected_hosts: connected_hosts, + target_connection: conn + ) + + case Keyword.pop(options, :retry_strategy) do + {nil, _options} -> + fun.(conn) + + {retry_strategy, options} -> + run_cluster_with_retrying(options, connected_hosts, retry_strategy, fun) + end + end + + defp run_cluster_with_retrying(options, connected_hosts, retry_strategy, fun) do + {conn, options} = + case Keyword.pop(options, :target_connection) do + {conn, options} when is_pid(conn) -> + {conn, options} + + {nil, _options} -> + [{conn, _host}] = Enum.take_random(connected_hosts, 1) + {conn, options} + end + + with {:error, reason} <- fun.(conn) do + {retry_state, options} = + Keyword.pop_lazy(options, :retrying_state, fn -> + retry_strategy.new(options) + end) + + case retry_strategy.retry(reason, options, retry_state) do + :error -> + {:error, reason} + + {:retry, new_options, 
new_retry_state} -> + new_options = + Keyword.put(new_options, :retrying_state, new_retry_state) + + run_cluster_with_retrying(new_options, connected_hosts, retry_strategy, fun) + + other -> + raise ArgumentError, + "invalid return value #{inspect(other)} from " <> + "retry strategy #{inspect(retry_strategy)} " <> + "with state #{inspect(retry_state)}" + end + end + end end diff --git a/test/integration/retry_strategies_test.exs b/test/integration/retry_strategies_test.exs index 53d0181d..a97bd6ba 100644 --- a/test/integration/retry_strategies_test.exs +++ b/test/integration/retry_strategies_test.exs @@ -1,57 +1,168 @@ defmodule Xandra.RetryStrategiesTest do - use XandraTest.IntegrationCase, async: true + use XandraTest.IntegrationCase, async: false alias Xandra.Error - test "strategy that retries for a fixed amount of times", %{conn: conn} do - defmodule CounterStrategy do - @behaviour Xandra.RetryStrategy + describe "RetryStrategy on Xandra level" do + test "that retries for a fixed amount of times", %{conn: conn} do + defmodule CounterStrategy do + @behaviour Xandra.RetryStrategy - def new(options) do - Keyword.fetch!(options, :retries_count) + def new(options) do + retries_left = Keyword.fetch!(options, :retry_count) + + %{retries_left: retries_left} + end + + def retry(_error, _options, %{retries_left: 0}) do + :error + end + + def retry(error, options, %{retries_left: retries_left}) do + send(self(), {:retrying, error, retries_left}) + + {:retry, options, %{retries_left: retries_left - 1}} + end end - def retry(_error, _options, 0) do - :error + assert_raise KeyError, fn -> + Xandra.execute(conn, "USE nonexistent_keyspace", [], retry_strategy: CounterStrategy) end - def retry(error, options, retries_count) do - send(self(), {:retrying, error, retries_count}) - {:retry, options, retries_count - 1} + options = [retry_strategy: CounterStrategy, retry_count: 2] + assert {:error, _} = Xandra.execute(conn, "USE nonexistent_keyspace", [], options) + + 
assert_received {:retrying, %Error{reason: :invalid}, 2} + assert_received {:retrying, %Error{reason: :invalid}, 1} + after + :code.delete(CounterStrategy) + :code.purge(CounterStrategy) + end + + test "raises an error if retry/3 returns an invalid value", %{conn: conn} do + defmodule InvalidStrategy do + @behaviour Xandra.RetryStrategy + + def new(_options), do: %{} + def retry(_error, _options, _state), do: :invalid_value end + + message = + "invalid return value :invalid_value from " <> + "retry strategy Xandra.RetryStrategiesTest.InvalidStrategy with state %{}" + + assert_raise ArgumentError, message, fn -> + Xandra.execute(conn, "USE nonexistend_keyspace", [], retry_strategy: InvalidStrategy) + end + after + :code.delete(InvalidStrategy) + :code.purge(InvalidStrategy) end + end + + describe "RetryStrategy on cluster level" do + defmodule MockXandra do + use GenServer + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end - assert_raise KeyError, fn -> - Xandra.execute(conn, "USE nonexistent_keyspace", [], retry_strategy: CounterStrategy) + @impl true + def init(opts) do + {:ok, opts} + end + + @impl true + def handle_info(info, state) do + send(state.conn, info) + send(state.client, {:received_request, self()}) + {:noreply, state} + end end - options = [retry_strategy: CounterStrategy, retries_count: 2] - assert {:error, _} = Xandra.execute(conn, "USE nonexistent_keyspace", [], options) + defmodule MockCluster do + @behaviour :gen_statem - assert_received {:retrying, %Error{reason: :invalid}, 2} - assert_received {:retrying, %Error{reason: :invalid}, 1} - after - :code.delete(CounterStrategy) - :code.purge(CounterStrategy) - end + alias Xandra.Cluster.Host - test "an error is raised if retry/3 returns an invalid value", %{conn: conn} do - defmodule InvalidStrategy do - @behaviour Xandra.RetryStrategy + def child_spec(arg) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, [arg]} + } + end + + def start_link(nodes_count: 
nodes_count, conn: conn, client: client) do + hosts = + Enum.map(1..nodes_count, fn n -> + {:ok, xandra_conn} = MockXandra.start_link(%{conn: conn, client: client}) + {xandra_conn, %Host{address: {127, 0, 0, n}}} + end) + + :gen_statem.start_link(__MODULE__, hosts, []) + end + + def callback_mode(), + do: [:handle_event_function] - def new(_options), do: %{} - def retry(_error, _options, _state), do: :invalid_value + def init(hosts) do + {:ok, :state, hosts} + end + + def handle_event({:call, from}, :checkout, _state, hosts) do + {:keep_state_and_data, {:reply, from, {:ok, hosts}}} + end end - message = - "invalid return value :invalid_value from " <> - "retry strategy Xandra.RetryStrategiesTest.InvalidStrategy with state %{}" + test "works with target_connection", %{conn: conn} do + defmodule AllNodesStrategy do + @behaviour Xandra.RetryStrategy + + def new(options) do + if options[:execution_level] == :cluster do + [_already_tried_node | rest_of_nodes] = options[:connected_hosts] + + rest_of_nodes + end + end + + def retry(_error, options, state) do + case options[:execution_level] do + :xandra -> + :error + + :cluster -> + case state do + [] -> + :error + + [{conn, _host} | rest_of_nodes] -> + options = Keyword.put(options, :target_connection, conn) + + {:retry, options, rest_of_nodes} + end + end + end + end + + {:ok, mock_cluster} = + start_supervised({MockCluster, [nodes_count: 3, conn: conn, client: self()]}) + + assert {:ok, [{pid1, _host1}, {pid2, _host2}, {pid3, _host3}]} = + Xandra.Cluster.Pool.checkout(mock_cluster) + + assert {:error, %Xandra.Error{reason: :invalid}} = + Xandra.Cluster.execute(mock_cluster, "USE nonexistent_keyspace", [], + retry_strategy: AllNodesStrategy + ) - assert_raise ArgumentError, message, fn -> - Xandra.execute(conn, "USE nonexistend_keyspace", [], retry_strategy: InvalidStrategy) + assert_receive({:received_request, ^pid1}) + assert_receive({:received_request, ^pid2}) + assert_receive({:received_request, ^pid3}) + after + 
:code.delete(AllNodesStrategy) + :code.purge(AllNodesStrategy) end - after - :code.delete(InvalidStrategy) - :code.purge(InvalidStrategy) end end diff --git a/test/xandra/cluster/pool_test.exs b/test/xandra/cluster/pool_test.exs index 7f46015b..10521a19 100644 --- a/test/xandra/cluster/pool_test.exs +++ b/test/xandra/cluster/pool_test.exs @@ -297,10 +297,37 @@ defmodule Xandra.Cluster.PoolTest do %{cluster_options: cluster_options, pool_options: pool_options} do assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) - assert {:ok, pool_pid} = Pool.checkout(pid) + assert {:ok, [{pool_pid, %Host{}}]} = Pool.checkout(pid) assert is_pid(pool_pid) end + test "returns all connected pools", + %{cluster_options: cluster_options, pool_options: pool_options} do + assert {:ok, pid} = start_supervised(spec(cluster_options, pool_options)) + + hosts_with_statuses = [ + {%Host{address: {127, 0, 0, 1}, port: 8091}, :up}, + {%Host{address: {127, 0, 0, 1}, port: 8092}, :down}, + {%Host{address: {127, 0, 0, 1}, port: 8093}, :connected} + ] + + wait_until_connected(pid) + send(pid, {:add_test_hosts, hosts_with_statuses}) + + assert {:ok, pids_with_hosts} = Pool.checkout(pid) + assert is_list(pids_with_hosts) + + assert Enum.all?(pids_with_hosts, fn {conn, %Host{}} -> is_pid(conn) end) + + expected_set_of_connected_hosts = + MapSet.new([{{127, 0, 0, 1}, @port}, {{127, 0, 0, 1}, 8093}]) + + existing_set_of_connected_hosts = + MapSet.new(pids_with_hosts, fn {_, host} -> Host.to_peername(host) end) + + assert existing_set_of_connected_hosts == expected_set_of_connected_hosts + end + test "returns {:error, :empty} when there are no pools", %{cluster_options: cluster_options, pool_options: pool_options} do cluster_options = @@ -321,7 +348,7 @@ defmodule Xandra.Cluster.PoolTest do cluster_options = Keyword.merge(cluster_options, sync_connect: 1000) cluster = start_supervised!(spec(cluster_options, pool_options)) - assert {:ok, pool_pid} = Pool.checkout(cluster) + assert 
{:ok, [{pool_pid, %Host{}}]} = Pool.checkout(cluster) ref = Process.monitor(pool_pid) Process.exit(pool_pid, :kill) @@ -388,6 +415,21 @@ defmodule Xandra.Cluster.PoolTest do end end + defp wait_until_connected(pid, retries \\ 10) + + defp wait_until_connected(_pid, 0), do: :error + + defp wait_until_connected(pid, retries) do + case :sys.get_state(pid) do + {:has_connected_once, _} -> + :ok + + _ -> + Process.sleep(10) + wait_until_connected(pid, retries - 1) + end + end + defp spec(cluster_opts, pool_opts) do %{ id: Pool,