diff --git a/CHANGELOG.md b/CHANGELOG.md index bfbb7517..95d48299 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ This change is breaking, and affect Xandra pretty significantly. The user-facing * Make retry strategies **cluster aware**, by adding the `{:retry, new_options, new_state, conn_pid}` return value to the `retry/3` callback. See the updated documentation for `Xandra.RetryStrategy`. * Support `GenServer.start_link/3` options in `Xandra.Cluster.start_link/1` (like `:spawn_opt` and friends). - * Add the `:queue_before_connecting` option to `Xandra.Cluster.start_link/1` to queue requests in the cluster until at least one connection to one node is established. + * Add the `:queue_checkouts_before_connecting` option to `Xandra.Cluster.start_link/1` to queue checkout requests in the cluster until at least one connection to one node is established. * Fix a few bugs with rogue data in the native protocol parser. * Fix a small bug when negotiating the native protocol version. * Fix IPv6 support in `Xandra.Cluster`. diff --git a/docker-compose.yml b/docker-compose.yml index 7fdcaa8b..748c2322 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,3 +88,9 @@ services: driver: "json-file" options: max-size: 50m + + toxiproxy: + image: ghcr.io/shopify/toxiproxy + ports: + - "8474:8474" + - "19052:19052" diff --git a/lib/xandra/cluster.ex b/lib/xandra/cluster.ex index 15050450..6aabdb8d 100644 --- a/lib/xandra/cluster.ex +++ b/lib/xandra/cluster.ex @@ -201,28 +201,31 @@ defmodule Xandra.Cluster do the cluster could connect once and then drop connections right away, so this doesn't mean that the cluster is connected, but rather that it *connected at least once*. This is useful, for example, in test suites where you're not worried about - resiliency but rather race conditions. In most cases, the `:queue_before_connecting` - option is what you want. + resiliency but rather race conditions. In most cases, the + `:queue_checkouts_before_connecting` option is what you want. """ ], - queue_before_connecting: [ + queue_checkouts_before_connecting: [ type: :keyword_list, doc: """ - Controls how to handle requests that go through the cluster *before* the cluster - is able to establish a connection to **any node**. This is useful because if you - try to use the Xandra cluster right away after starting it, you'll likely get errors - since the cluster needs to establish at least a connection to at least one node first. - The strategy that `Xandra.Cluster` uses is to queue requests until a connection - is established, and at that point flush those requests. If you don't want this behaviour, - you can set `:buffer_size` to `0`. *Available since v0.18.0*. This option supports these - keys: + Controls how to handle checkouts that go through the cluster *before* the cluster + is able to establish a connection to **any node**. Whenever you run a cluster function, + the cluster checks out a connection from one of the connected nodes and executes the + request on that connection. However, if you try to run any cluster function before the + cluster connects to any of the nodes, you'll likely get `Xandra.ConnectionError`s + with reason `{:cluster, :not_connected}`. This is because the cluster needs to establish + at least one connection to one node before it can execute requests. This option addresses + this issue by queueing "checkout requests" until the cluster establishes a connection + to a node. Once the connection is established, the cluster starts to hand over + connections. If you want to **disable this behavior**, set `:max_size` to `0`. *Available + since v0.18.0*. This option supports the following sub-options: """, keys: [ - buffer_size: [ + max_size: [ type: :non_neg_integer, default: 100, doc: """ - The number of requests to queue in the cluster and flush as soon as a connection + The number of checkouts to queue in the cluster and flush as soon as a connection is established. """ ], @@ -230,8 +233,8 @@ defmodule Xandra.Cluster do type: :timeout, default: 5_000, doc: """ - How long to hold on to requests for. When this timeout expires, all requests - are dropped and an error is returned to the caller. + How long to hold on to checkout requests for. When this timeout expires, all requests + are dropped and a connection error is returned to each caller. """ ] ], diff --git a/lib/xandra/cluster/pool.ex b/lib/xandra/cluster/pool.ex index 8c349ace..90c43562 100644 --- a/lib/xandra/cluster/pool.ex +++ b/lib/xandra/cluster/pool.ex @@ -12,6 +12,10 @@ defmodule Xandra.Cluster.Pool do alias Xandra.Cluster.{ConnectionPool, Host, LoadBalancingPolicy} alias Xandra.GenStatemHelpers + require Record + + Record.defrecordp(:checkout_queue, [:max_size, :queue]) + ## Public API @spec start_link(keyword(), keyword()) :: :gen_statem.start_ret() @@ -107,7 +111,7 @@ defmodule Xandra.Cluster.Pool do # A queue of requests that were received by this process *before* connecting # to *any* node. We "buffer" these for a while until we establish a connection. - reqs_before_connecting: %{ + checkout_queue: %{ queue: :queue.new(), max_size: nil }, @@ -142,8 +146,9 @@ defmodule Xandra.Cluster.Pool do {mod, opts} -> {mod, opts} end - queue_before_connecting_opts = Keyword.fetch!(cluster_opts, :queue_before_connecting) - queue_before_connecting_timeout = Keyword.fetch!(queue_before_connecting_opts, :timeout) + checkout_queue_opts = Keyword.fetch!(cluster_opts, :queue_checkouts_before_connecting) + checkout_queue_timeout = Keyword.fetch!(checkout_queue_opts, :timeout) + checkout_queue_max_size = Keyword.fetch!(checkout_queue_opts, :max_size) data = %__MODULE__{ connection_options: pool_opts, @@ -158,16 +163,13 @@ defmodule Xandra.Cluster.Pool do pool_size: Keyword.fetch!(cluster_opts, :pool_size), pool_supervisor: pool_sup, refresh_topology_interval: Keyword.fetch!(cluster_opts, :refresh_topology_interval), - reqs_before_connecting: %{ - queue: :queue.new(), - max_size: Keyword.fetch!(queue_before_connecting_opts, :buffer_size) - }, + checkout_queue: checkout_queue(queue: :queue.new(), max_size: checkout_queue_max_size), sync_connect_alias: sync_connect_alias_or_nil } actions = [ {:next_event, :internal, :start_control_connection}, - {{:timeout, :flush_queue_before_connecting}, queue_before_connecting_timeout, nil} + timeout_action(:flush_checkout_queue, checkout_queue_timeout) ] {:ok, :never_connected, data, actions} @@ -178,67 +180,53 @@ defmodule Xandra.Cluster.Pool do def handle_event(:internal, :start_control_connection, _state, data) do case start_control_connection(data) do - {:ok, data} -> - {:keep_state, data} - - :error -> - {:keep_state, data, {{:timeout, :reconnect_control_connection}, 1000, nil}} + {:ok, data} -> {:keep_state, data} + :error -> {:keep_state_and_data, timeout_action(:reconnect_control_connection, 1000)} end end - def handle_event( - :internal, - :flush_queue_before_connecting, - _state = :has_connected_once, - %__MODULE__{reqs_before_connecting: %{queue: queue}} = data - ) do + def handle_event(:internal, :flush_checkout_queue, :has_connected_once, %__MODULE__{} = data) do + checkout_queue(queue: queue) = data.checkout_queue + {reply_actions, data} = Enum.map_reduce(:queue.to_list(queue), data, fn from, data -> {data, reply_action} = checkout_connection(data, from) {reply_action, data} end) - {:keep_state, data, - reply_actions ++ [timeout_action(:flush_queue_before_connecting, :infinity)]} + cancel_timeout_action = timeout_action(:flush_checkout_queue, :infinity) + {:keep_state, data, [cancel_timeout_action] ++ reply_actions} end - def handle_event( - {:timeout, :flush_queue_before_connecting}, - nil, - _state = :never_connected, - %__MODULE__{} = data - ) do - actions = - for from <- :queue.to_list(data.reqs_before_connecting.queue), - do: {:reply, from, {:error, :empty}} - - data = put_in(data.reqs_before_connecting, nil) - - {:keep_state, data, actions} + def handle_event({:timeout, :flush_checkout_queue}, nil, :never_connected, %__MODULE__{} = data) do + {checkout_queue(queue: queue), data} = get_and_update_in(data.checkout_queue, &{&1, nil}) + reply_actions = for from <- :queue.to_list(queue), do: {:reply, from, {:error, :empty}} + {:keep_state, data, reply_actions} end - # We already flushed once, so we won't keep adding requests to the queue. - def handle_event( - {:call, from}, - :checkout, - _state = :never_connected, - %__MODULE__{reqs_before_connecting: nil} - ) do + # We have never connected, but we already flushed once, so we won't keep adding requests to + # the queue. + def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{checkout_queue: nil}) do {:keep_state_and_data, {:reply, from, {:error, :empty}}} end - def handle_event({:call, from}, :checkout, _state = :never_connected, %__MODULE__{} = data) do - %{queue: queue, max_size: max_size} = data.reqs_before_connecting + def handle_event({:call, from}, :checkout, :never_connected, %__MODULE__{} = data) do + checkout_queue(queue: queue, max_size: max_size) = data.checkout_queue if :queue.len(queue) == max_size do {:keep_state_and_data, {:reply, from, {:error, :empty}}} else - data = update_in(data.reqs_before_connecting.queue, &:queue.in(from, &1)) + data = + put_in( + data.checkout_queue, + checkout_queue(data.checkout_queue, queue: :queue.in(from, queue)) + ) + {:keep_state, data} end end - def handle_event({:call, from}, :checkout, _state = :has_connected_once, %__MODULE__{} = data) do + def handle_event({:call, from}, :checkout, :has_connected_once, %__MODULE__{} = data) do {data, reply_action} = checkout_connection(data, from) {:keep_state, data, reply_action} end @@ -327,7 +315,7 @@ defmodule Xandra.Cluster.Pool do send(alias, {alias, :connected}) end - actions = [{:next_event, :internal, :flush_queue_before_connecting}] + actions = [{:next_event, :internal, :flush_checkout_queue}] {:next_state, :has_connected_once, data, actions} end diff --git a/mix.exs b/mix.exs index 3db13eaa..9be256b7 100644 --- a/mix.exs +++ b/mix.exs @@ -105,7 +105,8 @@ defmodule Xandra.Mixfile do {:excoveralls, "~> 0.17", only: :test}, {:mox, "~> 1.0", only: :test}, {:stream_data, "~> 0.6.0", only: [:dev, :test]}, - {:nimble_lz4, "~> 0.1.3", only: [:dev, :test]} + {:nimble_lz4, "~> 0.1.3", only: [:dev, :test]}, + {:toxiproxy_ex, "~> 1.1", only: :test} ] end end diff --git a/mix.lock b/mix.lock index c06c5418..aa97cec0 100644 --- a/mix.lock +++ b/mix.lock @@ -6,10 +6,13 @@ "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.30.6", "5f8b54854b240a2b55c9734c4b1d0dd7bdd41f71a095d42a70445c03cf05a281", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "bd48f2ddacf4e482c727f9293d9498e0881597eae6ddc3d9562bd7923375109f"}, "excoveralls": {:hex, :excoveralls, "0.17.1", "83fa7906ef23aa7fc8ad7ee469c357a63b1b3d55dd701ff5b9ce1f72442b2874", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "95bc6fda953e84c60f14da4a198880336205464e75383ec0f570180567985ae0"}, + "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, + "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, + "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"}, "mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"}, "nimble_lz4": {:hex, :nimble_lz4, "0.1.4", "22b9fa4163e8057a10e6a2238285c1ed8137ea2e2659b8166d7354c0f2957312", [:mix], [{:rustler, "~> 0.29.0", [hex: :rustler, repo: "hexpm", optional: false]}, {:rustler_precompiled, "~> 0.6.2", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}], "hexpm", "1ae42465181aca22924972682fa52e10e46ecc6541d9df59af6ff3ada00fc592"}, "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, @@ -18,5 +21,7 @@ "rustler_precompiled": {:hex, :rustler_precompiled, "0.6.3", "f838d94bc35e1844973ee7266127b156fdc962e9e8b7ff666c8fb4fed7964d23", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "e18ecca3669a7454b3a2be75ae6c3ef01d550bc9a8cf5fbddcfff843b881d7c6"}, "stream_data": {:hex, :stream_data, "0.6.0", "e87a9a79d7ec23d10ff83eb025141ef4915eeb09d4491f79e52f2562b73e5f47", [:mix], [], "hexpm", "b92b5031b650ca480ced047578f1d57ea6dd563f5b57464ad274718c9c29501c"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "tesla": {:hex, :tesla, "1.7.0", "a62dda2f80d4f8a925eb7b8c5b78c461e0eb996672719fe1a63b26321a5f8b4e", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:exjsx, ">= 3.0.0", [hex: :exjsx, repo: "hexpm", optional: true]}, {:finch, "~> 0.13", [hex: :finch, repo: "hexpm", optional: true]}, {:fuse, "~> 2.4", [hex: :fuse, repo: "hexpm", optional: true]}, {:gun, "~> 1.3", [hex: :gun, repo: "hexpm", optional: true]}, {:hackney, "~> 1.6", [hex: :hackney, repo: "hexpm", optional: true]}, {:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: true]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: true]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: true]}, {:msgpax, "~> 2.3", [hex: :msgpax, repo: "hexpm", optional: true]}, {:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "2e64f01ebfdb026209b47bc651a0e65203fcff4ae79c11efb73c4852b00dc313"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, + "toxiproxy_ex": {:hex, :toxiproxy_ex, "1.1.1", "af605b9f54a4508e2c8987764301609f457076cafc472ba83beaec93e2796e99", [:mix], [{:castore, "~> 1.0.3", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mint, "~> 1.0", [hex: :mint, repo: "hexpm", optional: false]}, {:tesla, "~> 1.7.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm", "826c415c6e8ec1708894a86091afd0674f0a4115ed09ac6604088ef916226fe7"}, } diff --git a/test/docker/health-check-services.sh b/test/docker/health-check-services.sh index b6562880..3a4fab00 100755 --- a/test/docker/health-check-services.sh +++ b/test/docker/health-check-services.sh @@ -5,6 +5,10 @@ MAX_SECONDS=120 END_SECONDS=$((SECONDS+MAX_SECONDS)) for name in $(docker ps --format '{{.Names}}'); do + if [[ "$name" == *"toxiproxy"* ]]; then + continue + fi + HEALTHY=false while [[ "$SECONDS" -lt "$END_SECONDS" ]]; do diff --git a/test/test_helper.exs b/test/test_helper.exs index ffeb88ae..26515298 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -3,6 +3,14 @@ if System.get_env("XANDRA_DEBUG") do Xandra.Telemetry.attach_debug_handler() end +ToxiproxyEx.populate!([ + %{ + name: "xandra_test_cassandra", + listen: "0.0.0.0:19052", + upstream: "cassandra:9042" + } +]) + Logger.configure(level: String.to_existing_atom(System.get_env("LOG_LEVEL", "info"))) excluded = diff --git a/test/xandra/cluster/control_connection_test.exs b/test/xandra/cluster/control_connection_test.exs index 868f078a..eb28aff6 100644 --- a/test/xandra/cluster/control_connection_test.exs +++ b/test/xandra/cluster/control_connection_test.exs @@ -275,7 +275,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do socket = :sys.get_state(ctrl_conn).transport.socket :ok = :gen_tcp.shutdown(socket, :read_write) - GenServer.cast(ctrl_conn, {:refresh_topology, []}) + send(ctrl_conn, :refresh_topology) assert_receive {:DOWN, ^ref, _, _, _} end diff --git a/test/xandra/cluster_test.exs b/test/xandra/cluster_test.exs index 2d352085..f4c96d8c 100644 --- a/test/xandra/cluster_test.exs +++ b/test/xandra/cluster_test.exs @@ -3,6 +3,7 @@ defmodule Xandra.ClusterTest do import Mox + alias Xandra.ConnectionError alias Xandra.Cluster alias Xandra.Cluster.Host alias Xandra.Cluster.Pool @@ -46,7 +47,7 @@ defmodule Xandra.ClusterTest do quote do event = [:xandra, :cluster] ++ unquote(postfix) telemetry_ref = var!(telemetry_ref) - assert_receive {^event, ^telemetry_ref, measurements, unquote(meta)} + assert_receive {^event, ^telemetry_ref, measurements, unquote(meta)}, 2000 assert measurements == %{} end end @@ -472,7 +473,44 @@ defmodule Xandra.ClusterTest do ) end - test "returns the result directly for the bang! version" do + test "executes batch queries", %{base_options: opts} do + cluster = start_link_supervised!({Cluster, opts}) + + Xandra.Cluster.run(cluster, fn conn -> + :ok = XandraTest.IntegrationCase.setup_keyspace(conn, "cluster_batch_test") + end) + + Xandra.Cluster.execute!( + cluster, + "CREATE TABLE cluster_batch_test.cluster_batches (id int PRIMARY KEY)" + ) + + batch = + Xandra.Batch.new() + |> Xandra.Batch.add("INSERT INTO cluster_batch_test.cluster_batches(id) VALUES (?)", [ + {"int", 1} + ]) + + assert {:ok, %Xandra.Void{}} = Xandra.Cluster.execute(cluster, batch) + end + + test "returns an error if the cluster is not connected to any node", %{base_options: opts} do + opts = + Keyword.merge(opts, + nodes: ["127.0.0.1:8092"], + sync_connect: false, + queue_checkouts_before_connecting: [max_size: 0] + ) + + cluster = start_link_supervised!({Cluster, opts}) + + assert {:error, %ConnectionError{reason: {:cluster, :not_connected}}} = + Xandra.Cluster.execute(cluster, "SELECT * FROM system.local") + end + end + + describe "execute!/3,4" do + test "returns the result directly" do opts = [ nodes: ["127.0.0.1:#{@port}"], sync_connect: 1000 @@ -497,6 +535,30 @@ defmodule Xandra.ClusterTest do _options = [] ) end + + test "raises errors" do + opts = [ + nodes: ["127.0.0.1:#{@port}"], + sync_connect: 1000 + ] + + opts = + if @protocol_version do + Keyword.put(opts, :protocol_version, @protocol_version) + else + opts + end + + cluster = start_link_supervised!({Xandra.Cluster, opts}) + + assert_raise Xandra.Error, "Keyspace 'nonexisting_keyspace' does not exist", fn -> + Xandra.Cluster.execute!(cluster, "USE nonexisting_keyspace") + end + + assert_raise Xandra.Error, "Keyspace 'nonexisting_keyspace' does not exist", fn -> + Xandra.Cluster.execute!(cluster, "USE nonexisting_keyspace", _params = [], _options = []) + end + end end describe "prepare/3" do @@ -521,6 +583,22 @@ defmodule Xandra.ClusterTest do assert {:ok, _page} = Xandra.Cluster.execute(cluster, prepared) end + test "returns an error if the cluster is not connected to any node", %{base_options: opts} do + opts = + Keyword.merge(opts, + nodes: ["127.0.0.1:8092"], + sync_connect: false, + queue_checkouts_before_connecting: [max_size: 0] + ) + + cluster = start_link_supervised!({Cluster, opts}) + + assert {:error, %ConnectionError{reason: {:cluster, :not_connected}}} = + Xandra.Cluster.prepare(cluster, "SELECT * FROM system.local") + end + end + + describe "prepare!/3" do test "returns the result directly for the bang! version" do opts = [ nodes: ["127.0.0.1:#{@port}"], @@ -541,6 +619,34 @@ defmodule Xandra.ClusterTest do assert {:ok, _page} = Xandra.Cluster.execute(cluster, prepared) end + + test "raises errors" do + opts = [ + nodes: ["127.0.0.1:#{@port}"], + sync_connect: 1000 + ] + + opts = + if @protocol_version do + Keyword.put(opts, :protocol_version, @protocol_version) + else + opts + end + + cluster = start_link_supervised!({Xandra.Cluster, opts}) + + assert_raise Xandra.Error, ~r/no viable alternative at input/, fn -> + Xandra.Cluster.prepare!(cluster, "SELECT bad syntax") + end + end + end + + describe "stream_pages!/4" do + test "streams pages", %{base_options: opts} do + cluster = start_link_supervised!({Cluster, opts}) + stream = Xandra.Cluster.stream_pages!(cluster, "SELECT * FROM system.local", _params = []) + assert [%{}] = Enum.to_list(stream) + end end describe "stop/1" do @@ -581,7 +687,7 @@ defmodule Xandra.ClusterTest do Keyword.merge(opts, nodes: ["127.0.0.1:8092"], sync_connect: false, - queue_before_connecting: [timeout: 0, buffer_size: 0] + queue_checkouts_before_connecting: [timeout: 0, max_size: 0] ) pid = start_supervised!({Cluster, opts}) @@ -765,6 +871,44 @@ defmodule Xandra.ClusterTest do # Make sure we reconnect to the control connection. assert_telemetry [:control_connection, :connected], _meta end + + @tag :toxiproxy + @tag telemetry_events: [ + [:xandra, :cluster, :pool, :stopped] + ] + test "when a single connection goes down", %{base_options: opts, telemetry_ref: telemetry_ref} do + opts = Keyword.merge(opts, sync_connect: 1000, nodes: ["127.0.0.1:19052"]) + pid = start_link_supervised!({Cluster, opts}) + + ToxiproxyEx.get!(:xandra_test_cassandra) + |> ToxiproxyEx.down!(fn -> + assert_telemetry [:pool, :stopped], %{cluster_pid: ^pid} + end) + end + + @tag :toxiproxy + @tag telemetry_events: [ + [:xandra, :cluster, :control_connection, :failed_to_connect], + [:xandra, :cluster, :control_connection, :connected] + ] + test "reconnects to the control connection if it goes down", + %{base_options: opts, telemetry_ref: telemetry_ref} do + test_pid = self() + test_ref = make_ref() + opts = Keyword.merge(opts, sync_connect: false, nodes: ["127.0.0.1:19052"]) + + ToxiproxyEx.get!(:xandra_test_cassandra) + |> ToxiproxyEx.down!(fn -> + pid = start_link_supervised!({Cluster, opts}) + send(test_pid, {test_ref, :cluster_pid, pid}) + assert_telemetry [:control_connection, :failed_to_connect], %{cluster_pid: ^pid} = meta + assert %ConnectionError{reason: :closed} = meta.reason + end) + + assert_receive {^test_ref, :cluster_pid, pid} + + assert_telemetry [:control_connection, :connected], %{cluster_pid: ^pid} + end end defp get_state(cluster) do